1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include <boost/range/adaptor/map.hpp>
6 #include "common/Formatter.h"
7 #include "common/admin_socket.h"
8 #include "common/debug.h"
9 #include "common/errno.h"
10 #include "librbd/ImageCtx.h"
12 #include "ServiceDaemon.h"
15 #define dout_context g_ceph_context
16 #define dout_subsys ceph_subsys_rbd_mirror
18 #define dout_prefix *_dout << "rbd::mirror::Mirror: " << this << " " \
25 using std::unique_ptr;
28 using librados::Rados;
29 using librados::IoCtx;
30 using librbd::mirror_peer_t;
// Abstract interface for one admin-socket command handler. Concrete handlers
// below derive from it and are stored by base pointer in
// MirrorAdminSocketHook's command map.
37 class MirrorAdminSocketCommand {
39 virtual ~MirrorAdminSocketCommand() {}
// Executes the command. Receives a Formatter and a stringstream; the meaning
// of the bool result is not visible here -- TODO confirm against the caller.
40 virtual bool call(Formatter *f, stringstream *ss) = 0;
// Admin-socket command that forwards to Mirror::print_status().
43 class StatusCommand : public MirrorAdminSocketCommand {
// Stores the Mirror pointer the command operates on.
45 explicit StatusCommand(Mirror *mirror) : mirror(mirror) {}
// Hands the formatter and stream straight through to the mirror.
47 bool call(Formatter *f, stringstream *ss) override {
48 mirror->print_status(f, ss);
// Admin-socket command bound to a Mirror instance; MirrorAdminSocketHook
// installs it under "rbd mirror start".
56 class StartCommand : public MirrorAdminSocketCommand {
// Stores the Mirror pointer the command operates on.
58 explicit StartCommand(Mirror *mirror) : mirror(mirror) {}
// NOTE(review): the body is not visible in this view; presumably it invokes
// the mirror's start operation -- confirm.
60 bool call(Formatter *f, stringstream *ss) override {
// Admin-socket command bound to a Mirror instance; MirrorAdminSocketHook
// installs it under "rbd mirror stop".
69 class StopCommand : public MirrorAdminSocketCommand {
// Stores the Mirror pointer the command operates on.
71 explicit StopCommand(Mirror *mirror) : mirror(mirror) {}
// NOTE(review): the body is not visible in this view; presumably it invokes
// the mirror's stop operation -- confirm.
73 bool call(Formatter *f, stringstream *ss) override {
// Admin-socket command bound to a Mirror instance; MirrorAdminSocketHook
// installs it under "rbd mirror restart".
82 class RestartCommand : public MirrorAdminSocketCommand {
// Stores the Mirror pointer the command operates on.
84 explicit RestartCommand(Mirror *mirror) : mirror(mirror) {}
// NOTE(review): the body is not visible in this view; presumably it invokes
// Mirror::restart() -- confirm.
86 bool call(Formatter *f, stringstream *ss) override {
// Admin-socket command bound to a Mirror instance; MirrorAdminSocketHook
// installs it under "rbd mirror flush".
95 class FlushCommand : public MirrorAdminSocketCommand {
// Stores the Mirror pointer the command operates on.
97 explicit FlushCommand(Mirror *mirror) : mirror(mirror) {}
// NOTE(review): the body is not visible in this view; presumably it invokes
// the mirror's flush operation -- confirm.
99 bool call(Formatter *f, stringstream *ss) override {
// Admin-socket command that forwards to Mirror::release_leader().
108 class LeaderReleaseCommand : public MirrorAdminSocketCommand {
// Stores the Mirror pointer the command operates on.
110 explicit LeaderReleaseCommand(Mirror *mirror) : mirror(mirror) {}
// Ignores the formatter and stream in the visible line; just asks the mirror
// to release leadership.
112 bool call(Formatter *f, stringstream *ss) override {
113 mirror->release_leader();
121 } // anonymous namespace
// Admin-socket hook for the rbd-mirror daemon. It registers the
// "rbd mirror ..." command strings with the process admin socket and routes
// each incoming command string to the matching MirrorAdminSocketCommand.
123 class MirrorAdminSocketHook : public AdminSocketHook {
// Fetches the admin socket from the CephContext, then for each supported
// command registers the command string with this hook and stores a handler
// object in `commands` under the same string.
// NOTE(review): how the register_command() result `r` gates the handler
// insertion is not visible in this view -- confirm.
125 MirrorAdminSocketHook(CephContext *cct, Mirror *mirror) :
126 admin_socket(cct->get_admin_socket()) {
130 command = "rbd mirror status";
131 r = admin_socket->register_command(command, command, this,
132 "get status for rbd mirror");
134 commands[command] = new StatusCommand(mirror);
137 command = "rbd mirror start";
138 r = admin_socket->register_command(command, command, this,
141 commands[command] = new StartCommand(mirror);
144 command = "rbd mirror stop";
145 r = admin_socket->register_command(command, command, this,
148 commands[command] = new StopCommand(mirror);
151 command = "rbd mirror restart";
152 r = admin_socket->register_command(command, command, this,
153 "restart rbd mirror");
155 commands[command] = new RestartCommand(mirror);
158 command = "rbd mirror flush";
159 r = admin_socket->register_command(command, command, this,
162 commands[command] = new FlushCommand(mirror);
165 command = "rbd mirror leader release";
166 r = admin_socket->register_command(command, command, this,
167 "release rbd mirror leader");
169 commands[command] = new LeaderReleaseCommand(mirror);
// Walks every entry in `commands` and unregisters its command string; the
// result of unregister_command() is explicitly discarded.
// NOTE(review): the handlers are allocated with `new` above; their release
// is not visible in this view -- confirm it happens in this loop.
173 ~MirrorAdminSocketHook() override {
174 for (Commands::const_iterator i = commands.begin(); i != commands.end();
176 (void)admin_socket->unregister_command(i->first);
// Dispatches one admin-socket request: looks the command string up in
// `commands` (an unknown command trips the assert), creates a Formatter for
// the requested format and invokes the handler with it.
// NOTE(review): how `ss` reaches `out` and who frees `f` are not visible
// here -- confirm.
181 bool call(std::string command, cmdmap_t& cmdmap, std::string format,
182 bufferlist& out) override {
183 Commands::const_iterator i = commands.find(command);
184 assert(i != commands.end());
185 Formatter *f = Formatter::create(format);
187 bool r = i->second->call(f, &ss);
// Command string -> handler object (held by raw pointer).
194 typedef std::map<std::string, MirrorAdminSocketCommand*> Commands;
// Admin socket obtained from the CephContext in the constructor.
196 AdminSocket *admin_socket;
// Constructs the mirror daemon object. The visible initializers name the
// lock "rbd::mirror::Mirror", allocate a new librados::Rados handle and
// create the admin-socket hook bound to this instance.
// NOTE(review): other member initializers (e.g. for cct/args) are not visible
// in this view.
200 Mirror::Mirror(CephContext *cct, const std::vector<const char*> &args) :
203 m_lock("rbd::mirror::Mirror"),
204 m_local(new librados::Rados()),
205 m_asok_hook(new MirrorAdminSocketHook(cct, this))
// Look up (or create) the Threads singleton registered on the context under
// "rbd_mirror::threads", then build the service daemon from the context, the
// local rados handle and those threads.
207 cct->lookup_or_create_singleton_object<Threads<librbd::ImageCtx> >(
208 m_threads, "rbd_mirror::threads");
209 m_service_daemon.reset(new ServiceDaemon<>(m_cct, m_local, m_threads));
// Signal entry point; receives the signal number.
// NOTE(review): only the lock acquisition is visible in this view; what is
// done with `signum` and under the lock must be confirmed in the full file.
217 void Mirror::handle_signal(int signum)
221 Mutex::Locker l(m_lock);
// NOTE(review): the function header is not visible in this view; presumably
// this is the body of Mirror::init() -- confirm.
// Step 1: initialise the local rados handle from the Ceph context.
228 int r = m_local->init_with_context(m_cct);
230 derr << "could not initialize rados handle" << dendl;
// Step 2: connect the handle to the local cluster.
234 r = m_local->connect();
236 derr << "error connecting to local cluster" << dendl;
// Step 3: initialise the service daemon; on failure the errno text for r is
// logged.
240 r = m_service_daemon->init();
242 derr << "error registering service daemon: " << cpp_strerror(r) << dendl;
// Step 4: create the cluster watcher for the local cluster; it is given
// m_lock and the service daemon.
246 m_local_cluster_watcher.reset(new ClusterWatcher(m_local, m_lock,
247 m_service_daemon.get()));
// Step 5: create the image deleter from the shared threads' work queue and
// timer lock plus the service daemon (some arguments are not visible here).
249 m_image_deleter.reset(new ImageDeleter<>(m_threads->work_queue,
251 &m_threads->timer_lock,
252 m_service_daemon.get()));
// NOTE(review): the function header is not visible in this view; presumably
// this is the body of Mirror::run() -- confirm.
258 dout(20) << "enter" << dendl;
// Main loop, repeated until m_stopping is set. Each pass refreshes the
// watcher's pool list before m_lock is taken, then (unless a manual stop is
// in effect) reconciles the pool replayers with the current pool peers.
// The utime_t below is built from the config option
// "rbd_mirror_pool_replayers_refresh_interval"; presumably it is the wait
// between passes -- the enclosing call is not visible here, confirm.
259 while (!m_stopping) {
260 m_local_cluster_watcher->refresh_pools();
261 Mutex::Locker l(m_lock);
262 if (!m_manual_stop) {
263 update_pool_replayers(m_local_cluster_watcher->get_pool_peers());
267 utime_t(m_cct->_conf->get_val<int64_t>("rbd_mirror_pool_replayers_refresh_interval"), 0));
270 // stop all pool replayers in parallel
271 Mutex::Locker locker(m_lock);
// Every replayer is asked to stop with the argument `false`.
// NOTE(review): the meaning of that flag is defined by PoolReplayer::stop,
// which is not visible here.
272 for (auto &pool_replayer : m_pool_replayers) {
273 pool_replayer.second->stop(false);
275 dout(20) << "return" << dendl;
// Emits the daemon status under m_lock: a "mirror_status" object containing
// a "pool_replayers" array (one entry per pool replayer) and an
// "image_deleter" object.
// NOTE(review): any null checks on `f`, early returns and the matching
// close_section() calls are not visible in this view -- confirm.
278 void Mirror::print_status(Formatter *f, stringstream *ss)
280 dout(20) << "enter" << dendl;
282 Mutex::Locker l(m_lock);
289 f->open_object_section("mirror_status");
290 f->open_array_section("pool_replayers");
// Each pool replayer reports its own status through the same formatter and
// stream.
293 for (auto &pool_replayer : m_pool_replayers) {
294 pool_replayer.second->print_status(f, ss);
299 f->open_object_section("image_deleter");
302 m_image_deleter->print_status(f, ss);
// NOTE(review): the function header is not visible in this view; presumably
// this is the body of Mirror::start() -- confirm.
307 dout(20) << "enter" << dendl;
308 Mutex::Locker l(m_lock);
// Clear the manual-stop flag so the main loop's `!m_manual_stop` check passes
// again, then start every pool replayer.
314 m_manual_stop = false;
316 for (auto &pool_replayer : m_pool_replayers) {
317 pool_replayer.second->start();
// NOTE(review): the function header is not visible in this view; presumably
// this is the body of Mirror::stop() -- confirm.
323 dout(20) << "enter" << dendl;
324 Mutex::Locker l(m_lock);
// Set the manual-stop flag (the main loop skips update_pool_replayers() while
// it is set), then stop every pool replayer with the argument `true`.
330 m_manual_stop = true;
332 for (auto &pool_replayer : m_pool_replayers) {
333 pool_replayer.second->stop(true);
// Restarts every pool replayer while holding m_lock, and clears the
// manual-stop flag first.
// NOTE(review): any early-return guard between the lock and the flag update
// is not visible in this view.
337 void Mirror::restart()
339 dout(20) << "enter" << dendl;
340 Mutex::Locker l(m_lock);
346 m_manual_stop = false;
348 for (auto &pool_replayer : m_pool_replayers) {
349 pool_replayer.second->restart();
// NOTE(review): the function header is not visible in this view; presumably
// this is the body of Mirror::flush() -- confirm.
355 dout(20) << "enter" << dendl;
356 Mutex::Locker l(m_lock);
// Guard on the stopping / manually-stopped state (the guarded statement is
// not visible here; presumably an early return -- confirm).
358 if (m_stopping || m_manual_stop) {
// Ask every pool replayer to flush.
362 for (auto &pool_replayer : m_pool_replayers) {
363 pool_replayer.second->flush();
// Asks every pool replayer to release leadership, while holding m_lock.
// NOTE(review): any guard between the lock and the loop is not visible in
// this view.
367 void Mirror::release_leader()
369 dout(20) << "enter" << dendl;
370 Mutex::Locker l(m_lock);
376 for (auto &pool_replayer : m_pool_replayers) {
377 pool_replayer.second->release_leader();
// Reconciles m_pool_replayers with the supplied pool -> peers mapping.
// The caller must already hold m_lock (asserted below).
381 void Mirror::update_pool_replayers(const PoolPeers &pool_peers)
383 dout(20) << "enter" << dendl;
384 assert(m_lock.is_locked());
386 // remove stale pool replayers before creating new pool replayers
// A replayer is stale when its key's first element (the pool) is missing
// from pool_peers, or that pool's peer set no longer contains the key's
// second element (the peer). Stale replayers are shut down and erased;
// erase() supplies the next iterator.
387 for (auto it = m_pool_replayers.begin(); it != m_pool_replayers.end();) {
388 auto &peer = it->first.second;
389 auto pool_peer_it = pool_peers.find(it->first.first);
390 if (pool_peer_it == pool_peers.end() ||
391 pool_peer_it->second.find(peer) == pool_peer_it->second.end()) {
392 dout(20) << "removing pool replayer for " << peer << dendl;
394 it->second->shut_down();
395 it = m_pool_replayers.erase(it);
// Second pass: visit every (pool, peer) pair currently configured.
401 for (auto &kv : pool_peers) {
402 for (auto &peer : kv.second) {
403 PoolPeer pool_peer(kv.first, peer);
// An existing replayer is restarted in place (shut_down() then init()) when
// it reports being blacklisted or is no longer running.
405 auto pool_replayers_it = m_pool_replayers.find(pool_peer);
406 if (pool_replayers_it != m_pool_replayers.end()) {
407 auto& pool_replayer = pool_replayers_it->second;
408 if (pool_replayer->is_blacklisted()) {
409 derr << "restarting blacklisted pool replayer for " << peer << dendl;
411 pool_replayer->shut_down();
412 pool_replayer->init();
413 } else if (!pool_replayer->is_running()) {
414 derr << "restarting failed pool replayer for " << peer << dendl;
416 pool_replayer->shut_down();
417 pool_replayer->init();
// Presumably the branch for a pair with no existing replayer (the branch
// keyword is not visible here -- confirm): a new PoolReplayer is created,
// initialised, and stored in the map under pool_peer.
420 dout(20) << "starting pool replayer for " << peer << dendl;
421 unique_ptr<PoolReplayer> pool_replayer(new PoolReplayer(
422 m_threads, m_service_daemon.get(), m_image_deleter.get(), kv.first,
426 pool_replayer->init();
427 m_pool_replayers.emplace(pool_peer, std::move(pool_replayer));
431 // TODO currently only support a single peer
435 } // namespace mirror