Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / tools / rbd_mirror / Mirror.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include <boost/range/adaptor/map.hpp>
5
6 #include "common/Formatter.h"
7 #include "common/admin_socket.h"
8 #include "common/debug.h"
9 #include "common/errno.h"
10 #include "librbd/ImageCtx.h"
11 #include "Mirror.h"
12 #include "ServiceDaemon.h"
13 #include "Threads.h"
14
15 #define dout_context g_ceph_context
16 #define dout_subsys ceph_subsys_rbd_mirror
17 #undef dout_prefix
18 #define dout_prefix *_dout << "rbd::mirror::Mirror: " << this << " " \
19                            << __func__ << ": "
20
21 using std::list;
22 using std::map;
23 using std::set;
24 using std::string;
25 using std::unique_ptr;
26 using std::vector;
27
28 using librados::Rados;
29 using librados::IoCtx;
30 using librbd::mirror_peer_t;
31
32 namespace rbd {
33 namespace mirror {
34
35 namespace {
36
37 class MirrorAdminSocketCommand {
38 public:
39   virtual ~MirrorAdminSocketCommand() {}
40   virtual bool call(Formatter *f, stringstream *ss) = 0;
41 };
42
43 class StatusCommand : public MirrorAdminSocketCommand {
44 public:
45   explicit StatusCommand(Mirror *mirror) : mirror(mirror) {}
46
47   bool call(Formatter *f, stringstream *ss) override {
48     mirror->print_status(f, ss);
49     return true;
50   }
51
52 private:
53   Mirror *mirror;
54 };
55
56 class StartCommand : public MirrorAdminSocketCommand {
57 public:
58   explicit StartCommand(Mirror *mirror) : mirror(mirror) {}
59
60   bool call(Formatter *f, stringstream *ss) override {
61     mirror->start();
62     return true;
63   }
64
65 private:
66   Mirror *mirror;
67 };
68
69 class StopCommand : public MirrorAdminSocketCommand {
70 public:
71   explicit StopCommand(Mirror *mirror) : mirror(mirror) {}
72
73   bool call(Formatter *f, stringstream *ss) override {
74     mirror->stop();
75     return true;
76   }
77
78 private:
79   Mirror *mirror;
80 };
81
82 class RestartCommand : public MirrorAdminSocketCommand {
83 public:
84   explicit RestartCommand(Mirror *mirror) : mirror(mirror) {}
85
86   bool call(Formatter *f, stringstream *ss) override {
87     mirror->restart();
88     return true;
89   }
90
91 private:
92   Mirror *mirror;
93 };
94
95 class FlushCommand : public MirrorAdminSocketCommand {
96 public:
97   explicit FlushCommand(Mirror *mirror) : mirror(mirror) {}
98
99   bool call(Formatter *f, stringstream *ss) override {
100     mirror->flush();
101     return true;
102   }
103
104 private:
105   Mirror *mirror;
106 };
107
108 class LeaderReleaseCommand : public MirrorAdminSocketCommand {
109 public:
110   explicit LeaderReleaseCommand(Mirror *mirror) : mirror(mirror) {}
111
112   bool call(Formatter *f, stringstream *ss) override {
113     mirror->release_leader();
114     return true;
115   }
116
117 private:
118   Mirror *mirror;
119 };
120
121 } // anonymous namespace
122
123 class MirrorAdminSocketHook : public AdminSocketHook {
124 public:
125   MirrorAdminSocketHook(CephContext *cct, Mirror *mirror) :
126     admin_socket(cct->get_admin_socket()) {
127     std::string command;
128     int r;
129
130     command = "rbd mirror status";
131     r = admin_socket->register_command(command, command, this,
132                                        "get status for rbd mirror");
133     if (r == 0) {
134       commands[command] = new StatusCommand(mirror);
135     }
136
137     command = "rbd mirror start";
138     r = admin_socket->register_command(command, command, this,
139                                        "start rbd mirror");
140     if (r == 0) {
141       commands[command] = new StartCommand(mirror);
142     }
143
144     command = "rbd mirror stop";
145     r = admin_socket->register_command(command, command, this,
146                                        "stop rbd mirror");
147     if (r == 0) {
148       commands[command] = new StopCommand(mirror);
149     }
150
151     command = "rbd mirror restart";
152     r = admin_socket->register_command(command, command, this,
153                                        "restart rbd mirror");
154     if (r == 0) {
155       commands[command] = new RestartCommand(mirror);
156     }
157
158     command = "rbd mirror flush";
159     r = admin_socket->register_command(command, command, this,
160                                        "flush rbd mirror");
161     if (r == 0) {
162       commands[command] = new FlushCommand(mirror);
163     }
164
165     command = "rbd mirror leader release";
166     r = admin_socket->register_command(command, command, this,
167                                        "release rbd mirror leader");
168     if (r == 0) {
169       commands[command] = new LeaderReleaseCommand(mirror);
170     }
171   }
172
173   ~MirrorAdminSocketHook() override {
174     for (Commands::const_iterator i = commands.begin(); i != commands.end();
175          ++i) {
176       (void)admin_socket->unregister_command(i->first);
177       delete i->second;
178     }
179   }
180
181   bool call(std::string command, cmdmap_t& cmdmap, std::string format,
182             bufferlist& out) override {
183     Commands::const_iterator i = commands.find(command);
184     assert(i != commands.end());
185     Formatter *f = Formatter::create(format);
186     stringstream ss;
187     bool r = i->second->call(f, &ss);
188     delete f;
189     out.append(ss);
190     return r;
191   }
192
193 private:
194   typedef std::map<std::string, MirrorAdminSocketCommand*> Commands;
195
196   AdminSocket *admin_socket;
197   Commands commands;
198 };
199
200 Mirror::Mirror(CephContext *cct, const std::vector<const char*> &args) :
201   m_cct(cct),
202   m_args(args),
203   m_lock("rbd::mirror::Mirror"),
204   m_local(new librados::Rados()),
205   m_asok_hook(new MirrorAdminSocketHook(cct, this))
206 {
207   cct->lookup_or_create_singleton_object<Threads<librbd::ImageCtx> >(
208     m_threads, "rbd_mirror::threads");
209   m_service_daemon.reset(new ServiceDaemon<>(m_cct, m_local, m_threads));
210 }
211
212 Mirror::~Mirror()
213 {
214   delete m_asok_hook;
215 }
216
217 void Mirror::handle_signal(int signum)
218 {
219   m_stopping = true;
220   {
221     Mutex::Locker l(m_lock);
222     m_cond.Signal();
223   }
224 }
225
226 int Mirror::init()
227 {
228   int r = m_local->init_with_context(m_cct);
229   if (r < 0) {
230     derr << "could not initialize rados handle" << dendl;
231     return r;
232   }
233
234   r = m_local->connect();
235   if (r < 0) {
236     derr << "error connecting to local cluster" << dendl;
237     return r;
238   }
239
240   r = m_service_daemon->init();
241   if (r < 0) {
242     derr << "error registering service daemon: " << cpp_strerror(r) << dendl;
243     return r;
244   }
245
246   m_local_cluster_watcher.reset(new ClusterWatcher(m_local, m_lock,
247                                                    m_service_daemon.get()));
248
249   m_image_deleter.reset(new ImageDeleter<>(m_threads->work_queue,
250                                            m_threads->timer,
251                                            &m_threads->timer_lock,
252                                            m_service_daemon.get()));
253   return r;
254 }
255
256 void Mirror::run()
257 {
258   dout(20) << "enter" << dendl;
259   while (!m_stopping) {
260     m_local_cluster_watcher->refresh_pools();
261     Mutex::Locker l(m_lock);
262     if (!m_manual_stop) {
263       update_pool_replayers(m_local_cluster_watcher->get_pool_peers());
264     }
265     m_cond.WaitInterval(
266       m_lock,
267       utime_t(m_cct->_conf->get_val<int64_t>("rbd_mirror_pool_replayers_refresh_interval"), 0));
268   }
269
270   // stop all pool replayers in parallel
271   Mutex::Locker locker(m_lock);
272   for (auto &pool_replayer : m_pool_replayers) {
273     pool_replayer.second->stop(false);
274   }
275   dout(20) << "return" << dendl;
276 }
277
278 void Mirror::print_status(Formatter *f, stringstream *ss)
279 {
280   dout(20) << "enter" << dendl;
281
282   Mutex::Locker l(m_lock);
283
284   if (m_stopping) {
285     return;
286   }
287
288   if (f) {
289     f->open_object_section("mirror_status");
290     f->open_array_section("pool_replayers");
291   };
292
293   for (auto &pool_replayer : m_pool_replayers) {
294     pool_replayer.second->print_status(f, ss);
295   }
296
297   if (f) {
298     f->close_section();
299     f->open_object_section("image_deleter");
300   }
301
302   m_image_deleter->print_status(f, ss);
303 }
304
305 void Mirror::start()
306 {
307   dout(20) << "enter" << dendl;
308   Mutex::Locker l(m_lock);
309
310   if (m_stopping) {
311     return;
312   }
313
314   m_manual_stop = false;
315
316   for (auto &pool_replayer : m_pool_replayers) {
317     pool_replayer.second->start();
318   }
319 }
320
321 void Mirror::stop()
322 {
323   dout(20) << "enter" << dendl;
324   Mutex::Locker l(m_lock);
325
326   if (m_stopping) {
327     return;
328   }
329
330   m_manual_stop = true;
331
332   for (auto &pool_replayer : m_pool_replayers) {
333     pool_replayer.second->stop(true);
334   }
335 }
336
337 void Mirror::restart()
338 {
339   dout(20) << "enter" << dendl;
340   Mutex::Locker l(m_lock);
341
342   if (m_stopping) {
343     return;
344   }
345
346   m_manual_stop = false;
347
348   for (auto &pool_replayer : m_pool_replayers) {
349     pool_replayer.second->restart();
350   }
351 }
352
353 void Mirror::flush()
354 {
355   dout(20) << "enter" << dendl;
356   Mutex::Locker l(m_lock);
357
358   if (m_stopping || m_manual_stop) {
359     return;
360   }
361
362   for (auto &pool_replayer : m_pool_replayers) {
363     pool_replayer.second->flush();
364   }
365 }
366
367 void Mirror::release_leader()
368 {
369   dout(20) << "enter" << dendl;
370   Mutex::Locker l(m_lock);
371
372   if (m_stopping) {
373     return;
374   }
375
376   for (auto &pool_replayer : m_pool_replayers) {
377     pool_replayer.second->release_leader();
378   }
379 }
380
381 void Mirror::update_pool_replayers(const PoolPeers &pool_peers)
382 {
383   dout(20) << "enter" << dendl;
384   assert(m_lock.is_locked());
385
386   // remove stale pool replayers before creating new pool replayers
387   for (auto it = m_pool_replayers.begin(); it != m_pool_replayers.end();) {
388     auto &peer = it->first.second;
389     auto pool_peer_it = pool_peers.find(it->first.first);
390     if (pool_peer_it == pool_peers.end() ||
391         pool_peer_it->second.find(peer) == pool_peer_it->second.end()) {
392       dout(20) << "removing pool replayer for " << peer << dendl;
393       // TODO: make async
394       it->second->shut_down();
395       it = m_pool_replayers.erase(it);
396     } else {
397       ++it;
398     }
399   }
400
401   for (auto &kv : pool_peers) {
402     for (auto &peer : kv.second) {
403       PoolPeer pool_peer(kv.first, peer);
404
405       auto pool_replayers_it = m_pool_replayers.find(pool_peer);
406       if (pool_replayers_it != m_pool_replayers.end()) {
407         auto& pool_replayer = pool_replayers_it->second;
408         if (pool_replayer->is_blacklisted()) {
409           derr << "restarting blacklisted pool replayer for " << peer << dendl;
410           // TODO: make async
411           pool_replayer->shut_down();
412           pool_replayer->init();
413         } else if (!pool_replayer->is_running()) {
414           derr << "restarting failed pool replayer for " << peer << dendl;
415           // TODO: make async
416           pool_replayer->shut_down();
417           pool_replayer->init();
418         }
419       } else {
420         dout(20) << "starting pool replayer for " << peer << dendl;
421         unique_ptr<PoolReplayer> pool_replayer(new PoolReplayer(
422           m_threads, m_service_daemon.get(), m_image_deleter.get(), kv.first,
423           peer, m_args));
424
425         // TODO: make async
426         pool_replayer->init();
427         m_pool_replayers.emplace(pool_peer, std::move(pool_replayer));
428       }
429     }
430
431     // TODO currently only support a single peer
432   }
433 }
434
435 } // namespace mirror
436 } // namespace rbd