Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / tools / rbd_mirror / PoolReplayer.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "PoolReplayer.h"
5 #include <boost/bind.hpp>
6 #include "common/Formatter.h"
7 #include "common/admin_socket.h"
8 #include "common/ceph_argparse.h"
9 #include "common/code_environment.h"
10 #include "common/common_init.h"
11 #include "common/debug.h"
12 #include "common/errno.h"
13 #include "include/stringify.h"
14 #include "cls/rbd/cls_rbd_client.h"
15 #include "global/global_context.h"
16 #include "librbd/internal.h"
17 #include "librbd/Utils.h"
18 #include "librbd/Watcher.h"
19 #include "librbd/api/Mirror.h"
20 #include "InstanceReplayer.h"
21 #include "InstanceWatcher.h"
22 #include "LeaderWatcher.h"
23 #include "ServiceDaemon.h"
24 #include "Threads.h"
25
26 #define dout_context g_ceph_context
27 #define dout_subsys ceph_subsys_rbd_mirror
28 #undef dout_prefix
29 #define dout_prefix *_dout << "rbd::mirror::PoolReplayer: " \
30                            << this << " " << __func__ << ": "
31
32 using std::chrono::seconds;
33 using std::map;
34 using std::string;
35 using std::unique_ptr;
36 using std::vector;
37
38 using librbd::cls_client::dir_get_name;
39 using librbd::util::create_async_context_callback;
40
41 namespace rbd {
42 namespace mirror {
43
44 namespace {
45
46 const std::string SERVICE_DAEMON_LEADER_KEY("leader");
47 const std::string SERVICE_DAEMON_LOCAL_COUNT_KEY("image_local_count");
48 const std::string SERVICE_DAEMON_REMOTE_COUNT_KEY("image_remote_count");
49
50 const std::vector<std::string> UNIQUE_PEER_CONFIG_KEYS {
51   {"monmap", "mon_host", "mon_dns_srv_name", "key", "keyfile", "keyring"}};
52
53 class PoolReplayerAdminSocketCommand {
54 public:
55   PoolReplayerAdminSocketCommand(PoolReplayer *pool_replayer)
56     : pool_replayer(pool_replayer) {
57   }
58   virtual ~PoolReplayerAdminSocketCommand() {}
59   virtual bool call(Formatter *f, stringstream *ss) = 0;
60 protected:
61   PoolReplayer *pool_replayer;
62 };
63
64 class StatusCommand : public PoolReplayerAdminSocketCommand {
65 public:
66   explicit StatusCommand(PoolReplayer *pool_replayer)
67     : PoolReplayerAdminSocketCommand(pool_replayer) {
68   }
69
70   bool call(Formatter *f, stringstream *ss) override {
71     pool_replayer->print_status(f, ss);
72     return true;
73   }
74 };
75
76 class StartCommand : public PoolReplayerAdminSocketCommand {
77 public:
78   explicit StartCommand(PoolReplayer *pool_replayer)
79     : PoolReplayerAdminSocketCommand(pool_replayer) {
80   }
81
82   bool call(Formatter *f, stringstream *ss) override {
83     pool_replayer->start();
84     return true;
85   }
86 };
87
88 class StopCommand : public PoolReplayerAdminSocketCommand {
89 public:
90   explicit StopCommand(PoolReplayer *pool_replayer)
91     : PoolReplayerAdminSocketCommand(pool_replayer) {
92   }
93
94   bool call(Formatter *f, stringstream *ss) override {
95     pool_replayer->stop(true);
96     return true;
97   }
98 };
99
100 class RestartCommand : public PoolReplayerAdminSocketCommand {
101 public:
102   explicit RestartCommand(PoolReplayer *pool_replayer)
103     : PoolReplayerAdminSocketCommand(pool_replayer) {
104   }
105
106   bool call(Formatter *f, stringstream *ss) override {
107     pool_replayer->restart();
108     return true;
109   }
110 };
111
112 class FlushCommand : public PoolReplayerAdminSocketCommand {
113 public:
114   explicit FlushCommand(PoolReplayer *pool_replayer)
115     : PoolReplayerAdminSocketCommand(pool_replayer) {
116   }
117
118   bool call(Formatter *f, stringstream *ss) override {
119     pool_replayer->flush();
120     return true;
121   }
122 };
123
124 class LeaderReleaseCommand : public PoolReplayerAdminSocketCommand {
125 public:
126   explicit LeaderReleaseCommand(PoolReplayer *pool_replayer)
127     : PoolReplayerAdminSocketCommand(pool_replayer) {
128   }
129
130   bool call(Formatter *f, stringstream *ss) override {
131     pool_replayer->release_leader();
132     return true;
133   }
134 };
135
136 class PoolReplayerAdminSocketHook : public AdminSocketHook {
137 public:
138   PoolReplayerAdminSocketHook(CephContext *cct, const std::string &name,
139                                 PoolReplayer *pool_replayer)
140     : admin_socket(cct->get_admin_socket()) {
141     std::string command;
142     int r;
143
144     command = "rbd mirror status " + name;
145     r = admin_socket->register_command(command, command, this,
146                                        "get status for rbd mirror " + name);
147     if (r == 0) {
148       commands[command] = new StatusCommand(pool_replayer);
149     }
150
151     command = "rbd mirror start " + name;
152     r = admin_socket->register_command(command, command, this,
153                                        "start rbd mirror " + name);
154     if (r == 0) {
155       commands[command] = new StartCommand(pool_replayer);
156     }
157
158     command = "rbd mirror stop " + name;
159     r = admin_socket->register_command(command, command, this,
160                                        "stop rbd mirror " + name);
161     if (r == 0) {
162       commands[command] = new StopCommand(pool_replayer);
163     }
164
165     command = "rbd mirror restart " + name;
166     r = admin_socket->register_command(command, command, this,
167                                        "restart rbd mirror " + name);
168     if (r == 0) {
169       commands[command] = new RestartCommand(pool_replayer);
170     }
171
172     command = "rbd mirror flush " + name;
173     r = admin_socket->register_command(command, command, this,
174                                        "flush rbd mirror " + name);
175     if (r == 0) {
176       commands[command] = new FlushCommand(pool_replayer);
177     }
178
179     command = "rbd mirror leader release " + name;
180     r = admin_socket->register_command(command, command, this,
181                                        "release rbd mirror leader " + name);
182     if (r == 0) {
183       commands[command] = new LeaderReleaseCommand(pool_replayer);
184     }
185   }
186
187   ~PoolReplayerAdminSocketHook() override {
188     for (Commands::const_iterator i = commands.begin(); i != commands.end();
189          ++i) {
190       (void)admin_socket->unregister_command(i->first);
191       delete i->second;
192     }
193   }
194
195   bool call(std::string command, cmdmap_t& cmdmap, std::string format,
196             bufferlist& out) override {
197     Commands::const_iterator i = commands.find(command);
198     assert(i != commands.end());
199     Formatter *f = Formatter::create(format);
200     stringstream ss;
201     bool r = i->second->call(f, &ss);
202     delete f;
203     out.append(ss);
204     return r;
205   }
206
207 private:
208   typedef std::map<std::string, PoolReplayerAdminSocketCommand*> Commands;
209
210   AdminSocket *admin_socket;
211   Commands commands;
212 };
213
214 } // anonymous namespace
215
216 PoolReplayer::PoolReplayer(Threads<librbd::ImageCtx> *threads,
217                            ServiceDaemon<librbd::ImageCtx>* service_daemon,
218                            ImageDeleter<>* image_deleter,
219                            int64_t local_pool_id, const peer_t &peer,
220                            const std::vector<const char*> &args) :
221   m_threads(threads),
222   m_service_daemon(service_daemon),
223   m_image_deleter(image_deleter),
224   m_local_pool_id(local_pool_id),
225   m_peer(peer),
226   m_args(args),
227   m_lock(stringify("rbd::mirror::PoolReplayer ") + stringify(peer)),
228   m_local_pool_watcher_listener(this, true),
229   m_remote_pool_watcher_listener(this, false),
230   m_pool_replayer_thread(this),
231   m_leader_listener(this)
232 {
233 }
234
235 PoolReplayer::~PoolReplayer()
236 {
237   delete m_asok_hook;
238   shut_down();
239 }
240
241 bool PoolReplayer::is_blacklisted() const {
242   Mutex::Locker locker(m_lock);
243   return m_blacklisted;
244 }
245
246 bool PoolReplayer::is_leader() const {
247   Mutex::Locker locker(m_lock);
248   return m_leader_watcher && m_leader_watcher->is_leader();
249 }
250
251 bool PoolReplayer::is_running() const {
252   return m_pool_replayer_thread.is_started();
253 }
254
255 void PoolReplayer::init()
256 {
257   assert(!m_pool_replayer_thread.is_started());
258
259   // reset state
260   m_stopping = false;
261   m_blacklisted = false;
262
263   dout(20) << "replaying for " << m_peer << dendl;
264   int r = init_rados(g_ceph_context->_conf->cluster,
265                      g_ceph_context->_conf->name.to_str(),
266                      "local cluster", &m_local_rados, false);
267   if (r < 0) {
268     m_callout_id = m_service_daemon->add_or_update_callout(
269       m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
270       "unable to connect to local cluster");
271     return;
272   }
273
274   r = init_rados(m_peer.cluster_name, m_peer.client_name,
275                  std::string("remote peer ") + stringify(m_peer),
276                  &m_remote_rados, true);
277   if (r < 0) {
278     m_callout_id = m_service_daemon->add_or_update_callout(
279       m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
280       "unable to connect to remote cluster");
281     return;
282   }
283
284   r = m_local_rados->ioctx_create2(m_local_pool_id, m_local_io_ctx);
285   if (r < 0) {
286     derr << "error accessing local pool " << m_local_pool_id << ": "
287          << cpp_strerror(r) << dendl;
288     return;
289   }
290
291   std::string local_mirror_uuid;
292   r = librbd::cls_client::mirror_uuid_get(&m_local_io_ctx,
293                                           &local_mirror_uuid);
294   if (r < 0) {
295     derr << "failed to retrieve local mirror uuid from pool "
296          << m_local_io_ctx.get_pool_name() << ": " << cpp_strerror(r) << dendl;
297     m_callout_id = m_service_daemon->add_or_update_callout(
298       m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
299       "unable to query local mirror uuid");
300     return;
301   }
302
303   r = m_remote_rados->ioctx_create(m_local_io_ctx.get_pool_name().c_str(),
304                                    m_remote_io_ctx);
305   if (r < 0) {
306     derr << "error accessing remote pool " << m_local_io_ctx.get_pool_name()
307          << ": " << cpp_strerror(r) << dendl;
308     m_callout_id = m_service_daemon->add_or_update_callout(
309       m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_WARNING,
310       "unable to access remote pool");
311     return;
312   }
313
314   dout(20) << "connected to " << m_peer << dendl;
315
316   m_instance_replayer.reset(InstanceReplayer<>::create(
317     m_threads, m_service_daemon, m_image_deleter, m_local_rados,
318     local_mirror_uuid, m_local_pool_id));
319   m_instance_replayer->init();
320   m_instance_replayer->add_peer(m_peer.uuid, m_remote_io_ctx);
321
322   m_instance_watcher.reset(InstanceWatcher<>::create(
323     m_local_io_ctx, m_threads->work_queue, m_instance_replayer.get()));
324   r = m_instance_watcher->init();
325   if (r < 0) {
326     derr << "error initializing instance watcher: " << cpp_strerror(r) << dendl;
327     m_callout_id = m_service_daemon->add_or_update_callout(
328       m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
329       "unable to initialize instance messenger object");
330     return;
331   }
332
333   m_leader_watcher.reset(new LeaderWatcher<>(m_threads, m_local_io_ctx,
334                                              &m_leader_listener));
335   r = m_leader_watcher->init();
336   if (r < 0) {
337     derr << "error initializing leader watcher: " << cpp_strerror(r) << dendl;
338     m_callout_id = m_service_daemon->add_or_update_callout(
339       m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
340       "unable to initialize leader messenger object");
341     return;
342   }
343
344   if (m_callout_id != service_daemon::CALLOUT_ID_NONE) {
345     m_service_daemon->remove_callout(m_local_pool_id, m_callout_id);
346     m_callout_id = service_daemon::CALLOUT_ID_NONE;
347   }
348
349   m_pool_replayer_thread.create("pool replayer");
350 }
351
352 void PoolReplayer::shut_down() {
353   m_stopping = true;
354   {
355     Mutex::Locker l(m_lock);
356     m_cond.Signal();
357   }
358   if (m_pool_replayer_thread.is_started()) {
359     m_pool_replayer_thread.join();
360   }
361   if (m_leader_watcher) {
362     m_leader_watcher->shut_down();
363     m_leader_watcher.reset();
364   }
365   if (m_instance_watcher) {
366     m_instance_watcher->shut_down();
367     m_instance_watcher.reset();
368   }
369   if (m_instance_replayer) {
370     m_instance_replayer->shut_down();
371     m_instance_replayer.reset();
372   }
373
374   assert(!m_local_pool_watcher);
375   assert(!m_remote_pool_watcher);
376   m_local_rados.reset();
377   m_remote_rados.reset();
378 }
379
380 int PoolReplayer::init_rados(const std::string &cluster_name,
381                              const std::string &client_name,
382                              const std::string &description,
383                              RadosRef *rados_ref,
384                              bool strip_cluster_overrides) {
385   rados_ref->reset(new librados::Rados());
386
387   // NOTE: manually bootstrap a CephContext here instead of via
388   // the librados API to avoid mixing global singletons between
389   // the librados shared library and the daemon
390   // TODO: eliminate intermingling of global singletons within Ceph APIs
391   CephInitParameters iparams(CEPH_ENTITY_TYPE_CLIENT);
392   if (client_name.empty() || !iparams.name.from_str(client_name)) {
393     derr << "error initializing cluster handle for " << description << dendl;
394     return -EINVAL;
395   }
396
397   CephContext *cct = common_preinit(iparams, CODE_ENVIRONMENT_LIBRARY,
398                                     CINIT_FLAG_UNPRIVILEGED_DAEMON_DEFAULTS);
399   cct->_conf->cluster = cluster_name;
400
401   // librados::Rados::conf_read_file
402   int r = cct->_conf->parse_config_files(nullptr, nullptr, 0);
403   if (r < 0) {
404     derr << "could not read ceph conf for " << description << ": "
405          << cpp_strerror(r) << dendl;
406     cct->put();
407     return r;
408   }
409
410   // preserve cluster-specific config settings before applying environment/cli
411   // overrides
412   std::map<std::string, std::string> config_values;
413   if (strip_cluster_overrides) {
414     // remote peer connections shouldn't apply cluster-specific
415     // configuration settings
416     for (auto& key : UNIQUE_PEER_CONFIG_KEYS) {
417       config_values[key] = cct->_conf->get_val<std::string>(key);
418     }
419   }
420
421   cct->_conf->parse_env();
422
423   // librados::Rados::conf_parse_env
424   std::vector<const char*> args;
425   env_to_vec(args, nullptr);
426   r = cct->_conf->parse_argv(args);
427   if (r < 0) {
428     derr << "could not parse environment for " << description << ":"
429          << cpp_strerror(r) << dendl;
430     cct->put();
431     return r;
432   }
433
434   if (!m_args.empty()) {
435     // librados::Rados::conf_parse_argv
436     args = m_args;
437     r = cct->_conf->parse_argv(args);
438     if (r < 0) {
439       derr << "could not parse command line args for " << description << ": "
440            << cpp_strerror(r) << dendl;
441       cct->put();
442       return r;
443     }
444   }
445
446   if (strip_cluster_overrides) {
447     // remote peer connections shouldn't apply cluster-specific
448     // configuration settings
449     for (auto& pair : config_values) {
450       auto value = cct->_conf->get_val<std::string>(pair.first);
451       if (pair.second != value) {
452         dout(0) << "reverting global config option override: "
453                 << pair.first << ": " << value << " -> " << pair.second
454                 << dendl;
455         cct->_conf->set_val_or_die(pair.first, pair.second);
456       }
457     }
458   }
459
460   if (!g_ceph_context->_conf->admin_socket.empty()) {
461     cct->_conf->set_val_or_die("admin_socket",
462                                "$run_dir/$name.$pid.$cluster.$cctid.asok");
463   }
464
465   // disable unnecessary librbd cache
466   cct->_conf->set_val_or_die("rbd_cache", "false");
467   cct->_conf->apply_changes(nullptr);
468   cct->_conf->complain_about_parse_errors(cct);
469
470   r = (*rados_ref)->init_with_context(cct);
471   assert(r == 0);
472   cct->put();
473
474   r = (*rados_ref)->connect();
475   if (r < 0) {
476     derr << "error connecting to " << description << ": "
477          << cpp_strerror(r) << dendl;
478     return r;
479   }
480
481   return 0;
482 }
483
484 void PoolReplayer::run()
485 {
486   dout(20) << "enter" << dendl;
487
488   while (!m_stopping) {
489     std::string asok_hook_name = m_local_io_ctx.get_pool_name() + " " +
490                                  m_peer.cluster_name;
491     if (m_asok_hook_name != asok_hook_name || m_asok_hook == nullptr) {
492       m_asok_hook_name = asok_hook_name;
493       delete m_asok_hook;
494
495       m_asok_hook = new PoolReplayerAdminSocketHook(g_ceph_context,
496                                                     m_asok_hook_name, this);
497     }
498
499     Mutex::Locker locker(m_lock);
500     if ((m_local_pool_watcher && m_local_pool_watcher->is_blacklisted()) ||
501         (m_remote_pool_watcher && m_remote_pool_watcher->is_blacklisted())) {
502       m_blacklisted = true;
503       m_stopping = true;
504       break;
505     }
506
507     if (!m_stopping) {
508       m_cond.WaitInterval(m_lock, utime_t(1, 0));
509     }
510   }
511 }
512
513 void PoolReplayer::print_status(Formatter *f, stringstream *ss)
514 {
515   dout(20) << "enter" << dendl;
516
517   if (!f) {
518     return;
519   }
520
521   Mutex::Locker l(m_lock);
522
523   f->open_object_section("pool_replayer_status");
524   f->dump_string("pool", m_local_io_ctx.get_pool_name());
525   f->dump_stream("peer") << m_peer;
526   f->dump_string("instance_id", m_instance_watcher->get_instance_id());
527
528   std::string leader_instance_id;
529   m_leader_watcher->get_leader_instance_id(&leader_instance_id);
530   f->dump_string("leader_instance_id", leader_instance_id);
531
532   bool leader = m_leader_watcher->is_leader();
533   f->dump_bool("leader", leader);
534   if (leader) {
535     std::vector<std::string> instance_ids;
536     m_leader_watcher->list_instances(&instance_ids);
537     f->open_array_section("instances");
538     for (auto instance_id : instance_ids) {
539       f->dump_string("instance_id", instance_id);
540     }
541     f->close_section();
542   }
543
544   f->dump_string("local_cluster_admin_socket",
545                  reinterpret_cast<CephContext *>(m_local_io_ctx.cct())->_conf->
546                      get_val<std::string>("admin_socket"));
547   f->dump_string("remote_cluster_admin_socket",
548                  reinterpret_cast<CephContext *>(m_remote_io_ctx.cct())->_conf->
549                      get_val<std::string>("admin_socket"));
550
551   f->open_object_section("sync_throttler");
552   m_instance_watcher->print_sync_status(f, ss);
553   f->close_section();
554
555   m_instance_replayer->print_status(f, ss);
556
557   f->close_section();
558   f->flush(*ss);
559 }
560
561 void PoolReplayer::start()
562 {
563   dout(20) << "enter" << dendl;
564
565   Mutex::Locker l(m_lock);
566
567   if (m_stopping) {
568     return;
569   }
570
571   m_instance_replayer->start();
572 }
573
574 void PoolReplayer::stop(bool manual)
575 {
576   dout(20) << "enter: manual=" << manual << dendl;
577
578   Mutex::Locker l(m_lock);
579   if (!manual) {
580     m_stopping = true;
581     m_cond.Signal();
582     return;
583   } else if (m_stopping) {
584     return;
585   }
586
587   m_instance_replayer->stop();
588 }
589
590 void PoolReplayer::restart()
591 {
592   dout(20) << "enter" << dendl;
593
594   Mutex::Locker l(m_lock);
595
596   if (m_stopping) {
597     return;
598   }
599
600   m_instance_replayer->restart();
601 }
602
603 void PoolReplayer::flush()
604 {
605   dout(20) << "enter" << dendl;
606
607   Mutex::Locker l(m_lock);
608
609   if (m_stopping || m_manual_stop) {
610     return;
611   }
612
613   m_instance_replayer->flush();
614 }
615
616 void PoolReplayer::release_leader()
617 {
618   dout(20) << "enter" << dendl;
619
620   Mutex::Locker l(m_lock);
621
622   if (m_stopping || !m_leader_watcher) {
623     return;
624   }
625
626   m_leader_watcher->release_leader();
627 }
628
629 void PoolReplayer::handle_update(const std::string &mirror_uuid,
630                                  ImageIds &&added_image_ids,
631                                  ImageIds &&removed_image_ids) {
632   if (m_stopping) {
633     return;
634   }
635
636   dout(10) << "mirror_uuid=" << mirror_uuid << ", "
637            << "added_count=" << added_image_ids.size() << ", "
638            << "removed_count=" << removed_image_ids.size() << dendl;
639   Mutex::Locker locker(m_lock);
640   if (!m_leader_watcher->is_leader()) {
641     return;
642   }
643
644   m_service_daemon->add_or_update_attribute(
645     m_local_pool_id, SERVICE_DAEMON_LOCAL_COUNT_KEY,
646     m_local_pool_watcher->get_image_count());
647   if (m_remote_pool_watcher) {
648     m_service_daemon->add_or_update_attribute(
649       m_local_pool_id, SERVICE_DAEMON_REMOTE_COUNT_KEY,
650       m_remote_pool_watcher->get_image_count());
651   }
652
653   m_update_op_tracker.start_op();
654   Context *ctx = new FunctionContext([this](int r) {
655       dout(20) << "complete handle_update: r=" << r << dendl;
656       m_update_op_tracker.finish_op();
657     });
658
659   C_Gather *gather_ctx = new C_Gather(g_ceph_context, ctx);
660
661   for (auto &image_id : added_image_ids) {
662     // for now always send to myself (the leader)
663     std::string &instance_id = m_instance_watcher->get_instance_id();
664     m_instance_watcher->notify_image_acquire(instance_id, image_id.global_id,
665                                              gather_ctx->new_sub());
666   }
667
668   if (!mirror_uuid.empty()) {
669     for (auto &image_id : removed_image_ids) {
670       // for now always send to myself (the leader)
671       std::string &instance_id = m_instance_watcher->get_instance_id();
672       m_instance_watcher->notify_peer_image_removed(instance_id,
673                                                     image_id.global_id,
674                                                     mirror_uuid,
675                                                     gather_ctx->new_sub());
676     }
677   }
678
679   gather_ctx->activate();
680 }
681
682 void PoolReplayer::handle_post_acquire_leader(Context *on_finish) {
683   dout(20) << dendl;
684
685   m_service_daemon->add_or_update_attribute(m_local_pool_id,
686                                             SERVICE_DAEMON_LEADER_KEY, true);
687   m_instance_watcher->handle_acquire_leader();
688   init_local_pool_watcher(on_finish);
689 }
690
691 void PoolReplayer::handle_pre_release_leader(Context *on_finish) {
692   dout(20) << dendl;
693
694   m_service_daemon->remove_attribute(m_local_pool_id, SERVICE_DAEMON_LEADER_KEY);
695   m_instance_watcher->handle_release_leader();
696   shut_down_pool_watchers(on_finish);
697 }
698
699 void PoolReplayer::init_local_pool_watcher(Context *on_finish) {
700   dout(20) << dendl;
701
702   Mutex::Locker locker(m_lock);
703   assert(!m_local_pool_watcher);
704   m_local_pool_watcher.reset(new PoolWatcher<>(
705     m_threads, m_local_io_ctx, m_local_pool_watcher_listener));
706
707   // ensure the initial set of local images is up-to-date
708   // after acquiring the leader role
709   auto ctx = new FunctionContext([this, on_finish](int r) {
710       handle_init_local_pool_watcher(r, on_finish);
711     });
712   m_local_pool_watcher->init(create_async_context_callback(
713     m_threads->work_queue, ctx));
714 }
715
716 void PoolReplayer::handle_init_local_pool_watcher(int r, Context *on_finish) {
717   dout(20) << "r=" << r << dendl;
718   if (r < 0) {
719     derr << "failed to retrieve local images: " << cpp_strerror(r) << dendl;
720     on_finish->complete(r);
721     return;
722   }
723
724   init_remote_pool_watcher(on_finish);
725 }
726
727 void PoolReplayer::init_remote_pool_watcher(Context *on_finish) {
728   dout(20) << dendl;
729
730   Mutex::Locker locker(m_lock);
731   assert(!m_remote_pool_watcher);
732   m_remote_pool_watcher.reset(new PoolWatcher<>(
733     m_threads, m_remote_io_ctx, m_remote_pool_watcher_listener));
734   m_remote_pool_watcher->init(create_async_context_callback(
735     m_threads->work_queue, on_finish));
736
737   m_cond.Signal();
738 }
739
740 void PoolReplayer::shut_down_pool_watchers(Context *on_finish) {
741   dout(20) << dendl;
742
743   {
744     Mutex::Locker locker(m_lock);
745     if (m_local_pool_watcher) { 
746       Context *ctx = new FunctionContext([this, on_finish](int r) {
747           handle_shut_down_pool_watchers(r, on_finish);
748         });
749       ctx = create_async_context_callback(m_threads->work_queue, ctx);
750
751       auto gather_ctx = new C_Gather(g_ceph_context, ctx);
752       m_local_pool_watcher->shut_down(gather_ctx->new_sub());
753       if (m_remote_pool_watcher) {
754         m_remote_pool_watcher->shut_down(gather_ctx->new_sub());
755       }
756       gather_ctx->activate();
757       return;
758     }
759   }
760
761   on_finish->complete(0);
762 }
763
764 void PoolReplayer::handle_shut_down_pool_watchers(int r, Context *on_finish) {
765   dout(20) << "r=" << r << dendl;
766
767   {
768     Mutex::Locker locker(m_lock);
769     assert(m_local_pool_watcher);
770     m_local_pool_watcher.reset();
771
772     if (m_remote_pool_watcher) {
773       m_remote_pool_watcher.reset();
774     }
775   }
776   wait_for_update_ops(on_finish);
777 }
778
779 void PoolReplayer::wait_for_update_ops(Context *on_finish) {
780   dout(20) << dendl;
781
782   Mutex::Locker locker(m_lock);
783
784   Context *ctx = new FunctionContext([this, on_finish](int r) {
785       handle_wait_for_update_ops(r, on_finish);
786     });
787   ctx = create_async_context_callback(m_threads->work_queue, ctx);
788
789   m_update_op_tracker.wait_for_ops(ctx);
790 }
791
792 void PoolReplayer::handle_wait_for_update_ops(int r, Context *on_finish) {
793   dout(20) << "r=" << r << dendl;
794
795   assert(r == 0);
796
797   Mutex::Locker locker(m_lock);
798   m_instance_replayer->release_all(on_finish);
799 }
800
801 void PoolReplayer::handle_update_leader(const std::string &leader_instance_id) {
802   dout(20) << "leader_instance_id=" << leader_instance_id << dendl;
803
804   m_instance_watcher->handle_update_leader(leader_instance_id);
805 }
806
807 } // namespace mirror
808 } // namespace rbd