X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fcls%2Fuser%2Fcls_user.cc;fp=src%2Fceph%2Fsrc%2Fcls%2Fuser%2Fcls_user.cc;h=840470e9f4d9b665a6b0d647046759a39f2099bb;hb=812ff6ca9fcd3e629e49d4328905f33eee8ca3f5;hp=0000000000000000000000000000000000000000;hpb=15280273faafb77777eab341909a3f495cf248d9;p=stor4nfv.git diff --git a/src/ceph/src/cls/user/cls_user.cc b/src/ceph/src/cls/user/cls_user.cc new file mode 100644 index 0000000..840470e --- /dev/null +++ b/src/ceph/src/cls/user/cls_user.cc @@ -0,0 +1,386 @@ +// -*- mode:C; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include + +#include "include/utime.h" +#include "objclass/objclass.h" + +#include "cls_user_ops.h" + +CLS_VER(1,0) +CLS_NAME(user) + +static int write_entry(cls_method_context_t hctx, const string& key, const cls_user_bucket_entry& entry) +{ + bufferlist bl; + ::encode(entry, bl); + + int ret = cls_cxx_map_set_val(hctx, key, &bl); + if (ret < 0) + return ret; + + return 0; +} + +static int remove_entry(cls_method_context_t hctx, const string& key) +{ + int ret = cls_cxx_map_remove_key(hctx, key); + if (ret < 0) + return ret; + + return 0; +} + +static void get_key_by_bucket_name(const string& bucket_name, string *key) +{ + *key = bucket_name; +} + +static int get_existing_bucket_entry(cls_method_context_t hctx, const string& bucket_name, + cls_user_bucket_entry& entry) +{ + if (bucket_name.empty()) { + return -EINVAL; + } + + string key; + get_key_by_bucket_name(bucket_name, &key); + + bufferlist bl; + int rc = cls_cxx_map_get_val(hctx, key, &bl); + if (rc < 0) { + CLS_LOG(10, "could not read entry %s", key.c_str()); + return rc; + } + try { + bufferlist::iterator iter = bl.begin(); + ::decode(entry, iter); + } catch (buffer::error& err) { + CLS_LOG(0, "ERROR: failed to decode entry %s", key.c_str()); + return -EIO; + } + + return 0; +} + +static int read_header(cls_method_context_t hctx, cls_user_header *header) +{ + bufferlist bl; + + int ret = cls_cxx_map_read_header(hctx, &bl); + if (ret < 0) + return ret; + + if (bl.length() == 0) { + *header = cls_user_header(); + return 0; + } + + try { + ::decode(*header, bl); + } catch (buffer::error& err) { + CLS_LOG(0, "ERROR: failed to decode user header"); + return -EIO; + } + + return 0; +} + +static void add_header_stats(cls_user_stats *stats, cls_user_bucket_entry& entry) +{ + stats->total_entries += entry.count; + stats->total_bytes += entry.size; + stats->total_bytes_rounded += entry.size_rounded; +} + +static void dec_header_stats(cls_user_stats *stats, cls_user_bucket_entry& entry) +{ + stats->total_bytes -= entry.size; + stats->total_bytes_rounded -= entry.size_rounded; + stats->total_entries -= entry.count; +} + +static void apply_entry_stats(const cls_user_bucket_entry& src_entry, cls_user_bucket_entry *target_entry) +{ + target_entry->size = src_entry.size; + target_entry->size_rounded = src_entry.size_rounded; + target_entry->count = src_entry.count; +} + +static int cls_user_set_buckets_info(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + bufferlist::iterator in_iter = in->begin(); + + cls_user_set_buckets_op op; + try { + ::decode(op, in_iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_user_add_op(): failed to decode op"); + return -EINVAL; + } + + cls_user_header header; + int ret = read_header(hctx, &header); + if (ret < 0) { + CLS_LOG(0, "ERROR: failed to read user info header ret=%d", ret); + return ret; + } + + for (list::iterator iter = op.entries.begin(); + iter != op.entries.end(); ++iter) { + cls_user_bucket_entry& update_entry = *iter; + + string key; + + get_key_by_bucket_name(update_entry.bucket.name, &key); + + cls_user_bucket_entry entry; + ret = get_existing_bucket_entry(hctx, key, entry); + + if (ret == -ENOENT) { + if (!op.add) + continue; /* racing bucket removal */ + + entry = update_entry; + + ret = 0; + } + + if (ret < 0) { + CLS_LOG(0, "ERROR: get_existing_bucket_entry() key=%s returned %d", key.c_str(), ret); + return ret; + } else if (ret >= 0 && entry.user_stats_sync) { + dec_header_stats(&header.stats, entry); + } + + CLS_LOG(20, "storing entry for key=%s size=%lld count=%lld", + key.c_str(), (long long)update_entry.size, (long long)update_entry.count); + + // sync entry stats when not an op.add, as when the case is op.add if its a + // new entry we already have copied update_entry earlier, OTOH, for an existing entry + // we end up clobbering the existing stats for the bucket + if (!op.add){ + apply_entry_stats(update_entry, &entry); + } + entry.user_stats_sync = true; + + ret = write_entry(hctx, key, entry); + if (ret < 0) + return ret; + + add_header_stats(&header.stats, entry); + } + + bufferlist bl; + + CLS_LOG(20, "header: total bytes=%lld entries=%lld", (long long)header.stats.total_bytes, (long long)header.stats.total_entries); + + if (header.last_stats_update < op.time) + header.last_stats_update = op.time; + + ::encode(header, bl); + + ret = cls_cxx_map_write_header(hctx, &bl); + if (ret < 0) + return ret; + + return 0; +} + +static int cls_user_complete_stats_sync(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + bufferlist::iterator in_iter = in->begin(); + + cls_user_complete_stats_sync_op op; + try { + ::decode(op, in_iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_user_add_op(): failed to decode op"); + return -EINVAL; + } + + cls_user_header header; + int ret = read_header(hctx, &header); + if (ret < 0) { + CLS_LOG(0, "ERROR: failed to read user info header ret=%d", ret); + return ret; + } + + if (header.last_stats_sync < op.time) + header.last_stats_sync = op.time; + + bufferlist bl; + + ::encode(header, bl); + + ret = cls_cxx_map_write_header(hctx, &bl); + if (ret < 0) + return ret; + + return 0; +} + +static int cls_user_remove_bucket(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + bufferlist::iterator in_iter = in->begin(); + + cls_user_remove_bucket_op op; + try { + ::decode(op, in_iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_user_add_op(): failed to decode op"); + return -EINVAL; + } + + cls_user_header header; + int ret = read_header(hctx, &header); + if (ret < 0) { + CLS_LOG(0, "ERROR: failed to read user info header ret=%d", ret); + return ret; + } + + string key; + + get_key_by_bucket_name(op.bucket.name, &key); + + cls_user_bucket_entry entry; + ret = get_existing_bucket_entry(hctx, key, entry); + if (ret == -ENOENT) { + return 0; /* idempotent removal */ + } + if (ret < 0) { + CLS_LOG(0, "ERROR: get existing bucket entry, key=%s ret=%d", key.c_str(), ret); + return ret; + } + + if (entry.user_stats_sync) { + dec_header_stats(&header.stats, entry); + } + + CLS_LOG(20, "removing entry at %s", key.c_str()); + + ret = remove_entry(hctx, key); + if (ret < 0) + return ret; + + return 0; +} + +static int cls_user_list_buckets(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + bufferlist::iterator in_iter = in->begin(); + + cls_user_list_buckets_op op; + try { + ::decode(op, in_iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_user_list_op(): failed to decode op"); + return -EINVAL; + } + + map keys; + + const string& from_index = op.marker; + const string& to_index = op.end_marker; + const bool to_index_valid = !to_index.empty(); + +#define MAX_ENTRIES 1000 + size_t max_entries = op.max_entries; + if (max_entries > MAX_ENTRIES) + max_entries = MAX_ENTRIES; + + string match_prefix; + cls_user_list_buckets_ret ret; + + int rc = cls_cxx_map_get_vals(hctx, from_index, match_prefix, max_entries, &keys, &ret.truncated); + if (rc < 0) + return rc; + + CLS_LOG(20, "from_index=%s to_index=%s match_prefix=%s", + from_index.c_str(), + to_index.c_str(), + match_prefix.c_str()); + + list& entries = ret.entries; + map::iterator iter = keys.begin(); + + string marker; + + for (; iter != keys.end(); ++iter) { + const string& index = iter->first; + marker = index; + + if (to_index_valid && to_index.compare(index) <= 0) { + ret.truncated = false; + break; + } + + bufferlist& bl = iter->second; + bufferlist::iterator biter = bl.begin(); + try { + cls_user_bucket_entry e; + ::decode(e, biter); + entries.push_back(e); + } catch (buffer::error& err) { + CLS_LOG(0, "ERROR: cls_user_list: could not decode entry, index=%s", index.c_str()); + } + } + + if (ret.truncated) { + ret.marker = marker; + } + + ::encode(ret, *out); + + return 0; +} + +static int cls_user_get_header(cls_method_context_t hctx, bufferlist *in, bufferlist *out) +{ + bufferlist::iterator in_iter = in->begin(); + + cls_user_get_header_op op; + try { + ::decode(op, in_iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: cls_user_get_header_op(): failed to decode op"); + return -EINVAL; + } + + cls_user_get_header_ret op_ret; + + int ret = read_header(hctx, &op_ret.header); + if (ret < 0) + return ret; + + ::encode(op_ret, *out); + + return 0; +} + +CLS_INIT(user) +{ + CLS_LOG(1, "Loaded user class!"); + + cls_handle_t h_class; + cls_method_handle_t h_user_set_buckets_info; + cls_method_handle_t h_user_complete_stats_sync; + cls_method_handle_t h_user_remove_bucket; + cls_method_handle_t h_user_list_buckets; + cls_method_handle_t h_user_get_header; + + cls_register("user", &h_class); + + /* log */ + cls_register_cxx_method(h_class, "set_buckets_info", CLS_METHOD_RD | CLS_METHOD_WR, + cls_user_set_buckets_info, &h_user_set_buckets_info); + cls_register_cxx_method(h_class, "complete_stats_sync", CLS_METHOD_RD | CLS_METHOD_WR, + cls_user_complete_stats_sync, &h_user_complete_stats_sync); + cls_register_cxx_method(h_class, "remove_bucket", CLS_METHOD_RD | CLS_METHOD_WR, cls_user_remove_bucket, &h_user_remove_bucket); + cls_register_cxx_method(h_class, "list_buckets", CLS_METHOD_RD, cls_user_list_buckets, &h_user_list_buckets); + cls_register_cxx_method(h_class, "get_header", CLS_METHOD_RD, cls_user_get_header, &h_user_get_header); + + return; +} +