Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / tools / rbd_mirror / InstanceReplayer.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "include/stringify.h"
5 #include "common/Timer.h"
6 #include "common/debug.h"
7 #include "common/errno.h"
8 #include "librbd/Utils.h"
9 #include "ImageReplayer.h"
10 #include "InstanceReplayer.h"
11 #include "ServiceDaemon.h"
12 #include "Threads.h"
13
14 #define dout_context g_ceph_context
15 #define dout_subsys ceph_subsys_rbd_mirror
16 #undef dout_prefix
17 #define dout_prefix *_dout << "rbd::mirror::InstanceReplayer: " \
18                            << this << " " << __func__ << ": "
19
20 namespace rbd {
21 namespace mirror {
22
23 namespace {
24
25 const std::string SERVICE_DAEMON_ASSIGNED_COUNT_KEY("image_assigned_count");
26 const std::string SERVICE_DAEMON_WARNING_COUNT_KEY("image_warning_count");
27 const std::string SERVICE_DAEMON_ERROR_COUNT_KEY("image_error_count");
28
29 } // anonymous namespace
30
31 using librbd::util::create_async_context_callback;
32 using librbd::util::create_context_callback;
33
34 template <typename I>
35 InstanceReplayer<I>::InstanceReplayer(
36     Threads<I> *threads, ServiceDaemon<I>* service_daemon,
37     ImageDeleter<I>* image_deleter, RadosRef local_rados,
38     const std::string &local_mirror_uuid, int64_t local_pool_id)
39   : m_threads(threads), m_service_daemon(service_daemon),
40     m_image_deleter(image_deleter), m_local_rados(local_rados),
41     m_local_mirror_uuid(local_mirror_uuid), m_local_pool_id(local_pool_id),
42     m_lock("rbd::mirror::InstanceReplayer " + stringify(local_pool_id)) {
43 }
44
45 template <typename I>
46 InstanceReplayer<I>::~InstanceReplayer() {
47   assert(m_image_state_check_task == nullptr);
48   assert(m_async_op_tracker.empty());
49   assert(m_image_replayers.empty());
50 }
51
52 template <typename I>
53 int InstanceReplayer<I>::init() {
54   C_SaferCond init_ctx;
55   init(&init_ctx);
56   return init_ctx.wait();
57 }
58
59 template <typename I>
60 void InstanceReplayer<I>::init(Context *on_finish) {
61   dout(20) << dendl;
62
63   Context *ctx = new FunctionContext(
64     [this, on_finish] (int r) {
65       {
66         Mutex::Locker timer_locker(m_threads->timer_lock);
67         schedule_image_state_check_task();
68       }
69       on_finish->complete(0);
70     });
71
72   m_threads->work_queue->queue(ctx, 0);
73 }
74
75 template <typename I>
76 void InstanceReplayer<I>::shut_down() {
77   C_SaferCond shut_down_ctx;
78   shut_down(&shut_down_ctx);
79   int r = shut_down_ctx.wait();
80   assert(r == 0);
81 }
82
83 template <typename I>
84 void InstanceReplayer<I>::shut_down(Context *on_finish) {
85   dout(20) << dendl;
86
87   Mutex::Locker locker(m_lock);
88
89   assert(m_on_shut_down == nullptr);
90   m_on_shut_down = on_finish;
91
92   Context *ctx = new FunctionContext(
93     [this] (int r) {
94       cancel_image_state_check_task();
95       wait_for_ops();
96     });
97
98   m_threads->work_queue->queue(ctx, 0);
99 }
100
101 template <typename I>
102 void InstanceReplayer<I>::add_peer(std::string peer_uuid,
103                                    librados::IoCtx io_ctx) {
104   dout(20) << peer_uuid << dendl;
105
106   Mutex::Locker locker(m_lock);
107   auto result = m_peers.insert(Peer(peer_uuid, io_ctx)).second;
108   assert(result);
109 }
110
111 template <typename I>
112 void InstanceReplayer<I>::release_all(Context *on_finish) {
113   dout(20) << dendl;
114
115   Mutex::Locker locker(m_lock);
116
117   C_Gather *gather_ctx = new C_Gather(g_ceph_context, on_finish);
118   for (auto it = m_image_replayers.begin(); it != m_image_replayers.end();
119        it = m_image_replayers.erase(it)) {
120     auto image_replayer = it->second;
121     auto ctx = gather_ctx->new_sub();
122     ctx = new FunctionContext(
123       [image_replayer, ctx] (int r) {
124         image_replayer->destroy();
125         ctx->complete(0);
126       });
127     stop_image_replayer(image_replayer, ctx);
128   }
129   gather_ctx->activate();
130 }
131
132 template <typename I>
133 void InstanceReplayer<I>::acquire_image(InstanceWatcher<I> *instance_watcher,
134                                         const std::string &global_image_id,
135                                         Context *on_finish) {
136   dout(20) << "global_image_id=" << global_image_id << dendl;
137
138   Mutex::Locker locker(m_lock);
139
140   assert(m_on_shut_down == nullptr);
141
142   auto it = m_image_replayers.find(global_image_id);
143   if (it == m_image_replayers.end()) {
144     auto image_replayer = ImageReplayer<I>::create(
145         m_threads, m_image_deleter, instance_watcher, m_local_rados,
146         m_local_mirror_uuid, m_local_pool_id, global_image_id);
147
148     dout(20) << global_image_id << ": creating replayer " << image_replayer
149              << dendl;
150
151     it = m_image_replayers.insert(std::make_pair(global_image_id,
152                                                  image_replayer)).first;
153
154     // TODO only a single peer is currently supported
155     assert(m_peers.size() == 1);
156     auto peer = *m_peers.begin();
157     image_replayer->add_peer(peer.peer_uuid, peer.io_ctx);
158   }
159
160   auto& image_replayer = it->second;
161   // TODO temporary until policy integrated
162   image_replayer->set_finished(false);
163
164   start_image_replayer(image_replayer);
165   m_threads->work_queue->queue(on_finish, 0);
166 }
167
168 template <typename I>
169 void InstanceReplayer<I>::release_image(const std::string &global_image_id,
170                                         Context *on_finish) {
171   dout(20) << "global_image_id=" << global_image_id << dendl;
172
173   Mutex::Locker locker(m_lock);
174   assert(m_on_shut_down == nullptr);
175
176   auto it = m_image_replayers.find(global_image_id);
177   if (it == m_image_replayers.end()) {
178     dout(20) << global_image_id << ": not found" << dendl;
179     m_threads->work_queue->queue(on_finish, 0);
180     return;
181   }
182
183   auto image_replayer = it->second;
184   m_image_replayers.erase(it);
185
186   on_finish = new FunctionContext(
187     [image_replayer, on_finish] (int r) {
188       image_replayer->destroy();
189       on_finish->complete(0);
190     });
191   stop_image_replayer(image_replayer, on_finish);
192 }
193
194 template <typename I>
195 void InstanceReplayer<I>::remove_peer_image(const std::string &global_image_id,
196                                             const std::string &peer_mirror_uuid,
197                                             Context *on_finish) {
198   dout(20) << "global_image_id=" << global_image_id << ", "
199            << "peer_mirror_uuid=" << peer_mirror_uuid << dendl;
200
201   Mutex::Locker locker(m_lock);
202   assert(m_on_shut_down == nullptr);
203
204   auto it = m_image_replayers.find(global_image_id);
205   if (it != m_image_replayers.end()) {
206     // TODO only a single peer is currently supported, therefore
207     // we can just interrupt the current image replayer and
208     // it will eventually detect that the peer image is missing and
209     // determine if a delete propagation is required.
210     auto image_replayer = it->second;
211     image_replayer->restart();
212   }
213   m_threads->work_queue->queue(on_finish, 0);
214 }
215
216 template <typename I>
217 void InstanceReplayer<I>::print_status(Formatter *f, stringstream *ss) {
218   dout(20) << dendl;
219
220   if (!f) {
221     return;
222   }
223
224   Mutex::Locker locker(m_lock);
225
226   f->open_array_section("image_replayers");
227   for (auto &kv : m_image_replayers) {
228     auto &image_replayer = kv.second;
229     image_replayer->print_status(f, ss);
230   }
231   f->close_section();
232 }
233
234 template <typename I>
235 void InstanceReplayer<I>::start()
236 {
237   dout(20) << dendl;
238
239   Mutex::Locker locker(m_lock);
240
241   m_manual_stop = false;
242
243   for (auto &kv : m_image_replayers) {
244     auto &image_replayer = kv.second;
245     image_replayer->start(nullptr, true);
246   }
247 }
248
249 template <typename I>
250 void InstanceReplayer<I>::stop()
251 {
252   dout(20) << dendl;
253
254   Mutex::Locker locker(m_lock);
255
256   m_manual_stop = true;
257
258   for (auto &kv : m_image_replayers) {
259     auto &image_replayer = kv.second;
260     image_replayer->stop(nullptr, true);
261   }
262 }
263
264 template <typename I>
265 void InstanceReplayer<I>::restart()
266 {
267   dout(20) << dendl;
268
269   Mutex::Locker locker(m_lock);
270
271   m_manual_stop = false;
272
273   for (auto &kv : m_image_replayers) {
274     auto &image_replayer = kv.second;
275     image_replayer->restart();
276   }
277 }
278
279 template <typename I>
280 void InstanceReplayer<I>::flush()
281 {
282   dout(20) << "enter" << dendl;
283
284   Mutex::Locker locker(m_lock);
285
286   for (auto &kv : m_image_replayers) {
287     auto &image_replayer = kv.second;
288     image_replayer->flush();
289   }
290 }
291
292 template <typename I>
293 void InstanceReplayer<I>::start_image_replayer(
294     ImageReplayer<I> *image_replayer) {
295   assert(m_lock.is_locked());
296
297   std::string global_image_id = image_replayer->get_global_image_id();
298   dout(20) << "global_image_id=" << global_image_id << dendl;
299
300   if (!image_replayer->is_stopped()) {
301     return;
302   } else if (image_replayer->is_blacklisted()) {
303     derr << "blacklisted detected during image replay" << dendl;
304     return;
305   } else if (image_replayer->is_finished()) {
306     // TODO temporary until policy integrated
307     dout(5) << "removing image replayer for global_image_id="
308             << global_image_id << dendl;
309     m_image_replayers.erase(image_replayer->get_global_image_id());
310     image_replayer->destroy();
311     return;
312   }
313
314   image_replayer->start(nullptr, false);
315 }
316
317 template <typename I>
318 void InstanceReplayer<I>::queue_start_image_replayers() {
319   dout(20) << dendl;
320
321   Context *ctx = create_context_callback<
322     InstanceReplayer, &InstanceReplayer<I>::start_image_replayers>(this);
323   m_async_op_tracker.start_op();
324   m_threads->work_queue->queue(ctx, 0);
325 }
326
327 template <typename I>
328 void InstanceReplayer<I>::start_image_replayers(int r) {
329   dout(20) << dendl;
330
331   Mutex::Locker locker(m_lock);
332   if (m_on_shut_down != nullptr) {
333     return;
334   }
335
336   uint64_t image_count = 0;
337   uint64_t warning_count = 0;
338   uint64_t error_count = 0;
339   for (auto it = m_image_replayers.begin();
340        it != m_image_replayers.end();) {
341     auto current_it(it);
342     ++it;
343
344     ++image_count;
345     auto health_state = current_it->second->get_health_state();
346     if (health_state == image_replayer::HEALTH_STATE_WARNING) {
347       ++warning_count;
348     } else if (health_state == image_replayer::HEALTH_STATE_ERROR) {
349       ++error_count;
350     }
351
352     start_image_replayer(current_it->second);
353   }
354
355   m_service_daemon->add_or_update_attribute(
356     m_local_pool_id, SERVICE_DAEMON_ASSIGNED_COUNT_KEY, image_count);
357   m_service_daemon->add_or_update_attribute(
358     m_local_pool_id, SERVICE_DAEMON_WARNING_COUNT_KEY, warning_count);
359   m_service_daemon->add_or_update_attribute(
360     m_local_pool_id, SERVICE_DAEMON_ERROR_COUNT_KEY, error_count);
361
362   m_async_op_tracker.finish_op();
363 }
364
365 template <typename I>
366 void InstanceReplayer<I>::stop_image_replayer(ImageReplayer<I> *image_replayer,
367                                               Context *on_finish) {
368   dout(20) << image_replayer << " global_image_id="
369            << image_replayer->get_global_image_id() << ", on_finish="
370            << on_finish << dendl;
371
372   if (image_replayer->is_stopped()) {
373     m_threads->work_queue->queue(on_finish, 0);
374     return;
375   }
376
377   m_async_op_tracker.start_op();
378   Context *ctx = create_async_context_callback(
379     m_threads->work_queue, new FunctionContext(
380       [this, image_replayer, on_finish] (int r) {
381         stop_image_replayer(image_replayer, on_finish);
382         m_async_op_tracker.finish_op();
383       }));
384
385   if (image_replayer->is_running()) {
386     image_replayer->stop(ctx, false);
387   } else {
388     int after = 1;
389     dout(20) << "scheduling image replayer " << image_replayer << " stop after "
390              << after << " sec (task " << ctx << ")" << dendl;
391     ctx = new FunctionContext(
392       [this, after, ctx] (int r) {
393         Mutex::Locker timer_locker(m_threads->timer_lock);
394         m_threads->timer->add_event_after(after, ctx);
395       });
396     m_threads->work_queue->queue(ctx, 0);
397   }
398 }
399
400 template <typename I>
401 void InstanceReplayer<I>::wait_for_ops() {
402   dout(20) << dendl;
403
404   Context *ctx = create_context_callback<
405     InstanceReplayer, &InstanceReplayer<I>::handle_wait_for_ops>(this);
406
407   m_async_op_tracker.wait_for_ops(ctx);
408 }
409
410 template <typename I>
411 void InstanceReplayer<I>::handle_wait_for_ops(int r) {
412   dout(20) << "r=" << r << dendl;
413
414   assert(r == 0);
415
416   Mutex::Locker locker(m_lock);
417   stop_image_replayers();
418 }
419
420 template <typename I>
421 void InstanceReplayer<I>::stop_image_replayers() {
422   dout(20) << dendl;
423
424   assert(m_lock.is_locked());
425
426   Context *ctx = create_async_context_callback(
427     m_threads->work_queue, create_context_callback<InstanceReplayer<I>,
428     &InstanceReplayer<I>::handle_stop_image_replayers>(this));
429
430   C_Gather *gather_ctx = new C_Gather(g_ceph_context, ctx);
431   for (auto &it : m_image_replayers) {
432     stop_image_replayer(it.second, gather_ctx->new_sub());
433   }
434   gather_ctx->activate();
435 }
436
437 template <typename I>
438 void InstanceReplayer<I>::handle_stop_image_replayers(int r) {
439   dout(20) << "r=" << r << dendl;
440
441   assert(r == 0);
442
443   Context *on_finish = nullptr;
444   {
445     Mutex::Locker locker(m_lock);
446
447     for (auto &it : m_image_replayers) {
448       assert(it.second->is_stopped());
449       it.second->destroy();
450     }
451     m_image_replayers.clear();
452
453     assert(m_on_shut_down != nullptr);
454     std::swap(on_finish, m_on_shut_down);
455   }
456   on_finish->complete(r);
457 }
458
459 template <typename I>
460 void InstanceReplayer<I>::cancel_image_state_check_task() {
461   Mutex::Locker timer_locker(m_threads->timer_lock);
462
463   if (m_image_state_check_task == nullptr) {
464     return;
465   }
466
467   dout(20) << m_image_state_check_task << dendl;
468   bool canceled = m_threads->timer->cancel_event(m_image_state_check_task);
469   assert(canceled);
470   m_image_state_check_task = nullptr;
471 }
472
473 template <typename I>
474 void InstanceReplayer<I>::schedule_image_state_check_task() {
475   assert(m_threads->timer_lock.is_locked());
476   assert(m_image_state_check_task == nullptr);
477
478   m_image_state_check_task = new FunctionContext(
479     [this](int r) {
480       assert(m_threads->timer_lock.is_locked());
481       m_image_state_check_task = nullptr;
482       schedule_image_state_check_task();
483       queue_start_image_replayers();
484     });
485
486   int after = g_ceph_context->_conf->get_val<int64_t>(
487     "rbd_mirror_image_state_check_interval");
488
489   dout(20) << "scheduling image state check after " << after << " sec (task "
490            << m_image_state_check_task << ")" << dendl;
491   m_threads->timer->add_event_after(after, m_image_state_check_task);
492 }
493
494 } // namespace mirror
495 } // namespace rbd
496
497 template class rbd::mirror::InstanceReplayer<librbd::ImageCtx>;