1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
6 * This is an OSD class that implements methods for
9 * Most of these deal with the rbd header object. Methods prefixed
10 * with old_ deal with the original rbd design, in which clients read
11 * and interpreted the header object directly.
13 * The new format is meant to be opaque to clients - all their
14 * interactions with non-data objects should go through this
15 * class. The OSD class interface leaves the class to implement its
16 * own argument and payload serialization/deserialization, so for ease
17 * of implementation we use the existing ceph encoding/decoding
18 * methods. Something like json might be preferable, but the rbd
19 * kernel module has to be able to understand the format as well. The
20 * datatypes exposed to the clients are strings, unsigned integers,
21 * and vectors of those types. The on-wire format can be found in
22 * src/include/encoding.h.
24 * The methods for interacting with the new format document their
25 * parameters as the client sees them - it would be silly to mention
26 * in each one that they take an input and an output bufferlist.
28 #include "include/types.h"
34 #include "common/bit_vector.hpp"
35 #include "common/errno.h"
36 #include "objclass/objclass.h"
37 #include "osd/osd_types.h"
38 #include "include/rbd_types.h"
39 #include "include/rbd/object_map_types.h"
41 #include "cls/rbd/cls_rbd.h"
42 #include "cls/rbd/cls_rbd_types.h"
50 * stripe_unit: size in bytes of the stripe unit. if not present,
51 * the stripe unit is assumed to match the object size (1 << order).
53 * stripe_count: number of objects to stripe over before looping back.
54 * if not present or 1, striping is disabled. this is the default.
// Upper bound on the number of keys requested per cls_cxx_map_get_keys()
// call in the paged omap scans further down.
61 #define RBD_MAX_KEYS_READ 64
// Omap key prefix for per-snapshot entries (see key_from_snap_id()).
62 #define RBD_SNAP_KEY_PREFIX "snapshot_"
// The users of the remaining prefixes are not visible in this part of the
// file -- presumably directory (id/name) and image-metadata omap keys; confirm.
63 #define RBD_DIR_ID_KEY_PREFIX "id_"
64 #define RBD_DIR_NAME_KEY_PREFIX "name_"
65 #define RBD_METADATA_KEY_PREFIX "metadata_"
// NOTE(review): presumably the omap key holding a group's snapshot sequence;
// confirm at its point of use.
67 #define GROUP_SNAP_SEQ "snap_seq"
// Reads the old-format header object into bl with cls_cxx_read().
// The requested length covers the fixed rbd_obj_header_ondisk plus
// snap_count rbd_obj_snap_ondisk records (and a further term); both counters
// start at 0 and are refreshed from the header whenever the header just read
// disagrees with them.
// NOTE(review): presumably this sits in a retry loop that re-reads with the
// corrected length -- the loop itself is not visible here, confirm.
69 static int snap_read_header(cls_method_context_t hctx, bufferlist& bl)
71 unsigned snap_count = 0;
72 uint64_t snap_names_len = 0;
73 struct rbd_obj_header_ondisk *header;
75 CLS_LOG(20, "snapshots_list");
78 int len = sizeof(*header) +
79 snap_count * sizeof(struct rbd_obj_snap_ondisk) +
82 int rc = cls_cxx_read(hctx, 0, len, &bl);
// a buffer shorter than the fixed-size header cannot be interpreted
86 if (bl.length() < sizeof(*header))
// view the start of the buffer as the on-disk header structure
89 header = (struct rbd_obj_header_ondisk *)bl.c_str();
// the header reports different counts than the ones the length was based on:
// adopt the header's values
92 if ((snap_count != header->snap_count) ||
93 (snap_names_len != header->snap_names_len)) {
94 snap_count = header->snap_count;
95 snap_names_len = header->snap_names_len;
// Builds the omap key for a snapshot: RBD_SNAP_KEY_PREFIX followed by
// snap_id formatted as hexadecimal, zero-padded to 16 digits.
// The fixed width makes the keys' lexicographic order match numeric order.
// `out` is presumably assigned the formatted string (assignment not visible).
105 static void key_from_snap_id(snapid_t snap_id, string *out)
108 oss << RBD_SNAP_KEY_PREFIX
109 << std::setw(16) << std::setfill('0') << std::hex << snap_id;
// Inverse of key_from_snap_id(): extracts the snapshot id from an omap key.
113 static snapid_t snap_id_from_key(const string &key)
115 istringstream iss(key);
// skip as many characters as the prefix is long, then parse the rest as hex
117 iss.ignore(strlen(RBD_SNAP_KEY_PREFIX)) >> std::hex >> id;
// Fetches the omap value stored under `key` with cls_cxx_map_get_val() and
// decodes it into *out.
// T is presumably a template parameter declared on the line just above
// (not visible here).
// A failed read is reported with CLS_ERR including the key and error string;
// a decode failure (buffer::error) is reported with the key name.
122 static int read_key(cls_method_context_t hctx, const string &key, T *out)
125 int r = cls_cxx_map_get_val(hctx, key, &bl);
128 CLS_ERR("error reading omap key %s: %s", key.c_str(), cpp_strerror(r).c_str());
134 bufferlist::iterator it = bl.begin();
136 } catch (const buffer::error &err) {
137 CLS_ERR("error decoding %s", key.c_str());
// Removes `key` from the object's omap via cls_cxx_map_remove_key().
// Only failures other than -ENOENT are logged as errors.
144 static int remove_key(cls_method_context_t hctx, const string &key) {
145 int r = cls_cxx_map_remove_key(hctx, key);
146 if (r < 0 && r != -ENOENT) {
147 CLS_ERR("failed to remove key: %s", key.c_str());
// Checks an id string character by character with isalnum(); the branch taken
// for a non-alphanumeric character is not visible here (presumably it rejects
// the id).
// NOTE(review): id[i] is a plain char. Passing a negative value other than
// EOF to isalnum() is undefined behavior, so bytes >= 0x80 are unsafe on
// platforms where char is signed -- consider
// isalnum(static_cast<unsigned char>(id[i])).
153 static bool is_valid_id(const string &id) {
156 for (size_t i = 0; i < id.size(); ++i) {
157 if (!isalnum(id[i])) {
165 * Initialize the header with basic metadata.
166 * Extra features may initialize more fields in the future.
167 * Everything is stored as key/value pairs as omaps in the header object.
169 * If features the OSD does not understand are requested, -ENOSYS is
173 * @param size number of bytes in the image (uint64_t)
174 * @param order bits to shift to determine the size of data objects (uint8_t)
175 * @param features what optional things this image will use (uint64_t)
176 * @param object_prefix a prefix for all the data objects
177 * @param data_pool_id pool id where data objects are stored (int64_t)
180 * @return 0 on success, negative error code on failure
// cls method: initialize a new-format image header.
// Decodes size, order, features, object_prefix and data_pool_id from `in`,
// checks them, and stores size / order / features / object_prefix / snap_seq /
// create_timestamp (plus data_pool_id when RBD_FEATURE_DATA_POOL is set) as
// omap values with a single cls_cxx_map_set_vals() call.
182 int create(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
184 string object_prefix;
185 uint64_t features, size;
// -1 is the "not provided" value tested by the data-pool checks below
187 int64_t data_pool_id = -1;
190 bufferlist::iterator iter = in->begin();
191 ::decode(size, iter);
192 ::decode(order, iter);
193 ::decode(features, iter);
194 ::decode(object_prefix, iter);
196 ::decode(data_pool_id, iter);
198 } catch (const buffer::error &err) {
202 CLS_LOG(20, "create object_prefix=%s size=%llu order=%u features=%llu",
203 object_prefix.c_str(), (unsigned long long)size, order,
204 (unsigned long long)features);
// any feature bit outside RBD_FEATURES_ALL is one this class does not know
206 if (features & ~RBD_FEATURES_ALL) {
// an empty object prefix is checked for explicitly
210 if (!object_prefix.size()) {
// probe for an existing "object_prefix" key
// NOTE(review): presumably used to detect an already-initialized header; the
// handling of r is only partly visible here.
214 bufferlist stored_prefixbl;
215 int r = cls_cxx_map_get_val(hctx, "object_prefix", &stored_prefixbl);
217 CLS_ERR("reading object_prefix returned %d", r);
223 bufferlist featuresbl;
224 bufferlist object_prefixbl;
225 bufferlist snap_seqbl;
226 bufferlist create_timestampbl;
// a new image starts with snap_seq 0 and the current clock as creation time
227 uint64_t snap_seq = 0;
228 utime_t create_timestamp = ceph_clock_now();
229 ::encode(size, sizebl);
230 ::encode(order, orderbl);
231 ::encode(features, featuresbl);
232 ::encode(object_prefix, object_prefixbl);
233 ::encode(snap_seq, snap_seqbl);
234 ::encode(create_timestamp, create_timestampbl);
// collect every key so that they are written in one omap update
236 map<string, bufferlist> omap_vals;
237 omap_vals["size"] = sizebl;
238 omap_vals["order"] = orderbl;
239 omap_vals["features"] = featuresbl;
240 omap_vals["object_prefix"] = object_prefixbl;
241 omap_vals["snap_seq"] = snap_seqbl;
242 omap_vals["create_timestamp"] = create_timestampbl;
// data_pool_id must be supplied exactly when RBD_FEATURE_DATA_POOL is
// requested; both kinds of mismatch are logged as errors
244 if (features & RBD_FEATURE_DATA_POOL) {
245 if (data_pool_id == -1) {
246 CLS_ERR("data pool not provided with feature enabled");
250 bufferlist data_pool_id_bl;
251 ::encode(data_pool_id, data_pool_id_bl);
252 omap_vals["data_pool_id"] = data_pool_id_bl;
253 } else if (data_pool_id != -1) {
254 CLS_ERR("data pool provided with feature disabled");
258 r = cls_cxx_map_set_vals(hctx, &omap_vals);
267 * @param snap_id which snapshot to query, or CEPH_NOSNAP (uint64_t) (deprecated)
268 * @param read_only true if the image will be used read-only (bool)
271 * @param features list of enabled features for the given snapshot (uint64_t)
272 * @param incompatible incompatible feature bits
273 * @returns 0 on success, negative error code on failure
// cls method: report the image's feature bits.
// Input (decoded from `in`): snap_id, read_only.
// Output (encoded into `out`): features, then the subset of them that is
// incompatible for the requested access mode.
275 int get_features(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
278 bool read_only = false;
280 bufferlist::iterator iter = in->begin();
282 ::decode(snap_id, iter);
284 ::decode(read_only, iter);
286 } catch (const buffer::error &err) {
290 CLS_LOG(20, "get_features snap_id=%" PRIu64 ", read_only=%d",
293 // NOTE: keep this deprecated snapshot logic to support negative
294 // test cases in older (pre-Infernalis) releases. Remove once older
295 // releases are no longer supported.
// for a specific snapshot, that snapshot's omap entry is read first
296 if (snap_id != CEPH_NOSNAP) {
299 key_from_snap_id(snap_id, &snapshot_key);
300 int r = read_key(hctx, snapshot_key, &snap);
307 int r = read_key(hctx, "features", &features);
309 CLS_ERR("failed to read features off disk: %s", cpp_strerror(r).c_str());
// read-only users are masked with RBD_FEATURES_INCOMPATIBLE, all others with
// RBD_FEATURES_RW_INCOMPATIBLE
313 uint64_t incompatible = (read_only ? features & RBD_FEATURES_INCOMPATIBLE :
314 features & RBD_FEATURES_RW_INCOMPATIBLE);
315 ::encode(features, *out);
316 ::encode(incompatible, *out);
321 * set the image features
324 * @param features image features
325 * @param mask image feature mask
330 * @returns 0 on success, negative error code upon failure
// cls method: update the image's feature bits.
// Input (decoded from `in`): features (desired bit values) and mask (which
// bits to change). Bits selected by mask are taken from `features`; all other
// bits keep their stored value. The result is written to the "features" key.
// Enabling is restricted to RBD_FEATURES_MUTABLE; disabling is restricted to
// RBD_FEATURES_MUTABLE | RBD_FEATURES_DISABLE_ONLY.
332 int set_features(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
336 bufferlist::iterator iter = in->begin();
338 ::decode(features, iter);
339 ::decode(mask, iter);
340 } catch (const buffer::error &err) {
344 // check that features exists to make sure this is a header object
345 // that was created correctly
346 uint64_t orig_features = 0;
347 int r = read_key(hctx, "features", &orig_features);
// a missing key (-ENOENT) is not reported as an error; orig_features stays 0
348 if (r < 0 && r != -ENOENT) {
349 CLS_ERR("Could not read image's features off disk: %s",
350 cpp_strerror(r).c_str());
354 // newer clients might attempt to mask off features we don't support
355 mask &= RBD_FEATURES_ALL;
// bits that are both selected by mask and set in `features` are being enabled
357 uint64_t enabled_features = features & mask;
358 if ((enabled_features & RBD_FEATURES_MUTABLE) != enabled_features) {
359 CLS_ERR("Attempting to enable immutable feature: %" PRIu64,
360 static_cast<uint64_t>(enabled_features & ~RBD_FEATURES_MUTABLE));
// bits that are selected by mask but clear in `features` are being disabled
364 uint64_t disabled_features = ~features & mask;
365 uint64_t disable_mask = (RBD_FEATURES_MUTABLE | RBD_FEATURES_DISABLE_ONLY);
366 if ((disabled_features & disable_mask) != disabled_features) {
// NOTE(review): the check above is on disabled_features, but the value logged
// below is derived from enabled_features -- looks like a copy/paste slip;
// disabled_features & ~disable_mask would name the offending bits. Confirm.
367 CLS_ERR("Attempting to disable immutable feature: %" PRIu64,
368 enabled_features & ~disable_mask);
// merge: keep unmasked bits from disk, take masked bits from the request
372 features = (orig_features & ~mask) | (features & mask);
373 CLS_LOG(10, "set_features features=%" PRIu64 " orig_features=%" PRIu64,
374 features, orig_features);
377 ::encode(features, bl);
378 r = cls_cxx_map_set_val(hctx, "features", &bl);
380 CLS_ERR("error updating features: %s", cpp_strerror(r).c_str());
387 * check that given feature(s) are set
389 * @param hctx context
390 * @param need features needed
391 * @return 0 if features are set, negative error (like ENOEXEC) otherwise
// Helper: checks that every bit in `need` is set in the stored "features"
// value. A missing features key (-ENOENT) is treated as an old-style image
// without features. A missing bit is logged at level 10 together with the
// features actually present.
393 int require_feature(cls_method_context_t hctx, uint64_t need)
396 int r = read_key(hctx, "features", &features);
397 if (r == -ENOENT) // this implies it's an old-style image with no features
// all requested bits must be present, not just some of them
401 if ((features & need) != need) {
402 CLS_LOG(10, "require_feature missing feature %llx, have %llx",
403 (unsigned long long)need, (unsigned long long)features);
411 * @param snap_id which snapshot to query, or CEPH_NOSNAP (uint64_t)
414 * @param order bits to shift to get the size of data objects (uint8_t)
415 * @param size size of the image in bytes for the given snapshot (uint64_t)
416 * @returns 0 on success, negative error code on failure
// cls method: report the object order and the image size.
// Input (decoded from `in`): snap_id.
// Output (encoded into `out`): order, then size. For CEPH_NOSNAP the size
// comes from the "size" key; otherwise it is the image_size recorded in the
// snapshot's omap entry.
418 int get_size(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
420 uint64_t snap_id, size;
423 bufferlist::iterator iter = in->begin();
425 ::decode(snap_id, iter);
426 } catch (const buffer::error &err) {
430 CLS_LOG(20, "get_size snap_id=%llu", (unsigned long long)snap_id);
// order is read from the image header regardless of snap_id
432 int r = read_key(hctx, "order", &order);
434 CLS_ERR("failed to read the order off of disk: %s", cpp_strerror(r).c_str());
438 if (snap_id == CEPH_NOSNAP) {
439 r = read_key(hctx, "size", &size);
441 CLS_ERR("failed to read the image's size off of disk: %s", cpp_strerror(r).c_str());
// snapshot case: look up the snapshot entry and use its recorded size
447 key_from_snap_id(snap_id, &snapshot_key);
448 int r = read_key(hctx, snapshot_key, &snap);
452 size = snap.image_size;
455 ::encode(order, *out);
456 ::encode(size, *out);
463 * @param size new capacity of the image in bytes (uint64_t)
466 * @returns 0 on success, negative error code on failure
// cls method: change the image size.
// Input (decoded from `in`): size. The new value is written to the "size"
// key. When the image shrinks and has a parent whose overlap exceeds the new
// size, the overlap is reduced to the new size and the "parent" key is
// rewritten.
468 int set_size(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
472 bufferlist::iterator iter = in->begin();
474 ::decode(size, iter);
475 } catch (const buffer::error &err) {
479 // check that size exists to make sure this is a header object
480 // that was created correctly
482 int r = read_key(hctx, "size", &orig_size);
484 CLS_ERR("Could not read image's size off disk: %s", cpp_strerror(r).c_str());
488 CLS_LOG(20, "set_size size=%llu orig_size=%llu", (unsigned long long)size,
489 (unsigned long long)orig_size);
492 ::encode(size, sizebl);
493 r = cls_cxx_map_set_val(hctx, "size", &sizebl);
// NOTE(review): the message below says "snapshot metadata" although the write
// that precedes it is to the "size" key -- probably a copy/paste message.
495 CLS_ERR("error writing snapshot metadata: %s", cpp_strerror(r).c_str());
499 // if we are shrinking, and have a parent, shrink our overlap with
501 if (size < orig_size) {
502 cls_rbd_parent parent;
503 r = read_key(hctx, "parent", &parent);
// clamp the overlap so that it never extends past the new image size
508 if (parent.exists() && parent.overlap > size) {
510 parent.overlap = size;
511 ::encode(parent, parentbl);
512 r = cls_cxx_map_set_val(hctx, "parent", &parentbl);
514 CLS_ERR("error writing parent: %s", cpp_strerror(r).c_str());
524 * verify that the header object exists
526 * @return 0 if the object exists, -ENOENT if it does not, or other error
// Helper: stats the object and returns the cls_cxx_stat() result unchanged.
// size and mtime are only passed as output arguments of the stat call.
528 int check_exists(cls_method_context_t hctx)
532 return cls_cxx_stat(hctx, &size, &mtime);
536 * get the current protection status of the specified snapshot
539 * @param snap_id (uint64_t) which snapshot to get the status of
542 * @param status (uint8_t) one of:
543 * RBD_PROTECTION_STATUS_{PROTECTED, UNPROTECTED, UNPROTECTING}
545 * @returns 0 on success, negative error code on failure
546 * @returns -EINVAL if snapid is CEPH_NOSNAP
// cls method: report a snapshot's protection status.
// Input (decoded from `in`): snap_id.
// Output (encoded into `out`): the protection_status stored in the snapshot's
// omap entry, after checking that it is below RBD_PROTECTION_STATUS_LAST.
548 int get_protection_status(cls_method_context_t hctx, bufferlist *in,
553 bufferlist::iterator iter = in->begin();
555 ::decode(snap_id, iter);
556 } catch (const buffer::error &err) {
557 CLS_LOG(20, "get_protection_status: invalid decode");
// the header object has to exist before anything else is looked up
561 int r = check_exists(hctx);
565 CLS_LOG(20, "get_protection_status snap_id=%llu",
566 (unsigned long long)snap_id.val);
// the head (CEPH_NOSNAP) is special-cased; the doc comment says -EINVAL
568 if (snap_id == CEPH_NOSNAP)
573 key_from_snap_id(snap_id.val, &snapshot_key);
574 r = read_key(hctx, snapshot_key, &snap);
576 CLS_ERR("could not read key for snapshot id %" PRIu64, snap_id.val);
// sanity-check the stored value before handing it to the client
580 if (snap.protection_status >= RBD_PROTECTION_STATUS_LAST) {
581 CLS_ERR("invalid protection status for snap id %llu: %u",
582 (unsigned long long)snap_id.val, snap.protection_status);
586 ::encode(snap.protection_status, *out);
591 * set the protection status of a snapshot
594 * @param snapid (uint64_t) which snapshot to set the status of
595 * @param status (uint8_t) one of:
596 * RBD_PROTECTION_STATUS_{PROTECTED, UNPROTECTED, UNPROTECTING}
598 * @returns 0 on success, negative error code on failure
599 * @returns -EINVAL if snapid is CEPH_NOSNAP
// cls method: change a snapshot's protection status.
// Input (decoded from `in`): snap_id, status.
// Checks that the header object exists and that RBD_FEATURE_LAYERING is
// available, validates the arguments, then rewrites the snapshot's omap entry
// with the new protection_status.
601 int set_protection_status(cls_method_context_t hctx, bufferlist *in,
607 bufferlist::iterator iter = in->begin();
609 ::decode(snap_id, iter);
610 ::decode(status, iter);
611 } catch (const buffer::error &err) {
612 CLS_LOG(20, "set_protection_status: invalid decode");
616 int r = check_exists(hctx);
620 r = require_feature(hctx, RBD_FEATURE_LAYERING);
622 CLS_LOG(20, "image does not support layering");
626 CLS_LOG(20, "set_protection_status snapid=%llu status=%u",
627 (unsigned long long)snap_id.val, status);
// the head (CEPH_NOSNAP) is special-cased; the doc comment says -EINVAL
629 if (snap_id == CEPH_NOSNAP)
// only values below RBD_PROTECTION_STATUS_LAST are accepted
632 if (status >= RBD_PROTECTION_STATUS_LAST) {
633 CLS_LOG(10, "invalid protection status for snap id %llu: %u",
634 (unsigned long long)snap_id.val, status);
// read-modify-write of the snapshot's omap entry
640 key_from_snap_id(snap_id.val, &snapshot_key);
641 r = read_key(hctx, snapshot_key, &snap);
643 CLS_ERR("could not read key for snapshot id %" PRIu64, snap_id.val);
647 snap.protection_status = status;
648 bufferlist snapshot_bl;
649 ::encode(snap, snapshot_bl);
650 r = cls_cxx_map_set_val(hctx, snapshot_key, &snapshot_bl);
652 CLS_ERR("error writing snapshot metadata: %s", cpp_strerror(r).c_str());
660 * get striping parameters
666 * @param stripe unit (bytes)
667 * @param stripe count (num objects)
669 * @returns 0 on success
// cls method: report the striping parameters.
// Output (encoded into `out`): stripe_unit, then stripe_count.
// Checks that the object exists and that RBD_FEATURE_STRIPINGV2 is available.
// When the stored stripe unit is not available it defaults to the object
// size, 1 << order.
671 int get_stripe_unit_count(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
673 int r = check_exists(hctx);
677 CLS_LOG(20, "get_stripe_unit_count");
679 r = require_feature(hctx, RBD_FEATURE_STRIPINGV2);
683 uint64_t stripe_unit = 0, stripe_count = 0;
684 r = read_key(hctx, "stripe_unit", &stripe_unit);
686 // default to object size
688 r = read_key(hctx, "order", &order);
690 CLS_ERR("failed to read the order off of disk: %s", cpp_strerror(r).c_str());
// 1ull keeps the shift in 64-bit arithmetic
693 stripe_unit = 1ull << order;
697 r = read_key(hctx, "stripe_count", &stripe_count);
706 ::encode(stripe_unit, *out);
707 ::encode(stripe_count, *out);
712 * set striping parameters
715 * @param stripe unit (bytes)
716 * @param stripe count (num objects)
718 * @returns 0 on success
// cls method: store the striping parameters.
// Input (decoded from `in`): stripe_unit, stripe_count.
// Checks that the object exists and that RBD_FEATURE_STRIPINGV2 is available,
// validates stripe_unit against the object size (1 << order), then writes the
// "stripe_unit" and "stripe_count" keys as two separate omap updates.
720 int set_stripe_unit_count(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
722 uint64_t stripe_unit, stripe_count;
724 bufferlist::iterator iter = in->begin();
726 ::decode(stripe_unit, iter);
727 ::decode(stripe_count, iter);
728 } catch (const buffer::error &err) {
729 CLS_LOG(20, "set_stripe_unit_count: invalid decode");
// zero values are checked for up front; stripe_unit is used as a divisor in
// the object-size check below
733 if (!stripe_count || !stripe_unit)
736 int r = check_exists(hctx);
740 CLS_LOG(20, "set_stripe_unit_count");
742 r = require_feature(hctx, RBD_FEATURE_STRIPINGV2);
747 r = read_key(hctx, "order", &order);
749 CLS_ERR("failed to read the order off of disk: %s", cpp_strerror(r).c_str());
// the stripe unit must divide the object size evenly and must not exceed it
752 if ((1ull << order) % stripe_unit || stripe_unit > (1ull << order)) {
753 CLS_ERR("stripe unit %llu is not a factor of the object size %llu",
754 (unsigned long long)stripe_unit, 1ull << order);
759 ::encode(stripe_unit, bl);
760 r = cls_cxx_map_set_val(hctx, "stripe_unit", &bl);
762 CLS_ERR("error writing stripe_unit metadata: %s", cpp_strerror(r).c_str());
766 ::encode(stripe_count, bl2);
767 r = cls_cxx_map_set_val(hctx, "stripe_count", &bl2);
769 CLS_ERR("error writing stripe_count metadata: %s", cpp_strerror(r).c_str());
// cls method: report the image's creation time.
// Reads the raw "create_timestamp" omap value, decodes it into `timestamp`
// and re-encodes that value into `out`. Read and decode failures are logged
// with CLS_ERR.
776 int get_create_timestamp(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
778 CLS_LOG(20, "get_create_timestamp");
782 int r = cls_cxx_map_get_val(hctx, "create_timestamp", &bl);
785 CLS_ERR("error reading create_timestamp: %s", cpp_strerror(r).c_str());
790 bufferlist::iterator it = bl.begin();
791 ::decode(timestamp, it);
792 } catch (const buffer::error &err) {
793 CLS_ERR("could not decode create_timestamp");
798 ::encode(timestamp, *out);
803 * get the image flags
806 * @param snap_id which snapshot to query, or CEPH_NOSNAP (uint64_t)
809 * @param flags image flags
811 * @returns 0 on success, negative error code upon failure
// cls method: report the image flags.
// Input (decoded from `in`): snap_id.
// Output (encoded into `out`): flags. For CEPH_NOSNAP they come from the
// "flags" key; otherwise the snapshot's omap entry is read.
813 int get_flags(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
816 bufferlist::iterator iter = in->begin();
818 ::decode(snap_id, iter);
819 } catch (const buffer::error &err) {
823 CLS_LOG(20, "get_flags snap_id=%llu", (unsigned long long)snap_id);
826 if (snap_id == CEPH_NOSNAP) {
827 int r = read_key(hctx, "flags", &flags);
// a missing "flags" key (-ENOENT) is not reported as an error
828 if (r < 0 && r != -ENOENT) {
829 CLS_ERR("failed to read flags off disk: %s", cpp_strerror(r).c_str());
835 key_from_snap_id(snap_id, &snapshot_key);
836 int r = read_key(hctx, snapshot_key, &snap);
843 ::encode(flags, *out);
848 * set the image flags
851 * @param flags image flags
852 * @param mask image flag mask
853 * @param snap_id which snapshot to update, or CEPH_NOSNAP (uint64_t)
858 * @returns 0 on success, negative error code upon failure
// cls method: update the image flags of the head or of one snapshot.
// Input (decoded from `in`): flags, mask, and snap_id (which starts out as
// CEPH_NOSNAP). Bits selected by mask are taken from `flags`; all other bits
// keep their stored value. The result is written to the "flags" key for the
// head, or into the snapshot's omap entry otherwise.
860 int set_flags(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
864 uint64_t snap_id = CEPH_NOSNAP;
865 bufferlist::iterator iter = in->begin();
867 ::decode(flags, iter);
868 ::decode(mask, iter);
870 ::decode(snap_id, iter);
872 } catch (const buffer::error &err) {
// NOTE(review): the comment below mentions "size", but the code that follows
// reads the flags (or the snapshot entry) -- likely copied from set_size.
876 // check that size exists to make sure this is a header object
877 // that was created correctly
879 uint64_t orig_flags = 0;
880 cls_rbd_snap snap_meta;
881 string snap_meta_key;
882 if (snap_id == CEPH_NOSNAP) {
883 r = read_key(hctx, "flags", &orig_flags);
// a missing "flags" key (-ENOENT) is tolerated; orig_flags stays 0
884 if (r < 0 && r != -ENOENT) {
885 CLS_ERR("Could not read image's flags off disk: %s",
886 cpp_strerror(r).c_str());
// snapshot case: load the snapshot entry and start from its flags
890 key_from_snap_id(snap_id, &snap_meta_key);
891 r = read_key(hctx, snap_meta_key, &snap_meta);
893 CLS_ERR("Could not read snapshot: snap_id=%" PRIu64 ": %s",
894 snap_id, cpp_strerror(r).c_str());
897 orig_flags = snap_meta.flags;
// merge: keep unmasked bits from disk, take masked bits from the request
900 flags = (orig_flags & ~mask) | (flags & mask);
901 CLS_LOG(20, "set_flags snap_id=%" PRIu64 ", orig_flags=%" PRIu64 ", "
902 "new_flags=%" PRIu64 ", mask=%" PRIu64, snap_id, orig_flags,
905 if (snap_id == CEPH_NOSNAP) {
908 r = cls_cxx_map_set_val(hctx, "flags", &bl);
910 snap_meta.flags = flags;
913 ::encode(snap_meta, bl);
914 r = cls_cxx_map_set_val(hctx, snap_meta_key, &bl);
918 CLS_ERR("error updating flags: %s", cpp_strerror(r).c_str());
925 * get the current parent, if any
928 * @param snap_id which snapshot to query, or CEPH_NOSNAP (uint64_t)
931 * @param pool parent pool id (-1 if parent does not exist)
932 * @param image parent image id
933 * @param snapid parent snapid
934 * @param size portion of parent mapped under the child
936 * @returns 0 on success or parent does not exist, negative error code on failure
// cls method: report the parent of the head or of one snapshot.
// Input (decoded from `in`): snap_id.
// Output (encoded into `out`): parent.pool, parent.id, parent.snapid,
// parent.overlap -- in that order. For CEPH_NOSNAP the "parent" key is read;
// otherwise the parent recorded in the snapshot's omap entry is used.
938 int get_parent(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
942 bufferlist::iterator iter = in->begin();
944 ::decode(snap_id, iter);
945 } catch (const buffer::error &err) {
949 int r = check_exists(hctx);
953 CLS_LOG(20, "get_parent snap_id=%llu", (unsigned long long)snap_id);
// starts out default-constructed; the fields encoded at the end keep those
// defaults if nothing below overwrites them
955 cls_rbd_parent parent;
956 r = require_feature(hctx, RBD_FEATURE_LAYERING);
958 if (snap_id == CEPH_NOSNAP) {
959 r = read_key(hctx, "parent", &parent);
// a missing key (-ENOENT) is not treated as a failure here
960 if (r < 0 && r != -ENOENT)
965 key_from_snap_id(snap_id, &snapshot_key);
966 r = read_key(hctx, snapshot_key, &snap);
967 if (r < 0 && r != -ENOENT)
969 parent = snap.parent;
973 ::encode(parent.pool, *out);
974 ::encode(parent.id, *out);
975 ::encode(parent.snapid, *out);
976 ::encode(parent.overlap, *out);
981 * set the image parent
984 * @param pool parent pool
985 * @param id parent image id
986 * @param snapid parent snapid
987 * @param size parent size
989 * @returns 0 on success, or negative error code
// cls method: record the image's parent.
// Input (decoded from `in`): pool, (id), snapid, size of the parent.
// Checks that the header object exists and that RBD_FEATURE_LAYERING is
// available, validates the arguments, looks for an already-stored parent, and
// writes the "parent" key with overlap = min(our size, parent size).
991 int set_parent(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
998 bufferlist::iterator iter = in->begin();
1000 ::decode(pool, iter);
1002 ::decode(snapid, iter);
1003 ::decode(size, iter);
1004 } catch (const buffer::error &err) {
1005 CLS_LOG(20, "cls_rbd::set_parent: invalid decode");
1009 int r = check_exists(hctx);
1011 CLS_LOG(20, "cls_rbd::set_parent: child already exists");
1015 r = require_feature(hctx, RBD_FEATURE_LAYERING);
1017 CLS_LOG(20, "cls_rbd::set_parent: child does not support layering");
// NOTE(review): pool is compared with < 0 below, so it is presumably signed,
// yet it is printed with %llu after a cast to unsigned long long.
1021 CLS_LOG(20, "set_parent pool=%llu id=%s snapid=%llu size=%llu",
1022 (unsigned long long)pool, id.c_str(), (unsigned long long)snapid.val,
1023 (unsigned long long)size);
// argument validation: a non-negative pool, a non-empty id, a real snapshot
// and a non-zero size are all required
1025 if (pool < 0 || id.length() == 0 || snapid == CEPH_NOSNAP || size == 0) {
1029 // make sure there isn't already a parent
1030 cls_rbd_parent parent;
1031 r = read_key(hctx, "parent", &parent);
// NOTE(review): the two adjacent string literals below concatenate to
// "snapid=%lluoverlap=%llu" -- a separating space is missing in the log text.
1033 CLS_LOG(20, "set_parent existing parent pool=%llu id=%s snapid=%llu"
1034 "overlap=%llu", (unsigned long long)parent.pool, parent.id.c_str(),
1035 (unsigned long long)parent.snapid.val,
1036 (unsigned long long)parent.overlap);
1040 // our overlap is the min of our size and the parent's size.
1042 r = read_key(hctx, "size", &our_size);
1046 bufferlist parentbl;
1049 parent.snapid = snapid;
1050 parent.overlap = MIN(our_size, size);
1051 ::encode(parent, parentbl);
1052 r = cls_cxx_map_set_val(hctx, "parent", &parentbl);
1054 CLS_ERR("error writing parent: %s", cpp_strerror(r).c_str());
1063 * remove the parent pointer
1065 * This can only happen on the head, not on a snapshot. No arguments.
1067 * @returns 0 on success, negative error code on failure.
// cls method: remove the image's parent pointer.
// Checks that the header object exists and that RBD_FEATURE_LAYERING is
// available. When RBD_FEATURE_DEEP_FLATTEN is set, first pages through the
// snapshot omap entries and resets each snapshot's parent to a
// default-constructed cls_rbd_parent. Finally removes the "parent" key.
1069 int remove_parent(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
1071 int r = check_exists(hctx);
1075 r = require_feature(hctx, RBD_FEATURE_LAYERING);
1080 r = read_key(hctx, "features", &features);
1085 // remove the parent from all snapshots
1086 if ((features & RBD_FEATURE_DEEP_FLATTEN) != 0) {
1087 int max_read = RBD_MAX_KEYS_READ;
1088 vector<snapid_t> snap_ids;
// the scan starts at the snapshot prefix and resumes after the last key seen
1089 string last_read = RBD_SNAP_KEY_PREFIX;
1094 r = cls_cxx_map_get_keys(hctx, last_read, max_read, &keys, &more);
1099 for (std::set<string>::const_iterator it = keys.begin();
1100 it != keys.end(); ++it) {
// a key that does not start with the snapshot prefix is not a snapshot entry
1101 if ((*it).find(RBD_SNAP_KEY_PREFIX) != 0) {
// snap_id is only used for the error messages below
1105 uint64_t snap_id = snap_id_from_key(*it);
1106 cls_rbd_snap snap_meta;
1107 r = read_key(hctx, *it, &snap_meta);
1109 CLS_ERR("Could not read snapshot: snap_id=%" PRIu64 ": %s",
1110 snap_id, cpp_strerror(r).c_str());
// clear the snapshot's parent and write the entry back under the same key
1114 snap_meta.parent = cls_rbd_parent();
1117 ::encode(snap_meta, bl);
1118 r = cls_cxx_map_set_val(hctx, *it, &bl);
1120 CLS_ERR("Could not update snapshot: snap_id=%" PRIu64 ": %s",
1121 snap_id, cpp_strerror(r).c_str());
// remember the greatest key of this page as the start of the next one
1126 if (!keys.empty()) {
1127 last_read = *(keys.rbegin());
1132 cls_rbd_parent parent;
1133 r = read_key(hctx, "parent", &parent);
1137 r = cls_cxx_map_remove_key(hctx, "parent");
1139 CLS_ERR("error removing parent: %s", cpp_strerror(r).c_str());
1146 * methods for dealing with rbd_children object
// Decodes a parent spec -- pool id, image id, snapshot id, in that order --
// from `it` into the three output pointers. The iterator is taken by
// reference, so the caller can keep decoding after the parent spec.
// A decode failure (buffer::error) is logged with CLS_ERR.
1149 static int decode_parent_common(bufferlist::iterator& it, uint64_t *pool_id,
1150 string *image_id, snapid_t *snap_id)
1153 ::decode(*pool_id, it);
1154 ::decode(*image_id, it);
1155 ::decode(*snap_id, it);
1156 } catch (const buffer::error &err) {
1157 CLS_ERR("error decoding parent spec");
// Decodes just a parent spec from the start of `in`; returns whatever
// decode_parent_common() returns.
1163 static int decode_parent(bufferlist *in, uint64_t *pool_id,
1164 string *image_id, snapid_t *snap_id)
1166 bufferlist::iterator it = in->begin();
1167 return decode_parent_common(it, pool_id, image_id, snap_id);
// Decodes a parent spec followed by a child image id from `in`.
// The same iterator is used for both steps, so the child id is read from the
// position where the parent spec ended. A failure to decode the child id is
// logged with CLS_ERR.
1170 static int decode_parent_and_child(bufferlist *in, uint64_t *pool_id,
1171 string *image_id, snapid_t *snap_id,
1174 bufferlist::iterator it = in->begin();
1175 int r = decode_parent_common(it, pool_id, image_id, snap_id);
1179 ::decode(*c_image_id, it);
1180 } catch (const buffer::error &err) {
1181 CLS_ERR("error decoding child image id");
// Builds the omap key for a parent spec: the encoded forms of pool_id,
// image_id and snap_id concatenated, returned as a string of raw bytes
// (constructed with an explicit length, so embedded NUL bytes are kept).
1187 static string parent_key(uint64_t pool_id, string image_id, snapid_t snap_id)
1190 ::encode(pool_id, key_bl);
1191 ::encode(image_id, key_bl);
1192 ::encode(snap_id, key_bl);
1193 return string(key_bl.c_str(), key_bl.length());
1197 * add child to rbd_children directory object
1199 * rbd_children is a map of (p_pool_id, p_image_id, p_snap_id) to
1200 * [c_image_id, [c_image_id ... ]]
1203 * @param p_pool_id parent pool id
1204 * @param p_image_id parent image oid
1205 * @param p_snap_id parent snapshot id
1206 * @param c_image_id new child image oid to add
1208 * @returns 0 on success, negative error on failure
// cls method: register a child image under a parent spec.
// Input (decoded from `in`): parent pool id, parent image id, parent snap id,
// child image id. The set of children stored under parent_key(...) is read
// (a missing key is tolerated), the child is inserted, and the set is written
// back. A child that is already present is detected and logged.
1211 int add_child(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
1217 string p_image_id, c_image_id;
1218 // Use set for ease of erase() for remove_child()
1219 std::set<string> children;
1221 r = decode_parent_and_child(in, &p_pool_id, &p_image_id, &p_snap_id,
1226 CLS_LOG(20, "add_child %s to (%" PRIu64 ", %s, %" PRIu64 ")", c_image_id.c_str(),
1227 p_pool_id, p_image_id.c_str(), p_snap_id.val);
1229 string key = parent_key(p_pool_id, p_image_id, p_snap_id);
1231 // get current child list for parent, if any
1232 r = read_key(hctx, key, &children);
// -ENOENT just means this parent has no children recorded yet
1233 if ((r < 0) && (r != -ENOENT)) {
1234 CLS_LOG(20, "add_child: omap read failed: %s", cpp_strerror(r).c_str());
1238 if (children.find(c_image_id) != children.end()) {
1239 CLS_LOG(20, "add_child: child already exists: %s", c_image_id.c_str());
1243 children.insert(c_image_id);
1247 ::encode(children, childbl);
1248 r = cls_cxx_map_set_val(hctx, key, &childbl);
1250 CLS_LOG(20, "add_child: omap write failed: %s", cpp_strerror(r).c_str());
1255 * remove child from rbd_children directory object
1258 * @param p_pool_id parent pool id
1259 * @param p_image_id parent image oid
1260 * @param p_snap_id parent snapshot id
1261 * @param c_image_id child image oid to remove
1263 * @returns 0 on success, negative error on failure
// cls method: unregister a child image from a parent spec.
// Input (decoded from `in`): parent pool id, parent image id, parent snap id,
// child image id. The child is erased from the set stored under
// parent_key(...). If the set becomes empty the key is removed; otherwise the
// shortened set is written back.
1266 int remove_child(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
1272 string p_image_id, c_image_id;
1273 std::set<string> children;
1275 r = decode_parent_and_child(in, &p_pool_id, &p_image_id, &p_snap_id,
1280 CLS_LOG(20, "remove_child %s from (%" PRIu64 ", %s, %" PRIu64 ")",
1281 c_image_id.c_str(), p_pool_id, p_image_id.c_str(),
1284 string key = parent_key(p_pool_id, p_image_id, p_snap_id);
1286 // get current child list for parent. Unlike add_child(), an empty list
1287 // is an error (how can we remove something that doesn't exist?)
1288 r = read_key(hctx, key, &children);
1290 CLS_LOG(20, "remove_child: read omap failed: %s", cpp_strerror(r).c_str());
// the child must currently be registered under this parent
1294 if (children.find(c_image_id) == children.end()) {
1295 CLS_LOG(20, "remove_child: child not found: %s", c_image_id.c_str());
1298 // find and remove child
1299 children.erase(c_image_id);
1301 // now empty? remove key altogether
1302 if (children.empty()) {
1303 r = cls_cxx_map_remove_key(hctx, key);
1305 CLS_LOG(20, "remove_child: remove key failed: %s", cpp_strerror(r).c_str());
1307 // write back shortened children list
1309 ::encode(children, childbl);
1310 r = cls_cxx_map_set_val(hctx, key, &childbl);
1312 CLS_LOG(20, "remove_child: write omap failed: %s", cpp_strerror(r).c_str());
1319 * @param p_pool_id parent pool id
1320 * @param p_image_id parent image oid
1321 * @param p_snap_id parent snapshot id
1322 * @param c_image_id new child image oid to add
1325 * @param children set<string> of children
1327 * @returns 0 on success, negative error on failure
// cls method: list the children registered under a parent spec.
// Input (decoded from `in` by decode_parent()): parent pool id, parent image
// id, parent snap id -- no child id is decoded here.
// Output (encoded into `out`): the std::set<string> of child image ids stored
// under parent_key(...).
1329 int get_children(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
1335 std::set<string> children;
1337 r = decode_parent(in, &p_pool_id, &p_image_id, &p_snap_id);
1341 CLS_LOG(20, "get_children of (%" PRIu64 ", %s, %" PRIu64 ")",
1342 p_pool_id, p_image_id.c_str(), p_snap_id.val);
1344 string key = parent_key(p_pool_id, p_image_id, p_snap_id);
1346 r = read_key(hctx, key, &children);
1349 CLS_LOG(20, "get_children: read omap failed: %s", cpp_strerror(r).c_str());
1352 ::encode(children, *out);
1358 * Get the information needed to create a rados snap context for doing
1359 * I/O to the data objects. This must include all snapshots.
1362 * @param snap_seq the highest snapshot id ever associated with the image (uint64_t)
1363 * @param snap_ids existing snapshot ids in descending order (vector<uint64_t>)
1364 * @returns 0 on success, negative error code on failure
// cls method: report the data needed to build a snap context.
// Output (encoded into `out`): snap_seq, then the vector of snapshot ids in
// descending order.
// Snapshot ids are recovered from the omap keys, which are fetched in pages
// of up to RBD_MAX_KEYS_READ keys starting at the snapshot prefix.
1366 int get_snapcontext(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
1368 CLS_LOG(20, "get_snapcontext");
1371 int max_read = RBD_MAX_KEYS_READ;
1372 vector<snapid_t> snap_ids;
1373 string last_read = RBD_SNAP_KEY_PREFIX;
1378 r = cls_cxx_map_get_keys(hctx, last_read, max_read, &keys, &more);
// keys is iterated as an ordered set; with the zero-padded hex ids produced
// by key_from_snap_id() that order is ascending by snapshot id
1382 for (set<string>::const_iterator it = keys.begin();
1383 it != keys.end(); ++it) {
// a key that does not start with the snapshot prefix is not a snapshot entry
1384 if ((*it).find(RBD_SNAP_KEY_PREFIX) != 0)
1386 snapid_t snap_id = snap_id_from_key(*it);
1387 snap_ids.push_back(snap_id);
// NOTE(review): dereferencing rbegin() requires keys to be non-empty; any
// guard would be on a line not visible here (remove_parent checks
// !keys.empty() before the same statement) -- confirm.
1390 last_read = *(keys.rbegin());
1394 r = read_key(hctx, "snap_seq", &snap_seq);
1396 CLS_ERR("could not read the image's snap_seq off disk: %s", cpp_strerror(r).c_str());
1400 // snap_ids must be descending in a snap context
1401 std::reverse(snap_ids.begin(), snap_ids.end());
1403 ::encode(snap_seq, *out);
1404 ::encode(snap_ids, *out);
1411 * @param object_prefix prefix for data object names (string)
1412 * @returns 0 on success, negative error code on failure
// cls method: report the data-object name prefix.
// Reads the "object_prefix" key and encodes the string into `out`; a read
// failure is logged with CLS_ERR.
1414 int get_object_prefix(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
1416 CLS_LOG(20, "get_object_prefix");
1418 string object_prefix;
1419 int r = read_key(hctx, "object_prefix", &object_prefix);
1421 CLS_ERR("failed to read the image's object prefix off of disk: %s",
1422 cpp_strerror(r).c_str());
1426 ::encode(object_prefix, *out);
1436 * @param pool_id (int64_t) of data pool or -1 if none
1437 * @returns 0 on success, negative error code on failure
// cls method: report the image's data pool id.
// Reads the "data_pool_id" key and encodes the value into `out`.
// data_pool_id starts at -1, the "none" value named in the doc comment.
1439 int get_data_pool(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
1441 CLS_LOG(20, "get_data_pool");
1443 int64_t data_pool_id = -1;
1444 int r = read_key(hctx, "data_pool_id", &data_pool_id);
1448 CLS_ERR("error reading image data pool id: %s", cpp_strerror(r).c_str());
1452 ::encode(data_pool_id, *out);
// cls method: report a snapshot's name.
// Input (decoded from `in`): snap_id.
// Output (encoded into `out`): the name field of the snapshot's omap entry.
1456 int get_snapshot_name(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
1460 bufferlist::iterator iter = in->begin();
1462 ::decode(snap_id, iter);
1463 } catch (const buffer::error &err) {
1467 CLS_LOG(20, "get_snapshot_name snap_id=%llu", (unsigned long long)snap_id);
// the head (CEPH_NOSNAP) is special-cased before any snapshot lookup
1469 if (snap_id == CEPH_NOSNAP)
1473 string snapshot_key;
1474 key_from_snap_id(snap_id, &snapshot_key);
1475 int r = read_key(hctx, snapshot_key, &snap);
1479 ::encode(snap.name, *out);
// cls method: report a snapshot's timestamp.
// Input (decoded from `in`): snap_id.
// Output (encoded into `out`): the timestamp field of the snapshot's omap
// entry.
1484 int get_snapshot_timestamp(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
1488 bufferlist::iterator iter = in->begin();
1490 ::decode(snap_id, iter);
1491 } catch (const buffer::error &err) {
1495 CLS_LOG(20, "get_snapshot_timestamp snap_id=%llu", (unsigned long long)snap_id);
// the head (CEPH_NOSNAP) is special-cased before any snapshot lookup
1497 if (snap_id == CEPH_NOSNAP) {
1502 string snapshot_key;
1503 key_from_snap_id(snap_id, &snapshot_key);
1504 int r = read_key(hctx, snapshot_key, &snap);
1509 ::encode(snap.timestamp, *out);
1514 * Retrieve namespace of a snapshot.
1517 * @param snap_id id of the snapshot (uint64_t)
1520 * @param SnapshotNamespace
1521 * @returns 0 on success, negative error code on failure.
// Class method: look up a snapshot by id and return its namespace.
// Input:  snap_id, decoded from `in`.
// Output: snap.snapshot_namespace, encoded into *out.
1523 int get_snapshot_namespace(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
1527 bufferlist::iterator iter = in->begin();
1529 ::decode(snap_id, iter);
// A malformed input payload surfaces as buffer::error.
1530 } catch (const buffer::error &err) {
1534 CLS_LOG(20, "get_snapshot_namespace snap_id=%" PRIu64, snap_id);
// CEPH_NOSNAP is handled as a special case before any key lookup.
1536 if (snap_id == CEPH_NOSNAP) {
// The snapshot record is stored under a key derived from its id.
1541 string snapshot_key;
1542 key_from_snap_id(snap_id, &snapshot_key);
1543 int r = read_key(hctx, snapshot_key, &snap);
1548 ::encode(snap.snapshot_namespace, *out);
1554 * Adds a snapshot to an rbd header. Ensures the id and name are unique.
1557 * @param snap_name name of the snapshot (string)
1558 * @param snap_id id of the snapshot (uint64_t)
1559 * @param snap_namespace namespace of the snapshot (cls::rbd::SnapshotNamespaceOnDisk)
1562 * @returns 0 on success, negative error code on failure.
1563 * @returns -ESTALE if the input snap_id is less than the image's snap_seq
1564 * @returns -EEXIST if the id or name are already used by another snapshot
// Class method: add a snapshot record to the image header.
// Steps visible here:
//   1. decode (name, id, namespace) from `in`;
//   2. validate the id against CEPH_MAXSNAP and the stored "snap_seq";
//   3. copy the image's current size, features, flags and parent into the
//      snapshot metadata and stamp it with the current time;
//   4. page through the existing snapshot keys, enforcing the snapshot limit
//      and checking for a name/id collision;
//   5. write the new snapshot key and the updated "snap_seq" together.
1566 int snapshot_add(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
1568 bufferlist snap_namebl, snap_idbl;
1569 cls_rbd_snap snap_meta;
1570 uint64_t snap_limit;
// Input payload order: name, id, namespace.
1573 bufferlist::iterator iter = in->begin();
1574 ::decode(snap_meta.name, iter);
1575 ::decode(snap_meta.id, iter);
1577 ::decode(snap_meta.snapshot_namespace, iter);
1579 } catch (const buffer::error &err) {
// A namespace that decoded to UnknownSnapshotNamespace is reported as an error.
1583 if (boost::get<cls::rbd::UnknownSnapshotNamespace>(
1584 &snap_meta.snapshot_namespace.snapshot_namespace) != nullptr) {
1585 CLS_ERR("Unknown snapshot namespace provided");
1589 CLS_LOG(20, "snapshot_add name=%s id=%llu", snap_meta.name.c_str(),
1590 (unsigned long long)snap_meta.id.val);
// Ids above CEPH_MAXSNAP are treated specially (rejected, per the method's contract).
1592 if (snap_meta.id > CEPH_MAXSNAP)
1595 uint64_t cur_snap_seq;
1596 int r = read_key(hctx, "snap_seq", &cur_snap_seq);
1598 CLS_ERR("Could not read image's snap_seq off disk: %s", cpp_strerror(r).c_str());
1602 // client lost a race with another snapshot creation.
1603 // snap_seq must be monotonically increasing.
1604 if (snap_meta.id < cur_snap_seq)
// The snapshot captures the image's size, features and flags as stored now.
1607 r = read_key(hctx, "size", &snap_meta.image_size);
1609 CLS_ERR("Could not read image's size off disk: %s", cpp_strerror(r).c_str());
1612 r = read_key(hctx, "features", &snap_meta.features);
1614 CLS_ERR("Could not read image's features off disk: %s", cpp_strerror(r).c_str());
1617 r = read_key(hctx, "flags", &snap_meta.flags);
// A missing "flags" key (-ENOENT) is tolerated; any other error is logged.
1618 if (r < 0 && r != -ENOENT) {
1619 CLS_ERR("Could not read image's flags off disk: %s", cpp_strerror(r).c_str());
1623 r = read_key(hctx, "snap_limit", &snap_limit);
// UINT64_MAX is the "no limit" value.
// NOTE(review): presumably assigned when the key is absent (-ENOENT), as
// old_snapshot_add does -- confirm.
1625 snap_limit = UINT64_MAX;
1627 CLS_ERR("Could not read snapshot limit off disk: %s", cpp_strerror(r).c_str());
1631 snap_meta.timestamp = ceph_clock_now();
// Page through existing snapshot keys, at most RBD_MAX_KEYS_READ per call.
1633 int max_read = RBD_MAX_KEYS_READ;
1634 uint64_t total_read = 0;
1635 string last_read = RBD_SNAP_KEY_PREFIX;
1638 map<string, bufferlist> vals;
1639 r = cls_cxx_map_get_vals(hctx, last_read, RBD_SNAP_KEY_PREFIX,
1640 max_read, &vals, &more);
// The limit is checked against the running count of snapshot keys seen so far.
1644 total_read += vals.size();
1645 if (total_read >= snap_limit) {
1646 CLS_ERR("Attempt to create snapshot over limit of %" PRIu64, snap_limit);
1650 for (map<string, bufferlist>::iterator it = vals.begin();
1651 it != vals.end(); ++it) {
1652 cls_rbd_snap old_meta;
1653 bufferlist::iterator iter = it->second.begin();
1655 ::decode(old_meta, iter);
1656 } catch (const buffer::error &err) {
// On a corrupt record, recover the snapshot id from the key for the error message.
1657 snapid_t snap_id = snap_id_from_key(it->first);
1658 CLS_ERR("error decoding snapshot metadata for snap_id: %llu",
1659 (unsigned long long)snap_id.val);
// Collision: same (name, namespace) pair, or same id, as an existing snapshot.
1662 if ((snap_meta.name == old_meta.name &&
1663 snap_meta.snapshot_namespace == old_meta.snapshot_namespace) ||
1664 snap_meta.id == old_meta.id) {
1665 CLS_LOG(20, "snap_name %s or snap_id %llu matches existing snap %s %llu",
1666 snap_meta.name.c_str(), (unsigned long long)snap_meta.id.val,
1667 old_meta.name.c_str(), (unsigned long long)old_meta.id.val);
// The next page resumes after the last key returned by this one.
1673 last_read = vals.rbegin()->first;
1676 // snapshot inherits parent, if any
1677 cls_rbd_parent parent;
1678 r = read_key(hctx, "parent", &parent);
1679 if (r < 0 && r != -ENOENT)
1682 snap_meta.parent = parent;
1685 bufferlist snap_metabl, snap_seqbl;
1686 ::encode(snap_meta, snap_metabl);
// snap_seq is advanced to the new snapshot's id.
1687 ::encode(snap_meta.id, snap_seqbl);
1689 string snapshot_key;
1690 key_from_snap_id(snap_meta.id, &snapshot_key);
// Both keys are placed in one map so a single cls_cxx_map_set_vals() call writes them.
1691 map<string, bufferlist> vals;
1692 vals["snap_seq"] = snap_seqbl;
1693 vals[snapshot_key] = snap_metabl;
1694 r = cls_cxx_map_set_vals(hctx, &vals);
1696 CLS_ERR("error writing snapshot metadata: %s", cpp_strerror(r).c_str());
1708 * @param src_snap_id old snap id of the snapshot (snapid_t)
1709 * @param dst_snap_name new name of the snapshot (string)
1712 * @returns 0 on success, negative error code on failure.
// Class method: rename an existing snapshot.
// Input: src_snap_id (snapid_t) and dst_snap_name (string), decoded from `in`.
// All snapshot keys are scanned first to detect an existing snapshot that
// already uses dst_snap_name; then the source snapshot's record is re-read,
// its name replaced, and the record written back under the same key.
1714 int snapshot_rename(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
1716 bufferlist snap_namebl, snap_idbl;
1717 snapid_t src_snap_id;
1718 string src_snap_key,dst_snap_name;
1719 cls_rbd_snap snap_meta;
1723 bufferlist::iterator iter = in->begin();
1724 ::decode(src_snap_id, iter);
1725 ::decode(dst_snap_name, iter);
1726 } catch (const buffer::error &err) {
1730 CLS_LOG(20, "snapshot_rename id=%llu dst_name=%s", (unsigned long long)src_snap_id.val,
1731 dst_snap_name.c_str());
// Page through existing snapshot keys, at most RBD_MAX_KEYS_READ per call.
1733 int max_read = RBD_MAX_KEYS_READ;
1734 string last_read = RBD_SNAP_KEY_PREFIX;
1737 map<string, bufferlist> vals;
1738 r = cls_cxx_map_get_vals(hctx, last_read, RBD_SNAP_KEY_PREFIX,
1739 max_read, &vals, &more);
1743 for (map<string, bufferlist>::iterator it = vals.begin();
1744 it != vals.end(); ++it) {
1745 bufferlist::iterator iter = it->second.begin();
1747 ::decode(snap_meta, iter);
1748 } catch (const buffer::error &err) {
// NOTE(review): the message prints the requested destination name, not the
// identity of the record that failed to decode.
1749 CLS_ERR("error decoding snapshot metadata for snap : %s",
1750 dst_snap_name.c_str());
// Name collision check compares names only (the namespace is not compared here).
1753 if (dst_snap_name == snap_meta.name) {
1754 CLS_LOG(20, "snap_name %s matches existing snap with snap id = %llu ",
1755 dst_snap_name.c_str(), (unsigned long long)snap_meta.id.val);
// The next page resumes after the last key returned by this one.
1760 last_read = vals.rbegin()->first;
// snap_meta was used as scratch space during the scan; reload the source record.
1763 key_from_snap_id(src_snap_id, &src_snap_key);
1764 r = read_key(hctx, src_snap_key, &snap_meta);
1766 CLS_LOG(20, "cannot find existing snap with snap id = %llu ", (unsigned long long)src_snap_id);
// Only the name changes; the record is rewritten under its original key.
1769 snap_meta.name = dst_snap_name;
1770 bufferlist snap_metabl;
1771 ::encode(snap_meta, snap_metabl);
1773 r = cls_cxx_map_set_val(hctx, src_snap_key, &snap_metabl);
1775 CLS_ERR("error writing snapshot metadata: %s", cpp_strerror(r).c_str());
1782 * Removes a snapshot from an rbd header.
1785 * @param snap_id the id of the snapshot to remove (uint64_t)
1788 * @returns 0 on success, negative error code on failure
// Class method: remove a snapshot record from the image header.
// Input: snap_id, decoded from `in`. The record is read first, both to prove
// it exists and to inspect its protection status, then its key is removed.
1790 int snapshot_remove(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
1795 bufferlist::iterator iter = in->begin();
1796 ::decode(snap_id, iter);
1797 } catch (const buffer::error &err) {
1801 CLS_LOG(20, "snapshot_remove id=%llu", (unsigned long long)snap_id.val);
1803 // check if the key exists. we can't rely on remove_key doing this for
1804 // us, since OMAPRMKEYS returns success if the key is not there.
1805 // bug or feature? sounds like a bug, since tmap did not have this
1806 // behavior, but cls_rgw may rely on it...
1808 string snapshot_key;
1809 key_from_snap_id(snap_id, &snapshot_key);
1810 int r = read_key(hctx, snapshot_key, &snap);
// Any status other than UNPROTECTED takes a separate path before removal.
1814 if (snap.protection_status != RBD_PROTECTION_STATUS_UNPROTECTED)
1817 r = cls_cxx_map_remove_key(hctx, snapshot_key);
1819 CLS_ERR("error writing snapshot metadata: %s", cpp_strerror(r).c_str());
1827 * Returns a uint64_t of all the features supported by this class.
// Class method: report the feature bits this class supports.
// Encodes RBD_FEATURES_ALL as a uint64_t into *out; `in` is not read by any
// visible line.
1829 int get_all_features(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
1831 uint64_t all_features = RBD_FEATURES_ALL;
1832 ::encode(all_features, *out);
1837 * "Copy up" data from the parent of a clone to the clone's object(s).
1838 * Used for implementing copy-on-write for a clone image. Client
1839 * will pass down a chunk of data that fits completely within one
1840 * clone block (one object), and is aligned (starts at beginning of block),
1841 * but may be shorter (for non-full parent blocks). The class method
1842 * can't know the object size to validate the requested length,
1843 * so it just writes the data as given if the child object doesn't
1844 * already exist, and returns success if it does.
1847 * @param in bufferlist of data to write
1850 * @returns 0 on success, or if block already exists in child
1851 * negative error code on other error
// Class method: write parent data into a clone's object if that object does
// not exist yet.
// Input: `in` holds the raw bytes to write; they are written at offset 0 with
// length in->length(). `out` is not written by any visible line.
1854 int copyup(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
1856 // check for existence; if child object exists, just return success
1857 if (cls_cxx_stat(hctx, NULL, NULL) == 0)
// The length is logged with %u, matching how bufferlist lengths are printed
// elsewhere in this file (e.g. object_map_save).
1859 CLS_LOG(20, "copyup: writing length %u\n", in->length());
1860 return cls_cxx_write(hctx, 0, in->length(), in);
1864 /************************ rbd_id object methods **************************/
1871 * @param id the id stored in the object
1872 * @returns 0 on success, negative error code on failure
// Class method (rbd_id object): return the image id stored in the object's data.
// The object is stat'ed to learn its size, its whole content is read, and the
// id is decoded from that content.
1874 int get_id(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
// Only the size is requested from stat (the mtime argument is NULL).
1877 int r = cls_cxx_stat(hctx, &size, NULL);
// Read the full object: offset 0, `size` bytes.
1885 r = cls_cxx_read(hctx, 0, size, &read_bl);
1887 CLS_ERR("get_id: could not read id: %s", cpp_strerror(r).c_str());
1893 bufferlist::iterator iter = read_bl.begin();
// Undecodable object content surfaces as buffer::error.
1895 } catch (const buffer::error &err) {
1904 * Set the id of an image. The object must already exist.
1907 * @param id the id of the image, as an alpha-numeric string
1910 * @returns 0 on success, -EEXIST if the atomic create fails,
1911 * negative error code on other error
// Class method (rbd_id object): store the image id in the object's data.
// The object must already exist (check_exists), the id decoded from `in` must
// pass is_valid_id(), and the encoded id is written at offset 0.
1913 int set_id(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
1915 int r = check_exists(hctx);
1921 bufferlist::iterator iter = in->begin();
1923 } catch (const buffer::error &err) {
1927 if (!is_valid_id(id)) {
1928 CLS_ERR("set_id: invalid id '%s'", id.c_str());
// The current object size is fetched before writing.
// NOTE(review): presumably used to refuse overwriting an id that is already
// set (the method documents -EEXIST) -- confirm.
1933 r = cls_cxx_stat(hctx, &size, NULL);
1939 CLS_LOG(20, "set_id: id=%s", id.c_str());
1941 bufferlist write_bl;
1942 ::encode(id, write_bl);
1943 return cls_cxx_write(hctx, 0, write_bl.length(), &write_bl);
1946 /*********************** methods for rbd_directory ***********************/
// Build the rbd_directory omap key for an image id:
// RBD_DIR_ID_KEY_PREFIX ("id_") followed by the id.
1948 static const string dir_key_for_id(const string &id)
1950 return RBD_DIR_ID_KEY_PREFIX + id;
// Build the rbd_directory omap key for an image name:
// RBD_DIR_NAME_KEY_PREFIX ("name_") followed by the name.
1953 static const string dir_key_for_name(const string &name)
1955 return RBD_DIR_NAME_KEY_PREFIX + name;
// Inverse of dir_key_for_name(): drop the first strlen(RBD_DIR_NAME_KEY_PREFIX)
// characters of the key and return the rest. The key is not checked for
// actually starting with the prefix.
1958 static const string dir_name_from_key(const string &key)
1960 return key.substr(strlen(RBD_DIR_NAME_KEY_PREFIX));
// Insert an image into the directory's two indexes (name -> id and id -> name).
// @param name                 image name; must be non-empty
// @param id                   image id; must pass is_valid_id()
// @param check_for_unique_id  when true, an id key that can be read (or whose
//                             read fails with anything but -ENOENT) blocks the add
1963 static int dir_add_image_helper(cls_method_context_t hctx,
1964 const string &name, const string &id,
1965 bool check_for_unique_id)
1967 if (!name.size() || !is_valid_id(id)) {
1968 CLS_ERR("dir_add_image_helper: invalid name '%s' or id '%s'",
1969 name.c_str(), id.c_str());
1973 CLS_LOG(20, "dir_add_image_helper name=%s id=%s", name.c_str(), id.c_str());
1976 string name_key = dir_key_for_name(name);
1977 string id_key = dir_key_for_id(id);
// Probe the name index first; the value read into tmp is not used further in
// the visible lines.
1978 int r = read_key(hctx, name_key, &tmp);
1980 CLS_LOG(10, "name already exists");
1983 r = read_key(hctx, id_key, &tmp);
// Only -ENOENT counts as "id is free"; the check is skipped entirely when
// check_for_unique_id is false.
1984 if (r != -ENOENT && check_for_unique_id) {
1985 CLS_LOG(10, "id already exists");
// Note the cross-mapping: the name key stores the id, the id key stores the name.
1988 bufferlist id_bl, name_bl;
1989 ::encode(id, id_bl);
1990 ::encode(name, name_bl);
1991 map<string, bufferlist> omap_vals;
1992 omap_vals[name_key] = id_bl;
1993 omap_vals[id_key] = name_bl;
// Both entries are written by one cls_cxx_map_set_vals() call.
1994 return cls_cxx_map_set_vals(hctx, &omap_vals);
// Remove an image from the directory's two indexes, after verifying that the
// stored name <-> id mapping matches the arguments.
// @param name  image name expected to map to `id`
// @param id    image id expected to map to `name`
1997 static int dir_remove_image_helper(cls_method_context_t hctx,
1998 const string &name, const string &id)
2000 CLS_LOG(20, "dir_remove_image_helper name=%s id=%s",
2001 name.c_str(), id.c_str());
2003 string stored_name, stored_id;
2004 string name_key = dir_key_for_name(name);
2005 string id_key = dir_key_for_id(id);
// Load both directions of the mapping: name key -> stored id, id key -> stored name.
2006 int r = read_key(hctx, name_key, &stored_id);
2009 CLS_ERR("error reading name to id mapping: %s", cpp_strerror(r).c_str());
2012 r = read_key(hctx, id_key, &stored_name);
2014 CLS_ERR("error reading id to name mapping: %s", cpp_strerror(r).c_str());
2018 // check if this op raced with a rename
2019 if (stored_name != name || stored_id != id) {
2020 CLS_ERR("stored name '%s' and id '%s' do not match args '%s' and '%s'",
2021 stored_name.c_str(), stored_id.c_str(), name.c_str(), id.c_str());
// Drop the name index entry, then the id index entry.
2025 r = cls_cxx_map_remove_key(hctx, name_key);
2027 CLS_ERR("error removing name: %s", cpp_strerror(r).c_str());
2031 r = cls_cxx_map_remove_key(hctx, id_key);
2033 CLS_ERR("error removing id: %s", cpp_strerror(r).c_str());
2041 * Rename an image in the directory, updating both indexes
2042 * atomically. This can't be done from the client calling
2043 * dir_add_image and dir_remove_image in one transaction because the
2044 results of the first method are not visible to later steps.
2047 * @param src original name of the image
2048 * @param dest new name of the image
2049 * @param id the id of the image
2052 * @returns -ESTALE if src and id do not map to each other
2053 * @returns -ENOENT if src or id are not in the directory
2054 * @returns -EEXIST if dest already exists
2055 * @returns 0 on success, negative error code on failure
// Class method: rename an image in the directory.
// Input: src name and dest name are decoded from `in` (the id is declared
// alongside them and passed on to both helpers). The rename is a remove of
// (src, id) followed by an add of (dest, id).
2057 int dir_rename_image(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2059 string src, dest, id;
2061 bufferlist::iterator iter = in->begin();
2062 ::decode(src, iter);
2063 ::decode(dest, iter);
2065 } catch (const buffer::error &err) {
// Removal also validates that src and id currently map to each other.
2069 int r = dir_remove_image_helper(hctx, src, id);
2072 // ignore duplicate id because the result of
2073 // remove_image_helper is not visible yet
2074 return dir_add_image_helper(hctx, dest, id, false);
2078 * Get the id of an image given its name.
2081 * @param name the name of the image
2084 * @param id the id of the image
2085 * @returns 0 on success, negative error code on failure
// Class method: look up an image id by name in the directory.
// Input: name, decoded from `in`. The id is read from the name index key
// produced by dir_key_for_name().
2087 int dir_get_id(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2092 bufferlist::iterator iter = in->begin();
2093 ::decode(name, iter);
2094 } catch (const buffer::error &err) {
2098 CLS_LOG(20, "dir_get_id: name=%s", name.c_str());
2101 int r = read_key(hctx, dir_key_for_name(name), &id);
2104 CLS_ERR("error reading id for name '%s': %s", name.c_str(), cpp_strerror(r).c_str());
2112 * Get the name of an image given its id.
2115 * @param id the id of the image
2118 * @param name the name of the image
2119 * @returns 0 on success, negative error code on failure
// Class method: look up an image name by id in the directory.
// The name is read from the id index key produced by dir_key_for_id() and
// encoded as a string into *out.
2121 int dir_get_name(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2126 bufferlist::iterator iter = in->begin();
2128 } catch (const buffer::error &err) {
2132 CLS_LOG(20, "dir_get_name: id=%s", id.c_str());
2135 int r = read_key(hctx, dir_key_for_id(id), &name);
2137 CLS_ERR("error reading name for id '%s': %s", id.c_str(), cpp_strerror(r).c_str());
2140 ::encode(name, *out);
2145 * List the names and ids of the images in the directory, sorted by
2149 * @param start_after which name to begin listing after
2150 * (use the empty string to start at the beginning)
2151 * @param max_return the maximum number of names to list
2154 * @param images map from name to id of up to max_return images
2155 * @returns 0 on success, negative error code on failure
// Class method: list directory entries as a name -> id map.
// Input:  start_after (string) and max_return (uint64_t), decoded from `in`.
// Output: map<string, string> of at most max_return entries, encoded into *out.
// The name index is read in pages of RBD_MAX_KEYS_READ keys.
2157 int dir_list(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2160 uint64_t max_return;
2163 bufferlist::iterator iter = in->begin();
2164 ::decode(start_after, iter);
2165 ::decode(max_return, iter);
2166 } catch (const buffer::error &err) {
2170 int max_read = RBD_MAX_KEYS_READ;
2171 map<string, string> images;
// Listing resumes after the key that corresponds to start_after.
2172 string last_read = dir_key_for_name(start_after);
// Stop when the store reports no more keys or enough entries were collected.
2175 while (more && images.size() < max_return) {
2176 map<string, bufferlist> vals;
2177 CLS_LOG(20, "last_read = '%s'", last_read.c_str());
// The RBD_DIR_NAME_KEY_PREFIX filter restricts the scan to name index entries.
2178 int r = cls_cxx_map_get_vals(hctx, last_read, RBD_DIR_NAME_KEY_PREFIX,
2179 max_read, &vals, &more);
2181 CLS_ERR("error reading directory by name: %s", cpp_strerror(r).c_str());
2185 for (map<string, bufferlist>::iterator it = vals.begin();
2186 it != vals.end(); ++it) {
2188 bufferlist::iterator iter = it->second.begin();
2191 } catch (const buffer::error &err) {
2192 CLS_ERR("could not decode id of image '%s'", it->first.c_str());
// Result keys are plain image names, i.e. the omap key with its prefix removed.
2195 CLS_LOG(20, "adding '%s' -> '%s'", dir_name_from_key(it->first).c_str(), id.c_str());
2196 images[dir_name_from_key(it->first)] = id;
2197 if (images.size() >= max_return)
// The resume point for the next page is rebuilt from the greatest name collected so far.
2200 if (!vals.empty()) {
2201 last_read = dir_key_for_name(images.rbegin()->first);
2205 ::encode(images, *out);
2211 * Add an image to the rbd directory. Creates the directory object if
2212 * needed, and updates the index from id to name and name to id.
2215 * @param name the name of the image
2216 * @param id the id of the image
2219 * @returns -EEXIST if the image name is already in the directory
2220 * @returns -EBADF if the image id is already in the directory
2221 * @returns 0 on success, negative error code on failure
// Class method: add an image to the directory.
// Creates the directory object if needed (non-exclusive create), decodes the
// name from `in`, and delegates to dir_add_image_helper() with the
// unique-id check enabled.
2223 int dir_add_image(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
// exclusive=false: an already existing directory object is not an error here.
2225 int r = cls_cxx_create(hctx, false);
2227 CLS_ERR("could not create directory: %s", cpp_strerror(r).c_str());
2233 bufferlist::iterator iter = in->begin();
2234 ::decode(name, iter);
2236 } catch (const buffer::error &err) {
2240 return dir_add_image_helper(hctx, name, id, true);
2244 * Remove an image from the rbd directory.
2247 * @param name the name of the image
2248 * @param id the id of the image
2251 * @returns -ESTALE if the name and id do not map to each other
2252 * @returns 0 on success, negative error code on failure
// Class method: remove an image from the directory.
// Decodes the name from `in` and delegates to dir_remove_image_helper(), which
// verifies the stored mapping before removing both index entries.
2254 int dir_remove_image(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2258 bufferlist::iterator iter = in->begin();
2259 ::decode(name, iter);
2261 } catch (const buffer::error &err) {
2265 return dir_remove_image_helper(hctx, name, id);
// Helper: load the whole object map object and decode it into `object_map`.
// @param object_map  output; filled by decoding the object's full content
2268 int object_map_read(cls_method_context_t hctx, BitVector<2> &object_map)
// Size is needed to read the entire object in one call.
2271 int r = cls_cxx_stat(hctx, &size, NULL);
2280 r = cls_cxx_read(hctx, 0, size, &bl);
2286 bufferlist::iterator iter = bl.begin();
2287 ::decode(object_map, iter);
2288 } catch (const buffer::error &err) {
2289 CLS_ERR("failed to decode object map: %s", err.what());
2296 * Load an rbd image's object map
2302 * @param object map bit vector
2303 * @returns 0 on success, negative error code on failure
// Class method: return the stored object map to the client.
// The map is loaded with object_map_read() and encoded into *out.
2305 int object_map_load(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2307 BitVector<2> object_map;
2308 int r = object_map_read(hctx, object_map);
// CRC is switched off on the copy that is encoded for the client.
2313 object_map.set_crc_enabled(false);
2314 ::encode(object_map, *out);
2319 * Save an rbd image's object map
2322 * @param object map bit vector
2325 * @returns 0 on success, negative error code on failure
// Class method: replace the stored object map with the one supplied by the client.
// Input: an encoded BitVector<2> in `in`. It is re-encoded with CRC enabled
// and written as the object's full content.
2327 int object_map_save(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2329 BitVector<2> object_map;
2331 bufferlist::iterator iter = in->begin();
2332 ::decode(object_map, iter);
2333 } catch (const buffer::error &err) {
// CRC is switched on for the copy that goes to disk.
2337 object_map.set_crc_enabled(true);
2340 ::encode(object_map, bl);
2341 CLS_LOG(20, "object_map_save: object size=%" PRIu64 ", byte size=%u",
2342 object_map.size(), bl.length());
// write_full replaces the whole object rather than patching a range.
2343 return cls_cxx_write_full(hctx, &bl);
2347 * Resize an rbd image's object map
2350 * @param object_count the max number of objects in the image
2351 * @param default_state the default state of newly created objects
2354 * @returns 0 on success, negative error code on failure
// Class method: grow or shrink the stored object map to `object_count` entries.
// Input: object_count (uint64_t) and default_state (uint8_t), decoded from `in`.
// Shrinking is only allowed when every entry that would be dropped is already
// in default_state; growing initialises the new entries to default_state.
// The resulting map is written back as the object's full content.
2356 int object_map_resize(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2358 uint64_t object_count;
2359 uint8_t default_state;
2361 bufferlist::iterator iter = in->begin();
2362 ::decode(object_count, iter);
2363 ::decode(default_state, iter);
2364 } catch (const buffer::error &err) {
2368 // protect against excessive memory requirements
2369 if (object_count > cls::rbd::MAX_OBJECT_MAP_OBJECT_COUNT) {
2370 CLS_ERR("object map too large: %" PRIu64, object_count);
2374 BitVector<2> object_map;
2375 int r = object_map_read(hctx, object_map);
// A missing object map (-ENOENT) is acceptable: resizing then starts from an
// empty map.
2376 if ((r < 0) && (r != -ENOENT)) {
2380 size_t orig_object_map_size = object_map.size();
2381 if (object_count < orig_object_map_size) {
// Entries [object_count, orig_object_map_size) are the ones dropped by the
// shrink, so the scan must start at object_count itself.
2382 for (uint64_t i = object_count; i < orig_object_map_size; ++i) {
2383 if (object_map[i] != default_state) {
2384 CLS_ERR("object map indicates object still exists: %" PRIu64, i);
2388 object_map.resize(object_count);
2389 } else if (object_count > orig_object_map_size) {
2390 object_map.resize(object_count);
// Newly added entries are explicitly set to the caller's default state.
2391 for (uint64_t i = orig_object_map_size; i < object_count; ++i) {
2392 object_map[i] = default_state;
2397 ::encode(object_map, map);
2398 CLS_LOG(20, "object_map_resize: object size=%" PRIu64 ", byte size=%u",
2399 object_count, map.length());
2400 return cls_cxx_write_full(hctx, &map);
2404 * Update an rbd image's object map
2407 * @param start_object_no the start object iterator
2408 * @param end_object_no the end object iterator
2409 * @param new_object_state the new object state
2410 * @param current_object_state optional current object state filter
2413 * @returns 0 on success, negative error code on failure
// Class method: set the state of objects [start_object_no, end_object_no) in
// the stored object map, optionally only where the current state matches a filter.
// Input (decoded from `in`): start_object_no, end_object_no, new_object_state,
// and an optional current_object_state.
// Only the header, the footer and the data extents covering the requested range
// are read; when at least one entry changes, the data extents and footer are
// written back in place.
2415 int object_map_update(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2417 uint64_t start_object_no;
2418 uint64_t end_object_no;
2419 uint8_t new_object_state;
2420 boost::optional<uint8_t> current_object_state;
2422 bufferlist::iterator iter = in->begin();
2423 ::decode(start_object_no, iter);
2424 ::decode(end_object_no, iter);
2425 ::decode(new_object_state, iter);
2426 ::decode(current_object_state, iter);
2427 } catch (const buffer::error &err) {
2428 CLS_ERR("failed to decode message");
// The object size is needed below to compute the footer's length.
2433 int r = cls_cxx_stat(hctx, &size, NULL);
2438 BitVector<2> object_map;
2439 bufferlist header_bl;
// Step 1: read and decode the header, which lives at offset 0.
2440 r = cls_cxx_read2(hctx, 0, object_map.get_header_length(), &header_bl,
2441 CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
2443 CLS_ERR("object map header read failed");
2448 bufferlist::iterator it = header_bl.begin();
2449 object_map.decode_header(it);
2450 } catch (const buffer::error &err) {
2451 CLS_ERR("failed to decode object map header: %s", err.what());
// Step 2: read and decode the footer, which spans from get_footer_offset() to
// the end of the object.
// NOTE(review): the length is computed as size - footer offset; confirm the
// object can never be shorter than the footer offset.
2455 bufferlist footer_bl;
2456 r = cls_cxx_read2(hctx, object_map.get_footer_offset(),
2457 size - object_map.get_footer_offset(), &footer_bl,
2458 CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
2460 CLS_ERR("object map footer read failed");
2465 bufferlist::iterator it = footer_bl.begin();
2466 object_map.decode_footer(it);
2467 } catch (const buffer::error &err) {
2468 CLS_ERR("failed to decode object map footer: %s", err.what());
// The range must be non-empty and lie within the map.
2471 if (start_object_no >= end_object_no || end_object_no > object_map.size()) {
// Step 3: read and decode only the data extents that cover the requested range.
2475 uint64_t byte_offset;
2476 uint64_t byte_length;
2477 object_map.get_data_extents(start_object_no,
2478 end_object_no - start_object_no,
2479 &byte_offset, &byte_length);
// Data offsets are relative to the end of the header.
2482 r = cls_cxx_read2(hctx, object_map.get_header_length() + byte_offset,
2483 byte_length, &data_bl, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
2485 CLS_ERR("object map data read failed");
2490 bufferlist::iterator it = data_bl.begin();
2491 object_map.decode_data(it, byte_offset);
2492 } catch (const buffer::error &err) {
2493 CLS_ERR("failed to decode data chunk [%" PRIu64 "]: %s",
2494 byte_offset, err.what());
// Step 4: apply the new state to every matching entry in the range.
2498 bool updated = false;
2499 auto it = object_map.begin() + start_object_no;
2500 auto end_it = object_map.begin() + end_object_no;
2501 for (; it != end_it; ++it) {
2502 uint8_t state = *it;
// An entry is rewritten when it differs from the new state and either no filter
// was given, the filter matches exactly, or the filter is OBJECT_EXISTS and the
// entry is OBJECT_EXISTS_CLEAN.
2503 if ((!current_object_state || state == *current_object_state ||
2504 (*current_object_state == OBJECT_EXISTS &&
2505 state == OBJECT_EXISTS_CLEAN)) && state != new_object_state) {
2506 *it = new_object_state;
2512 CLS_LOG(20, "object_map_update: %" PRIu64 "~%" PRIu64 " -> %" PRIu64,
2513 byte_offset, byte_length,
2514 object_map.get_header_length() + byte_offset);
// Step 5: write the modified data extents back to the offset they were read from.
2517 object_map.encode_data(data_bl, byte_offset, byte_length);
2518 r = cls_cxx_write2(hctx, object_map.get_header_length() + byte_offset,
2519 data_bl.length(), &data_bl,
2520 CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
// This write carries the data chunk, so the message names it as such.
2522 CLS_ERR("failed to write object map data: %s", cpp_strerror(r).c_str());
// Step 6: rewrite the footer after the data it covers has changed.
2527 object_map.encode_footer(footer_bl);
2528 r = cls_cxx_write2(hctx, object_map.get_footer_offset(), footer_bl.length(),
2529 &footer_bl, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
2531 CLS_ERR("failed to write object map footer: %s", cpp_strerror(r).c_str());
2535 CLS_LOG(20, "object_map_update: no update necessary");
2542 * Mark all _EXISTS objects as _EXISTS_CLEAN so future writes to the
2543 * image HEAD can be tracked.
2549 * @returns 0 on success, negative error code on failure
// Class method: prepare the HEAD object map for a new snapshot.
// Every entry in state OBJECT_EXISTS is changed to OBJECT_EXISTS_CLEAN and the
// map is written back as the object's full content.
2551 int object_map_snap_add(cls_method_context_t hctx, bufferlist *in,
2554 BitVector<2> object_map;
2555 int r = object_map_read(hctx, object_map);
// `updated` tracks whether any entry changed during the scan.
2560 bool updated = false;
2561 for (uint64_t i = 0; i < object_map.size(); ++i) {
2562 if (object_map[i] == OBJECT_EXISTS) {
2563 object_map[i] = OBJECT_EXISTS_CLEAN;
2570 ::encode(object_map, bl);
2571 r = cls_cxx_write_full(hctx, &bl);
2577 * Mark all _EXISTS_CLEAN objects as _EXISTS in the current object map
2578 * if the provided snapshot object map object is marked as _EXISTS.
2581 * @param snapshot object map bit vector
2584 * @returns 0 on success, negative error code on failure
// Class method: fold a removed snapshot's object map back into this object map.
// Input: the snapshot's object map (BitVector<2>), decoded from `in`.
// An entry in state OBJECT_EXISTS_CLEAN becomes OBJECT_EXISTS when the snapshot
// map marks the same index OBJECT_EXISTS, or when the index lies beyond the end
// of the snapshot map.
2586 int object_map_snap_remove(cls_method_context_t hctx, bufferlist *in,
2589 BitVector<2> src_object_map;
2591 bufferlist::iterator iter = in->begin();
2592 ::decode(src_object_map, iter);
2593 } catch (const buffer::error &err) {
// dst is the map stored in this object; src is the one supplied by the caller.
2597 BitVector<2> dst_object_map;
2598 int r = object_map_read(hctx, dst_object_map);
// `updated` tracks whether any entry changed during the scan.
2603 bool updated = false;
2604 for (uint64_t i = 0; i < dst_object_map.size(); ++i) {
// The size test comes first so src_object_map is never indexed out of range.
2605 if (dst_object_map[i] == OBJECT_EXISTS_CLEAN &&
2606 (i >= src_object_map.size() || src_object_map[i] == OBJECT_EXISTS)) {
2607 dst_object_map[i] = OBJECT_EXISTS;
2614 ::encode(dst_object_map, bl);
2615 r = cls_cxx_write_full(hctx, &bl);
// Build the omap key for an image metadata entry:
// RBD_METADATA_KEY_PREFIX ("metadata_") followed by the entry name.
2620 static const string metadata_key_for_name(const string &name)
2622 return RBD_METADATA_KEY_PREFIX + name;
// Inverse of metadata_key_for_name(): drop the first
// strlen(RBD_METADATA_KEY_PREFIX) characters of the key and return the rest.
// The key is not checked for actually starting with the prefix.
2625 static const string metadata_name_from_key(const string &key)
2627 return key.substr(strlen(RBD_METADATA_KEY_PREFIX));
2632 * @param start_after which name to begin listing after
2633 * (use the empty string to start at the beginning)
2634 * @param max_return the maximum number of names to list
2638 * @returns 0 on success, negative error code on failure
// Class method: list image metadata entries as a name -> value map.
// Input:  start_after (string) and max_return (uint64_t), decoded from `in`.
// Output: map<string, bufferlist> of at most max_return entries, encoded into
//         *out, keyed by metadata name (the omap key without its prefix).
2640 int metadata_list(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2643 uint64_t max_return;
2646 bufferlist::iterator iter = in->begin();
2647 ::decode(start_after, iter);
2648 ::decode(max_return, iter);
2649 } catch (const buffer::error &err) {
2653 // TODO remove implicit support for zero during the N-release
// A max_return of zero is interpreted as "one page" (RBD_MAX_KEYS_READ entries).
2654 if (max_return == 0) {
2655 max_return = RBD_MAX_KEYS_READ;
2658 map<string, bufferlist> data;
// Listing resumes after the key that corresponds to start_after.
2659 string last_read = metadata_key_for_name(start_after);
2662 while (more && data.size() < max_return) {
2663 map<string, bufferlist> raw_data;
// Never request more keys than are still needed to reach max_return.
2664 int max_read = MIN(RBD_MAX_KEYS_READ, max_return - data.size());
2665 int r = cls_cxx_map_get_vals(hctx, last_read, RBD_METADATA_KEY_PREFIX,
2666 max_read, &raw_data, &more);
2668 CLS_ERR("failed to read the vals off of disk: %s", cpp_strerror(r).c_str());
// Values are swapped (not copied) into the result under the un-prefixed name.
2672 for (auto& kv : raw_data) {
2673 data[metadata_name_from_key(kv.first)].swap(kv.second);
// The next page resumes after the last raw (prefixed) key returned.
2676 if (!raw_data.empty()) {
2677 last_read = raw_data.rbegin()->first;
2681 ::encode(data, *out);
2687 * @param data <map(key, value)>
2690 * @returns 0 on success, negative error code on failure
// Class method: store one or more image metadata entries.
// Input: map<string, bufferlist> of name -> value, decoded from `in`.
// Each name is converted to its prefixed omap key and all entries are written
// with a single cls_cxx_map_set_vals() call.
2692 int metadata_set(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2694 map<string, bufferlist> data, raw_data;
2696 bufferlist::iterator iter = in->begin();
2698 ::decode(data, iter);
2699 } catch (const buffer::error &err) {
2703 for (map<string, bufferlist>::iterator it = data.begin();
2704 it != data.end(); ++it) {
// %.*s prints exactly length() bytes of the value.
2705 CLS_LOG(20, "metadata_set key=%s value=%.*s", it->first.c_str(),
2706 it->second.length(), it->second.c_str());
// The value buffer is swapped (not copied) into the map that gets written.
2707 raw_data[metadata_key_for_name(it->first)].swap(it->second);
2709 int r = cls_cxx_map_set_vals(hctx, &raw_data);
2711 CLS_ERR("error writing metadata: %s", cpp_strerror(r).c_str());
2723 * @returns 0 on success, negative error code on failure
// Class method: remove one image metadata entry.
// Input: the metadata name (string), decoded from `in`. The corresponding
// prefixed omap key is removed.
2725 int metadata_remove(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2729 bufferlist::iterator iter = in->begin();
2731 ::decode(key, iter);
2732 } catch (const buffer::error &err) {
// The log tag names this method so removals can be told apart from sets in the log.
2736 CLS_LOG(20, "metadata_remove key=%s", key.c_str());
2738 int r = cls_cxx_map_remove_key(hctx, metadata_key_for_name(key));
2740 CLS_ERR("error remove metadata: %s", cpp_strerror(r).c_str());
2752 * @param metadata value associated with the key
2753 * @returns 0 on success, negative error code on failure
// Class method: fetch the value of one image metadata entry.
// Input:  the metadata name (string), decoded from `in`.
// Output: the value stored under the corresponding prefixed omap key, encoded
//         into *out.
2755 int metadata_get(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2760 bufferlist::iterator iter = in->begin();
2762 ::decode(key, iter);
2763 } catch (const buffer::error &err) {
2767 CLS_LOG(20, "metadata_get key=%s", key.c_str());
2769 int r = cls_cxx_map_get_val(hctx, metadata_key_for_name(key), &value);
2771 CLS_ERR("error get metadata: %s", cpp_strerror(r).c_str());
2775 ::encode(value, *out);
// Class method: return the image's snapshot limit.
// The limit is read from the "snap_limit" key and encoded as uint64_t into *out.
2779 int snapshot_get_limit(cls_method_context_t hctx, bufferlist *in,
2782 uint64_t snap_limit;
2783 int r = read_key(hctx, "snap_limit", &snap_limit);
// UINT64_MAX is the "no limit" value.
// NOTE(review): presumably reported when the key is absent (-ENOENT), as
// old_snapshot_add does -- confirm.
2785 snap_limit = UINT64_MAX;
2787 CLS_ERR("error retrieving snapshot limit: %s", cpp_strerror(r).c_str());
2791 CLS_LOG(20, "read snapshot limit %" PRIu64, snap_limit);
2792 ::encode(snap_limit, *out);
// Class method: set or clear the image's snapshot limit.
// Input: new_limit, decoded from `in`. UINT64_MAX means "no limit" and is
// represented by removing the "snap_limit" key; any other value is stored
// under that key.
2797 int snapshot_set_limit(cls_method_context_t hctx, bufferlist *in,
2805 bufferlist::iterator iter = in->begin();
2806 ::decode(new_limit, iter);
2807 } catch (const buffer::error &err) {
2811 if (new_limit == UINT64_MAX) {
2812 CLS_LOG(20, "remove snapshot limit\n");
2813 rc = cls_cxx_map_remove_key(hctx, "snap_limit");
2815 CLS_LOG(20, "set snapshot limit to %" PRIu64 "\n", new_limit);
2816 ::encode(new_limit, bl);
2817 rc = cls_cxx_map_set_val(hctx, "snap_limit", &bl);
2824 /****************************** Old format *******************************/
// Class method (old format): list the snapshots recorded in the header object.
// Layout relied on by the pointer arithmetic below: the rbd_obj_header_ondisk
// struct, then snap_count rbd_obj_snap_ondisk records, then snap_names_len
// bytes of NUL-terminated snapshot names.
// Output: snap_seq, snap_count, then per-snapshot fields encoded into *out.
2826 int old_snapshots_list(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2829 struct rbd_obj_header_ondisk *header;
2830 int rc = snap_read_header(hctx, bl);
// The header is interpreted in place from the buffer returned by snap_read_header().
// NOTE(review): snap_count and snap_names_len come straight from the stored
// header; no visible line checks them against the buffer length.
2834 header = (struct rbd_obj_header_ondisk *)bl.c_str();
2835 bufferptr p(header->snap_names_len);
2836 char *buf = (char *)header;
// `name` points at the first snapshot name, `end` just past the names area.
2837 char *name = buf + sizeof(*header) + header->snap_count * sizeof(struct rbd_obj_snap_ondisk);
2838 char *end = name + header->snap_names_len;
2840 buf + sizeof(*header) + header->snap_count * sizeof(struct rbd_obj_snap_ondisk),
2841 header->snap_names_len);
2843 ::encode(header->snap_seq, *out);
2844 ::encode(header->snap_count, *out);
2846 for (unsigned i = 0; i < header->snap_count; i++) {
2848 ::encode(header->snaps[i].id, *out);
2849 ::encode(header->snaps[i].image_size, *out);
// Advance to the next NUL-terminated name.
2852 name += strlen(name) + 1;
// Class method (old format): add a snapshot to the header object.
// Input: snapshot name and snap_id, decoded from `in` (the name is held in `s`).
// The header object is rebuilt as: updated header, new snapshot record followed
// by the existing records, new name followed by the existing names; the result
// replaces the object's full content.
2860 int old_snapshot_add(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2863 struct rbd_obj_header_ondisk *header;
2865 bufferptr header_bp(sizeof(*header));
2866 struct rbd_obj_snap_ondisk *new_snaps;
2868 int rc = snap_read_header(hctx, bl);
// The header is interpreted in place from the buffer returned by snap_read_header().
2872 header = (struct rbd_obj_header_ondisk *)bl.c_str();
// Layout: header struct, then snap_count snapshot records, then the names area.
2874 int snaps_id_ofs = sizeof(*header);
2875 int names_ofs = snaps_id_ofs + sizeof(*new_snaps) * header->snap_count;
2876 const char *snap_name;
2877 const char *snap_names = ((char *)header) + names_ofs;
2878 const char *end = snap_names + header->snap_names_len;
2879 bufferlist::iterator iter = in->begin();
2885 ::decode(snap_id, iter);
2886 } catch (const buffer::error &err) {
2889 snap_name = s.c_str();
// The new id must not be older than the header's current snap_seq.
2891 if (header->snap_seq > snap_id)
2894 uint64_t snap_limit;
2895 rc = read_key(hctx, "snap_limit", &snap_limit);
// No stored limit means unlimited (UINT64_MAX).
2896 if (rc == -ENOENT) {
2897 snap_limit = UINT64_MAX;
2898 } else if (rc < 0) {
2902 if (header->snap_count >= snap_limit)
// Walk the NUL-terminated names looking for one equal to the new name.
2905 const char *cur_snap_name;
2906 for (cur_snap_name = snap_names; cur_snap_name < end; cur_snap_name += strlen(cur_snap_name) + 1) {
// The comparison length is capped at the bytes remaining in the names area.
2907 if (strncmp(cur_snap_name, snap_name, end - cur_snap_name) == 0)
// Stepping past `end` means the last name was not terminated inside the area.
2910 if (cur_snap_name > end)
2913 int snap_name_len = strlen(snap_name);
// New buffers are sized for one additional name (plus its NUL) and one
// additional snapshot record.
2915 bufferptr new_names_bp(header->snap_names_len + snap_name_len + 1);
2916 bufferptr new_snaps_bp(sizeof(*new_snaps) * (header->snap_count + 1));
// The new name goes first, followed by the existing names.
2918 /* copy snap names and append to new snap name */
2919 char *new_snap_names = new_names_bp.c_str();
2920 strcpy(new_snap_names, snap_name);
2921 memcpy(new_snap_names + snap_name_len + 1, snap_names, header->snap_names_len);
// Existing records are copied to slots 1..snap_count; slot 0 is left for the
// new snapshot and filled in below.
2923 /* append new snap id */
2924 new_snaps = (struct rbd_obj_snap_ondisk *)new_snaps_bp.c_str();
2925 memcpy(new_snaps + 1, header->snaps, sizeof(*new_snaps) * header->snap_count);
// Header counters are updated only after the old values were used for the copies above.
2927 header->snap_count = header->snap_count + 1;
2928 header->snap_names_len = header->snap_names_len + snap_name_len + 1;
2929 header->snap_seq = snap_id;
2931 new_snaps[0].id = snap_id;
2932 new_snaps[0].image_size = header->image_size;
2934 memcpy(header_bp.c_str(), header, sizeof(*header));
// Reassemble the object in on-disk order: header, records, names.
2936 newbl.push_back(header_bp);
2937 newbl.push_back(new_snaps_bp);
2938 newbl.push_back(new_names_bp);
2940 rc = cls_cxx_write_full(hctx, &newbl);
// Remove a snapshot, identified by name, from an old-format rbd header object.
//
// The header is read through snap_read_header() and the name is taken from a
// string `s` (its decode from `in` is not among the visible lines). The
// matching record and name are dropped and the remaining header, records and
// names are written back with cls_cxx_write_full(). No visible line writes to
// `out`.
2947 int old_snapshot_remove(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
2950 struct rbd_obj_header_ondisk *header;
2952 bufferptr header_bp(sizeof(*header));
2954 int rc = snap_read_header(hctx, bl);
2958 header = (struct rbd_obj_header_ondisk *)bl.c_str();
// Layout: fixed header, snap_count snapshot records, then the name strings.
2960 int snaps_id_ofs = sizeof(*header);
2961 int names_ofs = snaps_id_ofs + sizeof(struct rbd_obj_snap_ondisk) * header->snap_count;
2962 const char *snap_name;
2963 const char *snap_names = ((char *)header) + names_ofs;
2964 const char *orig_names = snap_names;
2965 const char *end = snap_names + header->snap_names_len;
2966 bufferlist::iterator iter = in->begin();
2970 struct rbd_obj_snap_ondisk snap;
2974 } catch (const buffer::error &err) {
2977 snap_name = s.c_str();
// Advance through the names and the record index i together; on a match the
// record at index i is captured in `snap`.
2979 for (i = 0; snap_names < end; i++) {
2980 if (strcmp(snap_names, snap_name) == 0) {
2981 snap = header->snaps[i];
2985 snap_names += strlen(snap_names) + 1;
2988 CLS_ERR("couldn't find snap %s\n", snap_name);
// Shrink the counters by the removed name (plus its NUL) and one record.
// From here on snap_count is the count AFTER removal.
2992 header->snap_names_len = header->snap_names_len - (s.length() + 1);
2993 header->snap_count = header->snap_count - 1;
2995 bufferptr new_names_bp(header->snap_names_len);
2996 bufferptr new_snaps_bp(sizeof(header->snaps[0]) * header->snap_count);
2998 memcpy(header_bp.c_str(), header, sizeof(*header));
2999 newbl.push_back(header_bp);
3001 if (header->snap_count) {
3004 CLS_LOG(20, "i=%u\n", i);
// Copy the records and names that come before index i.
3006 snaps_len = sizeof(header->snaps[0]) * i;
3007 names_len = snap_names - orig_names;
3008 memcpy(new_snaps_bp.c_str(), header->snaps, snaps_len);
3009 memcpy(new_names_bp.c_str(), orig_names, names_len);
// Step over the removed name.
3011 snap_names += s.length() + 1;
// Copy whatever follows the removed entry: (snap_count - i) records starting
// at index i + 1, and the rest of the names region.
3013 if (i < header->snap_count) {
3014 memcpy(new_snaps_bp.c_str() + snaps_len,
3015 header->snaps + i + 1,
3016 sizeof(header->snaps[0]) * (header->snap_count - i));
3017 memcpy(new_names_bp.c_str() + names_len, snap_names , end - snap_names);
3019 newbl.push_back(new_snaps_bp);
3020 newbl.push_back(new_names_bp);
3023 rc = cls_cxx_write_full(hctx, &newbl);
3031 * Rename a snapshot of an old-format image.
3034 * @param src_snap_id old snap id of the snapshot (snapid_t)
3035 * @param dst_snap_name new name of the snapshot (string)
3038 * @returns 0 on success, negative error code on failure.
// Rename a snapshot, identified by snap id, in an old-format rbd header object.
//
// Decodes the source snap id and the destination name from `in`, checks the
// existing names against the destination name, locates the record whose id
// matches, and rewrites the object with the new name substituted for the old
// one via cls_cxx_write_full(). No visible line writes to `out`.
3040 int old_snapshot_rename(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
3043 struct rbd_obj_header_ondisk *header;
3045 bufferptr header_bp(sizeof(*header));
3046 snapid_t src_snap_id;
3047 const char *dst_snap_name;
3050 int rc = snap_read_header(hctx, bl);
3054 header = (struct rbd_obj_header_ondisk *)bl.c_str();
// Layout: fixed header, snap_count snapshot records, then the name strings.
3056 int snaps_id_ofs = sizeof(*header);
3057 int names_ofs = snaps_id_ofs + sizeof(rbd_obj_snap_ondisk) * header->snap_count;
3058 const char *snap_names = ((char *)header) + names_ofs;
3059 const char *orig_names = snap_names;
3060 const char *end = snap_names + header->snap_names_len;
3061 bufferlist::iterator iter = in->begin();
3066 ::decode(src_snap_id, iter);
3067 ::decode(dst, iter);
3068 } catch (const buffer::error &err) {
3071 dst_snap_name = dst.c_str();
// First pass: compare every existing name with the destination name
// (the action on a match is not visible here).
3073 const char *cur_snap_name;
3074 for (cur_snap_name = snap_names; cur_snap_name < end;
3075 cur_snap_name += strlen(cur_snap_name) + 1) {
3076 if (strcmp(cur_snap_name, dst_snap_name) == 0)
3079 if (cur_snap_name > end)
// Second pass: find the record with the source id while advancing snap_names
// so that it ends up pointing at that record's name.
3081 for (i = 0; i < header->snap_count; i++) {
3082 if (src_snap_id == header->snaps[i].id) {
3086 snap_names += strlen(snap_names) + 1;
3089 CLS_ERR("couldn't find snap %llu\n", (unsigned long long)src_snap_id.val);
3093 CLS_LOG(20, "rename snap with snap id %llu to dest name %s", (unsigned long long)src_snap_id.val, dst_snap_name);
// Adjust the names length by the difference between old and new name lengths;
// the terminating NUL is unchanged, so it is not counted on either side.
3094 header->snap_names_len = header->snap_names_len - strlen(snap_names) + dst.length();
3096 bufferptr new_names_bp(header->snap_names_len);
3097 bufferptr new_snaps_bp(sizeof(header->snaps[0]) * header->snap_count);
3099 if (header->snap_count) {
3101 CLS_LOG(20, "i=%u\n", i);
// Names before the renamed entry are copied unchanged.
3103 names_len = snap_names - orig_names;
3104 memcpy(new_names_bp.c_str(), orig_names, names_len);
// Write the new name, then skip the old name in the source buffer.
3106 strcpy(new_names_bp.c_str() + names_len, dst_snap_name);
3107 names_len += strlen(dst_snap_name) + 1;
3108 snap_names += strlen(snap_names) + 1;
3109 if (i < header->snap_count) {
3110 memcpy(new_names_bp.c_str() + names_len, snap_names , end - snap_names);
// The snapshot records themselves are copied as they are.
3112 memcpy(new_snaps_bp.c_str(), header->snaps, sizeof(header->snaps[0]) * header->snap_count);
3115 memcpy(header_bp.c_str(), header, sizeof(*header));
// Final object content, in order: header, snapshot records, names.
3116 newbl.push_back(header_bp);
3117 newbl.push_back(new_snaps_bp);
3118 newbl.push_back(new_names_bp);
3120 rc = cls_cxx_write_full(hctx, &newbl);
// Map key names (UUID, MODE) and key prefixes used by the mirroring helpers
// that follow; the *_key() helpers append an id to the matching prefix.
3129 static const std::string UUID("mirror_uuid");
3130 static const std::string MODE("mirror_mode");
3131 static const std::string PEER_KEY_PREFIX("mirror_peer_");
3132 static const std::string IMAGE_KEY_PREFIX("image_");
3133 static const std::string GLOBAL_KEY_PREFIX("global_");
3134 static const std::string STATUS_GLOBAL_KEY_PREFIX("status_global_");
3135 static const std::string INSTANCE_KEY_PREFIX("instance_");
// Build the map key of a mirror peer entry: PEER_KEY_PREFIX followed by `uuid`.
3137 std::string peer_key(const std::string &uuid) {
3138 return PEER_KEY_PREFIX + uuid;
// Build the map key of a mirrored image entry: IMAGE_KEY_PREFIX followed by
// `image_id`.
3141 std::string image_key(const string &image_id) {
3142 return IMAGE_KEY_PREFIX + image_id;
// Build the map key of a global image id entry: GLOBAL_KEY_PREFIX followed by
// `global_id`.
3145 std::string global_key(const string &global_id) {
3146 return GLOBAL_KEY_PREFIX + global_id;
// Build the map key of a mirror image status entry: STATUS_GLOBAL_KEY_PREFIX
// followed by `global_id`.
3149 std::string status_global_key(const string &global_id) {
3150 return STATUS_GLOBAL_KEY_PREFIX + global_id;
// Build the map key of a mirror instance entry: INSTANCE_KEY_PREFIX followed
// by `instance_id`.
3153 std::string instance_key(const string &instance_id) {
3154 return INSTANCE_KEY_PREFIX + instance_id;
// Read the value stored under mirror::UUID into *mirror_uuid.
// An error is logged with CLS_ERR (the guarding condition and the return
// statements are not among the visible lines).
3157 int uuid_get(cls_method_context_t hctx, std::string *mirror_uuid) {
3158 bufferlist mirror_uuid_bl;
3159 int r = cls_cxx_map_get_val(hctx, mirror::UUID, &mirror_uuid_bl);
3162 CLS_ERR("error reading mirror uuid: %s", cpp_strerror(r).c_str());
// The value bytes are used verbatim as the uuid (there is no ::decode step).
3167 *mirror_uuid = std::string(mirror_uuid_bl.c_str(), mirror_uuid_bl.length());
// Collect the object's current watchers into *entities as entity_inst_t values
// built from each watcher entry's name and address.
3171 int list_watchers(cls_method_context_t hctx,
3172 std::set<entity_inst_t> *entities) {
3173 obj_list_watch_response_t watchers;
3174 int r = cls_cxx_list_watchers(hctx, &watchers);
// -ENOENT is excluded from the error path below.
3175 if (r < 0 && r != -ENOENT) {
3176 CLS_ERR("error listing watchers: '%s'", cpp_strerror(r).c_str());
3181 for (auto &w : watchers.entries) {
3182 entities->emplace(w.name, w.addr);
// Append every mirror peer stored under PEER_KEY_PREFIX to *peers.
//
// Keys are fetched with cls_cxx_map_get_vals() in batches of at most
// RBD_MAX_KEYS_READ, resuming after the last key returned. The loop that
// drives the batches on `more` is not among the visible lines.
3187 int read_peers(cls_method_context_t hctx,
3188 std::vector<cls::rbd::MirrorPeer> *peers) {
3189 std::string last_read = PEER_KEY_PREFIX;
3190 int max_read = RBD_MAX_KEYS_READ;
3193 std::map<std::string, bufferlist> vals;
3194 int r = cls_cxx_map_get_vals(hctx, last_read, PEER_KEY_PREFIX.c_str(),
3195 max_read, &vals, &more);
3197 CLS_ERR("error reading peers: %s", cpp_strerror(r).c_str());
// Decode each returned value as a MirrorPeer; a decode failure is logged with
// the offending key.
3201 for (auto &it : vals) {
3203 bufferlist::iterator bl_it = it.second.begin();
3204 cls::rbd::MirrorPeer peer;
3205 ::decode(peer, bl_it);
3206 peers->push_back(peer);
3207 } catch (const buffer::error &err) {
3208 CLS_ERR("could not decode peer '%s'", it.first.c_str());
// Remember the last key seen so the next batch starts after it.
3213 if (!vals.empty()) {
3214 last_read = vals.rbegin()->first;
// Read and decode the mirror peer stored under peer_key(id) into *peer.
// Read errors and decode failures are logged with CLS_ERR.
3220 int read_peer(cls_method_context_t hctx, const std::string &id,
3221 cls::rbd::MirrorPeer *peer) {
3223 int r = cls_cxx_map_get_val(hctx, peer_key(id), &bl);
3225 CLS_ERR("error reading peer '%s': %s", id.c_str(),
3226 cpp_strerror(r).c_str());
3231 bufferlist::iterator bl_it = bl.begin();
3232 ::decode(*peer, bl_it);
3233 } catch (const buffer::error &err) {
3234 CLS_ERR("could not decode peer '%s'", id.c_str());
// Store a mirror peer under peer_key(id). The encoding of `peer` into `bl` is
// not among the visible lines. Write errors are logged with CLS_ERR.
3240 int write_peer(cls_method_context_t hctx, const std::string &id,
3241 const cls::rbd::MirrorPeer &peer) {
3245 int r = cls_cxx_map_set_val(hctx, peer_key(id), &bl);
3247 CLS_ERR("error writing peer '%s': %s", id.c_str(),
3248 cpp_strerror(r).c_str());
// Read and decode the MirrorImage stored under image_key(image_id) into
// *mirror_image. Read errors and decode failures are logged with CLS_ERR.
3254 int image_get(cls_method_context_t hctx, const string &image_id,
3255 cls::rbd::MirrorImage *mirror_image) {
3257 int r = cls_cxx_map_get_val(hctx, image_key(image_id), &bl);
3260 CLS_ERR("error reading mirrored image '%s': '%s'", image_id.c_str(),
3261 cpp_strerror(r).c_str());
3267 bufferlist::iterator it = bl.begin();
3268 ::decode(*mirror_image, it);
3269 } catch (const buffer::error &err) {
3270 CLS_ERR("could not decode mirrored image '%s'", image_id.c_str());
// Store `mirror_image` under image_key(image_id) and record the reverse
// mapping global_key(global_image_id) -> image_id.
//
// Before writing, the existing entry (if any) is read with image_get() and the
// global id key is looked up; an existing entry with a different
// global_image_id is rejected ("cannot change the global id"). The exact
// branch structure and return codes are not among the visible lines.
3277 int image_set(cls_method_context_t hctx, const string &image_id,
3278 const cls::rbd::MirrorImage &mirror_image) {
3280 ::encode(mirror_image, bl);
3282 cls::rbd::MirrorImage existing_mirror_image;
3283 int r = image_get(hctx, image_id, &existing_mirror_image);
3285 // make sure global id doesn't already exist
3286 std::string global_id_key = global_key(mirror_image.global_image_id);
// NOTE(review): this local has the same name as the `image_id` parameter.
// If it is declared in a nested scope (scope braces are not visible here),
// the CLS_ERR calls in that scope print this local instead of the caller's
// image id; confirm and consider renaming it.
3287 std::string image_id;
3288 r = read_key(hctx, global_id_key, &image_id);
3291 } else if (r != -ENOENT) {
3292 CLS_ERR("error reading global image id: '%s': '%s'", image_id.c_str(),
3293 cpp_strerror(r).c_str());
3297 // make sure this was not a race for disabling
3298 if (mirror_image.state == cls::rbd::MIRROR_IMAGE_STATE_DISABLING) {
3299 CLS_ERR("image '%s' is already disabled", image_id.c_str());
3303 CLS_ERR("error reading mirrored image '%s': '%s'", image_id.c_str(),
3304 cpp_strerror(r).c_str());
3306 } else if (existing_mirror_image.global_image_id !=
3307 mirror_image.global_image_id) {
3308 // cannot change the global id
// Write the image entry first, then the global id -> image id mapping.
3312 r = cls_cxx_map_set_val(hctx, image_key(image_id), &bl);
3314 CLS_ERR("error adding mirrored image '%s': %s", image_id.c_str(),
3315 cpp_strerror(r).c_str());
3319 bufferlist image_id_bl;
3320 ::encode(image_id, image_id_bl);
3321 r = cls_cxx_map_set_val(hctx, global_key(mirror_image.global_image_id),
3324 CLS_ERR("error adding global id for image '%s': %s", image_id.c_str(),
3325 cpp_strerror(r).c_str());
// Remove the mirroring entries of an image: its image_key() entry, the
// global_key() mapping and the status_global_key() status entry.
//
// The image state is compared against MIRROR_IMAGE_STATE_DISABLING before
// anything is removed (the action taken when it differs is not visible here).
3331 int image_remove(cls_method_context_t hctx, const string &image_id) {
3333 cls::rbd::MirrorImage mirror_image;
3334 int r = image_get(hctx, image_id, &mirror_image);
3337 CLS_ERR("error reading mirrored image '%s': '%s'", image_id.c_str(),
3338 cpp_strerror(r).c_str());
3343 if (mirror_image.state != cls::rbd::MIRROR_IMAGE_STATE_DISABLING) {
3347 r = cls_cxx_map_remove_key(hctx, image_key(image_id));
3349 CLS_ERR("error removing mirrored image '%s': %s", image_id.c_str(),
3350 cpp_strerror(r).c_str());
// For the two secondary keys a missing key (-ENOENT) is not an error.
3354 r = cls_cxx_map_remove_key(hctx, global_key(mirror_image.global_image_id));
3355 if (r < 0 && r != -ENOENT) {
3356 CLS_ERR("error removing global id for image '%s': %s", image_id.c_str(),
3357 cpp_strerror(r).c_str());
3361 r = cls_cxx_map_remove_key(hctx,
3362 status_global_key(mirror_image.global_image_id));
3363 if (r < 0 && r != -ENOENT) {
3364 CLS_ERR("error removing global status for image '%s': %s", image_id.c_str(),
3365 cpp_strerror(r).c_str());
// Stored form of a mirror image status: the base MirrorImageStatus plus the
// `origin` of the request that wrote it.
//
// The encoding is a versioned "meta" section holding `origin`, followed by the
// base class encoding. decode_meta() reads only the meta section, so a caller
// can obtain `origin` without decoding the full status.
3372 struct MirrorImageStatusOnDisk : cls::rbd::MirrorImageStatus {
3373 entity_inst_t origin;
3375 MirrorImageStatusOnDisk() {
// Copies the base status; `origin` is left for the caller to fill in.
3377 MirrorImageStatusOnDisk(const cls::rbd::MirrorImageStatus &status) :
3378 cls::rbd::MirrorImageStatus(status) {
// Encode only the meta section (origin).
3381 void encode_meta(bufferlist &bl, uint64_t features) const {
3382 ENCODE_START(1, 1, bl);
3383 ::encode(origin, bl, features);
// Full encoding: meta section first, then the base status.
3387 void encode(bufferlist &bl, uint64_t features) const {
3388 encode_meta(bl, features);
3389 cls::rbd::MirrorImageStatus::encode(bl);
// Decode only the meta section (origin).
3392 void decode_meta(bufferlist::iterator &it) {
3393 DECODE_START(1, it);
3394 ::decode(origin, it);
// Full decoding; the visible line decodes the base status.
3398 void decode(bufferlist::iterator &it) {
3400 cls::rbd::MirrorImageStatus::decode(it);
3403 WRITE_CLASS_ENCODER_FEATURES(MirrorImageStatusOnDisk)
// Store `status` for `global_image_id` under status_global_key().
//
// The stored copy has `up` forced to false, `last_update` set to the current
// time and `origin` set to the request origin reported by
// cls_get_request_origin().
3405 int image_status_set(cls_method_context_t hctx, const string &global_image_id,
3406 const cls::rbd::MirrorImageStatus &status) {
3407 MirrorImageStatusOnDisk ondisk_status(status);
// `up` is not persisted as given; image_status_get() derives it from the
// watcher set when the status is read back.
3408 ondisk_status.up = false;
3409 ondisk_status.last_update = ceph_clock_now();
3411 int r = cls_get_request_origin(hctx, &ondisk_status.origin);
3415 encode(ondisk_status, bl, cls_get_features(hctx));
3417 r = cls_cxx_map_set_val(hctx, status_global_key(global_image_id), &bl);
3419 CLS_ERR("error setting status for mirrored image, global id '%s': %s",
3420 global_image_id.c_str(), cpp_strerror(r).c_str());
// Remove the status entry stored under status_global_key(global_image_id).
// Failures are logged with CLS_ERR (the guarding condition is not visible
// here).
3426 int image_status_remove(cls_method_context_t hctx,
3427 const string &global_image_id) {
3429 int r = cls_cxx_map_remove_key(hctx, status_global_key(global_image_id));
3431 CLS_ERR("error removing status for mirrored image, global id '%s': %s",
3432 global_image_id.c_str(), cpp_strerror(r).c_str());
// Read the status stored for `global_image_id` into *status.
//
// `watchers` is the set of current watchers of the object; the returned
// status is marked `up` only when the origin recorded with the stored status
// is one of them.
3438 int image_status_get(cls_method_context_t hctx, const string &global_image_id,
3439 const std::set<entity_inst_t> &watchers,
3440 cls::rbd::MirrorImageStatus *status) {
3443 int r = cls_cxx_map_get_val(hctx, status_global_key(global_image_id), &bl);
3446 CLS_ERR("error reading status for mirrored image, global id '%s': '%s'",
3447 global_image_id.c_str(), cpp_strerror(r).c_str());
3452 MirrorImageStatusOnDisk ondisk_status;
3454 bufferlist::iterator it = bl.begin();
3455 decode(ondisk_status, it);
3456 } catch (const buffer::error &err) {
3457 CLS_ERR("could not decode status for mirrored image, global id '%s'",
3458 global_image_id.c_str());
// Keep only the base-class part, then recompute `up` from the watcher set.
3463 *status = static_cast<cls::rbd::MirrorImageStatus>(ondisk_status);
3464 status->up = (watchers.find(ondisk_status.origin) != watchers.end());
// List mirrored images and their statuses, starting after `start_after` and
// stopping once *mirror_images holds `max_return` entries.
//
// Image entries are read in batches of at most RBD_MAX_KEYS_READ keys. Each
// decoded image is stored in *mirror_images keyed by image id, and its status
// is looked up with image_status_get(). How the result `r1` of that lookup
// gates the store into *mirror_statuses is not visible here.
3468 int image_status_list(cls_method_context_t hctx,
3469 const std::string &start_after, uint64_t max_return,
3470 map<std::string, cls::rbd::MirrorImage> *mirror_images,
3471 map<std::string, cls::rbd::MirrorImageStatus> *mirror_statuses) {
3472 std::string last_read = image_key(start_after);
3473 int max_read = RBD_MAX_KEYS_READ;
// The watcher set is fetched once and reused for every status lookup.
3476 std::set<entity_inst_t> watchers;
3477 int r = list_watchers(hctx, &watchers);
3482 while (more && mirror_images->size() < max_return) {
3483 std::map<std::string, bufferlist> vals;
3484 CLS_LOG(20, "last_read = '%s'", last_read.c_str());
3485 r = cls_cxx_map_get_vals(hctx, last_read, IMAGE_KEY_PREFIX, max_read, &vals,
3488 CLS_ERR("error reading mirror image directory by name: %s",
3489 cpp_strerror(r).c_str());
3493 for (auto it = vals.begin(); it != vals.end() &&
3494 mirror_images->size() < max_return; ++it) {
// The image id is the key with IMAGE_KEY_PREFIX stripped.
3495 const std::string &image_id = it->first.substr(IMAGE_KEY_PREFIX.size());
3496 cls::rbd::MirrorImage mirror_image;
3497 bufferlist::iterator iter = it->second.begin();
3499 ::decode(mirror_image, iter);
3500 } catch (const buffer::error &err) {
3501 CLS_ERR("could not decode mirror image payload of image '%s'",
3506 (*mirror_images)[image_id] = mirror_image;
3508 cls::rbd::MirrorImageStatus status;
3509 int r1 = image_status_get(hctx, mirror_image.global_image_id, watchers,
3515 (*mirror_statuses)[image_id] = status;
// Resume the next batch after the greatest image id collected so far.
3517 if (!vals.empty()) {
3518 last_read = image_key(mirror_images->rbegin()->first);
// Walk every mirrored image entry and derive a status state for each one.
// The update of *states with that state is not among the visible lines;
// presumably a per-state counter is incremented.
//
// Entries are read in batches of at most RBD_MAX_KEYS_READ keys, resuming
// after the last key returned.
3525 int image_status_get_summary(cls_method_context_t hctx,
3526 std::map<cls::rbd::MirrorImageStatusState, int> *states) {
3527 std::set<entity_inst_t> watchers;
3528 int r = list_watchers(hctx, &watchers);
3535 string last_read = IMAGE_KEY_PREFIX;
3536 int max_read = RBD_MAX_KEYS_READ;
3539 map<string, bufferlist> vals;
3540 r = cls_cxx_map_get_vals(hctx, last_read, IMAGE_KEY_PREFIX,
3541 max_read, &vals, &more);
3543 CLS_ERR("error reading mirrored images: %s", cpp_strerror(r).c_str());
3547 for (auto &list_it : vals) {
3548 const string &key = list_it.first;
// Check that the key really carries IMAGE_KEY_PREFIX (the action taken when it
// does not is not visible here).
3550 if (0 != key.compare(0, IMAGE_KEY_PREFIX.size(), IMAGE_KEY_PREFIX)) {
3554 cls::rbd::MirrorImage mirror_image;
3555 bufferlist::iterator iter = list_it.second.begin();
3557 ::decode(mirror_image, iter);
3558 } catch (const buffer::error &err) {
3559 CLS_ERR("could not decode mirror image payload for key '%s'",
// NOTE(review): the return value of image_status_get() is ignored; on failure
// `status` keeps its default-constructed contents. Presumably deliberate so
// that images without a stored status are still counted; confirm.
3564 cls::rbd::MirrorImageStatus status;
3565 image_status_get(hctx, mirror_image.global_image_id, watchers, &status);
// A status that is not `up` is reported as UNKNOWN regardless of the stored
// state.
3567 cls::rbd::MirrorImageStatusState state = status.up ? status.state :
3568 cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN;
3572 if (!vals.empty()) {
3573 last_read = vals.rbegin()->first;
// Remove stored status entries whose recorded origin is not among the current
// watchers of the object.
//
// Status entries are read in batches of at most RBD_MAX_KEYS_READ keys. Only
// the meta section of each entry is decoded, which is enough to obtain the
// origin.
3580 int image_status_remove_down(cls_method_context_t hctx) {
3581 std::set<entity_inst_t> watchers;
3582 int r = list_watchers(hctx, &watchers);
3587 string last_read = STATUS_GLOBAL_KEY_PREFIX;
3588 int max_read = RBD_MAX_KEYS_READ;
3591 map<string, bufferlist> vals;
3592 r = cls_cxx_map_get_vals(hctx, last_read, STATUS_GLOBAL_KEY_PREFIX,
3593 max_read, &vals, &more);
3595 CLS_ERR("error reading mirrored images: %s", cpp_strerror(r).c_str());
3599 for (auto &list_it : vals) {
3600 const string &key = list_it.first;
// Check that the key really carries STATUS_GLOBAL_KEY_PREFIX (the action taken
// when it does not is not visible here).
3602 if (0 != key.compare(0, STATUS_GLOBAL_KEY_PREFIX.size(),
3603 STATUS_GLOBAL_KEY_PREFIX)) {
3607 MirrorImageStatusOnDisk status;
3609 bufferlist::iterator it = list_it.second.begin();
3610 status.decode_meta(it);
3611 } catch (const buffer::error &err) {
3612 CLS_ERR("could not decode status metadata for mirrored image '%s'",
// An origin that is no longer watching marks the entry as stale.
3617 if (watchers.find(status.origin) == watchers.end()) {
3618 CLS_LOG(20, "removing stale status object for key %s",
3620 int r1 = cls_cxx_map_remove_key(hctx, key);
3622 CLS_ERR("error removing stale status for key '%s': %s",
3623 key.c_str(), cpp_strerror(r1).c_str());
3629 if (!vals.empty()) {
3630 last_read = vals.rbegin()->first;
// Append the id of every registered mirror instance to *instance_ids.
//
// Keys under INSTANCE_KEY_PREFIX are read in batches of at most
// RBD_MAX_KEYS_READ; the instance id is the key with the prefix stripped.
3637 int instances_list(cls_method_context_t hctx,
3638 std::vector<std::string> *instance_ids) {
3639 std::string last_read = INSTANCE_KEY_PREFIX;
3640 int max_read = RBD_MAX_KEYS_READ;
3643 std::map<std::string, bufferlist> vals;
3644 int r = cls_cxx_map_get_vals(hctx, last_read, INSTANCE_KEY_PREFIX.c_str(),
3645 max_read, &vals, &more);
3648 CLS_ERR("error reading mirror instances: %s", cpp_strerror(r).c_str());
3653 for (auto &it : vals) {
3654 instance_ids->push_back(it.first.substr(INSTANCE_KEY_PREFIX.size()));
// Resume the next batch after the last key returned.
3657 if (!vals.empty()) {
3658 last_read = vals.rbegin()->first;
// Register a mirror instance by setting the key instance_key(instance_id).
// The content of `bl` is not established by the visible lines. Failures are
// logged with CLS_ERR.
3664 int instances_add(cls_method_context_t hctx, const string &instance_id) {
3667 int r = cls_cxx_map_set_val(hctx, instance_key(instance_id), &bl);
3669 CLS_ERR("error setting mirror instance %s: %s", instance_id.c_str(),
3670 cpp_strerror(r).c_str());
// Unregister a mirror instance by removing the key instance_key(instance_id).
// Failures are logged with CLS_ERR (the guarding condition is not visible
// here).
3676 int instances_remove(cls_method_context_t hctx, const string &instance_id) {
3678 int r = cls_cxx_map_remove_key(hctx, instance_key(instance_id));
3680 CLS_ERR("error removing mirror instance %s: %s", instance_id.c_str(),
3681 cpp_strerror(r).c_str());
3687 } // namespace mirror
3694 * @param uuid (std::string)
3695 * @returns 0 on success, negative error code on failure
// Class method: fetch the mirror uuid with mirror::uuid_get() and encode it
// as a string into *out.
3697 int mirror_uuid_get(cls_method_context_t hctx, bufferlist *in,
3699 std::string mirror_uuid;
3700 int r = mirror::uuid_get(hctx, &mirror_uuid);
3705 ::encode(mirror_uuid, *out);
3711 * @param mirror_uuid (std::string)
3714 * @returns 0 on success, negative error code on failure
// Class method: decode a mirror uuid from `in` and store it under
// mirror::UUID.
//
// An empty uuid is rejected, and so is any change while a mirror mode other
// than MIRROR_MODE_DISABLED is stored (the returned error codes are not
// visible here).
3716 int mirror_uuid_set(cls_method_context_t hctx, bufferlist *in,
3718 std::string mirror_uuid;
3720 bufferlist::iterator bl_it = in->begin();
3721 ::decode(mirror_uuid, bl_it);
3722 } catch (const buffer::error &err) {
3726 if (mirror_uuid.empty()) {
3727 CLS_ERR("cannot set empty mirror uuid");
// A missing mode key (-ENOENT) is not an error here.
3731 uint32_t mirror_mode;
3732 int r = read_key(hctx, mirror::MODE, &mirror_mode);
3733 if (r < 0 && r != -ENOENT) {
3735 } else if (r == 0 && mirror_mode != cls::rbd::MIRROR_MODE_DISABLED) {
3736 CLS_ERR("cannot set mirror uuid while mirroring enabled");
// The uuid is stored as raw bytes (append, not ::encode), matching how
// mirror::uuid_get() reads it back.
3740 bufferlist mirror_uuid_bl;
3741 mirror_uuid_bl.append(mirror_uuid);
3742 r = cls_cxx_map_set_val(hctx, mirror::UUID, &mirror_uuid_bl);
3744 CLS_ERR("failed to set mirror uuid");
3755 * @param cls::rbd::MirrorMode (uint32_t)
3756 * @returns 0 on success, negative error code on failure
// Class method: read the value stored under mirror::MODE and encode it as a
// uint32_t into *out.
3758 int mirror_mode_get(cls_method_context_t hctx, bufferlist *in,
3760 uint32_t mirror_mode_decode;
3761 int r = read_key(hctx, mirror::MODE, &mirror_mode_decode);
3766 ::encode(mirror_mode_decode, *out);
3772 * @param mirror_mode (cls::rbd::MirrorMode) (uint32_t)
3775 * @returns 0 on success, negative error code on failure
// Class method: decode a mirror mode from `in` and apply it.
//
// The visible lines show two paths: one stores the mode under mirror::MODE
// after reading the mirror uuid, the other removes the mirror::MODE and
// mirror::UUID keys after checking that no peers remain registered. The
// condition that selects between them is not visible here; presumably it is
// set by the switch on the requested mode.
3777 int mirror_mode_set(cls_method_context_t hctx, bufferlist *in,
3779 uint32_t mirror_mode_decode;
3781 bufferlist::iterator bl_it = in->begin();
3782 ::decode(mirror_mode_decode, bl_it);
3783 } catch (const buffer::error &err) {
// Validate the requested mode; values outside the listed cases are logged as
// invalid.
3788 switch (static_cast<cls::rbd::MirrorMode>(mirror_mode_decode)) {
3789 case cls::rbd::MIRROR_MODE_DISABLED:
3792 case cls::rbd::MIRROR_MODE_IMAGE:
3793 case cls::rbd::MIRROR_MODE_POOL:
// NOTE(review): "%d" is used with a uint32_t argument; "%u" would match the
// type.
3797 CLS_ERR("invalid mirror mode: %d", mirror_mode_decode);
3803 std::string mirror_uuid;
3804 r = mirror::uuid_get(hctx, &mirror_uuid);
3812 ::encode(mirror_mode_decode, bl);
3814 r = cls_cxx_map_set_val(hctx, mirror::MODE, &bl);
3816 CLS_ERR("error enabling mirroring: %s", cpp_strerror(r).c_str());
// Peers are checked before the MODE and UUID keys are removed.
3820 std::vector<cls::rbd::MirrorPeer> peers;
3821 r = mirror::read_peers(hctx, &peers);
3822 if (r < 0 && r != -ENOENT) {
3826 if (!peers.empty()) {
3827 CLS_ERR("mirroring peers still registered");
3831 r = remove_key(hctx, mirror::MODE);
3836 r = remove_key(hctx, mirror::UUID);
3849 * @param std::vector<cls::rbd::MirrorPeer>: collection of peers
3850 * @returns 0 on success, negative error code on failure
// Class method: read all mirror peers with mirror::read_peers() and encode the
// vector into *out. -ENOENT from the read is excluded from the error check.
3852 int mirror_peer_list(cls_method_context_t hctx, bufferlist *in,
3854 std::vector<cls::rbd::MirrorPeer> peers;
3855 int r = mirror::read_peers(hctx, &peers);
3856 if (r < 0 && r != -ENOENT) {
3860 ::encode(peers, *out);
3866 * @param mirror_peer (cls::rbd::MirrorPeer)
3869 * @returns 0 on success, negative error code on failure
// Class method: decode a MirrorPeer from `in` and register it under
// mirror::peer_key(uuid).
//
// Checks made before the write, each logged on failure:
//   - a mirror mode is stored and is not MIRROR_MODE_DISABLED;
//   - the peer passes is_valid();
//   - the peer uuid differs from the pool's own mirror uuid;
//   - no existing peer has the same uuid, or the same cluster name with an
//     overlapping pool id.
// The returned error codes are not among the visible lines.
3871 int mirror_peer_add(cls_method_context_t hctx, bufferlist *in,
3873 cls::rbd::MirrorPeer mirror_peer;
3875 bufferlist::iterator it = in->begin();
3876 ::decode(mirror_peer, it);
3877 } catch (const buffer::error &err) {
3881 uint32_t mirror_mode_decode;
3882 int r = read_key(hctx, mirror::MODE, &mirror_mode_decode);
3883 if (r < 0 && r != -ENOENT) {
// A missing mode key is treated the same as MIRROR_MODE_DISABLED.
3885 } else if (r == -ENOENT ||
3886 mirror_mode_decode == cls::rbd::MIRROR_MODE_DISABLED) {
3887 CLS_ERR("mirroring must be enabled on the pool");
3889 } else if (!mirror_peer.is_valid()) {
3890 CLS_ERR("mirror peer is not valid");
3894 std::string mirror_uuid;
3895 r = mirror::uuid_get(hctx, &mirror_uuid);
3897 CLS_ERR("error retrieving mirroring uuid: %s", cpp_strerror(r).c_str());
3899 } else if (mirror_peer.uuid == mirror_uuid) {
3900 CLS_ERR("peer uuid '%s' matches pool mirroring uuid",
3901 mirror_uuid.c_str());
3905 std::vector<cls::rbd::MirrorPeer> peers;
3906 r = mirror::read_peers(hctx, &peers);
3907 if (r < 0 && r != -ENOENT) {
// Duplicate detection: same uuid, or same cluster name where either side has
// pool_id == -1 or both pool ids are equal.
3911 for (auto const &peer : peers) {
3912 if (peer.uuid == mirror_peer.uuid) {
3913 CLS_ERR("peer uuid '%s' already exists",
3916 } else if (peer.cluster_name == mirror_peer.cluster_name &&
3917 (peer.pool_id == -1 || mirror_peer.pool_id == -1 ||
3918 peer.pool_id == mirror_peer.pool_id)) {
3919 CLS_ERR("peer cluster name '%s' already exists",
3920 peer.cluster_name.c_str());
3926 ::encode(mirror_peer, bl);
3927 r = cls_cxx_map_set_val(hctx, mirror::peer_key(mirror_peer.uuid),
3930 CLS_ERR("error adding peer: %s", cpp_strerror(r).c_str());
3938 * @param uuid (std::string)
3941 * @returns 0 on success, negative error code on failure
// Class method: remove the peer stored under mirror::peer_key(uuid). The
// decode of `uuid` from `in` is not among the visible lines. A missing key
// (-ENOENT) is excluded from the error check.
3943 int mirror_peer_remove(cls_method_context_t hctx, bufferlist *in,
3947 bufferlist::iterator it = in->begin();
3949 } catch (const buffer::error &err) {
3953 int r = cls_cxx_map_remove_key(hctx, mirror::peer_key(uuid));
3954 if (r < 0 && r != -ENOENT) {
3955 CLS_ERR("error removing peer: %s", cpp_strerror(r).c_str());
3963 * @param uuid (std::string)
3964 * @param client_name (std::string)
3967 * @returns 0 on success, negative error code on failure
// Class method: update the client name of an existing peer.
// Reads the peer for `uuid` (its decode from `in` is not visible here),
// replaces client_name with the decoded value and writes the peer back.
3969 int mirror_peer_set_client(cls_method_context_t hctx, bufferlist *in,
3972 std::string client_name;
3974 bufferlist::iterator it = in->begin();
3976 ::decode(client_name, it);
3977 } catch (const buffer::error &err) {
3981 cls::rbd::MirrorPeer peer;
3982 int r = mirror::read_peer(hctx, uuid, &peer);
3987 peer.client_name = client_name;
3988 r = mirror::write_peer(hctx, uuid, peer);
3997 * @param uuid (std::string)
3998 * @param cluster_name (std::string)
4001 * @returns 0 on success, negative error code on failure
// Class method: update the cluster name of an existing peer.
// Reads the peer for `uuid` (its decode from `in` is not visible here),
// replaces cluster_name with the decoded value and writes the peer back.
4003 int mirror_peer_set_cluster(cls_method_context_t hctx, bufferlist *in,
4006 std::string cluster_name;
4008 bufferlist::iterator it = in->begin();
4010 ::decode(cluster_name, it);
4011 } catch (const buffer::error &err) {
4015 cls::rbd::MirrorPeer peer;
4016 int r = mirror::read_peer(hctx, uuid, &peer);
4021 peer.cluster_name = cluster_name;
4022 r = mirror::write_peer(hctx, uuid, peer);
4031 * @param start_after which name to begin listing after
4032 * (use the empty string to start at the beginning)
4033 * @param max_return the maximum number of names to list
4036 * @param std::map<std::string, std::string>: local id to global id map
4037 * @returns 0 on success, negative error code on failure
// Class method: list mirrored images as a map of image id -> global image id.
//
// Decodes `start_after` and `max_return` from `in`, then reads image entries
// in batches of at most RBD_MAX_KEYS_READ keys until no more are available or
// `max_return` entries have been collected. The map is encoded into *out.
4039 int mirror_image_list(cls_method_context_t hctx, bufferlist *in,
4041 std::string start_after;
4042 uint64_t max_return;
4044 bufferlist::iterator iter = in->begin();
4045 ::decode(start_after, iter);
4046 ::decode(max_return, iter);
4047 } catch (const buffer::error &err) {
4051 int max_read = RBD_MAX_KEYS_READ;
4053 std::map<std::string, std::string> mirror_images;
4054 std::string last_read = mirror::image_key(start_after);
4056 while (more && mirror_images.size() < max_return) {
4057 std::map<std::string, bufferlist> vals;
4058 CLS_LOG(20, "last_read = '%s'", last_read.c_str());
4059 int r = cls_cxx_map_get_vals(hctx, last_read, mirror::IMAGE_KEY_PREFIX,
4060 max_read, &vals, &more);
4062 CLS_ERR("error reading mirror image directory by name: %s",
4063 cpp_strerror(r).c_str());
4067 for (auto it = vals.begin(); it != vals.end(); ++it) {
// The image id is the key with IMAGE_KEY_PREFIX stripped.
4068 const std::string &image_id =
4069 it->first.substr(mirror::IMAGE_KEY_PREFIX.size());
4070 cls::rbd::MirrorImage mirror_image;
4071 bufferlist::iterator iter = it->second.begin();
4073 ::decode(mirror_image, iter);
4074 } catch (const buffer::error &err) {
4075 CLS_ERR("could not decode mirror image payload of image '%s'",
4080 mirror_images[image_id] = mirror_image.global_image_id;
4081 if (mirror_images.size() >= max_return) {
// Resume the next batch after the greatest image id collected so far.
4085 if (!vals.empty()) {
4086 last_read = mirror::image_key(mirror_images.rbegin()->first);
4090 ::encode(mirror_images, *out);
4096 * @param global_id (std::string)
4099 * @param std::string - image id
4100 * @returns 0 on success, negative error code on failure
// Class method: translate a global image id into the local image id.
// Decodes `global_id` from `in`, reads the value stored under
// mirror::global_key(global_id) and encodes it into *out.
4102 int mirror_image_get_image_id(cls_method_context_t hctx, bufferlist *in,
4104 std::string global_id;
4106 bufferlist::iterator it = in->begin();
4107 ::decode(global_id, it);
4108 } catch (const buffer::error &err) {
4112 std::string image_id;
4113 int r = read_key(hctx, mirror::global_key(global_id), &image_id);
4115 CLS_ERR("error retrieving image id for global id '%s': %s",
4116 global_id.c_str(), cpp_strerror(r).c_str());
4120 ::encode(image_id, *out);
4126 * @param image_id (std::string)
4129 * @param cls::rbd::MirrorImage - metadata associated with the image_id
4130 * @returns 0 on success, negative error code on failure
// Class method: decode an image id from `in`, load its MirrorImage with
// mirror::image_get() and encode it into *out.
4132 int mirror_image_get(cls_method_context_t hctx, bufferlist *in,
4136 bufferlist::iterator it = in->begin();
4137 ::decode(image_id, it);
4138 } catch (const buffer::error &err) {
4142 cls::rbd::MirrorImage mirror_image;
4143 int r = mirror::image_get(hctx, image_id, &mirror_image);
4148 ::encode(mirror_image, *out);
4154 * @param image_id (std::string)
4155 * @param mirror_image (cls::rbd::MirrorImage)
4158 * @returns 0 on success, negative error code on failure
4159 * @returns -EEXIST if there's an existing image_id with a different global_image_id
// Class method: decode an image id and a MirrorImage from `in` and store them
// with mirror::image_set().
4161 int mirror_image_set(cls_method_context_t hctx, bufferlist *in,
4164 cls::rbd::MirrorImage mirror_image;
4166 bufferlist::iterator it = in->begin();
4167 ::decode(image_id, it);
4168 ::decode(mirror_image, it);
4169 } catch (const buffer::error &err) {
4173 int r = mirror::image_set(hctx, image_id, mirror_image);
4182 * @param image_id (std::string)
4185 * @returns 0 on success, negative error code on failure
// Class method: decode an image id from `in` and remove its mirroring entries
// with mirror::image_remove().
4187 int mirror_image_remove(cls_method_context_t hctx, bufferlist *in,
4191 bufferlist::iterator it = in->begin();
4192 ::decode(image_id, it);
4193 } catch (const buffer::error &err) {
4197 int r = mirror::image_remove(hctx, image_id);
4206 * @param global_image_id (std::string)
4207 * @param status (cls::rbd::MirrorImageStatus)
4210 * @returns 0 on success, negative error code on failure
// Class method: decode a global image id and a MirrorImageStatus from `in` and
// store the status with mirror::image_status_set().
4212 int mirror_image_status_set(cls_method_context_t hctx, bufferlist *in,
4214 string global_image_id;
4215 cls::rbd::MirrorImageStatus status;
4217 bufferlist::iterator it = in->begin();
4218 ::decode(global_image_id, it);
4219 ::decode(status, it);
4220 } catch (const buffer::error &err) {
4224 int r = mirror::image_status_set(hctx, global_image_id, status);
4233 * @param global_image_id (std::string)
4236 * @returns 0 on success, negative error code on failure
// Class method: decode a global image id from `in` and remove its stored
// status with mirror::image_status_remove().
4238 int mirror_image_status_remove(cls_method_context_t hctx, bufferlist *in,
4240 string global_image_id;
4242 bufferlist::iterator it = in->begin();
4243 ::decode(global_image_id, it);
4244 } catch (const buffer::error &err) {
4248 int r = mirror::image_status_remove(hctx, global_image_id);
4257 * @param global_image_id (std::string)
4260 * @param cls::rbd::MirrorImageStatus - metadata associated with the global_image_id
4261 * @returns 0 on success, negative error code on failure
// Class method: decode a global image id from `in`, load its status and encode
// it into *out. The current watcher set is fetched first because
// mirror::image_status_get() uses it to compute the `up` flag.
4263 int mirror_image_status_get(cls_method_context_t hctx, bufferlist *in,
4265 string global_image_id;
4267 bufferlist::iterator it = in->begin();
4268 ::decode(global_image_id, it);
4269 } catch (const buffer::error &err) {
4273 std::set<entity_inst_t> watchers;
4274 int r = mirror::list_watchers(hctx, &watchers);
4279 cls::rbd::MirrorImageStatus status;
4280 r = mirror::image_status_get(hctx, global_image_id, watchers, &status);
4285 ::encode(status, *out);
4291 * @param start_after which name to begin listing after
4292 * (use the empty string to start at the beginning)
4293 * @param max_return the maximum number of names to list
4296 * @param std::map<std::string, cls::rbd::MirrorImage>: image id to image map
4297 * @param std::map<std::string, cls::rbd::MirrorImageStatus>: image id to status map
4298 * @returns 0 on success, negative error code on failure
// Class method: decode `start_after` and `max_return` from `in`, collect
// images and statuses with mirror::image_status_list(), and encode the two
// maps into *out (images first, then statuses).
4300 int mirror_image_status_list(cls_method_context_t hctx, bufferlist *in,
4302 std::string start_after;
4303 uint64_t max_return;
4305 bufferlist::iterator iter = in->begin();
4306 ::decode(start_after, iter);
4307 ::decode(max_return, iter);
4308 } catch (const buffer::error &err) {
4312 map<std::string, cls::rbd::MirrorImage> images;
4313 map<std::string, cls::rbd::MirrorImageStatus> statuses;
4314 int r = mirror::image_status_list(hctx, start_after, max_return, &images,
4320 ::encode(images, *out);
4321 ::encode(statuses, *out);
4330 * @param std::map<cls::rbd::MirrorImageStatusState, int>: states counts
4331 * @returns 0 on success, negative error code on failure
// Class method: build the per-state summary with
// mirror::image_status_get_summary() and encode the map into *out.
4333 int mirror_image_status_get_summary(cls_method_context_t hctx, bufferlist *in,
4335 std::map<cls::rbd::MirrorImageStatusState, int> states;
4337 int r = mirror::image_status_get_summary(hctx, &states);
4342 ::encode(states, *out);
4351 * @returns 0 on success, negative error code on failure
// Class method: delegate to mirror::image_status_remove_down().
4353 int mirror_image_status_remove_down(cls_method_context_t hctx, bufferlist *in,
4355 int r = mirror::image_status_remove_down(hctx);
4367 * @param std::vector<std::string>: instance ids
4368 * @returns 0 on success, negative error code on failure
// Class method: collect the registered mirror instance ids with
// mirror::instances_list() and encode the vector into *out.
4370 int mirror_instances_list(cls_method_context_t hctx, bufferlist *in,
4372 std::vector<std::string> instance_ids;
4374 int r = mirror::instances_list(hctx, &instance_ids);
4379 ::encode(instance_ids, *out);
4385 * @param instance_id (std::string)
4388 * @returns 0 on success, negative error code on failure
// Class method: decodes a single std::string instance id from `in` and
// passes it to mirror::instances_add(). A buffer::error raised while
// decoding is caught here (the handler body is on lines not shown).
4390 int mirror_instances_add(cls_method_context_t hctx, bufferlist *in,
4392 std::string instance_id;
4394 bufferlist::iterator iter = in->begin();
4395 ::decode(instance_id, iter);
4396 } catch (const buffer::error &err) {
4400 int r = mirror::instances_add(hctx, instance_id);
4409 * @param instance_id (std::string)
4412 * @returns 0 on success, negative error code on failure
// Class method: decodes a single std::string instance id from `in` and
// passes it to mirror::instances_remove(). A buffer::error raised while
// decoding is caught here (the handler body is on lines not shown).
4414 int mirror_instances_remove(cls_method_context_t hctx, bufferlist *in,
4416 std::string instance_id;
4418 bufferlist::iterator iter = in->begin();
4419 ::decode(instance_id, iter);
4420 } catch (const buffer::error &err) {
4424 int r = mirror::instances_remove(hctx, instance_id);
4432 * Initialize the header with basic metadata.
4433 * Everything is stored as key/value pairs as omaps in the header object.
4439 * @return 0 on success, negative error code on failure
// Class method: initializes the group header by storing an encoded
// snapshot sequence of 0 under the GROUP_SNAP_SEQ key.
// No request payload is decoded in the visible lines.
4441 int group_create(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
4443 bufferlist snap_seqbl;
4444 uint64_t snap_seq = 0;
4445 ::encode(snap_seq, snap_seqbl);
4446 int r = cls_cxx_map_set_val(hctx, GROUP_SNAP_SEQ, &snap_seqbl);
4454 * List consistency groups from the directory.
4457 * @param start_after (std::string)
4458 * @param max_return (uint64_t)
4461 * @param map of consistency groups (name, id)
4462 * @return 0 on success, negative error code on failure
// Class method: lists consistency groups from the directory object.
// Decodes (start_after, max_return) from `in`, reads entries under
// RBD_DIR_NAME_KEY_PREFIX in pages of RBD_MAX_KEYS_READ, and encodes the
// collected name -> id map into `out`.
4464 int group_dir_list(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
4467 uint64_t max_return;
4470 bufferlist::iterator iter = in->begin();
4471 ::decode(start_after, iter);
4472 ::decode(max_return, iter);
4473 } catch (const buffer::error &err) {
4477 int max_read = RBD_MAX_KEYS_READ;
4479 map<string, string> groups;
// The first page starts after the directory key derived from start_after.
4480 string last_read = dir_key_for_name(start_after);
// Page until the store reports nothing more or the caller's cap is reached.
4482 while (more && groups.size() < max_return) {
4483 map<string, bufferlist> vals;
4484 CLS_LOG(20, "last_read = '%s'", last_read.c_str());
4485 int r = cls_cxx_map_get_vals(hctx, last_read, RBD_DIR_NAME_KEY_PREFIX,
4486 max_read, &vals, &more);
4488 CLS_ERR("error reading directory by name: %s", cpp_strerror(r).c_str());
// NOTE(review): `val` is declared by value, so every key/bufferlist pair is
// copied on each iteration; a const reference would avoid that.
4492 for (pair<string, bufferlist> val: vals) {
4494 bufferlist::iterator iter = val.second.begin();
4497 } catch (const buffer::error &err) {
4498 CLS_ERR("could not decode id of consistency group '%s'", val.first.c_str());
4501 CLS_LOG(20, "adding '%s' -> '%s'", dir_name_from_key(val.first).c_str(), id.c_str());
4502 groups[dir_name_from_key(val.first)] = id;
// Per-entry cap check; the action taken is on a line not shown here.
4503 if (groups.size() >= max_return)
4506 if (!vals.empty()) {
// `groups` is an ordered map, so rbegin() is the greatest name collected;
// the next page resumes after its directory key.
4507 last_read = dir_key_for_name(groups.rbegin()->first);
4511 ::encode(groups, *out);
4517 * Add a consistency group to the directory.
4520 * @param name (std::string)
4521 * @param id (std::string)
4524 * @return 0 on success, negative error code on failure
// Class method: adds a (name, id) pair to the consistency group directory.
// Steps visible here: create the directory object, decode the request,
// reject an empty name or an id failing is_valid_id(), probe both keys for
// existing entries, then write both mappings.
4526 int group_dir_add(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
// NOTE(review): second argument is `false` - presumably a non-exclusive
// create so an existing directory object is accepted; confirm in objclass.h.
4528 int r = cls_cxx_create(hctx, false);
4531 CLS_ERR("could not create consistency group directory: %s",
4532 cpp_strerror(r).c_str());
4538 bufferlist::iterator iter = in->begin();
4539 ::decode(name, iter);
4541 } catch (const buffer::error &err) {
4545 if (!name.size() || !is_valid_id(id)) {
4546 CLS_ERR("invalid consistency group name '%s' or id '%s'",
4547 name.c_str(), id.c_str());
4551 CLS_LOG(20, "group_dir_add name=%s id=%s", name.c_str(), id.c_str());
// The directory is indexed both ways: name key -> id and id key -> name.
4554 string name_key = dir_key_for_name(name);
4555 string id_key = dir_key_for_id(id);
4556 r = read_key(hctx, name_key, &tmp);
4558 CLS_LOG(10, "name already exists");
4561 r = read_key(hctx, id_key, &tmp);
4563 CLS_LOG(10, "id already exists");
4566 bufferlist id_bl, name_bl;
4567 ::encode(id, id_bl);
4568 ::encode(name, name_bl);
// Both mappings go into one map and are written with a single set_vals call.
4569 map<string, bufferlist> omap_vals;
4570 omap_vals[name_key] = id_bl;
4571 omap_vals[id_key] = name_bl;
4572 return cls_cxx_map_set_vals(hctx, &omap_vals);
4576 * Remove a consistency group from the directory.
4579 * @param name (std::string)
4580 * @param id (std::string)
4583 * @return 0 on success, negative error code on failure
// Class method: removes a (name, id) pair from the consistency group
// directory. Both stored mappings are read back and compared with the
// request before either key is removed.
4585 int group_dir_remove(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
4589 bufferlist::iterator iter = in->begin();
4590 ::decode(name, iter);
4592 } catch (const buffer::error &err) {
4596 CLS_LOG(20, "group_dir_remove name=%s id=%s", name.c_str(), id.c_str());
4598 string stored_name, stored_id;
4599 string name_key = dir_key_for_name(name);
4600 string id_key = dir_key_for_id(id);
// The name key holds the id and the id key holds the name.
4602 int r = read_key(hctx, name_key, &stored_id);
4605 CLS_ERR("error reading name to id mapping: %s", cpp_strerror(r).c_str());
4608 r = read_key(hctx, id_key, &stored_name);
4611 CLS_ERR("error reading id to name mapping: %s", cpp_strerror(r).c_str());
4615 // check if this op raced with a rename
4616 if (stored_name != name || stored_id != id) {
4617 CLS_ERR("stored name '%s' and id '%s' do not match args '%s' and '%s'",
4618 stored_name.c_str(), stored_id.c_str(), name.c_str(), id.c_str());
// The two keys are removed with separate calls, name key first.
4622 r = cls_cxx_map_remove_key(hctx, name_key);
4624 CLS_ERR("error removing name: %s", cpp_strerror(r).c_str());
4628 r = cls_cxx_map_remove_key(hctx, id_key);
4630 CLS_ERR("error removing id: %s", cpp_strerror(r).c_str());
4638 * Set state of an image in the consistency group.
4641 * @param image_status (cls::rbd::GroupImageStatus)
4644 * @return 0 on success, negative error code on failure
// Class method: records the link state of an image in the group.
// The key comes from st.spec.image_key() and the stored value is the
// encoded st.state. `st` is a cls::rbd::GroupImageStatus; the decode call
// that fills it from `in` is on a line not shown here.
4646 int group_image_set(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
4648 CLS_LOG(20, "group_image_set");
4650 cls::rbd::GroupImageStatus st;
4652 bufferlist::iterator iter = in->begin();
4654 } catch (const buffer::error &err) {
4658 string image_key = st.spec.image_key();
4660 bufferlist image_val_bl;
4661 ::encode(st.state, image_val_bl);
4662 int r = cls_cxx_map_set_val(hctx, image_key, &image_val_bl);
4671 * Remove reference to an image from the consistency group.
4674 * @param spec (cls::rbd::GroupImageSpec)
4677 * @return 0 on success, negative error code on failure
// Class method: decodes a cls::rbd::GroupImageSpec from `in` and removes
// the entry stored under spec.image_key(). A failed removal is logged with
// CLS_ERR.
4679 int group_image_remove(cls_method_context_t hctx,
4680 bufferlist *in, bufferlist *out)
4682 CLS_LOG(20, "group_image_remove");
4683 cls::rbd::GroupImageSpec spec;
4685 bufferlist::iterator iter = in->begin();
4686 ::decode(spec, iter);
4687 } catch (const buffer::error &err) {
4691 string image_key = spec.image_key();
4693 int r = cls_cxx_map_remove_key(hctx, image_key);
4695 CLS_ERR("error removing image from group: %s", cpp_strerror(r).c_str());
4703 * List images in the consistency group.
4706 * @param start_after which name to begin listing after
4707 * (use the empty string to start at the beginning)
4708 * @param max_return the maximum number of names to list
4711 * @param tuples of descriptions of the images: image_id, pool_id, image reference state.
4712 * @return 0 on success, negative error code on failure
// Class method: lists the images referenced by a consistency group.
// Decodes (start_after, max_return) from `in`, reads entries under
// cls::rbd::RBD_GROUP_IMAGE_KEY_PREFIX in pages of RBD_MAX_KEYS_READ,
// rebuilds a GroupImageStatus from each key/value pair, and encodes the
// resulting vector into `out`. Only read calls appear in the visible lines.
4714 int group_image_list(cls_method_context_t hctx,
4715 bufferlist *in, bufferlist *out)
4717 CLS_LOG(20, "group_image_list");
4718 cls::rbd::GroupImageSpec start_after;
4719 uint64_t max_return;
4721 bufferlist::iterator iter = in->begin();
4722 ::decode(start_after, iter);
4723 ::decode(max_return, iter);
4724 } catch (const buffer::error &err) {
4728 int max_read = RBD_MAX_KEYS_READ;
// NOTE(review): `vals` lives outside the paging loop; this relies on
// cls_cxx_map_get_vals replacing its contents on every call - confirm.
4729 std::map<string, bufferlist> vals;
4730 string last_read = start_after.image_key();
4731 std::vector<cls::rbd::GroupImageStatus> res;
4734 int r = cls_cxx_map_get_vals(hctx, last_read,cls::rbd::RBD_GROUP_IMAGE_KEY_PREFIX,
4735 max_read, &vals, &more);
// The loop condition also enforces max_return, so a page may be consumed
// only partially.
4739 for (map<string, bufferlist>::iterator it = vals.begin();
4740 it != vals.end() && res.size() < max_return; ++it) {
// The stored value is the link state; the spec is recovered from the key.
4742 bufferlist::iterator iter = it->second.begin();
4743 cls::rbd::GroupImageLinkState state;
4745 ::decode(state, iter);
4746 } catch (const buffer::error &err) {
4747 CLS_ERR("error decoding state for image: %s", it->first.c_str());
4750 cls::rbd::GroupImageSpec spec;
// NOTE(review): this `r` shadows the `r` declared for the read call above.
4751 int r = cls::rbd::GroupImageSpec::from_key(it->first, &spec);
4755 CLS_LOG(20, "Discovered image %s %" PRId64 " %d", spec.image_id.c_str(),
4758 res.push_back(cls::rbd::GroupImageStatus(spec, state));
// Resume the next page after the key of the last image collected.
4760 if (res.size() > 0) {
4761 last_read = res.rbegin()->spec.image_key();
4764 } while (more && (res.size() < max_return));
4765 ::encode(res, *out);
4771 * Reference the consistency group this image belongs to.
4774 * @param group_id (std::string)
4775 * @param pool_id (int64_t)
4778 * @return 0 on success, negative error code on failure
// Class method: stores a reference to the consistency group an image
// belongs to. Decodes a cls::rbd::GroupSpec from `in`, looks up any
// existing reference under RBD_GROUP_REF, and writes the new spec there
// when the earlier checks allow it.
4780 int image_add_group(cls_method_context_t hctx,
4781 bufferlist *in, bufferlist *out)
4783 CLS_LOG(20, "image_add_group");
4784 cls::rbd::GroupSpec new_group;
4786 bufferlist::iterator iter = in->begin();
4787 ::decode(new_group, iter);
4788 } catch (const buffer::error &err) {
4792 bufferlist existing_refbl;
4794 int r = cls_cxx_map_get_val(hctx, RBD_GROUP_REF, &existing_refbl);
4796 // If we are trying to link this image to the same group then return success.
4797 // If this image already belongs to another group then abort.
4798 cls::rbd::GroupSpec old_group;
4800 bufferlist::iterator iter = existing_refbl.begin();
4801 ::decode(old_group, iter);
4802 } catch (const buffer::error &err) {
// A stored reference counts as "the same group" only when both group_id
// and pool_id match the request.
4806 if ((old_group.group_id != new_group.group_id)
4807 || (old_group.pool_id != new_group.pool_id)) {
4810 return 0; // In this case the values are already correct
4812 } else if (r < 0 && r != -ENOENT) { // No entry means this image is not a member of any consistency group. So, we can use it.
4817 ::encode(new_group, refbl);
4818 r = cls_cxx_map_set_val(hctx, RBD_GROUP_REF, &refbl);
4828 * Remove image's pointer to the consistency group.
4831 * @param group_id (std::string)
4832 * @param pool_id (int64_t)
4835 * @return 0 on success, negative error code on failure
// Class method: removes an image's reference to its consistency group.
// Decodes the expected cls::rbd::GroupSpec from `in`, decodes the spec
// currently stored under RBD_GROUP_REF, compares pool_id and group_id, and
// removes the RBD_GROUP_REF key.
4837 int image_remove_group(cls_method_context_t hctx,
4841 CLS_LOG(20, "image_remove_group");
4842 cls::rbd::GroupSpec spec;
4844 bufferlist::iterator iter = in->begin();
4845 ::decode(spec, iter);
4846 } catch (const buffer::error &err) {
4851 int r = cls_cxx_map_get_val(hctx, RBD_GROUP_REF, &refbl);
4856 cls::rbd::GroupSpec ref_spec;
4857 bufferlist::iterator iter = refbl.begin();
4859 ::decode(ref_spec, iter);
4860 } catch (const buffer::error &err) {
// Mismatch check between the stored reference and the request; the action
// taken on a mismatch is on lines not shown here.
4864 if (ref_spec.pool_id != spec.pool_id || ref_spec.group_id != spec.group_id) {
4868 r = cls_cxx_map_remove_key(hctx, RBD_GROUP_REF);
4877 * Retrieve the id and pool of the consistency group this image belongs to.
4884 * @return 0 on success, negative error code on failure
// Class method: returns the consistency group reference of an image.
// Reads the value stored under RBD_GROUP_REF, decodes it into a
// cls::rbd::GroupSpec, and encodes that spec into `out`.
4886 int image_get_group(cls_method_context_t hctx,
4887 bufferlist *in, bufferlist *out)
4889 CLS_LOG(20, "image_get_group");
4891 int r = cls_cxx_map_get_val(hctx, RBD_GROUP_REF, &refbl);
// -ENOENT is deliberately excluded from this error check.
// NOTE(review): presumably a default-constructed spec is returned when no
// reference exists - confirm against the lines not shown here.
4892 if (r < 0 && r != -ENOENT) {
4896 cls::rbd::GroupSpec spec;
4899 bufferlist::iterator iter = refbl.begin();
4901 ::decode(spec, iter);
4902 } catch (const buffer::error &err) {
4907 ::encode(spec, *out);
// Key helpers for trash entries: each entry is keyed by "id_" followed by
// the image id.
4913 static const std::string IMAGE_KEY_PREFIX("id_");
// Builds the key for an image id by prepending IMAGE_KEY_PREFIX.
4915 std::string image_key(const std::string &image_id) {
4916 return IMAGE_KEY_PREFIX + image_id;
// Inverse of image_key(): drops the first IMAGE_KEY_PREFIX.size()
// characters. It does not check that `key` actually begins with the prefix.
4919 std::string image_id_from_key(const std::string &key) {
4920 return key.substr(IMAGE_KEY_PREFIX.size());
4923 } // namespace trash
4926 * Add an image entry to the rbd trash. Creates the trash object if
4927 * needed, and stores the trash spec information of the deleted image.
4930 * @param id the id of the image
4931 * @param trash_spec the spec info of the deleted image
4934 * @returns -EEXIST if the image id is already in the trash
4935 * @returns 0 on success, negative error code on failure
// Class method: adds an image entry to the trash. Creates the trash
// object, decodes the request, rejects ids failing is_valid_id(), checks
// for an existing entry under the same key, and stores the encoded
// TrashImageSpec under trash::image_key(id).
4937 int trash_add(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
4939 int r = cls_cxx_create(hctx, false);
4941 CLS_ERR("could not create trash: %s", cpp_strerror(r).c_str());
4946 cls::rbd::TrashImageSpec trash_spec;
4948 bufferlist::iterator iter = in->begin();
4950 ::decode(trash_spec, iter);
4951 } catch (const buffer::error &err) {
4955 if (!is_valid_id(id)) {
4956 CLS_ERR("trash_add: invalid id '%s'", id.c_str());
4960 CLS_LOG(20, "trash_add id=%s", id.c_str());
4962 string key = trash::image_key(id);
// Probe for an existing entry: -ENOENT is the expected "not present"
// result, any other negative value is a read failure, and 0 means the id
// is already in the trash.
4963 cls::rbd::TrashImageSpec tmp;
4964 r = read_key(hctx, key, &tmp);
4965 if (r < 0 && r != -ENOENT) {
4966 CLS_ERR("could not read key %s entry from trash: %s", key.c_str(),
4967 cpp_strerror(r).c_str());
4969 } else if (r == 0) {
4970 CLS_LOG(10, "id already exists");
// Encode the spec directly into the map slot for this key, then write it.
4974 map<string, bufferlist> omap_vals;
4975 ::encode(trash_spec, omap_vals[key]);
4976 return cls_cxx_map_set_vals(hctx, &omap_vals);
4980 * Removes an image entry from the rbd trash object.
4984 * @param id the id of the image
4987 * @returns -ENOENT if the image id does not exist in the trash
4988 * @returns 0 on success, negative error code on failure
// Class method: removes an image entry from the trash. The entry under
// trash::image_key(id) is read first, then the key is removed; both
// failure paths log with CLS_ERR.
4990 int trash_remove(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
4994 bufferlist::iterator iter = in->begin();
4996 } catch (const buffer::error &err) {
5000 CLS_LOG(20, "trash_remove id=%s", id.c_str());
5002 string key = trash::image_key(id);
// Read before removing so a missing entry surfaces as an error from the
// read rather than from the removal.
5004 int r = cls_cxx_map_get_val(hctx, key, &tmp);
5007 CLS_ERR("error reading entry key %s: %s", key.c_str(), cpp_strerror(r).c_str());
5012 r = cls_cxx_map_remove_key(hctx, key);
5014 CLS_ERR("error removing entry: %s", cpp_strerror(r).c_str());
5022 * Returns the list of trash spec entries registered in the rbd_trash
5026 * @param start_after which name to begin listing after
5027 * (use the empty string to start at the beginning)
5028 * @param max_return the maximum number of names to list
5031 * @param data the map between image id and trash spec info
5033 * @returns 0 on success, negative error code on failure
// Class method: lists trash entries. Decodes (start_after, max_return)
// from `in`, pages through keys under trash::IMAGE_KEY_PREFIX, decodes
// each value into a TrashImageSpec keyed by image id, and encodes the
// resulting map into `out`.
5035 int trash_list(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
5038 uint64_t max_return;
5041 bufferlist::iterator iter = in->begin();
5042 ::decode(start_after, iter);
5043 ::decode(max_return, iter);
5044 } catch (const buffer::error &err) {
5048 map<string, cls::rbd::TrashImageSpec> data;
5049 string last_read = trash::image_key(start_after);
// NOTE(review): the log text says "trash_get_images" although this method
// is trash_list.
5052 CLS_LOG(20, "trash_get_images");
5053 while (data.size() < max_return) {
5054 map<string, bufferlist> raw_data;
// Each page is capped at the smaller of RBD_MAX_KEYS_READ and the number
// of entries still wanted.
// NOTE(review): `max_return - data.size()` is unsigned 64-bit but is
// converted to int32_t by std::min<int32_t>; a remainder above INT32_MAX
// would not survive that conversion intact.
5055 int max_read = std::min<int32_t>(RBD_MAX_KEYS_READ,
5056 max_return - data.size());
5057 int r = cls_cxx_map_get_vals(hctx, last_read, trash::IMAGE_KEY_PREFIX,
5058 max_read, &raw_data, &more);
5060 CLS_ERR("failed to read the vals off of disk: %s",
5061 cpp_strerror(r).c_str());
5064 if (raw_data.empty()) {
5068 map<string, bufferlist>::iterator it = raw_data.begin();
5069 for (; it != raw_data.end(); ++it) {
// operator[] creates the map slot for this image id and the spec is
// decoded straight into it.
5070 ::decode(data[trash::image_id_from_key(it->first)], it->second);
// Resume the next page after the greatest raw key just read.
5077 last_read = raw_data.rbegin()->first;
5080 ::encode(data, *out);
5085 * Returns the trash spec entry of an image registered in the rbd_trash
5089 * @param id the id of the image
5092 * @param out the trash spec entry
5094 * @returns 0 on success, negative error code on failure
// Class method: returns the trash entry of one image. The stored value
// under trash::image_key(id) is read directly into `out`, so the reply is
// the value exactly as stored, with no decode/re-encode step.
5096 int trash_get(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
5100 bufferlist::iterator iter = in->begin();
5102 } catch (const buffer::error &err) {
// NOTE(review): the log text says "trash_get_image" although this method
// is trash_get.
5106 CLS_LOG(20, "trash_get_image id=%s", id.c_str());
5109 string key = trash::image_key(id);
5111 int r = cls_cxx_map_get_val(hctx, key, out);
5113 CLS_ERR("error reading image from trash '%s': '%s'", id.c_str(),
5114 cpp_strerror(r).c_str());
5121 CLS_LOG(20, "Loaded rbd class!");
5123 cls_handle_t h_class;
5124 cls_method_handle_t h_create;
5125 cls_method_handle_t h_get_features;
5126 cls_method_handle_t h_set_features;
5127 cls_method_handle_t h_get_size;
5128 cls_method_handle_t h_set_size;
5129 cls_method_handle_t h_get_parent;
5130 cls_method_handle_t h_set_parent;
5131 cls_method_handle_t h_get_protection_status;
5132 cls_method_handle_t h_set_protection_status;
5133 cls_method_handle_t h_get_stripe_unit_count;
5134 cls_method_handle_t h_set_stripe_unit_count;
5135 cls_method_handle_t h_get_create_timestamp;
5136 cls_method_handle_t h_get_flags;
5137 cls_method_handle_t h_set_flags;
5138 cls_method_handle_t h_remove_parent;
5139 cls_method_handle_t h_add_child;
5140 cls_method_handle_t h_remove_child;
5141 cls_method_handle_t h_get_children;
5142 cls_method_handle_t h_get_snapcontext;
5143 cls_method_handle_t h_get_object_prefix;
5144 cls_method_handle_t h_get_data_pool;
5145 cls_method_handle_t h_get_snapshot_name;
5146 cls_method_handle_t h_get_snapshot_namespace;
5147 cls_method_handle_t h_get_snapshot_timestamp;
5148 cls_method_handle_t h_snapshot_add;
5149 cls_method_handle_t h_snapshot_remove;
5150 cls_method_handle_t h_snapshot_rename;
5151 cls_method_handle_t h_get_all_features;
5152 cls_method_handle_t h_copyup;
5153 cls_method_handle_t h_get_id;
5154 cls_method_handle_t h_set_id;
5155 cls_method_handle_t h_dir_get_id;
5156 cls_method_handle_t h_dir_get_name;
5157 cls_method_handle_t h_dir_list;
5158 cls_method_handle_t h_dir_add_image;
5159 cls_method_handle_t h_dir_remove_image;
5160 cls_method_handle_t h_dir_rename_image;
5161 cls_method_handle_t h_object_map_load;
5162 cls_method_handle_t h_object_map_save;
5163 cls_method_handle_t h_object_map_resize;
5164 cls_method_handle_t h_object_map_update;
5165 cls_method_handle_t h_object_map_snap_add;
5166 cls_method_handle_t h_object_map_snap_remove;
5167 cls_method_handle_t h_metadata_set;
5168 cls_method_handle_t h_metadata_remove;
5169 cls_method_handle_t h_metadata_list;
5170 cls_method_handle_t h_metadata_get;
5171 cls_method_handle_t h_snapshot_get_limit;
5172 cls_method_handle_t h_snapshot_set_limit;
5173 cls_method_handle_t h_old_snapshots_list;
5174 cls_method_handle_t h_old_snapshot_add;
5175 cls_method_handle_t h_old_snapshot_remove;
5176 cls_method_handle_t h_old_snapshot_rename;
5177 cls_method_handle_t h_mirror_uuid_get;
5178 cls_method_handle_t h_mirror_uuid_set;
5179 cls_method_handle_t h_mirror_mode_get;
5180 cls_method_handle_t h_mirror_mode_set;
5181 cls_method_handle_t h_mirror_peer_list;
5182 cls_method_handle_t h_mirror_peer_add;
5183 cls_method_handle_t h_mirror_peer_remove;
5184 cls_method_handle_t h_mirror_peer_set_client;
5185 cls_method_handle_t h_mirror_peer_set_cluster;
5186 cls_method_handle_t h_mirror_image_list;
5187 cls_method_handle_t h_mirror_image_get_image_id;
5188 cls_method_handle_t h_mirror_image_get;
5189 cls_method_handle_t h_mirror_image_set;
5190 cls_method_handle_t h_mirror_image_remove;
5191 cls_method_handle_t h_mirror_image_status_set;
5192 cls_method_handle_t h_mirror_image_status_remove;
5193 cls_method_handle_t h_mirror_image_status_get;
5194 cls_method_handle_t h_mirror_image_status_list;
5195 cls_method_handle_t h_mirror_image_status_get_summary;
5196 cls_method_handle_t h_mirror_image_status_remove_down;
5197 cls_method_handle_t h_mirror_instances_list;
5198 cls_method_handle_t h_mirror_instances_add;
5199 cls_method_handle_t h_mirror_instances_remove;
5200 cls_method_handle_t h_group_create;
5201 cls_method_handle_t h_group_dir_list;
5202 cls_method_handle_t h_group_dir_add;
5203 cls_method_handle_t h_group_dir_remove;
5204 cls_method_handle_t h_group_image_remove;
5205 cls_method_handle_t h_group_image_list;
5206 cls_method_handle_t h_group_image_set;
5207 cls_method_handle_t h_image_add_group;
5208 cls_method_handle_t h_image_remove_group;
5209 cls_method_handle_t h_image_get_group;
5210 cls_method_handle_t h_trash_add;
5211 cls_method_handle_t h_trash_remove;
5212 cls_method_handle_t h_trash_list;
5213 cls_method_handle_t h_trash_get;
5215 cls_register("rbd", &h_class);
5216 cls_register_cxx_method(h_class, "create",
5217 CLS_METHOD_RD | CLS_METHOD_WR,
5219 cls_register_cxx_method(h_class, "get_features",
5221 get_features, &h_get_features);
5222 cls_register_cxx_method(h_class, "set_features",
5223 CLS_METHOD_RD | CLS_METHOD_WR,
5224 set_features, &h_set_features);
5225 cls_register_cxx_method(h_class, "get_size",
5227 get_size, &h_get_size);
5228 cls_register_cxx_method(h_class, "set_size",
5229 CLS_METHOD_RD | CLS_METHOD_WR,
5230 set_size, &h_set_size);
5231 cls_register_cxx_method(h_class, "get_snapcontext",
5233 get_snapcontext, &h_get_snapcontext);
5234 cls_register_cxx_method(h_class, "get_object_prefix",
5236 get_object_prefix, &h_get_object_prefix);
5237 cls_register_cxx_method(h_class, "get_data_pool", CLS_METHOD_RD,
5238 get_data_pool, &h_get_data_pool);
5239 cls_register_cxx_method(h_class, "get_snapshot_name",
5241 get_snapshot_name, &h_get_snapshot_name);
5242 cls_register_cxx_method(h_class, "get_snapshot_namespace",
5244 get_snapshot_namespace, &h_get_snapshot_namespace);
5245 cls_register_cxx_method(h_class, "get_snapshot_timestamp",
5247 get_snapshot_timestamp, &h_get_snapshot_timestamp);
5248 cls_register_cxx_method(h_class, "snapshot_add",
5249 CLS_METHOD_RD | CLS_METHOD_WR,
5250 snapshot_add, &h_snapshot_add);
5251 cls_register_cxx_method(h_class, "snapshot_remove",
5252 CLS_METHOD_RD | CLS_METHOD_WR,
5253 snapshot_remove, &h_snapshot_remove);
5254 cls_register_cxx_method(h_class, "snapshot_rename",
5255 CLS_METHOD_RD | CLS_METHOD_WR,
5256 snapshot_rename, &h_snapshot_rename);
5257 cls_register_cxx_method(h_class, "get_all_features",
5259 get_all_features, &h_get_all_features);
5260 cls_register_cxx_method(h_class, "copyup",
5261 CLS_METHOD_RD | CLS_METHOD_WR,
5263 cls_register_cxx_method(h_class, "get_parent",
5265 get_parent, &h_get_parent);
5266 cls_register_cxx_method(h_class, "set_parent",
5267 CLS_METHOD_RD | CLS_METHOD_WR,
5268 set_parent, &h_set_parent);
5269 cls_register_cxx_method(h_class, "remove_parent",
5270 CLS_METHOD_RD | CLS_METHOD_WR,
5271 remove_parent, &h_remove_parent);
5272 cls_register_cxx_method(h_class, "set_protection_status",
5273 CLS_METHOD_RD | CLS_METHOD_WR,
5274 set_protection_status, &h_set_protection_status);
5275 cls_register_cxx_method(h_class, "get_protection_status",
5277 get_protection_status, &h_get_protection_status);
5278 cls_register_cxx_method(h_class, "get_stripe_unit_count",
5280 get_stripe_unit_count, &h_get_stripe_unit_count);
5281 cls_register_cxx_method(h_class, "set_stripe_unit_count",
5282 CLS_METHOD_RD | CLS_METHOD_WR,
5283 set_stripe_unit_count, &h_set_stripe_unit_count);
5284 cls_register_cxx_method(h_class, "get_create_timestamp",
5286 get_create_timestamp, &h_get_create_timestamp);
5287 cls_register_cxx_method(h_class, "get_flags",
5289 get_flags, &h_get_flags);
5290 cls_register_cxx_method(h_class, "set_flags",
5291 CLS_METHOD_RD | CLS_METHOD_WR,
5292 set_flags, &h_set_flags);
5293 cls_register_cxx_method(h_class, "metadata_list",
5295 metadata_list, &h_metadata_list);
5296 cls_register_cxx_method(h_class, "metadata_set",
5297 CLS_METHOD_RD | CLS_METHOD_WR,
5298 metadata_set, &h_metadata_set);
5299 cls_register_cxx_method(h_class, "metadata_remove",
5300 CLS_METHOD_RD | CLS_METHOD_WR,
5301 metadata_remove, &h_metadata_remove);
5302 cls_register_cxx_method(h_class, "metadata_get",
5304 metadata_get, &h_metadata_get);
5305 cls_register_cxx_method(h_class, "snapshot_get_limit",
5307 snapshot_get_limit, &h_snapshot_get_limit);
5308 cls_register_cxx_method(h_class, "snapshot_set_limit",
5310 snapshot_set_limit, &h_snapshot_set_limit);
5312 /* methods for the rbd_children object */
5313 cls_register_cxx_method(h_class, "add_child",
5314 CLS_METHOD_RD | CLS_METHOD_WR,
5315 add_child, &h_add_child);
5316 cls_register_cxx_method(h_class, "remove_child",
5317 CLS_METHOD_RD | CLS_METHOD_WR,
5318 remove_child, &h_remove_child);
5319 cls_register_cxx_method(h_class, "get_children",
5321 get_children, &h_get_children);
5323 /* methods for the rbd_id.$image_name objects */
5324 cls_register_cxx_method(h_class, "get_id",
5327 cls_register_cxx_method(h_class, "set_id",
5328 CLS_METHOD_RD | CLS_METHOD_WR,
5331 /* methods for the rbd_directory object */
5332 cls_register_cxx_method(h_class, "dir_get_id",
5334 dir_get_id, &h_dir_get_id);
5335 cls_register_cxx_method(h_class, "dir_get_name",
5337 dir_get_name, &h_dir_get_name);
5338 cls_register_cxx_method(h_class, "dir_list",
5340 dir_list, &h_dir_list);
5341 cls_register_cxx_method(h_class, "dir_add_image",
5342 CLS_METHOD_RD | CLS_METHOD_WR,
5343 dir_add_image, &h_dir_add_image);
5344 cls_register_cxx_method(h_class, "dir_remove_image",
5345 CLS_METHOD_RD | CLS_METHOD_WR,
5346 dir_remove_image, &h_dir_remove_image);
5347 cls_register_cxx_method(h_class, "dir_rename_image",
5348 CLS_METHOD_RD | CLS_METHOD_WR,
5349 dir_rename_image, &h_dir_rename_image);
5351 /* methods for the rbd_object_map.$image_id object */
5352 cls_register_cxx_method(h_class, "object_map_load",
5354 object_map_load, &h_object_map_load);
5355 cls_register_cxx_method(h_class, "object_map_save",
5356 CLS_METHOD_RD | CLS_METHOD_WR,
5357 object_map_save, &h_object_map_save);
5358 cls_register_cxx_method(h_class, "object_map_resize",
5359 CLS_METHOD_RD | CLS_METHOD_WR,
5360 object_map_resize, &h_object_map_resize);
5361 cls_register_cxx_method(h_class, "object_map_update",
5362 CLS_METHOD_RD | CLS_METHOD_WR,
5363 object_map_update, &h_object_map_update);
5364 cls_register_cxx_method(h_class, "object_map_snap_add",
5365 CLS_METHOD_RD | CLS_METHOD_WR,
5366 object_map_snap_add, &h_object_map_snap_add);
5367 cls_register_cxx_method(h_class, "object_map_snap_remove",
5368 CLS_METHOD_RD | CLS_METHOD_WR,
5369 object_map_snap_remove, &h_object_map_snap_remove);
5371 /* methods for the old format */
5372 cls_register_cxx_method(h_class, "snap_list",
5374 old_snapshots_list, &h_old_snapshots_list);
5375 cls_register_cxx_method(h_class, "snap_add",
5376 CLS_METHOD_RD | CLS_METHOD_WR,
5377 old_snapshot_add, &h_old_snapshot_add);
5378 cls_register_cxx_method(h_class, "snap_remove",
5379 CLS_METHOD_RD | CLS_METHOD_WR,
5380 old_snapshot_remove, &h_old_snapshot_remove);
5381 cls_register_cxx_method(h_class, "snap_rename",
5382 CLS_METHOD_RD | CLS_METHOD_WR,
5383 old_snapshot_rename, &h_old_snapshot_rename);
5385 /* methods for the rbd_mirroring object */
5386 cls_register_cxx_method(h_class, "mirror_uuid_get", CLS_METHOD_RD,
5387 mirror_uuid_get, &h_mirror_uuid_get);
5388 cls_register_cxx_method(h_class, "mirror_uuid_set",
5389 CLS_METHOD_RD | CLS_METHOD_WR,
5390 mirror_uuid_set, &h_mirror_uuid_set);
5391 cls_register_cxx_method(h_class, "mirror_mode_get", CLS_METHOD_RD,
5392 mirror_mode_get, &h_mirror_mode_get);
5393 cls_register_cxx_method(h_class, "mirror_mode_set",
5394 CLS_METHOD_RD | CLS_METHOD_WR,
5395 mirror_mode_set, &h_mirror_mode_set);
5396 cls_register_cxx_method(h_class, "mirror_peer_list", CLS_METHOD_RD,
5397 mirror_peer_list, &h_mirror_peer_list);
5398 cls_register_cxx_method(h_class, "mirror_peer_add",
5399 CLS_METHOD_RD | CLS_METHOD_WR,
5400 mirror_peer_add, &h_mirror_peer_add);
5401 cls_register_cxx_method(h_class, "mirror_peer_remove",
5402 CLS_METHOD_RD | CLS_METHOD_WR,
5403 mirror_peer_remove, &h_mirror_peer_remove);
5404 cls_register_cxx_method(h_class, "mirror_peer_set_client",
5405 CLS_METHOD_RD | CLS_METHOD_WR,
5406 mirror_peer_set_client, &h_mirror_peer_set_client);
5407 cls_register_cxx_method(h_class, "mirror_peer_set_cluster",
5408 CLS_METHOD_RD | CLS_METHOD_WR,
5409 mirror_peer_set_cluster, &h_mirror_peer_set_cluster);
5410 cls_register_cxx_method(h_class, "mirror_image_list", CLS_METHOD_RD,
5411 mirror_image_list, &h_mirror_image_list);
5412 cls_register_cxx_method(h_class, "mirror_image_get_image_id", CLS_METHOD_RD,
5413 mirror_image_get_image_id,
5414 &h_mirror_image_get_image_id);
5415 cls_register_cxx_method(h_class, "mirror_image_get", CLS_METHOD_RD,
5416 mirror_image_get, &h_mirror_image_get);
5417 cls_register_cxx_method(h_class, "mirror_image_set",
5418 CLS_METHOD_RD | CLS_METHOD_WR,
5419 mirror_image_set, &h_mirror_image_set);
5420 cls_register_cxx_method(h_class, "mirror_image_remove",
5421 CLS_METHOD_RD | CLS_METHOD_WR,
5422 mirror_image_remove, &h_mirror_image_remove);
5423 cls_register_cxx_method(h_class, "mirror_image_status_set",
5424 CLS_METHOD_RD | CLS_METHOD_WR | CLS_METHOD_PROMOTE,
5425 mirror_image_status_set, &h_mirror_image_status_set);
5426 cls_register_cxx_method(h_class, "mirror_image_status_remove",
5427 CLS_METHOD_RD | CLS_METHOD_WR,
5428 mirror_image_status_remove,
5429 &h_mirror_image_status_remove);
5430 cls_register_cxx_method(h_class, "mirror_image_status_get", CLS_METHOD_RD,
5431 mirror_image_status_get, &h_mirror_image_status_get);
5432 cls_register_cxx_method(h_class, "mirror_image_status_list", CLS_METHOD_RD,
5433 mirror_image_status_list,
5434 &h_mirror_image_status_list);
5435 cls_register_cxx_method(h_class, "mirror_image_status_get_summary",
5436 CLS_METHOD_RD, mirror_image_status_get_summary,
5437 &h_mirror_image_status_get_summary);
5438 cls_register_cxx_method(h_class, "mirror_image_status_remove_down",
5439 CLS_METHOD_RD | CLS_METHOD_WR,
5440 mirror_image_status_remove_down,
5441 &h_mirror_image_status_remove_down);
5442 cls_register_cxx_method(h_class, "mirror_instances_list", CLS_METHOD_RD,
5443 mirror_instances_list, &h_mirror_instances_list);
5444 cls_register_cxx_method(h_class, "mirror_instances_add",
5445 CLS_METHOD_RD | CLS_METHOD_WR | CLS_METHOD_PROMOTE,
5446 mirror_instances_add, &h_mirror_instances_add);
5447 cls_register_cxx_method(h_class, "mirror_instances_remove",
5448 CLS_METHOD_RD | CLS_METHOD_WR,
5449 mirror_instances_remove,
5450 &h_mirror_instances_remove);
5451 /* methods for the consistency groups feature */
5452 cls_register_cxx_method(h_class, "group_create",
5453 CLS_METHOD_RD | CLS_METHOD_WR,
5454 group_create, &h_group_create);
5455 cls_register_cxx_method(h_class, "group_dir_list",
5457 group_dir_list, &h_group_dir_list);
5458 cls_register_cxx_method(h_class, "group_dir_add",
5459 CLS_METHOD_RD | CLS_METHOD_WR,
5460 group_dir_add, &h_group_dir_add);
5461 cls_register_cxx_method(h_class, "group_dir_remove",
5462 CLS_METHOD_RD | CLS_METHOD_WR,
5463 group_dir_remove, &h_group_dir_remove);
5464 cls_register_cxx_method(h_class, "group_image_remove",
5465 CLS_METHOD_RD | CLS_METHOD_WR,
5466 group_image_remove, &h_group_image_remove);
// group_image_list only reads key/value entries and encodes a reply, so it
// is registered read-only, like the other list/get methods of this class.
5467 cls_register_cxx_method(h_class, "group_image_list",
5468 CLS_METHOD_RD,
5469 group_image_list, &h_group_image_list);
5470 cls_register_cxx_method(h_class, "group_image_set",
5471 CLS_METHOD_RD | CLS_METHOD_WR,
5472 group_image_set, &h_group_image_set);
5473 cls_register_cxx_method(h_class, "image_add_group",
5474 CLS_METHOD_RD | CLS_METHOD_WR,
5475 image_add_group, &h_image_add_group);
5476 cls_register_cxx_method(h_class, "image_remove_group",
5477 CLS_METHOD_RD | CLS_METHOD_WR,
5478 image_remove_group, &h_image_remove_group);
5479 cls_register_cxx_method(h_class, "image_get_group",
5481 image_get_group, &h_image_get_group);
5483 /* rbd_trash object methods */
5484 cls_register_cxx_method(h_class, "trash_add",
5485 CLS_METHOD_RD | CLS_METHOD_WR,
5486 trash_add, &h_trash_add);
5487 cls_register_cxx_method(h_class, "trash_remove",
5488 CLS_METHOD_RD | CLS_METHOD_WR,
5489 trash_remove, &h_trash_remove);
5490 cls_register_cxx_method(h_class, "trash_list",
5492 trash_list, &h_trash_list);
5493 cls_register_cxx_method(h_class, "trash_get",
5495 trash_get, &h_trash_get);