X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Ftools%2Frbd_mirror%2FPoolReplayer.cc;fp=src%2Fceph%2Fsrc%2Ftools%2Frbd_mirror%2FPoolReplayer.cc;h=817d3434c3983a384b138e8091fa41fdb9f6c82f;hb=812ff6ca9fcd3e629e49d4328905f33eee8ca3f5;hp=0000000000000000000000000000000000000000;hpb=15280273faafb77777eab341909a3f495cf248d9;p=stor4nfv.git diff --git a/src/ceph/src/tools/rbd_mirror/PoolReplayer.cc b/src/ceph/src/tools/rbd_mirror/PoolReplayer.cc new file mode 100644 index 0000000..817d343 --- /dev/null +++ b/src/ceph/src/tools/rbd_mirror/PoolReplayer.cc @@ -0,0 +1,808 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "PoolReplayer.h" +#include +#include "common/Formatter.h" +#include "common/admin_socket.h" +#include "common/ceph_argparse.h" +#include "common/code_environment.h" +#include "common/common_init.h" +#include "common/debug.h" +#include "common/errno.h" +#include "include/stringify.h" +#include "cls/rbd/cls_rbd_client.h" +#include "global/global_context.h" +#include "librbd/internal.h" +#include "librbd/Utils.h" +#include "librbd/Watcher.h" +#include "librbd/api/Mirror.h" +#include "InstanceReplayer.h" +#include "InstanceWatcher.h" +#include "LeaderWatcher.h" +#include "ServiceDaemon.h" +#include "Threads.h" + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_rbd_mirror +#undef dout_prefix +#define dout_prefix *_dout << "rbd::mirror::PoolReplayer: " \ + << this << " " << __func__ << ": " + +using std::chrono::seconds; +using std::map; +using std::string; +using std::unique_ptr; +using std::vector; + +using librbd::cls_client::dir_get_name; +using librbd::util::create_async_context_callback; + +namespace rbd { +namespace mirror { + +namespace { + +const std::string SERVICE_DAEMON_LEADER_KEY("leader"); +const std::string SERVICE_DAEMON_LOCAL_COUNT_KEY("image_local_count"); +const std::string SERVICE_DAEMON_REMOTE_COUNT_KEY("image_remote_count"); + +const std::vector UNIQUE_PEER_CONFIG_KEYS { + {"monmap", "mon_host", "mon_dns_srv_name", "key", "keyfile", "keyring"}}; + +class PoolReplayerAdminSocketCommand { +public: + PoolReplayerAdminSocketCommand(PoolReplayer *pool_replayer) + : pool_replayer(pool_replayer) { + } + virtual ~PoolReplayerAdminSocketCommand() {} + virtual bool call(Formatter *f, stringstream *ss) = 0; +protected: + PoolReplayer *pool_replayer; +}; + +class StatusCommand : public PoolReplayerAdminSocketCommand { +public: + explicit StatusCommand(PoolReplayer *pool_replayer) + : PoolReplayerAdminSocketCommand(pool_replayer) { + } + + bool call(Formatter *f, stringstream *ss) override { + pool_replayer->print_status(f, ss); + return true; + } +}; + +class StartCommand : public PoolReplayerAdminSocketCommand { +public: + explicit StartCommand(PoolReplayer *pool_replayer) + : PoolReplayerAdminSocketCommand(pool_replayer) { + } + + bool call(Formatter *f, stringstream *ss) override { + pool_replayer->start(); + return true; + } +}; + +class StopCommand : public PoolReplayerAdminSocketCommand { +public: + explicit StopCommand(PoolReplayer *pool_replayer) + : PoolReplayerAdminSocketCommand(pool_replayer) { + } + + bool call(Formatter *f, stringstream *ss) override { + pool_replayer->stop(true); + return true; + } +}; + +class RestartCommand : public PoolReplayerAdminSocketCommand { +public: + explicit RestartCommand(PoolReplayer *pool_replayer) + : PoolReplayerAdminSocketCommand(pool_replayer) { + } + + bool call(Formatter *f, stringstream *ss) override { + pool_replayer->restart(); + return true; + } +}; + +class FlushCommand : public PoolReplayerAdminSocketCommand { +public: + explicit FlushCommand(PoolReplayer *pool_replayer) + : PoolReplayerAdminSocketCommand(pool_replayer) { + } + + bool call(Formatter *f, stringstream *ss) override { + pool_replayer->flush(); + return true; + } +}; + +class LeaderReleaseCommand : public PoolReplayerAdminSocketCommand { +public: + explicit LeaderReleaseCommand(PoolReplayer *pool_replayer) + : PoolReplayerAdminSocketCommand(pool_replayer) { + } + + bool call(Formatter *f, stringstream *ss) override { + pool_replayer->release_leader(); + return true; + } +}; + +class PoolReplayerAdminSocketHook : public AdminSocketHook { +public: + PoolReplayerAdminSocketHook(CephContext *cct, const std::string &name, + PoolReplayer *pool_replayer) + : admin_socket(cct->get_admin_socket()) { + std::string command; + int r; + + command = "rbd mirror status " + name; + r = admin_socket->register_command(command, command, this, + "get status for rbd mirror " + name); + if (r == 0) { + commands[command] = new StatusCommand(pool_replayer); + } + + command = "rbd mirror start " + name; + r = admin_socket->register_command(command, command, this, + "start rbd mirror " + name); + if (r == 0) { + commands[command] = new StartCommand(pool_replayer); + } + + command = "rbd mirror stop " + name; + r = admin_socket->register_command(command, command, this, + "stop rbd mirror " + name); + if (r == 0) { + commands[command] = new StopCommand(pool_replayer); + } + + command = "rbd mirror restart " + name; + r = admin_socket->register_command(command, command, this, + "restart rbd mirror " + name); + if (r == 0) { + commands[command] = new RestartCommand(pool_replayer); + } + + command = "rbd mirror flush " + name; + r = admin_socket->register_command(command, command, this, + "flush rbd mirror " + name); + if (r == 0) { + commands[command] = new FlushCommand(pool_replayer); + } + + command = "rbd mirror leader release " + name; + r = admin_socket->register_command(command, command, this, + "release rbd mirror leader " + name); + if (r == 0) { + commands[command] = new LeaderReleaseCommand(pool_replayer); + } + } + + ~PoolReplayerAdminSocketHook() override { + for (Commands::const_iterator i = commands.begin(); i != commands.end(); + ++i) { + (void)admin_socket->unregister_command(i->first); + delete i->second; + } + } + + bool call(std::string command, cmdmap_t& cmdmap, std::string format, + bufferlist& out) override { + Commands::const_iterator i = commands.find(command); + assert(i != commands.end()); + Formatter *f = Formatter::create(format); + stringstream ss; + bool r = i->second->call(f, &ss); + delete f; + out.append(ss); + return r; + } + +private: + typedef std::map Commands; + + AdminSocket *admin_socket; + Commands commands; +}; + +} // anonymous namespace + +PoolReplayer::PoolReplayer(Threads *threads, + ServiceDaemon* service_daemon, + ImageDeleter<>* image_deleter, + int64_t local_pool_id, const peer_t &peer, + const std::vector &args) : + m_threads(threads), + m_service_daemon(service_daemon), + m_image_deleter(image_deleter), + m_local_pool_id(local_pool_id), + m_peer(peer), + m_args(args), + m_lock(stringify("rbd::mirror::PoolReplayer ") + stringify(peer)), + m_local_pool_watcher_listener(this, true), + m_remote_pool_watcher_listener(this, false), + m_pool_replayer_thread(this), + m_leader_listener(this) +{ +} + +PoolReplayer::~PoolReplayer() +{ + delete m_asok_hook; + shut_down(); +} + +bool PoolReplayer::is_blacklisted() const { + Mutex::Locker locker(m_lock); + return m_blacklisted; +} + +bool PoolReplayer::is_leader() const { + Mutex::Locker locker(m_lock); + return m_leader_watcher && m_leader_watcher->is_leader(); +} + +bool PoolReplayer::is_running() const { + return m_pool_replayer_thread.is_started(); +} + +void PoolReplayer::init() +{ + assert(!m_pool_replayer_thread.is_started()); + + // reset state + m_stopping = false; + m_blacklisted = false; + + dout(20) << "replaying for " << m_peer << dendl; + int r = init_rados(g_ceph_context->_conf->cluster, + g_ceph_context->_conf->name.to_str(), + "local cluster", &m_local_rados, false); + if (r < 0) { + m_callout_id = m_service_daemon->add_or_update_callout( + m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR, + "unable to connect to local cluster"); + return; + } + + r = init_rados(m_peer.cluster_name, m_peer.client_name, + std::string("remote peer ") + stringify(m_peer), + &m_remote_rados, true); + if (r < 0) { + m_callout_id = m_service_daemon->add_or_update_callout( + m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR, + "unable to connect to remote cluster"); + return; + } + + r = m_local_rados->ioctx_create2(m_local_pool_id, m_local_io_ctx); + if (r < 0) { + derr << "error accessing local pool " << m_local_pool_id << ": " + << cpp_strerror(r) << dendl; + return; + } + + std::string local_mirror_uuid; + r = librbd::cls_client::mirror_uuid_get(&m_local_io_ctx, + &local_mirror_uuid); + if (r < 0) { + derr << "failed to retrieve local mirror uuid from pool " + << m_local_io_ctx.get_pool_name() << ": " << cpp_strerror(r) << dendl; + m_callout_id = m_service_daemon->add_or_update_callout( + m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR, + "unable to query local mirror uuid"); + return; + } + + r = m_remote_rados->ioctx_create(m_local_io_ctx.get_pool_name().c_str(), + m_remote_io_ctx); + if (r < 0) { + derr << "error accessing remote pool " << m_local_io_ctx.get_pool_name() + << ": " << cpp_strerror(r) << dendl; + m_callout_id = m_service_daemon->add_or_update_callout( + m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_WARNING, + "unable to access remote pool"); + return; + } + + dout(20) << "connected to " << m_peer << dendl; + + m_instance_replayer.reset(InstanceReplayer<>::create( + m_threads, m_service_daemon, m_image_deleter, m_local_rados, + local_mirror_uuid, m_local_pool_id)); + m_instance_replayer->init(); + m_instance_replayer->add_peer(m_peer.uuid, m_remote_io_ctx); + + m_instance_watcher.reset(InstanceWatcher<>::create( + m_local_io_ctx, m_threads->work_queue, m_instance_replayer.get())); + r = m_instance_watcher->init(); + if (r < 0) { + derr << "error initializing instance watcher: " << cpp_strerror(r) << dendl; + m_callout_id = m_service_daemon->add_or_update_callout( + m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR, + "unable to initialize instance messenger object"); + return; + } + + m_leader_watcher.reset(new LeaderWatcher<>(m_threads, m_local_io_ctx, + &m_leader_listener)); + r = m_leader_watcher->init(); + if (r < 0) { + derr << "error initializing leader watcher: " << cpp_strerror(r) << dendl; + m_callout_id = m_service_daemon->add_or_update_callout( + m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR, + "unable to initialize leader messenger object"); + return; + } + + if (m_callout_id != service_daemon::CALLOUT_ID_NONE) { + m_service_daemon->remove_callout(m_local_pool_id, m_callout_id); + m_callout_id = service_daemon::CALLOUT_ID_NONE; + } + + m_pool_replayer_thread.create("pool replayer"); +} + +void PoolReplayer::shut_down() { + m_stopping = true; + { + Mutex::Locker l(m_lock); + m_cond.Signal(); + } + if (m_pool_replayer_thread.is_started()) { + m_pool_replayer_thread.join(); + } + if (m_leader_watcher) { + m_leader_watcher->shut_down(); + m_leader_watcher.reset(); + } + if (m_instance_watcher) { + m_instance_watcher->shut_down(); + m_instance_watcher.reset(); + } + if (m_instance_replayer) { + m_instance_replayer->shut_down(); + m_instance_replayer.reset(); + } + + assert(!m_local_pool_watcher); + assert(!m_remote_pool_watcher); + m_local_rados.reset(); + m_remote_rados.reset(); +} + +int PoolReplayer::init_rados(const std::string &cluster_name, + const std::string &client_name, + const std::string &description, + RadosRef *rados_ref, + bool strip_cluster_overrides) { + rados_ref->reset(new librados::Rados()); + + // NOTE: manually bootstrap a CephContext here instead of via + // the librados API to avoid mixing global singletons between + // the librados shared library and the daemon + // TODO: eliminate intermingling of global singletons within Ceph APIs + CephInitParameters iparams(CEPH_ENTITY_TYPE_CLIENT); + if (client_name.empty() || !iparams.name.from_str(client_name)) { + derr << "error initializing cluster handle for " << description << dendl; + return -EINVAL; + } + + CephContext *cct = common_preinit(iparams, CODE_ENVIRONMENT_LIBRARY, + CINIT_FLAG_UNPRIVILEGED_DAEMON_DEFAULTS); + cct->_conf->cluster = cluster_name; + + // librados::Rados::conf_read_file + int r = cct->_conf->parse_config_files(nullptr, nullptr, 0); + if (r < 0) { + derr << "could not read ceph conf for " << description << ": " + << cpp_strerror(r) << dendl; + cct->put(); + return r; + } + + // preserve cluster-specific config settings before applying environment/cli + // overrides + std::map config_values; + if (strip_cluster_overrides) { + // remote peer connections shouldn't apply cluster-specific + // configuration settings + for (auto& key : UNIQUE_PEER_CONFIG_KEYS) { + config_values[key] = cct->_conf->get_val(key); + } + } + + cct->_conf->parse_env(); + + // librados::Rados::conf_parse_env + std::vector args; + env_to_vec(args, nullptr); + r = cct->_conf->parse_argv(args); + if (r < 0) { + derr << "could not parse environment for " << description << ":" + << cpp_strerror(r) << dendl; + cct->put(); + return r; + } + + if (!m_args.empty()) { + // librados::Rados::conf_parse_argv + args = m_args; + r = cct->_conf->parse_argv(args); + if (r < 0) { + derr << "could not parse command line args for " << description << ": " + << cpp_strerror(r) << dendl; + cct->put(); + return r; + } + } + + if (strip_cluster_overrides) { + // remote peer connections shouldn't apply cluster-specific + // configuration settings + for (auto& pair : config_values) { + auto value = cct->_conf->get_val(pair.first); + if (pair.second != value) { + dout(0) << "reverting global config option override: " + << pair.first << ": " << value << " -> " << pair.second + << dendl; + cct->_conf->set_val_or_die(pair.first, pair.second); + } + } + } + + if (!g_ceph_context->_conf->admin_socket.empty()) { + cct->_conf->set_val_or_die("admin_socket", + "$run_dir/$name.$pid.$cluster.$cctid.asok"); + } + + // disable unnecessary librbd cache + cct->_conf->set_val_or_die("rbd_cache", "false"); + cct->_conf->apply_changes(nullptr); + cct->_conf->complain_about_parse_errors(cct); + + r = (*rados_ref)->init_with_context(cct); + assert(r == 0); + cct->put(); + + r = (*rados_ref)->connect(); + if (r < 0) { + derr << "error connecting to " << description << ": " + << cpp_strerror(r) << dendl; + return r; + } + + return 0; +} + +void PoolReplayer::run() +{ + dout(20) << "enter" << dendl; + + while (!m_stopping) { + std::string asok_hook_name = m_local_io_ctx.get_pool_name() + " " + + m_peer.cluster_name; + if (m_asok_hook_name != asok_hook_name || m_asok_hook == nullptr) { + m_asok_hook_name = asok_hook_name; + delete m_asok_hook; + + m_asok_hook = new PoolReplayerAdminSocketHook(g_ceph_context, + m_asok_hook_name, this); + } + + Mutex::Locker locker(m_lock); + if ((m_local_pool_watcher && m_local_pool_watcher->is_blacklisted()) || + (m_remote_pool_watcher && m_remote_pool_watcher->is_blacklisted())) { + m_blacklisted = true; + m_stopping = true; + break; + } + + if (!m_stopping) { + m_cond.WaitInterval(m_lock, utime_t(1, 0)); + } + } +} + +void PoolReplayer::print_status(Formatter *f, stringstream *ss) +{ + dout(20) << "enter" << dendl; + + if (!f) { + return; + } + + Mutex::Locker l(m_lock); + + f->open_object_section("pool_replayer_status"); + f->dump_string("pool", m_local_io_ctx.get_pool_name()); + f->dump_stream("peer") << m_peer; + f->dump_string("instance_id", m_instance_watcher->get_instance_id()); + + std::string leader_instance_id; + m_leader_watcher->get_leader_instance_id(&leader_instance_id); + f->dump_string("leader_instance_id", leader_instance_id); + + bool leader = m_leader_watcher->is_leader(); + f->dump_bool("leader", leader); + if (leader) { + std::vector instance_ids; + m_leader_watcher->list_instances(&instance_ids); + f->open_array_section("instances"); + for (auto instance_id : instance_ids) { + f->dump_string("instance_id", instance_id); + } + f->close_section(); + } + + f->dump_string("local_cluster_admin_socket", + reinterpret_cast(m_local_io_ctx.cct())->_conf-> + get_val("admin_socket")); + f->dump_string("remote_cluster_admin_socket", + reinterpret_cast(m_remote_io_ctx.cct())->_conf-> + get_val("admin_socket")); + + f->open_object_section("sync_throttler"); + m_instance_watcher->print_sync_status(f, ss); + f->close_section(); + + m_instance_replayer->print_status(f, ss); + + f->close_section(); + f->flush(*ss); +} + +void PoolReplayer::start() +{ + dout(20) << "enter" << dendl; + + Mutex::Locker l(m_lock); + + if (m_stopping) { + return; + } + + m_instance_replayer->start(); +} + +void PoolReplayer::stop(bool manual) +{ + dout(20) << "enter: manual=" << manual << dendl; + + Mutex::Locker l(m_lock); + if (!manual) { + m_stopping = true; + m_cond.Signal(); + return; + } else if (m_stopping) { + return; + } + + m_instance_replayer->stop(); +} + +void PoolReplayer::restart() +{ + dout(20) << "enter" << dendl; + + Mutex::Locker l(m_lock); + + if (m_stopping) { + return; + } + + m_instance_replayer->restart(); +} + +void PoolReplayer::flush() +{ + dout(20) << "enter" << dendl; + + Mutex::Locker l(m_lock); + + if (m_stopping || m_manual_stop) { + return; + } + + m_instance_replayer->flush(); +} + +void PoolReplayer::release_leader() +{ + dout(20) << "enter" << dendl; + + Mutex::Locker l(m_lock); + + if (m_stopping || !m_leader_watcher) { + return; + } + + m_leader_watcher->release_leader(); +} + +void PoolReplayer::handle_update(const std::string &mirror_uuid, + ImageIds &&added_image_ids, + ImageIds &&removed_image_ids) { + if (m_stopping) { + return; + } + + dout(10) << "mirror_uuid=" << mirror_uuid << ", " + << "added_count=" << added_image_ids.size() << ", " + << "removed_count=" << removed_image_ids.size() << dendl; + Mutex::Locker locker(m_lock); + if (!m_leader_watcher->is_leader()) { + return; + } + + m_service_daemon->add_or_update_attribute( + m_local_pool_id, SERVICE_DAEMON_LOCAL_COUNT_KEY, + m_local_pool_watcher->get_image_count()); + if (m_remote_pool_watcher) { + m_service_daemon->add_or_update_attribute( + m_local_pool_id, SERVICE_DAEMON_REMOTE_COUNT_KEY, + m_remote_pool_watcher->get_image_count()); + } + + m_update_op_tracker.start_op(); + Context *ctx = new FunctionContext([this](int r) { + dout(20) << "complete handle_update: r=" << r << dendl; + m_update_op_tracker.finish_op(); + }); + + C_Gather *gather_ctx = new C_Gather(g_ceph_context, ctx); + + for (auto &image_id : added_image_ids) { + // for now always send to myself (the leader) + std::string &instance_id = m_instance_watcher->get_instance_id(); + m_instance_watcher->notify_image_acquire(instance_id, image_id.global_id, + gather_ctx->new_sub()); + } + + if (!mirror_uuid.empty()) { + for (auto &image_id : removed_image_ids) { + // for now always send to myself (the leader) + std::string &instance_id = m_instance_watcher->get_instance_id(); + m_instance_watcher->notify_peer_image_removed(instance_id, + image_id.global_id, + mirror_uuid, + gather_ctx->new_sub()); + } + } + + gather_ctx->activate(); +} + +void PoolReplayer::handle_post_acquire_leader(Context *on_finish) { + dout(20) << dendl; + + m_service_daemon->add_or_update_attribute(m_local_pool_id, + SERVICE_DAEMON_LEADER_KEY, true); + m_instance_watcher->handle_acquire_leader(); + init_local_pool_watcher(on_finish); +} + +void PoolReplayer::handle_pre_release_leader(Context *on_finish) { + dout(20) << dendl; + + m_service_daemon->remove_attribute(m_local_pool_id, SERVICE_DAEMON_LEADER_KEY); + m_instance_watcher->handle_release_leader(); + shut_down_pool_watchers(on_finish); +} + +void PoolReplayer::init_local_pool_watcher(Context *on_finish) { + dout(20) << dendl; + + Mutex::Locker locker(m_lock); + assert(!m_local_pool_watcher); + m_local_pool_watcher.reset(new PoolWatcher<>( + m_threads, m_local_io_ctx, m_local_pool_watcher_listener)); + + // ensure the initial set of local images is up-to-date + // after acquiring the leader role + auto ctx = new FunctionContext([this, on_finish](int r) { + handle_init_local_pool_watcher(r, on_finish); + }); + m_local_pool_watcher->init(create_async_context_callback( + m_threads->work_queue, ctx)); +} + +void PoolReplayer::handle_init_local_pool_watcher(int r, Context *on_finish) { + dout(20) << "r=" << r << dendl; + if (r < 0) { + derr << "failed to retrieve local images: " << cpp_strerror(r) << dendl; + on_finish->complete(r); + return; + } + + init_remote_pool_watcher(on_finish); +} + +void PoolReplayer::init_remote_pool_watcher(Context *on_finish) { + dout(20) << dendl; + + Mutex::Locker locker(m_lock); + assert(!m_remote_pool_watcher); + m_remote_pool_watcher.reset(new PoolWatcher<>( + m_threads, m_remote_io_ctx, m_remote_pool_watcher_listener)); + m_remote_pool_watcher->init(create_async_context_callback( + m_threads->work_queue, on_finish)); + + m_cond.Signal(); +} + +void PoolReplayer::shut_down_pool_watchers(Context *on_finish) { + dout(20) << dendl; + + { + Mutex::Locker locker(m_lock); + if (m_local_pool_watcher) { + Context *ctx = new FunctionContext([this, on_finish](int r) { + handle_shut_down_pool_watchers(r, on_finish); + }); + ctx = create_async_context_callback(m_threads->work_queue, ctx); + + auto gather_ctx = new C_Gather(g_ceph_context, ctx); + m_local_pool_watcher->shut_down(gather_ctx->new_sub()); + if (m_remote_pool_watcher) { + m_remote_pool_watcher->shut_down(gather_ctx->new_sub()); + } + gather_ctx->activate(); + return; + } + } + + on_finish->complete(0); +} + +void PoolReplayer::handle_shut_down_pool_watchers(int r, Context *on_finish) { + dout(20) << "r=" << r << dendl; + + { + Mutex::Locker locker(m_lock); + assert(m_local_pool_watcher); + m_local_pool_watcher.reset(); + + if (m_remote_pool_watcher) { + m_remote_pool_watcher.reset(); + } + } + wait_for_update_ops(on_finish); +} + +void PoolReplayer::wait_for_update_ops(Context *on_finish) { + dout(20) << dendl; + + Mutex::Locker locker(m_lock); + + Context *ctx = new FunctionContext([this, on_finish](int r) { + handle_wait_for_update_ops(r, on_finish); + }); + ctx = create_async_context_callback(m_threads->work_queue, ctx); + + m_update_op_tracker.wait_for_ops(ctx); +} + +void PoolReplayer::handle_wait_for_update_ops(int r, Context *on_finish) { + dout(20) << "r=" << r << dendl; + + assert(r == 0); + + Mutex::Locker locker(m_lock); + m_instance_replayer->release_all(on_finish); +} + +void PoolReplayer::handle_update_leader(const std::string &leader_instance_id) { + dout(20) << "leader_instance_id=" << leader_instance_id << dendl; + + m_instance_watcher->handle_update_leader(leader_instance_id); +} + +} // namespace mirror +} // namespace rbd