X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Ftools%2Frbd_mirror%2FImageReplayer.cc;fp=src%2Fceph%2Fsrc%2Ftools%2Frbd_mirror%2FImageReplayer.cc;h=0000000000000000000000000000000000000000;hb=7da45d65be36d36b880cc55c5036e96c24b53f00;hp=bf77e9db7bdb1b5832626b1615c1b5a2e2a9852b;hpb=691462d09d0987b47e112d6ee8740375df3c51b2;p=stor4nfv.git diff --git a/src/ceph/src/tools/rbd_mirror/ImageReplayer.cc b/src/ceph/src/tools/rbd_mirror/ImageReplayer.cc deleted file mode 100644 index bf77e9d..0000000 --- a/src/ceph/src/tools/rbd_mirror/ImageReplayer.cc +++ /dev/null @@ -1,1819 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab - -#include "include/compat.h" -#include "common/Formatter.h" -#include "common/debug.h" -#include "common/errno.h" -#include "include/stringify.h" -#include "cls/rbd/cls_rbd_client.h" -#include "common/Timer.h" -#include "common/WorkQueue.h" -#include "global/global_context.h" -#include "journal/Journaler.h" -#include "journal/ReplayHandler.h" -#include "journal/Settings.h" -#include "librbd/ExclusiveLock.h" -#include "librbd/ImageCtx.h" -#include "librbd/ImageState.h" -#include "librbd/Journal.h" -#include "librbd/Operations.h" -#include "librbd/Utils.h" -#include "librbd/journal/Replay.h" -#include "ImageDeleter.h" -#include "ImageReplayer.h" -#include "Threads.h" -#include "tools/rbd_mirror/image_replayer/BootstrapRequest.h" -#include "tools/rbd_mirror/image_replayer/CloseImageRequest.h" -#include "tools/rbd_mirror/image_replayer/EventPreprocessor.h" -#include "tools/rbd_mirror/image_replayer/PrepareLocalImageRequest.h" -#include "tools/rbd_mirror/image_replayer/PrepareRemoteImageRequest.h" -#include "tools/rbd_mirror/image_replayer/ReplayStatusFormatter.h" - -#define dout_context g_ceph_context -#define dout_subsys ceph_subsys_rbd_mirror -#undef dout_prefix -#define dout_prefix *_dout << "rbd::mirror::" << *this << " " \ - << __func__ << ": " - -using std::map; -using std::string; -using std::unique_ptr; -using std::shared_ptr; -using std::vector; - -namespace rbd { -namespace mirror { - -using librbd::util::create_context_callback; -using librbd::util::create_rados_callback; -using namespace rbd::mirror::image_replayer; - -template -std::ostream &operator<<(std::ostream &os, - const typename ImageReplayer::State &state); - -namespace { - -template -struct ReplayHandler : public ::journal::ReplayHandler { - ImageReplayer *replayer; - ReplayHandler(ImageReplayer *replayer) : replayer(replayer) {} - void get() override {} - void put() override {} - - void handle_entries_available() override { - replayer->handle_replay_ready(); - } - void handle_complete(int r) override { - std::stringstream ss; - if (r < 0) { - ss << "replay completed with error: " << cpp_strerror(r); - } - replayer->handle_replay_complete(r, ss.str()); - } -}; - -template -class ImageReplayerAdminSocketCommand { -public: - ImageReplayerAdminSocketCommand(const std::string &desc, - ImageReplayer *replayer) - : desc(desc), replayer(replayer) { - } - virtual ~ImageReplayerAdminSocketCommand() {} - virtual bool call(Formatter *f, stringstream *ss) = 0; - - std::string desc; - ImageReplayer *replayer; - bool registered = false; -}; - -template -class StatusCommand : public ImageReplayerAdminSocketCommand { -public: - explicit StatusCommand(const std::string &desc, ImageReplayer *replayer) - : ImageReplayerAdminSocketCommand(desc, replayer) { - } - - bool call(Formatter *f, stringstream *ss) override { - this->replayer->print_status(f, ss); - return true; - } -}; - -template -class StartCommand : public ImageReplayerAdminSocketCommand { -public: - explicit StartCommand(const std::string &desc, ImageReplayer *replayer) - : ImageReplayerAdminSocketCommand(desc, replayer) { - } - - bool call(Formatter *f, stringstream *ss) override { - this->replayer->start(nullptr, true); - return true; - } -}; - -template -class StopCommand : public ImageReplayerAdminSocketCommand { -public: - explicit StopCommand(const std::string &desc, ImageReplayer *replayer) - : ImageReplayerAdminSocketCommand(desc, replayer) { - } - - bool call(Formatter *f, stringstream *ss) override { - this->replayer->stop(nullptr, true); - return true; - } -}; - -template -class RestartCommand : public ImageReplayerAdminSocketCommand { -public: - explicit RestartCommand(const std::string &desc, ImageReplayer *replayer) - : ImageReplayerAdminSocketCommand(desc, replayer) { - } - - bool call(Formatter *f, stringstream *ss) override { - this->replayer->restart(); - return true; - } -}; - -template -class FlushCommand : public ImageReplayerAdminSocketCommand { -public: - explicit FlushCommand(const std::string &desc, ImageReplayer *replayer) - : ImageReplayerAdminSocketCommand(desc, replayer) { - } - - bool call(Formatter *f, stringstream *ss) override { - C_SaferCond cond; - this->replayer->flush(&cond); - int r = cond.wait(); - if (r < 0) { - *ss << "flush: " << cpp_strerror(r); - return false; - } - return true; - } -}; - -template -class ImageReplayerAdminSocketHook : public AdminSocketHook { -public: - ImageReplayerAdminSocketHook(CephContext *cct, const std::string &name, - ImageReplayer *replayer) - : admin_socket(cct->get_admin_socket()), - commands{{"rbd mirror flush " + name, - new FlushCommand("flush rbd mirror " + name, replayer)}, - {"rbd mirror restart " + name, - new RestartCommand("restart rbd mirror " + name, replayer)}, - {"rbd mirror start " + name, - new StartCommand("start rbd mirror " + name, replayer)}, - {"rbd mirror status " + name, - new StatusCommand("get status for rbd mirror " + name, replayer)}, - {"rbd mirror stop " + name, - new StopCommand("stop rbd mirror " + name, replayer)}} { - } - - int register_commands() { - for (auto &it : commands) { - int r = admin_socket->register_command(it.first, it.first, this, - it.second->desc); - if (r < 0) { - return r; - } - it.second->registered = true; - } - return 0; - } - - ~ImageReplayerAdminSocketHook() override { - for (auto &it : commands) { - if (it.second->registered) { - admin_socket->unregister_command(it.first); - } - delete it.second; - } - commands.clear(); - } - - bool call(std::string command, cmdmap_t& cmdmap, std::string format, - bufferlist& out) override { - auto i = commands.find(command); - assert(i != commands.end()); - Formatter *f = Formatter::create(format); - stringstream ss; - bool r = i->second->call(f, &ss); - delete f; - out.append(ss); - return r; - } - -private: - typedef std::map *> Commands; - - AdminSocket *admin_socket; - Commands commands; -}; - -uint32_t calculate_replay_delay(const utime_t &event_time, - int mirroring_replay_delay) { - if (mirroring_replay_delay <= 0) { - return 0; - } - - utime_t now = ceph_clock_now(); - if (event_time + mirroring_replay_delay <= now) { - return 0; - } - - // ensure it is rounded up when converting to integer - return (event_time + mirroring_replay_delay - now) + 1; -} - -} // anonymous namespace - -template -void ImageReplayer::BootstrapProgressContext::update_progress( - const std::string &description, bool flush) -{ - const std::string desc = "bootstrapping, " + description; - replayer->set_state_description(0, desc); - if (flush) { - replayer->update_mirror_image_status(false, boost::none); - } -} - -template -void ImageReplayer::RemoteJournalerListener::handle_update( - ::journal::JournalMetadata *) { - FunctionContext *ctx = new FunctionContext([this](int r) { - replayer->handle_remote_journal_metadata_updated(); - }); - replayer->m_threads->work_queue->queue(ctx, 0); -} - -template -ImageReplayer::ImageReplayer(Threads *threads, - ImageDeleter* image_deleter, - InstanceWatcher *instance_watcher, - RadosRef local, - const std::string &local_mirror_uuid, - int64_t local_pool_id, - const std::string &global_image_id) : - m_threads(threads), - m_image_deleter(image_deleter), - m_instance_watcher(instance_watcher), - m_local(local), - m_local_mirror_uuid(local_mirror_uuid), - m_local_pool_id(local_pool_id), - m_global_image_id(global_image_id), - m_lock("rbd::mirror::ImageReplayer " + stringify(local_pool_id) + " " + - global_image_id), - m_progress_cxt(this), - m_journal_listener(new JournalListener(this)), - m_remote_listener(this) -{ - // Register asok commands using a temporary "remote_pool_name/global_image_id" - // name. When the image name becomes known on start the asok commands will be - // re-registered using "remote_pool_name/remote_image_name" name. - - std::string pool_name; - int r = m_local->pool_reverse_lookup(m_local_pool_id, &pool_name); - if (r < 0) { - derr << "error resolving local pool " << m_local_pool_id - << ": " << cpp_strerror(r) << dendl; - pool_name = stringify(m_local_pool_id); - } - - m_name = pool_name + "/" + m_global_image_id; - register_admin_socket_hook(); -} - -template -ImageReplayer::~ImageReplayer() -{ - unregister_admin_socket_hook(); - assert(m_event_preprocessor == nullptr); - assert(m_replay_status_formatter == nullptr); - assert(m_local_image_ctx == nullptr); - assert(m_local_replay == nullptr); - assert(m_remote_journaler == nullptr); - assert(m_replay_handler == nullptr); - assert(m_on_start_finish == nullptr); - assert(m_on_stop_finish == nullptr); - assert(m_bootstrap_request == nullptr); - assert(m_in_flight_status_updates == 0); - - delete m_journal_listener; -} - -template -image_replayer::HealthState ImageReplayer::get_health_state() const { - Mutex::Locker locker(m_lock); - - if (!m_mirror_image_status_state) { - return image_replayer::HEALTH_STATE_OK; - } else if (*m_mirror_image_status_state == - cls::rbd::MIRROR_IMAGE_STATUS_STATE_SYNCING || - *m_mirror_image_status_state == - cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN) { - return image_replayer::HEALTH_STATE_WARNING; - } - return image_replayer::HEALTH_STATE_ERROR; -} - -template -void ImageReplayer::add_peer(const std::string &peer_uuid, - librados::IoCtx &io_ctx) { - Mutex::Locker locker(m_lock); - auto it = m_peers.find({peer_uuid}); - if (it == m_peers.end()) { - m_peers.insert({peer_uuid, io_ctx}); - } -} - -template -void ImageReplayer::set_state_description(int r, const std::string &desc) { - dout(20) << r << " " << desc << dendl; - - Mutex::Locker l(m_lock); - m_last_r = r; - m_state_desc = desc; -} - -template -void ImageReplayer::start(Context *on_finish, bool manual) -{ - dout(20) << "on_finish=" << on_finish << dendl; - - int r = 0; - { - Mutex::Locker locker(m_lock); - if (!is_stopped_()) { - derr << "already running" << dendl; - r = -EINVAL; - } else if (m_manual_stop && !manual) { - dout(5) << "stopped manually, ignoring start without manual flag" - << dendl; - r = -EPERM; - } else { - m_state = STATE_STARTING; - m_last_r = 0; - m_state_desc.clear(); - m_manual_stop = false; - m_delete_requested = false; - - if (on_finish != nullptr) { - assert(m_on_start_finish == nullptr); - m_on_start_finish = on_finish; - } - assert(m_on_stop_finish == nullptr); - } - } - - if (r < 0) { - if (on_finish) { - on_finish->complete(r); - } - return; - } - - r = m_local->ioctx_create2(m_local_pool_id, m_local_ioctx); - if (r < 0) { - derr << "error opening ioctx for local pool " << m_local_pool_id - << ": " << cpp_strerror(r) << dendl; - on_start_fail(r, "error opening local pool"); - return; - } - - wait_for_deletion(); -} - -template -void ImageReplayer::wait_for_deletion() { - dout(20) << dendl; - - Context *ctx = create_context_callback< - ImageReplayer, &ImageReplayer::handle_wait_for_deletion>(this); - m_image_deleter->wait_for_scheduled_deletion( - m_local_pool_id, m_global_image_id, ctx, false); -} - -template -void ImageReplayer::handle_wait_for_deletion(int r) { - dout(20) << "r=" << r << dendl; - - if (r == -ECANCELED) { - on_start_fail(0, ""); - return; - } else if (r < 0) { - on_start_fail(r, "error waiting for image deletion"); - return; - } - - prepare_local_image(); -} - -template -void ImageReplayer::prepare_local_image() { - dout(20) << dendl; - - m_local_image_id = ""; - Context *ctx = create_context_callback< - ImageReplayer, &ImageReplayer::handle_prepare_local_image>(this); - auto req = PrepareLocalImageRequest::create( - m_local_ioctx, m_global_image_id, &m_local_image_id, - &m_local_image_tag_owner, m_threads->work_queue, ctx); - req->send(); -} - -template -void ImageReplayer::handle_prepare_local_image(int r) { - dout(20) << "r=" << r << dendl; - - if (r == -ENOENT) { - dout(20) << "local image does not exist" << dendl; - } else if (r < 0) { - on_start_fail(r, "error preparing local image for replay"); - return; - } else if (m_local_image_tag_owner == librbd::Journal<>::LOCAL_MIRROR_UUID) { - dout(5) << "local image is primary" << dendl; - on_start_fail(0, "local image is primary"); - return; - } - - // local image doesn't exist or is non-primary - prepare_remote_image(); -} - -template -void ImageReplayer::prepare_remote_image() { - dout(20) << dendl; - - // TODO need to support multiple remote images - assert(!m_peers.empty()); - m_remote_image = {*m_peers.begin()}; - - Context *ctx = create_context_callback< - ImageReplayer, &ImageReplayer::handle_prepare_remote_image>(this); - auto req = PrepareRemoteImageRequest::create( - m_remote_image.io_ctx, m_global_image_id, &m_remote_image.mirror_uuid, - &m_remote_image.image_id, ctx); - req->send(); -} - -template -void ImageReplayer::handle_prepare_remote_image(int r) { - dout(20) << "r=" << r << dendl; - - if (r == -ENOENT) { - dout(20) << "remote image does not exist" << dendl; - - // TODO need to support multiple remote images - if (!m_local_image_id.empty() && - m_local_image_tag_owner == m_remote_image.mirror_uuid) { - // local image exists and is non-primary and linked to the missing - // remote image - - m_delete_requested = true; - on_start_fail(0, "remote image no longer exists"); - } else { - on_start_fail(-ENOENT, "remote image does not exist"); - } - return; - } else if (r < 0) { - on_start_fail(r, "error retrieving remote image id"); - return; - } - - bootstrap(); -} - -template -void ImageReplayer::bootstrap() { - dout(20) << dendl; - - CephContext *cct = static_cast(m_local->cct()); - journal::Settings settings; - settings.commit_interval = cct->_conf->get_val( - "rbd_mirror_journal_commit_age"); - settings.max_fetch_bytes = cct->_conf->get_val( - "rbd_mirror_journal_max_fetch_bytes"); - - m_remote_journaler = new Journaler(m_threads->work_queue, - m_threads->timer, - &m_threads->timer_lock, - m_remote_image.io_ctx, - m_remote_image.image_id, - m_local_mirror_uuid, settings); - - Context *ctx = create_context_callback< - ImageReplayer, &ImageReplayer::handle_bootstrap>(this); - - BootstrapRequest *request = BootstrapRequest::create( - m_local_ioctx, m_remote_image.io_ctx, m_instance_watcher, - &m_local_image_ctx, m_local_image_id, m_remote_image.image_id, - m_global_image_id, m_threads->work_queue, m_threads->timer, - &m_threads->timer_lock, m_local_mirror_uuid, m_remote_image.mirror_uuid, - m_remote_journaler, &m_client_meta, ctx, &m_resync_requested, - &m_progress_cxt); - - { - Mutex::Locker locker(m_lock); - request->get(); - m_bootstrap_request = request; - } - - update_mirror_image_status(false, boost::none); - reschedule_update_status_task(10); - - request->send(); -} - -template -void ImageReplayer::handle_bootstrap(int r) { - dout(20) << "r=" << r << dendl; - { - Mutex::Locker locker(m_lock); - m_bootstrap_request->put(); - m_bootstrap_request = nullptr; - if (m_local_image_ctx) { - m_local_image_id = m_local_image_ctx->id; - } - } - - if (r == -EREMOTEIO) { - m_local_image_tag_owner = ""; - dout(5) << "remote image is non-primary" << dendl; - on_start_fail(-EREMOTEIO, "remote image is non-primary"); - return; - } else if (r == -EEXIST) { - m_local_image_tag_owner = ""; - on_start_fail(r, "split-brain detected"); - return; - } else if (r < 0) { - on_start_fail(r, "error bootstrapping replay"); - return; - } else if (on_start_interrupted()) { - return; - } else if (m_resync_requested) { - on_start_fail(0, "resync requested"); - return; - } - - assert(m_local_journal == nullptr); - { - RWLock::RLocker snap_locker(m_local_image_ctx->snap_lock); - if (m_local_image_ctx->journal != nullptr) { - m_local_journal = m_local_image_ctx->journal; - m_local_journal->add_listener(m_journal_listener); - } - } - - if (m_local_journal == nullptr) { - on_start_fail(-EINVAL, "error accessing local journal"); - return; - } - - on_name_changed(); - - update_mirror_image_status(false, boost::none); - init_remote_journaler(); -} - -template -void ImageReplayer::init_remote_journaler() { - dout(20) << dendl; - - Context *ctx = create_context_callback< - ImageReplayer, &ImageReplayer::handle_init_remote_journaler>(this); - m_remote_journaler->init(ctx); -} - -template -void ImageReplayer::handle_init_remote_journaler(int r) { - dout(20) << "r=" << r << dendl; - - if (r < 0) { - derr << "failed to initialize remote journal: " << cpp_strerror(r) << dendl; - on_start_fail(r, "error initializing remote journal"); - return; - } else if (on_start_interrupted()) { - return; - } - - m_remote_journaler->add_listener(&m_remote_listener); - - cls::journal::Client client; - r = m_remote_journaler->get_cached_client(m_local_mirror_uuid, &client); - if (r < 0) { - derr << "error retrieving remote journal client: " << cpp_strerror(r) - << dendl; - on_start_fail(r, "error retrieving remote journal client"); - return; - } - - derr << "image_id=" << m_local_image_id << ", " - << "m_client_meta.image_id=" << m_client_meta.image_id << ", " - << "client.state=" << client.state << dendl; - if (m_client_meta.image_id == m_local_image_id && - client.state != cls::journal::CLIENT_STATE_CONNECTED) { - dout(5) << "client flagged disconnected, stopping image replay" << dendl; - if (m_local_image_ctx->mirroring_resync_after_disconnect) { - m_resync_requested = true; - on_start_fail(-ENOTCONN, "disconnected: automatic resync"); - } else { - on_start_fail(-ENOTCONN, "disconnected"); - } - return; - } - - start_replay(); -} - -template -void ImageReplayer::start_replay() { - dout(20) << dendl; - - Context *start_ctx = create_context_callback< - ImageReplayer, &ImageReplayer::handle_start_replay>(this); - m_local_journal->start_external_replay(&m_local_replay, start_ctx); -} - -template -void ImageReplayer::handle_start_replay(int r) { - dout(20) << "r=" << r << dendl; - - if (r < 0) { - assert(m_local_replay == nullptr); - derr << "error starting external replay on local image " - << m_local_image_id << ": " << cpp_strerror(r) << dendl; - on_start_fail(r, "error starting replay on local image"); - return; - } - - Context *on_finish(nullptr); - { - Mutex::Locker locker(m_lock); - assert(m_state == STATE_STARTING); - m_state = STATE_REPLAYING; - std::swap(m_on_start_finish, on_finish); - } - - m_event_preprocessor = EventPreprocessor::create( - *m_local_image_ctx, *m_remote_journaler, m_local_mirror_uuid, - &m_client_meta, m_threads->work_queue); - m_replay_status_formatter = - ReplayStatusFormatter::create(m_remote_journaler, m_local_mirror_uuid); - - update_mirror_image_status(true, boost::none); - reschedule_update_status_task(30); - - if (on_replay_interrupted()) { - return; - } - - { - CephContext *cct = static_cast(m_local->cct()); - double poll_seconds = cct->_conf->get_val( - "rbd_mirror_journal_poll_age"); - - Mutex::Locker locker(m_lock); - m_replay_handler = new ReplayHandler(this); - m_remote_journaler->start_live_replay(m_replay_handler, poll_seconds); - - dout(20) << "m_remote_journaler=" << *m_remote_journaler << dendl; - } - - dout(20) << "start succeeded" << dendl; - if (on_finish != nullptr) { - dout(20) << "on finish complete, r=" << r << dendl; - on_finish->complete(r); - } -} - -template -void ImageReplayer::on_start_fail(int r, const std::string &desc) -{ - dout(20) << "r=" << r << dendl; - Context *ctx = new FunctionContext([this, r, desc](int _r) { - { - Mutex::Locker locker(m_lock); - assert(m_state == STATE_STARTING); - m_state = STATE_STOPPING; - if (r < 0 && r != -ECANCELED && r != -EREMOTEIO && r != -ENOENT) { - derr << "start failed: " << cpp_strerror(r) << dendl; - } else { - dout(20) << "start canceled" << dendl; - } - } - - set_state_description(r, desc); - update_mirror_image_status(false, boost::none); - reschedule_update_status_task(-1); - shut_down(r); - }); - m_threads->work_queue->queue(ctx, 0); -} - -template -bool ImageReplayer::on_start_interrupted() -{ - Mutex::Locker locker(m_lock); - assert(m_state == STATE_STARTING); - if (m_on_stop_finish == nullptr) { - return false; - } - - on_start_fail(-ECANCELED); - return true; -} - -template -void ImageReplayer::stop(Context *on_finish, bool manual, int r, - const std::string& desc) -{ - dout(20) << "on_finish=" << on_finish << ", manual=" << manual - << ", desc=" << desc << dendl; - - m_image_deleter->cancel_waiter(m_local_pool_id, m_global_image_id); - - image_replayer::BootstrapRequest *bootstrap_request = nullptr; - bool shut_down_replay = false; - bool running = true; - { - Mutex::Locker locker(m_lock); - - if (!is_running_()) { - running = false; - } else { - if (!is_stopped_()) { - if (m_state == STATE_STARTING) { - dout(20) << "canceling start" << dendl; - if (m_bootstrap_request) { - bootstrap_request = m_bootstrap_request; - bootstrap_request->get(); - } - } else { - dout(20) << "interrupting replay" << dendl; - shut_down_replay = true; - } - - assert(m_on_stop_finish == nullptr); - std::swap(m_on_stop_finish, on_finish); - m_stop_requested = true; - m_manual_stop = manual; - } - } - } - - // avoid holding lock since bootstrap request will update status - if (bootstrap_request != nullptr) { - bootstrap_request->cancel(); - bootstrap_request->put(); - } - - if (!running) { - dout(20) << "not running" << dendl; - if (on_finish) { - on_finish->complete(-EINVAL); - } - return; - } - - if (shut_down_replay) { - on_stop_journal_replay(r, desc); - } else if (on_finish != nullptr) { - on_finish->complete(0); - } -} - -template -void ImageReplayer::on_stop_journal_replay(int r, const std::string &desc) -{ - dout(20) << "enter" << dendl; - - { - Mutex::Locker locker(m_lock); - if (m_state != STATE_REPLAYING) { - // might be invoked multiple times while stopping - return; - } - m_stop_requested = true; - m_state = STATE_STOPPING; - } - - set_state_description(r, desc); - update_mirror_image_status(false, boost::none); - reschedule_update_status_task(-1); - shut_down(0); -} - -template -void ImageReplayer::handle_replay_ready() -{ - dout(20) << "enter" << dendl; - if (on_replay_interrupted()) { - return; - } - - if (!m_remote_journaler->try_pop_front(&m_replay_entry, &m_replay_tag_tid)) { - return; - } - - m_event_replay_tracker.start_op(); - - m_lock.Lock(); - bool stopping = (m_state == STATE_STOPPING); - m_lock.Unlock(); - - if (stopping) { - dout(10) << "stopping event replay" << dendl; - m_event_replay_tracker.finish_op(); - return; - } - - if (m_replay_tag_valid && m_replay_tag.tid == m_replay_tag_tid) { - preprocess_entry(); - return; - } - - replay_flush(); -} - -template -void ImageReplayer::restart(Context *on_finish) -{ - FunctionContext *ctx = new FunctionContext( - [this, on_finish](int r) { - if (r < 0) { - // Try start anyway. - } - start(on_finish, true); - }); - stop(ctx); -} - -template -void ImageReplayer::flush(Context *on_finish) -{ - dout(20) << "enter" << dendl; - - { - Mutex::Locker locker(m_lock); - if (m_state == STATE_REPLAYING) { - Context *ctx = new FunctionContext( - [on_finish](int r) { - if (on_finish != nullptr) { - on_finish->complete(r); - } - }); - on_flush_local_replay_flush_start(ctx); - return; - } - } - - if (on_finish) { - on_finish->complete(0); - } -} - -template -void ImageReplayer::on_flush_local_replay_flush_start(Context *on_flush) -{ - dout(20) << "enter" << dendl; - FunctionContext *ctx = new FunctionContext( - [this, on_flush](int r) { - on_flush_local_replay_flush_finish(on_flush, r); - }); - - assert(m_lock.is_locked()); - assert(m_state == STATE_REPLAYING); - m_local_replay->flush(ctx); -} - -template -void ImageReplayer::on_flush_local_replay_flush_finish(Context *on_flush, - int r) -{ - dout(20) << "r=" << r << dendl; - if (r < 0) { - derr << "error flushing local replay: " << cpp_strerror(r) << dendl; - on_flush->complete(r); - return; - } - - on_flush_flush_commit_position_start(on_flush); -} - -template -void ImageReplayer::on_flush_flush_commit_position_start(Context *on_flush) -{ - FunctionContext *ctx = new FunctionContext( - [this, on_flush](int r) { - on_flush_flush_commit_position_finish(on_flush, r); - }); - - m_remote_journaler->flush_commit_position(ctx); -} - -template -void ImageReplayer::on_flush_flush_commit_position_finish(Context *on_flush, - int r) -{ - if (r < 0) { - derr << "error flushing remote journal commit position: " - << cpp_strerror(r) << dendl; - } - - update_mirror_image_status(false, boost::none); - - dout(20) << "flush complete, r=" << r << dendl; - on_flush->complete(r); -} - -template -bool ImageReplayer::on_replay_interrupted() -{ - bool shut_down; - { - Mutex::Locker locker(m_lock); - shut_down = m_stop_requested; - } - - if (shut_down) { - on_stop_journal_replay(); - } - return shut_down; -} - -template -void ImageReplayer::print_status(Formatter *f, stringstream *ss) -{ - dout(20) << "enter" << dendl; - - Mutex::Locker l(m_lock); - - if (f) { - f->open_object_section("image_replayer"); - f->dump_string("name", m_name); - f->dump_string("state", to_string(m_state)); - f->close_section(); - f->flush(*ss); - } else { - *ss << m_name << ": state: " << to_string(m_state); - } -} - -template -void ImageReplayer::handle_replay_complete(int r, const std::string &error_desc) -{ - dout(20) << "r=" << r << dendl; - if (r < 0) { - derr << "replay encountered an error: " << cpp_strerror(r) << dendl; - set_state_description(r, error_desc); - } - - { - Mutex::Locker locker(m_lock); - m_stop_requested = true; - } - on_replay_interrupted(); -} - -template -void ImageReplayer::replay_flush() { - dout(20) << dendl; - - bool interrupted = false; - { - Mutex::Locker locker(m_lock); - if (m_state != STATE_REPLAYING) { - dout(20) << "replay interrupted" << dendl; - interrupted = true; - } else { - m_state = STATE_REPLAY_FLUSHING; - } - } - - if (interrupted) { - m_event_replay_tracker.finish_op(); - return; - } - - // shut down the replay to flush all IO and ops and create a new - // replayer to handle the new tag epoch - Context *ctx = create_context_callback< - ImageReplayer, &ImageReplayer::handle_replay_flush>(this); - ctx = new FunctionContext([this, ctx](int r) { - m_local_image_ctx->journal->stop_external_replay(); - m_local_replay = nullptr; - - if (r < 0) { - ctx->complete(r); - return; - } - - m_local_journal->start_external_replay(&m_local_replay, ctx); - }); - m_local_replay->shut_down(false, ctx); -} - -template -void ImageReplayer::handle_replay_flush(int r) { - dout(20) << "r=" << r << dendl; - - { - Mutex::Locker locker(m_lock); - assert(m_state == STATE_REPLAY_FLUSHING); - m_state = STATE_REPLAYING; - } - - if (r < 0) { - derr << "replay flush encountered an error: " << cpp_strerror(r) << dendl; - m_event_replay_tracker.finish_op(); - handle_replay_complete(r, "replay flush encountered an error"); - return; - } else if (on_replay_interrupted()) { - m_event_replay_tracker.finish_op(); - return; - } - - get_remote_tag(); -} - -template -void ImageReplayer::get_remote_tag() { - dout(20) << "tag_tid: " << m_replay_tag_tid << dendl; - - Context *ctx = create_context_callback< - ImageReplayer, &ImageReplayer::handle_get_remote_tag>(this); - m_remote_journaler->get_tag(m_replay_tag_tid, &m_replay_tag, ctx); -} - -template -void ImageReplayer::handle_get_remote_tag(int r) { - dout(20) << "r=" << r << dendl; - - if (r == 0) { - try { - bufferlist::iterator it = m_replay_tag.data.begin(); - ::decode(m_replay_tag_data, it); - } catch (const buffer::error &err) { - r = -EBADMSG; - } - } - - if (r < 0) { - derr << "failed to retrieve remote tag " << m_replay_tag_tid << ": " - << cpp_strerror(r) << dendl; - m_event_replay_tracker.finish_op(); - handle_replay_complete(r, "failed to retrieve remote tag"); - return; - } - - m_replay_tag_valid = true; - dout(20) << "decoded remote tag " << m_replay_tag_tid << ": " - << m_replay_tag_data << dendl; - - allocate_local_tag(); -} - -template -void ImageReplayer::allocate_local_tag() { - dout(20) << dendl; - - std::string mirror_uuid = m_replay_tag_data.mirror_uuid; - if (mirror_uuid == librbd::Journal<>::LOCAL_MIRROR_UUID || - mirror_uuid == m_local_mirror_uuid) { - mirror_uuid = m_remote_image.mirror_uuid; - } else if (mirror_uuid == librbd::Journal<>::ORPHAN_MIRROR_UUID) { - dout(5) << "encountered image demotion: stopping" << dendl; - Mutex::Locker locker(m_lock); - m_stop_requested = true; - } - - librbd::journal::TagPredecessor predecessor(m_replay_tag_data.predecessor); - if (predecessor.mirror_uuid == librbd::Journal<>::LOCAL_MIRROR_UUID) { - predecessor.mirror_uuid = m_remote_image.mirror_uuid; - } else if (predecessor.mirror_uuid == m_local_mirror_uuid) { - predecessor.mirror_uuid = librbd::Journal<>::LOCAL_MIRROR_UUID; - } - - dout(20) << "mirror_uuid=" << mirror_uuid << ", " - << "predecessor_mirror_uuid=" << predecessor.mirror_uuid << ", " - << "replay_tag_tid=" << m_replay_tag_tid << ", " - << "replay_tag_data=" << m_replay_tag_data << dendl; - Context *ctx = create_context_callback< - ImageReplayer, &ImageReplayer::handle_allocate_local_tag>(this); - m_local_journal->allocate_tag(mirror_uuid, predecessor, ctx); -} - -template -void ImageReplayer::handle_allocate_local_tag(int r) { - dout(20) << "r=" << r << dendl; - - if (r < 0) { - derr << "failed to allocate journal tag: " << cpp_strerror(r) << dendl; - m_event_replay_tracker.finish_op(); - handle_replay_complete(r, "failed to allocate journal tag"); - return; - } - - preprocess_entry(); -} - -template -void ImageReplayer::preprocess_entry() { - dout(20) << "preprocessing entry tid=" << m_replay_entry.get_commit_tid() - << dendl; - - bufferlist data = m_replay_entry.get_data(); - bufferlist::iterator it = data.begin(); - int r = m_local_replay->decode(&it, &m_event_entry); - if (r < 0) { - derr << "failed to decode journal event" << dendl; - m_event_replay_tracker.finish_op(); - handle_replay_complete(r, "failed to decode journal event"); - return; - } - - uint32_t delay = calculate_replay_delay( - m_event_entry.timestamp, m_local_image_ctx->mirroring_replay_delay); - if (delay == 0) { - handle_preprocess_entry_ready(0); - return; - } - - dout(20) << "delaying replay by " << delay << " sec" << dendl; - - Mutex::Locker timer_locker(m_threads->timer_lock); - assert(m_delayed_preprocess_task == nullptr); - m_delayed_preprocess_task = new FunctionContext( - [this](int r) { - assert(m_threads->timer_lock.is_locked()); - m_delayed_preprocess_task = nullptr; - m_threads->work_queue->queue( - create_context_callback::handle_preprocess_entry_ready>(this), 0); - }); - m_threads->timer->add_event_after(delay, m_delayed_preprocess_task); -} - -template -void ImageReplayer::handle_preprocess_entry_ready(int r) { - dout(20) << "r=" << r << dendl; - assert(r == 0); - - if (!m_event_preprocessor->is_required(m_event_entry)) { - process_entry(); - return; - } - - Context *ctx = create_context_callback< - ImageReplayer, &ImageReplayer::handle_preprocess_entry_safe>(this); - m_event_preprocessor->preprocess(&m_event_entry, ctx); -} - -template -void ImageReplayer::handle_preprocess_entry_safe(int r) { - dout(20) << "r=" << r << dendl; - - if (r < 0) { - m_event_replay_tracker.finish_op(); - - if (r == -ECANCELED) { - handle_replay_complete(0, "lost exclusive lock"); - } else { - derr << "failed to preprocess journal event" << dendl; - handle_replay_complete(r, "failed to preprocess journal event"); - } - return; - } - - process_entry(); -} - -template -void ImageReplayer::process_entry() { - dout(20) << "processing entry tid=" << m_replay_entry.get_commit_tid() - << dendl; - - // stop replaying events if stop has been requested - if (on_replay_interrupted()) { - m_event_replay_tracker.finish_op(); - return; - } - - Context *on_ready = create_context_callback< - ImageReplayer, &ImageReplayer::handle_process_entry_ready>(this); - Context *on_commit = new C_ReplayCommitted(this, std::move(m_replay_entry)); - - m_local_replay->process(m_event_entry, on_ready, on_commit); -} - -template -void ImageReplayer::handle_process_entry_ready(int r) { - dout(20) << dendl; - assert(r == 0); - - on_name_changed(); - - // attempt to process the next event - handle_replay_ready(); -} - -template -void ImageReplayer::handle_process_entry_safe(const ReplayEntry& replay_entry, - int r) { - dout(20) << "commit_tid=" << replay_entry.get_commit_tid() << ", r=" << r - << dendl; - - if (r < 0) { - derr << "failed to commit journal event: " << cpp_strerror(r) << dendl; - handle_replay_complete(r, "failed to commit journal event"); - } else { - assert(m_remote_journaler != nullptr); - m_remote_journaler->committed(replay_entry); - } - m_event_replay_tracker.finish_op(); -} - -template -bool ImageReplayer::update_mirror_image_status(bool force, - const OptionalState &state) { - dout(20) << dendl; - { - Mutex::Locker locker(m_lock); - if (!start_mirror_image_status_update(force, false)) { - return false; - } - } - - queue_mirror_image_status_update(state); - return true; -} - -template -bool ImageReplayer::start_mirror_image_status_update(bool force, - bool restarting) { - assert(m_lock.is_locked()); - - if (!force && !is_stopped_()) { - if (!is_running_()) { - dout(20) << "shut down in-progress: ignoring update" << dendl; - return false; - } else if (m_in_flight_status_updates > (restarting ? 1 : 0)) { - dout(20) << "already sending update" << dendl; - m_update_status_requested = true; - return false; - } - } - - dout(20) << dendl; - ++m_in_flight_status_updates; - return true; -} - -template -void ImageReplayer::finish_mirror_image_status_update() { - Context *on_finish = nullptr; - { - Mutex::Locker locker(m_lock); - assert(m_in_flight_status_updates > 0); - if (--m_in_flight_status_updates > 0) { - dout(20) << "waiting on " << m_in_flight_status_updates << " in-flight " - << "updates" << dendl; - return; - } - - std::swap(on_finish, m_on_update_status_finish); - } - - dout(20) << dendl; - if (on_finish != nullptr) { - on_finish->complete(0); - } -} - -template -void ImageReplayer::queue_mirror_image_status_update(const OptionalState &state) { - dout(20) << dendl; - FunctionContext *ctx = new FunctionContext( - [this, state](int r) { - send_mirror_status_update(state); - }); - m_threads->work_queue->queue(ctx, 0); -} - -template -void ImageReplayer::send_mirror_status_update(const OptionalState &opt_state) { - State state; - std::string state_desc; - int last_r; - bool stopping_replay; - - OptionalMirrorImageStatusState mirror_image_status_state{ - boost::make_optional(false, cls::rbd::MirrorImageStatusState{})}; - image_replayer::BootstrapRequest* bootstrap_request = nullptr; - { - Mutex::Locker locker(m_lock); - state = m_state; - state_desc = m_state_desc; - mirror_image_status_state = m_mirror_image_status_state; - last_r = m_last_r; - stopping_replay = (m_local_image_ctx != nullptr); - - if (m_bootstrap_request != nullptr) { - bootstrap_request = m_bootstrap_request; - bootstrap_request->get(); - } - } - - bool syncing = false; - if (bootstrap_request != nullptr) { - syncing = bootstrap_request->is_syncing(); - bootstrap_request->put(); - bootstrap_request = nullptr; - } - - if (opt_state) { - state = *opt_state; - } - - cls::rbd::MirrorImageStatus status; - status.up = true; - switch (state) { - case STATE_STARTING: - if (syncing) { - status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_SYNCING; - status.description = state_desc.empty() ? "syncing" : state_desc; - mirror_image_status_state = status.state; - } else { - status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STARTING_REPLAY; - status.description = "starting replay"; - } - break; - case STATE_REPLAYING: - case STATE_REPLAY_FLUSHING: - status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_REPLAYING; - { - Context *on_req_finish = new FunctionContext( - [this](int r) { - dout(20) << "replay status ready: r=" << r << dendl; - if (r >= 0) { - send_mirror_status_update(boost::none); - } else if (r == -EAGAIN) { - // decrement in-flight status update counter - handle_mirror_status_update(r); - } - }); - - std::string desc; - if (!m_replay_status_formatter->get_or_send_update(&desc, - on_req_finish)) { - dout(20) << "waiting for replay status" << dendl; - return; - } - status.description = "replaying, " + desc; - mirror_image_status_state = boost::none; - } - break; - case STATE_STOPPING: - if (stopping_replay) { - status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STOPPING_REPLAY; - status.description = "stopping replay"; - break; - } - // FALLTHROUGH - case STATE_STOPPED: - if (last_r == -EREMOTEIO) { - status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN; - status.description = state_desc; - mirror_image_status_state = status.state; - } else if (last_r < 0) { - status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_ERROR; - status.description = state_desc; - mirror_image_status_state = status.state; - } else { - status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_STOPPED; - status.description = state_desc.empty() ? "stopped" : state_desc; - mirror_image_status_state = boost::none; - } - break; - default: - assert(!"invalid state"); - } - - { - Mutex::Locker locker(m_lock); - m_mirror_image_status_state = mirror_image_status_state; - } - - // prevent the status from ping-ponging when failed replays are restarted - if (mirror_image_status_state && - *mirror_image_status_state == cls::rbd::MIRROR_IMAGE_STATUS_STATE_ERROR) { - status.state = *mirror_image_status_state; - } - - dout(20) << "status=" << status << dendl; - librados::ObjectWriteOperation op; - librbd::cls_client::mirror_image_status_set(&op, m_global_image_id, status); - - librados::AioCompletion *aio_comp = create_rados_callback< - ImageReplayer, &ImageReplayer::handle_mirror_status_update>(this); - int r = m_local_ioctx.aio_operate(RBD_MIRRORING, aio_comp, &op); - assert(r == 0); - aio_comp->release(); -} - -template -void ImageReplayer::handle_mirror_status_update(int r) { - dout(20) << "r=" << r << dendl; - - bool running = false; - bool started = false; - { - Mutex::Locker locker(m_lock); - bool update_status_requested = false; - std::swap(update_status_requested, m_update_status_requested); - - running = is_running_(); - if (running && update_status_requested) { - started = start_mirror_image_status_update(false, true); - } - } - - // if a deferred update is available, send it -- otherwise reschedule - // the timer task - if (started) { - queue_mirror_image_status_update(boost::none); - } else if (running) { - reschedule_update_status_task(); - } - - // mark committed status update as no longer in-flight - finish_mirror_image_status_update(); -} - -template -void ImageReplayer::reschedule_update_status_task(int new_interval) { - dout(20) << dendl; - - bool canceled_task = false; - { - Mutex::Locker locker(m_lock); - Mutex::Locker timer_locker(m_threads->timer_lock); - - if (m_update_status_task) { - canceled_task = m_threads->timer->cancel_event(m_update_status_task); - m_update_status_task = nullptr; - } - - if (new_interval > 0) { - m_update_status_interval = new_interval; - } - - bool restarting = (new_interval == 0 || canceled_task); - if (new_interval >= 0 && is_running_() && - start_mirror_image_status_update(false, restarting)) { - m_update_status_task = new FunctionContext( - [this](int r) { - assert(m_threads->timer_lock.is_locked()); - m_update_status_task = nullptr; - - queue_mirror_image_status_update(boost::none); - }); - m_threads->timer->add_event_after(m_update_status_interval, - m_update_status_task); - } - } - - if (canceled_task) { - dout(20) << "canceled task" << dendl; - finish_mirror_image_status_update(); - } -} - -template -void ImageReplayer::shut_down(int r) { - dout(20) << "r=" << r << dendl; - - bool canceled_delayed_preprocess_task = false; - { - Mutex::Locker timer_locker(m_threads->timer_lock); - if (m_delayed_preprocess_task != nullptr) { - canceled_delayed_preprocess_task = m_threads->timer->cancel_event( - m_delayed_preprocess_task); - assert(canceled_delayed_preprocess_task); - m_delayed_preprocess_task = nullptr; - } - } - if (canceled_delayed_preprocess_task) { - // wake up sleeping replay - m_event_replay_tracker.finish_op(); - } - - { - Mutex::Locker locker(m_lock); - assert(m_state == STATE_STOPPING); - - // if status updates are in-flight, wait for them to complete - // before proceeding - if (m_in_flight_status_updates > 0) { - if (m_on_update_status_finish == nullptr) { - dout(20) << "waiting for in-flight status update" << dendl; - m_on_update_status_finish = new FunctionContext( - [this, r](int _r) { - shut_down(r); - }); - } - return; - } - } - - // NOTE: it's important to ensure that the local image is fully - // closed before attempting to close the remote journal in - // case the remote cluster is unreachable - - // chain the shut down sequence (reverse order) - Context *ctx = new FunctionContext( - [this, r](int _r) { - update_mirror_image_status(true, STATE_STOPPED); - handle_shut_down(r); - }); - - // close the remote journal - if (m_remote_journaler != nullptr) { - ctx = new FunctionContext([this, ctx](int r) { - delete m_remote_journaler; - m_remote_journaler = nullptr; - ctx->complete(0); - }); - ctx = new FunctionContext([this, ctx](int r) { - m_remote_journaler->remove_listener(&m_remote_listener); - m_remote_journaler->shut_down(ctx); - }); - } - - // stop the replay of remote journal events - if (m_replay_handler != nullptr) { - ctx = new FunctionContext([this, ctx](int r) { - delete m_replay_handler; - m_replay_handler = nullptr; - - m_event_replay_tracker.wait_for_ops(ctx); - }); - ctx = new FunctionContext([this, ctx](int r) { - m_remote_journaler->stop_replay(ctx); - }); - } - - // close the local image (release exclusive lock) - if (m_local_image_ctx) { - ctx = new FunctionContext([this, ctx](int r) { - CloseImageRequest *request = CloseImageRequest::create( - &m_local_image_ctx, ctx); - request->send(); - }); - } - - // shut down event replay into the local image - if (m_local_journal != nullptr) { - ctx = new FunctionContext([this, ctx](int r) { - m_local_journal = nullptr; - ctx->complete(0); - }); - if (m_local_replay != nullptr) { - ctx = new FunctionContext([this, ctx](int r) { - m_local_journal->stop_external_replay(); - m_local_replay = nullptr; - - EventPreprocessor::destroy(m_event_preprocessor); - m_event_preprocessor = nullptr; - ctx->complete(0); - }); - } - ctx = new FunctionContext([this, ctx](int r) { - // blocks if listener notification is in-progress - m_local_journal->remove_listener(m_journal_listener); - ctx->complete(0); - }); - } - - // wait for all local in-flight replay events to complete - ctx = new FunctionContext([this, ctx](int r) { - if (r < 0) { - derr << "error shutting down journal replay: " << cpp_strerror(r) - << dendl; - } - - m_event_replay_tracker.wait_for_ops(ctx); - }); - - // flush any local in-flight replay events - if (m_local_replay != nullptr) { - ctx = new FunctionContext([this, ctx](int r) { - m_local_replay->shut_down(true, ctx); - }); - } - - m_threads->work_queue->queue(ctx, 0); -} - -template -void ImageReplayer::handle_shut_down(int r) { - reschedule_update_status_task(-1); - - bool unregister_asok_hook = false; - { - Mutex::Locker locker(m_lock); - - // if status updates are in-flight, wait for them to complete - // before proceeding - if (m_in_flight_status_updates > 0) { - if (m_on_update_status_finish == nullptr) { - dout(20) << "waiting for in-flight status update" << dendl; - m_on_update_status_finish = new FunctionContext( - [this, r](int _r) { - handle_shut_down(r); - }); - } - return; - } - - bool delete_requested = false; - if (m_delete_requested && !m_local_image_id.empty()) { - assert(m_remote_image.image_id.empty()); - dout(0) << "remote image no longer exists: scheduling deletion" << dendl; - delete_requested = true; - } - if (delete_requested || m_resync_requested) { - m_image_deleter->schedule_image_delete(m_local, - m_local_pool_id, - m_global_image_id, - m_resync_requested); - - m_local_image_id = ""; - m_resync_requested = false; - if (m_delete_requested) { - unregister_asok_hook = true; - m_delete_requested = false; - } - } else if (m_last_r == -ENOENT && - m_local_image_id.empty() && m_remote_image.image_id.empty()) { - dout(0) << "mirror image no longer exists" << dendl; - unregister_asok_hook = true; - m_finished = true; - } - } - - if (unregister_asok_hook) { - unregister_admin_socket_hook(); - } - - dout(20) << "stop complete" << dendl; - m_local_ioctx.close(); - - ReplayStatusFormatter::destroy(m_replay_status_formatter); - m_replay_status_formatter = nullptr; - - Context *on_start = nullptr; - Context *on_stop = nullptr; - { - Mutex::Locker locker(m_lock); - std::swap(on_start, m_on_start_finish); - std::swap(on_stop, m_on_stop_finish); - m_stop_requested = false; - assert(m_delayed_preprocess_task == nullptr); - assert(m_state == STATE_STOPPING); - m_state = STATE_STOPPED; - } - - if (on_start != nullptr) { - dout(20) << "on start finish complete, r=" << r << dendl; - on_start->complete(r); - r = 0; - } - if (on_stop != nullptr) { - dout(20) << "on stop finish complete, r=" << r << dendl; - on_stop->complete(r); - } -} - -template -void ImageReplayer::handle_remote_journal_metadata_updated() { - dout(20) << dendl; - - cls::journal::Client client; - { - Mutex::Locker locker(m_lock); - if (!is_running_()) { - return; - } - - int r = m_remote_journaler->get_cached_client(m_local_mirror_uuid, &client); - if (r < 0) { - derr << "failed to retrieve client: " << cpp_strerror(r) << dendl; - return; - } - } - - if (client.state != cls::journal::CLIENT_STATE_CONNECTED) { - dout(0) << "client flagged disconnected, stopping image replay" << dendl; - stop(nullptr, false, -ENOTCONN, "disconnected"); - } -} - -template -std::string ImageReplayer::to_string(const State state) { - switch (state) { - case ImageReplayer::STATE_STARTING: - return "Starting"; - case ImageReplayer::STATE_REPLAYING: - return "Replaying"; - case ImageReplayer::STATE_REPLAY_FLUSHING: - return "ReplayFlushing"; - case ImageReplayer::STATE_STOPPING: - return "Stopping"; - case ImageReplayer::STATE_STOPPED: - return "Stopped"; - default: - break; - } - return "Unknown(" + stringify(state) + ")"; -} - -template -void ImageReplayer::resync_image(Context *on_finish) { - dout(20) << dendl; - - m_resync_requested = true; - stop(on_finish); -} - -template -void ImageReplayer::register_admin_socket_hook() { - ImageReplayerAdminSocketHook *asok_hook; - { - Mutex::Locker locker(m_lock); - if (m_asok_hook != nullptr) { - return; - } - - dout(20) << "registered asok hook: " << m_name << dendl; - asok_hook = new ImageReplayerAdminSocketHook(g_ceph_context, m_name, - this); - int r = asok_hook->register_commands(); - if (r == 0) { - m_asok_hook = asok_hook; - return; - } - derr << "error registering admin socket commands" << dendl; - } - delete asok_hook; -} - -template -void ImageReplayer::unregister_admin_socket_hook() { - dout(20) << dendl; - - AdminSocketHook *asok_hook = nullptr; - { - Mutex::Locker locker(m_lock); - std::swap(asok_hook, m_asok_hook); - } - delete asok_hook; -} - -template -void ImageReplayer::on_name_changed() { - { - Mutex::Locker locker(m_lock); - std::string name = m_local_ioctx.get_pool_name() + "/" + - m_local_image_ctx->name; - if (m_name == name) { - return; - } - m_name = name; - } - unregister_admin_socket_hook(); - register_admin_socket_hook(); -} - -template -std::ostream &operator<<(std::ostream &os, const ImageReplayer &replayer) -{ - os << "ImageReplayer: " << &replayer << " [" << replayer.get_local_pool_id() - << "/" << replayer.get_global_image_id() << "]"; - return os; -} - -} // namespace mirror -} // namespace rbd - -template class rbd::mirror::ImageReplayer;