Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / tools / rbd_mirror / ImageReplayer.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "include/compat.h"
5 #include "common/Formatter.h"
6 #include "common/debug.h"
7 #include "common/errno.h"
8 #include "include/stringify.h"
9 #include "cls/rbd/cls_rbd_client.h"
10 #include "common/Timer.h"
11 #include "common/WorkQueue.h"
12 #include "global/global_context.h"
13 #include "journal/Journaler.h"
14 #include "journal/ReplayHandler.h"
15 #include "journal/Settings.h"
16 #include "librbd/ExclusiveLock.h"
17 #include "librbd/ImageCtx.h"
18 #include "librbd/ImageState.h"
19 #include "librbd/Journal.h"
20 #include "librbd/Operations.h"
21 #include "librbd/Utils.h"
22 #include "librbd/journal/Replay.h"
23 #include "ImageDeleter.h"
24 #include "ImageReplayer.h"
25 #include "Threads.h"
26 #include "tools/rbd_mirror/image_replayer/BootstrapRequest.h"
27 #include "tools/rbd_mirror/image_replayer/CloseImageRequest.h"
28 #include "tools/rbd_mirror/image_replayer/EventPreprocessor.h"
29 #include "tools/rbd_mirror/image_replayer/PrepareLocalImageRequest.h"
30 #include "tools/rbd_mirror/image_replayer/PrepareRemoteImageRequest.h"
31 #include "tools/rbd_mirror/image_replayer/ReplayStatusFormatter.h"
32
33 #define dout_context g_ceph_context
34 #define dout_subsys ceph_subsys_rbd_mirror
35 #undef dout_prefix
36 #define dout_prefix *_dout << "rbd::mirror::" << *this << " " \
37                            << __func__ << ": "
38
39 using std::map;
40 using std::string;
41 using std::unique_ptr;
42 using std::shared_ptr;
43 using std::vector;
44
45 namespace rbd {
46 namespace mirror {
47
48 using librbd::util::create_context_callback;
49 using librbd::util::create_rados_callback;
50 using namespace rbd::mirror::image_replayer;
51
52 template <typename I>
53 std::ostream &operator<<(std::ostream &os,
54                          const typename ImageReplayer<I>::State &state);
55
56 namespace {
57
58 template <typename I>
59 struct ReplayHandler : public ::journal::ReplayHandler {
60   ImageReplayer<I> *replayer;
61   ReplayHandler(ImageReplayer<I> *replayer) : replayer(replayer) {}
62   void get() override {}
63   void put() override {}
64
65   void handle_entries_available() override {
66     replayer->handle_replay_ready();
67   }
68   void handle_complete(int r) override {
69     std::stringstream ss;
70     if (r < 0) {
71       ss << "replay completed with error: " << cpp_strerror(r);
72     }
73     replayer->handle_replay_complete(r, ss.str());
74   }
75 };
76
77 template <typename I>
78 class ImageReplayerAdminSocketCommand {
79 public:
80   ImageReplayerAdminSocketCommand(const std::string &desc,
81                                   ImageReplayer<I> *replayer)
82     : desc(desc), replayer(replayer) {
83   }
84   virtual ~ImageReplayerAdminSocketCommand() {}
85   virtual bool call(Formatter *f, stringstream *ss) = 0;
86
87   std::string desc;
88   ImageReplayer<I> *replayer;
89   bool registered = false;
90 };
91
92 template <typename I>
93 class StatusCommand : public ImageReplayerAdminSocketCommand<I> {
94 public:
95   explicit StatusCommand(const std::string &desc, ImageReplayer<I> *replayer)
96     : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
97   }
98
99   bool call(Formatter *f, stringstream *ss) override {
100     this->replayer->print_status(f, ss);
101     return true;
102   }
103 };
104
105 template <typename I>
106 class StartCommand : public ImageReplayerAdminSocketCommand<I> {
107 public:
108   explicit StartCommand(const std::string &desc, ImageReplayer<I> *replayer)
109     : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
110   }
111
112   bool call(Formatter *f, stringstream *ss) override {
113     this->replayer->start(nullptr, true);
114     return true;
115   }
116 };
117
118 template <typename I>
119 class StopCommand : public ImageReplayerAdminSocketCommand<I> {
120 public:
121   explicit StopCommand(const std::string &desc, ImageReplayer<I> *replayer)
122     : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
123   }
124
125   bool call(Formatter *f, stringstream *ss) override {
126     this->replayer->stop(nullptr, true);
127     return true;
128   }
129 };
130
131 template <typename I>
132 class RestartCommand : public ImageReplayerAdminSocketCommand<I> {
133 public:
134   explicit RestartCommand(const std::string &desc, ImageReplayer<I> *replayer)
135     : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
136   }
137
138   bool call(Formatter *f, stringstream *ss) override {
139     this->replayer->restart();
140     return true;
141   }
142 };
143
144 template <typename I>
145 class FlushCommand : public ImageReplayerAdminSocketCommand<I> {
146 public:
147   explicit FlushCommand(const std::string &desc, ImageReplayer<I> *replayer)
148     : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
149   }
150
151   bool call(Formatter *f, stringstream *ss) override {
152     C_SaferCond cond;
153     this->replayer->flush(&cond);
154     int r = cond.wait();
155     if (r < 0) {
156       *ss << "flush: " << cpp_strerror(r);
157       return false;
158     }
159     return true;
160   }
161 };
162
163 template <typename I>
164 class ImageReplayerAdminSocketHook : public AdminSocketHook {
165 public:
166   ImageReplayerAdminSocketHook(CephContext *cct, const std::string &name,
167                                ImageReplayer<I> *replayer)
168     : admin_socket(cct->get_admin_socket()),
169       commands{{"rbd mirror flush " + name,
170                 new FlushCommand<I>("flush rbd mirror " + name, replayer)},
171                {"rbd mirror restart " + name,
172                 new RestartCommand<I>("restart rbd mirror " + name, replayer)},
173                {"rbd mirror start " + name,
174                 new StartCommand<I>("start rbd mirror " + name, replayer)},
175                {"rbd mirror status " + name,
176                 new StatusCommand<I>("get status for rbd mirror " + name, replayer)},
177                {"rbd mirror stop " + name,
178                 new StopCommand<I>("stop rbd mirror " + name, replayer)}} {
179   }
180
181   int register_commands() {
182     for (auto &it : commands) {
183       int r = admin_socket->register_command(it.first, it.first, this,
184                                              it.second->desc);
185       if (r < 0) {
186         return r;
187       }
188       it.second->registered = true;
189     }
190     return 0;
191   }
192
193   ~ImageReplayerAdminSocketHook() override {
194     for (auto &it : commands) {
195       if (it.second->registered) {
196         admin_socket->unregister_command(it.first);
197       }
198       delete it.second;
199     }
200     commands.clear();
201   }
202
203   bool call(std::string command, cmdmap_t& cmdmap, std::string format,
204             bufferlist& out) override {
205     auto i = commands.find(command);
206     assert(i != commands.end());
207     Formatter *f = Formatter::create(format);
208     stringstream ss;
209     bool r = i->second->call(f, &ss);
210     delete f;
211     out.append(ss);
212     return r;
213   }
214
215 private:
216   typedef std::map<std::string, ImageReplayerAdminSocketCommand<I> *> Commands;
217
218   AdminSocket *admin_socket;
219   Commands commands;
220 };
221
222 uint32_t calculate_replay_delay(const utime_t &event_time,
223                                 int mirroring_replay_delay) {
224     if (mirroring_replay_delay <= 0) {
225       return 0;
226     }
227
228     utime_t now = ceph_clock_now();
229     if (event_time + mirroring_replay_delay <= now) {
230       return 0;
231     }
232
233     // ensure it is rounded up when converting to integer
234     return (event_time + mirroring_replay_delay - now) + 1;
235 }
236
237 } // anonymous namespace
238
239 template <typename I>
240 void ImageReplayer<I>::BootstrapProgressContext::update_progress(
241   const std::string &description, bool flush)
242 {
243   const std::string desc = "bootstrapping, " + description;
244   replayer->set_state_description(0, desc);
245   if (flush) {
246     replayer->update_mirror_image_status(false, boost::none);
247   }
248 }
249
250 template <typename I>
251 void ImageReplayer<I>::RemoteJournalerListener::handle_update(
252   ::journal::JournalMetadata *) {
253   FunctionContext *ctx = new FunctionContext([this](int r) {
254       replayer->handle_remote_journal_metadata_updated();
255     });
256   replayer->m_threads->work_queue->queue(ctx, 0);
257 }
258
259 template <typename I>
260 ImageReplayer<I>::ImageReplayer(Threads<I> *threads,
261                                 ImageDeleter<I>* image_deleter,
262                                 InstanceWatcher<I> *instance_watcher,
263                                 RadosRef local,
264                                 const std::string &local_mirror_uuid,
265                                 int64_t local_pool_id,
266                                 const std::string &global_image_id) :
267   m_threads(threads),
268   m_image_deleter(image_deleter),
269   m_instance_watcher(instance_watcher),
270   m_local(local),
271   m_local_mirror_uuid(local_mirror_uuid),
272   m_local_pool_id(local_pool_id),
273   m_global_image_id(global_image_id),
274   m_lock("rbd::mirror::ImageReplayer " + stringify(local_pool_id) + " " +
275          global_image_id),
276   m_progress_cxt(this),
277   m_journal_listener(new JournalListener(this)),
278   m_remote_listener(this)
279 {
280   // Register asok commands using a temporary "remote_pool_name/global_image_id"
281   // name.  When the image name becomes known on start the asok commands will be
282   // re-registered using "remote_pool_name/remote_image_name" name.
283
284   std::string pool_name;
285   int r = m_local->pool_reverse_lookup(m_local_pool_id, &pool_name);
286   if (r < 0) {
287     derr << "error resolving local pool " << m_local_pool_id
288          << ": " << cpp_strerror(r) << dendl;
289     pool_name = stringify(m_local_pool_id);
290   }
291
292   m_name = pool_name + "/" + m_global_image_id;
293   register_admin_socket_hook();
294 }
295
296 template <typename I>
297 ImageReplayer<I>::~ImageReplayer()
298 {
299   unregister_admin_socket_hook();
300   assert(m_event_preprocessor == nullptr);
301   assert(m_replay_status_formatter == nullptr);
302   assert(m_local_image_ctx == nullptr);
303   assert(m_local_replay == nullptr);
304   assert(m_remote_journaler == nullptr);
305   assert(m_replay_handler == nullptr);
306   assert(m_on_start_finish == nullptr);
307   assert(m_on_stop_finish == nullptr);
308   assert(m_bootstrap_request == nullptr);
309   assert(m_in_flight_status_updates == 0);
310
311   delete m_journal_listener;
312 }
313
314 template <typename I>
315 image_replayer::HealthState ImageReplayer<I>::get_health_state() const {
316   Mutex::Locker locker(m_lock);
317
318   if (!m_mirror_image_status_state) {
319     return image_replayer::HEALTH_STATE_OK;
320   } else if (*m_mirror_image_status_state ==
321                cls::rbd::MIRROR_IMAGE_STATUS_STATE_SYNCING ||
322              *m_mirror_image_status_state ==
323                cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN) {
324     return image_replayer::HEALTH_STATE_WARNING;
325   }
326   return image_replayer::HEALTH_STATE_ERROR;
327 }
328
329 template <typename I>
330 void ImageReplayer<I>::add_peer(const std::string &peer_uuid,
331                                 librados::IoCtx &io_ctx) {
332   Mutex::Locker locker(m_lock);
333   auto it = m_peers.find({peer_uuid});
334   if (it == m_peers.end()) {
335     m_peers.insert({peer_uuid, io_ctx});
336   }
337 }
338
339 template <typename I>
340 void ImageReplayer<I>::set_state_description(int r, const std::string &desc) {
341   dout(20) << r << " " << desc << dendl;
342
343   Mutex::Locker l(m_lock);
344   m_last_r = r;
345   m_state_desc = desc;
346 }
347
348 template <typename I>
349 void ImageReplayer<I>::start(Context *on_finish, bool manual)
350 {
351   dout(20) << "on_finish=" << on_finish << dendl;
352
353   int r = 0;
354   {
355     Mutex::Locker locker(m_lock);
356     if (!is_stopped_()) {
357       derr << "already running" << dendl;
358       r = -EINVAL;
359     } else if (m_manual_stop && !manual) {
360       dout(5) << "stopped manually, ignoring start without manual flag"
361               << dendl;
362       r = -EPERM;
363     } else {
364       m_state = STATE_STARTING;
365       m_last_r = 0;
366       m_state_desc.clear();
367       m_manual_stop = false;
368       m_delete_requested = false;
369
370       if (on_finish != nullptr) {
371         assert(m_on_start_finish == nullptr);
372         m_on_start_finish = on_finish;
373       }
374       assert(m_on_stop_finish == nullptr);
375     }
376   }
377
378   if (r < 0) {
379     if (on_finish) {
380       on_finish->complete(r);
381     }
382     return;
383   }
384
385   r = m_local->ioctx_create2(m_local_pool_id, m_local_ioctx);
386   if (r < 0) {
387     derr << "error opening ioctx for local pool " << m_local_pool_id
388          << ": " << cpp_strerror(r) << dendl;
389     on_start_fail(r, "error opening local pool");
390     return;
391   }
392
393   wait_for_deletion();
394 }
395
396 template <typename I>
397 void ImageReplayer<I>::wait_for_deletion() {
398   dout(20) << dendl;
399
400   Context *ctx = create_context_callback<
401     ImageReplayer, &ImageReplayer<I>::handle_wait_for_deletion>(this);
402   m_image_deleter->wait_for_scheduled_deletion(
403     m_local_pool_id, m_global_image_id, ctx, false);
404 }
405
406 template <typename I>
407 void ImageReplayer<I>::handle_wait_for_deletion(int r) {
408   dout(20) << "r=" << r << dendl;
409
410   if (r == -ECANCELED) {
411     on_start_fail(0, "");
412     return;
413   } else if (r < 0) {
414     on_start_fail(r, "error waiting for image deletion");
415     return;
416   }
417
418   prepare_local_image();
419 }
420
421 template <typename I>
422 void ImageReplayer<I>::prepare_local_image() {
423   dout(20) << dendl;
424
425   m_local_image_id = "";
426   Context *ctx = create_context_callback<
427     ImageReplayer, &ImageReplayer<I>::handle_prepare_local_image>(this);
428   auto req = PrepareLocalImageRequest<I>::create(
429     m_local_ioctx, m_global_image_id, &m_local_image_id,
430     &m_local_image_tag_owner, m_threads->work_queue, ctx);
431   req->send();
432 }
433
434 template <typename I>
435 void ImageReplayer<I>::handle_prepare_local_image(int r) {
436   dout(20) << "r=" << r << dendl;
437
438   if (r == -ENOENT) {
439     dout(20) << "local image does not exist" << dendl;
440   } else if (r < 0) {
441     on_start_fail(r, "error preparing local image for replay");
442     return;
443   } else if (m_local_image_tag_owner == librbd::Journal<>::LOCAL_MIRROR_UUID) {
444     dout(5) << "local image is primary" << dendl;
445     on_start_fail(0, "local image is primary");
446     return;
447   }
448
449   // local image doesn't exist or is non-primary
450   prepare_remote_image();
451 }
452
453 template <typename I>
454 void ImageReplayer<I>::prepare_remote_image() {
455   dout(20) << dendl;
456
457   // TODO need to support multiple remote images
458   assert(!m_peers.empty());
459   m_remote_image = {*m_peers.begin()};
460
461   Context *ctx = create_context_callback<
462     ImageReplayer, &ImageReplayer<I>::handle_prepare_remote_image>(this);
463   auto req = PrepareRemoteImageRequest<I>::create(
464     m_remote_image.io_ctx, m_global_image_id, &m_remote_image.mirror_uuid,
465     &m_remote_image.image_id, ctx);
466   req->send();
467 }
468
469 template <typename I>
470 void ImageReplayer<I>::handle_prepare_remote_image(int r) {
471   dout(20) << "r=" << r << dendl;
472
473   if (r == -ENOENT) {
474     dout(20) << "remote image does not exist" << dendl;
475
476     // TODO need to support multiple remote images
477     if (!m_local_image_id.empty() &&
478         m_local_image_tag_owner == m_remote_image.mirror_uuid) {
479       // local image exists and is non-primary and linked to the missing
480       // remote image
481
482       m_delete_requested = true;
483       on_start_fail(0, "remote image no longer exists");
484     } else {
485       on_start_fail(-ENOENT, "remote image does not exist");
486     }
487     return;
488   } else if (r < 0) {
489     on_start_fail(r, "error retrieving remote image id");
490     return;
491   }
492
493   bootstrap();
494 }
495
496 template <typename I>
497 void ImageReplayer<I>::bootstrap() {
498   dout(20) << dendl;
499
500   CephContext *cct = static_cast<CephContext *>(m_local->cct());
501   journal::Settings settings;
502   settings.commit_interval = cct->_conf->get_val<double>(
503     "rbd_mirror_journal_commit_age");
504   settings.max_fetch_bytes = cct->_conf->get_val<uint64_t>(
505     "rbd_mirror_journal_max_fetch_bytes");
506
507   m_remote_journaler = new Journaler(m_threads->work_queue,
508                                      m_threads->timer,
509                                      &m_threads->timer_lock,
510                                      m_remote_image.io_ctx,
511                                      m_remote_image.image_id,
512                                      m_local_mirror_uuid, settings);
513
514   Context *ctx = create_context_callback<
515     ImageReplayer, &ImageReplayer<I>::handle_bootstrap>(this);
516
517   BootstrapRequest<I> *request = BootstrapRequest<I>::create(
518     m_local_ioctx, m_remote_image.io_ctx, m_instance_watcher,
519     &m_local_image_ctx, m_local_image_id, m_remote_image.image_id,
520     m_global_image_id, m_threads->work_queue, m_threads->timer,
521     &m_threads->timer_lock, m_local_mirror_uuid, m_remote_image.mirror_uuid,
522     m_remote_journaler, &m_client_meta, ctx, &m_resync_requested,
523     &m_progress_cxt);
524
525   {
526     Mutex::Locker locker(m_lock);
527     request->get();
528     m_bootstrap_request = request;
529   }
530
531   update_mirror_image_status(false, boost::none);
532   reschedule_update_status_task(10);
533
534   request->send();
535 }
536
537 template <typename I>
538 void ImageReplayer<I>::handle_bootstrap(int r) {
539   dout(20) << "r=" << r << dendl;
540   {
541     Mutex::Locker locker(m_lock);
542     m_bootstrap_request->put();
543     m_bootstrap_request = nullptr;
544     if (m_local_image_ctx) {
545       m_local_image_id = m_local_image_ctx->id;
546     }
547   }
548
549   if (r == -EREMOTEIO) {
550     m_local_image_tag_owner = "";
551     dout(5) << "remote image is non-primary" << dendl;
552     on_start_fail(-EREMOTEIO, "remote image is non-primary");
553     return;
554   } else if (r == -EEXIST) {
555     m_local_image_tag_owner = "";
556     on_start_fail(r, "split-brain detected");
557     return;
558   } else if (r < 0) {
559     on_start_fail(r, "error bootstrapping replay");
560     return;
561   } else if (on_start_interrupted()) {
562     return;
563   } else if (m_resync_requested) {
564     on_start_fail(0, "resync requested");
565     return;
566   }
567
568   assert(m_local_journal == nullptr);
569   {
570     RWLock::RLocker snap_locker(m_local_image_ctx->snap_lock);
571     if (m_local_image_ctx->journal != nullptr) {
572       m_local_journal = m_local_image_ctx->journal;
573       m_local_journal->add_listener(m_journal_listener);
574     }
575   }
576
577   if (m_local_journal == nullptr) {
578     on_start_fail(-EINVAL, "error accessing local journal");
579     return;
580   }
581
582   on_name_changed();
583
584   update_mirror_image_status(false, boost::none);
585   init_remote_journaler();
586 }
587
588 template <typename I>
589 void ImageReplayer<I>::init_remote_journaler() {
590   dout(20) << dendl;
591
592   Context *ctx = create_context_callback<
593     ImageReplayer, &ImageReplayer<I>::handle_init_remote_journaler>(this);
594   m_remote_journaler->init(ctx);
595 }
596
597 template <typename I>
598 void ImageReplayer<I>::handle_init_remote_journaler(int r) {
599   dout(20) << "r=" << r << dendl;
600
601   if (r < 0) {
602     derr << "failed to initialize remote journal: " << cpp_strerror(r) << dendl;
603     on_start_fail(r, "error initializing remote journal");
604     return;
605   } else if (on_start_interrupted()) {
606     return;
607   }
608
609   m_remote_journaler->add_listener(&m_remote_listener);
610
611   cls::journal::Client client;
612   r = m_remote_journaler->get_cached_client(m_local_mirror_uuid, &client);
613   if (r < 0) {
614     derr << "error retrieving remote journal client: " << cpp_strerror(r)
615          << dendl;
616     on_start_fail(r, "error retrieving remote journal client");
617     return;
618   }
619
620   derr << "image_id=" << m_local_image_id << ", "
621        << "m_client_meta.image_id=" << m_client_meta.image_id << ", "
622        << "client.state=" << client.state << dendl;
623   if (m_client_meta.image_id == m_local_image_id &&
624       client.state != cls::journal::CLIENT_STATE_CONNECTED) {
625     dout(5) << "client flagged disconnected, stopping image replay" << dendl;
626     if (m_local_image_ctx->mirroring_resync_after_disconnect) {
627       m_resync_requested = true;
628       on_start_fail(-ENOTCONN, "disconnected: automatic resync");
629     } else {
630       on_start_fail(-ENOTCONN, "disconnected");
631     }
632     return;
633   }
634
635   start_replay();
636 }
637
638 template <typename I>
639 void ImageReplayer<I>::start_replay() {
640   dout(20) << dendl;
641
642   Context *start_ctx = create_context_callback<
643     ImageReplayer, &ImageReplayer<I>::handle_start_replay>(this);
644   m_local_journal->start_external_replay(&m_local_replay, start_ctx);
645 }
646
647 template <typename I>
648 void ImageReplayer<I>::handle_start_replay(int r) {
649   dout(20) << "r=" << r << dendl;
650
651   if (r < 0) {
652     assert(m_local_replay == nullptr);
653     derr << "error starting external replay on local image "
654          <<  m_local_image_id << ": " << cpp_strerror(r) << dendl;
655     on_start_fail(r, "error starting replay on local image");
656     return;
657   }
658
659   Context *on_finish(nullptr);
660   {
661     Mutex::Locker locker(m_lock);
662     assert(m_state == STATE_STARTING);
663     m_state = STATE_REPLAYING;
664     std::swap(m_on_start_finish, on_finish);
665   }
666
667   m_event_preprocessor = EventPreprocessor<I>::create(
668     *m_local_image_ctx, *m_remote_journaler, m_local_mirror_uuid,
669     &m_client_meta, m_threads->work_queue);
670   m_replay_status_formatter =
671     ReplayStatusFormatter<I>::create(m_remote_journaler, m_local_mirror_uuid);
672
673   update_mirror_image_status(true, boost::none);
674   reschedule_update_status_task(30);
675
676   if (on_replay_interrupted()) {
677     return;
678   }
679
680   {
681     CephContext *cct = static_cast<CephContext *>(m_local->cct());
682     double poll_seconds = cct->_conf->get_val<double>(
683       "rbd_mirror_journal_poll_age");
684
685     Mutex::Locker locker(m_lock);
686     m_replay_handler = new ReplayHandler<I>(this);
687     m_remote_journaler->start_live_replay(m_replay_handler, poll_seconds);
688
689     dout(20) << "m_remote_journaler=" << *m_remote_journaler << dendl;
690   }
691
692   dout(20) << "start succeeded" << dendl;
693   if (on_finish != nullptr) {
694     dout(20) << "on finish complete, r=" << r << dendl;
695     on_finish->complete(r);
696   }
697 }
698
699 template <typename I>
700 void ImageReplayer<I>::on_start_fail(int r, const std::string &desc)
701 {
702   dout(20) << "r=" << r << dendl;
703   Context *ctx = new FunctionContext([this, r, desc](int _r) {
704       {
705         Mutex::Locker locker(m_lock);
706         assert(m_state == STATE_STARTING);
707         m_state = STATE_STOPPING;
708         if (r < 0 && r != -ECANCELED && r != -EREMOTEIO && r != -ENOENT) {
709           derr << "start failed: " << cpp_strerror(r) << dendl;
710         } else {
711           dout(20) << "start canceled" << dendl;
712         }
713       }
714
715       set_state_description(r, desc);
716       update_mirror_image_status(false, boost::none);
717       reschedule_update_status_task(-1);
718       shut_down(r);
719     });
720   m_threads->work_queue->queue(ctx, 0);
721 }
722
723 template <typename I>
724 bool ImageReplayer<I>::on_start_interrupted()
725 {
726   Mutex::Locker locker(m_lock);
727   assert(m_state == STATE_STARTING);
728   if (m_on_stop_finish == nullptr) {
729     return false;
730   }
731
732   on_start_fail(-ECANCELED);
733   return true;
734 }
735
736 template <typename I>
737 void ImageReplayer<I>::stop(Context *on_finish, bool manual, int r,
738                             const std::string& desc)
739 {
740   dout(20) << "on_finish=" << on_finish << ", manual=" << manual
741            << ", desc=" << desc << dendl;
742
743   m_image_deleter->cancel_waiter(m_local_pool_id, m_global_image_id);
744
745   image_replayer::BootstrapRequest<I> *bootstrap_request = nullptr;
746   bool shut_down_replay = false;
747   bool running = true;
748   {
749     Mutex::Locker locker(m_lock);
750
751     if (!is_running_()) {
752       running = false;
753     } else {
754       if (!is_stopped_()) {
755         if (m_state == STATE_STARTING) {
756           dout(20) << "canceling start" << dendl;
757           if (m_bootstrap_request) {
758             bootstrap_request = m_bootstrap_request;
759             bootstrap_request->get();
760           }
761         } else {
762           dout(20) << "interrupting replay" << dendl;
763           shut_down_replay = true;
764         }
765
766         assert(m_on_stop_finish == nullptr);
767         std::swap(m_on_stop_finish, on_finish);
768         m_stop_requested = true;
769         m_manual_stop = manual;
770       }
771     }
772   }
773
774   // avoid holding lock since bootstrap request will update status
775   if (bootstrap_request != nullptr) {
776     bootstrap_request->cancel();
777     bootstrap_request->put();
778   }
779
780   if (!running) {
781     dout(20) << "not running" << dendl;
782     if (on_finish) {
783       on_finish->complete(-EINVAL);
784     }
785     return;
786   }
787
788   if (shut_down_replay) {
789     on_stop_journal_replay(r, desc);
790   } else if (on_finish != nullptr) {
791     on_finish->complete(0);
792   }
793 }
794
795 template <typename I>
796 void ImageReplayer<I>::on_stop_journal_replay(int r, const std::string &desc)
797 {
798   dout(20) << "enter" << dendl;
799
800   {
801     Mutex::Locker locker(m_lock);
802     if (m_state != STATE_REPLAYING) {
803       // might be invoked multiple times while stopping
804       return;
805     }
806     m_stop_requested = true;
807     m_state = STATE_STOPPING;
808   }
809
810   set_state_description(r, desc);
811   update_mirror_image_status(false, boost::none);
812   reschedule_update_status_task(-1);
813   shut_down(0);
814 }
815
816 template <typename I>
817 void ImageReplayer<I>::handle_replay_ready()
818 {
819   dout(20) << "enter" << dendl;
820   if (on_replay_interrupted()) {
821     return;
822   }
823
824   if (!m_remote_journaler->try_pop_front(&m_replay_entry, &m_replay_tag_tid)) {
825     return;
826   }
827
828   m_event_replay_tracker.start_op();
829
830   m_lock.Lock();
831   bool stopping = (m_state == STATE_STOPPING);
832   m_lock.Unlock();
833
834   if (stopping) {
835     dout(10) << "stopping event replay" << dendl;
836     m_event_replay_tracker.finish_op();
837     return;
838   }
839
840   if (m_replay_tag_valid && m_replay_tag.tid == m_replay_tag_tid) {
841     preprocess_entry();
842     return;
843   }
844
845   replay_flush();
846 }
847
848 template <typename I>
849 void ImageReplayer<I>::restart(Context *on_finish)
850 {
851   FunctionContext *ctx = new FunctionContext(
852     [this, on_finish](int r) {
853       if (r < 0) {
854         // Try start anyway.
855       }
856       start(on_finish, true);
857     });
858   stop(ctx);
859 }
860
861 template <typename I>
862 void ImageReplayer<I>::flush(Context *on_finish)
863 {
864   dout(20) << "enter" << dendl;
865
866   {
867     Mutex::Locker locker(m_lock);
868     if (m_state == STATE_REPLAYING) {
869       Context *ctx = new FunctionContext(
870         [on_finish](int r) {
871           if (on_finish != nullptr) {
872             on_finish->complete(r);
873           }
874         });
875       on_flush_local_replay_flush_start(ctx);
876       return;
877     }
878   }
879
880   if (on_finish) {
881     on_finish->complete(0);
882   }
883 }
884
885 template <typename I>
886 void ImageReplayer<I>::on_flush_local_replay_flush_start(Context *on_flush)
887 {
888   dout(20) << "enter" << dendl;
889   FunctionContext *ctx = new FunctionContext(
890     [this, on_flush](int r) {
891       on_flush_local_replay_flush_finish(on_flush, r);
892     });
893
894   assert(m_lock.is_locked());
895   assert(m_state == STATE_REPLAYING);
896   m_local_replay->flush(ctx);
897 }
898
899 template <typename I>
900 void ImageReplayer<I>::on_flush_local_replay_flush_finish(Context *on_flush,
901                                                           int r)
902 {
903   dout(20) << "r=" << r << dendl;
904   if (r < 0) {
905     derr << "error flushing local replay: " << cpp_strerror(r) << dendl;
906     on_flush->complete(r);
907     return;
908   }
909
910   on_flush_flush_commit_position_start(on_flush);
911 }
912
913 template <typename I>
914 void ImageReplayer<I>::on_flush_flush_commit_position_start(Context *on_flush)
915 {
916   FunctionContext *ctx = new FunctionContext(
917     [this, on_flush](int r) {
918       on_flush_flush_commit_position_finish(on_flush, r);
919     });
920
921   m_remote_journaler->flush_commit_position(ctx);
922 }
923
924 template <typename I>
925 void ImageReplayer<I>::on_flush_flush_commit_position_finish(Context *on_flush,
926                                                              int r)
927 {
928   if (r < 0) {
929     derr << "error flushing remote journal commit position: "
930          << cpp_strerror(r) << dendl;
931   }
932
933   update_mirror_image_status(false, boost::none);
934
935   dout(20) << "flush complete, r=" << r << dendl;
936   on_flush->complete(r);
937 }
938
939 template <typename I>
940 bool ImageReplayer<I>::on_replay_interrupted()
941 {
942   bool shut_down;
943   {
944     Mutex::Locker locker(m_lock);
945     shut_down = m_stop_requested;
946   }
947
948   if (shut_down) {
949     on_stop_journal_replay();
950   }
951   return shut_down;
952 }
953
954 template <typename I>
955 void ImageReplayer<I>::print_status(Formatter *f, stringstream *ss)
956 {
957   dout(20) << "enter" << dendl;
958
959   Mutex::Locker l(m_lock);
960
961   if (f) {
962     f->open_object_section("image_replayer");
963     f->dump_string("name", m_name);
964     f->dump_string("state", to_string(m_state));
965     f->close_section();
966     f->flush(*ss);
967   } else {
968     *ss << m_name << ": state: " << to_string(m_state);
969   }
970 }
971
972 template <typename I>
973 void ImageReplayer<I>::handle_replay_complete(int r, const std::string &error_desc)
974 {
975   dout(20) << "r=" << r << dendl;
976   if (r < 0) {
977     derr << "replay encountered an error: " << cpp_strerror(r) << dendl;
978     set_state_description(r, error_desc);
979   }
980
981   {
982     Mutex::Locker locker(m_lock);
983     m_stop_requested = true;
984   }
985   on_replay_interrupted();
986 }
987
988 template <typename I>
989 void ImageReplayer<I>::replay_flush() {
990   dout(20) << dendl;
991
992   bool interrupted = false;
993   {
994     Mutex::Locker locker(m_lock);
995     if (m_state != STATE_REPLAYING) {
996       dout(20) << "replay interrupted" << dendl;
997       interrupted = true;
998     } else {
999       m_state = STATE_REPLAY_FLUSHING;
1000     }
1001   }
1002
1003   if (interrupted) {
1004     m_event_replay_tracker.finish_op();
1005     return;
1006   }
1007
1008   // shut down the replay to flush all IO and ops and create a new
1009   // replayer to handle the new tag epoch
1010   Context *ctx = create_context_callback<
1011     ImageReplayer<I>, &ImageReplayer<I>::handle_replay_flush>(this);
1012   ctx = new FunctionContext([this, ctx](int r) {
1013       m_local_image_ctx->journal->stop_external_replay();
1014       m_local_replay = nullptr;
1015
1016       if (r < 0) {
1017         ctx->complete(r);
1018         return;
1019       }
1020
1021       m_local_journal->start_external_replay(&m_local_replay, ctx);
1022     });
1023   m_local_replay->shut_down(false, ctx);
1024 }
1025
1026 template <typename I>
1027 void ImageReplayer<I>::handle_replay_flush(int r) {
1028   dout(20) << "r=" << r << dendl;
1029
1030   {
1031     Mutex::Locker locker(m_lock);
1032     assert(m_state == STATE_REPLAY_FLUSHING);
1033     m_state = STATE_REPLAYING;
1034   }
1035
1036   if (r < 0) {
1037     derr << "replay flush encountered an error: " << cpp_strerror(r) << dendl;
1038     m_event_replay_tracker.finish_op();
1039     handle_replay_complete(r, "replay flush encountered an error");
1040     return;
1041   } else if (on_replay_interrupted()) {
1042     m_event_replay_tracker.finish_op();
1043     return;
1044   }
1045
1046   get_remote_tag();
1047 }
1048
1049 template <typename I>
1050 void ImageReplayer<I>::get_remote_tag() {
1051   dout(20) << "tag_tid: " << m_replay_tag_tid << dendl;
1052
1053   Context *ctx = create_context_callback<
1054     ImageReplayer, &ImageReplayer<I>::handle_get_remote_tag>(this);
1055   m_remote_journaler->get_tag(m_replay_tag_tid, &m_replay_tag, ctx);
1056 }
1057
1058 template <typename I>
1059 void ImageReplayer<I>::handle_get_remote_tag(int r) {
1060   dout(20) << "r=" << r << dendl;
1061
1062   if (r == 0) {
1063     try {
1064       bufferlist::iterator it = m_replay_tag.data.begin();
1065       ::decode(m_replay_tag_data, it);
1066     } catch (const buffer::error &err) {
1067       r = -EBADMSG;
1068     }
1069   }
1070
1071   if (r < 0) {
1072     derr << "failed to retrieve remote tag " << m_replay_tag_tid << ": "
1073          << cpp_strerror(r) << dendl;
1074     m_event_replay_tracker.finish_op();
1075     handle_replay_complete(r, "failed to retrieve remote tag");
1076     return;
1077   }
1078
1079   m_replay_tag_valid = true;
1080   dout(20) << "decoded remote tag " << m_replay_tag_tid << ": "
1081            << m_replay_tag_data << dendl;
1082
1083   allocate_local_tag();
1084 }
1085
1086 template <typename I>
1087 void ImageReplayer<I>::allocate_local_tag() {
1088   dout(20) << dendl;
1089
1090   std::string mirror_uuid = m_replay_tag_data.mirror_uuid;
1091   if (mirror_uuid == librbd::Journal<>::LOCAL_MIRROR_UUID ||
1092       mirror_uuid == m_local_mirror_uuid) {
1093     mirror_uuid = m_remote_image.mirror_uuid;
1094   } else if (mirror_uuid == librbd::Journal<>::ORPHAN_MIRROR_UUID) {
1095     dout(5) << "encountered image demotion: stopping" << dendl;
1096     Mutex::Locker locker(m_lock);
1097     m_stop_requested = true;
1098   }
1099
1100   librbd::journal::TagPredecessor predecessor(m_replay_tag_data.predecessor);
1101   if (predecessor.mirror_uuid == librbd::Journal<>::LOCAL_MIRROR_UUID) {
1102     predecessor.mirror_uuid = m_remote_image.mirror_uuid;
1103   } else if (predecessor.mirror_uuid == m_local_mirror_uuid) {
1104     predecessor.mirror_uuid = librbd::Journal<>::LOCAL_MIRROR_UUID;
1105   }
1106
1107   dout(20) << "mirror_uuid=" << mirror_uuid << ", "
1108            << "predecessor_mirror_uuid=" << predecessor.mirror_uuid << ", "
1109            << "replay_tag_tid=" << m_replay_tag_tid << ", "
1110            << "replay_tag_data=" << m_replay_tag_data << dendl;
1111   Context *ctx = create_context_callback<
1112     ImageReplayer, &ImageReplayer<I>::handle_allocate_local_tag>(this);
1113   m_local_journal->allocate_tag(mirror_uuid, predecessor, ctx);
1114 }
1115
1116 template <typename I>
1117 void ImageReplayer<I>::handle_allocate_local_tag(int r) {
1118   dout(20) << "r=" << r << dendl;
1119
1120   if (r < 0) {
1121     derr << "failed to allocate journal tag: " << cpp_strerror(r) << dendl;
1122     m_event_replay_tracker.finish_op();
1123     handle_replay_complete(r, "failed to allocate journal tag");
1124     return;
1125   }
1126
1127   preprocess_entry();
1128 }
1129
1130 template <typename I>
1131 void ImageReplayer<I>::preprocess_entry() {
1132   dout(20) << "preprocessing entry tid=" << m_replay_entry.get_commit_tid()
1133            << dendl;
1134
1135   bufferlist data = m_replay_entry.get_data();
1136   bufferlist::iterator it = data.begin();
1137   int r = m_local_replay->decode(&it, &m_event_entry);
1138   if (r < 0) {
1139     derr << "failed to decode journal event" << dendl;
1140     m_event_replay_tracker.finish_op();
1141     handle_replay_complete(r, "failed to decode journal event");
1142     return;
1143   }
1144
1145   uint32_t delay = calculate_replay_delay(
1146     m_event_entry.timestamp, m_local_image_ctx->mirroring_replay_delay);
1147   if (delay == 0) {
1148     handle_preprocess_entry_ready(0);
1149     return;
1150   }
1151
1152   dout(20) << "delaying replay by " << delay << " sec" << dendl;
1153
1154   Mutex::Locker timer_locker(m_threads->timer_lock);
1155   assert(m_delayed_preprocess_task == nullptr);
1156   m_delayed_preprocess_task = new FunctionContext(
1157     [this](int r) {
1158       assert(m_threads->timer_lock.is_locked());
1159       m_delayed_preprocess_task = nullptr;
1160       m_threads->work_queue->queue(
1161         create_context_callback<ImageReplayer,
1162         &ImageReplayer<I>::handle_preprocess_entry_ready>(this), 0);
1163     });
1164   m_threads->timer->add_event_after(delay, m_delayed_preprocess_task);
1165 }
1166
1167 template <typename I>
1168 void ImageReplayer<I>::handle_preprocess_entry_ready(int r) {
1169   dout(20) << "r=" << r << dendl;
1170   assert(r == 0);
1171
1172   if (!m_event_preprocessor->is_required(m_event_entry)) {
1173     process_entry();
1174     return;
1175   }
1176
1177   Context *ctx = create_context_callback<
1178     ImageReplayer, &ImageReplayer<I>::handle_preprocess_entry_safe>(this);
1179   m_event_preprocessor->preprocess(&m_event_entry, ctx);
1180 }
1181
1182 template <typename I>
1183 void ImageReplayer<I>::handle_preprocess_entry_safe(int r) {
1184   dout(20) << "r=" << r << dendl;
1185
1186   if (r < 0) {
1187     m_event_replay_tracker.finish_op();
1188
1189     if (r == -ECANCELED) {
1190       handle_replay_complete(0, "lost exclusive lock");
1191     } else {
1192       derr << "failed to preprocess journal event" << dendl;
1193       handle_replay_complete(r, "failed to preprocess journal event");
1194     }
1195     return;
1196   }
1197
1198   process_entry();
1199 }
1200
1201 template <typename I>
1202 void ImageReplayer<I>::process_entry() {
1203   dout(20) << "processing entry tid=" << m_replay_entry.get_commit_tid()
1204            << dendl;
1205
1206   // stop replaying events if stop has been requested
1207   if (on_replay_interrupted()) {
1208     m_event_replay_tracker.finish_op();
1209     return;
1210   }
1211
1212   Context *on_ready = create_context_callback<
1213     ImageReplayer, &ImageReplayer<I>::handle_process_entry_ready>(this);
1214   Context *on_commit = new C_ReplayCommitted(this, std::move(m_replay_entry));
1215
1216   m_local_replay->process(m_event_entry, on_ready, on_commit);
1217 }
1218
1219 template <typename I>
1220 void ImageReplayer<I>::handle_process_entry_ready(int r) {
1221   dout(20) << dendl;
1222   assert(r == 0);
1223
1224   on_name_changed();
1225
1226   // attempt to process the next event
1227   handle_replay_ready();
1228 }
1229
1230 template <typename I>
1231 void ImageReplayer<I>::handle_process_entry_safe(const ReplayEntry& replay_entry,
1232                                                  int r) {
1233   dout(20) << "commit_tid=" << replay_entry.get_commit_tid() << ", r=" << r
1234            << dendl;
1235
1236   if (r < 0) {
1237     derr << "failed to commit journal event: " << cpp_strerror(r) << dendl;
1238     handle_replay_complete(r, "failed to commit journal event");
1239   } else {
1240     assert(m_remote_journaler != nullptr);
1241     m_remote_journaler->committed(replay_entry);
1242   }
1243   m_event_replay_tracker.finish_op();
1244 }
1245
1246 template <typename I>
1247 bool ImageReplayer<I>::update_mirror_image_status(bool force,
1248                                                   const OptionalState &state) {
1249   dout(20) << dendl;
1250   {
1251     Mutex::Locker locker(m_lock);
1252     if (!start_mirror_image_status_update(force, false)) {
1253       return false;
1254     }
1255   }
1256
1257   queue_mirror_image_status_update(state);
1258   return true;
1259 }
1260
1261 template <typename I>
1262 bool ImageReplayer<I>::start_mirror_image_status_update(bool force,
1263                                                         bool restarting) {
1264   assert(m_lock.is_locked());
1265
1266   if (!force && !is_stopped_()) {
1267     if (!is_running_()) {
1268       dout(20) << "shut down in-progress: ignoring update" << dendl;
1269       return false;
1270     } else if (m_in_flight_status_updates > (restarting ? 1 : 0)) {
1271       dout(20) << "already sending update" << dendl;
1272       m_update_status_requested = true;
1273       return false;
1274     }
1275   }
1276
1277   dout(20) << dendl;
1278   ++m_in_flight_status_updates;
1279   return true;
1280 }
1281
1282 template <typename I>
1283 void ImageReplayer<I>::finish_mirror_image_status_update() {
1284   Context *on_finish = nullptr;
1285   {
1286     Mutex::Locker locker(m_lock);
1287     assert(m_in_flight_status_updates > 0);
1288     if (--m_in_flight_status_updates > 0) {
1289       dout(20) << "waiting on " << m_in_flight_status_updates << " in-flight "
1290                << "updates" << dendl;
1291       return;
1292     }
1293
1294     std::swap(on_finish, m_on_update_status_finish);
1295   }
1296
1297   dout(20) << dendl;
1298   if (on_finish != nullptr) {
1299     on_finish->complete(0);
1300   }
1301 }
1302
1303 template <typename I>
1304 void ImageReplayer<I>::queue_mirror_image_status_update(const OptionalState &state) {
1305   dout(20) << dendl;
1306   FunctionContext *ctx = new FunctionContext(
1307     [this, state](int r) {
1308       send_mirror_status_update(state);
1309     });
1310   m_threads->work_queue->queue(ctx, 0);
1311 }
1312
1313 template <typename I>
1314 void ImageReplayer<I>::send_mirror_status_update(const OptionalState &opt_state) {
1315   State state;
1316   std::string state_desc;
1317   int last_r;
1318   bool stopping_replay;
1319
1320   OptionalMirrorImageStatusState mirror_image_status_state{
1321     boost::make_optional(false, cls::rbd::MirrorImageStatusState{})};
1322   image_replayer::BootstrapRequest<I>* bootstrap_request = nullptr;
1323   {
1324     Mutex::Locker locker(m_lock);
1325     state = m_state;
1326     state_desc = m_state_desc;
1327     mirror_image_status_state = m_mirror_image_status_state;
1328     last_r = m_last_r;
1329     stopping_replay = (m_local_image_ctx != nullptr);
1330
1331     if (m_bootstrap_request != nullptr) {
1332       bootstrap_request = m_bootstrap_request;
1333       bootstrap_request->get();
1334     }
1335   }
1336
1337   bool syncing = false;
1338   if (bootstrap_request != nullptr) {
1339     syncing = bootstrap_request->is_syncing();
1340     bootstrap_request->put();
1341     bootstrap_request = nullptr;
1342   }
1343
1344   if (opt_state) {
1345     state = *opt_state;
1346   }
1347
1348   cls::rbd::MirrorImageStatus status;
1349   status.up = true;
1350   switch (state) {
1351   case STATE_STARTING:
1352     if (syncing) {
1353       status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_SYNCING;
1354       status.description = state_desc.empty() ? "syncing" : state_desc;
1355       mirror_image_status_state = status.state;
1356     } else {
1357       status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STARTING_REPLAY;
1358       status.description = "starting replay";
1359     }
1360     break;
1361   case STATE_REPLAYING:
1362   case STATE_REPLAY_FLUSHING:
1363     status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_REPLAYING;
1364     {
1365       Context *on_req_finish = new FunctionContext(
1366         [this](int r) {
1367           dout(20) << "replay status ready: r=" << r << dendl;
1368           if (r >= 0) {
1369             send_mirror_status_update(boost::none);
1370           } else if (r == -EAGAIN) {
1371             // decrement in-flight status update counter
1372             handle_mirror_status_update(r);
1373           }
1374         });
1375
1376       std::string desc;
1377       if (!m_replay_status_formatter->get_or_send_update(&desc,
1378                                                          on_req_finish)) {
1379         dout(20) << "waiting for replay status" << dendl;
1380         return;
1381       }
1382       status.description = "replaying, " + desc;
1383       mirror_image_status_state = boost::none;
1384     }
1385     break;
1386   case STATE_STOPPING:
1387     if (stopping_replay) {
1388       status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STOPPING_REPLAY;
1389       status.description = "stopping replay";
1390       break;
1391     }
1392     // FALLTHROUGH
1393   case STATE_STOPPED:
1394     if (last_r == -EREMOTEIO) {
1395       status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN;
1396       status.description = state_desc;
1397       mirror_image_status_state = status.state;
1398     } else if (last_r < 0) {
1399       status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_ERROR;
1400       status.description = state_desc;
1401       mirror_image_status_state = status.state;
1402     } else {
1403       status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STOPPED;
1404       status.description = state_desc.empty() ? "stopped" : state_desc;
1405       mirror_image_status_state = boost::none;
1406     }
1407     break;
1408   default:
1409     assert(!"invalid state");
1410   }
1411
1412   {
1413     Mutex::Locker locker(m_lock);
1414     m_mirror_image_status_state = mirror_image_status_state;
1415   }
1416
1417   // prevent the status from ping-ponging when failed replays are restarted
1418   if (mirror_image_status_state &&
1419       *mirror_image_status_state == cls::rbd::MIRROR_IMAGE_STATUS_STATE_ERROR) {
1420     status.state = *mirror_image_status_state;
1421   }
1422
1423   dout(20) << "status=" << status << dendl;
1424   librados::ObjectWriteOperation op;
1425   librbd::cls_client::mirror_image_status_set(&op, m_global_image_id, status);
1426
1427   librados::AioCompletion *aio_comp = create_rados_callback<
1428     ImageReplayer<I>, &ImageReplayer<I>::handle_mirror_status_update>(this);
1429   int r = m_local_ioctx.aio_operate(RBD_MIRRORING, aio_comp, &op);
1430   assert(r == 0);
1431   aio_comp->release();
1432 }
1433
1434 template <typename I>
1435 void ImageReplayer<I>::handle_mirror_status_update(int r) {
1436   dout(20) << "r=" << r << dendl;
1437
1438   bool running = false;
1439   bool started = false;
1440   {
1441     Mutex::Locker locker(m_lock);
1442     bool update_status_requested = false;
1443     std::swap(update_status_requested, m_update_status_requested);
1444
1445     running = is_running_();
1446     if (running && update_status_requested) {
1447       started = start_mirror_image_status_update(false, true);
1448     }
1449   }
1450
1451   // if a deferred update is available, send it -- otherwise reschedule
1452   // the timer task
1453   if (started) {
1454     queue_mirror_image_status_update(boost::none);
1455   } else if (running) {
1456     reschedule_update_status_task();
1457   }
1458
1459   // mark committed status update as no longer in-flight
1460   finish_mirror_image_status_update();
1461 }
1462
1463 template <typename I>
1464 void ImageReplayer<I>::reschedule_update_status_task(int new_interval) {
1465   dout(20) << dendl;
1466
1467   bool canceled_task = false;
1468   {
1469     Mutex::Locker locker(m_lock);
1470     Mutex::Locker timer_locker(m_threads->timer_lock);
1471
1472     if (m_update_status_task) {
1473       canceled_task = m_threads->timer->cancel_event(m_update_status_task);
1474       m_update_status_task = nullptr;
1475     }
1476
1477     if (new_interval > 0) {
1478       m_update_status_interval = new_interval;
1479     }
1480
1481     bool restarting = (new_interval == 0 || canceled_task);
1482     if (new_interval >= 0 && is_running_() &&
1483         start_mirror_image_status_update(false, restarting)) {
1484       m_update_status_task = new FunctionContext(
1485         [this](int r) {
1486           assert(m_threads->timer_lock.is_locked());
1487           m_update_status_task = nullptr;
1488
1489           queue_mirror_image_status_update(boost::none);
1490         });
1491       m_threads->timer->add_event_after(m_update_status_interval,
1492                                         m_update_status_task);
1493     }
1494   }
1495
1496   if (canceled_task) {
1497     dout(20) << "canceled task" << dendl;
1498     finish_mirror_image_status_update();
1499   }
1500 }
1501
1502 template <typename I>
1503 void ImageReplayer<I>::shut_down(int r) {
1504   dout(20) << "r=" << r << dendl;
1505
1506   bool canceled_delayed_preprocess_task = false;
1507   {
1508     Mutex::Locker timer_locker(m_threads->timer_lock);
1509     if (m_delayed_preprocess_task != nullptr) {
1510       canceled_delayed_preprocess_task = m_threads->timer->cancel_event(
1511         m_delayed_preprocess_task);
1512       assert(canceled_delayed_preprocess_task);
1513       m_delayed_preprocess_task = nullptr;
1514     }
1515   }
1516   if (canceled_delayed_preprocess_task) {
1517     // wake up sleeping replay
1518     m_event_replay_tracker.finish_op();
1519   }
1520
1521   {
1522     Mutex::Locker locker(m_lock);
1523     assert(m_state == STATE_STOPPING);
1524
1525     // if status updates are in-flight, wait for them to complete
1526     // before proceeding
1527     if (m_in_flight_status_updates > 0) {
1528       if (m_on_update_status_finish == nullptr) {
1529         dout(20) << "waiting for in-flight status update" << dendl;
1530         m_on_update_status_finish = new FunctionContext(
1531           [this, r](int _r) {
1532             shut_down(r);
1533           });
1534       }
1535       return;
1536     }
1537   }
1538
1539   // NOTE: it's important to ensure that the local image is fully
1540   // closed before attempting to close the remote journal in
1541   // case the remote cluster is unreachable
1542
1543   // chain the shut down sequence (reverse order)
1544   Context *ctx = new FunctionContext(
1545     [this, r](int _r) {
1546       update_mirror_image_status(true, STATE_STOPPED);
1547       handle_shut_down(r);
1548     });
1549
1550   // close the remote journal
1551   if (m_remote_journaler != nullptr) {
1552     ctx = new FunctionContext([this, ctx](int r) {
1553         delete m_remote_journaler;
1554         m_remote_journaler = nullptr;
1555         ctx->complete(0);
1556       });
1557     ctx = new FunctionContext([this, ctx](int r) {
1558         m_remote_journaler->remove_listener(&m_remote_listener);
1559         m_remote_journaler->shut_down(ctx);
1560       });
1561   }
1562
1563   // stop the replay of remote journal events
1564   if (m_replay_handler != nullptr) {
1565     ctx = new FunctionContext([this, ctx](int r) {
1566         delete m_replay_handler;
1567         m_replay_handler = nullptr;
1568
1569         m_event_replay_tracker.wait_for_ops(ctx);
1570       });
1571     ctx = new FunctionContext([this, ctx](int r) {
1572         m_remote_journaler->stop_replay(ctx);
1573       });
1574   }
1575
1576   // close the local image (release exclusive lock)
1577   if (m_local_image_ctx) {
1578     ctx = new FunctionContext([this, ctx](int r) {
1579       CloseImageRequest<I> *request = CloseImageRequest<I>::create(
1580         &m_local_image_ctx, ctx);
1581       request->send();
1582     });
1583   }
1584
1585   // shut down event replay into the local image
1586   if (m_local_journal != nullptr) {
1587     ctx = new FunctionContext([this, ctx](int r) {
1588         m_local_journal = nullptr;
1589         ctx->complete(0);
1590       });
1591     if (m_local_replay != nullptr) {
1592       ctx = new FunctionContext([this, ctx](int r) {
1593           m_local_journal->stop_external_replay();
1594           m_local_replay = nullptr;
1595
1596           EventPreprocessor<I>::destroy(m_event_preprocessor);
1597           m_event_preprocessor = nullptr;
1598           ctx->complete(0);
1599         });
1600     }
1601     ctx = new FunctionContext([this, ctx](int r) {
1602         // blocks if listener notification is in-progress
1603         m_local_journal->remove_listener(m_journal_listener);
1604         ctx->complete(0);
1605       });
1606   }
1607
1608   // wait for all local in-flight replay events to complete
1609   ctx = new FunctionContext([this, ctx](int r) {
1610       if (r < 0) {
1611         derr << "error shutting down journal replay: " << cpp_strerror(r)
1612              << dendl;
1613       }
1614
1615       m_event_replay_tracker.wait_for_ops(ctx);
1616     });
1617
1618   // flush any local in-flight replay events
1619   if (m_local_replay != nullptr) {
1620     ctx = new FunctionContext([this, ctx](int r) {
1621         m_local_replay->shut_down(true, ctx);
1622       });
1623   }
1624
1625   m_threads->work_queue->queue(ctx, 0);
1626 }
1627
1628 template <typename I>
1629 void ImageReplayer<I>::handle_shut_down(int r) {
1630   reschedule_update_status_task(-1);
1631
1632   bool unregister_asok_hook = false;
1633   {
1634     Mutex::Locker locker(m_lock);
1635
1636     // if status updates are in-flight, wait for them to complete
1637     // before proceeding
1638     if (m_in_flight_status_updates > 0) {
1639       if (m_on_update_status_finish == nullptr) {
1640         dout(20) << "waiting for in-flight status update" << dendl;
1641         m_on_update_status_finish = new FunctionContext(
1642           [this, r](int _r) {
1643             handle_shut_down(r);
1644           });
1645       }
1646       return;
1647     }
1648
1649     bool delete_requested = false;
1650     if (m_delete_requested && !m_local_image_id.empty()) {
1651       assert(m_remote_image.image_id.empty());
1652       dout(0) << "remote image no longer exists: scheduling deletion" << dendl;
1653       delete_requested = true;
1654     }
1655     if (delete_requested || m_resync_requested) {
1656       m_image_deleter->schedule_image_delete(m_local,
1657                                              m_local_pool_id,
1658                                              m_global_image_id,
1659                                              m_resync_requested);
1660
1661       m_local_image_id = "";
1662       m_resync_requested = false;
1663       if (m_delete_requested) {
1664         unregister_asok_hook = true;
1665         m_delete_requested = false;
1666       }
1667     } else if (m_last_r == -ENOENT &&
1668                m_local_image_id.empty() && m_remote_image.image_id.empty()) {
1669       dout(0) << "mirror image no longer exists" << dendl;
1670       unregister_asok_hook = true;
1671       m_finished = true;
1672     }
1673   }
1674
1675   if (unregister_asok_hook) {
1676     unregister_admin_socket_hook();
1677   }
1678
1679   dout(20) << "stop complete" << dendl;
1680   m_local_ioctx.close();
1681
1682   ReplayStatusFormatter<I>::destroy(m_replay_status_formatter);
1683   m_replay_status_formatter = nullptr;
1684
1685   Context *on_start = nullptr;
1686   Context *on_stop = nullptr;
1687   {
1688     Mutex::Locker locker(m_lock);
1689     std::swap(on_start, m_on_start_finish);
1690     std::swap(on_stop, m_on_stop_finish);
1691     m_stop_requested = false;
1692     assert(m_delayed_preprocess_task == nullptr);
1693     assert(m_state == STATE_STOPPING);
1694     m_state = STATE_STOPPED;
1695   }
1696
1697   if (on_start != nullptr) {
1698     dout(20) << "on start finish complete, r=" << r << dendl;
1699     on_start->complete(r);
1700     r = 0;
1701   }
1702   if (on_stop != nullptr) {
1703     dout(20) << "on stop finish complete, r=" << r << dendl;
1704     on_stop->complete(r);
1705   }
1706 }
1707
1708 template <typename I>
1709 void ImageReplayer<I>::handle_remote_journal_metadata_updated() {
1710   dout(20) << dendl;
1711
1712   cls::journal::Client client;
1713   {
1714     Mutex::Locker locker(m_lock);
1715     if (!is_running_()) {
1716       return;
1717     }
1718
1719     int r = m_remote_journaler->get_cached_client(m_local_mirror_uuid, &client);
1720     if (r < 0) {
1721       derr << "failed to retrieve client: " << cpp_strerror(r) << dendl;
1722       return;
1723     }
1724   }
1725
1726   if (client.state != cls::journal::CLIENT_STATE_CONNECTED) {
1727     dout(0) << "client flagged disconnected, stopping image replay" << dendl;
1728     stop(nullptr, false, -ENOTCONN, "disconnected");
1729   }
1730 }
1731
1732 template <typename I>
1733 std::string ImageReplayer<I>::to_string(const State state) {
1734   switch (state) {
1735   case ImageReplayer<I>::STATE_STARTING:
1736     return "Starting";
1737   case ImageReplayer<I>::STATE_REPLAYING:
1738     return "Replaying";
1739   case ImageReplayer<I>::STATE_REPLAY_FLUSHING:
1740     return "ReplayFlushing";
1741   case ImageReplayer<I>::STATE_STOPPING:
1742     return "Stopping";
1743   case ImageReplayer<I>::STATE_STOPPED:
1744     return "Stopped";
1745   default:
1746     break;
1747   }
1748   return "Unknown(" + stringify(state) + ")";
1749 }
1750
1751 template <typename I>
1752 void ImageReplayer<I>::resync_image(Context *on_finish) {
1753   dout(20) << dendl;
1754
1755   m_resync_requested = true;
1756   stop(on_finish);
1757 }
1758
1759 template <typename I>
1760 void ImageReplayer<I>::register_admin_socket_hook() {
1761   ImageReplayerAdminSocketHook<I> *asok_hook;
1762   {
1763     Mutex::Locker locker(m_lock);
1764     if (m_asok_hook != nullptr) {
1765       return;
1766     }
1767
1768     dout(20) << "registered asok hook: " << m_name << dendl;
1769     asok_hook = new ImageReplayerAdminSocketHook<I>(g_ceph_context, m_name,
1770                                                     this);
1771     int r = asok_hook->register_commands();
1772     if (r == 0) {
1773       m_asok_hook = asok_hook;
1774       return;
1775     }
1776     derr << "error registering admin socket commands" << dendl;
1777   }
1778   delete asok_hook;
1779 }
1780
1781 template <typename I>
1782 void ImageReplayer<I>::unregister_admin_socket_hook() {
1783   dout(20) << dendl;
1784
1785   AdminSocketHook *asok_hook = nullptr;
1786   {
1787     Mutex::Locker locker(m_lock);
1788     std::swap(asok_hook, m_asok_hook);
1789   }
1790   delete asok_hook;
1791 }
1792
1793 template <typename I>
1794 void ImageReplayer<I>::on_name_changed() {
1795   {
1796     Mutex::Locker locker(m_lock);
1797     std::string name = m_local_ioctx.get_pool_name() + "/" +
1798       m_local_image_ctx->name;
1799     if (m_name == name) {
1800       return;
1801     }
1802     m_name = name;
1803   }
1804   unregister_admin_socket_hook();
1805   register_admin_socket_hook();
1806 }
1807
1808 template <typename I>
1809 std::ostream &operator<<(std::ostream &os, const ImageReplayer<I> &replayer)
1810 {
1811   os << "ImageReplayer: " << &replayer << " [" << replayer.get_local_pool_id()
1812      << "/" << replayer.get_global_image_id() << "]";
1813   return os;
1814 }
1815
1816 } // namespace mirror
1817 } // namespace rbd
1818
1819 template class rbd::mirror::ImageReplayer<librbd::ImageCtx>;