X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Ftools%2Frbd_mirror%2FMirror.cc;fp=src%2Fceph%2Fsrc%2Ftools%2Frbd_mirror%2FMirror.cc;h=61dc9bf2d89c7615ac4a148949ebc24a8bb9904c;hb=812ff6ca9fcd3e629e49d4328905f33eee8ca3f5;hp=0000000000000000000000000000000000000000;hpb=15280273faafb77777eab341909a3f495cf248d9;p=stor4nfv.git diff --git a/src/ceph/src/tools/rbd_mirror/Mirror.cc b/src/ceph/src/tools/rbd_mirror/Mirror.cc new file mode 100644 index 0000000..61dc9bf --- /dev/null +++ b/src/ceph/src/tools/rbd_mirror/Mirror.cc @@ -0,0 +1,436 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include + +#include "common/Formatter.h" +#include "common/admin_socket.h" +#include "common/debug.h" +#include "common/errno.h" +#include "librbd/ImageCtx.h" +#include "Mirror.h" +#include "ServiceDaemon.h" +#include "Threads.h" + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_rbd_mirror +#undef dout_prefix +#define dout_prefix *_dout << "rbd::mirror::Mirror: " << this << " " \ + << __func__ << ": " + +using std::list; +using std::map; +using std::set; +using std::string; +using std::unique_ptr; +using std::vector; + +using librados::Rados; +using librados::IoCtx; +using librbd::mirror_peer_t; + +namespace rbd { +namespace mirror { + +namespace { + +class MirrorAdminSocketCommand { +public: + virtual ~MirrorAdminSocketCommand() {} + virtual bool call(Formatter *f, stringstream *ss) = 0; +}; + +class StatusCommand : public MirrorAdminSocketCommand { +public: + explicit StatusCommand(Mirror *mirror) : mirror(mirror) {} + + bool call(Formatter *f, stringstream *ss) override { + mirror->print_status(f, ss); + return true; + } + +private: + Mirror *mirror; +}; + +class StartCommand : public MirrorAdminSocketCommand { +public: + explicit StartCommand(Mirror *mirror) : mirror(mirror) {} + + bool call(Formatter *f, stringstream *ss) override { + mirror->start(); + return true; + } + +private: + Mirror *mirror; +}; + +class StopCommand : public MirrorAdminSocketCommand { +public: + explicit StopCommand(Mirror *mirror) : mirror(mirror) {} + + bool call(Formatter *f, stringstream *ss) override { + mirror->stop(); + return true; + } + +private: + Mirror *mirror; +}; + +class RestartCommand : public MirrorAdminSocketCommand { +public: + explicit RestartCommand(Mirror *mirror) : mirror(mirror) {} + + bool call(Formatter *f, stringstream *ss) override { + mirror->restart(); + return true; + } + +private: + Mirror *mirror; +}; + +class FlushCommand : public MirrorAdminSocketCommand { +public: + explicit FlushCommand(Mirror *mirror) : mirror(mirror) {} + + bool call(Formatter *f, stringstream *ss) override { + mirror->flush(); + return true; + } + +private: + Mirror *mirror; +}; + +class LeaderReleaseCommand : public MirrorAdminSocketCommand { +public: + explicit LeaderReleaseCommand(Mirror *mirror) : mirror(mirror) {} + + bool call(Formatter *f, stringstream *ss) override { + mirror->release_leader(); + return true; + } + +private: + Mirror *mirror; +}; + +} // anonymous namespace + +class MirrorAdminSocketHook : public AdminSocketHook { +public: + MirrorAdminSocketHook(CephContext *cct, Mirror *mirror) : + admin_socket(cct->get_admin_socket()) { + std::string command; + int r; + + command = "rbd mirror status"; + r = admin_socket->register_command(command, command, this, + "get status for rbd mirror"); + if (r == 0) { + commands[command] = new StatusCommand(mirror); + } + + command = "rbd mirror start"; + r = admin_socket->register_command(command, command, this, + "start rbd mirror"); + if (r == 0) { + commands[command] = new StartCommand(mirror); + } + + command = "rbd mirror stop"; + r = admin_socket->register_command(command, command, this, + "stop rbd mirror"); + if (r == 0) { + commands[command] = new StopCommand(mirror); + } + + command = "rbd mirror restart"; + r = admin_socket->register_command(command, command, this, + "restart rbd mirror"); + if (r == 0) { + commands[command] = new RestartCommand(mirror); + } + + command = "rbd mirror flush"; + r = admin_socket->register_command(command, command, this, + "flush rbd mirror"); + if (r == 0) { + commands[command] = new FlushCommand(mirror); + } + + command = "rbd mirror leader release"; + r = admin_socket->register_command(command, command, this, + "release rbd mirror leader"); + if (r == 0) { + commands[command] = new LeaderReleaseCommand(mirror); + } + } + + ~MirrorAdminSocketHook() override { + for (Commands::const_iterator i = commands.begin(); i != commands.end(); + ++i) { + (void)admin_socket->unregister_command(i->first); + delete i->second; + } + } + + bool call(std::string command, cmdmap_t& cmdmap, std::string format, + bufferlist& out) override { + Commands::const_iterator i = commands.find(command); + assert(i != commands.end()); + Formatter *f = Formatter::create(format); + stringstream ss; + bool r = i->second->call(f, &ss); + delete f; + out.append(ss); + return r; + } + +private: + typedef std::map Commands; + + AdminSocket *admin_socket; + Commands commands; +}; + +Mirror::Mirror(CephContext *cct, const std::vector &args) : + m_cct(cct), + m_args(args), + m_lock("rbd::mirror::Mirror"), + m_local(new librados::Rados()), + m_asok_hook(new MirrorAdminSocketHook(cct, this)) +{ + cct->lookup_or_create_singleton_object >( + m_threads, "rbd_mirror::threads"); + m_service_daemon.reset(new ServiceDaemon<>(m_cct, m_local, m_threads)); +} + +Mirror::~Mirror() +{ + delete m_asok_hook; +} + +void Mirror::handle_signal(int signum) +{ + m_stopping = true; + { + Mutex::Locker l(m_lock); + m_cond.Signal(); + } +} + +int Mirror::init() +{ + int r = m_local->init_with_context(m_cct); + if (r < 0) { + derr << "could not initialize rados handle" << dendl; + return r; + } + + r = m_local->connect(); + if (r < 0) { + derr << "error connecting to local cluster" << dendl; + return r; + } + + r = m_service_daemon->init(); + if (r < 0) { + derr << "error registering service daemon: " << cpp_strerror(r) << dendl; + return r; + } + + m_local_cluster_watcher.reset(new ClusterWatcher(m_local, m_lock, + m_service_daemon.get())); + + m_image_deleter.reset(new ImageDeleter<>(m_threads->work_queue, + m_threads->timer, + &m_threads->timer_lock, + m_service_daemon.get())); + return r; +} + +void Mirror::run() +{ + dout(20) << "enter" << dendl; + while (!m_stopping) { + m_local_cluster_watcher->refresh_pools(); + Mutex::Locker l(m_lock); + if (!m_manual_stop) { + update_pool_replayers(m_local_cluster_watcher->get_pool_peers()); + } + m_cond.WaitInterval( + m_lock, + utime_t(m_cct->_conf->get_val("rbd_mirror_pool_replayers_refresh_interval"), 0)); + } + + // stop all pool replayers in parallel + Mutex::Locker locker(m_lock); + for (auto &pool_replayer : m_pool_replayers) { + pool_replayer.second->stop(false); + } + dout(20) << "return" << dendl; +} + +void Mirror::print_status(Formatter *f, stringstream *ss) +{ + dout(20) << "enter" << dendl; + + Mutex::Locker l(m_lock); + + if (m_stopping) { + return; + } + + if (f) { + f->open_object_section("mirror_status"); + f->open_array_section("pool_replayers"); + }; + + for (auto &pool_replayer : m_pool_replayers) { + pool_replayer.second->print_status(f, ss); + } + + if (f) { + f->close_section(); + f->open_object_section("image_deleter"); + } + + m_image_deleter->print_status(f, ss); +} + +void Mirror::start() +{ + dout(20) << "enter" << dendl; + Mutex::Locker l(m_lock); + + if (m_stopping) { + return; + } + + m_manual_stop = false; + + for (auto &pool_replayer : m_pool_replayers) { + pool_replayer.second->start(); + } +} + +void Mirror::stop() +{ + dout(20) << "enter" << dendl; + Mutex::Locker l(m_lock); + + if (m_stopping) { + return; + } + + m_manual_stop = true; + + for (auto &pool_replayer : m_pool_replayers) { + pool_replayer.second->stop(true); + } +} + +void Mirror::restart() +{ + dout(20) << "enter" << dendl; + Mutex::Locker l(m_lock); + + if (m_stopping) { + return; + } + + m_manual_stop = false; + + for (auto &pool_replayer : m_pool_replayers) { + pool_replayer.second->restart(); + } +} + +void Mirror::flush() +{ + dout(20) << "enter" << dendl; + Mutex::Locker l(m_lock); + + if (m_stopping || m_manual_stop) { + return; + } + + for (auto &pool_replayer : m_pool_replayers) { + pool_replayer.second->flush(); + } +} + +void Mirror::release_leader() +{ + dout(20) << "enter" << dendl; + Mutex::Locker l(m_lock); + + if (m_stopping) { + return; + } + + for (auto &pool_replayer : m_pool_replayers) { + pool_replayer.second->release_leader(); + } +} + +void Mirror::update_pool_replayers(const PoolPeers &pool_peers) +{ + dout(20) << "enter" << dendl; + assert(m_lock.is_locked()); + + // remove stale pool replayers before creating new pool replayers + for (auto it = m_pool_replayers.begin(); it != m_pool_replayers.end();) { + auto &peer = it->first.second; + auto pool_peer_it = pool_peers.find(it->first.first); + if (pool_peer_it == pool_peers.end() || + pool_peer_it->second.find(peer) == pool_peer_it->second.end()) { + dout(20) << "removing pool replayer for " << peer << dendl; + // TODO: make async + it->second->shut_down(); + it = m_pool_replayers.erase(it); + } else { + ++it; + } + } + + for (auto &kv : pool_peers) { + for (auto &peer : kv.second) { + PoolPeer pool_peer(kv.first, peer); + + auto pool_replayers_it = m_pool_replayers.find(pool_peer); + if (pool_replayers_it != m_pool_replayers.end()) { + auto& pool_replayer = pool_replayers_it->second; + if (pool_replayer->is_blacklisted()) { + derr << "restarting blacklisted pool replayer for " << peer << dendl; + // TODO: make async + pool_replayer->shut_down(); + pool_replayer->init(); + } else if (!pool_replayer->is_running()) { + derr << "restarting failed pool replayer for " << peer << dendl; + // TODO: make async + pool_replayer->shut_down(); + pool_replayer->init(); + } + } else { + dout(20) << "starting pool replayer for " << peer << dendl; + unique_ptr pool_replayer(new PoolReplayer( + m_threads, m_service_daemon.get(), m_image_deleter.get(), kv.first, + peer, m_args)); + + // TODO: make async + pool_replayer->init(); + m_pool_replayers.emplace(pool_peer, std::move(pool_replayer)); + } + } + + // TODO currently only support a single peer + } +} + +} // namespace mirror +} // namespace rbd