Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / librbd / api / Mirror.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "librbd/api/Mirror.h"
5 #include "include/rados/librados.hpp"
6 #include "common/dout.h"
7 #include "common/errno.h"
8 #include "cls/rbd/cls_rbd_client.h"
9 #include "librbd/ExclusiveLock.h"
10 #include "librbd/ImageCtx.h"
11 #include "librbd/ImageState.h"
12 #include "librbd/Journal.h"
13 #include "librbd/Utils.h"
14 #include "librbd/api/Image.h"
15 #include "librbd/mirror/DemoteRequest.h"
16 #include "librbd/mirror/DisableRequest.h"
17 #include "librbd/mirror/EnableRequest.h"
18 #include "librbd/mirror/GetInfoRequest.h"
19 #include "librbd/mirror/GetStatusRequest.h"
20 #include "librbd/mirror/PromoteRequest.h"
21 #include "librbd/mirror/Types.h"
22 #include "librbd/MirroringWatcher.h"
23 #include <boost/scope_exit.hpp>
24
25 #define dout_subsys ceph_subsys_rbd
26 #undef dout_prefix
27 #define dout_prefix *_dout << "librbd::api::Mirror: " << __func__ << ": "
28
29 namespace librbd {
30 namespace api {
31
32 namespace {
33
34 template <typename I>
35 int validate_mirroring_enabled(I *ictx) {
36   CephContext *cct = ictx->cct;
37   cls::rbd::MirrorImage mirror_image_internal;
38   int r = cls_client::mirror_image_get(&ictx->md_ctx, ictx->id,
39       &mirror_image_internal);
40   if (r < 0 && r != -ENOENT) {
41     lderr(cct) << "failed to retrieve mirroring state: " << cpp_strerror(r)
42                << dendl;
43     return r;
44   } else if (mirror_image_internal.state !=
45                cls::rbd::MIRROR_IMAGE_STATE_ENABLED) {
46     lderr(cct) << "mirroring is not currently enabled" << dendl;
47     return -EINVAL;
48   }
49   return 0;
50 }
51
52 int list_mirror_images(librados::IoCtx& io_ctx,
53                        std::set<std::string>& mirror_image_ids) {
54   CephContext *cct = reinterpret_cast<CephContext *>(io_ctx.cct());
55
56   std::string last_read = "";
57   int max_read = 1024;
58   int r;
59   do {
60     std::map<std::string, std::string> mirror_images;
61     r =  cls_client::mirror_image_list(&io_ctx, last_read, max_read,
62                                        &mirror_images);
63     if (r < 0 && r != -ENOENT) {
64       lderr(cct) << "error listing mirrored image directory: "
65                  << cpp_strerror(r) << dendl;
66       return r;
67     }
68     for (auto it = mirror_images.begin(); it != mirror_images.end(); ++it) {
69       mirror_image_ids.insert(it->first);
70     }
71     if (!mirror_images.empty()) {
72       last_read = mirror_images.rbegin()->first;
73     }
74     r = mirror_images.size();
75   } while (r == max_read);
76
77   return 0;
78 }
79
80 struct C_ImageGetInfo : public Context {
81   mirror_image_info_t *mirror_image_info;
82   Context *on_finish;
83
84   cls::rbd::MirrorImage mirror_image;
85   mirror::PromotionState promotion_state;
86
87   C_ImageGetInfo(mirror_image_info_t *mirror_image_info, Context *on_finish)
88     : mirror_image_info(mirror_image_info), on_finish(on_finish) {
89   }
90
91   void finish(int r) override {
92     if (r < 0) {
93       on_finish->complete(r);
94       return;
95     }
96
97     mirror_image_info->global_id = mirror_image.global_image_id;
98     mirror_image_info->state = static_cast<rbd_mirror_image_state_t>(
99       mirror_image.state);
100     mirror_image_info->primary = (
101       promotion_state == mirror::PROMOTION_STATE_PRIMARY);
102     on_finish->complete(0);
103   }
104 };
105
106 struct C_ImageGetStatus : public C_ImageGetInfo {
107   std::string image_name;
108   mirror_image_status_t *mirror_image_status;
109
110   cls::rbd::MirrorImageStatus mirror_image_status_internal;
111
112   C_ImageGetStatus(const std::string &image_name,
113                    mirror_image_status_t *mirror_image_status,
114                    Context *on_finish)
115     : C_ImageGetInfo(&mirror_image_status->info, on_finish),
116       image_name(image_name), mirror_image_status(mirror_image_status) {
117   }
118
119   void finish(int r) override {
120     if (r < 0) {
121       on_finish->complete(r);
122       return;
123     }
124
125     mirror_image_status->name = image_name;
126     mirror_image_status->state = static_cast<mirror_image_status_state_t>(
127       mirror_image_status_internal.state);
128     mirror_image_status->description = mirror_image_status_internal.description;
129     mirror_image_status->last_update =
130       mirror_image_status_internal.last_update.sec();
131     mirror_image_status->up = mirror_image_status_internal.up;
132     C_ImageGetInfo::finish(0);
133   }
134 };
135
136 } // anonymous namespace
137
138 template <typename I>
139 int Mirror<I>::image_enable(I *ictx, bool relax_same_pool_parent_check) {
140   CephContext *cct = ictx->cct;
141   ldout(cct, 20) << "ictx=" << ictx << dendl;
142
143   int r = ictx->state->refresh_if_required();
144   if (r < 0) {
145     return r;
146   }
147
148   cls::rbd::MirrorMode mirror_mode;
149   r = cls_client::mirror_mode_get(&ictx->md_ctx, &mirror_mode);
150   if (r < 0) {
151     lderr(cct) << "cannot enable mirroring: failed to retrieve mirror mode: "
152                << cpp_strerror(r) << dendl;
153     return r;
154   }
155
156   if (mirror_mode != cls::rbd::MIRROR_MODE_IMAGE) {
157     lderr(cct) << "cannot enable mirroring in the current pool mirroring mode"
158                << dendl;
159     return -EINVAL;
160   }
161
162   // is mirroring not enabled for the parent?
163   {
164     RWLock::RLocker l(ictx->parent_lock);
165     ImageCtx *parent = ictx->parent;
166     if (parent) {
167       if (relax_same_pool_parent_check &&
168           parent->md_ctx.get_id() == ictx->md_ctx.get_id()) {
169         if (!parent->test_features(RBD_FEATURE_JOURNALING)) {
170           lderr(cct) << "journaling is not enabled for the parent" << dendl;
171           return -EINVAL;
172         }
173       } else {
174         cls::rbd::MirrorImage mirror_image_internal;
175         r = cls_client::mirror_image_get(&(parent->md_ctx), parent->id,
176                                          &mirror_image_internal);
177         if (r == -ENOENT) {
178           lderr(cct) << "mirroring is not enabled for the parent" << dendl;
179           return -EINVAL;
180         }
181       }
182     }
183   }
184
185   if ((ictx->features & RBD_FEATURE_JOURNALING) == 0) {
186     lderr(cct) << "cannot enable mirroring: journaling is not enabled" << dendl;
187     return -EINVAL;
188   }
189
190   C_SaferCond ctx;
191   auto req = mirror::EnableRequest<ImageCtx>::create(ictx, &ctx);
192   req->send();
193
194   r = ctx.wait();
195   if (r < 0) {
196     lderr(cct) << "cannot enable mirroring: " << cpp_strerror(r) << dendl;
197     return r;
198   }
199
200   return 0;
201 }
202
203 template <typename I>
204 int Mirror<I>::image_disable(I *ictx, bool force) {
205   CephContext *cct = ictx->cct;
206   ldout(cct, 20) << "ictx=" << ictx << dendl;
207
208   int r = ictx->state->refresh_if_required();
209   if (r < 0) {
210     return r;
211   }
212
213   cls::rbd::MirrorMode mirror_mode;
214   r = cls_client::mirror_mode_get(&ictx->md_ctx, &mirror_mode);
215   if (r < 0) {
216     lderr(cct) << "cannot disable mirroring: failed to retrieve pool "
217       "mirroring mode: " << cpp_strerror(r) << dendl;
218     return r;
219   }
220
221   if (mirror_mode != cls::rbd::MIRROR_MODE_IMAGE) {
222     lderr(cct) << "cannot disable mirroring in the current pool mirroring "
223       "mode" << dendl;
224     return -EINVAL;
225   }
226
227   // is mirroring  enabled for the child?
228   cls::rbd::MirrorImage mirror_image_internal;
229   r = cls_client::mirror_image_get(&ictx->md_ctx, ictx->id,
230                                    &mirror_image_internal);
231   if (r == -ENOENT) {
232     // mirroring is not enabled for this image
233     ldout(cct, 20) << "ignoring disable command: mirroring is not enabled for "
234                    << "this image" << dendl;
235     return 0;
236   } else if (r == -EOPNOTSUPP) {
237     ldout(cct, 5) << "mirroring not supported by OSD" << dendl;
238     return r;
239   } else if (r < 0) {
240     lderr(cct) << "failed to retrieve mirror image metadata: "
241                << cpp_strerror(r) << dendl;
242     return r;
243   }
244
245   mirror_image_internal.state = cls::rbd::MIRROR_IMAGE_STATE_DISABLING;
246   r = cls_client::mirror_image_set(&ictx->md_ctx, ictx->id,
247                                    mirror_image_internal);
248   if (r < 0) {
249     lderr(cct) << "cannot disable mirroring: " << cpp_strerror(r) << dendl;
250     return r;
251   } else {
252     bool rollback = false;
253     BOOST_SCOPE_EXIT_ALL(ictx, &mirror_image_internal, &rollback) {
254       if (rollback) {
255         CephContext *cct = ictx->cct;
256         mirror_image_internal.state = cls::rbd::MIRROR_IMAGE_STATE_ENABLED;
257         int r = cls_client::mirror_image_set(&ictx->md_ctx, ictx->id,
258                                              mirror_image_internal);
259         if (r < 0) {
260           lderr(cct) << "failed to re-enable image mirroring: "
261                      << cpp_strerror(r) << dendl;
262         }
263       }
264     };
265
266     {
267       RWLock::RLocker l(ictx->snap_lock);
268       map<librados::snap_t, SnapInfo> snap_info = ictx->snap_info;
269       for (auto &info : snap_info) {
270         ParentSpec parent_spec(ictx->md_ctx.get_id(), ictx->id, info.first);
271         map< pair<int64_t, string>, set<string> > image_info;
272
273         r = Image<I>::list_children(ictx, parent_spec, &image_info);
274         if (r < 0) {
275           rollback = true;
276           return r;
277         }
278         if (image_info.empty())
279           continue;
280
281         librados::Rados rados(ictx->md_ctx);
282         for (auto &info: image_info) {
283           librados::IoCtx ioctx;
284           r = rados.ioctx_create2(info.first.first, ioctx);
285           if (r < 0) {
286             rollback = true;
287             lderr(cct) << "error accessing child image pool "
288                        << info.first.second  << dendl;
289             return r;
290           }
291           for (auto &id_it : info.second) {
292             cls::rbd::MirrorImage mirror_image_internal;
293             r = cls_client::mirror_image_get(&ioctx, id_it,
294                                              &mirror_image_internal);
295             if (r != -ENOENT) {
296               rollback = true;
297               lderr(cct) << "mirroring is enabled on one or more children "
298                          << dendl;
299               return -EBUSY;
300             }
301           }
302         }
303       }
304     }
305
306     C_SaferCond ctx;
307     auto req = mirror::DisableRequest<ImageCtx>::create(ictx, force, true,
308                                                         &ctx);
309     req->send();
310
311     r = ctx.wait();
312     if (r < 0) {
313       lderr(cct) << "cannot disable mirroring: " << cpp_strerror(r) << dendl;
314       rollback = true;
315       return r;
316     }
317   }
318
319   return 0;
320 }
321
322 template <typename I>
323 int Mirror<I>::image_promote(I *ictx, bool force) {
324   CephContext *cct = ictx->cct;
325
326   C_SaferCond ctx;
327   Mirror<I>::image_promote(ictx, force, &ctx);
328   int r = ctx.wait();
329   if (r < 0) {
330     lderr(cct) << "failed to promote image" << dendl;
331     return r;
332   }
333
334   return 0;
335 }
336
337 template <typename I>
338 void Mirror<I>::image_promote(I *ictx, bool force, Context *on_finish) {
339   CephContext *cct = ictx->cct;
340   ldout(cct, 20) << "ictx=" << ictx << ", "
341                  << "force=" << force << dendl;
342
343   auto req = mirror::PromoteRequest<>::create(*ictx, force, on_finish);
344   req->send();
345 }
346
347 template <typename I>
348 int Mirror<I>::image_demote(I *ictx) {
349   CephContext *cct = ictx->cct;
350
351   C_SaferCond ctx;
352   Mirror<I>::image_demote(ictx, &ctx);
353   int r = ctx.wait();
354   if (r < 0) {
355     lderr(cct) << "failed to demote image" << dendl;
356     return r;
357   }
358
359   return 0;
360 }
361
362 template <typename I>
363 void Mirror<I>::image_demote(I *ictx, Context *on_finish) {
364   CephContext *cct = ictx->cct;
365   ldout(cct, 20) << "ictx=" << ictx << dendl;
366
367   auto req = mirror::DemoteRequest<>::create(*ictx, on_finish);
368   req->send();
369 }
370
371 template <typename I>
372 int Mirror<I>::image_resync(I *ictx) {
373   CephContext *cct = ictx->cct;
374   ldout(cct, 20) << "ictx=" << ictx << dendl;
375
376   int r = ictx->state->refresh_if_required();
377   if (r < 0) {
378     return r;
379   }
380
381   r = validate_mirroring_enabled(ictx);
382   if (r < 0) {
383     return r;
384   }
385
386   C_SaferCond tag_owner_ctx;
387   bool is_tag_owner;
388   Journal<I>::is_tag_owner(ictx, &is_tag_owner, &tag_owner_ctx);
389   r = tag_owner_ctx.wait();
390   if (r < 0) {
391     lderr(cct) << "failed to determine tag ownership: " << cpp_strerror(r)
392                << dendl;
393     return r;
394   } else if (is_tag_owner) {
395     lderr(cct) << "image is primary, cannot resync to itself" << dendl;
396     return -EINVAL;
397   }
398
399   // flag the journal indicating that we want to rebuild the local image
400   r = Journal<I>::request_resync(ictx);
401   if (r < 0) {
402     lderr(cct) << "failed to request resync: " << cpp_strerror(r) << dendl;
403     return r;
404   }
405
406   return 0;
407 }
408
409 template <typename I>
410 void Mirror<I>::image_get_info(I *ictx, mirror_image_info_t *mirror_image_info,
411                                size_t info_size, Context *on_finish) {
412   CephContext *cct = ictx->cct;
413   ldout(cct, 20) << "ictx=" << ictx << dendl;
414   if (info_size < sizeof(mirror_image_info_t)) {
415     on_finish->complete(-ERANGE);
416     return;
417   }
418
419   auto ctx = new C_ImageGetInfo(mirror_image_info, on_finish);
420   auto req = mirror::GetInfoRequest<I>::create(*ictx, &ctx->mirror_image,
421                                                &ctx->promotion_state,
422                                                ctx);
423   req->send();
424 }
425
426 template <typename I>
427 int Mirror<I>::image_get_info(I *ictx, mirror_image_info_t *mirror_image_info,
428                               size_t info_size) {
429   C_SaferCond ctx;
430   image_get_info(ictx, mirror_image_info, info_size, &ctx);
431
432   int r = ctx.wait();
433   if (r < 0) {
434     return r;
435   }
436   return 0;
437 }
438
439 template <typename I>
440 void Mirror<I>::image_get_status(I *ictx, mirror_image_status_t *status,
441                                  size_t status_size, Context *on_finish) {
442   CephContext *cct = ictx->cct;
443   ldout(cct, 20) << "ictx=" << ictx << dendl;
444   if (status_size < sizeof(mirror_image_status_t)) {
445     on_finish->complete(-ERANGE);
446     return;
447   }
448
449   auto ctx = new C_ImageGetStatus(ictx->name, status, on_finish);
450   auto req = mirror::GetStatusRequest<I>::create(
451     *ictx, &ctx->mirror_image_status_internal, &ctx->mirror_image,
452     &ctx->promotion_state, ctx);
453   req->send();
454 }
455
456 template <typename I>
457 int Mirror<I>::image_get_status(I *ictx, mirror_image_status_t *status,
458                                 size_t status_size) {
459   C_SaferCond ctx;
460   image_get_status(ictx, status, status_size, &ctx);
461
462   int r = ctx.wait();
463   if (r < 0) {
464     return r;
465   }
466   return 0;
467 }
468
469 template <typename I>
470 int Mirror<I>::mode_get(librados::IoCtx& io_ctx,
471                         rbd_mirror_mode_t *mirror_mode) {
472   CephContext *cct = reinterpret_cast<CephContext *>(io_ctx.cct());
473   ldout(cct, 20) << dendl;
474
475   cls::rbd::MirrorMode mirror_mode_internal;
476   int r = cls_client::mirror_mode_get(&io_ctx, &mirror_mode_internal);
477   if (r < 0) {
478     lderr(cct) << "failed to retrieve mirror mode: " << cpp_strerror(r)
479                << dendl;
480     return r;
481   }
482
483   switch (mirror_mode_internal) {
484   case cls::rbd::MIRROR_MODE_DISABLED:
485   case cls::rbd::MIRROR_MODE_IMAGE:
486   case cls::rbd::MIRROR_MODE_POOL:
487     *mirror_mode = static_cast<rbd_mirror_mode_t>(mirror_mode_internal);
488     break;
489   default:
490     lderr(cct) << "unknown mirror mode ("
491                << static_cast<uint32_t>(mirror_mode_internal) << ")"
492                << dendl;
493     return -EINVAL;
494   }
495   return 0;
496 }
497
498 template <typename I>
499 int Mirror<I>::mode_set(librados::IoCtx& io_ctx,
500                         rbd_mirror_mode_t mirror_mode) {
501   CephContext *cct = reinterpret_cast<CephContext *>(io_ctx.cct());
502   ldout(cct, 20) << dendl;
503
504   cls::rbd::MirrorMode next_mirror_mode;
505   switch (mirror_mode) {
506   case RBD_MIRROR_MODE_DISABLED:
507   case RBD_MIRROR_MODE_IMAGE:
508   case RBD_MIRROR_MODE_POOL:
509     next_mirror_mode = static_cast<cls::rbd::MirrorMode>(mirror_mode);
510     break;
511   default:
512     lderr(cct) << "unknown mirror mode ("
513                << static_cast<uint32_t>(mirror_mode) << ")" << dendl;
514     return -EINVAL;
515   }
516
517   int r;
518   if (next_mirror_mode == cls::rbd::MIRROR_MODE_DISABLED) {
519     // fail early if pool still has peers registered and attempting to disable
520     std::vector<cls::rbd::MirrorPeer> mirror_peers;
521     r = cls_client::mirror_peer_list(&io_ctx, &mirror_peers);
522     if (r < 0 && r != -ENOENT) {
523       lderr(cct) << "failed to list peers: " << cpp_strerror(r) << dendl;
524       return r;
525     } else if (!mirror_peers.empty()) {
526       lderr(cct) << "mirror peers still registered" << dendl;
527       return -EBUSY;
528     }
529   }
530
531   cls::rbd::MirrorMode current_mirror_mode;
532   r = cls_client::mirror_mode_get(&io_ctx, &current_mirror_mode);
533   if (r < 0) {
534     lderr(cct) << "failed to retrieve mirror mode: " << cpp_strerror(r)
535                << dendl;
536     return r;
537   }
538
539   if (current_mirror_mode == next_mirror_mode) {
540     return 0;
541   } else if (current_mirror_mode == cls::rbd::MIRROR_MODE_DISABLED) {
542     uuid_d uuid_gen;
543     uuid_gen.generate_random();
544     r = cls_client::mirror_uuid_set(&io_ctx, uuid_gen.to_string());
545     if (r < 0) {
546       lderr(cct) << "failed to allocate mirroring uuid: " << cpp_strerror(r)
547                  << dendl;
548       return r;
549     }
550   }
551
552   if (current_mirror_mode != cls::rbd::MIRROR_MODE_IMAGE) {
553     r = cls_client::mirror_mode_set(&io_ctx, cls::rbd::MIRROR_MODE_IMAGE);
554     if (r < 0) {
555       lderr(cct) << "failed to set mirror mode to image: "
556                  << cpp_strerror(r) << dendl;
557       return r;
558     }
559
560     r = MirroringWatcher<>::notify_mode_updated(io_ctx,
561                                                 cls::rbd::MIRROR_MODE_IMAGE);
562     if (r < 0) {
563       lderr(cct) << "failed to send update notification: " << cpp_strerror(r)
564                  << dendl;
565     }
566   }
567
568   if (next_mirror_mode == cls::rbd::MIRROR_MODE_IMAGE) {
569     return 0;
570   }
571
572   if (next_mirror_mode == cls::rbd::MIRROR_MODE_POOL) {
573     map<string, string> images;
574     r = Image<I>::list_images(io_ctx, &images);
575     if (r < 0) {
576       lderr(cct) << "failed listing images: " << cpp_strerror(r) << dendl;
577       return r;
578     }
579
580     for (const auto& img_pair : images) {
581       uint64_t features;
582       r = cls_client::get_features(&io_ctx,
583                                    util::header_name(img_pair.second),
584                                    CEPH_NOSNAP, &features);
585       if (r < 0) {
586         lderr(cct) << "error getting features for image " << img_pair.first
587                    << ": " << cpp_strerror(r) << dendl;
588         return r;
589       }
590
591       if ((features & RBD_FEATURE_JOURNALING) != 0) {
592         I *img_ctx = I::create("", img_pair.second, nullptr, io_ctx, false);
593         r = img_ctx->state->open(false);
594         if (r < 0) {
595           lderr(cct) << "error opening image "<< img_pair.first << ": "
596                      << cpp_strerror(r) << dendl;
597           return r;
598         }
599
600         r = image_enable(img_ctx, true);
601         int close_r = img_ctx->state->close();
602         if (r < 0) {
603           lderr(cct) << "error enabling mirroring for image "
604                      << img_pair.first << ": " << cpp_strerror(r) << dendl;
605           return r;
606         } else if (close_r < 0) {
607           lderr(cct) << "failed to close image " << img_pair.first << ": "
608                      << cpp_strerror(close_r) << dendl;
609           return close_r;
610         }
611       }
612     }
613   } else if (next_mirror_mode == cls::rbd::MIRROR_MODE_DISABLED) {
614     std::set<std::string> image_ids;
615     r = list_mirror_images(io_ctx, image_ids);
616     if (r < 0) {
617       lderr(cct) << "failed listing images: " << cpp_strerror(r) << dendl;
618       return r;
619     }
620
621     for (const auto& img_id : image_ids) {
622       if (current_mirror_mode == cls::rbd::MIRROR_MODE_IMAGE) {
623         cls::rbd::MirrorImage mirror_image;
624         r = cls_client::mirror_image_get(&io_ctx, img_id, &mirror_image);
625         if (r < 0 && r != -ENOENT) {
626           lderr(cct) << "failed to retrieve mirroring state for image id "
627                      << img_id << ": " << cpp_strerror(r) << dendl;
628           return r;
629         }
630         if (mirror_image.state == cls::rbd::MIRROR_IMAGE_STATE_ENABLED) {
631           lderr(cct) << "failed to disable mirror mode: there are still "
632                      << "images with mirroring enabled" << dendl;
633           return -EINVAL;
634         }
635       } else {
636         I *img_ctx = I::create("", img_id, nullptr, io_ctx, false);
637         r = img_ctx->state->open(false);
638         if (r < 0) {
639           lderr(cct) << "error opening image id "<< img_id << ": "
640                      << cpp_strerror(r) << dendl;
641           return r;
642         }
643
644         r = image_disable(img_ctx, false);
645         int close_r = img_ctx->state->close();
646         if (r < 0) {
647           lderr(cct) << "error disabling mirroring for image id " << img_id
648                      << cpp_strerror(r) << dendl;
649           return r;
650         } else if (close_r < 0) {
651           lderr(cct) << "failed to close image id " << img_id << ": "
652                      << cpp_strerror(close_r) << dendl;
653           return close_r;
654         }
655       }
656     }
657   }
658
659   r = cls_client::mirror_mode_set(&io_ctx, next_mirror_mode);
660   if (r < 0) {
661     lderr(cct) << "failed to set mirror mode: " << cpp_strerror(r) << dendl;
662     return r;
663   }
664
665   r = MirroringWatcher<>::notify_mode_updated(io_ctx, next_mirror_mode);
666   if (r < 0) {
667     lderr(cct) << "failed to send update notification: " << cpp_strerror(r)
668                << dendl;
669   }
670   return 0;
671 }
672
673 template <typename I>
674 int Mirror<I>::peer_add(librados::IoCtx& io_ctx, std::string *uuid,
675                         const std::string &cluster_name,
676                         const std::string &client_name) {
677   CephContext *cct = reinterpret_cast<CephContext *>(io_ctx.cct());
678   ldout(cct, 20) << "name=" << cluster_name << ", "
679                  << "client=" << client_name << dendl;
680
681   if (cct->_conf->cluster == cluster_name) {
682     lderr(cct) << "cannot add self as remote peer" << dendl;
683     return -EINVAL;
684   }
685
686   int r;
687   do {
688     uuid_d uuid_gen;
689     uuid_gen.generate_random();
690
691     *uuid = uuid_gen.to_string();
692     r = cls_client::mirror_peer_add(&io_ctx, *uuid, cluster_name,
693                                     client_name);
694     if (r == -ESTALE) {
695       ldout(cct, 5) << "duplicate UUID detected, retrying" << dendl;
696     } else if (r < 0) {
697       lderr(cct) << "failed to add mirror peer '" << uuid << "': "
698                  << cpp_strerror(r) << dendl;
699       return r;
700     }
701   } while (r == -ESTALE);
702   return 0;
703 }
704
705 template <typename I>
706 int Mirror<I>::peer_remove(librados::IoCtx& io_ctx, const std::string &uuid) {
707   CephContext *cct = reinterpret_cast<CephContext *>(io_ctx.cct());
708   ldout(cct, 20) << "uuid=" << uuid << dendl;
709
710   int r = cls_client::mirror_peer_remove(&io_ctx, uuid);
711   if (r < 0 && r != -ENOENT) {
712     lderr(cct) << "failed to remove peer '" << uuid << "': "
713                << cpp_strerror(r) << dendl;
714     return r;
715   }
716   return 0;
717 }
718
719 template <typename I>
720 int Mirror<I>::peer_list(librados::IoCtx& io_ctx,
721                          std::vector<mirror_peer_t> *peers) {
722   CephContext *cct = reinterpret_cast<CephContext *>(io_ctx.cct());
723   ldout(cct, 20) << dendl;
724
725   std::vector<cls::rbd::MirrorPeer> mirror_peers;
726   int r = cls_client::mirror_peer_list(&io_ctx, &mirror_peers);
727   if (r < 0 && r != -ENOENT) {
728     lderr(cct) << "failed to list peers: " << cpp_strerror(r) << dendl;
729     return r;
730   }
731
732   peers->clear();
733   peers->reserve(mirror_peers.size());
734   for (auto &mirror_peer : mirror_peers) {
735     mirror_peer_t peer;
736     peer.uuid = mirror_peer.uuid;
737     peer.cluster_name = mirror_peer.cluster_name;
738     peer.client_name = mirror_peer.client_name;
739     peers->push_back(peer);
740   }
741   return 0;
742 }
743
744 template <typename I>
745 int Mirror<I>::peer_set_client(librados::IoCtx& io_ctx, const std::string &uuid,
746                                const std::string &client_name) {
747   CephContext *cct = reinterpret_cast<CephContext *>(io_ctx.cct());
748   ldout(cct, 20) << "uuid=" << uuid << ", "
749                  << "client=" << client_name << dendl;
750
751   int r = cls_client::mirror_peer_set_client(&io_ctx, uuid, client_name);
752   if (r < 0) {
753     lderr(cct) << "failed to update client '" << uuid << "': "
754                << cpp_strerror(r) << dendl;
755     return r;
756   }
757   return 0;
758 }
759
760 template <typename I>
761 int Mirror<I>::peer_set_cluster(librados::IoCtx& io_ctx,
762                                 const std::string &uuid,
763                                 const std::string &cluster_name) {
764   CephContext *cct = reinterpret_cast<CephContext *>(io_ctx.cct());
765   ldout(cct, 20) << "uuid=" << uuid << ", "
766                  << "cluster=" << cluster_name << dendl;
767
768   int r = cls_client::mirror_peer_set_cluster(&io_ctx, uuid, cluster_name);
769   if (r < 0) {
770     lderr(cct) << "failed to update cluster '" << uuid << "': "
771                << cpp_strerror(r) << dendl;
772     return r;
773   }
774   return 0;
775 }
776
777 template <typename I>
778 int Mirror<I>::image_status_list(librados::IoCtx& io_ctx,
779                                   const std::string &start_id, size_t max,
780                                   IdToMirrorImageStatus *images) {
781   CephContext *cct = reinterpret_cast<CephContext *>(io_ctx.cct());
782   int r;
783
784   map<string, string> id_to_name;
785   {
786     map<string, string> name_to_id;
787     r = Image<I>::list_images(io_ctx, &name_to_id);
788     if (r < 0) {
789       return r;
790     }
791     for (auto it : name_to_id) {
792       id_to_name[it.second] = it.first;
793     }
794   }
795
796   map<std::string, cls::rbd::MirrorImage> images_;
797   map<std::string, cls::rbd::MirrorImageStatus> statuses_;
798
799   r = librbd::cls_client::mirror_image_status_list(&io_ctx, start_id, max,
800                                                    &images_, &statuses_);
801   if (r < 0 && r != -ENOENT) {
802     lderr(cct) << "failed to list mirror image statuses: "
803                << cpp_strerror(r) << dendl;
804     return r;
805   }
806
807   cls::rbd::MirrorImageStatus unknown_status(
808     cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN, "status not found");
809
810   for (auto it = images_.begin(); it != images_.end(); ++it) {
811     auto &image_id = it->first;
812     auto &info = it->second;
813     if (info.state == cls::rbd::MIRROR_IMAGE_STATE_DISABLED) {
814       continue;
815     }
816
817     auto &image_name = id_to_name[image_id];
818     if (image_name.empty()) {
819       lderr(cct) << "failed to find image name for image " << image_id << ", "
820                  << "using image id as name" << dendl;
821       image_name = image_id;
822     }
823     auto s_it = statuses_.find(image_id);
824     auto &s = s_it != statuses_.end() ? s_it->second : unknown_status;
825     (*images)[image_id] = mirror_image_status_t{
826       image_name,
827       mirror_image_info_t{
828         info.global_image_id,
829         static_cast<mirror_image_state_t>(info.state),
830         false}, // XXX: To set "primary" right would require an additional call.
831       static_cast<mirror_image_status_state_t>(s.state),
832       s.description,
833       s.last_update.sec(),
834       s.up};
835   }
836
837   return 0;
838 }
839
840 template <typename I>
841 int Mirror<I>::image_status_summary(librados::IoCtx& io_ctx,
842                                     MirrorImageStatusStates *states) {
843   CephContext *cct = reinterpret_cast<CephContext *>(io_ctx.cct());
844
845   std::map<cls::rbd::MirrorImageStatusState, int> states_;
846   int r = cls_client::mirror_image_status_get_summary(&io_ctx, &states_);
847   if (r < 0 && r != -ENOENT) {
848     lderr(cct) << "failed to get mirror status summary: "
849                << cpp_strerror(r) << dendl;
850     return r;
851   }
852   for (auto &s : states_) {
853     (*states)[static_cast<mirror_image_status_state_t>(s.first)] = s.second;
854   }
855   return 0;
856 }
857
858 } // namespace api
859 } // namespace librbd
860
861 template class librbd::api::Mirror<librbd::ImageCtx>;