Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / tools / rbd / action / MirrorPool.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "tools/rbd/ArgumentTypes.h"
5 #include "tools/rbd/Shell.h"
6 #include "tools/rbd/Utils.h"
7 #include "include/Context.h"
8 #include "include/stringify.h"
9 #include "include/rbd/librbd.hpp"
10 #include "common/config.h"
11 #include "common/debug.h"
12 #include "common/errno.h"
13 #include "common/Formatter.h"
14 #include "common/TextTable.h"
15 #include "common/Throttle.h"
16 #include "global/global_context.h"
17 #include <functional>
18 #include <iostream>
19 #include <boost/program_options.hpp>
20 #include <boost/regex.hpp>
21 #include "include/assert.h"
22
23 #include <atomic>
24
25 #define dout_context g_ceph_context
26 #define dout_subsys ceph_subsys_rbd
27 #undef dout_prefix
28 #define dout_prefix *_dout << "rbd::action::MirrorPool: "
29
30 namespace rbd {
31 namespace action {
32 namespace mirror_pool {
33
34 namespace at = argument_types;
35 namespace po = boost::program_options;
36
37 namespace {
38
39 int validate_mirroring_enabled(librados::IoCtx& io_ctx) {
40   librbd::RBD rbd;
41   rbd_mirror_mode_t mirror_mode;
42   int r = rbd.mirror_mode_get(io_ctx, &mirror_mode);
43   if (r < 0) {
44     std::cerr << "rbd: failed to retrieve mirror mode: "
45               << cpp_strerror(r) << std::endl;
46     return r;
47   }
48
49   if (mirror_mode == RBD_MIRROR_MODE_DISABLED) {
50     std::cerr << "rbd: mirroring not enabled on the pool" << std::endl;
51     return -EINVAL;
52   }
53   return 0;
54 }
55
56 int validate_uuid(const std::string &uuid) {
57   boost::regex pattern("^[A-F0-9]{8}-[A-F0-9]{4}-[A-F0-9]{4}-[A-F0-9]{4}-[A-F0-9]{12}$",
58                        boost::regex::icase);
59   boost::smatch match;
60   if (!boost::regex_match(uuid, match, pattern)) {
61     std::cerr << "rbd: invalid uuid '" << uuid << "'" << std::endl;
62     return -EINVAL;
63   }
64   return 0;
65 }
66
67 void add_uuid_option(po::options_description *positional) {
68   positional->add_options()
69     ("uuid", po::value<std::string>(), "peer uuid");
70 }
71
72 int get_uuid(const po::variables_map &vm, size_t arg_index,
73              std::string *uuid) {
74   *uuid = utils::get_positional_argument(vm, arg_index);
75   if (uuid->empty()) {
76     std::cerr << "rbd: must specify peer uuid" << std::endl;
77     return -EINVAL;
78   }
79   return validate_uuid(*uuid);
80 }
81
82 int get_remote_cluster_spec(const po::variables_map &vm,
83                             const std::string &spec,
84                             std::string *remote_client_name,
85                             std::string *remote_cluster) {
86   if (vm.count("remote-client-name")) {
87     *remote_client_name = vm["remote-client-name"].as<std::string>();
88   }
89   if (vm.count("remote-cluster")) {
90     *remote_cluster = vm["remote-cluster"].as<std::string>();
91   }
92
93   if (!spec.empty()) {
94     boost::regex pattern("^(?:(client\\.[^@]+)@)?([^/@]+)$");
95     boost::smatch match;
96     if (!boost::regex_match(spec, match, pattern)) {
97       std::cerr << "rbd: invalid spec '" << spec << "'" << std::endl;
98       return -EINVAL;
99     }
100     if (match[1].matched) {
101       *remote_client_name = match[1];
102     }
103     *remote_cluster = match[2];
104   }
105
106   if (remote_cluster->empty()) {
107     std::cerr << "rbd: remote cluster was not specified" << std::endl;
108     return -EINVAL;
109   }
110   return 0;
111 }
112
113 void format_mirror_peers(const std::string &config_path,
114                          at::Format::Formatter formatter,
115                          const std::vector<librbd::mirror_peer_t> &peers) {
116   if (formatter != nullptr) {
117     formatter->open_array_section("peers");
118     for (auto &peer : peers) {
119       formatter->open_object_section("peer");
120       formatter->dump_string("uuid", peer.uuid);
121       formatter->dump_string("cluster_name", peer.cluster_name);
122       formatter->dump_string("client_name", peer.client_name);
123       formatter->close_section();
124     }
125     formatter->close_section();
126   } else {
127     std::cout << "Peers: ";
128     if (peers.empty()) {
129       std::cout << "none" << std::endl;
130     } else {
131       TextTable tbl;
132       tbl.define_column("", TextTable::LEFT, TextTable::LEFT);
133       tbl.define_column("UUID", TextTable::LEFT, TextTable::LEFT);
134       tbl.define_column("NAME", TextTable::LEFT, TextTable::LEFT);
135       tbl.define_column("CLIENT", TextTable::LEFT, TextTable::LEFT);
136       for (auto &peer : peers) {
137         tbl << " "
138             << peer.uuid
139             << peer.cluster_name
140             << peer.client_name
141             << TextTable::endrow;
142       }
143       std::cout << std::endl << tbl;
144     }
145   }
146 }
147
148 class ImageRequestBase {
149 public:
150   void send() {
151     dout(20) << this << " " << __func__ << ": image_name=" << m_image_name
152              << dendl;
153
154     auto ctx = new FunctionContext([this](int r) {
155         handle_finalize(r);
156       });
157
158     // will pause here until slots are available
159     m_finalize_ctx = m_throttle.start_op(ctx);
160
161     open_image();
162   }
163
164 protected:
165   ImageRequestBase(librados::IoCtx &io_ctx, OrderedThrottle &throttle,
166                    const std::string &image_name)
167     : m_io_ctx(io_ctx), m_throttle(throttle), m_image_name(image_name) {
168   }
169   virtual ~ImageRequestBase() {
170   }
171
172   virtual bool skip_get_info() const {
173     return false;
174   }
175   virtual void get_info(librbd::Image &image, librbd::mirror_image_info_t *info,
176                         librbd::RBD::AioCompletion *aio_comp) {
177     image.aio_mirror_image_get_info(info, sizeof(librbd::mirror_image_info_t),
178                                     aio_comp);
179   }
180
181   virtual bool skip_action(const librbd::mirror_image_info_t &info) const {
182     return false;
183   }
184   virtual void execute_action(librbd::Image &image,
185                               librbd::RBD::AioCompletion *aio_comp) = 0;
186   virtual void handle_execute_action(int r) {
187     dout(20) << this << " " << __func__ << ": r=" << r << dendl;
188
189     if (r < 0 && r != -ENOENT) {
190       std::cerr << "rbd: failed to " << get_action_type() << " image "
191                 << m_image_name << ": " << cpp_strerror(r) << std::endl;
192       m_ret_val = r;
193     }
194
195     close_image();
196   }
197
198   virtual void finalize_action() {
199   }
200   virtual std::string get_action_type() const = 0;
201
202 private:
203   /**
204    * @verbatim
205    *
206    * <start>
207    *    |
208    *    v
209    * OPEN_IMAGE
210    *    |
211    *    v
212    * GET_INFO
213    *    |
214    *    v
215    * EXECUTE_ACTION
216    *    |
217    *    v
218    * CLOSE_IMAGE
219    *    |
220    *    v
221    * FINALIZE_ACTION
222    *    |
223    *    v
224    * <finish>
225    *
226    * @endverbatim
227    */
228
229   librados::IoCtx &m_io_ctx;
230   OrderedThrottle &m_throttle;
231   const std::string m_image_name;
232
233   librbd::Image m_image;
234   Context *m_finalize_ctx;
235
236   librbd::mirror_image_info_t m_mirror_image_info;
237
238   int m_ret_val = 0;
239
240   void open_image() {
241     dout(20) << this << " " << __func__ << dendl;
242
243     librbd::RBD rbd;
244     auto aio_completion = utils::create_aio_completion<
245       ImageRequestBase, &ImageRequestBase::handle_open_image>(this);
246     rbd.aio_open(m_io_ctx, m_image, m_image_name.c_str(), nullptr,
247                  aio_completion);
248   }
249
250   void handle_open_image(int r) {
251     dout(20) << this << " " << __func__ << ": r=" << r << dendl;
252
253     if (r < 0) {
254       std::cerr << "rbd: failed to open image "
255                 << m_image_name << ": " << cpp_strerror(r) << std::endl;
256       m_finalize_ctx->complete(r);
257       return;
258     }
259
260     get_info();
261   }
262
263   void get_info() {
264     if (skip_get_info()) {
265       execute_action();
266       return;
267     }
268     dout(20) << this << " " << __func__ << dendl;
269
270     auto aio_completion = utils::create_aio_completion<
271       ImageRequestBase, &ImageRequestBase::handle_get_info>(this);
272     get_info(m_image, &m_mirror_image_info, aio_completion);
273   }
274
275   void handle_get_info(int r) {
276     dout(20) << this << " " << __func__ << ": r=" << r << dendl;
277
278     if (r == -ENOENT) {
279       close_image();
280       return;
281     } else if (r < 0) {
282       std::cerr << "rbd: failed to retrieve mirror image info for "
283                 << m_image_name << ": " << cpp_strerror(r) << std::endl;
284       m_ret_val = r;
285       close_image();
286       return;
287     }
288
289     execute_action();
290   }
291
292   void execute_action() {
293     if (skip_action(m_mirror_image_info)) {
294       close_image();
295       return;
296     }
297     dout(20) << this << " " << __func__ << dendl;
298
299     auto aio_completion = utils::create_aio_completion<
300       ImageRequestBase, &ImageRequestBase::handle_execute_action>(this);
301     execute_action(m_image, aio_completion);
302   }
303
304   void close_image() {
305     dout(20) << this << " " << __func__ << dendl;
306
307     auto aio_completion = utils::create_aio_completion<
308       ImageRequestBase, &ImageRequestBase::handle_close_image>(this);
309     m_image.aio_close(aio_completion);
310   }
311
312   void handle_close_image(int r) {
313     dout(20) << this << " " << __func__ << ": r=" << r << dendl;
314
315     if (r < 0) {
316       std::cerr << "rbd: failed to close image "
317                 << m_image_name << ": " << cpp_strerror(r) << std::endl;
318     }
319
320     m_finalize_ctx->complete(r);
321   }
322
323   void handle_finalize(int r) {
324     dout(20) << this << " " << __func__ << ": r=" << r << dendl;
325
326     if (r == 0 && m_ret_val < 0) {
327       r = m_ret_val;
328     }
329     if (r >= 0) {
330       finalize_action();
331     }
332     m_throttle.end_op(r);
333   }
334
335 };
336
337 class PromoteImageRequest : public ImageRequestBase {
338 public:
339   PromoteImageRequest(librados::IoCtx &io_ctx, OrderedThrottle &throttle,
340                       const std::string &image_name, std::atomic<unsigned> *counter,
341                       bool force)
342     : ImageRequestBase(io_ctx, throttle, image_name), m_counter(counter),
343       m_force(force) {
344   }
345
346 protected:
347   bool skip_action(const librbd::mirror_image_info_t &info) const override {
348     return (info.state != RBD_MIRROR_IMAGE_ENABLED || info.primary);
349   }
350
351   void execute_action(librbd::Image &image,
352                       librbd::RBD::AioCompletion *aio_comp) override {
353     image.aio_mirror_image_promote(m_force, aio_comp);
354   }
355
356   void handle_execute_action(int r) override {
357     if (r >= 0) {
358       (*m_counter)++;
359     }
360     ImageRequestBase::handle_execute_action(r);
361   }
362
363   std::string get_action_type() const override {
364     return "promote";
365   }
366
367 private:
368   std::atomic<unsigned> *m_counter = nullptr;
369   bool m_force;
370 };
371
372 class DemoteImageRequest : public ImageRequestBase {
373 public:
374   DemoteImageRequest(librados::IoCtx &io_ctx, OrderedThrottle &throttle,
375                      const std::string &image_name, std::atomic<unsigned> *counter)
376     : ImageRequestBase(io_ctx, throttle, image_name), m_counter(counter) {
377   }
378
379 protected:
380   bool skip_action(const librbd::mirror_image_info_t &info) const override {
381     return (info.state != RBD_MIRROR_IMAGE_ENABLED || !info.primary);
382   }
383
384   void execute_action(librbd::Image &image,
385                       librbd::RBD::AioCompletion *aio_comp) override {
386     image.aio_mirror_image_demote(aio_comp);
387   }
388   void handle_execute_action(int r) override {
389     if (r >= 0) {
390       (*m_counter)++;
391     }
392     ImageRequestBase::handle_execute_action(r);
393   }
394
395   std::string get_action_type() const override {
396     return "demote";
397   }
398
399 private:
400   std::atomic<unsigned> *m_counter = nullptr;
401 };
402
403 class StatusImageRequest : public ImageRequestBase {
404 public:
405   StatusImageRequest(librados::IoCtx &io_ctx, OrderedThrottle &throttle,
406                      const std::string &image_name,
407                      at::Format::Formatter formatter)
408     : ImageRequestBase(io_ctx, throttle, image_name),
409       m_formatter(formatter) {
410   }
411
412 protected:
413   bool skip_get_info() const override {
414     return true;
415   }
416
417   void execute_action(librbd::Image &image,
418                       librbd::RBD::AioCompletion *aio_comp) override {
419     image.aio_mirror_image_get_status(&m_mirror_image_status,
420                                       sizeof(m_mirror_image_status), aio_comp);
421   }
422
423   void finalize_action() override {
424     if (m_mirror_image_status.info.global_id.empty()) {
425       return;
426     }
427
428     std::string state = utils::mirror_image_status_state(m_mirror_image_status);
429     std::string last_update = (
430       m_mirror_image_status.last_update == 0 ?
431         "" : utils::timestr(m_mirror_image_status.last_update));
432
433     if (m_formatter != nullptr) {
434       m_formatter->open_object_section("image");
435       m_formatter->dump_string("name", m_mirror_image_status.name);
436       m_formatter->dump_string("global_id",
437                                m_mirror_image_status.info.global_id);
438       m_formatter->dump_string("state", state);
439       m_formatter->dump_string("description",
440                                m_mirror_image_status.description);
441       m_formatter->dump_string("last_update", last_update);
442       m_formatter->close_section(); // image
443     } else {
444       std::cout << "\n" << m_mirror_image_status.name << ":\n"
445                 << "  global_id:   "
446                 << m_mirror_image_status.info.global_id << "\n"
447                 << "  state:       " << state << "\n"
448                 << "  description: "
449                 << m_mirror_image_status.description << "\n"
450                 << "  last_update: " << last_update << std::endl;
451     }
452   }
453
454   std::string get_action_type() const override {
455     return "status";
456   }
457
458 private:
459   at::Format::Formatter m_formatter;
460   librbd::mirror_image_status_t m_mirror_image_status;
461
462 };
463
464 template <typename RequestT>
465 class ImageRequestAllocator {
466 public:
467   template <class... Args>
468   RequestT *operator()(librados::IoCtx &io_ctx, OrderedThrottle &throttle,
469                        const std::string &image_name, Args&&... args) {
470     return new RequestT(io_ctx, throttle, image_name,
471                         std::forward<Args>(args)...);
472   }
473 };
474
475 template <typename RequestT>
476 class ImageRequestGenerator {
477 public:
478   template <class... Args>
479   ImageRequestGenerator(librados::IoCtx &io_ctx, Args&&... args)
480     : m_io_ctx(io_ctx),
481       m_factory(std::bind(ImageRequestAllocator<RequestT>(),
482                           std::ref(m_io_ctx), std::ref(m_throttle),
483                           std::placeholders::_1, std::forward<Args>(args)...)),
484       m_throttle(g_conf->get_val<int64_t>("rbd_concurrent_management_ops"),
485                  true) {
486   }
487
488   int execute() {
489     // use the alphabetical list of image names for pool-level
490     // mirror image operations
491     librbd::RBD rbd;
492     int r = rbd.list(m_io_ctx, m_image_names);
493     if (r < 0 && r != -ENOENT) {
494       std::cerr << "rbd: failed to list images within pool" << std::endl;
495       return r;
496     }
497
498     for (auto &image_name : m_image_names) {
499       auto request = m_factory(image_name);
500       request->send();
501     }
502
503     return m_throttle.wait_for_ret();
504   }
505 private:
506   typedef std::function<RequestT*(const std::string&)>  Factory;
507
508   librados::IoCtx &m_io_ctx;
509   Factory m_factory;
510
511   OrderedThrottle m_throttle;
512
513   std::vector<std::string> m_image_names;
514
515 };
516
517 } // anonymous namespace
518
519 void get_peer_add_arguments(po::options_description *positional,
520                             po::options_description *options) {
521   at::add_pool_options(positional, options);
522   positional->add_options()
523     ("remote-cluster-spec", "remote cluster spec\n"
524      "(example: [<client name>@]<cluster name>");
525   options->add_options()
526     ("remote-client-name", po::value<std::string>(), "remote client name")
527     ("remote-cluster", po::value<std::string>(), "remote cluster name");
528 }
529
530 int execute_peer_add(const po::variables_map &vm) {
531   size_t arg_index = 0;
532   std::string pool_name = utils::get_pool_name(vm, &arg_index);
533
534   std::string remote_client_name = g_ceph_context->_conf->name.to_str();
535   std::string remote_cluster;
536   int r = get_remote_cluster_spec(
537     vm, utils::get_positional_argument(vm, arg_index),
538     &remote_client_name, &remote_cluster);
539   if (r < 0) {
540     return r;
541   }
542
543   std::string config_path;
544   if (vm.count(at::CONFIG_PATH)) {
545     config_path = vm[at::CONFIG_PATH].as<std::string>();
546   }
547
548   librados::Rados rados;
549   librados::IoCtx io_ctx;
550   r = utils::init(pool_name, &rados, &io_ctx);
551   if (r < 0) {
552     return r;
553   }
554
555   r = validate_mirroring_enabled(io_ctx);
556   if (r < 0) {
557     return r;
558   }
559
560   // TODO: temporary restriction to prevent adding multiple peers
561   // until rbd-mirror daemon can properly handle the scenario
562   librbd::RBD rbd;
563   std::vector<librbd::mirror_peer_t> mirror_peers;
564   r = rbd.mirror_peer_list(io_ctx, &mirror_peers);
565   if (r < 0) {
566     std::cerr << "rbd: failed to list mirror peers" << std::endl;
567     return r;
568   }
569   if (!mirror_peers.empty()) {
570     std::cerr << "rbd: multiple peers are not currently supported" << std::endl;
571     return -EINVAL;
572   }
573
574   std::string uuid;
575   r = rbd.mirror_peer_add(io_ctx, &uuid, remote_cluster, remote_client_name);
576   if (r < 0) {
577     std::cerr << "rbd: error adding mirror peer" << std::endl;
578     return r;
579   }
580
581   std::cout << uuid << std::endl;
582   return 0;
583 }
584
585 void get_peer_remove_arguments(po::options_description *positional,
586                                po::options_description *options) {
587   at::add_pool_options(positional, options);
588   add_uuid_option(positional);
589 }
590
591 int execute_peer_remove(const po::variables_map &vm) {
592   size_t arg_index = 0;
593   std::string pool_name = utils::get_pool_name(vm, &arg_index);
594
595   std::string uuid;
596   int r = get_uuid(vm, arg_index, &uuid);
597   if (r < 0) {
598     return r;
599   }
600
601   librados::Rados rados;
602   librados::IoCtx io_ctx;
603   r = utils::init(pool_name, &rados, &io_ctx);
604   if (r < 0) {
605     return r;
606   }
607
608   r = validate_mirroring_enabled(io_ctx);
609   if (r < 0) {
610     return r;
611   }
612
613   librbd::RBD rbd;
614   r = rbd.mirror_peer_remove(io_ctx, uuid);
615   if (r < 0) {
616     std::cerr << "rbd: error removing mirror peer" << std::endl;
617     return r;
618   }
619   return 0;
620 }
621
622 void get_peer_set_arguments(po::options_description *positional,
623                             po::options_description *options) {
624   at::add_pool_options(positional, options);
625   add_uuid_option(positional);
626   positional->add_options()
627     ("key", "peer parameter [client or cluster]")
628     ("value", "new client or cluster name");
629 }
630
631 int execute_peer_set(const po::variables_map &vm) {
632   size_t arg_index = 0;
633   std::string pool_name = utils::get_pool_name(vm, &arg_index);
634
635   std::string uuid;
636   int r = get_uuid(vm, arg_index++, &uuid);
637   if (r < 0) {
638     return r;
639   }
640
641   std::string key = utils::get_positional_argument(vm, arg_index++);
642   if (key != "client" && key != "cluster") {
643     std::cerr << "rbd: must specify 'client' or 'cluster' key." << std::endl;
644     return -EINVAL;
645   }
646
647   std::string value = utils::get_positional_argument(vm, arg_index++);
648   if (value.empty()) {
649     std::cerr << "rbd: must specify new " << key << " value." << std::endl;
650   }
651
652   librados::Rados rados;
653   librados::IoCtx io_ctx;
654   r = utils::init(pool_name, &rados, &io_ctx);
655   if (r < 0) {
656     return r;
657   }
658
659   r = validate_mirroring_enabled(io_ctx);
660   if (r < 0) {
661     return r;
662   }
663
664   librbd::RBD rbd;
665   if (key == "client") {
666     r = rbd.mirror_peer_set_client(io_ctx, uuid.c_str(), value.c_str());
667   } else {
668     r = rbd.mirror_peer_set_cluster(io_ctx, uuid.c_str(), value.c_str());
669   }
670   if (r < 0) {
671     return r;
672   }
673   return 0;
674 }
675
676 void get_disable_arguments(po::options_description *positional,
677                            po::options_description *options) {
678   at::add_pool_options(positional, options);
679 }
680
681 void get_enable_arguments(po::options_description *positional,
682                           po::options_description *options) {
683   at::add_pool_options(positional, options);
684   positional->add_options()
685     ("mode", "mirror mode [image or pool]");
686 }
687
688 int execute_enable_disable(const std::string &pool_name,
689                            rbd_mirror_mode_t next_mirror_mode,
690                            const std::string &mode) {
691   librados::Rados rados;
692   librados::IoCtx io_ctx;
693   rbd_mirror_mode_t current_mirror_mode;
694
695   int r = utils::init(pool_name, &rados, &io_ctx);
696   if (r < 0) {
697     return r;
698   }
699
700   librbd::RBD rbd;
701   r = rbd.mirror_mode_get(io_ctx, &current_mirror_mode);
702   if (r < 0) {
703     std::cerr << "rbd: failed to retrieve mirror mode: "
704               << cpp_strerror(r) << std::endl;
705     return r;
706   }
707
708   if (current_mirror_mode == next_mirror_mode) {
709     if (mode == "disabled") {
710       std::cout << "mirroring is already " << mode << std::endl;
711     } else {
712       std::cout << "mirroring is already configured for "
713                 << mode << " mode" << std::endl;
714     }
715     return 0;
716   } else if (next_mirror_mode == RBD_MIRROR_MODE_IMAGE &&
717              current_mirror_mode == RBD_MIRROR_MODE_POOL) {
718     std::cout << "note: changing mirroring mode from pool to image"
719               << std::endl;
720   } else if (next_mirror_mode == RBD_MIRROR_MODE_POOL &&
721              current_mirror_mode == RBD_MIRROR_MODE_IMAGE) {
722     std::cout << "note: changing mirroring mode from image to pool"
723               << std::endl;
724   }
725
726   r = rbd.mirror_mode_set(io_ctx, next_mirror_mode);
727   if (r < 0) {
728     return r;
729   }
730   return 0;
731 }
732
733 int execute_disable(const po::variables_map &vm) {
734   size_t arg_index = 0;
735   std::string pool_name = utils::get_pool_name(vm, &arg_index);
736
737   return execute_enable_disable(pool_name, RBD_MIRROR_MODE_DISABLED,
738                                 "disabled");
739 }
740
741 int execute_enable(const po::variables_map &vm) {
742   size_t arg_index = 0;
743   std::string pool_name = utils::get_pool_name(vm, &arg_index);
744
745   rbd_mirror_mode_t mirror_mode;
746   std::string mode = utils::get_positional_argument(vm, arg_index++);
747   if (mode == "image") {
748     mirror_mode = RBD_MIRROR_MODE_IMAGE;
749   } else if (mode == "pool") {
750     mirror_mode = RBD_MIRROR_MODE_POOL;
751   } else {
752     std::cerr << "rbd: must specify 'image' or 'pool' mode." << std::endl;
753     return -EINVAL;
754   }
755
756   return execute_enable_disable(pool_name, mirror_mode, mode);
757 }
758
759 void get_info_arguments(po::options_description *positional,
760                         po::options_description *options) {
761   at::add_pool_options(positional, options);
762   at::add_format_options(options);
763 }
764
765 int execute_info(const po::variables_map &vm) {
766   size_t arg_index = 0;
767   std::string pool_name = utils::get_pool_name(vm, &arg_index);
768
769   at::Format::Formatter formatter;
770   int r = utils::get_formatter(vm, &formatter);
771   if (r < 0) {
772     return r;
773   }
774
775   std::string config_path;
776   if (vm.count(at::CONFIG_PATH)) {
777     config_path = vm[at::CONFIG_PATH].as<std::string>();
778   }
779
780   librados::Rados rados;
781   librados::IoCtx io_ctx;
782   r = utils::init(pool_name, &rados, &io_ctx);
783   if (r < 0) {
784     return r;
785   }
786
787   librbd::RBD rbd;
788   rbd_mirror_mode_t mirror_mode;
789   r = rbd.mirror_mode_get(io_ctx, &mirror_mode);
790   if (r < 0) {
791     return r;
792   }
793
794   std::vector<librbd::mirror_peer_t> mirror_peers;
795   r = rbd.mirror_peer_list(io_ctx, &mirror_peers);
796   if (r < 0) {
797     return r;
798   }
799
800   std::string mirror_mode_desc;
801   switch (mirror_mode) {
802   case RBD_MIRROR_MODE_DISABLED:
803     mirror_mode_desc = "disabled";
804     break;
805   case RBD_MIRROR_MODE_IMAGE:
806     mirror_mode_desc = "image";
807     break;
808   case RBD_MIRROR_MODE_POOL:
809     mirror_mode_desc = "pool";
810     break;
811   default:
812     mirror_mode_desc = "unknown";
813     break;
814   }
815
816   if (formatter != nullptr) {
817     formatter->open_object_section("mirror");
818     formatter->dump_string("mode", mirror_mode_desc);
819   } else {
820     std::cout << "Mode: " << mirror_mode_desc << std::endl;
821   }
822
823   if (mirror_mode != RBD_MIRROR_MODE_DISABLED) {
824     format_mirror_peers(config_path, formatter, mirror_peers);
825   }
826   if (formatter != nullptr) {
827     formatter->close_section();
828     formatter->flush(std::cout);
829   }
830   return 0;
831 }
832
833 void get_status_arguments(po::options_description *positional,
834                           po::options_description *options) {
835   at::add_pool_options(positional, options);
836   at::add_format_options(options);
837   at::add_verbose_option(options);
838 }
839
840 int execute_status(const po::variables_map &vm) {
841   size_t arg_index = 0;
842   std::string pool_name = utils::get_pool_name(vm, &arg_index);
843
844   at::Format::Formatter formatter;
845   int r = utils::get_formatter(vm, &formatter);
846   if (r < 0) {
847     return r;
848   }
849
850   bool verbose = vm[at::VERBOSE].as<bool>();
851
852   std::string config_path;
853   if (vm.count(at::CONFIG_PATH)) {
854     config_path = vm[at::CONFIG_PATH].as<std::string>();
855   }
856
857   librados::Rados rados;
858   librados::IoCtx io_ctx;
859   r = utils::init(pool_name, &rados, &io_ctx);
860   if (r < 0) {
861     return r;
862   }
863
864   r = validate_mirroring_enabled(io_ctx);
865   if (r < 0) {
866     return r;
867   }
868
869   librbd::RBD rbd;
870
871   std::map<librbd::mirror_image_status_state_t, int> states;
872   r = rbd.mirror_image_status_summary(io_ctx, &states);
873   if (r < 0) {
874     std::cerr << "rbd: failed to get status summary for mirrored images: "
875               << cpp_strerror(r) << std::endl;
876     return r;
877   }
878
879   if (formatter != nullptr) {
880     formatter->open_object_section("status");
881   }
882
883   enum Health {Ok = 0, Warning = 1, Error = 2} health = Ok;
884   const char *names[] = {"OK", "WARNING", "ERROR"};
885   int total = 0;
886
887   for (auto &it : states) {
888     auto &state = it.first;
889     if (health < Warning &&
890         (state != MIRROR_IMAGE_STATUS_STATE_REPLAYING &&
891          state != MIRROR_IMAGE_STATUS_STATE_STOPPED)) {
892       health = Warning;
893     }
894     if (health < Error &&
895         state == MIRROR_IMAGE_STATUS_STATE_ERROR) {
896       health = Error;
897     }
898     total += it.second;
899   }
900
901   if (formatter != nullptr) {
902     formatter->open_object_section("summary");
903     formatter->dump_string("health", names[health]);
904     formatter->open_object_section("states");
905     for (auto &it : states) {
906       std::string state_name = utils::mirror_image_status_state(it.first);
907       formatter->dump_int(state_name.c_str(), it.second);
908     }
909     formatter->close_section(); // states
910     formatter->close_section(); // summary
911   } else {
912     std::cout << "health: " << names[health] << std::endl;
913     std::cout << "images: " << total << " total" << std::endl;
914     for (auto &it : states) {
915       std::cout << "    " << it.second << " "
916                 << utils::mirror_image_status_state(it.first) << std::endl;
917     }
918   }
919
920   int ret = 0;
921
922   if (verbose) {
923     if (formatter != nullptr) {
924       formatter->open_array_section("images");
925     }
926
927     ImageRequestGenerator<StatusImageRequest> generator(io_ctx, formatter);
928     ret = generator.execute();
929
930     if (formatter != nullptr) {
931       formatter->close_section(); // images
932     }
933   }
934
935   if (formatter != nullptr) {
936     formatter->close_section(); // status
937     formatter->flush(std::cout);
938   }
939
940   return ret;
941 }
942
943 void get_promote_arguments(po::options_description *positional,
944                            po::options_description *options) {
945   options->add_options()
946     ("force", po::bool_switch(),
947      "promote even if not cleanly demoted by remote cluster");
948   at::add_pool_options(positional, options);
949 }
950
951 int execute_promote(const po::variables_map &vm) {
952   size_t arg_index = 0;
953   std::string pool_name = utils::get_pool_name(vm, &arg_index);
954
955   librados::Rados rados;
956   librados::IoCtx io_ctx;
957   int r = utils::init(pool_name, &rados, &io_ctx);
958   if (r < 0) {
959     return r;
960   }
961
962   r = validate_mirroring_enabled(io_ctx);
963   if (r < 0) {
964     return r;
965   }
966
967   std::atomic<unsigned> counter = { 0 };
968   ImageRequestGenerator<PromoteImageRequest> generator(io_ctx, &counter,
969                                                        vm["force"].as<bool>());
970   r = generator.execute();
971
972   std::cout << "Promoted " << counter.load() << " mirrored images" << std::endl;
973   return r;
974 }
975
976 void get_demote_arguments(po::options_description *positional,
977                            po::options_description *options) {
978   at::add_pool_options(positional, options);
979 }
980
981 int execute_demote(const po::variables_map &vm) {
982   size_t arg_index = 0;
983   std::string pool_name = utils::get_pool_name(vm, &arg_index);
984
985   librados::Rados rados;
986   librados::IoCtx io_ctx;
987   int r = utils::init(pool_name, &rados, &io_ctx);
988   if (r < 0) {
989     return r;
990   }
991
992   r = validate_mirroring_enabled(io_ctx);
993   if (r < 0) {
994     return r;
995   }
996
997   std::atomic<unsigned> counter { 0 };
998   ImageRequestGenerator<DemoteImageRequest> generator(io_ctx, &counter);
999   r = generator.execute();
1000
1001   std::cout << "Demoted " << counter.load() << " mirrored images" << std::endl;
1002   return r;
1003 }
1004
1005 Shell::Action action_add(
1006   {"mirror", "pool", "peer", "add"}, {},
1007   "Add a mirroring peer to a pool.", "",
1008   &get_peer_add_arguments, &execute_peer_add);
1009 Shell::Action action_remove(
1010   {"mirror", "pool", "peer", "remove"}, {},
1011   "Remove a mirroring peer from a pool.", "",
1012   &get_peer_remove_arguments, &execute_peer_remove);
1013 Shell::Action action_set(
1014   {"mirror", "pool", "peer", "set"}, {},
1015   "Update mirroring peer settings.", "",
1016   &get_peer_set_arguments, &execute_peer_set);
1017
1018 Shell::Action action_disable(
1019   {"mirror", "pool", "disable"}, {},
1020   "Disable RBD mirroring by default within a pool.", "",
1021   &get_disable_arguments, &execute_disable);
1022 Shell::Action action_enable(
1023   {"mirror", "pool", "enable"}, {},
1024   "Enable RBD mirroring by default within a pool.", "",
1025   &get_enable_arguments, &execute_enable);
1026 Shell::Action action_info(
1027   {"mirror", "pool", "info"}, {},
1028   "Show information about the pool mirroring configuration.", {},
1029   &get_info_arguments, &execute_info);
1030 Shell::Action action_status(
1031   {"mirror", "pool", "status"}, {},
1032   "Show status for all mirrored images in the pool.", {},
1033   &get_status_arguments, &execute_status);
1034 Shell::Action action_promote(
1035   {"mirror", "pool", "promote"}, {},
1036   "Promote all non-primary images in the pool.", {},
1037   &get_promote_arguments, &execute_promote);
1038 Shell::Action action_demote(
1039   {"mirror", "pool", "demote"}, {},
1040   "Demote all primary images in the pool.", {},
1041   &get_demote_arguments, &execute_demote);
1042
1043 } // namespace mirror_pool
1044 } // namespace action
1045 } // namespace rbd