Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / key_value_store / cls_kvs.cc
1 /*
2  * OSD classes for the key value store
3  *
4  *  Created on: Aug 10, 2012
5  *      Author: Eleanor Cawthon
6  */
7
8 #include "objclass/objclass.h"
9 #include <errno.h>
10 #include "key_value_store/kvs_arg_types.h"
11 #include "include/types.h"
12 #include <iostream>
13 #include <climits>
14
15
16 /**
17  * finds the index_data where a key belongs.
18  *
19  * @param key: the key to search for
20  * @param idata: the index_data for the first index value such that idata.key
21  * is greater than key.
22  * @param next_idata: the index_data for the next index entry after idata
23  * @pre: key is not encoded
24  * @post: idata contains complete information
25  * stored
26  */
27 static int get_idata_from_key(cls_method_context_t hctx, const string &key,
28     index_data &idata, index_data &next_idata) {
29   bufferlist raw_val;
30   int r = 0;
31   std::map<std::string, bufferlist> kvmap;
32
33   bool more;
34
35   r = cls_cxx_map_get_vals(hctx, key_data(key).encoded(), "", 2, &kvmap, &more);
36   if (r < 0) {
37     CLS_LOG(20, "error reading index for range %s: %d", key.c_str(), r);
38     return r;
39   }
40
41   r = cls_cxx_map_get_val(hctx, key_data(key).encoded(), &raw_val);
42   if (r == 0){
43     CLS_LOG(20, "%s is already in the index: %d", key.c_str(), r);
44     bufferlist::iterator b = raw_val.begin();
45     idata.decode(b);
46     if (!kvmap.empty()) {
47       bufferlist::iterator b = kvmap.begin()->second.begin();
48       next_idata.decode(b);
49     }
50     return r;
51   } else if (r == -ENOENT || r == -ENODATA) {
52     bufferlist::iterator b = kvmap.begin()->second.begin();
53     idata.decode(b);
54     if (idata.kdata.prefix != "1") {
55       bufferlist::iterator nb = (++kvmap.begin())->second.begin();
56       next_idata.decode(nb);
57     }
58     r = 0;
59   } else if (r < 0) {
60     CLS_LOG(20, "error reading index for duplicates %s: %d", key.c_str(), r);
61     return r;
62   }
63
64   CLS_LOG(20, "idata is %s", idata.str().c_str());
65   return r;
66 }
67
68
69 static int get_idata_from_key_op(cls_method_context_t hctx,
70                    bufferlist *in, bufferlist *out) {
71   CLS_LOG(20, "get_idata_from_key_op");
72   idata_from_key_args op;
73   bufferlist::iterator it = in->begin();
74   try {
75     ::decode(op, it);
76   } catch (buffer::error& err) {
77     CLS_LOG(20, "error decoding idata_from_key_args.");
78     return -EINVAL;
79   }
80   int r = get_idata_from_key(hctx, op.key, op.idata, op.next_idata);
81   if (r < 0) {
82     return r;
83   } else {
84     ::encode(op, *out);
85     return 0;
86   }
87 }
88
89 /**
90  * finds the object in the index with the lowest key value that is greater
91  * than idata.key. If idata.key is the max key, returns -EOVERFLOW. If
92  * idata has a prefix and has timed out, cleans up.
93  *
94  * @param idata: idata for the object to search for.
95  * @param out_data: the idata for the next object.
96  *
97  * @pre: idata must contain a key.
98  * @post: out_data contains complete information
99  */
100 static int get_next_idata(cls_method_context_t hctx, const index_data &idata,
101     index_data &out_data) {
102   int r = 0;
103   std::map<std::string, bufferlist> kvs;
104   bool more;
105   r = cls_cxx_map_get_vals(hctx, idata.kdata.encoded(), "", 1, &kvs, &more);
106   if (r < 0){
107     CLS_LOG(20, "getting kvs failed with error %d", r);
108     return r;
109   }
110
111   if (!kvs.empty()) {
112     out_data.kdata.parse(kvs.begin()->first);
113     bufferlist::iterator b = kvs.begin()->second.begin();
114     out_data.decode(b);
115   } else {
116     r = -EOVERFLOW;
117   }
118
119   return r;
120 }
121
122 static int get_next_idata_op(cls_method_context_t hctx,
123                    bufferlist *in, bufferlist *out) {
124   CLS_LOG(20, "get_next_idata_op");
125   idata_from_idata_args op;
126   bufferlist::iterator it = in->begin();
127   try {
128     ::decode(op, it);
129   } catch (buffer::error& err) {
130     return -EINVAL;
131   }
132   int r = get_next_idata(hctx, op.idata, op.next_idata);
133   if (r < 0) {
134     return r;
135   } else {
136     op.encode(*out);
137     return 0;
138   }
139 }
140
141 /**
142  * finds the object in the index with the highest key value that is less
143  * than idata.key. If idata.key is the lowest key, returns -ERANGE If
144  * idata has a prefix and has timed out, cleans up.
145  *
146  * @param idata: idata for the object to search for.
147  * @param out_data: the idata for the next object.
148  *
149  * @pre: idata must contain a key.
150  * @ost: out_data contains complete information
151  */
152 static int get_prev_idata(cls_method_context_t hctx, const index_data &idata,
153     index_data &out_data) {
154   int r = 0;
155   std::map<std::string, bufferlist> kvs;
156   bool more;
157   r = cls_cxx_map_get_vals(hctx, "", "", LONG_MAX, &kvs, &more);
158   if (r < 0){
159     CLS_LOG(20, "getting kvs failed with error %d", r);
160     return r;
161   }
162
163   std::map<std::string, bufferlist>::iterator it =
164       kvs.lower_bound(idata.kdata.encoded());
165   if (it->first != idata.kdata.encoded()) {
166     CLS_LOG(20, "object %s not found in the index (expected %s, found %s)",
167         idata.str().c_str(), idata.kdata.encoded().c_str(),
168         it->first.c_str());
169     return -ENODATA;
170   }
171   if (it == kvs.begin()) {
172     //it is the first object, there is no previous.
173     return -ERANGE;
174   } else {
175     --it;
176   }
177   out_data.kdata.parse(it->first);
178   bufferlist::iterator b = it->second.begin();
179   out_data.decode(b);
180
181   return 0;
182 }
183
184 static int get_prev_idata_op(cls_method_context_t hctx,
185                    bufferlist *in, bufferlist *out) {
186   CLS_LOG(20, "get_next_idata_op");
187   idata_from_idata_args op;
188   bufferlist::iterator it = in->begin();
189   try {
190     ::decode(op, it);
191   } catch (buffer::error& err) {
192     return -EINVAL;
193   }
194   int r = get_prev_idata(hctx, op.idata, op.next_idata);
195   if (r < 0) {
196     return r;
197   } else {
198     op.encode(*out);
199     return 0;
200   }
201 }
202
203 /**
204  * Read all of the index entries where any keys in the map go
205  */
206 static int read_many(cls_method_context_t hctx, const set<string> &keys,
207     map<string, bufferlist> * out) {
208   int r = 0;
209   bool more;
210   CLS_ERR("reading from a map of size %d, first key encoded is %s",
211       (int)keys.size(), key_data(*keys.begin()).encoded().c_str());
212   r = cls_cxx_map_get_vals(hctx, key_data(*keys.begin()).encoded().c_str(),
213       "", LONG_MAX, out, &more);
214   if (r < 0) {
215     CLS_ERR("getting omap vals failed with error %d", r);
216   }
217
218   CLS_ERR("got map of size %d ", (int)out->size());
219   if (out->size() > 1) {
220     out->erase(out->upper_bound(key_data(*keys.rbegin()).encoded().c_str()),
221       out->end());
222   }
223   CLS_ERR("returning map of size %d", (int)out->size());
224   return r;
225 }
226
227 static int read_many_op(cls_method_context_t hctx, bufferlist *in,
228     bufferlist *out) {
229   CLS_LOG(20, "read_many_op");
230   set<string> op;
231   map<string, bufferlist> outmap;
232   bufferlist::iterator it = in->begin();
233   try {
234     ::decode(op, it);
235   } catch (buffer::error & err) {
236     return -EINVAL;
237   }
238   int r = read_many(hctx, op, &outmap);
239   if (r < 0) {
240     return r;
241   } else {
242     encode(outmap, *out);
243     return 0;
244   }
245 }
246
247 /**
248  * Checks the unwritable xattr. If it is "1" (i.e., it is unwritable), returns
249  * -EACCES. otherwise, returns 0.
250  */
251 static int check_writable(cls_method_context_t hctx) {
252   bufferlist bl;
253   int r = cls_cxx_getxattr(hctx, "unwritable", &bl);
254   if (r < 0) {
255     CLS_LOG(20, "error reading xattr %s: %d", "unwritable", r);
256     return r;
257   }
258   if (string(bl.c_str(), bl.length()) == "1") {
259     return -EACCES;
260   } else{
261     return 0;
262   }
263 }
264
265 static int check_writable_op(cls_method_context_t hctx,
266                    bufferlist *in, bufferlist *out) {
267   CLS_LOG(20, "check_writable_op");
268   return check_writable(hctx);
269 }
270
271 /**
272  * returns -EKEYREJECTED if size is outside of bound, according to comparator.
273  *
274  * @bound: the limit to test
275  * @comparator: should be CEPH_OSD_CMPXATTR_OP_[EQ|GT|LT]
276  */
277 static int assert_size_in_bound(cls_method_context_t hctx, int bound,
278     int comparator) {
279   //determine size
280   bufferlist size_bl;
281   int r = cls_cxx_getxattr(hctx, "size", &size_bl);
282   if (r < 0) {
283     CLS_LOG(20, "error reading xattr %s: %d", "size", r);
284     return r;
285   }
286
287   int size = atoi(string(size_bl.c_str(), size_bl.length()).c_str());
288   CLS_LOG(20, "size is %d, bound is %d", size, bound);
289
290   //compare size to comparator
291   switch (comparator) {
292   case CEPH_OSD_CMPXATTR_OP_EQ:
293     if (size != bound) {
294       return -EKEYREJECTED;
295     }
296     break;
297   case CEPH_OSD_CMPXATTR_OP_LT:
298     if (size >= bound) {
299       return -EKEYREJECTED;
300     }
301     break;
302   case CEPH_OSD_CMPXATTR_OP_GT:
303     if (size <= bound) {
304       return -EKEYREJECTED;
305     }
306     break;
307   default:
308     CLS_LOG(20, "invalid argument passed to assert_size_in_bound: %d",
309             comparator);
310     return -EINVAL;
311   }
312   return 0;
313 }
314
315 static int assert_size_in_bound_op(cls_method_context_t hctx,
316                    bufferlist *in, bufferlist *out) {
317   CLS_LOG(20, "assert_size_in_bound_op");
318   assert_size_args op;
319   bufferlist::iterator it = in->begin();
320   try {
321     ::decode(op, it);
322   } catch (buffer::error& err) {
323     return -EINVAL;
324   }
325   return assert_size_in_bound(hctx, op.bound, op.comparator);
326 }
327
328 /**
329  * Attempts to insert omap into this object's omap.
330  *
331  * @return:
332  * if unwritable, returns -EACCES.
333  * if size > bound and key doesn't already exist in the omap, returns -EBALANCE.
334  * if exclusive is true, returns -EEXIST if any keys already exist.
335  *
336  * @post: object has omap entries inserted, and size xattr is updated
337  */
338 static int omap_insert(cls_method_context_t hctx,
339     const map<string, bufferlist> &omap, int bound, bool exclusive) {
340
341   uint64_t size;
342   time_t time;
343   int r = cls_cxx_stat(hctx, &size, &time);
344   if (r < 0) {
345     return r;
346   }
347   CLS_LOG(20, "inserting %s", omap.begin()->first.c_str());
348   r = check_writable(hctx);
349   if (r < 0) {
350     CLS_LOG(20, "omap_insert: this object is unwritable: %d", r);
351     return r;
352   }
353
354   int assert_bound = bound;
355
356   //if this is an exclusive insert, make sure the key doesn't already exist.
357   for (map<string, bufferlist>::const_iterator it = omap.begin();
358       it != omap.end(); ++it) {
359     bufferlist bl;
360     r = cls_cxx_map_get_val(hctx, it->first, &bl);
361     if (r == 0 && string(bl.c_str(), bl.length()) != ""){
362       if (exclusive) {
363         CLS_LOG(20, "error: this is an exclusive insert and %s exists.",
364             it->first.c_str());
365         return -EEXIST;
366       }
367       assert_bound++;
368       CLS_LOG(20, "increased assert_bound to %d", assert_bound);
369     } else if (r != -ENODATA && r != -ENOENT) {
370       CLS_LOG(20, "error reading omap val for %s: %d", it->first.c_str(), r);
371       return r;
372     }
373   }
374
375   bufferlist old_size;
376   r = cls_cxx_getxattr(hctx, "size", &old_size);
377   if (r < 0) {
378     CLS_LOG(20, "error reading xattr %s: %d", "size", r);
379     return r;
380   }
381
382   int old_size_int = atoi(string(old_size.c_str(), old_size.length()).c_str());
383
384   CLS_LOG(20, "asserting size is less than %d (bound is %d)", assert_bound, bound);
385   if (old_size_int >= assert_bound) {
386     return -EKEYREJECTED;
387   }
388
389   int new_size_int = old_size_int + omap.size() - (assert_bound - bound);
390   CLS_LOG(20, "old size is %d, new size is %d", old_size_int, new_size_int);
391   bufferlist new_size;
392   stringstream s;
393   s << new_size_int;
394   new_size.append(s.str());
395
396   r = cls_cxx_map_set_vals(hctx, &omap);
397   if (r < 0) {
398     CLS_LOG(20, "error setting omap: %d", r);
399     return r;
400   }
401
402   r = cls_cxx_setxattr(hctx, "size", &new_size);
403   if (r < 0) {
404     CLS_LOG(20, "error setting xattr %s: %d", "size", r);
405     return r;
406   }
407   CLS_LOG(20, "successfully inserted %s", omap.begin()->first.c_str());
408   return 0;
409 }
410
411 static int omap_insert_op(cls_method_context_t hctx,
412                    bufferlist *in, bufferlist *out) {
413   CLS_LOG(20, "omap_insert");
414   omap_set_args op;
415   bufferlist::iterator it = in->begin();
416   try {
417     ::decode(op, it);
418   } catch (buffer::error& err) {
419     return -EINVAL;
420   }
421   return omap_insert(hctx, op.omap, op.bound, op.exclusive);
422 }
423
424 static int create_with_omap(cls_method_context_t hctx,
425     const map<string, bufferlist> &omap) {
426   CLS_LOG(20, "creating with omap: %s", omap.begin()->first.c_str());
427   //first make sure the object is writable
428   int r = cls_cxx_create(hctx, true);
429   if (r < 0) {
430     CLS_LOG(20, "omap create: creating failed: %d", r);
431     return r;
432   }
433
434   int new_size_int = omap.size();
435   CLS_LOG(20, "omap insert: new size is %d", new_size_int);
436   bufferlist new_size;
437   stringstream s;
438   s << new_size_int;
439   new_size.append(s.str());
440
441   r = cls_cxx_map_set_vals(hctx, &omap);
442   if (r < 0) {
443     CLS_LOG(20, "omap create: error setting omap: %d", r);
444     return r;
445   }
446
447   r = cls_cxx_setxattr(hctx, "size", &new_size);
448   if (r < 0) {
449     CLS_LOG(20, "omap create: error setting xattr %s: %d", "size", r);
450     return r;
451   }
452
453   bufferlist u;
454   u.append("0");
455   r = cls_cxx_setxattr(hctx, "unwritable", &u);
456   if (r < 0) {
457     CLS_LOG(20, "omap create: error setting xattr %s: %d", "unwritable", r);
458     return r;
459   }
460
461   CLS_LOG(20, "successfully created %s", omap.begin()->first.c_str());
462   return 0;
463 }
464
465 static int create_with_omap_op(cls_method_context_t hctx,
466                    bufferlist *in, bufferlist *out) {
467   CLS_LOG(20, "omap_insert");
468   map<string, bufferlist> omap;
469   bufferlist::iterator it = in->begin();
470   try {
471     ::decode(omap, it);
472   } catch (buffer::error& err) {
473     return -EINVAL;
474   }
475   return create_with_omap(hctx, omap);
476 }
477
478 /**
479  * Attempts to remove omap from this object's omap.
480  *
481  * @return:
482  * if unwritable, returns -EACCES.
483  * if size < bound and key doesn't already exist in the omap, returns -EBALANCE.
484  * if any of the keys are not in this object, returns -ENODATA.
485  *
486  * @post: object has omap entries removed, and size xattr is updated
487  */
488 static int omap_remove(cls_method_context_t hctx,
489     const std::set<string> &omap, int bound) {
490   int r;
491   uint64_t size;
492   time_t time;
493   r = cls_cxx_stat(hctx, &size, &time);
494   if (r < 0) {
495     return r;
496   }
497
498   //first make sure the object is writable
499   r = check_writable(hctx);
500   if (r < 0) {
501     return r;
502   }
503
504   //check for existance of the key first
505   for (set<string>::const_iterator it = omap.begin();
506       it != omap.end(); ++it) {
507     bufferlist bl;
508     r = cls_cxx_map_get_val(hctx, *it, &bl);
509     if (r == -ENOENT || r == -ENODATA
510         || string(bl.c_str(), bl.length()) == ""){
511       return -ENODATA;
512     } else if (r < 0) {
513       CLS_LOG(20, "error reading omap val for %s: %d", it->c_str(), r);
514       return r;
515     }
516   }
517
518   //fail if removing from an object with only bound entries.
519   bufferlist old_size;
520   r = cls_cxx_getxattr(hctx, "size", &old_size);
521   if (r < 0) {
522     CLS_LOG(20, "error reading xattr %s: %d", "size", r);
523     return r;
524   }
525   int old_size_int = atoi(string(old_size.c_str(), old_size.length()).c_str());
526
527   CLS_LOG(20, "asserting size is greater than %d", bound);
528   if (old_size_int <= bound) {
529     return -EKEYREJECTED;
530   }
531
532   int new_size_int = old_size_int - omap.size();
533   CLS_LOG(20, "old size is %d, new size is %d", old_size_int, new_size_int);
534   bufferlist new_size;
535   stringstream s;
536   s << new_size_int;
537   new_size.append(s.str());
538
539   r = cls_cxx_setxattr(hctx, "size", &new_size);
540   if (r < 0) {
541     CLS_LOG(20, "error setting xattr %s: %d", "unwritable", r);
542     return r;
543   }
544
545   for (std::set<string>::const_iterator it = omap.begin();
546       it != omap.end(); ++it) {
547     r = cls_cxx_map_remove_key(hctx, *it);
548     if (r < 0) {
549       CLS_LOG(20, "error removing omap: %d", r);
550       return r;
551     }
552   }
553   return 0;
554 }
555
556 static int omap_remove_op(cls_method_context_t hctx,
557                    bufferlist *in, bufferlist *out) {
558   CLS_LOG(20, "omap_remove");
559   omap_rm_args op;
560   bufferlist::iterator it = in->begin();
561   try {
562     ::decode(op, it);
563   } catch (buffer::error& err) {
564     return -EINVAL;
565   }
566   return omap_remove(hctx, op.omap, op.bound);
567 }
568
569 /**
570  * checks to see if this object needs to be split or rebalanced. if so, reads
571  * information about it.
572  *
573  * @post: if assert_size_in_bound(hctx, bound, comparator) succeeds,
574  * odata contains the size, omap, and unwritable attributes for this object.
575  * Otherwise, odata contains the size and unwritable attribute.
576  */
577 static int maybe_read_for_balance(cls_method_context_t hctx,
578     object_data &odata, int bound, int comparator) {
579   CLS_LOG(20, "rebalance reading");
580   //if unwritable, return
581   int r = check_writable(hctx);
582   if (r < 0) {
583     odata.unwritable = true;
584     CLS_LOG(20, "rebalance read: error getting xattr %s: %d", "unwritable", r);
585     return r;
586   } else {
587     odata.unwritable = false;
588   }
589
590   //get the size attribute
591   bufferlist size;
592   r = cls_cxx_getxattr(hctx, "size", &size);
593   if (r < 0) {
594     CLS_LOG(20, "rebalance read: error getting xattr %s: %d", "size", r);
595     return r;
596   }
597   odata.size = atoi(string(size.c_str(), size.length()).c_str());
598
599   //check if it needs to be balanced
600   r = assert_size_in_bound(hctx, bound, comparator);
601   if (r < 0) {
602     CLS_LOG(20, "rebalance read: error on asserting size: %d", r);
603     return -EBALANCE;
604   }
605
606   //if the assert succeeded, it needs to be balanced
607   bool more;
608   r = cls_cxx_map_get_vals(hctx, "", "", LONG_MAX, &odata.omap, &more);
609   if (r < 0){
610     CLS_LOG(20, "rebalance read: getting kvs failed with error %d", r);
611     return r;
612   }
613
614   CLS_LOG(20, "rebalance read: size xattr is %llu, omap size is %llu",
615           (unsigned long long)odata.size,
616           (unsigned long long)odata.omap.size());
617   return 0;
618 }
619
620 static int maybe_read_for_balance_op(cls_method_context_t hctx,
621                    bufferlist *in, bufferlist *out) {
622   CLS_LOG(20, "maybe_read_for_balance");
623   rebalance_args op;
624   bufferlist::iterator it = in->begin();
625   try {
626     ::decode(op, it);
627   } catch (buffer::error& err) {
628     return -EINVAL;
629   }
630   int r = maybe_read_for_balance(hctx, op.odata, op.bound, op.comparator);
631   if (r < 0) {
632     return r;
633   } else {
634     op.encode(*out);
635     return 0;
636   }
637 }
638
639
640 CLS_INIT(kvs)
641 {
642   CLS_LOG(20, "Loaded assert condition class!");
643
644   cls_handle_t h_class;
645   cls_method_handle_t h_get_idata_from_key;
646   cls_method_handle_t h_get_next_idata;
647   cls_method_handle_t h_get_prev_idata;
648   cls_method_handle_t h_read_many;
649   cls_method_handle_t h_check_writable;
650   cls_method_handle_t h_assert_size_in_bound;
651   cls_method_handle_t h_omap_insert;
652   cls_method_handle_t h_create_with_omap;
653   cls_method_handle_t h_omap_remove;
654   cls_method_handle_t h_maybe_read_for_balance;
655
656   cls_register("kvs", &h_class);
657   cls_register_cxx_method(h_class, "get_idata_from_key",
658                           CLS_METHOD_RD,
659                           get_idata_from_key_op, &h_get_idata_from_key);
660   cls_register_cxx_method(h_class, "get_next_idata",
661                           CLS_METHOD_RD,
662                           get_next_idata_op, &h_get_next_idata);
663   cls_register_cxx_method(h_class, "get_prev_idata",
664                           CLS_METHOD_RD,
665                           get_prev_idata_op, &h_get_prev_idata);
666   cls_register_cxx_method(h_class, "read_many",
667                           CLS_METHOD_RD,
668                           read_many_op, &h_read_many);
669   cls_register_cxx_method(h_class, "check_writable",
670                           CLS_METHOD_RD | CLS_METHOD_WR,
671                           check_writable_op, &h_check_writable);
672   cls_register_cxx_method(h_class, "assert_size_in_bound",
673                           CLS_METHOD_WR,
674                           assert_size_in_bound_op, &h_assert_size_in_bound);
675   cls_register_cxx_method(h_class, "omap_insert",
676                           CLS_METHOD_WR,
677                           omap_insert_op, &h_omap_insert);
678   cls_register_cxx_method(h_class, "create_with_omap",
679                           CLS_METHOD_WR,
680                           create_with_omap_op, &h_create_with_omap);
681   cls_register_cxx_method(h_class, "omap_remove",
682                           CLS_METHOD_WR,
683                           omap_remove_op, &h_omap_remove);
684   cls_register_cxx_method(h_class, "maybe_read_for_balance",
685                           CLS_METHOD_RD,
686                           maybe_read_for_balance_op, &h_maybe_read_for_balance);
687
688   return;
689 }