Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / cls / rbd / cls_rbd.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 /** \file
5  *
6  * This is an OSD class that implements methods for
7  * use with rbd.
8  *
9  * Most of these deal with the rbd header object. Methods prefixed
10  * with old_ deal with the original rbd design, in which clients read
11  * and interpreted the header object directly.
12  *
13  * The new format is meant to be opaque to clients - all their
14  * interactions with non-data objects should go through this
15  * class. The OSD class interface leaves the class to implement its
16  * own argument and payload serialization/deserialization, so for ease
17  * of implementation we use the existing ceph encoding/decoding
18  * methods. Something like json might be preferable, but the rbd
19  * kernel module has to be able to understand format as well. The
20  * datatypes exposed to the clients are strings, unsigned integers,
21  * and vectors of those types. The on-wire format can be found in
22  * src/include/encoding.h.
23  *
24  * The methods for interacting with the new format document their
25  * parameters as the client sees them - it would be silly to mention
26  * in each one that they take an input and an output bufferlist.
27  */
28 #include "include/types.h"
29
30 #include <algorithm>
31 #include <errno.h>
32 #include <sstream>
33
34 #include "common/bit_vector.hpp"
35 #include "common/errno.h"
36 #include "objclass/objclass.h"
37 #include "osd/osd_types.h"
38 #include "include/rbd_types.h"
39 #include "include/rbd/object_map_types.h"
40
41 #include "cls/rbd/cls_rbd.h"
42 #include "cls/rbd/cls_rbd_types.h"
43
44
45 /*
46  * Object keys:
47  *
48  * <partial list>
49  *
50  * stripe_unit: size in bytes of the stripe unit.  if not present,
51  *   the stripe unit is assumed to match the object size (1 << order).
52  *
53  * stripe_count: number of objects to stripe over before looping back.
54  *   if not present or 1, striping is disabled.  this is the default.
55  *
56  */
57
58 CLS_VER(2,0)
59 CLS_NAME(rbd)
60
61 #define RBD_MAX_KEYS_READ 64
62 #define RBD_SNAP_KEY_PREFIX "snapshot_"
63 #define RBD_DIR_ID_KEY_PREFIX "id_"
64 #define RBD_DIR_NAME_KEY_PREFIX "name_"
65 #define RBD_METADATA_KEY_PREFIX "metadata_"
66
67 #define GROUP_SNAP_SEQ "snap_seq"
68
69 static int snap_read_header(cls_method_context_t hctx, bufferlist& bl)
70 {
71   unsigned snap_count = 0;
72   uint64_t snap_names_len = 0;
73   struct rbd_obj_header_ondisk *header;
74
75   CLS_LOG(20, "snapshots_list");
76
77   while (1) {
78     int len = sizeof(*header) +
79       snap_count * sizeof(struct rbd_obj_snap_ondisk) +
80       snap_names_len;
81
82     int rc = cls_cxx_read(hctx, 0, len, &bl);
83     if (rc < 0)
84       return rc;
85
86     if (bl.length() < sizeof(*header))
87       return -EINVAL;
88
89     header = (struct rbd_obj_header_ondisk *)bl.c_str();
90     assert(header);
91
92     if ((snap_count != header->snap_count) ||
93         (snap_names_len != header->snap_names_len)) {
94       snap_count = header->snap_count;
95       snap_names_len = header->snap_names_len;
96       bl.clear();
97       continue;
98     }
99     break;
100   }
101
102   return 0;
103 }
104
105 static void key_from_snap_id(snapid_t snap_id, string *out)
106 {
107   ostringstream oss;
108   oss << RBD_SNAP_KEY_PREFIX
109       << std::setw(16) << std::setfill('0') << std::hex << snap_id;
110   *out = oss.str();
111 }
112
113 static snapid_t snap_id_from_key(const string &key)
114 {
115   istringstream iss(key);
116   uint64_t id;
117   iss.ignore(strlen(RBD_SNAP_KEY_PREFIX)) >> std::hex >> id;
118   return id;
119 }
120
121 template<typename T>
122 static int read_key(cls_method_context_t hctx, const string &key, T *out)
123 {
124   bufferlist bl;
125   int r = cls_cxx_map_get_val(hctx, key, &bl);
126   if (r < 0) {
127     if (r != -ENOENT) {
128       CLS_ERR("error reading omap key %s: %s", key.c_str(), cpp_strerror(r).c_str());
129     }
130     return r;
131   }
132
133   try {
134     bufferlist::iterator it = bl.begin();
135     ::decode(*out, it);
136   } catch (const buffer::error &err) {
137     CLS_ERR("error decoding %s", key.c_str());
138     return -EIO;
139   }
140
141   return 0;
142 }
143
144 static int remove_key(cls_method_context_t hctx, const string &key) {
145   int r = cls_cxx_map_remove_key(hctx, key);
146   if (r < 0 && r != -ENOENT) {
147       CLS_ERR("failed to remove key: %s", key.c_str());
148       return r;
149   }
150   return 0;
151 }
152
153 static bool is_valid_id(const string &id) {
154   if (!id.size())
155     return false;
156   for (size_t i = 0; i < id.size(); ++i) {
157     if (!isalnum(id[i])) {
158       return false;
159     }
160   }
161   return true;
162 }
163
164 /**
165  * Initialize the header with basic metadata.
166  * Extra features may initialize more fields in the future.
167  * Everything is stored as key/value pairs as omaps in the header object.
168  *
169  * If features the OSD does not understand are requested, -ENOSYS is
170  * returned.
171  *
172  * Input:
173  * @param size number of bytes in the image (uint64_t)
174  * @param order bits to shift to determine the size of data objects (uint8_t)
175  * @param features what optional things this image will use (uint64_t)
176  * @param object_prefix a prefix for all the data objects
177  * @param data_pool_id pool id where data objects is stored (int64_t)
178  *
179  * Output:
180  * @return 0 on success, negative error code on failure
181  */
182 int create(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
183 {
184   string object_prefix;
185   uint64_t features, size;
186   uint8_t order;
187   int64_t data_pool_id = -1;
188
189   try {
190     bufferlist::iterator iter = in->begin();
191     ::decode(size, iter);
192     ::decode(order, iter);
193     ::decode(features, iter);
194     ::decode(object_prefix, iter);
195     if (!iter.end()) {
196       ::decode(data_pool_id, iter);
197     }
198   } catch (const buffer::error &err) {
199     return -EINVAL;
200   }
201
202   CLS_LOG(20, "create object_prefix=%s size=%llu order=%u features=%llu",
203           object_prefix.c_str(), (unsigned long long)size, order,
204           (unsigned long long)features);
205
206   if (features & ~RBD_FEATURES_ALL) {
207     return -ENOSYS;
208   }
209
210   if (!object_prefix.size()) {
211     return -EINVAL;
212   }
213
214   bufferlist stored_prefixbl;
215   int r = cls_cxx_map_get_val(hctx, "object_prefix", &stored_prefixbl);
216   if (r != -ENOENT) {
217     CLS_ERR("reading object_prefix returned %d", r);
218     return -EEXIST;
219   }
220
221   bufferlist sizebl;
222   bufferlist orderbl;
223   bufferlist featuresbl;
224   bufferlist object_prefixbl;
225   bufferlist snap_seqbl;
226   bufferlist create_timestampbl;
227   uint64_t snap_seq = 0;
228   utime_t create_timestamp = ceph_clock_now();
229   ::encode(size, sizebl);
230   ::encode(order, orderbl);
231   ::encode(features, featuresbl);
232   ::encode(object_prefix, object_prefixbl);
233   ::encode(snap_seq, snap_seqbl);
234   ::encode(create_timestamp, create_timestampbl);
235
236   map<string, bufferlist> omap_vals;
237   omap_vals["size"] = sizebl;
238   omap_vals["order"] = orderbl;
239   omap_vals["features"] = featuresbl;
240   omap_vals["object_prefix"] = object_prefixbl;
241   omap_vals["snap_seq"] = snap_seqbl;
242   omap_vals["create_timestamp"] = create_timestampbl;
243
244   if (features & RBD_FEATURE_DATA_POOL) {
245     if (data_pool_id == -1) {
246       CLS_ERR("data pool not provided with feature enabled");
247       return -EINVAL;
248     }
249
250     bufferlist data_pool_id_bl;
251     ::encode(data_pool_id, data_pool_id_bl);
252     omap_vals["data_pool_id"] = data_pool_id_bl;
253   } else if (data_pool_id != -1) {
254     CLS_ERR("data pool provided with feature disabled");
255     return -EINVAL;
256   }
257
258   r = cls_cxx_map_set_vals(hctx, &omap_vals);
259   if (r < 0)
260     return r;
261
262   return 0;
263 }
264
265 /**
266  * Input:
267  * @param snap_id which snapshot to query, or CEPH_NOSNAP (uint64_t) (deprecated)
268  * @param read_only true if the image will be used read-only (bool)
269  *
270  * Output:
271  * @param features list of enabled features for the given snapshot (uint64_t)
272  * @param incompatible incompatible feature bits
273  * @returns 0 on success, negative error code on failure
274  */
275 int get_features(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
276 {
277   uint64_t snap_id;
278   bool read_only = false;
279
280   bufferlist::iterator iter = in->begin();
281   try {
282     ::decode(snap_id, iter);
283     if (!iter.end()) {
284       ::decode(read_only, iter);
285     }
286   } catch (const buffer::error &err) {
287     return -EINVAL;
288   }
289
290   CLS_LOG(20, "get_features snap_id=%" PRIu64 ", read_only=%d",
291           snap_id, read_only);
292
293   // NOTE: keep this deprecated snapshot logic to support negative
294   // test cases in older (pre-Infernalis) releases. Remove once older
295   // releases are no longer supported.
296   if (snap_id != CEPH_NOSNAP) {
297     cls_rbd_snap snap;
298     string snapshot_key;
299     key_from_snap_id(snap_id, &snapshot_key);
300     int r = read_key(hctx, snapshot_key, &snap);
301     if (r < 0) {
302       return r;
303     }
304   }
305
306   uint64_t features;
307   int r = read_key(hctx, "features", &features);
308   if (r < 0) {
309     CLS_ERR("failed to read features off disk: %s", cpp_strerror(r).c_str());
310     return r;
311   }
312
313   uint64_t incompatible = (read_only ? features & RBD_FEATURES_INCOMPATIBLE :
314                                        features & RBD_FEATURES_RW_INCOMPATIBLE);
315   ::encode(features, *out);
316   ::encode(incompatible, *out);
317   return 0;
318 }
319
320 /**
321  * set the image features
322  *
323  * Input:
324  * @param features image features
325  * @param mask image feature mask
326  *
327  * Output:
328  * none
329  *
330  * @returns 0 on success, negative error code upon failure
331  */
332 int set_features(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
333 {
334   uint64_t features;
335   uint64_t mask;
336   bufferlist::iterator iter = in->begin();
337   try {
338     ::decode(features, iter);
339     ::decode(mask, iter);
340   } catch (const buffer::error &err) {
341     return -EINVAL;
342   }
343
344   // check that features exists to make sure this is a header object
345   // that was created correctly
346   uint64_t orig_features = 0;
347   int r = read_key(hctx, "features", &orig_features);
348   if (r < 0 && r != -ENOENT) {
349     CLS_ERR("Could not read image's features off disk: %s",
350             cpp_strerror(r).c_str());
351     return r;
352   }
353
354   // newer clients might attempt to mask off features we don't support
355   mask &= RBD_FEATURES_ALL;
356
357   uint64_t enabled_features = features & mask;
358   if ((enabled_features & RBD_FEATURES_MUTABLE) != enabled_features) {
359     CLS_ERR("Attempting to enable immutable feature: %" PRIu64,
360             static_cast<uint64_t>(enabled_features & ~RBD_FEATURES_MUTABLE));
361     return -EINVAL;
362   }
363
364   uint64_t disabled_features = ~features & mask;
365   uint64_t disable_mask = (RBD_FEATURES_MUTABLE | RBD_FEATURES_DISABLE_ONLY);
366   if ((disabled_features & disable_mask) != disabled_features) {
367        CLS_ERR("Attempting to disable immutable feature: %" PRIu64,
368                enabled_features & ~disable_mask);
369        return -EINVAL;
370   }
371
372   features = (orig_features & ~mask) | (features & mask);
373   CLS_LOG(10, "set_features features=%" PRIu64 " orig_features=%" PRIu64,
374           features, orig_features);
375
376   bufferlist bl;
377   ::encode(features, bl);
378   r = cls_cxx_map_set_val(hctx, "features", &bl);
379   if (r < 0) {
380     CLS_ERR("error updating features: %s", cpp_strerror(r).c_str());
381     return r;
382   }
383   return 0;
384 }
385
386 /**
387  * check that given feature(s) are set
388  *
389  * @param hctx context
390  * @param need features needed
391  * @return 0 if features are set, negative error (like ENOEXEC) otherwise
392  */
393 int require_feature(cls_method_context_t hctx, uint64_t need)
394 {
395   uint64_t features;
396   int r = read_key(hctx, "features", &features);
397   if (r == -ENOENT)   // this implies it's an old-style image with no features
398     return -ENOEXEC;
399   if (r < 0)
400     return r;
401   if ((features & need) != need) {
402     CLS_LOG(10, "require_feature missing feature %llx, have %llx",
403             (unsigned long long)need, (unsigned long long)features);
404     return -ENOEXEC;
405   }
406   return 0;
407 }
408
409 /**
410  * Input:
411  * @param snap_id which snapshot to query, or CEPH_NOSNAP (uint64_t)
412  *
413  * Output:
414  * @param order bits to shift to get the size of data objects (uint8_t)
415  * @param size size of the image in bytes for the given snapshot (uint64_t)
416  * @returns 0 on success, negative error code on failure
417  */
418 int get_size(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
419 {
420   uint64_t snap_id, size;
421   uint8_t order;
422
423   bufferlist::iterator iter = in->begin();
424   try {
425     ::decode(snap_id, iter);
426   } catch (const buffer::error &err) {
427     return -EINVAL;
428   }
429
430   CLS_LOG(20, "get_size snap_id=%llu", (unsigned long long)snap_id);
431
432   int r = read_key(hctx, "order", &order);
433   if (r < 0) {
434     CLS_ERR("failed to read the order off of disk: %s", cpp_strerror(r).c_str());
435     return r;
436   }
437
438   if (snap_id == CEPH_NOSNAP) {
439     r = read_key(hctx, "size", &size);
440     if (r < 0) {
441       CLS_ERR("failed to read the image's size off of disk: %s", cpp_strerror(r).c_str());
442       return r;
443     }
444   } else {
445     cls_rbd_snap snap;
446     string snapshot_key;
447     key_from_snap_id(snap_id, &snapshot_key);
448     int r = read_key(hctx, snapshot_key, &snap);
449     if (r < 0)
450       return r;
451
452     size = snap.image_size;
453   }
454
455   ::encode(order, *out);
456   ::encode(size, *out);
457
458   return 0;
459 }
460
461 /**
462  * Input:
463  * @param size new capacity of the image in bytes (uint64_t)
464  *
465  * Output:
466  * @returns 0 on success, negative error code on failure
467  */
468 int set_size(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
469 {
470   uint64_t size;
471
472   bufferlist::iterator iter = in->begin();
473   try {
474     ::decode(size, iter);
475   } catch (const buffer::error &err) {
476     return -EINVAL;
477   }
478
479   // check that size exists to make sure this is a header object
480   // that was created correctly
481   uint64_t orig_size;
482   int r = read_key(hctx, "size", &orig_size);
483   if (r < 0) {
484     CLS_ERR("Could not read image's size off disk: %s", cpp_strerror(r).c_str());
485     return r;
486   }
487
488   CLS_LOG(20, "set_size size=%llu orig_size=%llu", (unsigned long long)size,
489           (unsigned long long)orig_size);
490
491   bufferlist sizebl;
492   ::encode(size, sizebl);
493   r = cls_cxx_map_set_val(hctx, "size", &sizebl);
494   if (r < 0) {
495     CLS_ERR("error writing snapshot metadata: %s", cpp_strerror(r).c_str());
496     return r;
497   }
498
499   // if we are shrinking, and have a parent, shrink our overlap with
500   // the parent, too.
501   if (size < orig_size) {
502     cls_rbd_parent parent;
503     r = read_key(hctx, "parent", &parent);
504     if (r == -ENOENT)
505       r = 0;
506     if (r < 0)
507       return r;
508     if (parent.exists() && parent.overlap > size) {
509       bufferlist parentbl;
510       parent.overlap = size;
511       ::encode(parent, parentbl);
512       r = cls_cxx_map_set_val(hctx, "parent", &parentbl);
513       if (r < 0) {
514         CLS_ERR("error writing parent: %s", cpp_strerror(r).c_str());
515         return r;
516       }
517     }
518   }
519
520   return 0;
521 }
522
523 /**
524  * verify that the header object exists
525  *
526  * @return 0 if the object exists, -ENOENT if it does not, or other error
527  */
528 int check_exists(cls_method_context_t hctx)
529 {
530   uint64_t size;
531   time_t mtime;
532   return cls_cxx_stat(hctx, &size, &mtime);
533 }
534
535 /**
536  * get the current protection status of the specified snapshot
537  *
538  * Input:
539  * @param snap_id (uint64_t) which snapshot to get the status of
540  *
541  * Output:
542  * @param status (uint8_t) one of:
543  * RBD_PROTECTION_STATUS_{PROTECTED, UNPROTECTED, UNPROTECTING}
544  *
545  * @returns 0 on success, negative error code on failure
546  * @returns -EINVAL if snapid is CEPH_NOSNAP
547  */
548 int get_protection_status(cls_method_context_t hctx, bufferlist *in,
549                           bufferlist *out)
550 {
551   snapid_t snap_id;
552
553   bufferlist::iterator iter = in->begin();
554   try {
555     ::decode(snap_id, iter);
556   } catch (const buffer::error &err) {
557     CLS_LOG(20, "get_protection_status: invalid decode");
558     return -EINVAL;
559   }
560
561   int r = check_exists(hctx);
562   if (r < 0)
563     return r;
564
565   CLS_LOG(20, "get_protection_status snap_id=%llu",
566          (unsigned long long)snap_id.val);
567
568   if (snap_id == CEPH_NOSNAP)
569     return -EINVAL;
570
571   cls_rbd_snap snap;
572   string snapshot_key;
573   key_from_snap_id(snap_id.val, &snapshot_key);
574   r = read_key(hctx, snapshot_key, &snap);
575   if (r < 0) {
576     CLS_ERR("could not read key for snapshot id %" PRIu64, snap_id.val);
577     return r;
578   }
579
580   if (snap.protection_status >= RBD_PROTECTION_STATUS_LAST) {
581     CLS_ERR("invalid protection status for snap id %llu: %u",
582             (unsigned long long)snap_id.val, snap.protection_status);
583     return -EIO;
584   }
585
586   ::encode(snap.protection_status, *out);
587   return 0;
588 }
589
590 /**
591  * set the proctection status of a snapshot
592  *
593  * Input:
594  * @param snapid (uint64_t) which snapshot to set the status of
595  * @param status (uint8_t) one of:
596  * RBD_PROTECTION_STATUS_{PROTECTED, UNPROTECTED, UNPROTECTING}
597  *
598  * @returns 0 on success, negative error code on failure
599  * @returns -EINVAL if snapid is CEPH_NOSNAP
600  */
601 int set_protection_status(cls_method_context_t hctx, bufferlist *in,
602                           bufferlist *out)
603 {
604   snapid_t snap_id;
605   uint8_t status;
606
607   bufferlist::iterator iter = in->begin();
608   try {
609     ::decode(snap_id, iter);
610     ::decode(status, iter);
611   } catch (const buffer::error &err) {
612     CLS_LOG(20, "set_protection_status: invalid decode");
613     return -EINVAL;
614   }
615
616   int r = check_exists(hctx);
617   if (r < 0)
618     return r;
619
620   r = require_feature(hctx, RBD_FEATURE_LAYERING);
621   if (r < 0) {
622     CLS_LOG(20, "image does not support layering");
623     return r;
624   }
625
626   CLS_LOG(20, "set_protection_status snapid=%llu status=%u",
627           (unsigned long long)snap_id.val, status);
628
629   if (snap_id == CEPH_NOSNAP)
630     return -EINVAL;
631
632   if (status >= RBD_PROTECTION_STATUS_LAST) {
633     CLS_LOG(10, "invalid protection status for snap id %llu: %u",
634             (unsigned long long)snap_id.val, status);
635     return -EINVAL;
636   }
637
638   cls_rbd_snap snap;
639   string snapshot_key;
640   key_from_snap_id(snap_id.val, &snapshot_key);
641   r = read_key(hctx, snapshot_key, &snap);
642   if (r < 0) {
643     CLS_ERR("could not read key for snapshot id %" PRIu64, snap_id.val);
644     return r;
645   }
646
647   snap.protection_status = status;
648   bufferlist snapshot_bl;
649   ::encode(snap, snapshot_bl);
650   r = cls_cxx_map_set_val(hctx, snapshot_key, &snapshot_bl);
651   if (r < 0) {
652     CLS_ERR("error writing snapshot metadata: %s", cpp_strerror(r).c_str());
653     return r;
654   }
655
656   return 0;
657 }
658
659 /**
660  * get striping parameters
661  *
662  * Input:
663  * none
664  *
665  * Output:
666  * @param stripe unit (bytes)
667  * @param stripe count (num objects)
668  *
669  * @returns 0 on success
670  */
671 int get_stripe_unit_count(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
672 {
673   int r = check_exists(hctx);
674   if (r < 0)
675     return r;
676
677   CLS_LOG(20, "get_stripe_unit_count");
678
679   r = require_feature(hctx, RBD_FEATURE_STRIPINGV2);
680   if (r < 0)
681     return r;
682
683   uint64_t stripe_unit = 0, stripe_count = 0;
684   r = read_key(hctx, "stripe_unit", &stripe_unit);
685   if (r == -ENOENT) {
686     // default to object size
687     uint8_t order;
688     r = read_key(hctx, "order", &order);
689     if (r < 0) {
690       CLS_ERR("failed to read the order off of disk: %s", cpp_strerror(r).c_str());
691       return -EIO;
692     }
693     stripe_unit = 1ull << order;
694   }
695   if (r < 0)
696     return r;
697   r = read_key(hctx, "stripe_count", &stripe_count);
698   if (r == -ENOENT) {
699     // default to 1
700     stripe_count = 1;
701     r = 0;
702   }
703   if (r < 0)
704     return r;
705
706   ::encode(stripe_unit, *out);
707   ::encode(stripe_count, *out);
708   return 0;
709 }
710
711 /**
712  * set striping parameters
713  *
714  * Input:
715  * @param stripe unit (bytes)
716  * @param stripe count (num objects)
717  *
718  * @returns 0 on success
719  */
720 int set_stripe_unit_count(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
721 {
722   uint64_t stripe_unit, stripe_count;
723
724   bufferlist::iterator iter = in->begin();
725   try {
726     ::decode(stripe_unit, iter);
727     ::decode(stripe_count, iter);
728   } catch (const buffer::error &err) {
729     CLS_LOG(20, "set_stripe_unit_count: invalid decode");
730     return -EINVAL;
731   }
732
733   if (!stripe_count || !stripe_unit)
734     return -EINVAL;
735
736   int r = check_exists(hctx);
737   if (r < 0)
738     return r;
739
740   CLS_LOG(20, "set_stripe_unit_count");
741
742   r = require_feature(hctx, RBD_FEATURE_STRIPINGV2);
743   if (r < 0)
744     return r;
745
746   uint8_t order;
747   r = read_key(hctx, "order", &order);
748   if (r < 0) {
749     CLS_ERR("failed to read the order off of disk: %s", cpp_strerror(r).c_str());
750     return r;
751   }
752   if ((1ull << order) % stripe_unit || stripe_unit > (1ull << order)) {
753     CLS_ERR("stripe unit %llu is not a factor of the object size %llu",
754             (unsigned long long)stripe_unit, 1ull << order);
755     return -EINVAL;
756   }
757
758   bufferlist bl, bl2;
759   ::encode(stripe_unit, bl);
760   r = cls_cxx_map_set_val(hctx, "stripe_unit", &bl);
761   if (r < 0) {
762     CLS_ERR("error writing stripe_unit metadata: %s", cpp_strerror(r).c_str());
763     return r;
764   }
765
766   ::encode(stripe_count, bl2);
767   r = cls_cxx_map_set_val(hctx, "stripe_count", &bl2);
768   if (r < 0) {
769     CLS_ERR("error writing stripe_count metadata: %s", cpp_strerror(r).c_str());
770     return r;
771   }
772
773   return 0;
774 }
775
776 int get_create_timestamp(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
777 {
778   CLS_LOG(20, "get_create_timestamp");
779
780   utime_t timestamp;
781   bufferlist bl;
782   int r = cls_cxx_map_get_val(hctx, "create_timestamp", &bl);
783   if (r < 0) {
784     if (r != -ENOENT) {
785       CLS_ERR("error reading create_timestamp: %s", cpp_strerror(r).c_str());
786       return r;
787     }
788   } else {
789     try {
790       bufferlist::iterator it = bl.begin();
791       ::decode(timestamp, it);
792     } catch (const buffer::error &err) {
793       CLS_ERR("could not decode create_timestamp");
794       return -EIO;
795     }
796   }
797
798   ::encode(timestamp, *out);
799   return 0;
800 }
801
802 /**
803  * get the image flags
804  *
805  * Input:
806  * @param snap_id which snapshot to query, to CEPH_NOSNAP (uint64_t)
807  *
808  * Output:
809  * @param flags image flags
810  *
811  * @returns 0 on success, negative error code upon failure
812  */
813 int get_flags(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
814 {
815   uint64_t snap_id;
816   bufferlist::iterator iter = in->begin();
817   try {
818     ::decode(snap_id, iter);
819   } catch (const buffer::error &err) {
820     return -EINVAL;
821   }
822
823   CLS_LOG(20, "get_flags snap_id=%llu", (unsigned long long)snap_id);
824
825   uint64_t flags = 0;
826   if (snap_id == CEPH_NOSNAP) {
827     int r = read_key(hctx, "flags", &flags);
828     if (r < 0 && r != -ENOENT) {
829       CLS_ERR("failed to read flags off disk: %s", cpp_strerror(r).c_str());
830       return r;
831     }
832   } else {
833     cls_rbd_snap snap;
834     string snapshot_key;
835     key_from_snap_id(snap_id, &snapshot_key);
836     int r = read_key(hctx, snapshot_key, &snap);
837     if (r < 0) {
838       return r;
839     }
840     flags = snap.flags;
841   }
842
843   ::encode(flags, *out);
844   return 0;
845 }
846
847 /**
848  * set the image flags
849  *
850  * Input:
851  * @param flags image flags
852  * @param mask image flag mask
853  * @param snap_id which snapshot to update, or CEPH_NOSNAP (uint64_t)
854  *
855  * Output:
856  * none
857  *
858  * @returns 0 on success, negative error code upon failure
859  */
860 int set_flags(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
861 {
862   uint64_t flags;
863   uint64_t mask;
864   uint64_t snap_id = CEPH_NOSNAP;
865   bufferlist::iterator iter = in->begin();
866   try {
867     ::decode(flags, iter);
868     ::decode(mask, iter);
869     if (!iter.end()) {
870       ::decode(snap_id, iter);
871     }
872   } catch (const buffer::error &err) {
873     return -EINVAL;
874   }
875
876   // check that size exists to make sure this is a header object
877   // that was created correctly
878   int r;
879   uint64_t orig_flags = 0;
880   cls_rbd_snap snap_meta;
881   string snap_meta_key;
882   if (snap_id == CEPH_NOSNAP) {
883     r = read_key(hctx, "flags", &orig_flags);
884     if (r < 0 && r != -ENOENT) {
885       CLS_ERR("Could not read image's flags off disk: %s",
886               cpp_strerror(r).c_str());
887       return r;
888     }
889   } else {
890     key_from_snap_id(snap_id, &snap_meta_key);
891     r = read_key(hctx, snap_meta_key, &snap_meta);
892     if (r < 0) {
893       CLS_ERR("Could not read snapshot: snap_id=%" PRIu64 ": %s",
894               snap_id, cpp_strerror(r).c_str());
895       return r;
896     }
897     orig_flags = snap_meta.flags;
898   }
899
900   flags = (orig_flags & ~mask) | (flags & mask);
901   CLS_LOG(20, "set_flags snap_id=%" PRIu64 ", orig_flags=%" PRIu64 ", "
902               "new_flags=%" PRIu64 ", mask=%" PRIu64, snap_id, orig_flags,
903               flags, mask);
904
905   if (snap_id == CEPH_NOSNAP) {
906     bufferlist bl;
907     ::encode(flags, bl);
908     r = cls_cxx_map_set_val(hctx, "flags", &bl);
909   } else {
910     snap_meta.flags = flags;
911
912     bufferlist bl;
913     ::encode(snap_meta, bl);
914     r = cls_cxx_map_set_val(hctx, snap_meta_key, &bl);
915   }
916
917   if (r < 0) {
918     CLS_ERR("error updating flags: %s", cpp_strerror(r).c_str());
919     return r;
920   }
921   return 0;
922 }
923
924 /**
925  * get the current parent, if any
926  *
927  * Input:
928  * @param snap_id which snapshot to query, or CEPH_NOSNAP (uint64_t)
929  *
930  * Output:
931  * @param pool parent pool id (-1 if parent does not exist)
932  * @param image parent image id
933  * @param snapid parent snapid
934  * @param size portion of parent mapped under the child
935  *
936  * @returns 0 on success or parent does not exist, negative error code on failure
937  */
938 int get_parent(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
939 {
940   uint64_t snap_id;
941
942   bufferlist::iterator iter = in->begin();
943   try {
944     ::decode(snap_id, iter);
945   } catch (const buffer::error &err) {
946     return -EINVAL;
947   }
948
949   int r = check_exists(hctx);
950   if (r < 0)
951     return r;
952
953   CLS_LOG(20, "get_parent snap_id=%llu", (unsigned long long)snap_id);
954
955   cls_rbd_parent parent;
956   r = require_feature(hctx, RBD_FEATURE_LAYERING);
957   if (r == 0) {
958     if (snap_id == CEPH_NOSNAP) {
959       r = read_key(hctx, "parent", &parent);
960       if (r < 0 && r != -ENOENT)
961         return r;
962     } else {
963       cls_rbd_snap snap;
964       string snapshot_key;
965       key_from_snap_id(snap_id, &snapshot_key);
966       r = read_key(hctx, snapshot_key, &snap);
967       if (r < 0 && r != -ENOENT)
968         return r;
969       parent = snap.parent;
970     }
971   }
972
973   ::encode(parent.pool, *out);
974   ::encode(parent.id, *out);
975   ::encode(parent.snapid, *out);
976   ::encode(parent.overlap, *out);
977   return 0;
978 }
979
980 /**
981  * set the image parent
982  *
983  * Input:
984  * @param pool parent pool
985  * @param id parent image id
986  * @param snapid parent snapid
987  * @param size parent size
988  *
989  * @returns 0 on success, or negative error code
990  */
991 int set_parent(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
992 {
993   int64_t pool;
994   string id;
995   snapid_t snapid;
996   uint64_t size;
997
998   bufferlist::iterator iter = in->begin();
999   try {
1000     ::decode(pool, iter);
1001     ::decode(id, iter);
1002     ::decode(snapid, iter);
1003     ::decode(size, iter);
1004   } catch (const buffer::error &err) {
1005     CLS_LOG(20, "cls_rbd::set_parent: invalid decode");
1006     return -EINVAL;
1007   }
1008
1009   int r = check_exists(hctx);
1010   if (r < 0) {
1011     CLS_LOG(20, "cls_rbd::set_parent: child already exists");
1012     return r;
1013   }
1014
1015   r = require_feature(hctx, RBD_FEATURE_LAYERING);
1016   if (r < 0) {
1017     CLS_LOG(20, "cls_rbd::set_parent: child does not support layering");
1018     return r;
1019   }
1020
1021   CLS_LOG(20, "set_parent pool=%llu id=%s snapid=%llu size=%llu",
1022           (unsigned long long)pool, id.c_str(), (unsigned long long)snapid.val,
1023           (unsigned long long)size);
1024
1025   if (pool < 0 || id.length() == 0 || snapid == CEPH_NOSNAP || size == 0) {
1026     return -EINVAL;
1027   }
1028
1029   // make sure there isn't already a parent
1030   cls_rbd_parent parent;
1031   r = read_key(hctx, "parent", &parent);
1032   if (r == 0) {
1033     CLS_LOG(20, "set_parent existing parent pool=%llu id=%s snapid=%llu"
1034             "overlap=%llu", (unsigned long long)parent.pool, parent.id.c_str(),
1035             (unsigned long long)parent.snapid.val,
1036             (unsigned long long)parent.overlap);
1037     return -EEXIST;
1038   }
1039
1040   // our overlap is the min of our size and the parent's size.
1041   uint64_t our_size;
1042   r = read_key(hctx, "size", &our_size);
1043   if (r < 0)
1044     return r;
1045
1046   bufferlist parentbl;
1047   parent.pool = pool;
1048   parent.id = id;
1049   parent.snapid = snapid;
1050   parent.overlap = MIN(our_size, size);
1051   ::encode(parent, parentbl);
1052   r = cls_cxx_map_set_val(hctx, "parent", &parentbl);
1053   if (r < 0) {
1054     CLS_ERR("error writing parent: %s", cpp_strerror(r).c_str());
1055     return r;
1056   }
1057
1058   return 0;
1059 }
1060
1061
1062 /**
1063  * remove the parent pointer
1064  *
1065  * This can only happen on the head, not on a snapshot.  No arguments.
1066  *
1067  * @returns 0 on success, negative error code on failure.
1068  */
1069 int remove_parent(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
1070 {
1071   int r = check_exists(hctx);
1072   if (r < 0)
1073     return r;
1074
1075   r = require_feature(hctx, RBD_FEATURE_LAYERING);
1076   if (r < 0)
1077     return r;
1078
1079   uint64_t features;
1080   r = read_key(hctx, "features", &features);
1081   if (r < 0) {
1082     return r;
1083   }
1084
1085   // remove the parent from all snapshots
1086   if ((features & RBD_FEATURE_DEEP_FLATTEN) != 0) {
1087     int max_read = RBD_MAX_KEYS_READ;
1088     vector<snapid_t> snap_ids;
1089     string last_read = RBD_SNAP_KEY_PREFIX;
1090     bool more;
1091
1092     do {
1093       set<string> keys;
1094       r = cls_cxx_map_get_keys(hctx, last_read, max_read, &keys, &more);
1095       if (r < 0) {
1096         return r;
1097       }
1098
1099       for (std::set<string>::const_iterator it = keys.begin();
1100            it != keys.end(); ++it) {
1101         if ((*it).find(RBD_SNAP_KEY_PREFIX) != 0) {
1102           break;
1103         }
1104
1105         uint64_t snap_id = snap_id_from_key(*it);
1106         cls_rbd_snap snap_meta;
1107         r = read_key(hctx, *it, &snap_meta);
1108         if (r < 0) {
1109           CLS_ERR("Could not read snapshot: snap_id=%" PRIu64 ": %s",
1110                   snap_id, cpp_strerror(r).c_str());
1111           return r;
1112         }
1113
1114         snap_meta.parent = cls_rbd_parent();
1115
1116         bufferlist bl;
1117         ::encode(snap_meta, bl);
1118         r = cls_cxx_map_set_val(hctx, *it, &bl);
1119         if (r < 0) {
1120           CLS_ERR("Could not update snapshot: snap_id=%" PRIu64 ": %s",
1121                   snap_id, cpp_strerror(r).c_str());
1122           return r;
1123         }
1124       }
1125
1126       if (!keys.empty()) {
1127         last_read = *(keys.rbegin());
1128       }
1129     } while (more);
1130   }
1131
1132   cls_rbd_parent parent;
1133   r = read_key(hctx, "parent", &parent);
1134   if (r < 0)
1135     return r;
1136
1137   r = cls_cxx_map_remove_key(hctx, "parent");
1138   if (r < 0) {
1139     CLS_ERR("error removing parent: %s", cpp_strerror(r).c_str());
1140     return r;
1141   }
1142   return 0;
1143 }
1144
1145 /**
1146  * methods for dealing with rbd_children object
1147  */
1148
1149 static int decode_parent_common(bufferlist::iterator& it, uint64_t *pool_id,
1150                                 string *image_id, snapid_t *snap_id)
1151 {
1152   try {
1153     ::decode(*pool_id, it);
1154     ::decode(*image_id, it);
1155     ::decode(*snap_id, it);
1156   } catch (const buffer::error &err) {
1157     CLS_ERR("error decoding parent spec");
1158     return -EINVAL;
1159   }
1160   return 0;
1161 }
1162
1163 static int decode_parent(bufferlist *in, uint64_t *pool_id,
1164                          string *image_id, snapid_t *snap_id)
1165 {
1166   bufferlist::iterator it = in->begin();
1167   return decode_parent_common(it, pool_id, image_id, snap_id);
1168 }
1169
1170 static int decode_parent_and_child(bufferlist *in, uint64_t *pool_id,
1171                                    string *image_id, snapid_t *snap_id,
1172                                    string *c_image_id)
1173 {
1174   bufferlist::iterator it = in->begin();
1175   int r = decode_parent_common(it, pool_id, image_id, snap_id);
1176   if (r < 0)
1177     return r;
1178   try {
1179     ::decode(*c_image_id, it);
1180   } catch (const buffer::error &err) {
1181     CLS_ERR("error decoding child image id");
1182     return -EINVAL;
1183   }
1184   return 0;
1185 }
1186
1187 static string parent_key(uint64_t pool_id, string image_id, snapid_t snap_id)
1188 {
1189   bufferlist key_bl;
1190   ::encode(pool_id, key_bl);
1191   ::encode(image_id, key_bl);
1192   ::encode(snap_id, key_bl);
1193   return string(key_bl.c_str(), key_bl.length());
1194 }
1195
1196 /**
1197  * add child to rbd_children directory object
1198  *
1199  * rbd_children is a map of (p_pool_id, p_image_id, p_snap_id) to
1200  * [c_image_id, [c_image_id ... ]]
1201  *
1202  * Input:
1203  * @param p_pool_id parent pool id
1204  * @param p_image_id parent image oid
1205  * @param p_snap_id parent snapshot id
1206  * @param c_image_id new child image oid to add
1207  *
1208  * @returns 0 on success, negative error on failure
1209  */
1210
1211 int add_child(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
1212 {
1213   int r;
1214
1215   uint64_t p_pool_id;
1216   snapid_t p_snap_id;
1217   string p_image_id, c_image_id;
1218   // Use set for ease of erase() for remove_child()
1219   std::set<string> children;
1220
1221   r = decode_parent_and_child(in, &p_pool_id, &p_image_id, &p_snap_id,
1222                               &c_image_id);
1223   if (r < 0)
1224     return r;
1225
1226   CLS_LOG(20, "add_child %s to (%" PRIu64 ", %s, %" PRIu64 ")", c_image_id.c_str(),
1227           p_pool_id, p_image_id.c_str(), p_snap_id.val);
1228
1229   string key = parent_key(p_pool_id, p_image_id, p_snap_id);
1230
1231   // get current child list for parent, if any
1232   r = read_key(hctx, key, &children);
1233   if ((r < 0) && (r != -ENOENT)) {
1234     CLS_LOG(20, "add_child: omap read failed: %s", cpp_strerror(r).c_str());
1235     return r;
1236   }
1237
1238   if (children.find(c_image_id) != children.end()) {
1239     CLS_LOG(20, "add_child: child already exists: %s", c_image_id.c_str());
1240     return -EEXIST;
1241   }
1242   // add new child
1243   children.insert(c_image_id);
1244
1245   // write back
1246   bufferlist childbl;
1247   ::encode(children, childbl);
1248   r = cls_cxx_map_set_val(hctx, key, &childbl);
1249   if (r < 0)
1250     CLS_LOG(20, "add_child: omap write failed: %s", cpp_strerror(r).c_str());
1251   return r;
1252 }
1253
1254 /**
1255  * remove child from rbd_children directory object
1256  *
1257  * Input:
1258  * @param p_pool_id parent pool id
1259  * @param p_image_id parent image oid
1260  * @param p_snap_id parent snapshot id
1261  * @param c_image_id new child image oid to add
1262  *
1263  * @returns 0 on success, negative error on failure
1264  */
1265
1266 int remove_child(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
1267 {
1268   int r;
1269
1270   uint64_t p_pool_id;
1271   snapid_t p_snap_id;
1272   string p_image_id, c_image_id;
1273   std::set<string> children;
1274
1275   r = decode_parent_and_child(in, &p_pool_id, &p_image_id, &p_snap_id,
1276                               &c_image_id);
1277   if (r < 0)
1278     return r;
1279
1280   CLS_LOG(20, "remove_child %s from (%" PRIu64 ", %s, %" PRIu64 ")",
1281                c_image_id.c_str(), p_pool_id, p_image_id.c_str(),
1282                p_snap_id.val);
1283
1284   string key = parent_key(p_pool_id, p_image_id, p_snap_id);
1285
1286   // get current child list for parent.  Unlike add_child(), an empty list
1287   // is an error (how can we remove something that doesn't exist?)
1288   r = read_key(hctx, key, &children);
1289   if (r < 0) {
1290     CLS_LOG(20, "remove_child: read omap failed: %s", cpp_strerror(r).c_str());
1291     return r;
1292   }
1293
1294   if (children.find(c_image_id) == children.end()) {
1295     CLS_LOG(20, "remove_child: child not found: %s", c_image_id.c_str());
1296     return -ENOENT;
1297   }
1298   // find and remove child
1299   children.erase(c_image_id);
1300
1301   // now empty?  remove key altogether
1302   if (children.empty()) {
1303     r = cls_cxx_map_remove_key(hctx, key);
1304     if (r < 0)
1305       CLS_LOG(20, "remove_child: remove key failed: %s", cpp_strerror(r).c_str());
1306   } else {
1307     // write back shortened children list
1308     bufferlist childbl;
1309     ::encode(children, childbl);
1310     r = cls_cxx_map_set_val(hctx, key, &childbl);
1311     if (r < 0)
1312       CLS_LOG(20, "remove_child: write omap failed: %s", cpp_strerror(r).c_str());
1313   }
1314   return r;
1315 }
1316
1317 /**
1318  * Input:
1319  * @param p_pool_id parent pool id
1320  * @param p_image_id parent image oid
1321  * @param p_snap_id parent snapshot id
1322  * @param c_image_id new child image oid to add
1323  *
1324  * Output:
1325  * @param children set<string> of children
1326  *
1327  * @returns 0 on success, negative error on failure
1328  */
1329 int get_children(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
1330 {
1331   int r;
1332   uint64_t p_pool_id;
1333   snapid_t p_snap_id;
1334   string p_image_id;
1335   std::set<string> children;
1336
1337   r = decode_parent(in, &p_pool_id, &p_image_id, &p_snap_id);
1338   if (r < 0)
1339     return r;
1340
1341   CLS_LOG(20, "get_children of (%" PRIu64 ", %s, %" PRIu64 ")",
1342           p_pool_id, p_image_id.c_str(), p_snap_id.val);
1343
1344   string key = parent_key(p_pool_id, p_image_id, p_snap_id);
1345
1346   r = read_key(hctx, key, &children);
1347   if (r < 0) {
1348     if (r != -ENOENT)
1349       CLS_LOG(20, "get_children: read omap failed: %s", cpp_strerror(r).c_str());
1350     return r;
1351   }
1352   ::encode(children, *out);
1353   return 0;
1354 }
1355
1356
1357 /**
1358  * Get the information needed to create a rados snap context for doing
1359  * I/O to the data objects. This must include all snapshots.
1360  *
1361  * Output:
1362  * @param snap_seq the highest snapshot id ever associated with the image (uint64_t)
1363  * @param snap_ids existing snapshot ids in descending order (vector<uint64_t>)
1364  * @returns 0 on success, negative error code on failure
1365  */
1366 int get_snapcontext(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
1367 {
1368   CLS_LOG(20, "get_snapcontext");
1369
1370   int r;
1371   int max_read = RBD_MAX_KEYS_READ;
1372   vector<snapid_t> snap_ids;
1373   string last_read = RBD_SNAP_KEY_PREFIX;
1374   bool more;
1375
1376   do {
1377     set<string> keys;
1378     r = cls_cxx_map_get_keys(hctx, last_read, max_read, &keys, &more);
1379     if (r < 0)
1380       return r;
1381
1382     for (set<string>::const_iterator it = keys.begin();
1383          it != keys.end(); ++it) {
1384       if ((*it).find(RBD_SNAP_KEY_PREFIX) != 0)
1385         break;
1386       snapid_t snap_id = snap_id_from_key(*it);
1387       snap_ids.push_back(snap_id);
1388     }
1389     if (!keys.empty())
1390       last_read = *(keys.rbegin());
1391   } while (more);
1392
1393   uint64_t snap_seq;
1394   r = read_key(hctx, "snap_seq", &snap_seq);
1395   if (r < 0) {
1396     CLS_ERR("could not read the image's snap_seq off disk: %s", cpp_strerror(r).c_str());
1397     return r;
1398   }
1399
1400   // snap_ids must be descending in a snap context
1401   std::reverse(snap_ids.begin(), snap_ids.end());
1402
1403   ::encode(snap_seq, *out);
1404   ::encode(snap_ids, *out);
1405
1406   return 0;
1407 }
1408
1409 /**
1410  * Output:
1411  * @param object_prefix prefix for data object names (string)
1412  * @returns 0 on success, negative error code on failure
1413  */
1414 int get_object_prefix(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
1415 {
1416   CLS_LOG(20, "get_object_prefix");
1417
1418   string object_prefix;
1419   int r = read_key(hctx, "object_prefix", &object_prefix);
1420   if (r < 0) {
1421     CLS_ERR("failed to read the image's object prefix off of disk: %s",
1422             cpp_strerror(r).c_str());
1423     return r;
1424   }
1425
1426   ::encode(object_prefix, *out);
1427
1428   return 0;
1429 }
1430
1431 /**
1432  * Input:
1433  * none
1434  *
1435  * Output:
1436  * @param pool_id (int64_t) of data pool or -1 if none
1437  * @returns 0 on success, negative error code on failure
1438  */
1439 int get_data_pool(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
1440 {
1441   CLS_LOG(20, "get_data_pool");
1442
1443   int64_t data_pool_id = -1;
1444   int r = read_key(hctx, "data_pool_id", &data_pool_id);
1445   if (r == -ENOENT) {
1446     data_pool_id = -1;
1447   } else if (r < 0) {
1448     CLS_ERR("error reading image data pool id: %s", cpp_strerror(r).c_str());
1449     return r;
1450   }
1451
1452   ::encode(data_pool_id, *out);
1453   return 0;
1454 }
1455
1456 int get_snapshot_name(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
1457 {
1458   uint64_t snap_id;
1459
1460   bufferlist::iterator iter = in->begin();
1461   try {
1462     ::decode(snap_id, iter);
1463   } catch (const buffer::error &err) {
1464     return -EINVAL;
1465   }
1466
1467   CLS_LOG(20, "get_snapshot_name snap_id=%llu", (unsigned long long)snap_id);
1468
1469   if (snap_id == CEPH_NOSNAP)
1470     return -EINVAL;
1471
1472   cls_rbd_snap snap;
1473   string snapshot_key;
1474   key_from_snap_id(snap_id, &snapshot_key);
1475   int r = read_key(hctx, snapshot_key, &snap);
1476   if (r < 0)
1477     return r;
1478
1479   ::encode(snap.name, *out);
1480
1481   return 0;
1482 }
1483
1484 int get_snapshot_timestamp(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
1485 {
1486   uint64_t snap_id;
1487   
1488   bufferlist::iterator iter = in->begin();
1489   try {
1490     ::decode(snap_id, iter);
1491   } catch (const buffer::error &err) {
1492     return -EINVAL;
1493   }
1494
1495   CLS_LOG(20, "get_snapshot_timestamp snap_id=%llu", (unsigned long long)snap_id);
1496
1497   if (snap_id == CEPH_NOSNAP) {
1498     return -EINVAL;
1499   }
1500   
1501   cls_rbd_snap snap;
1502   string snapshot_key;
1503   key_from_snap_id(snap_id, &snapshot_key);
1504   int r = read_key(hctx, snapshot_key, &snap);
1505   if (r < 0) {
1506     return r;
1507   }
1508
1509   ::encode(snap.timestamp, *out);
1510   return 0;
1511 }
1512
1513 /**
1514  * Retrieve namespace of a snapshot.
1515  *
1516  * Input:
1517  * @param snap_id id of the snapshot (uint64_t)
1518  *
1519  * Output:
1520  * @param SnapshotNamespace
1521  * @returns 0 on success, negative error code on failure.
1522  */
1523 int get_snapshot_namespace(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
1524 {
1525   uint64_t snap_id;
1526
1527   bufferlist::iterator iter = in->begin();
1528   try {
1529     ::decode(snap_id, iter);
1530   } catch (const buffer::error &err) {
1531     return -EINVAL;
1532   }
1533
1534   CLS_LOG(20, "get_snapshot_namespace snap_id=%" PRIu64, snap_id);
1535
1536   if (snap_id == CEPH_NOSNAP) {
1537     return -EINVAL;
1538   }
1539
1540   cls_rbd_snap snap;
1541   string snapshot_key;
1542   key_from_snap_id(snap_id, &snapshot_key);
1543   int r = read_key(hctx, snapshot_key, &snap);
1544   if (r < 0) {
1545     return r;
1546   }
1547
1548   ::encode(snap.snapshot_namespace, *out);
1549
1550   return 0;
1551 }
1552
1553 /**
1554  * Adds a snapshot to an rbd header. Ensures the id and name are unique.
1555  *
1556  * Input:
1557  * @param snap_name name of the snapshot (string)
1558  * @param snap_id id of the snapshot (uint64_t)
1559  * @param snap_namespace namespace of the snapshot (cls::rbd::SnapshotNamespaceOnDisk)
1560  *
1561  * Output:
1562  * @returns 0 on success, negative error code on failure.
1563  * @returns -ESTALE if the input snap_id is less than the image's snap_seq
1564  * @returns -EEXIST if the id or name are already used by another snapshot
1565  */
1566 int snapshot_add(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
1567 {
1568   bufferlist snap_namebl, snap_idbl;
1569   cls_rbd_snap snap_meta;
1570   uint64_t snap_limit;
1571
1572   try {
1573     bufferlist::iterator iter = in->begin();
1574     ::decode(snap_meta.name, iter);
1575     ::decode(snap_meta.id, iter);
1576     if (!iter.end()) {
1577       ::decode(snap_meta.snapshot_namespace, iter);
1578     }
1579   } catch (const buffer::error &err) {
1580     return -EINVAL;
1581   }
1582
1583   if (boost::get<cls::rbd::UnknownSnapshotNamespace>(
1584         &snap_meta.snapshot_namespace.snapshot_namespace) != nullptr) {
1585     CLS_ERR("Unknown snapshot namespace provided");
1586     return -EINVAL;
1587   }
1588
1589   CLS_LOG(20, "snapshot_add name=%s id=%llu", snap_meta.name.c_str(),
1590          (unsigned long long)snap_meta.id.val);
1591
1592   if (snap_meta.id > CEPH_MAXSNAP)
1593     return -EINVAL;
1594
1595   uint64_t cur_snap_seq;
1596   int r = read_key(hctx, "snap_seq", &cur_snap_seq);
1597   if (r < 0) {
1598     CLS_ERR("Could not read image's snap_seq off disk: %s", cpp_strerror(r).c_str());
1599     return r;
1600   }
1601
1602   // client lost a race with another snapshot creation.
1603   // snap_seq must be monotonically increasing.
1604   if (snap_meta.id < cur_snap_seq)
1605     return -ESTALE;
1606
1607   r = read_key(hctx, "size", &snap_meta.image_size);
1608   if (r < 0) {
1609     CLS_ERR("Could not read image's size off disk: %s", cpp_strerror(r).c_str());
1610     return r;
1611   }
1612   r = read_key(hctx, "features", &snap_meta.features);
1613   if (r < 0) {
1614     CLS_ERR("Could not read image's features off disk: %s", cpp_strerror(r).c_str());
1615     return r;
1616   }
1617   r = read_key(hctx, "flags", &snap_meta.flags);
1618   if (r < 0 && r != -ENOENT) {
1619     CLS_ERR("Could not read image's flags off disk: %s", cpp_strerror(r).c_str());
1620     return r;
1621   }
1622
1623   r = read_key(hctx, "snap_limit", &snap_limit);
1624   if (r == -ENOENT) {
1625     snap_limit = UINT64_MAX;
1626   } else if (r < 0) {
1627     CLS_ERR("Could not read snapshot limit off disk: %s", cpp_strerror(r).c_str());
1628     return r;
1629   }
1630
1631   snap_meta.timestamp = ceph_clock_now();
1632
1633   int max_read = RBD_MAX_KEYS_READ;
1634   uint64_t total_read = 0;
1635   string last_read = RBD_SNAP_KEY_PREFIX;
1636   bool more;
1637   do {
1638     map<string, bufferlist> vals;
1639     r = cls_cxx_map_get_vals(hctx, last_read, RBD_SNAP_KEY_PREFIX,
1640                              max_read, &vals, &more);
1641     if (r < 0)
1642       return r;
1643
1644     total_read += vals.size();
1645     if (total_read >= snap_limit) {
1646       CLS_ERR("Attempt to create snapshot over limit of %" PRIu64, snap_limit);
1647       return -EDQUOT;
1648     }
1649
1650     for (map<string, bufferlist>::iterator it = vals.begin();
1651          it != vals.end(); ++it) {
1652       cls_rbd_snap old_meta;
1653       bufferlist::iterator iter = it->second.begin();
1654       try {
1655         ::decode(old_meta, iter);
1656       } catch (const buffer::error &err) {
1657         snapid_t snap_id = snap_id_from_key(it->first);
1658         CLS_ERR("error decoding snapshot metadata for snap_id: %llu",
1659                 (unsigned long long)snap_id.val);
1660         return -EIO;
1661       }
1662       if ((snap_meta.name == old_meta.name &&
1663             snap_meta.snapshot_namespace == old_meta.snapshot_namespace) ||
1664           snap_meta.id == old_meta.id) {
1665         CLS_LOG(20, "snap_name %s or snap_id %llu matches existing snap %s %llu",
1666                 snap_meta.name.c_str(), (unsigned long long)snap_meta.id.val,
1667                 old_meta.name.c_str(), (unsigned long long)old_meta.id.val);
1668         return -EEXIST;
1669       }
1670     }
1671
1672     if (!vals.empty())
1673       last_read = vals.rbegin()->first;
1674   } while (more);
1675
1676   // snapshot inherits parent, if any
1677   cls_rbd_parent parent;
1678   r = read_key(hctx, "parent", &parent);
1679   if (r < 0 && r != -ENOENT)
1680     return r;
1681   if (r == 0) {
1682     snap_meta.parent = parent;
1683   }
1684
1685   bufferlist snap_metabl, snap_seqbl;
1686   ::encode(snap_meta, snap_metabl);
1687   ::encode(snap_meta.id, snap_seqbl);
1688
1689   string snapshot_key;
1690   key_from_snap_id(snap_meta.id, &snapshot_key);
1691   map<string, bufferlist> vals;
1692   vals["snap_seq"] = snap_seqbl;
1693   vals[snapshot_key] = snap_metabl;
1694   r = cls_cxx_map_set_vals(hctx, &vals);
1695   if (r < 0) {
1696     CLS_ERR("error writing snapshot metadata: %s", cpp_strerror(r).c_str());
1697     return r;
1698   }
1699
1700   return 0;
1701 }
1702
1703
1704 /**
1705  * rename snapshot .
1706  *
1707  * Input:
1708  * @param src_snap_id old snap id of the snapshot (snapid_t)
1709  * @param dst_snap_name new name of the snapshot (string)
1710  *
1711  * Output:
1712  * @returns 0 on success, negative error code on failure.
1713  */
1714 int snapshot_rename(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
1715 {
1716   bufferlist snap_namebl, snap_idbl;
1717   snapid_t src_snap_id;
1718   string src_snap_key,dst_snap_name;
1719   cls_rbd_snap snap_meta;
1720   int r;
1721
1722   try {
1723     bufferlist::iterator iter = in->begin();
1724     ::decode(src_snap_id, iter);
1725     ::decode(dst_snap_name, iter);
1726   } catch (const buffer::error &err) {
1727     return -EINVAL;
1728   }
1729   
1730   CLS_LOG(20, "snapshot_rename id=%llu dst_name=%s", (unsigned long long)src_snap_id.val,
1731          dst_snap_name.c_str());
1732
1733   int max_read = RBD_MAX_KEYS_READ;
1734   string last_read = RBD_SNAP_KEY_PREFIX;
1735   bool more;
1736   do {
1737     map<string, bufferlist> vals;
1738     r = cls_cxx_map_get_vals(hctx, last_read, RBD_SNAP_KEY_PREFIX,
1739                              max_read, &vals, &more);
1740     if (r < 0)
1741       return r;
1742
1743     for (map<string, bufferlist>::iterator it = vals.begin();
1744          it != vals.end(); ++it) {
1745       bufferlist::iterator iter = it->second.begin();
1746       try {
1747         ::decode(snap_meta, iter);
1748       } catch (const buffer::error &err) {
1749         CLS_ERR("error decoding snapshot metadata for snap : %s",
1750                 dst_snap_name.c_str());
1751         return -EIO;
1752       }
1753       if (dst_snap_name == snap_meta.name) {
1754         CLS_LOG(20, "snap_name %s  matches existing snap with snap id = %llu ",
1755                 dst_snap_name.c_str(), (unsigned long long)snap_meta.id.val);
1756         return -EEXIST;
1757       }
1758     }
1759     if (!vals.empty())
1760       last_read = vals.rbegin()->first;
1761   } while (more);
1762
1763   key_from_snap_id(src_snap_id, &src_snap_key);
1764   r = read_key(hctx, src_snap_key, &snap_meta); 
1765   if (r == -ENOENT) {
1766     CLS_LOG(20, "cannot find existing snap with snap id = %llu ", (unsigned long long)src_snap_id);
1767     return r;
1768   }
1769   snap_meta.name = dst_snap_name;
1770   bufferlist snap_metabl;
1771   ::encode(snap_meta, snap_metabl);
1772
1773   r = cls_cxx_map_set_val(hctx, src_snap_key, &snap_metabl);
1774   if (r < 0) {
1775     CLS_ERR("error writing snapshot metadata: %s", cpp_strerror(r).c_str());
1776     return r;
1777   }
1778
1779   return 0;
1780 }
1781 /**
1782  * Removes a snapshot from an rbd header.
1783  *
1784  * Input:
1785  * @param snap_id the id of the snapshot to remove (uint64_t)
1786  *
1787  * Output:
1788  * @returns 0 on success, negative error code on failure
1789  */
1790 int snapshot_remove(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
1791 {
1792   snapid_t snap_id;
1793
1794   try {
1795     bufferlist::iterator iter = in->begin();
1796     ::decode(snap_id, iter);
1797   } catch (const buffer::error &err) {
1798     return -EINVAL;
1799   }
1800
1801   CLS_LOG(20, "snapshot_remove id=%llu", (unsigned long long)snap_id.val);
1802
1803   // check if the key exists. we can't rely on remove_key doing this for
1804   // us, since OMAPRMKEYS returns success if the key is not there.
1805   // bug or feature? sounds like a bug, since tmap did not have this
1806   // behavior, but cls_rgw may rely on it...
1807   cls_rbd_snap snap;
1808   string snapshot_key;
1809   key_from_snap_id(snap_id, &snapshot_key);
1810   int r = read_key(hctx, snapshot_key, &snap);
1811   if (r == -ENOENT)
1812     return -ENOENT;
1813
1814   if (snap.protection_status != RBD_PROTECTION_STATUS_UNPROTECTED)
1815     return -EBUSY;
1816
1817   r = cls_cxx_map_remove_key(hctx, snapshot_key);
1818   if (r < 0) {
1819     CLS_ERR("error writing snapshot metadata: %s", cpp_strerror(r).c_str());
1820     return r;
1821   }
1822
1823   return 0;
1824 }
1825
1826 /**
1827  * Returns a uint64_t of all the features supported by this class.
1828  */
1829 int get_all_features(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
1830 {
1831   uint64_t all_features = RBD_FEATURES_ALL;
1832   ::encode(all_features, *out);
1833   return 0;
1834 }
1835
1836 /**
1837  * "Copy up" data from the parent of a clone to the clone's object(s).
1838  * Used for implementing copy-on-write for a clone image.  Client
1839  * will pass down a chunk of data that fits completely within one
1840  * clone block (one object), and is aligned (starts at beginning of block),
1841  * but may be shorter (for non-full parent blocks).  The class method
1842  * can't know the object size to validate the requested length,
1843  * so it just writes the data as given if the child object doesn't
1844  * already exist, and returns success if it does.
1845  *
1846  * Input:
1847  * @param in bufferlist of data to write
1848  *
1849  * Output:
1850  * @returns 0 on success, or if block already exists in child
1851  *  negative error code on other error
1852  */
1853
1854 int copyup(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
1855 {
1856   // check for existence; if child object exists, just return success
1857   if (cls_cxx_stat(hctx, NULL, NULL) == 0)
1858     return 0;
1859   CLS_LOG(20, "copyup: writing length %d\n", in->length());
1860   return cls_cxx_write(hctx, 0, in->length(), in);
1861 }
1862
1863
1864 /************************ rbd_id object methods **************************/
1865
1866 /**
1867  * Input:
1868  * @param in ignored
1869  *
1870  * Output:
1871  * @param id the id stored in the object
1872  * @returns 0 on success, negative error code on failure
1873  */
1874 int get_id(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
1875 {
1876   uint64_t size;
1877   int r = cls_cxx_stat(hctx, &size, NULL);
1878   if (r < 0)
1879     return r;
1880
1881   if (size == 0)
1882     return -ENOENT;
1883
1884   bufferlist read_bl;
1885   r = cls_cxx_read(hctx, 0, size, &read_bl);
1886   if (r < 0) {
1887     CLS_ERR("get_id: could not read id: %s", cpp_strerror(r).c_str());
1888     return r;
1889   }
1890
1891   string id;
1892   try {
1893     bufferlist::iterator iter = read_bl.begin();
1894     ::decode(id, iter);
1895   } catch (const buffer::error &err) {
1896     return -EIO;
1897   }
1898
1899   ::encode(id, *out);
1900   return 0;
1901 }
1902
1903 /**
1904  * Set the id of an image. The object must already exist.
1905  *
1906  * Input:
1907  * @param id the id of the image, as an alpha-numeric string
1908  *
1909  * Output:
1910  * @returns 0 on success, -EEXIST if the atomic create fails,
1911  *          negative error code on other error
1912  */
1913 int set_id(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
1914 {
1915   int r = check_exists(hctx);
1916   if (r < 0)
1917     return r;
1918
1919   string id;
1920   try {
1921     bufferlist::iterator iter = in->begin();
1922     ::decode(id, iter);
1923   } catch (const buffer::error &err) {
1924     return -EINVAL;
1925   }
1926
1927   if (!is_valid_id(id)) {
1928     CLS_ERR("set_id: invalid id '%s'", id.c_str());
1929     return -EINVAL;
1930   }
1931
1932   uint64_t size;
1933   r = cls_cxx_stat(hctx, &size, NULL);
1934   if (r < 0)
1935     return r;
1936   if (size != 0)
1937     return -EEXIST;
1938
1939   CLS_LOG(20, "set_id: id=%s", id.c_str());
1940
1941   bufferlist write_bl;
1942   ::encode(id, write_bl);
1943   return cls_cxx_write(hctx, 0, write_bl.length(), &write_bl);
1944 }
1945
1946 /*********************** methods for rbd_directory ***********************/
1947
1948 static const string dir_key_for_id(const string &id)
1949 {
1950   return RBD_DIR_ID_KEY_PREFIX + id;
1951 }
1952
1953 static const string dir_key_for_name(const string &name)
1954 {
1955   return RBD_DIR_NAME_KEY_PREFIX + name;
1956 }
1957
1958 static const string dir_name_from_key(const string &key)
1959 {
1960   return key.substr(strlen(RBD_DIR_NAME_KEY_PREFIX));
1961 }
1962
1963 static int dir_add_image_helper(cls_method_context_t hctx,
1964                                 const string &name, const string &id,
1965                                 bool check_for_unique_id)
1966 {
1967   if (!name.size() || !is_valid_id(id)) {
1968     CLS_ERR("dir_add_image_helper: invalid name '%s' or id '%s'",
1969             name.c_str(), id.c_str());
1970     return -EINVAL;
1971   }
1972
1973   CLS_LOG(20, "dir_add_image_helper name=%s id=%s", name.c_str(), id.c_str());
1974
1975   string tmp;
1976   string name_key = dir_key_for_name(name);
1977   string id_key = dir_key_for_id(id);
1978   int r = read_key(hctx, name_key, &tmp);
1979   if (r != -ENOENT) {
1980     CLS_LOG(10, "name already exists");
1981     return -EEXIST;
1982   }
1983   r = read_key(hctx, id_key, &tmp);
1984   if (r != -ENOENT && check_for_unique_id) {
1985     CLS_LOG(10, "id already exists");
1986     return -EBADF;
1987   }
1988   bufferlist id_bl, name_bl;
1989   ::encode(id, id_bl);
1990   ::encode(name, name_bl);
1991   map<string, bufferlist> omap_vals;
1992   omap_vals[name_key] = id_bl;
1993   omap_vals[id_key] = name_bl;
1994   return cls_cxx_map_set_vals(hctx, &omap_vals);
1995 }
1996
1997 static int dir_remove_image_helper(cls_method_context_t hctx,
1998                                    const string &name, const string &id)
1999 {
2000   CLS_LOG(20, "dir_remove_image_helper name=%s id=%s",
2001           name.c_str(), id.c_str());
2002
2003   string stored_name, stored_id;
2004   string name_key = dir_key_for_name(name);
2005   string id_key = dir_key_for_id(id);
2006   int r = read_key(hctx, name_key, &stored_id);
2007   if (r < 0) {
2008     if (r != -ENOENT)
2009       CLS_ERR("error reading name to id mapping: %s", cpp_strerror(r).c_str());
2010     return r;
2011   }
2012   r = read_key(hctx, id_key, &stored_name);
2013   if (r < 0) {
2014     CLS_ERR("error reading id to name mapping: %s", cpp_strerror(r).c_str());
2015     return r;
2016   }
2017
2018   // check if this op raced with a rename
2019   if (stored_name != name || stored_id != id) {
2020     CLS_ERR("stored name '%s' and id '%s' do not match args '%s' and '%s'",
2021             stored_name.c_str(), stored_id.c_str(), name.c_str(), id.c_str());
2022     return -ESTALE;
2023   }
2024
2025   r = cls_cxx_map_remove_key(hctx, name_key);
2026   if (r < 0) {
2027     CLS_ERR("error removing name: %s", cpp_strerror(r).c_str());
2028     return r;
2029   }
2030
2031   r = cls_cxx_map_remove_key(hctx, id_key);
2032   if (r < 0) {
2033     CLS_ERR("error removing id: %s", cpp_strerror(r).c_str());
2034     return r;
2035   }
2036
2037   return 0;
2038 }
2039
2040 /**
2041  * Rename an image in the directory, updating both indexes
2042  * atomically. This can't be done from the client calling
2043  * dir_add_image and dir_remove_image in one transaction because the
2044  * results of the first method are not visibale to later steps.
2045  *
2046  * Input:
2047  * @param src original name of the image
2048  * @param dest new name of the image
2049  * @param id the id of the image
2050  *
2051  * Output:
2052  * @returns -ESTALE if src and id do not map to each other
2053  * @returns -ENOENT if src or id are not in the directory
2054  * @returns -EEXIST if dest already exists
2055  * @returns 0 on success, negative error code on failure
2056  */
2057 int dir_rename_image(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2058 {
2059   string src, dest, id;
2060   try {
2061     bufferlist::iterator iter = in->begin();
2062     ::decode(src, iter);
2063     ::decode(dest, iter);
2064     ::decode(id, iter);
2065   } catch (const buffer::error &err) {
2066     return -EINVAL;
2067   }
2068
2069   int r = dir_remove_image_helper(hctx, src, id);
2070   if (r < 0)
2071     return r;
2072   // ignore duplicate id because the result of
2073   // remove_image_helper is not visible yet
2074   return dir_add_image_helper(hctx, dest, id, false);
2075 }
2076
2077 /**
2078  * Get the id of an image given its name.
2079  *
2080  * Input:
2081  * @param name the name of the image
2082  *
2083  * Output:
2084  * @param id the id of the image
2085  * @returns 0 on success, negative error code on failure
2086  */
2087 int dir_get_id(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2088 {
2089   string name;
2090
2091   try {
2092     bufferlist::iterator iter = in->begin();
2093     ::decode(name, iter);
2094   } catch (const buffer::error &err) {
2095     return -EINVAL;
2096   }
2097
2098   CLS_LOG(20, "dir_get_id: name=%s", name.c_str());
2099
2100   string id;
2101   int r = read_key(hctx, dir_key_for_name(name), &id);
2102   if (r < 0) {
2103     if (r != -ENOENT)
2104       CLS_ERR("error reading id for name '%s': %s", name.c_str(), cpp_strerror(r).c_str());
2105     return r;
2106   }
2107   ::encode(id, *out);
2108   return 0;
2109 }
2110
2111 /**
2112  * Get the name of an image given its id.
2113  *
2114  * Input:
2115  * @param id the id of the image
2116  *
2117  * Output:
2118  * @param name the name of the image
2119  * @returns 0 on success, negative error code on failure
2120  */
2121 int dir_get_name(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2122 {
2123   string id;
2124
2125   try {
2126     bufferlist::iterator iter = in->begin();
2127     ::decode(id, iter);
2128   } catch (const buffer::error &err) {
2129     return -EINVAL;
2130   }
2131
2132   CLS_LOG(20, "dir_get_name: id=%s", id.c_str());
2133
2134   string name;
2135   int r = read_key(hctx, dir_key_for_id(id), &name);
2136   if (r < 0) {
2137     CLS_ERR("error reading name for id '%s': %s", id.c_str(), cpp_strerror(r).c_str());
2138     return r;
2139   }
2140   ::encode(name, *out);
2141   return 0;
2142 }
2143
2144 /**
2145  * List the names and ids of the images in the directory, sorted by
2146  * name.
2147  *
2148  * Input:
2149  * @param start_after which name to begin listing after
2150  *        (use the empty string to start at the beginning)
2151  * @param max_return the maximum number of names to list
2152  *
2153  * Output:
2154  * @param images map from name to id of up to max_return images
2155  * @returns 0 on success, negative error code on failure
2156  */
2157 int dir_list(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2158 {
2159   string start_after;
2160   uint64_t max_return;
2161
2162   try {
2163     bufferlist::iterator iter = in->begin();
2164     ::decode(start_after, iter);
2165     ::decode(max_return, iter);
2166   } catch (const buffer::error &err) {
2167     return -EINVAL;
2168   }
2169
2170   int max_read = RBD_MAX_KEYS_READ;
2171   map<string, string> images;
2172   string last_read = dir_key_for_name(start_after);
2173   bool more = true;
2174
2175   while (more && images.size() < max_return) {
2176     map<string, bufferlist> vals;
2177     CLS_LOG(20, "last_read = '%s'", last_read.c_str());
2178     int r = cls_cxx_map_get_vals(hctx, last_read, RBD_DIR_NAME_KEY_PREFIX,
2179                                  max_read, &vals, &more);
2180     if (r < 0) {
2181       CLS_ERR("error reading directory by name: %s", cpp_strerror(r).c_str());
2182       return r;
2183     }
2184
2185     for (map<string, bufferlist>::iterator it = vals.begin();
2186          it != vals.end(); ++it) {
2187       string id;
2188       bufferlist::iterator iter = it->second.begin();
2189       try {
2190         ::decode(id, iter);
2191       } catch (const buffer::error &err) {
2192         CLS_ERR("could not decode id of image '%s'", it->first.c_str());
2193         return -EIO;
2194       }
2195       CLS_LOG(20, "adding '%s' -> '%s'", dir_name_from_key(it->first).c_str(), id.c_str());
2196       images[dir_name_from_key(it->first)] = id;
2197       if (images.size() >= max_return)
2198         break;
2199     }
2200     if (!vals.empty()) {
2201       last_read = dir_key_for_name(images.rbegin()->first);
2202     }
2203   }
2204
2205   ::encode(images, *out);
2206
2207   return 0;
2208 }
2209
2210 /**
2211  * Add an image to the rbd directory. Creates the directory object if
2212  * needed, and updates the index from id to name and name to id.
2213  *
2214  * Input:
2215  * @param name the name of the image
2216  * @param id the id of the image
2217  *
2218  * Output:
2219  * @returns -EEXIST if the image name is already in the directory
2220  * @returns -EBADF if the image id is already in the directory
2221  * @returns 0 on success, negative error code on failure
2222  */
2223 int dir_add_image(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2224 {
2225   int r = cls_cxx_create(hctx, false);
2226   if (r < 0) {
2227     CLS_ERR("could not create directory: %s", cpp_strerror(r).c_str());
2228     return r;
2229   }
2230
2231   string name, id;
2232   try {
2233     bufferlist::iterator iter = in->begin();
2234     ::decode(name, iter);
2235     ::decode(id, iter);
2236   } catch (const buffer::error &err) {
2237     return -EINVAL;
2238   }
2239
2240   return dir_add_image_helper(hctx, name, id, true);
2241 }
2242
2243 /**
2244  * Remove an image from the rbd directory.
2245  *
2246  * Input:
2247  * @param name the name of the image
2248  * @param id the id of the image
2249  *
2250  * Output:
2251  * @returns -ESTALE if the name and id do not map to each other
2252  * @returns 0 on success, negative error code on failure
2253  */
2254 int dir_remove_image(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2255 {
2256   string name, id;
2257   try {
2258     bufferlist::iterator iter = in->begin();
2259     ::decode(name, iter);
2260     ::decode(id, iter);
2261   } catch (const buffer::error &err) {
2262     return -EINVAL;
2263   }
2264
2265   return dir_remove_image_helper(hctx, name, id);
2266 }
2267
2268 int object_map_read(cls_method_context_t hctx, BitVector<2> &object_map)
2269 {
2270   uint64_t size;
2271   int r = cls_cxx_stat(hctx, &size, NULL);
2272   if (r < 0) {
2273     return r;
2274   }
2275   if (size == 0) {
2276     return -ENOENT;
2277   }
2278
2279   bufferlist bl;
2280   r = cls_cxx_read(hctx, 0, size, &bl);
2281   if (r < 0) {
2282    return r;
2283   }
2284
2285   try {
2286     bufferlist::iterator iter = bl.begin();
2287     ::decode(object_map, iter);
2288   } catch (const buffer::error &err) {
2289     CLS_ERR("failed to decode object map: %s", err.what());
2290     return -EINVAL;
2291   }
2292   return 0;
2293 }
2294
2295 /**
2296  * Load an rbd image's object map
2297  *
2298  * Input:
2299  * none
2300  *
2301  * Output:
2302  * @param object map bit vector
2303  * @returns 0 on success, negative error code on failure
2304  */
2305 int object_map_load(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2306 {
2307   BitVector<2> object_map;
2308   int r = object_map_read(hctx, object_map);
2309   if (r < 0) {
2310     return r;
2311   }
2312
2313   object_map.set_crc_enabled(false);
2314   ::encode(object_map, *out);
2315   return 0;
2316 }
2317
2318 /**
2319  * Save an rbd image's object map
2320  *
2321  * Input:
2322  * @param object map bit vector
2323  *
2324  * Output:
2325  * @returns 0 on success, negative error code on failure
2326  */
2327 int object_map_save(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2328 {
2329   BitVector<2> object_map;
2330   try {
2331     bufferlist::iterator iter = in->begin();
2332     ::decode(object_map, iter);
2333   } catch (const buffer::error &err) {
2334     return -EINVAL;
2335   }
2336
2337   object_map.set_crc_enabled(true);
2338
2339   bufferlist bl;
2340   ::encode(object_map, bl);
2341   CLS_LOG(20, "object_map_save: object size=%" PRIu64 ", byte size=%u",
2342           object_map.size(), bl.length());
2343   return cls_cxx_write_full(hctx, &bl);
2344 }
2345
2346 /**
2347  * Resize an rbd image's object map
2348  *
2349  * Input:
2350  * @param object_count the max number of objects in the image
2351  * @param default_state the default state of newly created objects
2352  *
2353  * Output:
2354  * @returns 0 on success, negative error code on failure
2355  */
2356 int object_map_resize(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2357 {
2358   uint64_t object_count;
2359   uint8_t default_state;
2360   try {
2361     bufferlist::iterator iter = in->begin();
2362     ::decode(object_count, iter);
2363     ::decode(default_state, iter);
2364   } catch (const buffer::error &err) {
2365     return -EINVAL;
2366   }
2367
2368   // protect against excessive memory requirements
2369   if (object_count > cls::rbd::MAX_OBJECT_MAP_OBJECT_COUNT) {
2370     CLS_ERR("object map too large: %" PRIu64, object_count);
2371     return -EINVAL;
2372   }
2373
2374   BitVector<2> object_map;
2375   int r = object_map_read(hctx, object_map);
2376   if ((r < 0) && (r != -ENOENT)) {
2377     return r;
2378   }
2379
2380   size_t orig_object_map_size = object_map.size();
2381   if (object_count < orig_object_map_size) {
2382     for (uint64_t i = object_count + 1; i < orig_object_map_size; ++i) {
2383       if (object_map[i] != default_state) {
2384         CLS_ERR("object map indicates object still exists: %" PRIu64, i);
2385         return -ESTALE;
2386       }
2387     }
2388     object_map.resize(object_count);
2389   } else if (object_count > orig_object_map_size) {
2390     object_map.resize(object_count);
2391     for (uint64_t i = orig_object_map_size; i < object_count; ++i) {
2392       object_map[i] = default_state;
2393     }
2394   }
2395
2396   bufferlist map;
2397   ::encode(object_map, map);
2398   CLS_LOG(20, "object_map_resize: object size=%" PRIu64 ", byte size=%u",
2399           object_count, map.length());
2400   return cls_cxx_write_full(hctx, &map);
2401 }
2402
2403 /**
2404  * Update an rbd image's object map
2405  *
2406  * Input:
2407  * @param start_object_no the start object iterator
2408  * @param end_object_no the end object iterator
2409  * @param new_object_state the new object state
2410  * @param current_object_state optional current object state filter
2411  *
2412  * Output:
2413  * @returns 0 on success, negative error code on failure
2414  */
2415 int object_map_update(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2416 {
2417   uint64_t start_object_no;
2418   uint64_t end_object_no;
2419   uint8_t new_object_state;
2420   boost::optional<uint8_t> current_object_state;
2421   try {
2422     bufferlist::iterator iter = in->begin();
2423     ::decode(start_object_no, iter);
2424     ::decode(end_object_no, iter);
2425     ::decode(new_object_state, iter);
2426     ::decode(current_object_state, iter);
2427   } catch (const buffer::error &err) {
2428     CLS_ERR("failed to decode message");
2429     return -EINVAL;
2430   }
2431
2432   uint64_t size;
2433   int r = cls_cxx_stat(hctx, &size, NULL);
2434   if (r < 0) {
2435     return r;
2436   }
2437
2438   BitVector<2> object_map;
2439   bufferlist header_bl;
2440   r = cls_cxx_read2(hctx, 0, object_map.get_header_length(), &header_bl,
2441                     CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
2442   if (r < 0) {
2443     CLS_ERR("object map header read failed");
2444     return r;
2445   }
2446
2447   try {
2448     bufferlist::iterator it = header_bl.begin();
2449     object_map.decode_header(it);
2450   } catch (const buffer::error &err) {
2451     CLS_ERR("failed to decode object map header: %s", err.what());
2452     return -EINVAL;
2453   }
2454
2455   bufferlist footer_bl;
2456   r = cls_cxx_read2(hctx, object_map.get_footer_offset(),
2457                     size - object_map.get_footer_offset(), &footer_bl,
2458                     CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
2459   if (r < 0) {
2460     CLS_ERR("object map footer read failed");
2461     return r;
2462   }
2463
2464   try {
2465     bufferlist::iterator it = footer_bl.begin();
2466     object_map.decode_footer(it);
2467   } catch (const buffer::error &err) {
2468     CLS_ERR("failed to decode object map footer: %s", err.what());
2469   }
2470
2471   if (start_object_no >= end_object_no || end_object_no > object_map.size()) {
2472     return -ERANGE;
2473   }
2474
2475   uint64_t byte_offset;
2476   uint64_t byte_length;
2477   object_map.get_data_extents(start_object_no,
2478                               end_object_no - start_object_no,
2479                               &byte_offset, &byte_length);
2480
2481   bufferlist data_bl;
2482   r = cls_cxx_read2(hctx, object_map.get_header_length() + byte_offset,
2483                     byte_length, &data_bl, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
2484   if (r < 0) {
2485     CLS_ERR("object map data read failed");
2486     return r;
2487   }
2488
2489   try {
2490     bufferlist::iterator it = data_bl.begin();
2491     object_map.decode_data(it, byte_offset);
2492   } catch (const buffer::error &err) {
2493     CLS_ERR("failed to decode data chunk [%" PRIu64 "]: %s",
2494             byte_offset, err.what());
2495     return -EINVAL;
2496   }
2497
2498   bool updated = false;
2499   auto it = object_map.begin() + start_object_no;
2500   auto end_it = object_map.begin() + end_object_no;
2501   for (; it != end_it; ++it) {
2502     uint8_t state = *it;
2503     if ((!current_object_state || state == *current_object_state ||
2504         (*current_object_state == OBJECT_EXISTS &&
2505          state == OBJECT_EXISTS_CLEAN)) && state != new_object_state) {
2506       *it = new_object_state;
2507       updated = true;
2508     }
2509   }
2510
2511   if (updated) {
2512     CLS_LOG(20, "object_map_update: %" PRIu64 "~%" PRIu64 " -> %" PRIu64,
2513             byte_offset, byte_length,
2514             object_map.get_header_length() + byte_offset);
2515
2516     bufferlist data_bl;
2517     object_map.encode_data(data_bl, byte_offset, byte_length);
2518     r = cls_cxx_write2(hctx, object_map.get_header_length() + byte_offset,
2519                        data_bl.length(), &data_bl,
2520                        CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
2521     if (r < 0) {
2522       CLS_ERR("failed to write object map header: %s", cpp_strerror(r).c_str());
2523       return r;
2524     }
2525
2526     footer_bl.clear();
2527     object_map.encode_footer(footer_bl);
2528     r = cls_cxx_write2(hctx, object_map.get_footer_offset(), footer_bl.length(),
2529                        &footer_bl, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
2530     if (r < 0) {
2531       CLS_ERR("failed to write object map footer: %s", cpp_strerror(r).c_str());
2532       return r;
2533     }
2534   } else {
2535     CLS_LOG(20, "object_map_update: no update necessary");
2536   }
2537
2538   return 0;
2539 }
2540
2541 /**
2542  * Mark all _EXISTS objects as _EXISTS_CLEAN so future writes to the
2543  * image HEAD can be tracked.
2544  *
2545  * Input:
2546  * none
2547  *
2548  * Output:
2549  * @returns 0 on success, negative error code on failure
2550  */
2551 int object_map_snap_add(cls_method_context_t hctx, bufferlist *in,
2552                         bufferlist *out)
2553 {
2554   BitVector<2> object_map;
2555   int r = object_map_read(hctx, object_map);
2556   if (r < 0) {
2557     return r;
2558   }
2559
2560   bool updated = false;
2561   for (uint64_t i = 0; i < object_map.size(); ++i) {
2562     if (object_map[i] == OBJECT_EXISTS) {
2563       object_map[i] = OBJECT_EXISTS_CLEAN;
2564       updated = true;
2565     }
2566   }
2567
2568   if (updated) {
2569     bufferlist bl;
2570     ::encode(object_map, bl);
2571     r = cls_cxx_write_full(hctx, &bl);
2572   }
2573   return r;
2574 }
2575
2576 /**
2577  * Mark all _EXISTS_CLEAN objects as _EXISTS in the current object map
2578  * if the provided snapshot object map object is marked as _EXISTS.
2579  *
2580  * Input:
2581  * @param snapshot object map bit vector
2582  *
2583  * Output:
2584  * @returns 0 on success, negative error code on failure
2585  */
2586 int object_map_snap_remove(cls_method_context_t hctx, bufferlist *in,
2587                            bufferlist *out)
2588 {
2589   BitVector<2> src_object_map;
2590   try {
2591     bufferlist::iterator iter = in->begin();
2592     ::decode(src_object_map, iter);
2593   } catch (const buffer::error &err) {
2594     return -EINVAL;
2595   }
2596
2597   BitVector<2> dst_object_map;
2598   int r = object_map_read(hctx, dst_object_map);
2599   if (r < 0) {
2600     return r;
2601   }
2602
2603   bool updated = false;
2604   for (uint64_t i = 0; i < dst_object_map.size(); ++i) {
2605     if (dst_object_map[i] == OBJECT_EXISTS_CLEAN &&
2606         (i >= src_object_map.size() || src_object_map[i] == OBJECT_EXISTS)) {
2607       dst_object_map[i] = OBJECT_EXISTS;
2608       updated = true;
2609     }
2610   }
2611
2612   if (updated) {
2613     bufferlist bl;
2614     ::encode(dst_object_map, bl);
2615     r = cls_cxx_write_full(hctx, &bl);
2616   }
2617   return r;
2618 }
2619
2620 static const string metadata_key_for_name(const string &name)
2621 {
2622   return RBD_METADATA_KEY_PREFIX + name;
2623 }
2624
2625 static const string metadata_name_from_key(const string &key)
2626 {
2627   return key.substr(strlen(RBD_METADATA_KEY_PREFIX));
2628 }
2629
2630 /**
2631  * Input:
2632  * @param start_after which name to begin listing after
2633  *        (use the empty string to start at the beginning)
2634  * @param max_return the maximum number of names to list
2635
2636  * Output:
2637  * @param value
2638  * @returns 0 on success, negative error code on failure
2639  */
2640 int metadata_list(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2641 {
2642   string start_after;
2643   uint64_t max_return;
2644
2645   try {
2646     bufferlist::iterator iter = in->begin();
2647     ::decode(start_after, iter);
2648     ::decode(max_return, iter);
2649   } catch (const buffer::error &err) {
2650     return -EINVAL;
2651   }
2652
2653   // TODO remove implicit support for zero during the N-release
2654   if (max_return == 0) {
2655     max_return = RBD_MAX_KEYS_READ;
2656   }
2657
2658   map<string, bufferlist> data;
2659   string last_read = metadata_key_for_name(start_after);
2660   bool more = true;
2661
2662   while (more && data.size() < max_return) {
2663     map<string, bufferlist> raw_data;
2664     int max_read = MIN(RBD_MAX_KEYS_READ, max_return - data.size());
2665     int r = cls_cxx_map_get_vals(hctx, last_read, RBD_METADATA_KEY_PREFIX,
2666                                  max_read, &raw_data, &more);
2667     if (r < 0) {
2668       CLS_ERR("failed to read the vals off of disk: %s", cpp_strerror(r).c_str());
2669       return r;
2670     }
2671
2672     for (auto& kv : raw_data) {
2673       data[metadata_name_from_key(kv.first)].swap(kv.second);
2674     }
2675
2676     if (!raw_data.empty()) {
2677       last_read = raw_data.rbegin()->first;
2678     }
2679   }
2680
2681   ::encode(data, *out);
2682   return 0;
2683 }
2684
2685 /**
2686  * Input:
2687  * @param data <map(key, value)>
2688  *
2689  * Output:
2690  * @returns 0 on success, negative error code on failure
2691  */
2692 int metadata_set(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2693 {
2694   map<string, bufferlist> data, raw_data;
2695
2696   bufferlist::iterator iter = in->begin();
2697   try {
2698     ::decode(data, iter);
2699   } catch (const buffer::error &err) {
2700     return -EINVAL;
2701   }
2702
2703   for (map<string, bufferlist>::iterator it = data.begin();
2704        it != data.end(); ++it) {
2705     CLS_LOG(20, "metdata_set key=%s value=%.*s", it->first.c_str(),
2706             it->second.length(), it->second.c_str());
2707     raw_data[metadata_key_for_name(it->first)].swap(it->second);
2708   }
2709   int r = cls_cxx_map_set_vals(hctx, &raw_data);
2710   if (r < 0) {
2711     CLS_ERR("error writing metadata: %s", cpp_strerror(r).c_str());
2712     return r;
2713   }
2714
2715   return 0;
2716 }
2717
2718 /**
2719  * Input:
2720  * @param key
2721  *
2722  * Output:
2723  * @returns 0 on success, negative error code on failure
2724  */
2725 int metadata_remove(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2726 {
2727   string key;
2728
2729   bufferlist::iterator iter = in->begin();
2730   try {
2731     ::decode(key, iter);
2732   } catch (const buffer::error &err) {
2733     return -EINVAL;
2734   }
2735
2736   CLS_LOG(20, "metdata_set key=%s", key.c_str());
2737
2738   int r = cls_cxx_map_remove_key(hctx, metadata_key_for_name(key));
2739   if (r < 0) {
2740     CLS_ERR("error remove metadata: %s", cpp_strerror(r).c_str());
2741     return r;
2742   }
2743
2744   return 0;
2745 }
2746
2747 /**
2748  * Input:
2749  * @param key
2750  *
2751  * Output:
2752  * @param metadata value associated with the key
2753  * @returns 0 on success, negative error code on failure
2754  */
2755 int metadata_get(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2756 {
2757   string key;
2758   bufferlist value;
2759
2760   bufferlist::iterator iter = in->begin();
2761   try {
2762     ::decode(key, iter);
2763   } catch (const buffer::error &err) {
2764     return -EINVAL;
2765   }
2766
2767   CLS_LOG(20, "metdata_get key=%s", key.c_str());
2768
2769   int r = cls_cxx_map_get_val(hctx, metadata_key_for_name(key), &value);
2770   if (r < 0) {
2771     CLS_ERR("error get metadata: %s", cpp_strerror(r).c_str());
2772     return r;
2773   }
2774
2775   ::encode(value, *out);
2776   return 0;
2777 }
2778
2779 int snapshot_get_limit(cls_method_context_t hctx, bufferlist *in,
2780                        bufferlist *out)
2781 {
2782   uint64_t snap_limit;
2783   int r = read_key(hctx, "snap_limit", &snap_limit);
2784   if (r == -ENOENT) {
2785     snap_limit = UINT64_MAX;
2786   } else if (r < 0) {
2787     CLS_ERR("error retrieving snapshot limit: %s", cpp_strerror(r).c_str());
2788     return r;
2789   }
2790
2791   CLS_LOG(20, "read snapshot limit %" PRIu64, snap_limit);
2792   ::encode(snap_limit, *out);
2793
2794   return 0;
2795 }
2796
2797 int snapshot_set_limit(cls_method_context_t hctx, bufferlist *in,
2798                        bufferlist *out)
2799 {
2800   int rc;
2801   uint64_t new_limit;
2802   bufferlist bl;
2803
2804   try {
2805     bufferlist::iterator iter = in->begin();
2806     ::decode(new_limit, iter);
2807   } catch (const buffer::error &err) {
2808     return -EINVAL;
2809   }
2810
2811   if (new_limit == UINT64_MAX) {
2812     CLS_LOG(20, "remove snapshot limit\n");
2813     rc = cls_cxx_map_remove_key(hctx, "snap_limit");
2814   } else {
2815     CLS_LOG(20, "set snapshot limit to %" PRIu64 "\n", new_limit);
2816     ::encode(new_limit, bl);
2817     rc = cls_cxx_map_set_val(hctx, "snap_limit", &bl);
2818   }
2819
2820   return rc;
2821 }
2822
2823
2824 /****************************** Old format *******************************/
2825
2826 int old_snapshots_list(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2827 {
2828   bufferlist bl;
2829   struct rbd_obj_header_ondisk *header;
2830   int rc = snap_read_header(hctx, bl);
2831   if (rc < 0)
2832     return rc;
2833
2834   header = (struct rbd_obj_header_ondisk *)bl.c_str();
2835   bufferptr p(header->snap_names_len);
2836   char *buf = (char *)header;
2837   char *name = buf + sizeof(*header) + header->snap_count * sizeof(struct rbd_obj_snap_ondisk);
2838   char *end = name + header->snap_names_len;
2839   memcpy(p.c_str(),
2840          buf + sizeof(*header) + header->snap_count * sizeof(struct rbd_obj_snap_ondisk),
2841          header->snap_names_len);
2842
2843   ::encode(header->snap_seq, *out);
2844   ::encode(header->snap_count, *out);
2845
2846   for (unsigned i = 0; i < header->snap_count; i++) {
2847     string s = name;
2848     ::encode(header->snaps[i].id, *out);
2849     ::encode(header->snaps[i].image_size, *out);
2850     ::encode(s, *out);
2851
2852     name += strlen(name) + 1;
2853     if (name > end)
2854       return -EIO;
2855   }
2856
2857   return 0;
2858 }
2859
2860 int old_snapshot_add(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2861 {
2862   bufferlist bl;
2863   struct rbd_obj_header_ondisk *header;
2864   bufferlist newbl;
2865   bufferptr header_bp(sizeof(*header));
2866   struct rbd_obj_snap_ondisk *new_snaps;
2867
2868   int rc = snap_read_header(hctx, bl);
2869   if (rc < 0)
2870     return rc;
2871
2872   header = (struct rbd_obj_header_ondisk *)bl.c_str();
2873
2874   int snaps_id_ofs = sizeof(*header);
2875   int names_ofs = snaps_id_ofs + sizeof(*new_snaps) * header->snap_count;
2876   const char *snap_name;
2877   const char *snap_names = ((char *)header) + names_ofs;
2878   const char *end = snap_names + header->snap_names_len;
2879   bufferlist::iterator iter = in->begin();
2880   string s;
2881   uint64_t snap_id;
2882
2883   try {
2884     ::decode(s, iter);
2885     ::decode(snap_id, iter);
2886   } catch (const buffer::error &err) {
2887     return -EINVAL;
2888   }
2889   snap_name = s.c_str();
2890
2891   if (header->snap_seq > snap_id)
2892     return -ESTALE;
2893
2894   uint64_t snap_limit;
2895   rc = read_key(hctx, "snap_limit", &snap_limit);
2896   if (rc == -ENOENT) {
2897     snap_limit = UINT64_MAX;
2898   } else if (rc < 0) {
2899     return rc;
2900   }
2901
2902   if (header->snap_count >= snap_limit)
2903     return -EDQUOT;
2904
2905   const char *cur_snap_name;
2906   for (cur_snap_name = snap_names; cur_snap_name < end; cur_snap_name += strlen(cur_snap_name) + 1) {
2907     if (strncmp(cur_snap_name, snap_name, end - cur_snap_name) == 0)
2908       return -EEXIST;
2909   }
2910   if (cur_snap_name > end)
2911     return -EIO;
2912
2913   int snap_name_len = strlen(snap_name);
2914
2915   bufferptr new_names_bp(header->snap_names_len + snap_name_len + 1);
2916   bufferptr new_snaps_bp(sizeof(*new_snaps) * (header->snap_count + 1));
2917
2918   /* copy snap names and append to new snap name */
2919   char *new_snap_names = new_names_bp.c_str();
2920   strcpy(new_snap_names, snap_name);
2921   memcpy(new_snap_names + snap_name_len + 1, snap_names, header->snap_names_len);
2922
2923   /* append new snap id */
2924   new_snaps = (struct rbd_obj_snap_ondisk *)new_snaps_bp.c_str();
2925   memcpy(new_snaps + 1, header->snaps, sizeof(*new_snaps) * header->snap_count);
2926
2927   header->snap_count = header->snap_count + 1;
2928   header->snap_names_len = header->snap_names_len + snap_name_len + 1;
2929   header->snap_seq = snap_id;
2930
2931   new_snaps[0].id = snap_id;
2932   new_snaps[0].image_size = header->image_size;
2933
2934   memcpy(header_bp.c_str(), header, sizeof(*header));
2935
2936   newbl.push_back(header_bp);
2937   newbl.push_back(new_snaps_bp);
2938   newbl.push_back(new_names_bp);
2939
2940   rc = cls_cxx_write_full(hctx, &newbl);
2941   if (rc < 0)
2942     return rc;
2943
2944   return 0;
2945 }
2946
2947 int old_snapshot_remove(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2948 {
2949   bufferlist bl;
2950   struct rbd_obj_header_ondisk *header;
2951   bufferlist newbl;
2952   bufferptr header_bp(sizeof(*header));
2953
2954   int rc = snap_read_header(hctx, bl);
2955   if (rc < 0)
2956     return rc;
2957
2958   header = (struct rbd_obj_header_ondisk *)bl.c_str();
2959
2960   int snaps_id_ofs = sizeof(*header);
2961   int names_ofs = snaps_id_ofs + sizeof(struct rbd_obj_snap_ondisk) * header->snap_count;
2962   const char *snap_name;
2963   const char *snap_names = ((char *)header) + names_ofs;
2964   const char *orig_names = snap_names;
2965   const char *end = snap_names + header->snap_names_len;
2966   bufferlist::iterator iter = in->begin();
2967   string s;
2968   unsigned i;
2969   bool found = false;
2970   struct rbd_obj_snap_ondisk snap;
2971
2972   try {
2973     ::decode(s, iter);
2974   } catch (const buffer::error &err) {
2975     return -EINVAL;
2976   }
2977   snap_name = s.c_str();
2978
2979   for (i = 0; snap_names < end; i++) {
2980     if (strcmp(snap_names, snap_name) == 0) {
2981       snap = header->snaps[i];
2982       found = true;
2983       break;
2984     }
2985     snap_names += strlen(snap_names) + 1;
2986   }
2987   if (!found) {
2988     CLS_ERR("couldn't find snap %s\n", snap_name);
2989     return -ENOENT;
2990   }
2991
2992   header->snap_names_len  = header->snap_names_len - (s.length() + 1);
2993   header->snap_count = header->snap_count - 1;
2994
2995   bufferptr new_names_bp(header->snap_names_len);
2996   bufferptr new_snaps_bp(sizeof(header->snaps[0]) * header->snap_count);
2997
2998   memcpy(header_bp.c_str(), header, sizeof(*header));
2999   newbl.push_back(header_bp);
3000
3001   if (header->snap_count) {
3002     int snaps_len = 0;
3003     int names_len = 0;
3004     CLS_LOG(20, "i=%u\n", i);
3005     if (i > 0) {
3006       snaps_len = sizeof(header->snaps[0]) * i;
3007       names_len =  snap_names - orig_names;
3008       memcpy(new_snaps_bp.c_str(), header->snaps, snaps_len);
3009       memcpy(new_names_bp.c_str(), orig_names, names_len);
3010     }
3011     snap_names += s.length() + 1;
3012
3013     if (i < header->snap_count) {
3014       memcpy(new_snaps_bp.c_str() + snaps_len,
3015              header->snaps + i + 1,
3016              sizeof(header->snaps[0]) * (header->snap_count - i));
3017       memcpy(new_names_bp.c_str() + names_len, snap_names , end - snap_names);
3018     }
3019     newbl.push_back(new_snaps_bp);
3020     newbl.push_back(new_names_bp);
3021   }
3022
3023   rc = cls_cxx_write_full(hctx, &newbl);
3024   if (rc < 0)
3025     return rc;
3026
3027   return 0;
3028 }
3029
3030 /**
3031  * rename snapshot of old format.
3032  *
3033  * Input:
3034  * @param src_snap_id old snap id of the snapshot (snapid_t)
3035  * @param dst_snap_name new name of the snapshot (string)
3036  *
3037  * Output:
3038  * @returns 0 on success, negative error code on failure.
3039 */
3040 int old_snapshot_rename(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
3041 {
3042   bufferlist bl;
3043   struct rbd_obj_header_ondisk *header;
3044   bufferlist newbl;
3045   bufferptr header_bp(sizeof(*header));
3046   snapid_t src_snap_id;
3047   const char *dst_snap_name;
3048   string dst;
3049
3050   int rc = snap_read_header(hctx, bl);
3051   if (rc < 0)
3052     return rc;
3053
3054   header = (struct rbd_obj_header_ondisk *)bl.c_str();
3055
3056   int snaps_id_ofs = sizeof(*header);
3057   int names_ofs = snaps_id_ofs + sizeof(rbd_obj_snap_ondisk) * header->snap_count;
3058   const char *snap_names = ((char *)header) + names_ofs;
3059   const char *orig_names = snap_names;
3060   const char *end = snap_names + header->snap_names_len;
3061   bufferlist::iterator iter = in->begin();
3062   unsigned i;
3063   bool found = false;
3064
3065   try {
3066     ::decode(src_snap_id, iter);
3067     ::decode(dst, iter);
3068   } catch (const buffer::error &err) {
3069     return -EINVAL;
3070   }
3071   dst_snap_name = dst.c_str();
3072
3073   const char *cur_snap_name;
3074   for (cur_snap_name = snap_names; cur_snap_name < end; 
3075     cur_snap_name += strlen(cur_snap_name) + 1) {
3076     if (strcmp(cur_snap_name, dst_snap_name) == 0)
3077       return -EEXIST;
3078   }
3079   if (cur_snap_name > end)
3080     return -EIO;
3081   for (i = 0; i < header->snap_count; i++) {
3082     if (src_snap_id == header->snaps[i].id) {
3083       found = true;
3084       break;
3085     }
3086     snap_names += strlen(snap_names) + 1;
3087   }
3088   if (!found) {
3089     CLS_ERR("couldn't find snap %llu\n", (unsigned long long)src_snap_id.val);
3090     return -ENOENT;
3091   }
3092   
3093   CLS_LOG(20, "rename snap with snap id %llu to dest name %s", (unsigned long long)src_snap_id.val, dst_snap_name);
3094   header->snap_names_len  = header->snap_names_len - strlen(snap_names) + dst.length();
3095
3096   bufferptr new_names_bp(header->snap_names_len);
3097   bufferptr new_snaps_bp(sizeof(header->snaps[0]) * header->snap_count);
3098
3099   if (header->snap_count) {
3100     int names_len = 0;
3101     CLS_LOG(20, "i=%u\n", i);
3102     if (i > 0) {
3103       names_len =  snap_names - orig_names;
3104       memcpy(new_names_bp.c_str(), orig_names, names_len);
3105     }
3106     strcpy(new_names_bp.c_str() + names_len, dst_snap_name);
3107     names_len += strlen(dst_snap_name) + 1;
3108     snap_names += strlen(snap_names) + 1;
3109     if (i < header->snap_count) {
3110       memcpy(new_names_bp.c_str() + names_len, snap_names , end - snap_names);
3111     }
3112     memcpy(new_snaps_bp.c_str(), header->snaps, sizeof(header->snaps[0]) * header->snap_count);
3113   }
3114
3115   memcpy(header_bp.c_str(), header, sizeof(*header));
3116   newbl.push_back(header_bp);
3117   newbl.push_back(new_snaps_bp);
3118   newbl.push_back(new_names_bp);
3119
3120   rc = cls_cxx_write_full(hctx, &newbl);
3121   if (rc < 0)
3122     return rc;
3123   return 0;
3124 }
3125
3126
3127 namespace mirror {
3128
3129 static const std::string UUID("mirror_uuid");
3130 static const std::string MODE("mirror_mode");
3131 static const std::string PEER_KEY_PREFIX("mirror_peer_");
3132 static const std::string IMAGE_KEY_PREFIX("image_");
3133 static const std::string GLOBAL_KEY_PREFIX("global_");
3134 static const std::string STATUS_GLOBAL_KEY_PREFIX("status_global_");
3135 static const std::string INSTANCE_KEY_PREFIX("instance_");
3136
3137 std::string peer_key(const std::string &uuid) {
3138   return PEER_KEY_PREFIX + uuid;
3139 }
3140
3141 std::string image_key(const string &image_id) {
3142   return IMAGE_KEY_PREFIX + image_id;
3143 }
3144
3145 std::string global_key(const string &global_id) {
3146   return GLOBAL_KEY_PREFIX + global_id;
3147 }
3148
3149 std::string status_global_key(const string &global_id) {
3150   return STATUS_GLOBAL_KEY_PREFIX + global_id;
3151 }
3152
3153 std::string instance_key(const string &instance_id) {
3154   return INSTANCE_KEY_PREFIX + instance_id;
3155 }
3156
3157 int uuid_get(cls_method_context_t hctx, std::string *mirror_uuid) {
3158   bufferlist mirror_uuid_bl;
3159   int r = cls_cxx_map_get_val(hctx, mirror::UUID, &mirror_uuid_bl);
3160   if (r < 0) {
3161     if (r != -ENOENT) {
3162       CLS_ERR("error reading mirror uuid: %s", cpp_strerror(r).c_str());
3163     }
3164     return r;
3165   }
3166
3167   *mirror_uuid = std::string(mirror_uuid_bl.c_str(), mirror_uuid_bl.length());
3168   return 0;
3169 }
3170
3171 int list_watchers(cls_method_context_t hctx,
3172                   std::set<entity_inst_t> *entities) {
3173   obj_list_watch_response_t watchers;
3174   int r = cls_cxx_list_watchers(hctx, &watchers);
3175   if (r < 0 && r != -ENOENT) {
3176     CLS_ERR("error listing watchers: '%s'", cpp_strerror(r).c_str());
3177     return r;
3178   }
3179
3180   entities->clear();
3181   for (auto &w : watchers.entries) {
3182     entities->emplace(w.name, w.addr);
3183   }
3184   return 0;
3185 }
3186
3187 int read_peers(cls_method_context_t hctx,
3188                std::vector<cls::rbd::MirrorPeer> *peers) {
3189   std::string last_read = PEER_KEY_PREFIX;
3190   int max_read = RBD_MAX_KEYS_READ;
3191   bool more = true;
3192   while (more) {
3193     std::map<std::string, bufferlist> vals;
3194     int r = cls_cxx_map_get_vals(hctx, last_read, PEER_KEY_PREFIX.c_str(),
3195                                  max_read, &vals, &more);
3196     if (r < 0) {
3197       CLS_ERR("error reading peers: %s", cpp_strerror(r).c_str());
3198       return r;
3199     }
3200
3201     for (auto &it : vals) {
3202       try {
3203         bufferlist::iterator bl_it = it.second.begin();
3204         cls::rbd::MirrorPeer peer;
3205         ::decode(peer, bl_it);
3206         peers->push_back(peer);
3207       } catch (const buffer::error &err) {
3208         CLS_ERR("could not decode peer '%s'", it.first.c_str());
3209         return -EIO;
3210       }
3211     }
3212
3213     if (!vals.empty()) {
3214       last_read = vals.rbegin()->first;
3215     }
3216   }
3217   return 0;
3218 }
3219
3220 int read_peer(cls_method_context_t hctx, const std::string &id,
3221               cls::rbd::MirrorPeer *peer) {
3222   bufferlist bl;
3223   int r = cls_cxx_map_get_val(hctx, peer_key(id), &bl);
3224   if (r < 0) {
3225     CLS_ERR("error reading peer '%s': %s", id.c_str(),
3226             cpp_strerror(r).c_str());
3227     return r;
3228   }
3229
3230   try {
3231     bufferlist::iterator bl_it = bl.begin();
3232     ::decode(*peer, bl_it);
3233   } catch (const buffer::error &err) {
3234     CLS_ERR("could not decode peer '%s'", id.c_str());
3235     return -EIO;
3236   }
3237   return 0;
3238 }
3239
3240 int write_peer(cls_method_context_t hctx, const std::string &id,
3241                const cls::rbd::MirrorPeer &peer) {
3242   bufferlist bl;
3243   ::encode(peer, bl);
3244
3245   int r = cls_cxx_map_set_val(hctx, peer_key(id), &bl);
3246   if (r < 0) {
3247     CLS_ERR("error writing peer '%s': %s", id.c_str(),
3248             cpp_strerror(r).c_str());
3249     return r;
3250   }
3251   return 0;
3252 }
3253
3254 int image_get(cls_method_context_t hctx, const string &image_id,
3255               cls::rbd::MirrorImage *mirror_image) {
3256   bufferlist bl;
3257   int r = cls_cxx_map_get_val(hctx, image_key(image_id), &bl);
3258   if (r < 0) {
3259     if (r != -ENOENT) {
3260       CLS_ERR("error reading mirrored image '%s': '%s'", image_id.c_str(),
3261               cpp_strerror(r).c_str());
3262     }
3263     return r;
3264   }
3265
3266   try {
3267     bufferlist::iterator it = bl.begin();
3268     ::decode(*mirror_image, it);
3269   } catch (const buffer::error &err) {
3270     CLS_ERR("could not decode mirrored image '%s'", image_id.c_str());
3271     return -EIO;
3272   }
3273
3274   return 0;
3275 }
3276
3277 int image_set(cls_method_context_t hctx, const string &image_id,
3278               const cls::rbd::MirrorImage &mirror_image) {
3279   bufferlist bl;
3280   ::encode(mirror_image, bl);
3281
3282   cls::rbd::MirrorImage existing_mirror_image;
3283   int r = image_get(hctx, image_id, &existing_mirror_image);
3284   if (r == -ENOENT) {
3285     // make sure global id doesn't already exist
3286     std::string global_id_key = global_key(mirror_image.global_image_id);
3287     std::string image_id;
3288     r = read_key(hctx, global_id_key, &image_id);
3289     if (r >= 0) {
3290       return -EEXIST;
3291     } else if (r != -ENOENT) {
3292       CLS_ERR("error reading global image id: '%s': '%s'", image_id.c_str(),
3293               cpp_strerror(r).c_str());
3294       return r;
3295     }
3296
3297     // make sure this was not a race for disabling
3298     if (mirror_image.state == cls::rbd::MIRROR_IMAGE_STATE_DISABLING) {
3299       CLS_ERR("image '%s' is already disabled", image_id.c_str());
3300       return r;
3301     }
3302   } else if (r < 0) {
3303     CLS_ERR("error reading mirrored image '%s': '%s'", image_id.c_str(),
3304             cpp_strerror(r).c_str());
3305     return r;
3306   } else if (existing_mirror_image.global_image_id !=
3307                 mirror_image.global_image_id) {
3308     // cannot change the global id
3309     return -EINVAL;
3310   }
3311
3312   r = cls_cxx_map_set_val(hctx, image_key(image_id), &bl);
3313   if (r < 0) {
3314     CLS_ERR("error adding mirrored image '%s': %s", image_id.c_str(),
3315             cpp_strerror(r).c_str());
3316     return r;
3317   }
3318
3319   bufferlist image_id_bl;
3320   ::encode(image_id, image_id_bl);
3321   r = cls_cxx_map_set_val(hctx, global_key(mirror_image.global_image_id),
3322                           &image_id_bl);
3323   if (r < 0) {
3324     CLS_ERR("error adding global id for image '%s': %s", image_id.c_str(),
3325             cpp_strerror(r).c_str());
3326     return r;
3327   }
3328   return 0;
3329 }
3330
3331 int image_remove(cls_method_context_t hctx, const string &image_id) {
3332   bufferlist bl;
3333   cls::rbd::MirrorImage mirror_image;
3334   int r = image_get(hctx, image_id, &mirror_image);
3335   if (r < 0) {
3336     if (r != -ENOENT) {
3337       CLS_ERR("error reading mirrored image '%s': '%s'", image_id.c_str(),
3338               cpp_strerror(r).c_str());
3339     }
3340     return r;
3341   }
3342
3343   if (mirror_image.state != cls::rbd::MIRROR_IMAGE_STATE_DISABLING) {
3344     return -EBUSY;
3345   }
3346
3347   r = cls_cxx_map_remove_key(hctx, image_key(image_id));
3348   if (r < 0) {
3349     CLS_ERR("error removing mirrored image '%s': %s", image_id.c_str(),
3350             cpp_strerror(r).c_str());
3351     return r;
3352   }
3353
3354   r = cls_cxx_map_remove_key(hctx, global_key(mirror_image.global_image_id));
3355   if (r < 0 && r != -ENOENT) {
3356     CLS_ERR("error removing global id for image '%s': %s", image_id.c_str(),
3357            cpp_strerror(r).c_str());
3358     return r;
3359   }
3360
3361   r = cls_cxx_map_remove_key(hctx,
3362                              status_global_key(mirror_image.global_image_id));
3363   if (r < 0 && r != -ENOENT) {
3364     CLS_ERR("error removing global status for image '%s': %s", image_id.c_str(),
3365            cpp_strerror(r).c_str());
3366     return r;
3367   }
3368
3369   return 0;
3370 }
3371
3372 struct MirrorImageStatusOnDisk : cls::rbd::MirrorImageStatus {
3373   entity_inst_t origin;
3374
3375   MirrorImageStatusOnDisk() {
3376   }
3377   MirrorImageStatusOnDisk(const cls::rbd::MirrorImageStatus &status) :
3378     cls::rbd::MirrorImageStatus(status) {
3379   }
3380
3381   void encode_meta(bufferlist &bl, uint64_t features) const {
3382     ENCODE_START(1, 1, bl);
3383     ::encode(origin, bl, features);
3384     ENCODE_FINISH(bl);
3385   }
3386
3387   void encode(bufferlist &bl, uint64_t features) const {
3388     encode_meta(bl, features);
3389     cls::rbd::MirrorImageStatus::encode(bl);
3390   }
3391
3392   void decode_meta(bufferlist::iterator &it) {
3393     DECODE_START(1, it);
3394     ::decode(origin, it);
3395     DECODE_FINISH(it);
3396   }
3397
3398   void decode(bufferlist::iterator &it) {
3399     decode_meta(it);
3400     cls::rbd::MirrorImageStatus::decode(it);
3401   }
3402 };
3403 WRITE_CLASS_ENCODER_FEATURES(MirrorImageStatusOnDisk)
3404
3405 int image_status_set(cls_method_context_t hctx, const string &global_image_id,
3406                      const cls::rbd::MirrorImageStatus &status) {
3407   MirrorImageStatusOnDisk ondisk_status(status);
3408   ondisk_status.up = false;
3409   ondisk_status.last_update = ceph_clock_now();
3410
3411   int r = cls_get_request_origin(hctx, &ondisk_status.origin);
3412   assert(r == 0);
3413
3414   bufferlist bl;
3415   encode(ondisk_status, bl, cls_get_features(hctx));
3416
3417   r = cls_cxx_map_set_val(hctx, status_global_key(global_image_id), &bl);
3418   if (r < 0) {
3419     CLS_ERR("error setting status for mirrored image, global id '%s': %s",
3420             global_image_id.c_str(), cpp_strerror(r).c_str());
3421     return r;
3422   }
3423   return 0;
3424 }
3425
3426 int image_status_remove(cls_method_context_t hctx,
3427                         const string &global_image_id) {
3428
3429   int r = cls_cxx_map_remove_key(hctx, status_global_key(global_image_id));
3430   if (r < 0) {
3431     CLS_ERR("error removing status for mirrored image, global id '%s': %s",
3432             global_image_id.c_str(), cpp_strerror(r).c_str());
3433     return r;
3434   }
3435   return 0;
3436 }
3437
3438 int image_status_get(cls_method_context_t hctx, const string &global_image_id,
3439                      const std::set<entity_inst_t> &watchers,
3440                      cls::rbd::MirrorImageStatus *status) {
3441
3442   bufferlist bl;
3443   int r = cls_cxx_map_get_val(hctx, status_global_key(global_image_id), &bl);
3444   if (r < 0) {
3445     if (r != -ENOENT) {
3446       CLS_ERR("error reading status for mirrored image, global id '%s': '%s'",
3447               global_image_id.c_str(), cpp_strerror(r).c_str());
3448     }
3449     return r;
3450   }
3451
3452   MirrorImageStatusOnDisk ondisk_status;
3453   try {
3454     bufferlist::iterator it = bl.begin();
3455     decode(ondisk_status, it);
3456   } catch (const buffer::error &err) {
3457     CLS_ERR("could not decode status for mirrored image, global id '%s'",
3458             global_image_id.c_str());
3459     return -EIO;
3460   }
3461
3462
3463   *status = static_cast<cls::rbd::MirrorImageStatus>(ondisk_status);
3464   status->up = (watchers.find(ondisk_status.origin) != watchers.end());
3465   return 0;
3466 }
3467
3468 int image_status_list(cls_method_context_t hctx,
3469         const std::string &start_after, uint64_t max_return,
3470         map<std::string, cls::rbd::MirrorImage> *mirror_images,
3471         map<std::string, cls::rbd::MirrorImageStatus> *mirror_statuses) {
3472   std::string last_read = image_key(start_after);
3473   int max_read = RBD_MAX_KEYS_READ;
3474   bool more = true;
3475
3476   std::set<entity_inst_t> watchers;
3477   int r = list_watchers(hctx, &watchers);
3478   if (r < 0) {
3479     return r;
3480   }
3481
3482   while (more && mirror_images->size() < max_return) {
3483     std::map<std::string, bufferlist> vals;
3484     CLS_LOG(20, "last_read = '%s'", last_read.c_str());
3485     r = cls_cxx_map_get_vals(hctx, last_read, IMAGE_KEY_PREFIX, max_read, &vals,
3486                              &more);
3487     if (r < 0) {
3488       CLS_ERR("error reading mirror image directory by name: %s",
3489               cpp_strerror(r).c_str());
3490       return r;
3491     }
3492
3493     for (auto it = vals.begin(); it != vals.end() &&
3494            mirror_images->size() < max_return; ++it) {
3495       const std::string &image_id = it->first.substr(IMAGE_KEY_PREFIX.size());
3496       cls::rbd::MirrorImage mirror_image;
3497       bufferlist::iterator iter = it->second.begin();
3498       try {
3499         ::decode(mirror_image, iter);
3500       } catch (const buffer::error &err) {
3501         CLS_ERR("could not decode mirror image payload of image '%s'",
3502                 image_id.c_str());
3503         return -EIO;
3504       }
3505
3506       (*mirror_images)[image_id] = mirror_image;
3507
3508       cls::rbd::MirrorImageStatus status;
3509       int r1 = image_status_get(hctx, mirror_image.global_image_id, watchers,
3510                                 &status);
3511       if (r1 < 0) {
3512         continue;
3513       }
3514
3515       (*mirror_statuses)[image_id] = status;
3516     }
3517     if (!vals.empty()) {
3518       last_read = image_key(mirror_images->rbegin()->first);
3519     }
3520   }
3521
3522   return 0;
3523 }
3524
3525 int image_status_get_summary(cls_method_context_t hctx,
3526         std::map<cls::rbd::MirrorImageStatusState, int> *states) {
3527   std::set<entity_inst_t> watchers;
3528   int r = list_watchers(hctx, &watchers);
3529   if (r < 0) {
3530     return r;
3531   }
3532
3533   states->clear();
3534
3535   string last_read = IMAGE_KEY_PREFIX;
3536   int max_read = RBD_MAX_KEYS_READ;
3537   bool more = true;
3538   while (more) {
3539     map<string, bufferlist> vals;
3540     r = cls_cxx_map_get_vals(hctx, last_read, IMAGE_KEY_PREFIX,
3541                              max_read, &vals, &more);
3542     if (r < 0) {
3543       CLS_ERR("error reading mirrored images: %s", cpp_strerror(r).c_str());
3544       return r;
3545     }
3546
3547     for (auto &list_it : vals) {
3548       const string &key = list_it.first;
3549
3550       if (0 != key.compare(0, IMAGE_KEY_PREFIX.size(), IMAGE_KEY_PREFIX)) {
3551         break;
3552       }
3553
3554       cls::rbd::MirrorImage mirror_image;
3555       bufferlist::iterator iter = list_it.second.begin();
3556       try {
3557         ::decode(mirror_image, iter);
3558       } catch (const buffer::error &err) {
3559         CLS_ERR("could not decode mirror image payload for key '%s'",
3560                 key.c_str());
3561         return -EIO;
3562       }
3563
3564       cls::rbd::MirrorImageStatus status;
3565       image_status_get(hctx, mirror_image.global_image_id, watchers, &status);
3566
3567       cls::rbd::MirrorImageStatusState state = status.up ? status.state :
3568         cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN;
3569       (*states)[state]++;
3570     }
3571
3572     if (!vals.empty()) {
3573       last_read = vals.rbegin()->first;
3574     }
3575   }
3576
3577   return 0;
3578 }
3579
3580 int image_status_remove_down(cls_method_context_t hctx) {
3581   std::set<entity_inst_t> watchers;
3582   int r = list_watchers(hctx, &watchers);
3583   if (r < 0) {
3584     return r;
3585   }
3586
3587   string last_read = STATUS_GLOBAL_KEY_PREFIX;
3588   int max_read = RBD_MAX_KEYS_READ;
3589   bool more = true;
3590   while (more) {
3591     map<string, bufferlist> vals;
3592     r = cls_cxx_map_get_vals(hctx, last_read, STATUS_GLOBAL_KEY_PREFIX,
3593                              max_read, &vals, &more);
3594     if (r < 0) {
3595       CLS_ERR("error reading mirrored images: %s", cpp_strerror(r).c_str());
3596       return r;
3597     }
3598
3599     for (auto &list_it : vals) {
3600       const string &key = list_it.first;
3601
3602       if (0 != key.compare(0, STATUS_GLOBAL_KEY_PREFIX.size(),
3603                            STATUS_GLOBAL_KEY_PREFIX)) {
3604         break;
3605       }
3606
3607       MirrorImageStatusOnDisk status;
3608       try {
3609         bufferlist::iterator it = list_it.second.begin();
3610         status.decode_meta(it);
3611       } catch (const buffer::error &err) {
3612         CLS_ERR("could not decode status metadata for mirrored image '%s'",
3613                 key.c_str());
3614         return -EIO;
3615       }
3616
3617       if (watchers.find(status.origin) == watchers.end()) {
3618         CLS_LOG(20, "removing stale status object for key %s",
3619                 key.c_str());
3620         int r1 = cls_cxx_map_remove_key(hctx, key);
3621         if (r1 < 0) {
3622           CLS_ERR("error removing stale status for key '%s': %s",
3623                   key.c_str(), cpp_strerror(r1).c_str());
3624           return r1;
3625         }
3626       }
3627     }
3628
3629     if (!vals.empty()) {
3630       last_read = vals.rbegin()->first;
3631     }
3632   }
3633
3634   return 0;
3635 }
3636
3637 int instances_list(cls_method_context_t hctx,
3638                    std::vector<std::string> *instance_ids) {
3639   std::string last_read = INSTANCE_KEY_PREFIX;
3640   int max_read = RBD_MAX_KEYS_READ;
3641   bool more = true;
3642   while (more) {
3643     std::map<std::string, bufferlist> vals;
3644     int r = cls_cxx_map_get_vals(hctx, last_read, INSTANCE_KEY_PREFIX.c_str(),
3645                                  max_read, &vals, &more);
3646     if (r < 0) {
3647       if (r != -ENOENT) {
3648         CLS_ERR("error reading mirror instances: %s", cpp_strerror(r).c_str());
3649       }
3650       return r;
3651     }
3652
3653     for (auto &it : vals) {
3654       instance_ids->push_back(it.first.substr(INSTANCE_KEY_PREFIX.size()));
3655     }
3656
3657     if (!vals.empty()) {
3658       last_read = vals.rbegin()->first;
3659     }
3660   }
3661   return 0;
3662 }
3663
3664 int instances_add(cls_method_context_t hctx, const string &instance_id) {
3665   bufferlist bl;
3666
3667   int r = cls_cxx_map_set_val(hctx, instance_key(instance_id), &bl);
3668   if (r < 0) {
3669     CLS_ERR("error setting mirror instance %s: %s", instance_id.c_str(),
3670             cpp_strerror(r).c_str());
3671     return r;
3672   }
3673   return 0;
3674 }
3675
3676 int instances_remove(cls_method_context_t hctx, const string &instance_id) {
3677
3678   int r = cls_cxx_map_remove_key(hctx, instance_key(instance_id));
3679   if (r < 0) {
3680     CLS_ERR("error removing mirror instance %s: %s", instance_id.c_str(),
3681             cpp_strerror(r).c_str());
3682     return r;
3683   }
3684   return 0;
3685 }
3686
3687 } // namespace mirror
3688
3689 /**
3690  * Input:
3691  * none
3692  *
3693  * Output:
3694  * @param uuid (std::string)
3695  * @returns 0 on success, negative error code on failure
3696  */
3697 int mirror_uuid_get(cls_method_context_t hctx, bufferlist *in,
3698                     bufferlist *out) {
3699   std::string mirror_uuid;
3700   int r = mirror::uuid_get(hctx, &mirror_uuid);
3701   if (r < 0) {
3702     return r;
3703   }
3704
3705   ::encode(mirror_uuid, *out);
3706   return 0;
3707 }
3708
3709 /**
3710  * Input:
3711  * @param mirror_uuid (std::string)
3712  *
3713  * Output:
3714  * @returns 0 on success, negative error code on failure
3715  */
3716 int mirror_uuid_set(cls_method_context_t hctx, bufferlist *in,
3717                     bufferlist *out) {
3718   std::string mirror_uuid;
3719   try {
3720     bufferlist::iterator bl_it = in->begin();
3721     ::decode(mirror_uuid, bl_it);
3722   } catch (const buffer::error &err) {
3723     return -EINVAL;
3724   }
3725
3726   if (mirror_uuid.empty()) {
3727     CLS_ERR("cannot set empty mirror uuid");
3728     return -EINVAL;
3729   }
3730
3731   uint32_t mirror_mode;
3732   int r = read_key(hctx, mirror::MODE, &mirror_mode);
3733   if (r < 0 && r != -ENOENT) {
3734     return r;
3735   } else if (r == 0 && mirror_mode != cls::rbd::MIRROR_MODE_DISABLED) {
3736     CLS_ERR("cannot set mirror uuid while mirroring enabled");
3737     return -EINVAL;
3738   }
3739
3740   bufferlist mirror_uuid_bl;
3741   mirror_uuid_bl.append(mirror_uuid);
3742   r = cls_cxx_map_set_val(hctx, mirror::UUID, &mirror_uuid_bl);
3743   if (r < 0) {
3744     CLS_ERR("failed to set mirror uuid");
3745     return r;
3746   }
3747   return 0;
3748 }
3749
3750 /**
3751  * Input:
3752  * none
3753  *
3754  * Output:
3755  * @param cls::rbd::MirrorMode (uint32_t)
3756  * @returns 0 on success, negative error code on failure
3757  */
3758 int mirror_mode_get(cls_method_context_t hctx, bufferlist *in,
3759                     bufferlist *out) {
3760   uint32_t mirror_mode_decode;
3761   int r = read_key(hctx, mirror::MODE, &mirror_mode_decode);
3762   if (r < 0) {
3763     return r;
3764   }
3765
3766   ::encode(mirror_mode_decode, *out);
3767   return 0;
3768 }
3769
3770 /**
3771  * Input:
3772  * @param mirror_mode (cls::rbd::MirrorMode) (uint32_t)
3773  *
3774  * Output:
3775  * @returns 0 on success, negative error code on failure
3776  */
3777 int mirror_mode_set(cls_method_context_t hctx, bufferlist *in,
3778                     bufferlist *out) {
3779   uint32_t mirror_mode_decode;
3780   try {
3781     bufferlist::iterator bl_it = in->begin();
3782     ::decode(mirror_mode_decode, bl_it);
3783   } catch (const buffer::error &err) {
3784     return -EINVAL;
3785   }
3786
3787   bool enabled;
3788   switch (static_cast<cls::rbd::MirrorMode>(mirror_mode_decode)) {
3789   case cls::rbd::MIRROR_MODE_DISABLED:
3790     enabled = false;
3791     break;
3792   case cls::rbd::MIRROR_MODE_IMAGE:
3793   case cls::rbd::MIRROR_MODE_POOL:
3794     enabled = true;
3795     break;
3796   default:
3797     CLS_ERR("invalid mirror mode: %d", mirror_mode_decode);
3798     return -EINVAL;
3799   }
3800
3801   int r;
3802   if (enabled) {
3803     std::string mirror_uuid;
3804     r = mirror::uuid_get(hctx, &mirror_uuid);
3805     if (r == -ENOENT) {
3806       return -EINVAL;
3807     } else if (r < 0) {
3808       return r;
3809     }
3810
3811     bufferlist bl;
3812     ::encode(mirror_mode_decode, bl);
3813
3814     r = cls_cxx_map_set_val(hctx, mirror::MODE, &bl);
3815     if (r < 0) {
3816       CLS_ERR("error enabling mirroring: %s", cpp_strerror(r).c_str());
3817       return r;
3818     }
3819   } else {
3820     std::vector<cls::rbd::MirrorPeer> peers;
3821     r = mirror::read_peers(hctx, &peers);
3822     if (r < 0 && r != -ENOENT) {
3823       return r;
3824     }
3825
3826     if (!peers.empty()) {
3827       CLS_ERR("mirroring peers still registered");
3828       return -EBUSY;
3829     }
3830
3831     r = remove_key(hctx, mirror::MODE);
3832     if (r < 0) {
3833       return r;
3834     }
3835
3836     r = remove_key(hctx, mirror::UUID);
3837     if (r < 0) {
3838       return r;
3839     }
3840   }
3841   return 0;
3842 }
3843
3844 /**
3845  * Input:
3846  * none
3847  *
3848  * Output:
3849  * @param std::vector<cls::rbd::MirrorPeer>: collection of peers
3850  * @returns 0 on success, negative error code on failure
3851  */
3852 int mirror_peer_list(cls_method_context_t hctx, bufferlist *in,
3853                      bufferlist *out) {
3854   std::vector<cls::rbd::MirrorPeer> peers;
3855   int r = mirror::read_peers(hctx, &peers);
3856   if (r < 0 && r != -ENOENT) {
3857     return r;
3858   }
3859
3860   ::encode(peers, *out);
3861   return 0;
3862 }
3863
3864 /**
3865  * Input:
3866  * @param mirror_peer (cls::rbd::MirrorPeer)
3867  *
3868  * Output:
3869  * @returns 0 on success, negative error code on failure
3870  */
3871 int mirror_peer_add(cls_method_context_t hctx, bufferlist *in,
3872                     bufferlist *out) {
3873   cls::rbd::MirrorPeer mirror_peer;
3874   try {
3875     bufferlist::iterator it = in->begin();
3876     ::decode(mirror_peer, it);
3877   } catch (const buffer::error &err) {
3878     return -EINVAL;
3879   }
3880
3881   uint32_t mirror_mode_decode;
3882   int r = read_key(hctx, mirror::MODE, &mirror_mode_decode);
3883   if (r < 0 && r != -ENOENT) {
3884     return r;
3885   } else if (r == -ENOENT ||
3886              mirror_mode_decode == cls::rbd::MIRROR_MODE_DISABLED) {
3887     CLS_ERR("mirroring must be enabled on the pool");
3888     return -EINVAL;
3889   } else if (!mirror_peer.is_valid()) {
3890     CLS_ERR("mirror peer is not valid");
3891     return -EINVAL;
3892   }
3893
3894   std::string mirror_uuid;
3895   r = mirror::uuid_get(hctx, &mirror_uuid);
3896   if (r < 0) {
3897     CLS_ERR("error retrieving mirroring uuid: %s", cpp_strerror(r).c_str());
3898     return r;
3899   } else if (mirror_peer.uuid == mirror_uuid) {
3900     CLS_ERR("peer uuid '%s' matches pool mirroring uuid",
3901             mirror_uuid.c_str());
3902     return -EINVAL;
3903   }
3904
3905   std::vector<cls::rbd::MirrorPeer> peers;
3906   r = mirror::read_peers(hctx, &peers);
3907   if (r < 0 && r != -ENOENT) {
3908     return r;
3909   }
3910
3911   for (auto const &peer : peers) {
3912     if (peer.uuid == mirror_peer.uuid) {
3913       CLS_ERR("peer uuid '%s' already exists",
3914               peer.uuid.c_str());
3915       return -ESTALE;
3916     } else if (peer.cluster_name == mirror_peer.cluster_name &&
3917                (peer.pool_id == -1 || mirror_peer.pool_id == -1 ||
3918                 peer.pool_id == mirror_peer.pool_id)) {
3919       CLS_ERR("peer cluster name '%s' already exists",
3920               peer.cluster_name.c_str());
3921       return -EEXIST;
3922     }
3923   }
3924
3925   bufferlist bl;
3926   ::encode(mirror_peer, bl);
3927   r = cls_cxx_map_set_val(hctx, mirror::peer_key(mirror_peer.uuid),
3928                           &bl);
3929   if (r < 0) {
3930     CLS_ERR("error adding peer: %s", cpp_strerror(r).c_str());
3931     return r;
3932   }
3933   return 0;
3934 }
3935
3936 /**
3937  * Input:
3938  * @param uuid (std::string)
3939  *
3940  * Output:
3941  * @returns 0 on success, negative error code on failure
3942  */
3943 int mirror_peer_remove(cls_method_context_t hctx, bufferlist *in,
3944                        bufferlist *out) {
3945   std::string uuid;
3946   try {
3947     bufferlist::iterator it = in->begin();
3948     ::decode(uuid, it);
3949   } catch (const buffer::error &err) {
3950     return -EINVAL;
3951   }
3952
3953   int r = cls_cxx_map_remove_key(hctx, mirror::peer_key(uuid));
3954   if (r < 0 && r != -ENOENT) {
3955     CLS_ERR("error removing peer: %s", cpp_strerror(r).c_str());
3956     return r;
3957   }
3958   return 0;
3959 }
3960
3961 /**
3962  * Input:
3963  * @param uuid (std::string)
3964  * @param client_name (std::string)
3965  *
3966  * Output:
3967  * @returns 0 on success, negative error code on failure
3968  */
3969 int mirror_peer_set_client(cls_method_context_t hctx, bufferlist *in,
3970                            bufferlist *out) {
3971   std::string uuid;
3972   std::string client_name;
3973   try {
3974     bufferlist::iterator it = in->begin();
3975     ::decode(uuid, it);
3976     ::decode(client_name, it);
3977   } catch (const buffer::error &err) {
3978     return -EINVAL;
3979   }
3980
3981   cls::rbd::MirrorPeer peer;
3982   int r = mirror::read_peer(hctx, uuid, &peer);
3983   if (r < 0) {
3984     return r;
3985   }
3986
3987   peer.client_name = client_name;
3988   r = mirror::write_peer(hctx, uuid, peer);
3989   if (r < 0) {
3990     return r;
3991   }
3992   return 0;
3993 }
3994
3995 /**
3996  * Input:
3997  * @param uuid (std::string)
3998  * @param cluster_name (std::string)
3999  *
4000  * Output:
4001  * @returns 0 on success, negative error code on failure
4002  */
4003 int mirror_peer_set_cluster(cls_method_context_t hctx, bufferlist *in,
4004                             bufferlist *out) {
4005   std::string uuid;
4006   std::string cluster_name;
4007   try {
4008     bufferlist::iterator it = in->begin();
4009     ::decode(uuid, it);
4010     ::decode(cluster_name, it);
4011   } catch (const buffer::error &err) {
4012     return -EINVAL;
4013   }
4014
4015   cls::rbd::MirrorPeer peer;
4016   int r = mirror::read_peer(hctx, uuid, &peer);
4017   if (r < 0) {
4018     return r;
4019   }
4020
4021   peer.cluster_name = cluster_name;
4022   r = mirror::write_peer(hctx, uuid, peer);
4023   if (r < 0) {
4024     return r;
4025   }
4026   return 0;
4027 }
4028
4029 /**
4030  * Input:
4031  * @param start_after which name to begin listing after
4032  *        (use the empty string to start at the beginning)
4033  * @param max_return the maximum number of names to list
4034  *
4035  * Output:
4036  * @param std::map<std::string, std::string>: local id to global id map
4037  * @returns 0 on success, negative error code on failure
4038  */
4039 int mirror_image_list(cls_method_context_t hctx, bufferlist *in,
4040                      bufferlist *out) {
4041   std::string start_after;
4042   uint64_t max_return;
4043   try {
4044     bufferlist::iterator iter = in->begin();
4045     ::decode(start_after, iter);
4046     ::decode(max_return, iter);
4047   } catch (const buffer::error &err) {
4048     return -EINVAL;
4049   }
4050
4051   int max_read = RBD_MAX_KEYS_READ;
4052   bool more = true;
4053   std::map<std::string, std::string> mirror_images;
4054   std::string last_read = mirror::image_key(start_after);
4055
4056   while (more && mirror_images.size() < max_return) {
4057     std::map<std::string, bufferlist> vals;
4058     CLS_LOG(20, "last_read = '%s'", last_read.c_str());
4059     int r = cls_cxx_map_get_vals(hctx, last_read, mirror::IMAGE_KEY_PREFIX,
4060                                  max_read, &vals, &more);
4061     if (r < 0) {
4062       CLS_ERR("error reading mirror image directory by name: %s",
4063               cpp_strerror(r).c_str());
4064       return r;
4065     }
4066
4067     for (auto it = vals.begin(); it != vals.end(); ++it) {
4068       const std::string &image_id =
4069         it->first.substr(mirror::IMAGE_KEY_PREFIX.size());
4070       cls::rbd::MirrorImage mirror_image;
4071       bufferlist::iterator iter = it->second.begin();
4072       try {
4073         ::decode(mirror_image, iter);
4074       } catch (const buffer::error &err) {
4075         CLS_ERR("could not decode mirror image payload of image '%s'",
4076                 image_id.c_str());
4077         return -EIO;
4078       }
4079
4080       mirror_images[image_id] = mirror_image.global_image_id;
4081       if (mirror_images.size() >= max_return) {
4082         break;
4083       }
4084     }
4085     if (!vals.empty()) {
4086       last_read = mirror::image_key(mirror_images.rbegin()->first);
4087     }
4088   }
4089
4090   ::encode(mirror_images, *out);
4091   return 0;
4092 }
4093
4094 /**
4095  * Input:
4096  * @param global_id (std::string)
4097  *
4098  * Output:
4099  * @param std::string - image id
4100  * @returns 0 on success, negative error code on failure
4101  */
4102 int mirror_image_get_image_id(cls_method_context_t hctx, bufferlist *in,
4103                               bufferlist *out) {
4104   std::string global_id;
4105   try {
4106     bufferlist::iterator it = in->begin();
4107     ::decode(global_id, it);
4108   } catch (const buffer::error &err) {
4109     return -EINVAL;
4110   }
4111
4112   std::string image_id;
4113   int r = read_key(hctx, mirror::global_key(global_id), &image_id);
4114   if (r < 0) {
4115     CLS_ERR("error retrieving image id for global id '%s': %s",
4116             global_id.c_str(), cpp_strerror(r).c_str());
4117     return r;
4118   }
4119
4120   ::encode(image_id, *out);
4121   return 0;
4122 }
4123
4124 /**
4125  * Input:
4126  * @param image_id (std::string)
4127  *
4128  * Output:
4129  * @param cls::rbd::MirrorImage - metadata associated with the image_id
4130  * @returns 0 on success, negative error code on failure
4131  */
4132 int mirror_image_get(cls_method_context_t hctx, bufferlist *in,
4133                      bufferlist *out) {
4134   string image_id;
4135   try {
4136     bufferlist::iterator it = in->begin();
4137     ::decode(image_id, it);
4138   } catch (const buffer::error &err) {
4139     return -EINVAL;
4140   }
4141
4142   cls::rbd::MirrorImage mirror_image;
4143   int r = mirror::image_get(hctx, image_id, &mirror_image);
4144   if (r < 0) {
4145     return r;
4146   }
4147
4148   ::encode(mirror_image, *out);
4149   return 0;
4150 }
4151
4152 /**
4153  * Input:
4154  * @param image_id (std::string)
4155  * @param mirror_image (cls::rbd::MirrorImage)
4156  *
4157  * Output:
4158  * @returns 0 on success, negative error code on failure
4159  * @returns -EEXIST if there's an existing image_id with a different global_image_id
4160  */
4161 int mirror_image_set(cls_method_context_t hctx, bufferlist *in,
4162                      bufferlist *out) {
4163   string image_id;
4164   cls::rbd::MirrorImage mirror_image;
4165   try {
4166     bufferlist::iterator it = in->begin();
4167     ::decode(image_id, it);
4168     ::decode(mirror_image, it);
4169   } catch (const buffer::error &err) {
4170     return -EINVAL;
4171   }
4172
4173   int r = mirror::image_set(hctx, image_id, mirror_image);
4174   if (r < 0) {
4175     return r;
4176   }
4177   return 0;
4178 }
4179
4180 /**
4181  * Input:
4182  * @param image_id (std::string)
4183  *
4184  * Output:
4185  * @returns 0 on success, negative error code on failure
4186  */
4187 int mirror_image_remove(cls_method_context_t hctx, bufferlist *in,
4188                         bufferlist *out) {
4189   string image_id;
4190   try {
4191     bufferlist::iterator it = in->begin();
4192     ::decode(image_id, it);
4193   } catch (const buffer::error &err) {
4194     return -EINVAL;
4195   }
4196
4197   int r = mirror::image_remove(hctx, image_id);
4198   if (r < 0) {
4199     return r;
4200   }
4201   return 0;
4202 }
4203
4204 /**
4205  * Input:
4206  * @param global_image_id (std::string)
4207  * @param status (cls::rbd::MirrorImageStatus)
4208  *
4209  * Output:
4210  * @returns 0 on success, negative error code on failure
4211  */
4212 int mirror_image_status_set(cls_method_context_t hctx, bufferlist *in,
4213                             bufferlist *out) {
4214   string global_image_id;
4215   cls::rbd::MirrorImageStatus status;
4216   try {
4217     bufferlist::iterator it = in->begin();
4218     ::decode(global_image_id, it);
4219     ::decode(status, it);
4220   } catch (const buffer::error &err) {
4221     return -EINVAL;
4222   }
4223
4224   int r = mirror::image_status_set(hctx, global_image_id, status);
4225   if (r < 0) {
4226     return r;
4227   }
4228   return 0;
4229 }
4230
4231 /**
4232  * Input:
4233  * @param global_image_id (std::string)
4234  *
4235  * Output:
4236  * @returns 0 on success, negative error code on failure
4237  */
4238 int mirror_image_status_remove(cls_method_context_t hctx, bufferlist *in,
4239                                bufferlist *out) {
4240   string global_image_id;
4241   try {
4242     bufferlist::iterator it = in->begin();
4243     ::decode(global_image_id, it);
4244   } catch (const buffer::error &err) {
4245     return -EINVAL;
4246   }
4247
4248   int r = mirror::image_status_remove(hctx, global_image_id);
4249   if (r < 0) {
4250     return r;
4251   }
4252   return 0;
4253 }
4254
4255 /**
4256  * Input:
4257  * @param global_image_id (std::string)
4258  *
4259  * Output:
4260  * @param cls::rbd::MirrorImageStatus - metadata associated with the global_image_id
4261  * @returns 0 on success, negative error code on failure
4262  */
4263 int mirror_image_status_get(cls_method_context_t hctx, bufferlist *in,
4264                             bufferlist *out) {
4265   string global_image_id;
4266   try {
4267     bufferlist::iterator it = in->begin();
4268     ::decode(global_image_id, it);
4269   } catch (const buffer::error &err) {
4270     return -EINVAL;
4271   }
4272
4273   std::set<entity_inst_t> watchers;
4274   int r = mirror::list_watchers(hctx, &watchers);
4275   if (r < 0) {
4276     return r;
4277   }
4278
4279   cls::rbd::MirrorImageStatus status;
4280   r = mirror::image_status_get(hctx, global_image_id, watchers, &status);
4281   if (r < 0) {
4282     return r;
4283   }
4284
4285   ::encode(status, *out);
4286   return 0;
4287 }
4288
4289 /**
4290  * Input:
4291  * @param start_after which name to begin listing after
4292  *        (use the empty string to start at the beginning)
4293  * @param max_return the maximum number of names to list
4294  *
4295  * Output:
4296  * @param std::map<std::string, cls::rbd::MirrorImage>: image id to image map
4297  * @param std::map<std::string, cls::rbd::MirrorImageStatus>: image it to status map
4298  * @returns 0 on success, negative error code on failure
4299  */
4300 int mirror_image_status_list(cls_method_context_t hctx, bufferlist *in,
4301                              bufferlist *out) {
4302   std::string start_after;
4303   uint64_t max_return;
4304   try {
4305     bufferlist::iterator iter = in->begin();
4306     ::decode(start_after, iter);
4307     ::decode(max_return, iter);
4308   } catch (const buffer::error &err) {
4309     return -EINVAL;
4310   }
4311
4312   map<std::string, cls::rbd::MirrorImage> images;
4313   map<std::string, cls::rbd::MirrorImageStatus> statuses;
4314   int r = mirror::image_status_list(hctx, start_after, max_return, &images,
4315                                     &statuses);
4316   if (r < 0) {
4317     return r;
4318   }
4319
4320   ::encode(images, *out);
4321   ::encode(statuses, *out);
4322   return 0;
4323 }
4324
4325 /**
4326  * Input:
4327  * none
4328  *
4329  * Output:
4330  * @param std::map<cls::rbd::MirrorImageStatusState, int>: states counts
4331  * @returns 0 on success, negative error code on failure
4332  */
4333 int mirror_image_status_get_summary(cls_method_context_t hctx, bufferlist *in,
4334                                     bufferlist *out) {
4335   std::map<cls::rbd::MirrorImageStatusState, int> states;
4336
4337   int r = mirror::image_status_get_summary(hctx, &states);
4338   if (r < 0) {
4339     return r;
4340   }
4341
4342   ::encode(states, *out);
4343   return 0;
4344 }
4345
4346 /**
4347  * Input:
4348  * none
4349  *
4350  * Output:
4351  * @returns 0 on success, negative error code on failure
4352  */
4353 int mirror_image_status_remove_down(cls_method_context_t hctx, bufferlist *in,
4354                                     bufferlist *out) {
4355   int r = mirror::image_status_remove_down(hctx);
4356   if (r < 0) {
4357     return r;
4358   }
4359   return 0;
4360 }
4361
4362 /**
4363  * Input:
4364  * none
4365  *
4366  * Output:
4367  * @param std::vector<std::string>: instance ids
4368  * @returns 0 on success, negative error code on failure
4369  */
4370 int mirror_instances_list(cls_method_context_t hctx, bufferlist *in,
4371                           bufferlist *out) {
4372   std::vector<std::string> instance_ids;
4373
4374   int r = mirror::instances_list(hctx, &instance_ids);
4375   if (r < 0) {
4376     return r;
4377   }
4378
4379   ::encode(instance_ids, *out);
4380   return 0;
4381 }
4382
4383 /**
4384  * Input:
4385  * @param instance_id (std::string)
4386  *
4387  * Output:
4388  * @returns 0 on success, negative error code on failure
4389  */
4390 int mirror_instances_add(cls_method_context_t hctx, bufferlist *in,
4391                          bufferlist *out) {
4392   std::string instance_id;
4393   try {
4394     bufferlist::iterator iter = in->begin();
4395     ::decode(instance_id, iter);
4396   } catch (const buffer::error &err) {
4397     return -EINVAL;
4398   }
4399
4400   int r = mirror::instances_add(hctx, instance_id);
4401   if (r < 0) {
4402     return r;
4403   }
4404   return 0;
4405 }
4406
4407 /**
4408  * Input:
4409  * @param instance_id (std::string)
4410  *
4411  * Output:
4412  * @returns 0 on success, negative error code on failure
4413  */
4414 int mirror_instances_remove(cls_method_context_t hctx, bufferlist *in,
4415                             bufferlist *out) {
4416   std::string instance_id;
4417   try {
4418     bufferlist::iterator iter = in->begin();
4419     ::decode(instance_id, iter);
4420   } catch (const buffer::error &err) {
4421     return -EINVAL;
4422   }
4423
4424   int r = mirror::instances_remove(hctx, instance_id);
4425   if (r < 0) {
4426     return r;
4427   }
4428   return 0;
4429 }
4430
4431 /**
4432  * Initialize the header with basic metadata.
4433  * Everything is stored as key/value pairs as omaps in the header object.
4434  *
4435  * Input:
4436  * none
4437  *
4438  * Output:
4439  * @return 0 on success, negative error code on failure
4440  */
4441 int group_create(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
4442 {
4443   bufferlist snap_seqbl;
4444   uint64_t snap_seq = 0;
4445   ::encode(snap_seq, snap_seqbl);
4446   int r = cls_cxx_map_set_val(hctx, GROUP_SNAP_SEQ, &snap_seqbl);
4447   if (r < 0)
4448     return r;
4449
4450   return 0;
4451 }
4452
4453 /**
4454  * List consistency groups from the directory.
4455  *
4456  * Input:
4457  * @param start_after (std::string)
4458  * @param max_return (int64_t)
4459  *
4460  * Output:
4461  * @param map of consistency groups (name, id)
4462  * @return 0 on success, negative error code on failure
4463  */
4464 int group_dir_list(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
4465 {
4466   string start_after;
4467   uint64_t max_return;
4468
4469   try {
4470     bufferlist::iterator iter = in->begin();
4471     ::decode(start_after, iter);
4472     ::decode(max_return, iter);
4473   } catch (const buffer::error &err) {
4474     return -EINVAL;
4475   }
4476
4477   int max_read = RBD_MAX_KEYS_READ;
4478   bool more = true;
4479   map<string, string> groups;
4480   string last_read = dir_key_for_name(start_after);
4481
4482   while (more && groups.size() < max_return) {
4483     map<string, bufferlist> vals;
4484     CLS_LOG(20, "last_read = '%s'", last_read.c_str());
4485     int r = cls_cxx_map_get_vals(hctx, last_read, RBD_DIR_NAME_KEY_PREFIX,
4486                                  max_read, &vals, &more);
4487     if (r < 0) {
4488       CLS_ERR("error reading directory by name: %s", cpp_strerror(r).c_str());
4489       return r;
4490     }
4491
4492     for (pair<string, bufferlist> val: vals) {
4493       string id;
4494       bufferlist::iterator iter = val.second.begin();
4495       try {
4496         ::decode(id, iter);
4497       } catch (const buffer::error &err) {
4498         CLS_ERR("could not decode id of consistency group '%s'", val.first.c_str());
4499         return -EIO;
4500       }
4501       CLS_LOG(20, "adding '%s' -> '%s'", dir_name_from_key(val.first).c_str(), id.c_str());
4502       groups[dir_name_from_key(val.first)] = id;
4503       if (groups.size() >= max_return)
4504         break;
4505     }
4506     if (!vals.empty()) {
4507       last_read = dir_key_for_name(groups.rbegin()->first);
4508     }
4509   }
4510
4511   ::encode(groups, *out);
4512
4513   return 0;
4514 }
4515
4516 /**
4517  * Add a consistency group to the directory.
4518  *
4519  * Input:
4520  * @param name (std::string)
4521  * @param id (std::string)
4522  *
4523  * Output:
4524  * @return 0 on success, negative error code on failure
4525  */
4526 int group_dir_add(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
4527 {
4528   int r = cls_cxx_create(hctx, false);
4529
4530   if (r < 0) {
4531     CLS_ERR("could not create consistency group directory: %s",
4532             cpp_strerror(r).c_str());
4533     return r;
4534   }
4535
4536   string name, id;
4537   try {
4538     bufferlist::iterator iter = in->begin();
4539     ::decode(name, iter);
4540     ::decode(id, iter);
4541   } catch (const buffer::error &err) {
4542     return -EINVAL;
4543   }
4544
4545   if (!name.size() || !is_valid_id(id)) {
4546     CLS_ERR("invalid consistency group name '%s' or id '%s'",
4547             name.c_str(), id.c_str());
4548     return -EINVAL;
4549   }
4550
4551   CLS_LOG(20, "group_dir_add name=%s id=%s", name.c_str(), id.c_str());
4552
4553   string tmp;
4554   string name_key = dir_key_for_name(name);
4555   string id_key = dir_key_for_id(id);
4556   r = read_key(hctx, name_key, &tmp);
4557   if (r != -ENOENT) {
4558     CLS_LOG(10, "name already exists");
4559     return -EEXIST;
4560   }
4561   r = read_key(hctx, id_key, &tmp);
4562   if (r != -ENOENT) {
4563     CLS_LOG(10, "id already exists");
4564     return -EBADF;
4565   }
4566   bufferlist id_bl, name_bl;
4567   ::encode(id, id_bl);
4568   ::encode(name, name_bl);
4569   map<string, bufferlist> omap_vals;
4570   omap_vals[name_key] = id_bl;
4571   omap_vals[id_key] = name_bl;
4572   return cls_cxx_map_set_vals(hctx, &omap_vals);
4573 }
4574
4575 /**
4576  * Remove a consistency group from the directory.
4577  *
4578  * Input:
4579  * @param name (std::string)
4580  * @param id (std::string)
4581  *
4582  * Output:
4583  * @return 0 on success, negative error code on failure
4584  */
4585 int group_dir_remove(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
4586 {
4587   string name, id;
4588   try {
4589     bufferlist::iterator iter = in->begin();
4590     ::decode(name, iter);
4591     ::decode(id, iter);
4592   } catch (const buffer::error &err) {
4593     return -EINVAL;
4594   }
4595
4596   CLS_LOG(20, "group_dir_remove name=%s id=%s", name.c_str(), id.c_str());
4597
4598   string stored_name, stored_id;
4599   string name_key = dir_key_for_name(name);
4600   string id_key = dir_key_for_id(id);
4601
4602   int r = read_key(hctx, name_key, &stored_id);
4603   if (r < 0) {
4604     if (r != -ENOENT)
4605       CLS_ERR("error reading name to id mapping: %s", cpp_strerror(r).c_str());
4606     return r;
4607   }
4608   r = read_key(hctx, id_key, &stored_name);
4609   if (r < 0) {
4610     if (r != -ENOENT)
4611       CLS_ERR("error reading id to name mapping: %s", cpp_strerror(r).c_str());
4612     return r;
4613   }
4614
4615   // check if this op raced with a rename
4616   if (stored_name != name || stored_id != id) {
4617     CLS_ERR("stored name '%s' and id '%s' do not match args '%s' and '%s'",
4618             stored_name.c_str(), stored_id.c_str(), name.c_str(), id.c_str());
4619     return -ESTALE;
4620   }
4621
4622   r = cls_cxx_map_remove_key(hctx, name_key);
4623   if (r < 0) {
4624     CLS_ERR("error removing name: %s", cpp_strerror(r).c_str());
4625     return r;
4626   }
4627
4628   r = cls_cxx_map_remove_key(hctx, id_key);
4629   if (r < 0) {
4630     CLS_ERR("error removing id: %s", cpp_strerror(r).c_str());
4631     return r;
4632   }
4633
4634   return 0;
4635 }
4636
4637 /**
4638  * Set state of an image in the consistency group.
4639  *
4640  * Input:
4641  * @param image_status (cls::rbd::GroupImageStatus)
4642  *
4643  * Output:
4644  * @return 0 on success, negative error code on failure
4645  */
4646 int group_image_set(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
4647 {
4648   CLS_LOG(20, "group_image_set");
4649
4650   cls::rbd::GroupImageStatus st;
4651   try {
4652     bufferlist::iterator iter = in->begin();
4653     ::decode(st, iter);
4654   } catch (const buffer::error &err) {
4655     return -EINVAL;
4656   }
4657
4658   string image_key = st.spec.image_key();
4659
4660   bufferlist image_val_bl;
4661   ::encode(st.state, image_val_bl);
4662   int r = cls_cxx_map_set_val(hctx, image_key, &image_val_bl);
4663   if (r < 0) {
4664     return r;
4665   }
4666
4667   return 0;
4668 }
4669
4670 /**
4671  * Remove reference to an image from the consistency group.
4672  *
4673  * Input:
4674  * @param spec (cls::rbd::GroupImageSpec)
4675  *
4676  * Output:
4677  * @return 0 on success, negative error code on failure
4678  */
4679 int group_image_remove(cls_method_context_t hctx,
4680                        bufferlist *in, bufferlist *out)
4681 {
4682   CLS_LOG(20, "group_image_remove");
4683   cls::rbd::GroupImageSpec spec;
4684   try {
4685     bufferlist::iterator iter = in->begin();
4686     ::decode(spec, iter);
4687   } catch (const buffer::error &err) {
4688     return -EINVAL;
4689   }
4690
4691   string image_key = spec.image_key();
4692
4693   int r = cls_cxx_map_remove_key(hctx, image_key);
4694   if (r < 0) {
4695     CLS_ERR("error removing image from group: %s", cpp_strerror(r).c_str());
4696     return r;
4697   }
4698
4699   return 0;
4700 }
4701
4702 /*
4703  * List images in the consistency group.
4704  *
4705  * Input:
4706  * @param start_after which name to begin listing after
4707  *        (use the empty string to start at the beginning)
4708  * @param max_return the maximum number of names to list
4709  *
4710  * Output:
4711  * @param tuples of descriptions of the images: image_id, pool_id, image reference state.
4712  * @return 0 on success, negative error code on failure
4713  */
4714 int group_image_list(cls_method_context_t hctx,
4715                      bufferlist *in, bufferlist *out)
4716 {
4717   CLS_LOG(20, "group_image_list");
4718   cls::rbd::GroupImageSpec start_after;
4719   uint64_t max_return;
4720   try {
4721     bufferlist::iterator iter = in->begin();
4722     ::decode(start_after, iter);
4723     ::decode(max_return, iter);
4724   } catch (const buffer::error &err) {
4725     return -EINVAL;
4726   }
4727
4728   int max_read = RBD_MAX_KEYS_READ;
4729   std::map<string, bufferlist> vals;
4730   string last_read = start_after.image_key();
4731   std::vector<cls::rbd::GroupImageStatus> res;
4732   bool more;
4733   do {
4734     int r = cls_cxx_map_get_vals(hctx, last_read,cls::rbd::RBD_GROUP_IMAGE_KEY_PREFIX,
4735                                  max_read, &vals, &more);
4736     if (r < 0)
4737       return r;
4738
4739     for (map<string, bufferlist>::iterator it = vals.begin();
4740          it != vals.end() && res.size() < max_return; ++it) {
4741
4742       bufferlist::iterator iter = it->second.begin();
4743       cls::rbd::GroupImageLinkState state;
4744       try {
4745         ::decode(state, iter);
4746       } catch (const buffer::error &err) {
4747         CLS_ERR("error decoding state for image: %s", it->first.c_str());
4748         return -EIO;
4749       }
4750       cls::rbd::GroupImageSpec spec;
4751       int r = cls::rbd::GroupImageSpec::from_key(it->first, &spec);
4752       if (r < 0)
4753         return r;
4754
4755       CLS_LOG(20, "Discovered image %s %" PRId64 " %d", spec.image_id.c_str(),
4756                                                  spec.pool_id,
4757                                                  (int)state);
4758       res.push_back(cls::rbd::GroupImageStatus(spec, state));
4759     }
4760     if (res.size() > 0) {
4761       last_read = res.rbegin()->spec.image_key();
4762     }
4763
4764   } while (more && (res.size() < max_return));
4765   ::encode(res, *out);
4766
4767   return 0;
4768 }
4769
4770 /**
4771  * Reference the consistency group this image belongs to.
4772  *
4773  * Input:
4774  * @param group_id (std::string)
4775  * @param pool_id (int64_t)
4776  *
4777  * Output:
4778  * @return 0 on success, negative error code on failure
4779  */
4780 int image_add_group(cls_method_context_t hctx,
4781                     bufferlist *in, bufferlist *out)
4782 {
4783   CLS_LOG(20, "image_add_group");
4784   cls::rbd::GroupSpec new_group;
4785   try {
4786     bufferlist::iterator iter = in->begin();
4787     ::decode(new_group, iter);
4788   } catch (const buffer::error &err) {
4789     return -EINVAL;
4790   }
4791
4792   bufferlist existing_refbl;
4793
4794   int r = cls_cxx_map_get_val(hctx, RBD_GROUP_REF, &existing_refbl);
4795   if (r == 0) {
4796     // If we are trying to link this image to the same group then return success.
4797     // If this image already belongs to another group then abort.
4798     cls::rbd::GroupSpec old_group;
4799     try {
4800       bufferlist::iterator iter = existing_refbl.begin();
4801       ::decode(old_group, iter);
4802     } catch (const buffer::error &err) {
4803       return -EINVAL;
4804     }
4805
4806     if ((old_group.group_id != new_group.group_id)
4807         || (old_group.pool_id != new_group.pool_id)) {
4808       return -EEXIST;
4809     } else {
4810       return 0; // In this case the values are already correct
4811     }
4812   } else if (r < 0 && r != -ENOENT) { // No entry means this image is not a member of any consistency group. So, we can use it.
4813     return r;
4814   }
4815
4816   bufferlist refbl;
4817   ::encode(new_group, refbl);
4818   r = cls_cxx_map_set_val(hctx, RBD_GROUP_REF, &refbl);
4819
4820   if (r < 0) {
4821     return r;
4822   }
4823
4824   return 0;
4825 }
4826
4827 /**
4828  * Remove image's pointer to the consistency group.
4829  *
4830  * Input:
4831  * @param cg_id (std::string)
4832  * @param pool_id (int64_t)
4833  *
4834  * Output:
4835  * @return 0 on success, negative error code on failure
4836  */
4837 int image_remove_group(cls_method_context_t hctx,
4838                        bufferlist *in,
4839                        bufferlist *out)
4840 {
4841   CLS_LOG(20, "image_remove_group");
4842   cls::rbd::GroupSpec spec;
4843   try {
4844     bufferlist::iterator iter = in->begin();
4845     ::decode(spec, iter);
4846   } catch (const buffer::error &err) {
4847     return -EINVAL;
4848   }
4849
4850   bufferlist refbl;
4851   int r = cls_cxx_map_get_val(hctx, RBD_GROUP_REF, &refbl);
4852   if (r < 0) {
4853     return r;
4854   }
4855
4856   cls::rbd::GroupSpec ref_spec;
4857   bufferlist::iterator iter = refbl.begin();
4858   try {
4859     ::decode(ref_spec, iter);
4860   } catch (const buffer::error &err) {
4861     return -EINVAL;
4862   }
4863
4864   if (ref_spec.pool_id != spec.pool_id || ref_spec.group_id != spec.group_id) {
4865     return -EBADF;
4866   }
4867
4868   r = cls_cxx_map_remove_key(hctx, RBD_GROUP_REF);
4869   if (r < 0) {
4870     return r;
4871   }
4872
4873   return 0;
4874 }
4875
4876 /**
4877  * Retrieve the id and pool of the consistency group this image belongs to.
4878  *
4879  * Input:
4880  * none
4881  *
4882  * Output:
4883  * @param GroupSpec
4884  * @return 0 on success, negative error code on failure
4885  */
4886 int image_get_group(cls_method_context_t hctx,
4887                     bufferlist *in, bufferlist *out)
4888 {
4889   CLS_LOG(20, "image_get_group");
4890   bufferlist refbl;
4891   int r = cls_cxx_map_get_val(hctx, RBD_GROUP_REF, &refbl);
4892   if (r < 0 && r != -ENOENT) {
4893     return r;
4894   }
4895
4896   cls::rbd::GroupSpec spec;
4897
4898   if (r != -ENOENT) {
4899     bufferlist::iterator iter = refbl.begin();
4900     try {
4901       ::decode(spec, iter);
4902     } catch (const buffer::error &err) {
4903       return -EINVAL;
4904     }
4905   }
4906
4907   ::encode(spec, *out);
4908   return 0;
4909 }
4910
4911 namespace trash {
4912
4913 static const std::string IMAGE_KEY_PREFIX("id_");
4914
4915 std::string image_key(const std::string &image_id) {
4916   return IMAGE_KEY_PREFIX + image_id;
4917 }
4918
4919 std::string image_id_from_key(const std::string &key) {
4920   return key.substr(IMAGE_KEY_PREFIX.size());
4921 }
4922
4923 } // namespace trash
4924
4925 /**
4926  * Add an image entry to the rbd trash. Creates the trash object if
4927  * needed, and stores the trash spec information of the deleted image.
4928  *
4929  * Input:
4930  * @param id the id of the image
4931  * @param trash_spec the spec info of the deleted image
4932  *
4933  * Output:
4934  * @returns -EEXIST if the image id is already in the trash
4935  * @returns 0 on success, negative error code on failure
4936  */
4937 int trash_add(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
4938 {
4939   int r = cls_cxx_create(hctx, false);
4940   if (r < 0) {
4941     CLS_ERR("could not create trash: %s", cpp_strerror(r).c_str());
4942     return r;
4943   }
4944
4945   string id;
4946   cls::rbd::TrashImageSpec trash_spec;
4947   try {
4948     bufferlist::iterator iter = in->begin();
4949     ::decode(id, iter);
4950     ::decode(trash_spec, iter);
4951   } catch (const buffer::error &err) {
4952     return -EINVAL;
4953   }
4954
4955   if (!is_valid_id(id)) {
4956     CLS_ERR("trash_add: invalid id '%s'", id.c_str());
4957     return -EINVAL;
4958   }
4959
4960   CLS_LOG(20, "trash_add id=%s", id.c_str());
4961
4962   string key = trash::image_key(id);
4963   cls::rbd::TrashImageSpec tmp;
4964   r = read_key(hctx, key, &tmp);
4965   if (r < 0 && r != -ENOENT) {
4966     CLS_ERR("could not read key %s entry from trash: %s", key.c_str(),
4967             cpp_strerror(r).c_str());
4968     return r;
4969   } else if (r == 0) {
4970     CLS_LOG(10, "id already exists");
4971     return -EEXIST;
4972   }
4973
4974   map<string, bufferlist> omap_vals;
4975   ::encode(trash_spec, omap_vals[key]);
4976   return cls_cxx_map_set_vals(hctx, &omap_vals);
4977 }
4978
4979 /**
4980  * Removes an image entry from the rbd trash object.
4981  * image.
4982  *
4983  * Input:
4984  * @param id the id of the image
4985  *
4986  * Output:
4987  * @returns -ENOENT if the image id does not exist in the trash
4988  * @returns 0 on success, negative error code on failure
4989  */
4990 int trash_remove(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
4991 {
4992   string id;
4993   try {
4994     bufferlist::iterator iter = in->begin();
4995     ::decode(id, iter);
4996   } catch (const buffer::error &err) {
4997     return -EINVAL;
4998   }
4999
5000   CLS_LOG(20, "trash_remove id=%s", id.c_str());
5001
5002   string key = trash::image_key(id);
5003   bufferlist tmp;
5004   int r = cls_cxx_map_get_val(hctx, key, &tmp);
5005   if (r < 0) {
5006     if (r != -ENOENT) {
5007       CLS_ERR("error reading entry key %s: %s", key.c_str(), cpp_strerror(r).c_str());
5008     }
5009     return r;
5010   }
5011
5012   r = cls_cxx_map_remove_key(hctx, key);
5013   if (r < 0) {
5014     CLS_ERR("error removing entry: %s", cpp_strerror(r).c_str());
5015     return r;
5016   }
5017
5018   return 0;
5019 }
5020
5021 /**
5022  * Returns the list of trash spec entries registered in the rbd_trash
5023  * object.
5024  *
5025  * Input:
5026  * @param start_after which name to begin listing after
5027  *        (use the empty string to start at the beginning)
5028  * @param max_return the maximum number of names to list
5029  *
5030  * Output:
5031  * @param data the map between image id and trash spec info
5032  *
5033  * @returns 0 on success, negative error code on failure
5034  */
5035 int trash_list(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
5036 {
5037   string start_after;
5038   uint64_t max_return;
5039
5040   try {
5041     bufferlist::iterator iter = in->begin();
5042     ::decode(start_after, iter);
5043     ::decode(max_return, iter);
5044   } catch (const buffer::error &err) {
5045     return -EINVAL;
5046   }
5047
5048   map<string, cls::rbd::TrashImageSpec> data;
5049   string last_read = trash::image_key(start_after);
5050   bool more = true;
5051
5052   CLS_LOG(20, "trash_get_images");
5053   while (data.size() < max_return) {
5054     map<string, bufferlist> raw_data;
5055     int max_read = std::min<int32_t>(RBD_MAX_KEYS_READ,
5056                                      max_return - data.size());
5057     int r = cls_cxx_map_get_vals(hctx, last_read, trash::IMAGE_KEY_PREFIX,
5058                                  max_read, &raw_data, &more);
5059     if (r < 0) {
5060       CLS_ERR("failed to read the vals off of disk: %s",
5061               cpp_strerror(r).c_str());
5062       return r;
5063     }
5064     if (raw_data.empty()) {
5065       break;
5066     }
5067
5068     map<string, bufferlist>::iterator it = raw_data.begin();
5069     for (; it != raw_data.end(); ++it) {
5070       ::decode(data[trash::image_id_from_key(it->first)], it->second);
5071     }
5072
5073     if (!more) {
5074       break;
5075     }
5076
5077     last_read = raw_data.rbegin()->first;
5078   }
5079
5080   ::encode(data, *out);
5081   return 0;
5082 }
5083
5084 /**
5085  * Returns the trash spec entry of an image registered in the rbd_trash
5086  * object.
5087  *
5088  * Input:
5089  * @param id the id of the image
5090  *
5091  * Output:
5092  * @param out the trash spec entry
5093  *
5094  * @returns 0 on success, negative error code on failure
5095  */
5096 int trash_get(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
5097 {
5098   string id;
5099   try {
5100     bufferlist::iterator iter = in->begin();
5101     ::decode(id, iter);
5102   } catch (const buffer::error &err) {
5103     return -EINVAL;
5104   }
5105
5106   CLS_LOG(20, "trash_get_image id=%s", id.c_str());
5107
5108
5109   string key = trash::image_key(id);
5110   bufferlist bl;
5111   int r = cls_cxx_map_get_val(hctx, key, out);
5112   if (r != -ENOENT) {
5113     CLS_ERR("error reading image from trash '%s': '%s'", id.c_str(),
5114             cpp_strerror(r).c_str());
5115   }
5116   return r;
5117 }
5118
5119 CLS_INIT(rbd)
5120 {
5121   CLS_LOG(20, "Loaded rbd class!");
5122
5123   cls_handle_t h_class;
5124   cls_method_handle_t h_create;
5125   cls_method_handle_t h_get_features;
5126   cls_method_handle_t h_set_features;
5127   cls_method_handle_t h_get_size;
5128   cls_method_handle_t h_set_size;
5129   cls_method_handle_t h_get_parent;
5130   cls_method_handle_t h_set_parent;
5131   cls_method_handle_t h_get_protection_status;
5132   cls_method_handle_t h_set_protection_status;
5133   cls_method_handle_t h_get_stripe_unit_count;
5134   cls_method_handle_t h_set_stripe_unit_count;
5135   cls_method_handle_t h_get_create_timestamp;
5136   cls_method_handle_t h_get_flags;
5137   cls_method_handle_t h_set_flags;
5138   cls_method_handle_t h_remove_parent;
5139   cls_method_handle_t h_add_child;
5140   cls_method_handle_t h_remove_child;
5141   cls_method_handle_t h_get_children;
5142   cls_method_handle_t h_get_snapcontext;
5143   cls_method_handle_t h_get_object_prefix;
5144   cls_method_handle_t h_get_data_pool;
5145   cls_method_handle_t h_get_snapshot_name;
5146   cls_method_handle_t h_get_snapshot_namespace;
5147   cls_method_handle_t h_get_snapshot_timestamp;
5148   cls_method_handle_t h_snapshot_add;
5149   cls_method_handle_t h_snapshot_remove;
5150   cls_method_handle_t h_snapshot_rename;
5151   cls_method_handle_t h_get_all_features;
5152   cls_method_handle_t h_copyup;
5153   cls_method_handle_t h_get_id;
5154   cls_method_handle_t h_set_id;
5155   cls_method_handle_t h_dir_get_id;
5156   cls_method_handle_t h_dir_get_name;
5157   cls_method_handle_t h_dir_list;
5158   cls_method_handle_t h_dir_add_image;
5159   cls_method_handle_t h_dir_remove_image;
5160   cls_method_handle_t h_dir_rename_image;
5161   cls_method_handle_t h_object_map_load;
5162   cls_method_handle_t h_object_map_save;
5163   cls_method_handle_t h_object_map_resize;
5164   cls_method_handle_t h_object_map_update;
5165   cls_method_handle_t h_object_map_snap_add;
5166   cls_method_handle_t h_object_map_snap_remove;
5167   cls_method_handle_t h_metadata_set;
5168   cls_method_handle_t h_metadata_remove;
5169   cls_method_handle_t h_metadata_list;
5170   cls_method_handle_t h_metadata_get;
5171   cls_method_handle_t h_snapshot_get_limit;
5172   cls_method_handle_t h_snapshot_set_limit;
5173   cls_method_handle_t h_old_snapshots_list;
5174   cls_method_handle_t h_old_snapshot_add;
5175   cls_method_handle_t h_old_snapshot_remove;
5176   cls_method_handle_t h_old_snapshot_rename;
5177   cls_method_handle_t h_mirror_uuid_get;
5178   cls_method_handle_t h_mirror_uuid_set;
5179   cls_method_handle_t h_mirror_mode_get;
5180   cls_method_handle_t h_mirror_mode_set;
5181   cls_method_handle_t h_mirror_peer_list;
5182   cls_method_handle_t h_mirror_peer_add;
5183   cls_method_handle_t h_mirror_peer_remove;
5184   cls_method_handle_t h_mirror_peer_set_client;
5185   cls_method_handle_t h_mirror_peer_set_cluster;
5186   cls_method_handle_t h_mirror_image_list;
5187   cls_method_handle_t h_mirror_image_get_image_id;
5188   cls_method_handle_t h_mirror_image_get;
5189   cls_method_handle_t h_mirror_image_set;
5190   cls_method_handle_t h_mirror_image_remove;
5191   cls_method_handle_t h_mirror_image_status_set;
5192   cls_method_handle_t h_mirror_image_status_remove;
5193   cls_method_handle_t h_mirror_image_status_get;
5194   cls_method_handle_t h_mirror_image_status_list;
5195   cls_method_handle_t h_mirror_image_status_get_summary;
5196   cls_method_handle_t h_mirror_image_status_remove_down;
5197   cls_method_handle_t h_mirror_instances_list;
5198   cls_method_handle_t h_mirror_instances_add;
5199   cls_method_handle_t h_mirror_instances_remove;
5200   cls_method_handle_t h_group_create;
5201   cls_method_handle_t h_group_dir_list;
5202   cls_method_handle_t h_group_dir_add;
5203   cls_method_handle_t h_group_dir_remove;
5204   cls_method_handle_t h_group_image_remove;
5205   cls_method_handle_t h_group_image_list;
5206   cls_method_handle_t h_group_image_set;
5207   cls_method_handle_t h_image_add_group;
5208   cls_method_handle_t h_image_remove_group;
5209   cls_method_handle_t h_image_get_group;
5210   cls_method_handle_t h_trash_add;
5211   cls_method_handle_t h_trash_remove;
5212   cls_method_handle_t h_trash_list;
5213   cls_method_handle_t h_trash_get;
5214
5215   cls_register("rbd", &h_class);
5216   cls_register_cxx_method(h_class, "create",
5217                           CLS_METHOD_RD | CLS_METHOD_WR,
5218                           create, &h_create);
5219   cls_register_cxx_method(h_class, "get_features",
5220                           CLS_METHOD_RD,
5221                           get_features, &h_get_features);
5222   cls_register_cxx_method(h_class, "set_features",
5223                           CLS_METHOD_RD | CLS_METHOD_WR,
5224                           set_features, &h_set_features);
5225   cls_register_cxx_method(h_class, "get_size",
5226                           CLS_METHOD_RD,
5227                           get_size, &h_get_size);
5228   cls_register_cxx_method(h_class, "set_size",
5229                           CLS_METHOD_RD | CLS_METHOD_WR,
5230                           set_size, &h_set_size);
5231   cls_register_cxx_method(h_class, "get_snapcontext",
5232                           CLS_METHOD_RD,
5233                           get_snapcontext, &h_get_snapcontext);
5234   cls_register_cxx_method(h_class, "get_object_prefix",
5235                           CLS_METHOD_RD,
5236                           get_object_prefix, &h_get_object_prefix);
5237   cls_register_cxx_method(h_class, "get_data_pool", CLS_METHOD_RD,
5238                           get_data_pool, &h_get_data_pool);
5239   cls_register_cxx_method(h_class, "get_snapshot_name",
5240                           CLS_METHOD_RD,
5241                           get_snapshot_name, &h_get_snapshot_name);
5242   cls_register_cxx_method(h_class, "get_snapshot_namespace",
5243                           CLS_METHOD_RD,
5244                           get_snapshot_namespace, &h_get_snapshot_namespace);
5245   cls_register_cxx_method(h_class, "get_snapshot_timestamp",
5246                           CLS_METHOD_RD,
5247                           get_snapshot_timestamp, &h_get_snapshot_timestamp);
5248   cls_register_cxx_method(h_class, "snapshot_add",
5249                           CLS_METHOD_RD | CLS_METHOD_WR,
5250                           snapshot_add, &h_snapshot_add);
5251   cls_register_cxx_method(h_class, "snapshot_remove",
5252                           CLS_METHOD_RD | CLS_METHOD_WR,
5253                           snapshot_remove, &h_snapshot_remove);
5254   cls_register_cxx_method(h_class, "snapshot_rename",
5255                           CLS_METHOD_RD | CLS_METHOD_WR,
5256                           snapshot_rename, &h_snapshot_rename);
5257   cls_register_cxx_method(h_class, "get_all_features",
5258                           CLS_METHOD_RD,
5259                           get_all_features, &h_get_all_features);
5260   cls_register_cxx_method(h_class, "copyup",
5261                           CLS_METHOD_RD | CLS_METHOD_WR,
5262                           copyup, &h_copyup);
5263   cls_register_cxx_method(h_class, "get_parent",
5264                           CLS_METHOD_RD,
5265                           get_parent, &h_get_parent);
5266   cls_register_cxx_method(h_class, "set_parent",
5267                           CLS_METHOD_RD | CLS_METHOD_WR,
5268                           set_parent, &h_set_parent);
5269   cls_register_cxx_method(h_class, "remove_parent",
5270                           CLS_METHOD_RD | CLS_METHOD_WR,
5271                           remove_parent, &h_remove_parent);
5272   cls_register_cxx_method(h_class, "set_protection_status",
5273                           CLS_METHOD_RD | CLS_METHOD_WR,
5274                           set_protection_status, &h_set_protection_status);
5275   cls_register_cxx_method(h_class, "get_protection_status",
5276                           CLS_METHOD_RD,
5277                           get_protection_status, &h_get_protection_status);
5278   cls_register_cxx_method(h_class, "get_stripe_unit_count",
5279                           CLS_METHOD_RD,
5280                           get_stripe_unit_count, &h_get_stripe_unit_count);
5281   cls_register_cxx_method(h_class, "set_stripe_unit_count",
5282                           CLS_METHOD_RD | CLS_METHOD_WR,
5283                           set_stripe_unit_count, &h_set_stripe_unit_count);
5284   cls_register_cxx_method(h_class, "get_create_timestamp",
5285                           CLS_METHOD_RD,
5286                           get_create_timestamp, &h_get_create_timestamp);
5287   cls_register_cxx_method(h_class, "get_flags",
5288                           CLS_METHOD_RD,
5289                           get_flags, &h_get_flags);
5290   cls_register_cxx_method(h_class, "set_flags",
5291                           CLS_METHOD_RD | CLS_METHOD_WR,
5292                           set_flags, &h_set_flags);
5293   cls_register_cxx_method(h_class, "metadata_list",
5294                           CLS_METHOD_RD,
5295                           metadata_list, &h_metadata_list);
5296   cls_register_cxx_method(h_class, "metadata_set",
5297                           CLS_METHOD_RD | CLS_METHOD_WR,
5298                           metadata_set, &h_metadata_set);
5299   cls_register_cxx_method(h_class, "metadata_remove",
5300                           CLS_METHOD_RD | CLS_METHOD_WR,
5301                           metadata_remove, &h_metadata_remove);
5302   cls_register_cxx_method(h_class, "metadata_get",
5303                           CLS_METHOD_RD,
5304                           metadata_get, &h_metadata_get);
5305   cls_register_cxx_method(h_class, "snapshot_get_limit",
5306                           CLS_METHOD_RD,
5307                           snapshot_get_limit, &h_snapshot_get_limit);
5308   cls_register_cxx_method(h_class, "snapshot_set_limit",
5309                           CLS_METHOD_WR,
5310                           snapshot_set_limit, &h_snapshot_set_limit);
5311
5312   /* methods for the rbd_children object */
5313   cls_register_cxx_method(h_class, "add_child",
5314                           CLS_METHOD_RD | CLS_METHOD_WR,
5315                           add_child, &h_add_child);
5316   cls_register_cxx_method(h_class, "remove_child",
5317                           CLS_METHOD_RD | CLS_METHOD_WR,
5318                           remove_child, &h_remove_child);
5319   cls_register_cxx_method(h_class, "get_children",
5320                           CLS_METHOD_RD,
5321                           get_children, &h_get_children);
5322
5323   /* methods for the rbd_id.$image_name objects */
5324   cls_register_cxx_method(h_class, "get_id",
5325                           CLS_METHOD_RD,
5326                           get_id, &h_get_id);
5327   cls_register_cxx_method(h_class, "set_id",
5328                           CLS_METHOD_RD | CLS_METHOD_WR,
5329                           set_id, &h_set_id);
5330
5331   /* methods for the rbd_directory object */
5332   cls_register_cxx_method(h_class, "dir_get_id",
5333                           CLS_METHOD_RD,
5334                           dir_get_id, &h_dir_get_id);
5335   cls_register_cxx_method(h_class, "dir_get_name",
5336                           CLS_METHOD_RD,
5337                           dir_get_name, &h_dir_get_name);
5338   cls_register_cxx_method(h_class, "dir_list",
5339                           CLS_METHOD_RD,
5340                           dir_list, &h_dir_list);
5341   cls_register_cxx_method(h_class, "dir_add_image",
5342                           CLS_METHOD_RD | CLS_METHOD_WR,
5343                           dir_add_image, &h_dir_add_image);
5344   cls_register_cxx_method(h_class, "dir_remove_image",
5345                           CLS_METHOD_RD | CLS_METHOD_WR,
5346                           dir_remove_image, &h_dir_remove_image);
5347   cls_register_cxx_method(h_class, "dir_rename_image",
5348                           CLS_METHOD_RD | CLS_METHOD_WR,
5349                           dir_rename_image, &h_dir_rename_image);
5350
5351   /* methods for the rbd_object_map.$image_id object */
5352   cls_register_cxx_method(h_class, "object_map_load",
5353                           CLS_METHOD_RD,
5354                           object_map_load, &h_object_map_load);
5355   cls_register_cxx_method(h_class, "object_map_save",
5356                           CLS_METHOD_RD | CLS_METHOD_WR,
5357                           object_map_save, &h_object_map_save);
5358   cls_register_cxx_method(h_class, "object_map_resize",
5359                           CLS_METHOD_RD | CLS_METHOD_WR,
5360                           object_map_resize, &h_object_map_resize);
5361   cls_register_cxx_method(h_class, "object_map_update",
5362                           CLS_METHOD_RD | CLS_METHOD_WR,
5363                           object_map_update, &h_object_map_update);
5364   cls_register_cxx_method(h_class, "object_map_snap_add",
5365                           CLS_METHOD_RD | CLS_METHOD_WR,
5366                           object_map_snap_add, &h_object_map_snap_add);
5367   cls_register_cxx_method(h_class, "object_map_snap_remove",
5368                           CLS_METHOD_RD | CLS_METHOD_WR,
5369                           object_map_snap_remove, &h_object_map_snap_remove);
5370
5371  /* methods for the old format */
5372   cls_register_cxx_method(h_class, "snap_list",
5373                           CLS_METHOD_RD,
5374                           old_snapshots_list, &h_old_snapshots_list);
5375   cls_register_cxx_method(h_class, "snap_add",
5376                           CLS_METHOD_RD | CLS_METHOD_WR,
5377                           old_snapshot_add, &h_old_snapshot_add);
5378   cls_register_cxx_method(h_class, "snap_remove",
5379                           CLS_METHOD_RD | CLS_METHOD_WR,
5380                           old_snapshot_remove, &h_old_snapshot_remove);
5381   cls_register_cxx_method(h_class, "snap_rename",
5382                           CLS_METHOD_RD | CLS_METHOD_WR,
5383                           old_snapshot_rename, &h_old_snapshot_rename);
5384
5385   /* methods for the rbd_mirroring object */
5386   cls_register_cxx_method(h_class, "mirror_uuid_get", CLS_METHOD_RD,
5387                           mirror_uuid_get, &h_mirror_uuid_get);
5388   cls_register_cxx_method(h_class, "mirror_uuid_set",
5389                           CLS_METHOD_RD | CLS_METHOD_WR,
5390                           mirror_uuid_set, &h_mirror_uuid_set);
5391   cls_register_cxx_method(h_class, "mirror_mode_get", CLS_METHOD_RD,
5392                           mirror_mode_get, &h_mirror_mode_get);
5393   cls_register_cxx_method(h_class, "mirror_mode_set",
5394                           CLS_METHOD_RD | CLS_METHOD_WR,
5395                           mirror_mode_set, &h_mirror_mode_set);
5396   cls_register_cxx_method(h_class, "mirror_peer_list", CLS_METHOD_RD,
5397                           mirror_peer_list, &h_mirror_peer_list);
5398   cls_register_cxx_method(h_class, "mirror_peer_add",
5399                           CLS_METHOD_RD | CLS_METHOD_WR,
5400                           mirror_peer_add, &h_mirror_peer_add);
5401   cls_register_cxx_method(h_class, "mirror_peer_remove",
5402                           CLS_METHOD_RD | CLS_METHOD_WR,
5403                           mirror_peer_remove, &h_mirror_peer_remove);
5404   cls_register_cxx_method(h_class, "mirror_peer_set_client",
5405                           CLS_METHOD_RD | CLS_METHOD_WR,
5406                           mirror_peer_set_client, &h_mirror_peer_set_client);
5407   cls_register_cxx_method(h_class, "mirror_peer_set_cluster",
5408                           CLS_METHOD_RD | CLS_METHOD_WR,
5409                           mirror_peer_set_cluster, &h_mirror_peer_set_cluster);
5410   cls_register_cxx_method(h_class, "mirror_image_list", CLS_METHOD_RD,
5411                           mirror_image_list, &h_mirror_image_list);
5412   cls_register_cxx_method(h_class, "mirror_image_get_image_id", CLS_METHOD_RD,
5413                           mirror_image_get_image_id,
5414                           &h_mirror_image_get_image_id);
5415   cls_register_cxx_method(h_class, "mirror_image_get", CLS_METHOD_RD,
5416                           mirror_image_get, &h_mirror_image_get);
5417   cls_register_cxx_method(h_class, "mirror_image_set",
5418                           CLS_METHOD_RD | CLS_METHOD_WR,
5419                           mirror_image_set, &h_mirror_image_set);
5420   cls_register_cxx_method(h_class, "mirror_image_remove",
5421                           CLS_METHOD_RD | CLS_METHOD_WR,
5422                           mirror_image_remove, &h_mirror_image_remove);
5423   cls_register_cxx_method(h_class, "mirror_image_status_set",
5424                           CLS_METHOD_RD | CLS_METHOD_WR | CLS_METHOD_PROMOTE,
5425                           mirror_image_status_set, &h_mirror_image_status_set);
5426   cls_register_cxx_method(h_class, "mirror_image_status_remove",
5427                           CLS_METHOD_RD | CLS_METHOD_WR,
5428                           mirror_image_status_remove,
5429                           &h_mirror_image_status_remove);
5430   cls_register_cxx_method(h_class, "mirror_image_status_get", CLS_METHOD_RD,
5431                           mirror_image_status_get, &h_mirror_image_status_get);
5432   cls_register_cxx_method(h_class, "mirror_image_status_list", CLS_METHOD_RD,
5433                           mirror_image_status_list,
5434                           &h_mirror_image_status_list);
5435   cls_register_cxx_method(h_class, "mirror_image_status_get_summary",
5436                           CLS_METHOD_RD, mirror_image_status_get_summary,
5437                           &h_mirror_image_status_get_summary);
5438   cls_register_cxx_method(h_class, "mirror_image_status_remove_down",
5439                           CLS_METHOD_RD | CLS_METHOD_WR,
5440                           mirror_image_status_remove_down,
5441                           &h_mirror_image_status_remove_down);
5442   cls_register_cxx_method(h_class, "mirror_instances_list", CLS_METHOD_RD,
5443                           mirror_instances_list, &h_mirror_instances_list);
5444   cls_register_cxx_method(h_class, "mirror_instances_add",
5445                           CLS_METHOD_RD | CLS_METHOD_WR | CLS_METHOD_PROMOTE,
5446                           mirror_instances_add, &h_mirror_instances_add);
5447   cls_register_cxx_method(h_class, "mirror_instances_remove",
5448                           CLS_METHOD_RD | CLS_METHOD_WR,
5449                           mirror_instances_remove,
5450                           &h_mirror_instances_remove);
5451   /* methods for the consistency groups feature */
5452   cls_register_cxx_method(h_class, "group_create",
5453                           CLS_METHOD_RD | CLS_METHOD_WR,
5454                           group_create, &h_group_create);
5455   cls_register_cxx_method(h_class, "group_dir_list",
5456                           CLS_METHOD_RD,
5457                           group_dir_list, &h_group_dir_list);
5458   cls_register_cxx_method(h_class, "group_dir_add",
5459                           CLS_METHOD_RD | CLS_METHOD_WR,
5460                           group_dir_add, &h_group_dir_add);
5461   cls_register_cxx_method(h_class, "group_dir_remove",
5462                           CLS_METHOD_RD | CLS_METHOD_WR,
5463                           group_dir_remove, &h_group_dir_remove);
5464   cls_register_cxx_method(h_class, "group_image_remove",
5465                           CLS_METHOD_RD | CLS_METHOD_WR,
5466                           group_image_remove, &h_group_image_remove);
5467   cls_register_cxx_method(h_class, "group_image_list",
5468                           CLS_METHOD_RD | CLS_METHOD_WR,
5469                           group_image_list, &h_group_image_list);
5470   cls_register_cxx_method(h_class, "group_image_set",
5471                           CLS_METHOD_RD | CLS_METHOD_WR,
5472                           group_image_set, &h_group_image_set);
5473   cls_register_cxx_method(h_class, "image_add_group",
5474                           CLS_METHOD_RD | CLS_METHOD_WR,
5475                           image_add_group, &h_image_add_group);
5476   cls_register_cxx_method(h_class, "image_remove_group",
5477                           CLS_METHOD_RD | CLS_METHOD_WR,
5478                           image_remove_group, &h_image_remove_group);
5479   cls_register_cxx_method(h_class, "image_get_group",
5480                           CLS_METHOD_RD,
5481                           image_get_group, &h_image_get_group);
5482
5483   /* rbd_trash object methods */
5484   cls_register_cxx_method(h_class, "trash_add",
5485                           CLS_METHOD_RD | CLS_METHOD_WR,
5486                           trash_add, &h_trash_add);
5487   cls_register_cxx_method(h_class, "trash_remove",
5488                           CLS_METHOD_RD | CLS_METHOD_WR,
5489                           trash_remove, &h_trash_remove);
5490   cls_register_cxx_method(h_class, "trash_list",
5491                           CLS_METHOD_RD,
5492                           trash_list, &h_trash_list);
5493   cls_register_cxx_method(h_class, "trash_get",
5494                           CLS_METHOD_RD,
5495                           trash_get, &h_trash_get);
5496
5497   return;
5498 }