1 // -*- mode:C; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
6 #include "include/utime.h"
7 #include "objclass/objclass.h"
9 #include "cls_user_ops.h"
14 static int write_entry(cls_method_context_t hctx, const string& key, const cls_user_bucket_entry& entry)
19 int ret = cls_cxx_map_set_val(hctx, key, &bl);
26 static int remove_entry(cls_method_context_t hctx, const string& key)
28 int ret = cls_cxx_map_remove_key(hctx, key);
35 static void get_key_by_bucket_name(const string& bucket_name, string *key)
40 static int get_existing_bucket_entry(cls_method_context_t hctx, const string& bucket_name,
41 cls_user_bucket_entry& entry)
43 if (bucket_name.empty()) {
48 get_key_by_bucket_name(bucket_name, &key);
51 int rc = cls_cxx_map_get_val(hctx, key, &bl);
53 CLS_LOG(10, "could not read entry %s", key.c_str());
57 bufferlist::iterator iter = bl.begin();
58 ::decode(entry, iter);
59 } catch (buffer::error& err) {
60 CLS_LOG(0, "ERROR: failed to decode entry %s", key.c_str());
67 static int read_header(cls_method_context_t hctx, cls_user_header *header)
71 int ret = cls_cxx_map_read_header(hctx, &bl);
75 if (bl.length() == 0) {
76 *header = cls_user_header();
81 ::decode(*header, bl);
82 } catch (buffer::error& err) {
83 CLS_LOG(0, "ERROR: failed to decode user header");
90 static void add_header_stats(cls_user_stats *stats, cls_user_bucket_entry& entry)
92 stats->total_entries += entry.count;
93 stats->total_bytes += entry.size;
94 stats->total_bytes_rounded += entry.size_rounded;
97 static void dec_header_stats(cls_user_stats *stats, cls_user_bucket_entry& entry)
99 stats->total_bytes -= entry.size;
100 stats->total_bytes_rounded -= entry.size_rounded;
101 stats->total_entries -= entry.count;
104 static void apply_entry_stats(const cls_user_bucket_entry& src_entry, cls_user_bucket_entry *target_entry)
106 target_entry->size = src_entry.size;
107 target_entry->size_rounded = src_entry.size_rounded;
108 target_entry->count = src_entry.count;
111 static int cls_user_set_buckets_info(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
113 bufferlist::iterator in_iter = in->begin();
115 cls_user_set_buckets_op op;
117 ::decode(op, in_iter);
118 } catch (buffer::error& err) {
119 CLS_LOG(1, "ERROR: cls_user_add_op(): failed to decode op");
123 cls_user_header header;
124 int ret = read_header(hctx, &header);
126 CLS_LOG(0, "ERROR: failed to read user info header ret=%d", ret);
130 for (list<cls_user_bucket_entry>::iterator iter = op.entries.begin();
131 iter != op.entries.end(); ++iter) {
132 cls_user_bucket_entry& update_entry = *iter;
136 get_key_by_bucket_name(update_entry.bucket.name, &key);
138 cls_user_bucket_entry entry;
139 ret = get_existing_bucket_entry(hctx, key, entry);
141 if (ret == -ENOENT) {
143 continue; /* racing bucket removal */
145 entry = update_entry;
151 CLS_LOG(0, "ERROR: get_existing_bucket_entry() key=%s returned %d", key.c_str(), ret);
153 } else if (ret >= 0 && entry.user_stats_sync) {
154 dec_header_stats(&header.stats, entry);
157 CLS_LOG(20, "storing entry for key=%s size=%lld count=%lld",
158 key.c_str(), (long long)update_entry.size, (long long)update_entry.count);
160 // sync entry stats when not an op.add, as when the case is op.add if its a
161 // new entry we already have copied update_entry earlier, OTOH, for an existing entry
162 // we end up clobbering the existing stats for the bucket
164 apply_entry_stats(update_entry, &entry);
166 entry.user_stats_sync = true;
168 ret = write_entry(hctx, key, entry);
172 add_header_stats(&header.stats, entry);
177 CLS_LOG(20, "header: total bytes=%lld entries=%lld", (long long)header.stats.total_bytes, (long long)header.stats.total_entries);
179 if (header.last_stats_update < op.time)
180 header.last_stats_update = op.time;
182 ::encode(header, bl);
184 ret = cls_cxx_map_write_header(hctx, &bl);
191 static int cls_user_complete_stats_sync(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
193 bufferlist::iterator in_iter = in->begin();
195 cls_user_complete_stats_sync_op op;
197 ::decode(op, in_iter);
198 } catch (buffer::error& err) {
199 CLS_LOG(1, "ERROR: cls_user_add_op(): failed to decode op");
203 cls_user_header header;
204 int ret = read_header(hctx, &header);
206 CLS_LOG(0, "ERROR: failed to read user info header ret=%d", ret);
210 if (header.last_stats_sync < op.time)
211 header.last_stats_sync = op.time;
215 ::encode(header, bl);
217 ret = cls_cxx_map_write_header(hctx, &bl);
224 static int cls_user_remove_bucket(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
226 bufferlist::iterator in_iter = in->begin();
228 cls_user_remove_bucket_op op;
230 ::decode(op, in_iter);
231 } catch (buffer::error& err) {
232 CLS_LOG(1, "ERROR: cls_user_add_op(): failed to decode op");
236 cls_user_header header;
237 int ret = read_header(hctx, &header);
239 CLS_LOG(0, "ERROR: failed to read user info header ret=%d", ret);
245 get_key_by_bucket_name(op.bucket.name, &key);
247 cls_user_bucket_entry entry;
248 ret = get_existing_bucket_entry(hctx, key, entry);
249 if (ret == -ENOENT) {
250 return 0; /* idempotent removal */
253 CLS_LOG(0, "ERROR: get existing bucket entry, key=%s ret=%d", key.c_str(), ret);
257 if (entry.user_stats_sync) {
258 dec_header_stats(&header.stats, entry);
261 CLS_LOG(20, "removing entry at %s", key.c_str());
263 ret = remove_entry(hctx, key);
270 static int cls_user_list_buckets(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
272 bufferlist::iterator in_iter = in->begin();
274 cls_user_list_buckets_op op;
276 ::decode(op, in_iter);
277 } catch (buffer::error& err) {
278 CLS_LOG(1, "ERROR: cls_user_list_op(): failed to decode op");
282 map<string, bufferlist> keys;
284 const string& from_index = op.marker;
285 const string& to_index = op.end_marker;
286 const bool to_index_valid = !to_index.empty();
288 #define MAX_ENTRIES 1000
289 size_t max_entries = op.max_entries;
290 if (max_entries > MAX_ENTRIES)
291 max_entries = MAX_ENTRIES;
294 cls_user_list_buckets_ret ret;
296 int rc = cls_cxx_map_get_vals(hctx, from_index, match_prefix, max_entries, &keys, &ret.truncated);
300 CLS_LOG(20, "from_index=%s to_index=%s match_prefix=%s",
303 match_prefix.c_str());
305 list<cls_user_bucket_entry>& entries = ret.entries;
306 map<string, bufferlist>::iterator iter = keys.begin();
310 for (; iter != keys.end(); ++iter) {
311 const string& index = iter->first;
314 if (to_index_valid && to_index.compare(index) <= 0) {
315 ret.truncated = false;
319 bufferlist& bl = iter->second;
320 bufferlist::iterator biter = bl.begin();
322 cls_user_bucket_entry e;
324 entries.push_back(e);
325 } catch (buffer::error& err) {
326 CLS_LOG(0, "ERROR: cls_user_list: could not decode entry, index=%s", index.c_str());
339 static int cls_user_get_header(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
341 bufferlist::iterator in_iter = in->begin();
343 cls_user_get_header_op op;
345 ::decode(op, in_iter);
346 } catch (buffer::error& err) {
347 CLS_LOG(1, "ERROR: cls_user_get_header_op(): failed to decode op");
351 cls_user_get_header_ret op_ret;
353 int ret = read_header(hctx, &op_ret.header);
357 ::encode(op_ret, *out);
364 CLS_LOG(1, "Loaded user class!");
366 cls_handle_t h_class;
367 cls_method_handle_t h_user_set_buckets_info;
368 cls_method_handle_t h_user_complete_stats_sync;
369 cls_method_handle_t h_user_remove_bucket;
370 cls_method_handle_t h_user_list_buckets;
371 cls_method_handle_t h_user_get_header;
373 cls_register("user", &h_class);
376 cls_register_cxx_method(h_class, "set_buckets_info", CLS_METHOD_RD | CLS_METHOD_WR,
377 cls_user_set_buckets_info, &h_user_set_buckets_info);
378 cls_register_cxx_method(h_class, "complete_stats_sync", CLS_METHOD_RD | CLS_METHOD_WR,
379 cls_user_complete_stats_sync, &h_user_complete_stats_sync);
380 cls_register_cxx_method(h_class, "remove_bucket", CLS_METHOD_RD | CLS_METHOD_WR, cls_user_remove_bucket, &h_user_remove_bucket);
381 cls_register_cxx_method(h_class, "list_buckets", CLS_METHOD_RD, cls_user_list_buckets, &h_user_list_buckets);
382 cls_register_cxx_method(h_class, "get_header", CLS_METHOD_RD, cls_user_get_header, &h_user_get_header);