X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Ftools%2Frbd_mirror%2FMirror.cc;fp=src%2Fceph%2Fsrc%2Ftools%2Frbd_mirror%2FMirror.cc;h=0000000000000000000000000000000000000000;hb=7da45d65be36d36b880cc55c5036e96c24b53f00;hp=61dc9bf2d89c7615ac4a148949ebc24a8bb9904c;hpb=691462d09d0987b47e112d6ee8740375df3c51b2;p=stor4nfv.git diff --git a/src/ceph/src/tools/rbd_mirror/Mirror.cc b/src/ceph/src/tools/rbd_mirror/Mirror.cc deleted file mode 100644 index 61dc9bf..0000000 --- a/src/ceph/src/tools/rbd_mirror/Mirror.cc +++ /dev/null @@ -1,436 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab - -#include - -#include "common/Formatter.h" -#include "common/admin_socket.h" -#include "common/debug.h" -#include "common/errno.h" -#include "librbd/ImageCtx.h" -#include "Mirror.h" -#include "ServiceDaemon.h" -#include "Threads.h" - -#define dout_context g_ceph_context -#define dout_subsys ceph_subsys_rbd_mirror -#undef dout_prefix -#define dout_prefix *_dout << "rbd::mirror::Mirror: " << this << " " \ - << __func__ << ": " - -using std::list; -using std::map; -using std::set; -using std::string; -using std::unique_ptr; -using std::vector; - -using librados::Rados; -using librados::IoCtx; -using librbd::mirror_peer_t; - -namespace rbd { -namespace mirror { - -namespace { - -class MirrorAdminSocketCommand { -public: - virtual ~MirrorAdminSocketCommand() {} - virtual bool call(Formatter *f, stringstream *ss) = 0; -}; - -class StatusCommand : public MirrorAdminSocketCommand { -public: - explicit StatusCommand(Mirror *mirror) : mirror(mirror) {} - - bool call(Formatter *f, stringstream *ss) override { - mirror->print_status(f, ss); - return true; - } - -private: - Mirror *mirror; -}; - -class StartCommand : public MirrorAdminSocketCommand { -public: - explicit StartCommand(Mirror *mirror) : mirror(mirror) {} - - bool call(Formatter *f, stringstream *ss) override { - mirror->start(); - return true; - } - -private: - Mirror *mirror; -}; - -class StopCommand : public MirrorAdminSocketCommand { -public: - explicit StopCommand(Mirror *mirror) : mirror(mirror) {} - - bool call(Formatter *f, stringstream *ss) override { - mirror->stop(); - return true; - } - -private: - Mirror *mirror; -}; - -class RestartCommand : public MirrorAdminSocketCommand { -public: - explicit RestartCommand(Mirror *mirror) : mirror(mirror) {} - - bool call(Formatter *f, stringstream *ss) override { - mirror->restart(); - return true; - } - -private: - Mirror *mirror; -}; - -class FlushCommand : public MirrorAdminSocketCommand { -public: - explicit FlushCommand(Mirror *mirror) : mirror(mirror) {} - - bool call(Formatter *f, stringstream *ss) override { - mirror->flush(); - return true; - } - -private: - Mirror *mirror; -}; - -class LeaderReleaseCommand : public MirrorAdminSocketCommand { -public: - explicit LeaderReleaseCommand(Mirror *mirror) : mirror(mirror) {} - - bool call(Formatter *f, stringstream *ss) override { - mirror->release_leader(); - return true; - } - -private: - Mirror *mirror; -}; - -} // anonymous namespace - -class MirrorAdminSocketHook : public AdminSocketHook { -public: - MirrorAdminSocketHook(CephContext *cct, Mirror *mirror) : - admin_socket(cct->get_admin_socket()) { - std::string command; - int r; - - command = "rbd mirror status"; - r = admin_socket->register_command(command, command, this, - "get status for rbd mirror"); - if (r == 0) { - commands[command] = new StatusCommand(mirror); - } - - command = "rbd mirror start"; - r = admin_socket->register_command(command, command, this, - "start rbd mirror"); - if (r == 0) { - commands[command] = new StartCommand(mirror); - } - - command = "rbd mirror stop"; - r = admin_socket->register_command(command, command, this, - "stop rbd mirror"); - if (r == 0) { - commands[command] = new StopCommand(mirror); - } - - command = "rbd mirror restart"; - r = admin_socket->register_command(command, command, this, - "restart rbd mirror"); - if (r == 0) { - commands[command] = new RestartCommand(mirror); - } - - command = "rbd mirror flush"; - r = admin_socket->register_command(command, command, this, - "flush rbd mirror"); - if (r == 0) { - commands[command] = new FlushCommand(mirror); - } - - command = "rbd mirror leader release"; - r = admin_socket->register_command(command, command, this, - "release rbd mirror leader"); - if (r == 0) { - commands[command] = new LeaderReleaseCommand(mirror); - } - } - - ~MirrorAdminSocketHook() override { - for (Commands::const_iterator i = commands.begin(); i != commands.end(); - ++i) { - (void)admin_socket->unregister_command(i->first); - delete i->second; - } - } - - bool call(std::string command, cmdmap_t& cmdmap, std::string format, - bufferlist& out) override { - Commands::const_iterator i = commands.find(command); - assert(i != commands.end()); - Formatter *f = Formatter::create(format); - stringstream ss; - bool r = i->second->call(f, &ss); - delete f; - out.append(ss); - return r; - } - -private: - typedef std::map Commands; - - AdminSocket *admin_socket; - Commands commands; -}; - -Mirror::Mirror(CephContext *cct, const std::vector &args) : - m_cct(cct), - m_args(args), - m_lock("rbd::mirror::Mirror"), - m_local(new librados::Rados()), - m_asok_hook(new MirrorAdminSocketHook(cct, this)) -{ - cct->lookup_or_create_singleton_object >( - m_threads, "rbd_mirror::threads"); - m_service_daemon.reset(new ServiceDaemon<>(m_cct, m_local, m_threads)); -} - -Mirror::~Mirror() -{ - delete m_asok_hook; -} - -void Mirror::handle_signal(int signum) -{ - m_stopping = true; - { - Mutex::Locker l(m_lock); - m_cond.Signal(); - } -} - -int Mirror::init() -{ - int r = m_local->init_with_context(m_cct); - if (r < 0) { - derr << "could not initialize rados handle" << dendl; - return r; - } - - r = m_local->connect(); - if (r < 0) { - derr << "error connecting to local cluster" << dendl; - return r; - } - - r = m_service_daemon->init(); - if (r < 0) { - derr << "error registering service daemon: " << cpp_strerror(r) << dendl; - return r; - } - - m_local_cluster_watcher.reset(new ClusterWatcher(m_local, m_lock, - m_service_daemon.get())); - - m_image_deleter.reset(new ImageDeleter<>(m_threads->work_queue, - m_threads->timer, - &m_threads->timer_lock, - m_service_daemon.get())); - return r; -} - -void Mirror::run() -{ - dout(20) << "enter" << dendl; - while (!m_stopping) { - m_local_cluster_watcher->refresh_pools(); - Mutex::Locker l(m_lock); - if (!m_manual_stop) { - update_pool_replayers(m_local_cluster_watcher->get_pool_peers()); - } - m_cond.WaitInterval( - m_lock, - utime_t(m_cct->_conf->get_val("rbd_mirror_pool_replayers_refresh_interval"), 0)); - } - - // stop all pool replayers in parallel - Mutex::Locker locker(m_lock); - for (auto &pool_replayer : m_pool_replayers) { - pool_replayer.second->stop(false); - } - dout(20) << "return" << dendl; -} - -void Mirror::print_status(Formatter *f, stringstream *ss) -{ - dout(20) << "enter" << dendl; - - Mutex::Locker l(m_lock); - - if (m_stopping) { - return; - } - - if (f) { - f->open_object_section("mirror_status"); - f->open_array_section("pool_replayers"); - }; - - for (auto &pool_replayer : m_pool_replayers) { - pool_replayer.second->print_status(f, ss); - } - - if (f) { - f->close_section(); - f->open_object_section("image_deleter"); - } - - m_image_deleter->print_status(f, ss); -} - -void Mirror::start() -{ - dout(20) << "enter" << dendl; - Mutex::Locker l(m_lock); - - if (m_stopping) { - return; - } - - m_manual_stop = false; - - for (auto &pool_replayer : m_pool_replayers) { - pool_replayer.second->start(); - } -} - -void Mirror::stop() -{ - dout(20) << "enter" << dendl; - Mutex::Locker l(m_lock); - - if (m_stopping) { - return; - } - - m_manual_stop = true; - - for (auto &pool_replayer : m_pool_replayers) { - pool_replayer.second->stop(true); - } -} - -void Mirror::restart() -{ - dout(20) << "enter" << dendl; - Mutex::Locker l(m_lock); - - if (m_stopping) { - return; - } - - m_manual_stop = false; - - for (auto &pool_replayer : m_pool_replayers) { - pool_replayer.second->restart(); - } -} - -void Mirror::flush() -{ - dout(20) << "enter" << dendl; - Mutex::Locker l(m_lock); - - if (m_stopping || m_manual_stop) { - return; - } - - for (auto &pool_replayer : m_pool_replayers) { - pool_replayer.second->flush(); - } -} - -void Mirror::release_leader() -{ - dout(20) << "enter" << dendl; - Mutex::Locker l(m_lock); - - if (m_stopping) { - return; - } - - for (auto &pool_replayer : m_pool_replayers) { - pool_replayer.second->release_leader(); - } -} - -void Mirror::update_pool_replayers(const PoolPeers &pool_peers) -{ - dout(20) << "enter" << dendl; - assert(m_lock.is_locked()); - - // remove stale pool replayers before creating new pool replayers - for (auto it = m_pool_replayers.begin(); it != m_pool_replayers.end();) { - auto &peer = it->first.second; - auto pool_peer_it = pool_peers.find(it->first.first); - if (pool_peer_it == pool_peers.end() || - pool_peer_it->second.find(peer) == pool_peer_it->second.end()) { - dout(20) << "removing pool replayer for " << peer << dendl; - // TODO: make async - it->second->shut_down(); - it = m_pool_replayers.erase(it); - } else { - ++it; - } - } - - for (auto &kv : pool_peers) { - for (auto &peer : kv.second) { - PoolPeer pool_peer(kv.first, peer); - - auto pool_replayers_it = m_pool_replayers.find(pool_peer); - if (pool_replayers_it != m_pool_replayers.end()) { - auto& pool_replayer = pool_replayers_it->second; - if (pool_replayer->is_blacklisted()) { - derr << "restarting blacklisted pool replayer for " << peer << dendl; - // TODO: make async - pool_replayer->shut_down(); - pool_replayer->init(); - } else if (!pool_replayer->is_running()) { - derr << "restarting failed pool replayer for " << peer << dendl; - // TODO: make async - pool_replayer->shut_down(); - pool_replayer->init(); - } - } else { - dout(20) << "starting pool replayer for " << peer << dendl; - unique_ptr pool_replayer(new PoolReplayer( - m_threads, m_service_daemon.get(), m_image_deleter.get(), kv.first, - peer, m_args)); - - // TODO: make async - pool_replayer->init(); - m_pool_replayers.emplace(pool_peer, std::move(pool_replayer)); - } - } - - // TODO currently only support a single peer - } -} - -} // namespace mirror -} // namespace rbd