1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "include/compat.h"
5 #include "common/Formatter.h"
6 #include "common/debug.h"
7 #include "common/errno.h"
8 #include "include/stringify.h"
9 #include "cls/rbd/cls_rbd_client.h"
10 #include "common/Timer.h"
11 #include "common/WorkQueue.h"
12 #include "global/global_context.h"
13 #include "journal/Journaler.h"
14 #include "journal/ReplayHandler.h"
15 #include "journal/Settings.h"
16 #include "librbd/ExclusiveLock.h"
17 #include "librbd/ImageCtx.h"
18 #include "librbd/ImageState.h"
19 #include "librbd/Journal.h"
20 #include "librbd/Operations.h"
21 #include "librbd/Utils.h"
22 #include "librbd/journal/Replay.h"
23 #include "ImageDeleter.h"
24 #include "ImageReplayer.h"
26 #include "tools/rbd_mirror/image_replayer/BootstrapRequest.h"
27 #include "tools/rbd_mirror/image_replayer/CloseImageRequest.h"
28 #include "tools/rbd_mirror/image_replayer/EventPreprocessor.h"
29 #include "tools/rbd_mirror/image_replayer/PrepareLocalImageRequest.h"
30 #include "tools/rbd_mirror/image_replayer/PrepareRemoteImageRequest.h"
31 #include "tools/rbd_mirror/image_replayer/ReplayStatusFormatter.h"
33 #define dout_context g_ceph_context
34 #define dout_subsys ceph_subsys_rbd_mirror
36 #define dout_prefix *_dout << "rbd::mirror::" << *this << " " \
41 using std::unique_ptr;
42 using std::shared_ptr;
48 using librbd::util::create_context_callback;
49 using librbd::util::create_rados_callback;
50 using namespace rbd::mirror::image_replayer;
// Forward declaration of the stream-insertion operator for
// ImageReplayer<I>::State values; the definition is not part of this excerpt.
53 std::ostream &operator<<(std::ostream &os,
54 const typename ImageReplayer<I>::State &state);
// Adapter implementing ::journal::ReplayHandler that forwards journal replay
// notifications to an ImageReplayer<I>.
59 struct ReplayHandler : public ::journal::ReplayHandler {
// Replayer that receives the forwarded callbacks.
60 ImageReplayer<I> *replayer;
61 ReplayHandler(ImageReplayer<I> *replayer) : replayer(replayer) {}
// get()/put() are overridden with empty bodies.
62 void get() override {}
63 void put() override {}
// New journal entries are available: ask the replayer to handle them.
65 void handle_entries_available() override {
66 replayer->handle_replay_ready();
// Replay completed with result r: pass r plus a description built from
// cpp_strerror(r) to the replayer.
// NOTE(review): any guard around the message formatting is not visible in
// this excerpt.
68 void handle_complete(int r) override {
71 ss << "replay completed with error: " << cpp_strerror(r);
73 replayer->handle_replay_complete(r, ss.str());
// Abstract base class for one admin-socket command bound to an
// ImageReplayer<I>. It stores the command description, the target replayer
// and a flag recording whether the command has been registered.
78 class ImageReplayerAdminSocketCommand {
80 ImageReplayerAdminSocketCommand(const std::string &desc,
81 ImageReplayer<I> *replayer)
82 : desc(desc), replayer(replayer) {
84 virtual ~ImageReplayerAdminSocketCommand() {}
// Executes the command; each concrete command class provides the body.
85 virtual bool call(Formatter *f, stringstream *ss) = 0;
88 ImageReplayer<I> *replayer;
// Starts out false; ImageReplayerAdminSocketHook::register_commands() sets it
// and the hook's destructor consults it before unregistering.
89 bool registered = false;
// Admin-socket command that reports status by calling the replayer's
// print_status() with the supplied formatter and string stream.
93 class StatusCommand : public ImageReplayerAdminSocketCommand<I> {
95 explicit StatusCommand(const std::string &desc, ImageReplayer<I> *replayer)
96 : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
99 bool call(Formatter *f, stringstream *ss) override {
100 this->replayer->print_status(f, ss);
// Admin-socket command that starts the replayer.
105 template <typename I>
106 class StartCommand : public ImageReplayerAdminSocketCommand<I> {
108 explicit StartCommand(const std::string &desc, ImageReplayer<I> *replayer)
109 : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
112 bool call(Formatter *f, stringstream *ss) override {
// No completion context is supplied; the second argument is start()'s
// `manual` flag.
113 this->replayer->start(nullptr, true);
// Admin-socket command that stops the replayer.
118 template <typename I>
119 class StopCommand : public ImageReplayerAdminSocketCommand<I> {
121 explicit StopCommand(const std::string &desc, ImageReplayer<I> *replayer)
122 : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
125 bool call(Formatter *f, stringstream *ss) override {
// No completion context is supplied; the second argument is stop()'s
// `manual` flag.
126 this->replayer->stop(nullptr, true);
// Admin-socket command that restarts the replayer via restart(), called
// without arguments.
131 template <typename I>
132 class RestartCommand : public ImageReplayerAdminSocketCommand<I> {
134 explicit RestartCommand(const std::string &desc, ImageReplayer<I> *replayer)
135 : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
138 bool call(Formatter *f, stringstream *ss) override {
139 this->replayer->restart();
// Admin-socket command that flushes the replayer and reports a flush result
// through the output stream.
144 template <typename I>
145 class FlushCommand : public ImageReplayerAdminSocketCommand<I> {
147 explicit FlushCommand(const std::string &desc, ImageReplayer<I> *replayer)
148 : ImageReplayerAdminSocketCommand<I>(desc, replayer) {
151 bool call(Formatter *f, stringstream *ss) override {
// `cond` is handed to flush() as its completion context.
// NOTE(review): the declaration of `cond`, how `r` is obtained from it and
// the condition guarding the message below are not visible in this excerpt.
153 this->replayer->flush(&cond);
156 *ss << "flush: " << cpp_strerror(r);
// AdminSocketHook that exposes the flush / restart / start / status / stop
// commands of one ImageReplayer<I> on the CephContext's admin socket.
163 template <typename I>
164 class ImageReplayerAdminSocketHook : public AdminSocketHook {
// Builds the command table. Each key is "rbd mirror <verb> <name>" and maps
// to a heap-allocated command object whose description embeds the same name.
// NOTE(review): where these command objects are deleted is not visible in
// this excerpt.
166 ImageReplayerAdminSocketHook(CephContext *cct, const std::string &name,
167 ImageReplayer<I> *replayer)
168 : admin_socket(cct->get_admin_socket()),
169 commands{{"rbd mirror flush " + name,
170 new FlushCommand<I>("flush rbd mirror " + name, replayer)},
171 {"rbd mirror restart " + name,
172 new RestartCommand<I>("restart rbd mirror " + name, replayer)},
173 {"rbd mirror start " + name,
174 new StartCommand<I>("start rbd mirror " + name, replayer)},
175 {"rbd mirror status " + name,
176 new StatusCommand<I>("get status for rbd mirror " + name, replayer)},
177 {"rbd mirror stop " + name,
178 new StopCommand<I>("stop rbd mirror " + name, replayer)}} {
// Registers every command in the table with the admin socket, with this
// object as the hook, and marks the command as registered.
// NOTE(review): the handling of a non-zero result from register_command()
// is not visible in this excerpt.
181 int register_commands() {
182 for (auto &it : commands) {
// The map key is passed as both the first and the second argument.
183 int r = admin_socket->register_command(it.first, it.first, this,
188 it.second->registered = true;
// Unregisters only the commands that were flagged as registered.
193 ~ImageReplayerAdminSocketHook() override {
194 for (auto &it : commands) {
195 if (it.second->registered) {
196 admin_socket->unregister_command(it.first);
// Dispatches an incoming admin-socket command: looks it up in the table
// (asserting that it exists), creates a Formatter for the requested format
// and invokes the command object with it.
203 bool call(std::string command, cmdmap_t& cmdmap, std::string format,
204 bufferlist& out) override {
205 auto i = commands.find(command);
206 assert(i != commands.end());
207 Formatter *f = Formatter::create(format);
209 bool r = i->second->call(f, &ss);
// Command table type: full command string -> command object pointer.
216 typedef std::map<std::string, ImageReplayerAdminSocketCommand<I> *> Commands;
// Admin socket obtained from the CephContext passed to the constructor.
218 AdminSocket *admin_socket;
// Derives a replay delay from an event's timestamp and the configured
// mirroring_replay_delay, relative to the current clock reading.
// Two cases are tested first: a non-positive configured delay, and an event
// whose timestamp plus the delay is not later than now.
// NOTE(review): the bodies of those two branches are not visible in this
// excerpt; presumably they return 0 (no delay) — confirm.
// Otherwise the remaining time until event_time + mirroring_replay_delay is
// returned, incremented by one.
222 uint32_t calculate_replay_delay(const utime_t &event_time,
223 int mirroring_replay_delay) {
224 if (mirroring_replay_delay <= 0) {
228 utime_t now = ceph_clock_now();
229 if (event_time + mirroring_replay_delay <= now) {
233 // ensure it is rounded up when converting to integer
234 return (event_time + mirroring_replay_delay - now) + 1;
237 } // anonymous namespace
// Bootstrap progress callback: prefixes the description with
// "bootstrapping, ", stores it as the replayer's state description with a
// result of 0, and requests a non-forced mirror image status update.
// NOTE(review): how the `flush` argument is used is not visible in this
// excerpt.
239 template <typename I>
240 void ImageReplayer<I>::BootstrapProgressContext::update_progress(
241 const std::string &description, bool flush)
243 const std::string desc = "bootstrapping, " + description;
244 replayer->set_state_description(0, desc);
246 replayer->update_mirror_image_status(false, boost::none);
// Remote journal metadata listener callback. The JournalMetadata argument is
// unnamed and unused. The notification is forwarded by queueing a context on
// the work queue rather than by calling the replayer directly.
250 template <typename I>
251 void ImageReplayer<I>::RemoteJournalerListener::handle_update(
252 ::journal::JournalMetadata *) {
253 FunctionContext *ctx = new FunctionContext([this](int r) {
254 replayer->handle_remote_journal_metadata_updated();
256 replayer->m_threads->work_queue->queue(ctx, 0);
// Constructs a replayer for the image identified by global_image_id in the
// local pool local_pool_id. Besides storing its arguments it allocates the
// journal listener, derives the initial name "<pool>/<global_image_id>" and
// registers the admin socket hook under that name.
259 template <typename I>
260 ImageReplayer<I>::ImageReplayer(Threads<I> *threads,
261 ImageDeleter<I>* image_deleter,
262 InstanceWatcher<I> *instance_watcher,
264 const std::string &local_mirror_uuid,
265 int64_t local_pool_id,
266 const std::string &global_image_id) :
268 m_image_deleter(image_deleter),
269 m_instance_watcher(instance_watcher),
271 m_local_mirror_uuid(local_mirror_uuid),
272 m_local_pool_id(local_pool_id),
273 m_global_image_id(global_image_id),
274 m_lock("rbd::mirror::ImageReplayer " + stringify(local_pool_id) + " " +
276 m_progress_cxt(this),
277 m_journal_listener(new JournalListener(this)),
278 m_remote_listener(this)
// NOTE(review): the comment below speaks of "remote_pool_name", but the code
// that follows resolves the name of the *local* pool (m_local_pool_id) —
// confirm which wording is intended.
280 // Register asok commands using a temporary "remote_pool_name/global_image_id"
281 // name. When the image name becomes known on start the asok commands will be
282 // re-registered using "remote_pool_name/remote_image_name" name.
284 std::string pool_name;
285 int r = m_local->pool_reverse_lookup(m_local_pool_id, &pool_name);
// On the failure path the lookup error is logged and the numeric pool id,
// stringified, is used in place of the pool name.
// NOTE(review): the condition selecting this path is not visible here.
287 derr << "error resolving local pool " << m_local_pool_id
288 << ": " << cpp_strerror(r) << dendl;
289 pool_name = stringify(m_local_pool_id);
292 m_name = pool_name + "/" + m_global_image_id;
293 register_admin_socket_hook();
// Destructor. Unregisters the admin socket hook, then asserts that the
// per-run objects and pending callbacks are already null and that no status
// updates are in flight. Finally it frees the journal listener that the
// constructor allocated with new.
296 template <typename I>
297 ImageReplayer<I>::~ImageReplayer()
299 unregister_admin_socket_hook();
300 assert(m_event_preprocessor == nullptr);
301 assert(m_replay_status_formatter == nullptr);
302 assert(m_local_image_ctx == nullptr);
303 assert(m_local_replay == nullptr);
304 assert(m_remote_journaler == nullptr);
305 assert(m_replay_handler == nullptr);
306 assert(m_on_start_finish == nullptr);
307 assert(m_on_stop_finish == nullptr);
308 assert(m_bootstrap_request == nullptr);
309 assert(m_in_flight_status_updates == 0);
311 delete m_journal_listener;
// Maps the last recorded mirror image status state to a coarse health value,
// read under m_lock:
//   - no status state recorded        -> HEALTH_STATE_OK
//   - SYNCING or UNKNOWN status state -> HEALTH_STATE_WARNING
//   - any other status state          -> HEALTH_STATE_ERROR
314 template <typename I>
315 image_replayer::HealthState ImageReplayer<I>::get_health_state() const {
316 Mutex::Locker locker(m_lock);
318 if (!m_mirror_image_status_state) {
319 return image_replayer::HEALTH_STATE_OK;
320 } else if (*m_mirror_image_status_state ==
321 cls::rbd::MIRROR_IMAGE_STATUS_STATE_SYNCING ||
322 *m_mirror_image_status_state ==
323 cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN) {
324 return image_replayer::HEALTH_STATE_WARNING;
326 return image_replayer::HEALTH_STATE_ERROR;
// Adds a peer (uuid plus IoCtx) to m_peers under m_lock. A peer whose uuid is
// already present is left untouched.
329 template <typename I>
330 void ImageReplayer<I>::add_peer(const std::string &peer_uuid,
331 librados::IoCtx &io_ctx) {
332 Mutex::Locker locker(m_lock);
// The lookup key is built from the uuid alone.
333 auto it = m_peers.find({peer_uuid});
334 if (it == m_peers.end()) {
335 m_peers.insert({peer_uuid, io_ctx});
// Logs the result code and description at debug level 20, then takes m_lock.
// NOTE(review): the statements executed under the lock are not visible in
// this excerpt; presumably they store r and desc as the current state
// description — confirm.
339 template <typename I>
340 void ImageReplayer<I>::set_state_description(int r, const std::string &desc) {
341 dout(20) << r << " " << desc << dendl;
343 Mutex::Locker l(m_lock);
// Begins the start sequence.
//   on_finish  optional context; stored as m_on_start_finish when the start
//              is accepted.
//   manual     true when the start was explicitly requested (the admin-socket
//              start command passes true).
// Under m_lock the request is refused if the replayer is not stopped, or if
// it was stopped manually and this call is not manual. Otherwise the state
// becomes STATE_STARTING and the state description and the manual-stop and
// delete-requested flags are reset.
// NOTE(review): the error codes assigned on the refusal paths, and the
// conditions around the on_finish->complete(r) call, are not visible in this
// excerpt.
348 template <typename I>
349 void ImageReplayer<I>::start(Context *on_finish, bool manual)
351 dout(20) << "on_finish=" << on_finish << dendl;
355 Mutex::Locker locker(m_lock);
356 if (!is_stopped_()) {
357 derr << "already running" << dendl;
359 } else if (m_manual_stop && !manual) {
360 dout(5) << "stopped manually, ignoring start without manual flag"
364 m_state = STATE_STARTING;
366 m_state_desc.clear();
367 m_manual_stop = false;
368 m_delete_requested = false;
370 if (on_finish != nullptr) {
// Only one start completion may be pending at a time.
371 assert(m_on_start_finish == nullptr);
372 m_on_start_finish = on_finish;
374 assert(m_on_stop_finish == nullptr);
380 on_finish->complete(r);
// Open an IoCtx on the local pool; a failure is reported via on_start_fail().
385 r = m_local->ioctx_create2(m_local_pool_id, m_local_ioctx);
387 derr << "error opening ioctx for local pool " << m_local_pool_id
388 << ": " << cpp_strerror(r) << dendl;
389 on_start_fail(r, "error opening local pool");
// Asks the image deleter to call handle_wait_for_deletion() once the
// scheduled deletion of this image (local pool id + global image id) has been
// waited for.
396 template <typename I>
397 void ImageReplayer<I>::wait_for_deletion() {
400 Context *ctx = create_context_callback<
401 ImageReplayer, &ImageReplayer<I>::handle_wait_for_deletion>(this);
402 m_image_deleter->wait_for_scheduled_deletion(
403 m_local_pool_id, m_global_image_id, ctx, false);
// Completion of wait_for_deletion().
//   -ECANCELED : the start is failed with result 0 and an empty description.
//   error path : the start is failed with "error waiting for image deletion".
//   otherwise  : continue with prepare_local_image().
// NOTE(review): the condition selecting the error path is not visible in
// this excerpt.
406 template <typename I>
407 void ImageReplayer<I>::handle_wait_for_deletion(int r) {
408 dout(20) << "r=" << r << dendl;
410 if (r == -ECANCELED) {
411 on_start_fail(0, "");
414 on_start_fail(r, "error waiting for image deletion");
418 prepare_local_image();
// Clears the cached local image id and creates a PrepareLocalImageRequest
// that fills in m_local_image_id and m_local_image_tag_owner for the global
// image id; handle_prepare_local_image() is its completion callback.
// NOTE(review): the statement that sends the request is not visible in this
// excerpt.
421 template <typename I>
422 void ImageReplayer<I>::prepare_local_image() {
425 m_local_image_id = "";
426 Context *ctx = create_context_callback<
427 ImageReplayer, &ImageReplayer<I>::handle_prepare_local_image>(this);
428 auto req = PrepareLocalImageRequest<I>::create(
429 m_local_ioctx, m_global_image_id, &m_local_image_id,
430 &m_local_image_tag_owner, m_threads->work_queue, ctx);
// Completion of prepare_local_image().
//   - a missing local image is only logged;
//   - another failure aborts the start with
//     "error preparing local image for replay";
//   - a local image whose tag owner is LOCAL_MIRROR_UUID is primary, so the
//     start is ended with result 0 and "local image is primary";
//   - otherwise the remote image is prepared next.
// NOTE(review): the error-code tests for the first two cases are not visible
// in this excerpt.
434 template <typename I>
435 void ImageReplayer<I>::handle_prepare_local_image(int r) {
436 dout(20) << "r=" << r << dendl;
439 dout(20) << "local image does not exist" << dendl;
441 on_start_fail(r, "error preparing local image for replay");
443 } else if (m_local_image_tag_owner == librbd::Journal<>::LOCAL_MIRROR_UUID) {
444 dout(5) << "local image is primary" << dendl;
445 on_start_fail(0, "local image is primary");
449 // local image doesn't exist or is non-primary
450 prepare_remote_image();
// Selects the first registered peer as the remote image source (at least one
// peer is asserted to exist) and creates a PrepareRemoteImageRequest that
// fills in the remote mirror uuid and remote image id for the global image
// id; handle_prepare_remote_image() is its completion callback.
// NOTE(review): the statement that sends the request is not visible in this
// excerpt.
453 template <typename I>
454 void ImageReplayer<I>::prepare_remote_image() {
457 // TODO need to support multiple remote images
458 assert(!m_peers.empty());
459 m_remote_image = {*m_peers.begin()};
461 Context *ctx = create_context_callback<
462 ImageReplayer, &ImageReplayer<I>::handle_prepare_remote_image>(this);
463 auto req = PrepareRemoteImageRequest<I>::create(
464 m_remote_image.io_ctx, m_global_image_id, &m_remote_image.mirror_uuid,
465 &m_remote_image.image_id, ctx);
// Completion of prepare_remote_image().
// When the remote image is missing there are two outcomes. If a local image
// exists and its tag owner is the remote mirror uuid, m_delete_requested is
// set and the start ends with result 0 ("remote image no longer exists").
// Otherwise the start fails with -ENOENT. Other failures abort the start
// with "error retrieving remote image id".
// NOTE(review): the error-code tests and the success path are not visible in
// this excerpt.
469 template <typename I>
470 void ImageReplayer<I>::handle_prepare_remote_image(int r) {
471 dout(20) << "r=" << r << dendl;
474 dout(20) << "remote image does not exist" << dendl;
476 // TODO need to support multiple remote images
477 if (!m_local_image_id.empty() &&
478 m_local_image_tag_owner == m_remote_image.mirror_uuid) {
479 // local image exists and is non-primary and linked to the missing
482 m_delete_requested = true;
483 on_start_fail(0, "remote image no longer exists");
485 on_start_fail(-ENOENT, "remote image does not exist");
489 on_start_fail(r, "error retrieving remote image id");
// Creates the remote Journaler and a BootstrapRequest for the image pair.
// Journal settings (commit interval, max fetch bytes) come from the
// "rbd_mirror_journal_commit_age" and "rbd_mirror_journal_max_fetch_bytes"
// configuration values. The request is published in m_bootstrap_request under
// m_lock, a status update is requested and the periodic status task is
// rescheduled with an argument of 10.
// NOTE(review): the statement that sends the request, and any checks made
// under the lock, are not visible in this excerpt.
496 template <typename I>
497 void ImageReplayer<I>::bootstrap() {
500 CephContext *cct = static_cast<CephContext *>(m_local->cct());
501 journal::Settings settings;
502 settings.commit_interval = cct->_conf->get_val<double>(
503 "rbd_mirror_journal_commit_age");
504 settings.max_fetch_bytes = cct->_conf->get_val<uint64_t>(
505 "rbd_mirror_journal_max_fetch_bytes");
507 m_remote_journaler = new Journaler(m_threads->work_queue,
509 &m_threads->timer_lock,
510 m_remote_image.io_ctx,
511 m_remote_image.image_id,
512 m_local_mirror_uuid, settings);
// handle_bootstrap() runs when the bootstrap request completes.
514 Context *ctx = create_context_callback<
515 ImageReplayer, &ImageReplayer<I>::handle_bootstrap>(this);
517 BootstrapRequest<I> *request = BootstrapRequest<I>::create(
518 m_local_ioctx, m_remote_image.io_ctx, m_instance_watcher,
519 &m_local_image_ctx, m_local_image_id, m_remote_image.image_id,
520 m_global_image_id, m_threads->work_queue, m_threads->timer,
521 &m_threads->timer_lock, m_local_mirror_uuid, m_remote_image.mirror_uuid,
522 m_remote_journaler, &m_client_meta, ctx, &m_resync_requested,
526 Mutex::Locker locker(m_lock);
528 m_bootstrap_request = request;
531 update_mirror_image_status(false, boost::none);
532 reschedule_update_status_task(10);
// Completion of the bootstrap request.
// Under m_lock the reference held in m_bootstrap_request is dropped and, if a
// local image context now exists, its id is cached in m_local_image_id.
// Result handling visible here:
//   -EREMOTEIO       : tag owner cleared; start fails, "remote image is
//                      non-primary"
//   -EEXIST          : tag owner cleared; start fails, "split-brain detected"
//   other failure    : start fails, "error bootstrapping replay"
//   interrupted      : on_start_interrupted() has handled it
//   resync requested : start ends with result 0, "resync requested"
// On success the local image's journal is looked up under snap_lock and the
// journal listener attached; without a journal the start fails with -EINVAL.
// Finally a status update is requested and the remote journaler initialised.
// NOTE(review): the test selecting the "other failure" case is not visible
// in this excerpt.
537 template <typename I>
538 void ImageReplayer<I>::handle_bootstrap(int r) {
539 dout(20) << "r=" << r << dendl;
541 Mutex::Locker locker(m_lock);
542 m_bootstrap_request->put();
543 m_bootstrap_request = nullptr;
544 if (m_local_image_ctx) {
545 m_local_image_id = m_local_image_ctx->id;
549 if (r == -EREMOTEIO) {
550 m_local_image_tag_owner = "";
551 dout(5) << "remote image is non-primary" << dendl;
552 on_start_fail(-EREMOTEIO, "remote image is non-primary");
554 } else if (r == -EEXIST) {
555 m_local_image_tag_owner = "";
556 on_start_fail(r, "split-brain detected");
559 on_start_fail(r, "error bootstrapping replay");
561 } else if (on_start_interrupted()) {
563 } else if (m_resync_requested) {
564 on_start_fail(0, "resync requested");
568 assert(m_local_journal == nullptr);
570 RWLock::RLocker snap_locker(m_local_image_ctx->snap_lock);
571 if (m_local_image_ctx->journal != nullptr) {
572 m_local_journal = m_local_image_ctx->journal;
573 m_local_journal->add_listener(m_journal_listener);
577 if (m_local_journal == nullptr) {
578 on_start_fail(-EINVAL, "error accessing local journal");
584 update_mirror_image_status(false, boost::none);
585 init_remote_journaler();
// Starts initialisation of the remote journaler;
// handle_init_remote_journaler() is invoked on completion.
588 template <typename I>
589 void ImageReplayer<I>::init_remote_journaler() {
592 Context *ctx = create_context_callback<
593 ImageReplayer, &ImageReplayer<I>::handle_init_remote_journaler>(this);
594 m_remote_journaler->init(ctx);
// Completion of init_remote_journaler().
// A failure aborts the start ("error initializing remote journal"); a pending
// stop is handled through on_start_interrupted(). Otherwise the remote
// listener is attached and this mirror's registered journal client is read
// from the journaler's cache. If the client metadata refers to the current
// local image and the client is not in the CONNECTED state, the start fails
// with -ENOTCONN. In that case m_resync_requested is also set when the local
// image has mirroring_resync_after_disconnect enabled.
// NOTE(review): the error-code tests and the success continuation are not
// visible in this excerpt.
597 template <typename I>
598 void ImageReplayer<I>::handle_init_remote_journaler(int r) {
599 dout(20) << "r=" << r << dendl;
602 derr << "failed to initialize remote journal: " << cpp_strerror(r) << dendl;
603 on_start_fail(r, "error initializing remote journal");
605 } else if (on_start_interrupted()) {
609 m_remote_journaler->add_listener(&m_remote_listener);
611 cls::journal::Client client;
612 r = m_remote_journaler->get_cached_client(m_local_mirror_uuid, &client);
614 derr << "error retrieving remote journal client: " << cpp_strerror(r)
616 on_start_fail(r, "error retrieving remote journal client");
// NOTE(review): this diagnostic state dump goes through derr even though no
// enclosing error condition is visible here — confirm the intended log level.
620 derr << "image_id=" << m_local_image_id << ", "
621 << "m_client_meta.image_id=" << m_client_meta.image_id << ", "
622 << "client.state=" << client.state << dendl;
623 if (m_client_meta.image_id == m_local_image_id &&
624 client.state != cls::journal::CLIENT_STATE_CONNECTED) {
625 dout(5) << "client flagged disconnected, stopping image replay" << dendl;
626 if (m_local_image_ctx->mirroring_resync_after_disconnect) {
627 m_resync_requested = true;
628 on_start_fail(-ENOTCONN, "disconnected: automatic resync");
630 on_start_fail(-ENOTCONN, "disconnected");
// Asks the local journal to start an external replay, receiving the replay
// object in m_local_replay; handle_start_replay() is invoked on completion.
638 template <typename I>
639 void ImageReplayer<I>::start_replay() {
642 Context *start_ctx = create_context_callback<
643 ImageReplayer, &ImageReplayer<I>::handle_start_replay>(this);
644 m_local_journal->start_external_replay(&m_local_replay, start_ctx);
// Completion of start_replay().
// On failure no local replay object may exist and the start is aborted.
// On success the state moves STARTING -> REPLAYING under m_lock and the
// pending start completion is taken out of m_on_start_finish. The event
// preprocessor and replay status formatter are then created, a forced status
// update is issued and the status task is rescheduled with an argument of 30.
// Unless a stop was requested meanwhile, a ReplayHandler is installed and
// live replay of the remote journal begins, polling at the interval given by
// "rbd_mirror_journal_poll_age". The taken completion, if any, is completed
// with r.
// NOTE(review): the failure test and the statements following the
// on_replay_interrupted() check are not visible in this excerpt.
647 template <typename I>
648 void ImageReplayer<I>::handle_start_replay(int r) {
649 dout(20) << "r=" << r << dendl;
652 assert(m_local_replay == nullptr);
653 derr << "error starting external replay on local image "
654 << m_local_image_id << ": " << cpp_strerror(r) << dendl;
655 on_start_fail(r, "error starting replay on local image");
659 Context *on_finish(nullptr);
661 Mutex::Locker locker(m_lock);
662 assert(m_state == STATE_STARTING);
663 m_state = STATE_REPLAYING;
// Take ownership of the start completion so it is fired outside the lock.
664 std::swap(m_on_start_finish, on_finish);
667 m_event_preprocessor = EventPreprocessor<I>::create(
668 *m_local_image_ctx, *m_remote_journaler, m_local_mirror_uuid,
669 &m_client_meta, m_threads->work_queue);
670 m_replay_status_formatter =
671 ReplayStatusFormatter<I>::create(m_remote_journaler, m_local_mirror_uuid);
673 update_mirror_image_status(true, boost::none);
674 reschedule_update_status_task(30);
676 if (on_replay_interrupted()) {
681 CephContext *cct = static_cast<CephContext *>(m_local->cct());
682 double poll_seconds = cct->_conf->get_val<double>(
683 "rbd_mirror_journal_poll_age");
685 Mutex::Locker locker(m_lock);
686 m_replay_handler = new ReplayHandler<I>(this);
687 m_remote_journaler->start_live_replay(m_replay_handler, poll_seconds);
689 dout(20) << "m_remote_journaler=" << *m_remote_journaler << dendl;
692 dout(20) << "start succeeded" << dendl;
693 if (on_finish != nullptr) {
694 dout(20) << "on finish complete, r=" << r << dendl;
695 on_finish->complete(r);
// Aborts an in-progress start with result r and description desc.
// The work is deferred to the work queue. There, under m_lock, the state is
// asserted to be STARTING and moved to STOPPING. Results other than
// -ECANCELED, -EREMOTEIO and -ENOENT that are negative are logged as errors.
// The state description is then recorded, a status update requested and the
// status task rescheduled with an argument of -1.
// NOTE(review): any shutdown step after these calls is not visible in this
// excerpt.
699 template <typename I>
700 void ImageReplayer<I>::on_start_fail(int r, const std::string &desc)
702 dout(20) << "r=" << r << dendl;
// r and desc are captured by value since the lambda runs later.
703 Context *ctx = new FunctionContext([this, r, desc](int _r) {
705 Mutex::Locker locker(m_lock);
706 assert(m_state == STATE_STARTING);
707 m_state = STATE_STOPPING;
708 if (r < 0 && r != -ECANCELED && r != -EREMOTEIO && r != -ENOENT) {
709 derr << "start failed: " << cpp_strerror(r) << dendl;
711 dout(20) << "start canceled" << dendl;
715 set_state_description(r, desc);
716 update_mirror_image_status(false, boost::none);
717 reschedule_update_status_task(-1);
720 m_threads->work_queue->queue(ctx, 0);
// Checks, under m_lock and while in STATE_STARTING, whether a stop has been
// requested during start (a stop completion is pending in m_on_stop_finish).
// If so the start is aborted with -ECANCELED.
// NOTE(review): the return statements are not visible in this excerpt;
// presumably false when no stop is pending and true after aborting — confirm.
723 template <typename I>
724 bool ImageReplayer<I>::on_start_interrupted()
726 Mutex::Locker locker(m_lock);
727 assert(m_state == STATE_STARTING);
728 if (m_on_stop_finish == nullptr) {
732 on_start_fail(-ECANCELED);
// Requests that the replayer stop.
//   on_finish  optional context for the stop completion.
//   manual     recorded in m_manual_stop (the admin-socket stop command
//              passes true).
//   r, desc    result and description forwarded to on_stop_journal_replay()
//              when an active replay is shut down.
// Any waiter registered with the image deleter for this image is cancelled
// first. Under m_lock, when the replayer is still starting, the in-flight
// bootstrap request (if any) is referenced so it can be cancelled after the
// lock is released. Otherwise the replay is flagged for shutdown. on_finish
// is stored as m_on_stop_finish and the stop-requested and manual-stop flags
// are set.
// NOTE(review): the precise nesting of the is_running_()/is_stopped_()
// branches and the conditions around the -EINVAL completion are not visible
// in this excerpt.
736 template <typename I>
737 void ImageReplayer<I>::stop(Context *on_finish, bool manual, int r,
738 const std::string& desc)
740 dout(20) << "on_finish=" << on_finish << ", manual=" << manual
741 << ", desc=" << desc << dendl;
743 m_image_deleter->cancel_waiter(m_local_pool_id, m_global_image_id);
745 image_replayer::BootstrapRequest<I> *bootstrap_request = nullptr;
746 bool shut_down_replay = false;
749 Mutex::Locker locker(m_lock);
751 if (!is_running_()) {
754 if (!is_stopped_()) {
755 if (m_state == STATE_STARTING) {
756 dout(20) << "canceling start" << dendl;
757 if (m_bootstrap_request) {
// Hold a reference so the request stays valid after the lock is dropped.
758 bootstrap_request = m_bootstrap_request;
759 bootstrap_request->get();
762 dout(20) << "interrupting replay" << dendl;
763 shut_down_replay = true;
766 assert(m_on_stop_finish == nullptr);
767 std::swap(m_on_stop_finish, on_finish);
768 m_stop_requested = true;
769 m_manual_stop = manual;
774 // avoid holding lock since bootstrap request will update status
775 if (bootstrap_request != nullptr) {
776 bootstrap_request->cancel();
777 bootstrap_request->put();
781 dout(20) << "not running" << dendl;
783 on_finish->complete(-EINVAL);
788 if (shut_down_replay) {
789 on_stop_journal_replay(r, desc);
790 } else if (on_finish != nullptr) {
791 on_finish->complete(0);
// Moves an active replay into the stopping phase. Under m_lock the call is
// ignored unless the state is REPLAYING; otherwise the stop-requested flag is
// set and the state becomes STOPPING. The state description is then recorded,
// a status update requested and the status task rescheduled with an argument
// of -1.
// NOTE(review): the follow-up shutdown call, if any, is not visible in this
// excerpt.
795 template <typename I>
796 void ImageReplayer<I>::on_stop_journal_replay(int r, const std::string &desc)
798 dout(20) << "enter" << dendl;
801 Mutex::Locker locker(m_lock);
802 if (m_state != STATE_REPLAYING) {
803 // might be invoked multiple times while stopping
806 m_stop_requested = true;
807 m_state = STATE_STOPPING;
810 set_state_description(r, desc);
811 update_mirror_image_status(false, boost::none);
812 reschedule_update_status_task(-1);
// Called when remote journal entries may be available. Unless replay was
// interrupted, the next entry and its tag tid are popped from the remote
// journaler and an operation is opened on the event replay tracker. If the
// replayer is found to be stopping, that operation is closed again. The
// popped entry's tag tid is then compared with the cached replay tag.
// NOTE(review): the branch bodies (early returns, and which step follows a
// matching / non-matching tag) are not visible in this excerpt.
816 template <typename I>
817 void ImageReplayer<I>::handle_replay_ready()
819 dout(20) << "enter" << dendl;
820 if (on_replay_interrupted()) {
824 if (!m_remote_journaler->try_pop_front(&m_replay_entry, &m_replay_tag_tid)) {
828 m_event_replay_tracker.start_op();
831 bool stopping = (m_state == STATE_STOPPING);
835 dout(10) << "stopping event replay" << dendl;
836 m_event_replay_tracker.finish_op();
840 if (m_replay_tag_valid && m_replay_tag.tid == m_replay_tag_tid) {
// Restarts the replayer: a context is created whose callback starts the
// replayer manually, passing on_finish as the start completion.
// NOTE(review): the call that consumes this context (presumably stop()) and
// any result check inside the lambda are not visible in this excerpt.
848 template <typename I>
849 void ImageReplayer<I>::restart(Context *on_finish)
851 FunctionContext *ctx = new FunctionContext(
852 [this, on_finish](int r) {
856 start(on_finish, true);
// Flushes the replay. When the state is REPLAYING (checked under m_lock) the
// flush chain is started through on_flush_local_replay_flush_start() and
// on_finish, if non-null, is completed with the chain's result. On the other
// path on_finish is completed with 0.
// NOTE(review): the lambda header and the guard around the final
// on_finish->complete(0) are not visible in this excerpt.
861 template <typename I>
862 void ImageReplayer<I>::flush(Context *on_finish)
864 dout(20) << "enter" << dendl;
867 Mutex::Locker locker(m_lock);
868 if (m_state == STATE_REPLAYING) {
869 Context *ctx = new FunctionContext(
871 if (on_finish != nullptr) {
872 on_finish->complete(r);
875 on_flush_local_replay_flush_start(ctx);
881 on_finish->complete(0);
// First flush step: flushes the local replay and continues in
// on_flush_local_replay_flush_finish() with the result.
// The caller must hold m_lock and the state must be REPLAYING (both
// asserted).
885 template <typename I>
886 void ImageReplayer<I>::on_flush_local_replay_flush_start(Context *on_flush)
888 dout(20) << "enter" << dendl;
889 FunctionContext *ctx = new FunctionContext(
890 [this, on_flush](int r) {
891 on_flush_local_replay_flush_finish(on_flush, r);
894 assert(m_lock.is_locked());
895 assert(m_state == STATE_REPLAYING);
896 m_local_replay->flush(ctx);
// Completion of the local replay flush. On the error path the failure is
// logged and on_flush is completed with r; otherwise the chain continues by
// flushing the remote journal commit position.
// NOTE(review): the error test is not visible in this excerpt.
899 template <typename I>
900 void ImageReplayer<I>::on_flush_local_replay_flush_finish(Context *on_flush,
903 dout(20) << "r=" << r << dendl;
905 derr << "error flushing local replay: " << cpp_strerror(r) << dendl;
906 on_flush->complete(r);
910 on_flush_flush_commit_position_start(on_flush);
// Second flush step: flushes the remote journaler's commit position and
// continues in on_flush_flush_commit_position_finish() with the result.
913 template <typename I>
914 void ImageReplayer<I>::on_flush_flush_commit_position_start(Context *on_flush)
916 FunctionContext *ctx = new FunctionContext(
917 [this, on_flush](int r) {
918 on_flush_flush_commit_position_finish(on_flush, r);
921 m_remote_journaler->flush_commit_position(ctx);
// Completion of the commit-position flush: logs a failure, requests a
// non-forced status update and completes on_flush with r.
// NOTE(review): the error test guarding the log line is not visible in this
// excerpt.
924 template <typename I>
925 void ImageReplayer<I>::on_flush_flush_commit_position_finish(Context *on_flush,
929 derr << "error flushing remote journal commit position: "
930 << cpp_strerror(r) << dendl;
933 update_mirror_image_status(false, boost::none);
935 dout(20) << "flush complete, r=" << r << dendl;
936 on_flush->complete(r);
// Reads m_stop_requested under m_lock and, on the shut-down path, starts
// stopping the journal replay via on_stop_journal_replay() with its default
// arguments.
// NOTE(review): the declaration of shut_down, the guard around the call and
// the return statement are not visible in this excerpt; presumably the
// function returns shut_down — confirm.
939 template <typename I>
940 bool ImageReplayer<I>::on_replay_interrupted()
944 Mutex::Locker locker(m_lock);
945 shut_down = m_stop_requested;
949 on_stop_journal_replay();
// Reports the replayer's name and state under m_lock, either as an
// "image_replayer" object section on the formatter or as one text line on
// the string stream.
// NOTE(review): the condition choosing between the formatter and the stream
// is not visible in this excerpt; presumably it depends on whether f is
// non-null — confirm.
954 template <typename I>
955 void ImageReplayer<I>::print_status(Formatter *f, stringstream *ss)
957 dout(20) << "enter" << dendl;
959 Mutex::Locker l(m_lock);
962 f->open_object_section("image_replayer");
963 f->dump_string("name", m_name);
964 f->dump_string("state", to_string(m_state));
968 *ss << m_name << ": state: " << to_string(m_state);
// Called when replay has ended with result r. An error is logged and recorded
// as the state description; m_stop_requested is then set under m_lock and
// on_replay_interrupted() is invoked to begin the shutdown.
// NOTE(review): the error test guarding the log line and the
// set_state_description() call is not visible in this excerpt.
972 template <typename I>
973 void ImageReplayer<I>::handle_replay_complete(int r, const std::string &error_desc)
975 dout(20) << "r=" << r << dendl;
977 derr << "replay encountered an error: " << cpp_strerror(r) << dendl;
978 set_state_description(r, error_desc);
982 Mutex::Locker locker(m_lock);
983 m_stop_requested = true;
985 on_replay_interrupted();
// Shuts down the current local replay and starts a fresh external replay.
// Under m_lock the state must be REPLAYING to proceed and is then changed to
// REPLAY_FLUSHING; otherwise the flush counts as interrupted. The local
// replay's shut_down() is given a context that stops the external replay on
// the local image's journal, clears m_local_replay and starts a new external
// replay. handle_replay_flush() is the final completion.
// NOTE(review): the statements assigning `interrupted`, the guard around the
// tracker finish_op() call and any result check inside the lambda are not
// visible in this excerpt.
988 template <typename I>
989 void ImageReplayer<I>::replay_flush() {
992 bool interrupted = false;
994 Mutex::Locker locker(m_lock);
995 if (m_state != STATE_REPLAYING) {
996 dout(20) << "replay interrupted" << dendl;
999 m_state = STATE_REPLAY_FLUSHING;
1004 m_event_replay_tracker.finish_op();
1008 // shut down the replay to flush all IO and ops and create a new
1009 // replayer to handle the new tag epoch
1010 Context *ctx = create_context_callback<
1011 ImageReplayer<I>, &ImageReplayer<I>::handle_replay_flush>(this);
// Wrap the completion: the inner ctx (handle_replay_flush) is captured and
// handed to start_external_replay() by the outer context.
1012 ctx = new FunctionContext([this, ctx](int r) {
1013 m_local_image_ctx->journal->stop_external_replay();
1014 m_local_replay = nullptr;
1021 m_local_journal->start_external_replay(&m_local_replay, ctx);
1023 m_local_replay->shut_down(false, ctx);
// Completion of replay_flush(). Under m_lock the state is asserted to be
// REPLAY_FLUSHING and restored to REPLAYING. On the error path the tracked
// operation is closed and the replay is completed with the error. If a stop
// was requested meanwhile, the tracked operation is closed as well.
// NOTE(review): the error test and the success continuation are not visible
// in this excerpt.
1026 template <typename I>
1027 void ImageReplayer<I>::handle_replay_flush(int r) {
1028 dout(20) << "r=" << r << dendl;
1031 Mutex::Locker locker(m_lock);
1032 assert(m_state == STATE_REPLAY_FLUSHING);
1033 m_state = STATE_REPLAYING;
1037 derr << "replay flush encountered an error: " << cpp_strerror(r) << dendl;
1038 m_event_replay_tracker.finish_op();
1039 handle_replay_complete(r, "replay flush encountered an error");
1041 } else if (on_replay_interrupted()) {
1042 m_event_replay_tracker.finish_op();
// Fetches the remote journal tag identified by m_replay_tag_tid into
// m_replay_tag; handle_get_remote_tag() is invoked on completion.
1049 template <typename I>
1050 void ImageReplayer<I>::get_remote_tag() {
1051 dout(20) << "tag_tid: " << m_replay_tag_tid << dendl;
1053 Context *ctx = create_context_callback<
1054 ImageReplayer, &ImageReplayer<I>::handle_get_remote_tag>(this);
1055 m_remote_journaler->get_tag(m_replay_tag_tid, &m_replay_tag, ctx);
// Completion of get_remote_tag(). The tag payload is decoded into
// m_replay_tag_data, with buffer::error caught. On the failure path the
// tracked operation is closed and the replay is completed with the error.
// Otherwise the tag is marked valid and a matching local tag is allocated.
// NOTE(review): the guards around the decode, the body of the catch handler
// and the failure test are not visible in this excerpt.
1058 template <typename I>
1059 void ImageReplayer<I>::handle_get_remote_tag(int r) {
1060 dout(20) << "r=" << r << dendl;
1064 bufferlist::iterator it = m_replay_tag.data.begin();
1065 ::decode(m_replay_tag_data, it);
1066 } catch (const buffer::error &err) {
1072 derr << "failed to retrieve remote tag " << m_replay_tag_tid << ": "
1073 << cpp_strerror(r) << dendl;
1074 m_event_replay_tracker.finish_op();
1075 handle_replay_complete(r, "failed to retrieve remote tag");
1079 m_replay_tag_valid = true;
1080 dout(20) << "decoded remote tag " << m_replay_tag_tid << ": "
1081 << m_replay_tag_data << dendl;
1083 allocate_local_tag();
// Allocates a tag in the local journal that corresponds to the decoded remote
// tag, translating mirror uuids from the remote site's point of view to the
// local one:
//   - a tag owner of LOCAL_MIRROR_UUID or of this site's own mirror uuid is
//     replaced by the remote image's mirror uuid;
//   - an ORPHAN_MIRROR_UUID owner signals a demotion, so a stop is requested
//     (m_stop_requested set under m_lock);
//   - a predecessor of LOCAL_MIRROR_UUID becomes the remote mirror uuid, and a
//     predecessor equal to this site's mirror uuid becomes LOCAL_MIRROR_UUID.
// handle_allocate_local_tag() is invoked on completion.
1086 template <typename I>
1087 void ImageReplayer<I>::allocate_local_tag() {
1090 std::string mirror_uuid = m_replay_tag_data.mirror_uuid;
1091 if (mirror_uuid == librbd::Journal<>::LOCAL_MIRROR_UUID ||
1092 mirror_uuid == m_local_mirror_uuid) {
1093 mirror_uuid = m_remote_image.mirror_uuid;
1094 } else if (mirror_uuid == librbd::Journal<>::ORPHAN_MIRROR_UUID) {
1095 dout(5) << "encountered image demotion: stopping" << dendl;
1096 Mutex::Locker locker(m_lock);
1097 m_stop_requested = true;
1100 librbd::journal::TagPredecessor predecessor(m_replay_tag_data.predecessor);
1101 if (predecessor.mirror_uuid == librbd::Journal<>::LOCAL_MIRROR_UUID) {
1102 predecessor.mirror_uuid = m_remote_image.mirror_uuid;
1103 } else if (predecessor.mirror_uuid == m_local_mirror_uuid) {
1104 predecessor.mirror_uuid = librbd::Journal<>::LOCAL_MIRROR_UUID;
1107 dout(20) << "mirror_uuid=" << mirror_uuid << ", "
1108 << "predecessor_mirror_uuid=" << predecessor.mirror_uuid << ", "
1109 << "replay_tag_tid=" << m_replay_tag_tid << ", "
1110 << "replay_tag_data=" << m_replay_tag_data << dendl;
1111 Context *ctx = create_context_callback<
1112 ImageReplayer, &ImageReplayer<I>::handle_allocate_local_tag>(this);
1113 m_local_journal->allocate_tag(mirror_uuid, predecessor, ctx);
// Completion of allocate_local_tag(). On the failure path the error is
// logged, the tracked operation is closed and the replay is completed with
// the error.
// NOTE(review): the failure test and the success continuation are not
// visible in this excerpt.
1116 template <typename I>
1117 void ImageReplayer<I>::handle_allocate_local_tag(int r) {
1118 dout(20) << "r=" << r << dendl;
1121 derr << "failed to allocate journal tag: " << cpp_strerror(r) << dendl;
1122 m_event_replay_tracker.finish_op();
1123 handle_replay_complete(r, "failed to allocate journal tag");
// Decodes the current replay entry into m_event_entry and applies the
// configured replay delay before handing it on.
// A decode failure closes the tracked operation and completes the replay with
// the error. The delay comes from calculate_replay_delay(), using the event
// timestamp and the local image's mirroring_replay_delay. The entry is either
// passed straight to handle_preprocess_entry_ready(0), or a timer event is
// scheduled after `delay` seconds. That timer callback runs with timer_lock
// held (asserted), clears m_delayed_preprocess_task and queues
// handle_preprocess_entry_ready on the work queue.
// NOTE(review): the decode-failure test and the condition selecting the
// immediate path (presumably delay == 0) are not visible in this excerpt.
1130 template <typename I>
1131 void ImageReplayer<I>::preprocess_entry() {
1132 dout(20) << "preprocessing entry tid=" << m_replay_entry.get_commit_tid()
1135 bufferlist data = m_replay_entry.get_data();
1136 bufferlist::iterator it = data.begin();
1137 int r = m_local_replay->decode(&it, &m_event_entry);
1139 derr << "failed to decode journal event" << dendl;
1140 m_event_replay_tracker.finish_op();
1141 handle_replay_complete(r, "failed to decode journal event");
1145 uint32_t delay = calculate_replay_delay(
1146 m_event_entry.timestamp, m_local_image_ctx->mirroring_replay_delay);
1148 handle_preprocess_entry_ready(0);
1152 dout(20) << "delaying replay by " << delay << " sec" << dendl;
1154 Mutex::Locker timer_locker(m_threads->timer_lock);
// Only one delayed preprocess task may be outstanding.
1155 assert(m_delayed_preprocess_task == nullptr);
1156 m_delayed_preprocess_task = new FunctionContext(
1158 assert(m_threads->timer_lock.is_locked());
1159 m_delayed_preprocess_task = nullptr;
1160 m_threads->work_queue->queue(
1161 create_context_callback<ImageReplayer,
1162 &ImageReplayer<I>::handle_preprocess_entry_ready>(this), 0);
1164 m_threads->timer->add_event_after(delay, m_delayed_preprocess_task);
// Runs once the (possibly delayed) entry is ready. If the event preprocessor
// reports that m_event_entry needs preprocessing, it is preprocessed
// asynchronously with handle_preprocess_entry_safe() as the completion.
// NOTE(review): the body taken when preprocessing is not required is not
// visible in this excerpt.
1167 template <typename I>
1168 void ImageReplayer<I>::handle_preprocess_entry_ready(int r) {
1169 dout(20) << "r=" << r << dendl;
1172 if (!m_event_preprocessor->is_required(m_event_entry)) {
1177 Context *ctx = create_context_callback<
1178 ImageReplayer, &ImageReplayer<I>::handle_preprocess_entry_safe>(this);
1179 m_event_preprocessor->preprocess(&m_event_entry, ctx);
// Completion of event preprocessing. On the failure path the tracked
// operation is closed. -ECANCELED is reported as "lost exclusive lock" with a
// result of 0; other failures complete the replay with the error.
// NOTE(review): the outer failure test and the success continuation are not
// visible in this excerpt.
1182 template <typename I>
1183 void ImageReplayer<I>::handle_preprocess_entry_safe(int r) {
1184 dout(20) << "r=" << r << dendl;
1187 m_event_replay_tracker.finish_op();
1189 if (r == -ECANCELED) {
1190 handle_replay_complete(0, "lost exclusive lock");
1192 derr << "failed to preprocess journal event" << dendl;
1193 handle_replay_complete(r, "failed to preprocess journal event");
// Submits the decoded event to the local replay. If a stop was requested the
// tracked operation is closed instead. Two completions are supplied:
// handle_process_entry_ready() as on_ready, and a C_ReplayCommitted as
// on_commit, which takes the replay entry by move.
1201 template <typename I>
1202 void ImageReplayer<I>::process_entry() {
1203 dout(20) << "processing entry tid=" << m_replay_entry.get_commit_tid()
1206 // stop replaying events if stop has been requested
1207 if (on_replay_interrupted()) {
1208 m_event_replay_tracker.finish_op();
1212 Context *on_ready = create_context_callback<
1213 ImageReplayer, &ImageReplayer<I>::handle_process_entry_ready>(this);
// m_replay_entry is moved from here; it must not be relied on afterwards
// until it is repopulated.
1214 Context *on_commit = new C_ReplayCommitted(this, std::move(m_replay_entry));
1216 m_local_replay->process(m_event_entry, on_ready, on_commit);
// on_ready completion of process_entry(): moves on to the next journal entry
// via handle_replay_ready().
// NOTE(review): statements before that call are not visible in this excerpt.
1219 template <typename I>
1220 void ImageReplayer<I>::handle_process_entry_ready(int r) {
1226 // attempt to process the next event
1227 handle_replay_ready();
// on_commit completion for one replayed entry. A failure is logged and
// completes the replay with the error; otherwise the entry is reported to the
// remote journaler as committed. The tracked operation is then closed.
// NOTE(review): the failure test separating the two paths is not visible in
// this excerpt.
1230 template <typename I>
1231 void ImageReplayer<I>::handle_process_entry_safe(const ReplayEntry& replay_entry,
1233 dout(20) << "commit_tid=" << replay_entry.get_commit_tid() << ", r=" << r
1237 derr << "failed to commit journal event: " << cpp_strerror(r) << dendl;
1238 handle_replay_complete(r, "failed to commit journal event");
1240 assert(m_remote_journaler != nullptr);
1241 m_remote_journaler->committed(replay_entry);
1243 m_event_replay_tracker.finish_op();
// Requests a mirror image status update. Under m_lock,
// start_mirror_image_status_update(force, false) decides whether an update
// may start; if so the update is queued with the optional state override.
// NOTE(review): the return statements are not visible in this excerpt;
// presumably false when the update was not started and true once queued —
// confirm.
1246 template <typename I>
1247 bool ImageReplayer<I>::update_mirror_image_status(bool force,
1248 const OptionalState &state) {
1251 Mutex::Locker locker(m_lock);
1252 if (!start_mirror_image_status_update(force, false)) {
1257 queue_mirror_image_status_update(state);
// Decides whether a new status update may begin; the caller must hold m_lock
// (asserted). Unless forced or stopped, an update is skipped while shutdown
// is in progress. It is also skipped when more updates are in flight than
// allowed (one when `restarting`, none otherwise); in that case
// m_update_status_requested is set so the update is not lost. When accepted,
// the in-flight counter is incremented.
// NOTE(review): the return statements are not visible in this excerpt.
1261 template <typename I>
1262 bool ImageReplayer<I>::start_mirror_image_status_update(bool force,
1264 assert(m_lock.is_locked());
1266 if (!force && !is_stopped_()) {
1267 if (!is_running_()) {
1268 dout(20) << "shut down in-progress: ignoring update" << dendl;
1270 } else if (m_in_flight_status_updates > (restarting ? 1 : 0)) {
1271 dout(20) << "already sending update" << dendl;
1272 m_update_status_requested = true;
1278 ++m_in_flight_status_updates;
// Marks one in-flight status update as finished. Under m_lock the in-flight
// counter is decremented. While other updates remain, this is only logged.
// Otherwise the pending m_on_update_status_finish context is taken and, if
// non-null, completed with 0.
// NOTE(review): the early return in the "waiting" branch is not visible in
// this excerpt.
1282 template <typename I>
1283 void ImageReplayer<I>::finish_mirror_image_status_update() {
1284 Context *on_finish = nullptr;
1286 Mutex::Locker locker(m_lock);
1287 assert(m_in_flight_status_updates > 0);
1288 if (--m_in_flight_status_updates > 0) {
1289 dout(20) << "waiting on " << m_in_flight_status_updates << " in-flight "
1290 << "updates" << dendl;
1294 std::swap(on_finish, m_on_update_status_finish);
1298 if (on_finish != nullptr) {
1299 on_finish->complete(0);
// Defers send_mirror_status_update(state) to the work queue instead of
// calling it inline.
// @param state optional replayer state; captured by value in the queued
//        context
1303 template <typename I>
1304 void ImageReplayer<I>::queue_mirror_image_status_update(const OptionalState &state) {
// the context's own result argument `r` is not used by the visible body
1306 FunctionContext *ctx = new FunctionContext(
1307 [this, state](int r) {
1308 send_mirror_status_update(state);
// hand the context to the work queue with a result code of 0
1310 m_threads->work_queue->queue(ctx, 0);
// Builds a cls::rbd::MirrorImageStatus describing the replayer and submits it
// asynchronously with m_local_ioctx.aio_operate() against RBD_MIRRORING; the
// completion is delivered to handle_mirror_status_update().
// @param opt_state optional state supplied by the caller
// NOTE(review): how `opt_state` and the `state` / `last_r` locals are
// initialised is not visible here; confirm before relying on the details.
1313 template <typename I>
1314 void ImageReplayer<I>::send_mirror_status_update(const OptionalState &opt_state) {
1316 std::string state_desc;
1318 bool stopping_replay;
// starts out as an empty optional (make_optional with a false condition)
1320 OptionalMirrorImageStatusState mirror_image_status_state{
1321 boost::make_optional(false, cls::rbd::MirrorImageStatusState{})};
1322 image_replayer::BootstrapRequest<I>* bootstrap_request = nullptr;
// copy the members needed below while holding m_lock
1324 Mutex::Locker locker(m_lock);
1326 state_desc = m_state_desc;
1327 mirror_image_status_state = m_mirror_image_status_state;
// a non-null local image context is reported as "stopping replay" in the
// STATE_STOPPING case below
1329 stopping_replay = (m_local_image_ctx != nullptr);
// get()/put() bracket the is_syncing() query below -- presumably a reference
// count that keeps the request alive; confirm
1331 if (m_bootstrap_request != nullptr) {
1332 bootstrap_request = m_bootstrap_request;
1333 bootstrap_request->get();
1337 bool syncing = false;
1338 if (bootstrap_request != nullptr) {
1339 syncing = bootstrap_request->is_syncing();
1340 bootstrap_request->put();
1341 bootstrap_request = nullptr;
1348 cls::rbd::MirrorImageStatus status;
// starting: reported either as syncing (and remembered in
// mirror_image_status_state) or as starting replay -- presumably selected by
// `syncing`; the condition is not visible here
1351 case STATE_STARTING:
1353 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_SYNCING;
1354 status.description = state_desc.empty() ? "syncing" : state_desc;
1355 mirror_image_status_state = status.state;
1357 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STARTING_REPLAY;
1358 status.description = "starting replay";
// replaying and flushing share the same reported state
1361 case STATE_REPLAYING:
1362 case STATE_REPLAY_FLUSHING:
1363 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_REPLAYING;
// on_req_finish re-invokes send_mirror_status_update(boost::none) or, for
// -EAGAIN, hands `r` to handle_mirror_status_update(); presumably it is the
// callback passed to get_or_send_update() below -- confirm
1365 Context *on_req_finish = new FunctionContext(
1367 dout(20) << "replay status ready: r=" << r << dendl;
1369 send_mirror_status_update(boost::none);
1370 } else if (r == -EAGAIN) {
1371 // decrement in-flight status update counter
1372 handle_mirror_status_update(r);
// ask the formatter for the replay description; a false result is logged as
// "waiting for replay status"
1377 if (!m_replay_status_formatter->get_or_send_update(&desc,
1379 dout(20) << "waiting for replay status" << dendl;
1382 status.description = "replaying, " + desc;
// the remembered status state is cleared while replaying
1383 mirror_image_status_state = boost::none;
// stopping: reported as "stopping replay" while stopping_replay is set
1386 case STATE_STOPPING:
1387 if (stopping_replay) {
1388 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STOPPING_REPLAY;
1389 status.description = "stopping replay";
// last_r selects the final status: unknown for -EREMOTEIO, error for any other
// negative value, stopped for the remaining case
1394 if (last_r == -EREMOTEIO) {
1395 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN;
1396 status.description = state_desc;
1397 mirror_image_status_state = status.state;
1398 } else if (last_r < 0) {
1399 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_ERROR;
1400 status.description = state_desc;
1401 mirror_image_status_state = status.state;
1403 status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STOPPED;
1404 status.description = state_desc.empty() ? "stopped" : state_desc;
1405 mirror_image_status_state = boost::none;
// always-false assertion carrying the message "invalid state" -- presumably
// the default branch for unexpected state values; the label is not visible
1409 assert(!"invalid state");
// store the computed status state back into the member under m_lock
1413 Mutex::Locker locker(m_lock);
1414 m_mirror_image_status_state = mirror_image_status_state;
1417 // prevent the status from ping-ponging when failed replays are restarted
1418 if (mirror_image_status_state &&
1419 *mirror_image_status_state == cls::rbd::MIRROR_IMAGE_STATUS_STATE_ERROR) {
1420 status.state = *mirror_image_status_state;
1423 dout(20) << "status=" << status << dendl;
// build the write op that records the status under m_global_image_id
1424 librados::ObjectWriteOperation op;
1425 librbd::cls_client::mirror_image_status_set(&op, m_global_image_id, status);
// submit asynchronously; handle_mirror_status_update() receives the result
1427 librados::AioCompletion *aio_comp = create_rados_callback<
1428 ImageReplayer<I>, &ImageReplayer<I>::handle_mirror_status_update>(this);
1429 int r = m_local_ioctx.aio_operate(RBD_MIRRORING, aio_comp, &op);
// release this function's handle on the completion
1431 aio_comp->release();
// Completion handler for a mirror status update (registered as the rados
// callback in send_mirror_status_update() and also called directly with
// -EAGAIN from its replay-status callback).
// Consumes a deferred update request, either queues the next update or
// reschedules the periodic task, and finally releases the in-flight count of
// the update that just finished.
// @param r result of the status update; only logged by the visible code
1434 template <typename I>
1435 void ImageReplayer<I>::handle_mirror_status_update(int r) {
1436 dout(20) << "r=" << r << dendl;
1438 bool running = false;
1439 bool started = false;
1441 Mutex::Locker locker(m_lock);
// read and clear the "update requested while busy" flag in one step
1442 bool update_status_requested = false;
1443 std::swap(update_status_requested, m_update_status_requested);
// a deferred update is only started while the replayer is running; it is
// started in "restarting" mode because this update is still counted
1445 running = is_running_();
1446 if (running && update_status_requested) {
1447 started = start_mirror_image_status_update(false, true);
1451 // if a deferred update is available, send it -- otherwise reschedule
1454 queue_mirror_image_status_update(boost::none);
1455 } else if (running) {
1456 reschedule_update_status_task();
1459 // mark committed status update as no longer in-flight
1460 finish_mirror_image_status_update();
// Cancels the pending periodic status-update task and optionally schedules a
// new one.
// @param new_interval > 0: stored as the new m_update_status_interval;
//        >= 0: a new task is scheduled if the replayer is running and an
//        update could be started; negative: the pending task is only canceled
1463 template <typename I>
1464 void ImageReplayer<I>::reschedule_update_status_task(int new_interval) {
1467 bool canceled_task = false;
// m_lock is acquired before the timer lock
1469 Mutex::Locker locker(m_lock);
1470 Mutex::Locker timer_locker(m_threads->timer_lock);
// drop the currently scheduled task, remembering whether it was canceled
1472 if (m_update_status_task) {
1473 canceled_task = m_threads->timer->cancel_event(m_update_status_task);
1474 m_update_status_task = nullptr;
1477 if (new_interval > 0) {
1478 m_update_status_interval = new_interval;
// an interval of 0 or a canceled task counts as a restart, which lets
// start_mirror_image_status_update() tolerate one in-flight update
1481 bool restarting = (new_interval == 0 || canceled_task);
1482 if (new_interval >= 0 && is_running_() &&
1483 start_mirror_image_status_update(false, restarting)) {
// the task clears its own pointer and then queues the actual update
1484 m_update_status_task = new FunctionContext(
1486 assert(m_threads->timer_lock.is_locked());
1487 m_update_status_task = nullptr;
1489 queue_mirror_image_status_update(boost::none);
1491 m_threads->timer->add_event_after(m_update_status_interval,
1492 m_update_status_task);
// a task is only created after start_mirror_image_status_update() succeeded,
// so a canceled task still holds an in-flight count that is released here
1496 if (canceled_task) {
1497 dout(20) << "canceled task" << dendl;
1498 finish_mirror_image_status_update();
// Starts the asynchronous shut down sequence of the replayer.
// Cancels a pending delayed preprocess task, defers while status updates are
// in flight, and then builds a chain of contexts that is queued on the work
// queue. Every new context captures the previously built `ctx`, so the
// context created last is the one that is queued and the steps run in the
// opposite order of their construction.
// @param r result code; logged here and passed to handle_shut_down() by the
//        last step of the chain
1502 template <typename I>
1503 void ImageReplayer<I>::shut_down(int r) {
1504 dout(20) << "r=" << r << dendl;
1506 bool canceled_delayed_preprocess_task = false;
// the delayed preprocess task is manipulated under the timer lock; the assert
// requires that a pending task can always be canceled
1508 Mutex::Locker timer_locker(m_threads->timer_lock);
1509 if (m_delayed_preprocess_task != nullptr) {
1510 canceled_delayed_preprocess_task = m_threads->timer->cancel_event(
1511 m_delayed_preprocess_task);
1512 assert(canceled_delayed_preprocess_task);
1513 m_delayed_preprocess_task = nullptr;
1516 if (canceled_delayed_preprocess_task) {
1517 // wake up sleeping replay
1518 m_event_replay_tracker.finish_op();
// shut down is only valid once the state machine has entered STATE_STOPPING
1522 Mutex::Locker locker(m_lock);
1523 assert(m_state == STATE_STOPPING);
1525 // if status updates are in-flight, wait for them to complete
1526 // before proceeding
1527 if (m_in_flight_status_updates > 0) {
1528 if (m_on_update_status_finish == nullptr) {
1529 dout(20) << "waiting for in-flight status update" << dendl;
1530 m_on_update_status_finish = new FunctionContext(
1539 // NOTE: it's important to ensure that the local image is fully
1540 // closed before attempting to close the remote journal in
1541 // case the remote cluster is unreachable
1543 // chain the shut down sequence (reverse order)
// built first, therefore the final step: force a STATE_STOPPED status update
// and continue in handle_shut_down()
1544 Context *ctx = new FunctionContext(
1546 update_mirror_image_status(true, STATE_STOPPED);
1547 handle_shut_down(r);
1550 // close the remote journal
1551 if (m_remote_journaler != nullptr) {
// runs once the journaler's shut_down() below has completed: destroy it
1552 ctx = new FunctionContext([this, ctx](int r) {
1553 delete m_remote_journaler;
1554 m_remote_journaler = nullptr;
// detach the listener, then shut the journaler down
1557 ctx = new FunctionContext([this, ctx](int r) {
1558 m_remote_journaler->remove_listener(&m_remote_listener);
1559 m_remote_journaler->shut_down(ctx);
1563 // stop the replay of remote journal events
1564 if (m_replay_handler != nullptr) {
// runs after stop_replay() below: destroy the handler and wait for the
// tracked replay ops before continuing with the chain
1565 ctx = new FunctionContext([this, ctx](int r) {
1566 delete m_replay_handler;
1567 m_replay_handler = nullptr;
1569 m_event_replay_tracker.wait_for_ops(ctx);
1571 ctx = new FunctionContext([this, ctx](int r) {
1572 m_remote_journaler->stop_replay(ctx);
1576 // close the local image (release exclusive lock)
1577 if (m_local_image_ctx) {
1578 ctx = new FunctionContext([this, ctx](int r) {
1579 CloseImageRequest<I> *request = CloseImageRequest<I>::create(
1580 &m_local_image_ctx, ctx);
1585 // shut down event replay into the local image
1586 if (m_local_journal != nullptr) {
// last of the local journal steps: forget the journal pointer
1587 ctx = new FunctionContext([this, ctx](int r) {
1588 m_local_journal = nullptr;
// only when an external replay exists: stop it and destroy the preprocessor
1591 if (m_local_replay != nullptr) {
1592 ctx = new FunctionContext([this, ctx](int r) {
1593 m_local_journal->stop_external_replay();
1594 m_local_replay = nullptr;
1596 EventPreprocessor<I>::destroy(m_event_preprocessor);
1597 m_event_preprocessor = nullptr;
1601 ctx = new FunctionContext([this, ctx](int r) {
1602 // blocks if listener notification is in-progress
1603 m_local_journal->remove_listener(m_journal_listener);
1608 // wait for all local in-flight replay events to complete
1609 ctx = new FunctionContext([this, ctx](int r) {
1611 derr << "error shutting down journal replay: " << cpp_strerror(r)
1615 m_event_replay_tracker.wait_for_ops(ctx);
1618 // flush any local in-flight replay events
1619 if (m_local_replay != nullptr) {
1620 ctx = new FunctionContext([this, ctx](int r) {
1621 m_local_replay->shut_down(true, ctx);
// kick off the chain from the work queue, starting with the newest context
1625 m_threads->work_queue->queue(ctx, 0);
// Final stage of the shut down sequence (invoked by the last context chained
// in shut_down()). Cancels the periodic status task, defers itself while
// status updates are in flight, schedules a local image deletion when a
// delete or resync was requested, releases remaining resources, moves the
// state to STATE_STOPPED and completes the pending start/stop contexts.
// @param r result code passed on to the on_start / on_stop contexts
1628 template <typename I>
1629 void ImageReplayer<I>::handle_shut_down(int r) {
// a negative interval cancels the pending task without scheduling a new one
1630 reschedule_update_status_task(-1);
1632 bool unregister_asok_hook = false;
1634 Mutex::Locker locker(m_lock);
1636 // if status updates are in-flight, wait for them to complete
1637 // before proceeding
1638 if (m_in_flight_status_updates > 0) {
// install a continuation that re-enters handle_shut_down(r); it is only
// installed when no other waiter is registered yet
1639 if (m_on_update_status_finish == nullptr) {
1640 dout(20) << "waiting for in-flight status update" << dendl;
1641 m_on_update_status_finish = new FunctionContext(
1643 handle_shut_down(r);
// a requested delete only applies while a local image id is known; the assert
// requires that the remote image id is empty in that situation
1649 bool delete_requested = false;
1650 if (m_delete_requested && !m_local_image_id.empty()) {
1651 assert(m_remote_image.image_id.empty());
1652 dout(0) << "remote image no longer exists: scheduling deletion" << dendl;
1653 delete_requested = true;
// both delete and resync are carried out by scheduling an image delete; the
// pending flags are reset afterwards
1655 if (delete_requested || m_resync_requested) {
1656 m_image_deleter->schedule_image_delete(m_local,
1659 m_resync_requested);
1661 m_local_image_id = "";
1662 m_resync_requested = false;
1663 if (m_delete_requested) {
1664 unregister_asok_hook = true;
1665 m_delete_requested = false;
// neither side knows the image and the last result was -ENOENT
1667 } else if (m_last_r == -ENOENT &&
1668 m_local_image_id.empty() && m_remote_image.image_id.empty()) {
1669 dout(0) << "mirror image no longer exists" << dendl;
1670 unregister_asok_hook = true;
1675 if (unregister_asok_hook) {
1676 unregister_admin_socket_hook();
1679 dout(20) << "stop complete" << dendl;
1680 m_local_ioctx.close();
1682 ReplayStatusFormatter<I>::destroy(m_replay_status_formatter);
1683 m_replay_status_formatter = nullptr;
// take over the pending start/stop contexts under m_lock and finalize the
// state transition; the contexts are completed further below
1685 Context *on_start = nullptr;
1686 Context *on_stop = nullptr;
1688 Mutex::Locker locker(m_lock);
1689 std::swap(on_start, m_on_start_finish);
1690 std::swap(on_stop, m_on_stop_finish);
1691 m_stop_requested = false;
1692 assert(m_delayed_preprocess_task == nullptr);
1693 assert(m_state == STATE_STOPPING);
1694 m_state = STATE_STOPPED;
1697 if (on_start != nullptr) {
1698 dout(20) << "on start finish complete, r=" << r << dendl;
1699 on_start->complete(r);
1702 if (on_stop != nullptr) {
1703 dout(20) << "on stop finish complete, r=" << r << dendl;
1704 on_stop->complete(r);
// Reacts to a metadata update of the remote journal: looks up the cached
// registration of this mirror (keyed by m_local_mirror_uuid) and stops the
// replay with -ENOTCONN when the client is no longer flagged as connected.
// NOTE(review): the handling after the "not running" check and after a failed
// lookup is not visible here -- presumably both return early; confirm.
1708 template <typename I>
1709 void ImageReplayer<I>::handle_remote_journal_metadata_updated() {
1712 cls::journal::Client client;
// the running check is evaluated under m_lock
1714 Mutex::Locker locker(m_lock);
1715 if (!is_running_()) {
// fetch the client record from the journaler's cache
1719 int r = m_remote_journaler->get_cached_client(m_local_mirror_uuid, &client);
1721 derr << "failed to retrieve client: " << cpp_strerror(r) << dendl;
// any state other than CONNECTED stops the replay with -ENOTCONN
1726 if (client.state != cls::journal::CLIENT_STATE_CONNECTED) {
1727 dout(0) << "client flagged disconnected, stopping image replay" << dendl;
1728 stop(nullptr, false, -ENOTCONN, "disconnected");
// Converts a replayer State into a printable string.
// @param state state value to describe
// @return the name associated with the state; values without a matching case
//         are rendered as "Unknown(<stringified value>)"
// NOTE(review): only the "ReplayFlushing" return is visible here; the strings
// returned for the other cases could not be checked.
1732 template <typename I>
1733 std::string ImageReplayer<I>::to_string(const State state) {
1735 case ImageReplayer<I>::STATE_STARTING:
1737 case ImageReplayer<I>::STATE_REPLAYING:
1739 case ImageReplayer<I>::STATE_REPLAY_FLUSHING:
1740 return "ReplayFlushing";
1741 case ImageReplayer<I>::STATE_STOPPING:
1743 case ImageReplayer<I>::STATE_STOPPED:
// fallback for values that matched none of the cases above
1748 return "Unknown(" + stringify(state) + ")";
// Flags the image for resynchronisation by setting m_resync_requested; the
// flag is consumed in handle_shut_down(), where it triggers
// schedule_image_delete().
// @param on_finish NOTE(review): how this context is used or completed is not
//        visible here; confirm.
1751 template <typename I>
1752 void ImageReplayer<I>::resync_image(Context *on_finish) {
1755 m_resync_requested = true;
// Creates an ImageReplayerAdminSocketHook named after m_name, registers its
// commands and stores it in m_asok_hook.
// NOTE(review): the handling of an already registered hook and of a failed
// register_commands() call is only partially visible here -- presumably the
// former returns early and the latter discards the new hook; confirm.
1759 template <typename I>
1760 void ImageReplayer<I>::register_admin_socket_hook() {
1761 ImageReplayerAdminSocketHook<I> *asok_hook;
// m_asok_hook is inspected under m_lock
1763 Mutex::Locker locker(m_lock);
1764 if (m_asok_hook != nullptr) {
1768 dout(20) << "registered asok hook: " << m_name << dendl;
1769 asok_hook = new ImageReplayerAdminSocketHook<I>(g_ceph_context, m_name,
1771 int r = asok_hook->register_commands();
// publish the new hook in the member
1773 m_asok_hook = asok_hook;
1776 derr << "error registering admin socket commands" << dendl;
// Detaches the current admin socket hook: under m_lock the member is swapped
// with a local nullptr, leaving m_asok_hook empty.
// NOTE(review): what happens to the detached hook afterwards is not visible
// here -- presumably it is deleted; confirm.
1781 template <typename I>
1782 void ImageReplayer<I>::unregister_admin_socket_hook() {
1785 AdminSocketHook *asok_hook = nullptr;
1787 Mutex::Locker locker(m_lock);
1788 std::swap(asok_hook, m_asok_hook);
// Re-registers the admin socket hook after the image name changed.
// The candidate name is "<local pool name>/<local image name>" and is
// compared with m_name.
// NOTE(review): the action taken when the names match, and the update of
// m_name, are not visible here -- presumably an early return and an
// assignment respectively. Both hook helpers acquire m_lock themselves, so
// the locker below presumably goes out of scope before they are called;
// confirm.
1793 template <typename I>
1794 void ImageReplayer<I>::on_name_changed() {
1796 Mutex::Locker locker(m_lock);
// m_local_image_ctx is dereferenced without a null check
1797 std::string name = m_local_ioctx.get_pool_name() + "/" +
1798 m_local_image_ctx->name;
1799 if (m_name == name) {
// replace the hook: drop the old registration, then register again
1804 unregister_admin_socket_hook();
1805 register_admin_socket_hook();
// Stream insertion for ImageReplayer, producing
// "ImageReplayer: <address> [<local pool id>/<global image id>]".
// @param os output stream to write to
// @param replayer replayer whose address and identifiers are printed
1808 template <typename I>
1809 std::ostream &operator<<(std::ostream &os, const ImageReplayer<I> &replayer)
1811 os << "ImageReplayer: " << &replayer << " [" << replayer.get_local_pool_id()
1812 << "/" << replayer.get_global_image_id() << "]";
1816 } // namespace mirror
// explicit instantiation of the ImageReplayer template for librbd::ImageCtx
1819 template class rbd::mirror::ImageReplayer<librbd::ImageCtx>;