// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "PoolReplayer.h"
#include <boost/bind.hpp>
#include "common/Formatter.h"
#include "common/admin_socket.h"
#include "common/ceph_argparse.h"
#include "common/code_environment.h"
#include "common/common_init.h"
#include "common/debug.h"
#include "common/errno.h"
#include "include/stringify.h"
#include "cls/rbd/cls_rbd_client.h"
#include "global/global_context.h"
#include "librbd/internal.h"
#include "librbd/Utils.h"
#include "librbd/Watcher.h"
#include "librbd/api/Mirror.h"
#include "InstanceReplayer.h"
#include "InstanceWatcher.h"
#include "LeaderWatcher.h"
#include "ServiceDaemon.h"
#include "Threads.h"

#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rbd_mirror
#undef dout_prefix
#define dout_prefix *_dout << "rbd::mirror::PoolReplayer: " \
                           << this << " " << __func__ << ": "
using std::chrono::seconds;
using std::map;
using std::string;
using std::unique_ptr;
using std::vector;

using librbd::cls_client::dir_get_name;
using librbd::util::create_async_context_callback;

namespace rbd {
namespace mirror {

namespace {
const std::string SERVICE_DAEMON_LEADER_KEY("leader");
const std::string SERVICE_DAEMON_LOCAL_COUNT_KEY("image_local_count");
const std::string SERVICE_DAEMON_REMOTE_COUNT_KEY("image_remote_count");

const std::vector<std::string> UNIQUE_PEER_CONFIG_KEYS {
  {"monmap", "mon_host", "mon_dns_srv_name", "key", "keyfile", "keyring"}};
class PoolReplayerAdminSocketCommand {
public:
  PoolReplayerAdminSocketCommand(PoolReplayer *pool_replayer)
    : pool_replayer(pool_replayer) {
  }
  virtual ~PoolReplayerAdminSocketCommand() {}
  virtual bool call(Formatter *f, stringstream *ss) = 0;

protected:
  PoolReplayer *pool_replayer;
};
class StatusCommand : public PoolReplayerAdminSocketCommand {
public:
  explicit StatusCommand(PoolReplayer *pool_replayer)
    : PoolReplayerAdminSocketCommand(pool_replayer) {
  }

  bool call(Formatter *f, stringstream *ss) override {
    pool_replayer->print_status(f, ss);
    return true;
  }
};

class StartCommand : public PoolReplayerAdminSocketCommand {
public:
  explicit StartCommand(PoolReplayer *pool_replayer)
    : PoolReplayerAdminSocketCommand(pool_replayer) {
  }

  bool call(Formatter *f, stringstream *ss) override {
    pool_replayer->start();
    return true;
  }
};

class StopCommand : public PoolReplayerAdminSocketCommand {
public:
  explicit StopCommand(PoolReplayer *pool_replayer)
    : PoolReplayerAdminSocketCommand(pool_replayer) {
  }

  bool call(Formatter *f, stringstream *ss) override {
    pool_replayer->stop(true);
    return true;
  }
};

class RestartCommand : public PoolReplayerAdminSocketCommand {
public:
  explicit RestartCommand(PoolReplayer *pool_replayer)
    : PoolReplayerAdminSocketCommand(pool_replayer) {
  }

  bool call(Formatter *f, stringstream *ss) override {
    pool_replayer->restart();
    return true;
  }
};

class FlushCommand : public PoolReplayerAdminSocketCommand {
public:
  explicit FlushCommand(PoolReplayer *pool_replayer)
    : PoolReplayerAdminSocketCommand(pool_replayer) {
  }

  bool call(Formatter *f, stringstream *ss) override {
    pool_replayer->flush();
    return true;
  }
};

class LeaderReleaseCommand : public PoolReplayerAdminSocketCommand {
public:
  explicit LeaderReleaseCommand(PoolReplayer *pool_replayer)
    : PoolReplayerAdminSocketCommand(pool_replayer) {
  }

  bool call(Formatter *f, stringstream *ss) override {
    pool_replayer->release_leader();
    return true;
  }
};
class PoolReplayerAdminSocketHook : public AdminSocketHook {
public:
  PoolReplayerAdminSocketHook(CephContext *cct, const std::string &name,
                              PoolReplayer *pool_replayer)
    : admin_socket(cct->get_admin_socket()) {
    std::string command;
    int r;

    command = "rbd mirror status " + name;
    r = admin_socket->register_command(command, command, this,
                                       "get status for rbd mirror " + name);
    if (r == 0) {
      commands[command] = new StatusCommand(pool_replayer);
    }

    command = "rbd mirror start " + name;
    r = admin_socket->register_command(command, command, this,
                                       "start rbd mirror " + name);
    if (r == 0) {
      commands[command] = new StartCommand(pool_replayer);
    }

    command = "rbd mirror stop " + name;
    r = admin_socket->register_command(command, command, this,
                                       "stop rbd mirror " + name);
    if (r == 0) {
      commands[command] = new StopCommand(pool_replayer);
    }

    command = "rbd mirror restart " + name;
    r = admin_socket->register_command(command, command, this,
                                       "restart rbd mirror " + name);
    if (r == 0) {
      commands[command] = new RestartCommand(pool_replayer);
    }

    command = "rbd mirror flush " + name;
    r = admin_socket->register_command(command, command, this,
                                       "flush rbd mirror " + name);
    if (r == 0) {
      commands[command] = new FlushCommand(pool_replayer);
    }

    command = "rbd mirror leader release " + name;
    r = admin_socket->register_command(command, command, this,
                                       "release rbd mirror leader " + name);
    if (r == 0) {
      commands[command] = new LeaderReleaseCommand(pool_replayer);
    }
  }

  ~PoolReplayerAdminSocketHook() override {
    for (Commands::const_iterator i = commands.begin(); i != commands.end();
         ++i) {
      (void)admin_socket->unregister_command(i->first);
      delete i->second;
    }
  }

  bool call(std::string command, cmdmap_t& cmdmap, std::string format,
            bufferlist& out) override {
    Commands::const_iterator i = commands.find(command);
    assert(i != commands.end());
    Formatter *f = Formatter::create(format);
    stringstream ss;
    bool r = i->second->call(f, &ss);
    delete f;
    out.append(ss);
    return r;
  }

private:
  typedef std::map<std::string, PoolReplayerAdminSocketCommand*> Commands;

  AdminSocket *admin_socket;
  Commands commands;
};
} // anonymous namespace
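
// The commands registered above are serviced through the daemon's admin
// socket.  A sketch of an invocation (the socket path and the
// "mypool remote" suffix are illustrative assumptions, not values taken
// from this file; the suffix is the "<pool> <peer cluster>" hook name
// built in PoolReplayer::run() below):
//
//   ceph --admin-daemon /var/run/ceph/ceph-client.rbd-mirror.a.asok \
//       rbd mirror status mypool remote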
PoolReplayer::PoolReplayer(Threads<librbd::ImageCtx> *threads,
                           ServiceDaemon<librbd::ImageCtx>* service_daemon,
                           ImageDeleter<>* image_deleter,
                           int64_t local_pool_id, const peer_t &peer,
                           const std::vector<const char*> &args) :
  m_threads(threads),
  m_service_daemon(service_daemon),
  m_image_deleter(image_deleter),
  m_local_pool_id(local_pool_id),
  m_peer(peer),
  m_args(args),
  m_lock(stringify("rbd::mirror::PoolReplayer ") + stringify(peer)),
  m_local_pool_watcher_listener(this, true),
  m_remote_pool_watcher_listener(this, false),
  m_pool_replayer_thread(this),
  m_leader_listener(this)
{
}

PoolReplayer::~PoolReplayer()
{
  delete m_asok_hook;
  shut_down();
}

bool PoolReplayer::is_blacklisted() const {
  Mutex::Locker locker(m_lock);
  return m_blacklisted;
}

bool PoolReplayer::is_leader() const {
  Mutex::Locker locker(m_lock);
  return m_leader_watcher && m_leader_watcher->is_leader();
}

bool PoolReplayer::is_running() const {
  return m_pool_replayer_thread.is_started();
}

void PoolReplayer::init()
{
  assert(!m_pool_replayer_thread.is_started());

  // reset state in case pool replayer is restarted
  m_stopping = false;
  m_blacklisted = false;

  dout(20) << "replaying for " << m_peer << dendl;
  int r = init_rados(g_ceph_context->_conf->cluster,
                     g_ceph_context->_conf->name.to_str(),
                     "local cluster", &m_local_rados, false);
  if (r < 0) {
    m_callout_id = m_service_daemon->add_or_update_callout(
      m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
      "unable to connect to local cluster");
    return;
  }

  r = init_rados(m_peer.cluster_name, m_peer.client_name,
                 std::string("remote peer ") + stringify(m_peer),
                 &m_remote_rados, true);
  if (r < 0) {
    m_callout_id = m_service_daemon->add_or_update_callout(
      m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
      "unable to connect to remote cluster");
    return;
  }

  r = m_local_rados->ioctx_create2(m_local_pool_id, m_local_io_ctx);
  if (r < 0) {
    derr << "error accessing local pool " << m_local_pool_id << ": "
         << cpp_strerror(r) << dendl;
    return;
  }

  std::string local_mirror_uuid;
  r = librbd::cls_client::mirror_uuid_get(&m_local_io_ctx,
                                          &local_mirror_uuid);
  if (r < 0) {
    derr << "failed to retrieve local mirror uuid from pool "
         << m_local_io_ctx.get_pool_name() << ": " << cpp_strerror(r) << dendl;
    m_callout_id = m_service_daemon->add_or_update_callout(
      m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
      "unable to query local mirror uuid");
    return;
  }

  r = m_remote_rados->ioctx_create(m_local_io_ctx.get_pool_name().c_str(),
                                   m_remote_io_ctx);
  if (r < 0) {
    derr << "error accessing remote pool " << m_local_io_ctx.get_pool_name()
         << ": " << cpp_strerror(r) << dendl;
    m_callout_id = m_service_daemon->add_or_update_callout(
      m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_WARNING,
      "unable to access remote pool");
    return;
  }

  dout(20) << "connected to " << m_peer << dendl;

  m_instance_replayer.reset(InstanceReplayer<>::create(
    m_threads, m_service_daemon, m_image_deleter, m_local_rados,
    local_mirror_uuid, m_local_pool_id));
  m_instance_replayer->init();
  m_instance_replayer->add_peer(m_peer.uuid, m_remote_io_ctx);

  m_instance_watcher.reset(InstanceWatcher<>::create(
    m_local_io_ctx, m_threads->work_queue, m_instance_replayer.get()));
  r = m_instance_watcher->init();
  if (r < 0) {
    derr << "error initializing instance watcher: " << cpp_strerror(r)
         << dendl;
    m_callout_id = m_service_daemon->add_or_update_callout(
      m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
      "unable to initialize instance messenger object");
    return;
  }

  m_leader_watcher.reset(new LeaderWatcher<>(m_threads, m_local_io_ctx,
                                             &m_leader_listener));
  r = m_leader_watcher->init();
  if (r < 0) {
    derr << "error initializing leader watcher: " << cpp_strerror(r) << dendl;
    m_callout_id = m_service_daemon->add_or_update_callout(
      m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
      "unable to initialize leader messenger object");
    return;
  }

  if (m_callout_id != service_daemon::CALLOUT_ID_NONE) {
    m_service_daemon->remove_callout(m_local_pool_id, m_callout_id);
    m_callout_id = service_daemon::CALLOUT_ID_NONE;
  }

  m_pool_replayer_thread.create("pool replayer");
}

void PoolReplayer::shut_down() {
  m_stopping = true;
  {
    Mutex::Locker l(m_lock);
    m_cond.Signal();
  }
  if (m_pool_replayer_thread.is_started()) {
    m_pool_replayer_thread.join();
  }
  if (m_leader_watcher) {
    m_leader_watcher->shut_down();
    m_leader_watcher.reset();
  }
  if (m_instance_watcher) {
    m_instance_watcher->shut_down();
    m_instance_watcher.reset();
  }
  if (m_instance_replayer) {
    m_instance_replayer->shut_down();
    m_instance_replayer.reset();
  }

  assert(!m_local_pool_watcher);
  assert(!m_remote_pool_watcher);
  m_local_rados.reset();
  m_remote_rados.reset();
}

int PoolReplayer::init_rados(const std::string &cluster_name,
                             const std::string &client_name,
                             const std::string &description,
                             RadosRef *rados_ref,
                             bool strip_cluster_overrides) {
  rados_ref->reset(new librados::Rados());

  // NOTE: manually bootstrap a CephContext here instead of via
  // the librados API to avoid mixing global singletons between
  // the librados shared library and the daemon
  // TODO: eliminate intermingling of global singletons within Ceph APIs
  CephInitParameters iparams(CEPH_ENTITY_TYPE_CLIENT);
  if (client_name.empty() || !iparams.name.from_str(client_name)) {
    derr << "error initializing cluster handle for " << description << dendl;
    return -EINVAL;
  }

  CephContext *cct = common_preinit(iparams, CODE_ENVIRONMENT_LIBRARY,
                                    CINIT_FLAG_UNPRIVILEGED_DAEMON_DEFAULTS);
  cct->_conf->cluster = cluster_name;

  // librados::Rados::conf_read_file
  int r = cct->_conf->parse_config_files(nullptr, nullptr, 0);
  if (r < 0) {
    derr << "could not read ceph conf for " << description << ": "
         << cpp_strerror(r) << dendl;
    cct->put();
    return r;
  }

  // preserve cluster-specific config settings before applying environment/cli
  // overrides
  std::map<std::string, std::string> config_values;
  if (strip_cluster_overrides) {
    // remote peer connections shouldn't apply cluster-specific
    // configuration settings
    for (auto& key : UNIQUE_PEER_CONFIG_KEYS) {
      config_values[key] = cct->_conf->get_val<std::string>(key);
    }
  }

  cct->_conf->parse_env();

  // librados::Rados::conf_parse_env
  std::vector<const char*> args;
  env_to_vec(args, nullptr);
  r = cct->_conf->parse_argv(args);
  if (r < 0) {
    derr << "could not parse environment for " << description << ": "
         << cpp_strerror(r) << dendl;
    cct->put();
    return r;
  }

  if (!m_args.empty()) {
    // librados::Rados::conf_parse_argv
    args = m_args;
    r = cct->_conf->parse_argv(args);
    if (r < 0) {
      derr << "could not parse command line args for " << description << ": "
           << cpp_strerror(r) << dendl;
      cct->put();
      return r;
    }
  }

  if (strip_cluster_overrides) {
    // remote peer connections shouldn't apply cluster-specific
    // configuration settings
    for (auto& pair : config_values) {
      auto value = cct->_conf->get_val<std::string>(pair.first);
      if (pair.second != value) {
        dout(0) << "reverting global config option override: "
                << pair.first << ": " << value << " -> " << pair.second
                << dendl;
        cct->_conf->set_val_or_die(pair.first, pair.second);
      }
    }
  }

  if (!g_ceph_context->_conf->admin_socket.empty()) {
    cct->_conf->set_val_or_die("admin_socket",
                               "$run_dir/$name.$pid.$cluster.$cctid.asok");
  }

  // disable unnecessary librbd cache
  cct->_conf->set_val_or_die("rbd_cache", "false");
  cct->_conf->apply_changes(nullptr);
  cct->_conf->complain_about_parse_errors(cct);

  r = (*rados_ref)->init_with_context(cct);
  assert(r == 0);
  cct->put();

  r = (*rados_ref)->connect();
  if (r < 0) {
    derr << "error connecting to " << description << ": "
         << cpp_strerror(r) << dendl;
    return r;
  }

  return 0;
}
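
// A worked example of the strip_cluster_overrides path above, with assumed
// values: suppose the remote peer's config file yields mon_host=10.2.0.1
// while the daemon's environment or command line (targeted at the local
// cluster) supplies --mon_host 10.1.0.1.  The CLI parse overwrites the
// peer's value; the snapshot/compare loop then detects the change, logs
// "reverting global config option override: mon_host: 10.1.0.1 -> 10.2.0.1",
// and restores the peer's own monitor address before connecting.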

void PoolReplayer::run()
{
  dout(20) << "enter" << dendl;

  while (!m_stopping) {
    std::string asok_hook_name = m_local_io_ctx.get_pool_name() + " " +
                                 m_peer.cluster_name;
    if (m_asok_hook_name != asok_hook_name || m_asok_hook == nullptr) {
      m_asok_hook_name = asok_hook_name;
      delete m_asok_hook;

      m_asok_hook = new PoolReplayerAdminSocketHook(g_ceph_context,
                                                    m_asok_hook_name, this);
    }

    Mutex::Locker locker(m_lock);
    if ((m_local_pool_watcher && m_local_pool_watcher->is_blacklisted()) ||
        (m_remote_pool_watcher && m_remote_pool_watcher->is_blacklisted())) {
      m_blacklisted = true;
      m_stopping = true;
      break;
    }

    if (!m_stopping) {
      m_cond.WaitInterval(m_lock, utime_t(1, 0));
    }
  }
}

void PoolReplayer::print_status(Formatter *f, stringstream *ss)
{
  dout(20) << "enter" << dendl;

  if (!f) {
    return;
  }

  Mutex::Locker l(m_lock);

  f->open_object_section("pool_replayer_status");
  f->dump_string("pool", m_local_io_ctx.get_pool_name());
  f->dump_stream("peer") << m_peer;
  f->dump_string("instance_id", m_instance_watcher->get_instance_id());

  std::string leader_instance_id;
  m_leader_watcher->get_leader_instance_id(&leader_instance_id);
  f->dump_string("leader_instance_id", leader_instance_id);

  bool leader = m_leader_watcher->is_leader();
  f->dump_bool("leader", leader);
  if (leader) {
    std::vector<std::string> instance_ids;
    m_leader_watcher->list_instances(&instance_ids);
    f->open_array_section("instances");
    for (auto instance_id : instance_ids) {
      f->dump_string("instance_id", instance_id);
    }
    f->close_section();
  }

  f->dump_string("local_cluster_admin_socket",
                 reinterpret_cast<CephContext *>(m_local_io_ctx.cct())->_conf->
                     get_val<std::string>("admin_socket"));
  f->dump_string("remote_cluster_admin_socket",
                 reinterpret_cast<CephContext *>(m_remote_io_ctx.cct())->_conf->
                     get_val<std::string>("admin_socket"));

  f->open_object_section("sync_throttler");
  m_instance_watcher->print_sync_status(f, ss);
  f->close_section();

  m_instance_replayer->print_status(f, ss);

  f->close_section();
  f->flush(ss);
}
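
// Rough shape of the status output produced above (field values are
// hypothetical; the nested sections are filled in by the instance watcher
// and instance replayer):
//
//   {
//     "pool_replayer_status": {
//       "pool": "mypool",
//       "peer": "uuid: ... cluster: remote client: client.rbd-mirror-peer",
//       "instance_id": "4123",
//       "leader_instance_id": "4123",
//       "leader": true,
//       "instances": [],
//       "local_cluster_admin_socket": "...",
//       "remote_cluster_admin_socket": "...",
//       "sync_throttler": { ... },
//       ...
//     }
//   }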

void PoolReplayer::start()
{
  dout(20) << "enter" << dendl;

  Mutex::Locker l(m_lock);

  if (m_stopping) {
    return;
  }

  m_instance_replayer->start();
}

void PoolReplayer::stop(bool manual)
{
  dout(20) << "enter: manual=" << manual << dendl;

  Mutex::Locker l(m_lock);
  if (!manual) {
    m_stopping = true;
    m_cond.Signal();
    return;
  } else if (m_stopping) {
    return;
  }

  m_instance_replayer->stop();
}

void PoolReplayer::restart()
{
  dout(20) << "enter" << dendl;

  Mutex::Locker l(m_lock);

  if (m_stopping) {
    return;
  }

  m_instance_replayer->restart();
}

void PoolReplayer::flush()
{
  dout(20) << "enter" << dendl;

  Mutex::Locker l(m_lock);

  if (m_stopping || m_manual_stop) {
    return;
  }

  m_instance_replayer->flush();
}

void PoolReplayer::release_leader()
{
  dout(20) << "enter" << dendl;

  Mutex::Locker l(m_lock);

  if (m_stopping || !m_leader_watcher) {
    return;
  }

  m_leader_watcher->release_leader();
}

void PoolReplayer::handle_update(const std::string &mirror_uuid,
                                 ImageIds &&added_image_ids,
                                 ImageIds &&removed_image_ids) {
  if (m_stopping) {
    return;
  }

  dout(10) << "mirror_uuid=" << mirror_uuid << ", "
           << "added_count=" << added_image_ids.size() << ", "
           << "removed_count=" << removed_image_ids.size() << dendl;
  Mutex::Locker locker(m_lock);
  if (!m_leader_watcher->is_leader()) {
    return;
  }

  m_service_daemon->add_or_update_attribute(
    m_local_pool_id, SERVICE_DAEMON_LOCAL_COUNT_KEY,
    m_local_pool_watcher->get_image_count());
  if (m_remote_pool_watcher) {
    m_service_daemon->add_or_update_attribute(
      m_local_pool_id, SERVICE_DAEMON_REMOTE_COUNT_KEY,
      m_remote_pool_watcher->get_image_count());
  }

  m_update_op_tracker.start_op();
  Context *ctx = new FunctionContext([this](int r) {
      dout(20) << "complete handle_update: r=" << r << dendl;
      m_update_op_tracker.finish_op();
    });

  C_Gather *gather_ctx = new C_Gather(g_ceph_context, ctx);

  for (auto &image_id : added_image_ids) {
    // for now always send to myself (the leader)
    std::string &instance_id = m_instance_watcher->get_instance_id();
    m_instance_watcher->notify_image_acquire(instance_id, image_id.global_id,
                                             gather_ctx->new_sub());
  }

  if (!mirror_uuid.empty()) {
    for (auto &image_id : removed_image_ids) {
      // for now always send to myself (the leader)
      std::string &instance_id = m_instance_watcher->get_instance_id();
      m_instance_watcher->notify_peer_image_removed(instance_id,
                                                    image_id.global_id,
                                                    mirror_uuid,
                                                    gather_ctx->new_sub());
    }
  }

  gather_ctx->activate();
}
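
// Note on the gather above: C_Gather runs its completion context only once
// activate() has been called and every sub-context handed out by new_sub()
// has completed, so m_update_op_tracker stays busy until all image-acquire
// and peer-image-removed notifications have been acknowledged.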

void PoolReplayer::handle_post_acquire_leader(Context *on_finish) {
  dout(20) << dendl;

  m_service_daemon->add_or_update_attribute(m_local_pool_id,
                                            SERVICE_DAEMON_LEADER_KEY, true);
  m_instance_watcher->handle_acquire_leader();
  init_local_pool_watcher(on_finish);
}

void PoolReplayer::handle_pre_release_leader(Context *on_finish) {
  dout(20) << dendl;

  m_service_daemon->remove_attribute(m_local_pool_id,
                                     SERVICE_DAEMON_LEADER_KEY);
  m_instance_watcher->handle_release_leader();
  shut_down_pool_watchers(on_finish);
}

void PoolReplayer::init_local_pool_watcher(Context *on_finish) {
  dout(20) << dendl;

  Mutex::Locker locker(m_lock);
  assert(!m_local_pool_watcher);
  m_local_pool_watcher.reset(new PoolWatcher<>(
    m_threads, m_local_io_ctx, m_local_pool_watcher_listener));

  // ensure the initial set of local images is up-to-date
  // after acquiring the leader role
  auto ctx = new FunctionContext([this, on_finish](int r) {
      handle_init_local_pool_watcher(r, on_finish);
    });
  m_local_pool_watcher->init(create_async_context_callback(
    m_threads->work_queue, ctx));
}

void PoolReplayer::handle_init_local_pool_watcher(int r, Context *on_finish) {
  dout(20) << "r=" << r << dendl;
  if (r < 0) {
    derr << "failed to retrieve local images: " << cpp_strerror(r) << dendl;
    on_finish->complete(r);
    return;
  }

  init_remote_pool_watcher(on_finish);
}

void PoolReplayer::init_remote_pool_watcher(Context *on_finish) {
  dout(20) << dendl;

  Mutex::Locker locker(m_lock);
  assert(!m_remote_pool_watcher);
  m_remote_pool_watcher.reset(new PoolWatcher<>(
    m_threads, m_remote_io_ctx, m_remote_pool_watcher_listener));
  m_remote_pool_watcher->init(create_async_context_callback(
    m_threads->work_queue, on_finish));

  m_cond.Signal();
}

void PoolReplayer::shut_down_pool_watchers(Context *on_finish) {
  dout(20) << dendl;

  {
    Mutex::Locker locker(m_lock);
    if (m_local_pool_watcher) {
      Context *ctx = new FunctionContext([this, on_finish](int r) {
          handle_shut_down_pool_watchers(r, on_finish);
        });
      ctx = create_async_context_callback(m_threads->work_queue, ctx);

      auto gather_ctx = new C_Gather(g_ceph_context, ctx);
      m_local_pool_watcher->shut_down(gather_ctx->new_sub());
      if (m_remote_pool_watcher) {
        m_remote_pool_watcher->shut_down(gather_ctx->new_sub());
      }
      gather_ctx->activate();
      return;
    }
  }

  on_finish->complete(0);
}

void PoolReplayer::handle_shut_down_pool_watchers(int r, Context *on_finish) {
  dout(20) << "r=" << r << dendl;

  {
    Mutex::Locker locker(m_lock);
    assert(m_local_pool_watcher);
    m_local_pool_watcher.reset();

    if (m_remote_pool_watcher) {
      m_remote_pool_watcher.reset();
    }
  }
  wait_for_update_ops(on_finish);
}

void PoolReplayer::wait_for_update_ops(Context *on_finish) {
  dout(20) << dendl;

  Mutex::Locker locker(m_lock);

  Context *ctx = new FunctionContext([this, on_finish](int r) {
      handle_wait_for_update_ops(r, on_finish);
    });
  ctx = create_async_context_callback(m_threads->work_queue, ctx);

  m_update_op_tracker.wait_for_ops(ctx);
}

void PoolReplayer::handle_wait_for_update_ops(int r, Context *on_finish) {
  dout(20) << "r=" << r << dendl;

  assert(r == 0);

  Mutex::Locker locker(m_lock);
  m_instance_replayer->release_all(on_finish);
}

void PoolReplayer::handle_update_leader(
    const std::string &leader_instance_id) {
  dout(20) << "leader_instance_id=" << leader_instance_id << dendl;

  m_instance_watcher->handle_update_leader(leader_instance_id);
}

} // namespace mirror
} // namespace rbd