Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / tools / rbd_mirror / PoolWatcher.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "tools/rbd_mirror/PoolWatcher.h"
5 #include "include/rbd_types.h"
6 #include "cls/rbd/cls_rbd_client.h"
7 #include "common/debug.h"
8 #include "common/errno.h"
9 #include "common/Timer.h"
10 #include "librbd/ImageCtx.h"
11 #include "librbd/internal.h"
12 #include "librbd/MirroringWatcher.h"
13 #include "librbd/Utils.h"
14 #include "librbd/api/Image.h"
15 #include "librbd/api/Mirror.h"
16 #include "tools/rbd_mirror/Threads.h"
17 #include "tools/rbd_mirror/pool_watcher/RefreshImagesRequest.h"
18 #include <boost/bind.hpp>
19
20 #define dout_context g_ceph_context
21 #define dout_subsys ceph_subsys_rbd_mirror
22 #undef dout_prefix
23 #define dout_prefix *_dout << "rbd::mirror::PoolWatcher: " << this << " " \
24                            << __func__ << ": "
25
26 using std::list;
27 using std::string;
28 using std::unique_ptr;
29 using std::vector;
30 using librbd::util::create_context_callback;
31 using librbd::util::create_rados_callback;
32
33 namespace rbd {
34 namespace mirror {
35
36 template <typename I>
37 class PoolWatcher<I>::MirroringWatcher : public librbd::MirroringWatcher<I> {
38 public:
39   using ContextWQ = typename std::decay<
40     typename std::remove_pointer<
41       decltype(Threads<I>::work_queue)>::type>::type;
42
43   MirroringWatcher(librados::IoCtx &io_ctx, ContextWQ *work_queue,
44                    PoolWatcher *pool_watcher)
45     : librbd::MirroringWatcher<I>(io_ctx, work_queue),
46       m_pool_watcher(pool_watcher) {
47   }
48
49   void handle_rewatch_complete(int r) override {
50     m_pool_watcher->handle_rewatch_complete(r);
51   }
52
53   void handle_mode_updated(cls::rbd::MirrorMode mirror_mode) override {
54     // invalidate all image state and refresh the pool contents
55     m_pool_watcher->schedule_refresh_images(5);
56   }
57
58   void handle_image_updated(cls::rbd::MirrorImageState state,
59                             const std::string &remote_image_id,
60                             const std::string &global_image_id) override {
61     bool enabled = (state == cls::rbd::MIRROR_IMAGE_STATE_ENABLED);
62     m_pool_watcher->handle_image_updated(remote_image_id, global_image_id,
63                                          enabled);
64   }
65
66 private:
67   PoolWatcher *m_pool_watcher;
68 };
69
70 template <typename I>
71 PoolWatcher<I>::PoolWatcher(Threads<I> *threads, librados::IoCtx &remote_io_ctx,
72                             Listener &listener)
73   : m_threads(threads), m_remote_io_ctx(remote_io_ctx), m_listener(listener),
74     m_lock(librbd::util::unique_lock_name("rbd::mirror::PoolWatcher", this)) {
75   m_mirroring_watcher = new MirroringWatcher(m_remote_io_ctx,
76                                              m_threads->work_queue, this);
77 }
78
79 template <typename I>
80 PoolWatcher<I>::~PoolWatcher() {
81   delete m_mirroring_watcher;
82 }
83
84 template <typename I>
85 bool PoolWatcher<I>::is_blacklisted() const {
86   Mutex::Locker locker(m_lock);
87   return m_blacklisted;
88 }
89
90 template <typename I>
91 void PoolWatcher<I>::init(Context *on_finish) {
92   dout(5) << dendl;
93
94   {
95     Mutex::Locker locker(m_lock);
96     m_on_init_finish = on_finish;
97
98     assert(!m_refresh_in_progress);
99     m_refresh_in_progress = true;
100   }
101
102   // start async updates for mirror image directory
103   register_watcher();
104 }
105
106 template <typename I>
107 void PoolWatcher<I>::shut_down(Context *on_finish) {
108   dout(5) << dendl;
109
110   {
111     Mutex::Locker timer_locker(m_threads->timer_lock);
112     Mutex::Locker locker(m_lock);
113
114     assert(!m_shutting_down);
115     m_shutting_down = true;
116     if (m_timer_ctx != nullptr) {
117       m_threads->timer->cancel_event(m_timer_ctx);
118       m_timer_ctx = nullptr;
119     }
120   }
121
122   // in-progress unregister tracked as async op
123   unregister_watcher();
124
125   m_async_op_tracker.wait_for_ops(on_finish);
126 }
127
128 template <typename I>
129 void PoolWatcher<I>::register_watcher() {
130   {
131     Mutex::Locker locker(m_lock);
132     assert(m_image_ids_invalid);
133     assert(m_refresh_in_progress);
134   }
135
136   // if the watch registration is in-flight, let the watcher
137   // handle the transition -- only (re-)register if it's not registered
138   if (!m_mirroring_watcher->is_unregistered()) {
139     refresh_images();
140     return;
141   }
142
143   // first time registering or the watch failed
144   dout(5) << dendl;
145   m_async_op_tracker.start_op();
146
147   Context *ctx = create_context_callback<
148     PoolWatcher, &PoolWatcher<I>::handle_register_watcher>(this);
149   m_mirroring_watcher->register_watch(ctx);
150 }
151
152 template <typename I>
153 void PoolWatcher<I>::handle_register_watcher(int r) {
154   dout(5) << "r=" << r << dendl;
155
156   {
157     Mutex::Locker locker(m_lock);
158     assert(m_image_ids_invalid);
159     assert(m_refresh_in_progress);
160     if (r < 0) {
161       m_refresh_in_progress = false;
162     }
163   }
164
165   Context *on_init_finish = nullptr;
166   if (r >= 0) {
167     refresh_images();
168   } else if (r == -EBLACKLISTED) {
169     dout(0) << "detected client is blacklisted" << dendl;
170
171     Mutex::Locker locker(m_lock);
172     m_blacklisted = true;
173     std::swap(on_init_finish, m_on_init_finish);
174   } else if (r == -ENOENT) {
175     dout(5) << "mirroring directory does not exist" << dendl;
176     schedule_refresh_images(30);
177   } else {
178     derr << "unexpected error registering mirroring directory watch: "
179          << cpp_strerror(r) << dendl;
180     schedule_refresh_images(10);
181   }
182
183   m_async_op_tracker.finish_op();
184   if (on_init_finish != nullptr) {
185     on_init_finish->complete(r);
186   }
187 }
188
189 template <typename I>
190 void PoolWatcher<I>::unregister_watcher() {
191   dout(5) << dendl;
192
193   m_async_op_tracker.start_op();
194   Context *ctx = new FunctionContext([this](int r) {
195       dout(5) << "unregister_watcher: r=" << r << dendl;
196       if (r < 0) {
197         derr << "error unregistering watcher for "
198              << m_mirroring_watcher->get_oid() << " object: " << cpp_strerror(r)
199              << dendl;
200       }
201       m_async_op_tracker.finish_op();
202     });
203
204   m_mirroring_watcher->unregister_watch(ctx);
205 }
206
207 template <typename I>
208 void PoolWatcher<I>::refresh_images() {
209   dout(5) << dendl;
210
211   {
212     Mutex::Locker locker(m_lock);
213     assert(m_image_ids_invalid);
214     assert(m_refresh_in_progress);
215
216     // clear all pending notification events since we need to perform
217     // a full image list refresh
218     m_pending_added_image_ids.clear();
219     m_pending_removed_image_ids.clear();
220   }
221
222   m_async_op_tracker.start_op();
223   m_refresh_image_ids.clear();
224   Context *ctx = create_context_callback<
225     PoolWatcher, &PoolWatcher<I>::handle_refresh_images>(this);
226   auto req = pool_watcher::RefreshImagesRequest<I>::create(m_remote_io_ctx,
227                                                            &m_refresh_image_ids,
228                                                            ctx);
229   req->send();
230 }
231
232 template <typename I>
233 void PoolWatcher<I>::handle_refresh_images(int r) {
234   dout(5) << "r=" << r << dendl;
235
236   bool retry_refresh = false;
237   Context *on_init_finish = nullptr;
238   {
239     Mutex::Locker locker(m_lock);
240     assert(m_image_ids_invalid);
241     assert(m_refresh_in_progress);
242
243     if (r >= 0) {
244       m_pending_image_ids = std::move(m_refresh_image_ids);
245     } else if (r == -EBLACKLISTED) {
246       dout(0) << "detected client is blacklisted during image refresh" << dendl;
247
248       m_blacklisted = true;
249       m_refresh_in_progress = false;
250       std::swap(on_init_finish, m_on_init_finish);
251     } else if (r == -ENOENT) {
252       dout(5) << "mirroring directory not found" << dendl;
253       m_pending_image_ids.clear();
254       r = 0;
255     } else {
256       m_refresh_in_progress = false;
257       retry_refresh = true;
258     }
259   }
260
261   if (retry_refresh) {
262     derr << "failed to retrieve mirroring directory: " << cpp_strerror(r)
263          << dendl;
264     schedule_refresh_images(10);
265   } else if (r >= 0) {
266     get_mirror_uuid();
267     return;
268   }
269
270   m_async_op_tracker.finish_op();
271   if (on_init_finish != nullptr) {
272     assert(r == -EBLACKLISTED);
273     on_init_finish->complete(r);
274   }
275 }
276
277 template <typename I>
278 void PoolWatcher<I>::get_mirror_uuid() {
279   dout(5) << dendl;
280
281   librados::ObjectReadOperation op;
282   librbd::cls_client::mirror_uuid_get_start(&op);
283
284   m_out_bl.clear();
285   librados::AioCompletion *aio_comp = create_rados_callback<
286     PoolWatcher, &PoolWatcher<I>::handle_get_mirror_uuid>(this);
287   int r = m_remote_io_ctx.aio_operate(RBD_MIRRORING, aio_comp, &op, &m_out_bl);
288   assert(r == 0);
289   aio_comp->release();
290 }
291
292 template <typename I>
293 void PoolWatcher<I>::handle_get_mirror_uuid(int r) {
294   dout(5) << "r=" << r << dendl;
295
296   bool deferred_refresh = false;
297   bool retry_refresh = false;
298   Context *on_init_finish = nullptr;
299   {
300     Mutex::Locker locker(m_lock);
301     assert(m_image_ids_invalid);
302     assert(m_refresh_in_progress);
303     m_refresh_in_progress = false;
304
305     m_pending_mirror_uuid = "";
306     if (r >= 0) {
307       bufferlist::iterator it = m_out_bl.begin();
308       r = librbd::cls_client::mirror_uuid_get_finish(
309         &it, &m_pending_mirror_uuid);
310     }
311     if (r >= 0 && m_pending_mirror_uuid.empty()) {
312       r = -ENOENT;
313     }
314
315     if (m_deferred_refresh) {
316       // need to refresh -- skip the notification
317       deferred_refresh = true;
318     } else if (r >= 0) {
319       dout(10) << "mirror_uuid=" << m_pending_mirror_uuid << dendl;
320       m_image_ids_invalid = false;
321       std::swap(on_init_finish, m_on_init_finish);
322       schedule_listener();
323     } else if (r == -EBLACKLISTED) {
324       dout(0) << "detected client is blacklisted during image refresh" << dendl;
325
326       m_blacklisted = true;
327       std::swap(on_init_finish, m_on_init_finish);
328     } else if (r == -ENOENT) {
329       dout(5) << "mirroring uuid not found" << dendl;
330       std::swap(on_init_finish, m_on_init_finish);
331       retry_refresh = true;
332     } else {
333       retry_refresh = true;
334     }
335   }
336
337   if (deferred_refresh) {
338     dout(5) << "scheduling deferred refresh" << dendl;
339     schedule_refresh_images(0);
340   } else if (retry_refresh) {
341     derr << "failed to retrieve mirror uuid: " << cpp_strerror(r)
342          << dendl;
343     schedule_refresh_images(10);
344   }
345
346   m_async_op_tracker.finish_op();
347   if (on_init_finish != nullptr) {
348     on_init_finish->complete(r);
349   }
350 }
351
352 template <typename I>
353 void PoolWatcher<I>::schedule_refresh_images(double interval) {
354   Mutex::Locker timer_locker(m_threads->timer_lock);
355   Mutex::Locker locker(m_lock);
356   if (m_shutting_down || m_refresh_in_progress || m_timer_ctx != nullptr) {
357     if (m_refresh_in_progress && !m_deferred_refresh) {
358       dout(5) << "deferring refresh until in-flight refresh completes" << dendl;
359       m_deferred_refresh = true;
360     }
361     return;
362   }
363
364   m_image_ids_invalid = true;
365   m_timer_ctx = m_threads->timer->add_event_after(
366     interval,
367     new FunctionContext([this](int r) {
368         process_refresh_images();
369       }));
370 }
371
372 template <typename I>
373 void PoolWatcher<I>::handle_rewatch_complete(int r) {
374   dout(5) << "r=" << r << dendl;
375
376   if (r == -EBLACKLISTED) {
377     dout(0) << "detected client is blacklisted" << dendl;
378
379     Mutex::Locker locker(m_lock);
380     m_blacklisted = true;
381     return;
382   } else if (r == -ENOENT) {
383     dout(5) << "mirroring directory deleted" << dendl;
384   } else if (r < 0) {
385     derr << "unexpected error re-registering mirroring directory watch: "
386          << cpp_strerror(r) << dendl;
387   }
388
389   schedule_refresh_images(5);
390 }
391
392 template <typename I>
393 void PoolWatcher<I>::handle_image_updated(const std::string &remote_image_id,
394                                        const std::string &global_image_id,
395                                        bool enabled) {
396   dout(10) << "remote_image_id=" << remote_image_id << ", "
397            << "global_image_id=" << global_image_id << ", "
398            << "enabled=" << enabled << dendl;
399
400   Mutex::Locker locker(m_lock);
401   ImageId image_id(global_image_id, remote_image_id);
402   m_pending_added_image_ids.erase(image_id);
403   m_pending_removed_image_ids.erase(image_id);
404
405   if (enabled) {
406     m_pending_added_image_ids.insert(image_id);
407     schedule_listener();
408   } else {
409     m_pending_removed_image_ids.insert(image_id);
410     schedule_listener();
411   }
412 }
413
414 template <typename I>
415 void PoolWatcher<I>::process_refresh_images() {
416   assert(m_threads->timer_lock.is_locked());
417   assert(m_timer_ctx != nullptr);
418   m_timer_ctx = nullptr;
419
420   {
421     Mutex::Locker locker(m_lock);
422     assert(!m_refresh_in_progress);
423     m_refresh_in_progress = true;
424     m_deferred_refresh = false;
425   }
426
427   // execute outside of the timer's lock
428   m_async_op_tracker.start_op();
429   Context *ctx = new FunctionContext([this](int r) {
430       register_watcher();
431       m_async_op_tracker.finish_op();
432     });
433   m_threads->work_queue->queue(ctx, 0);
434 }
435
436 template <typename I>
437 void PoolWatcher<I>::schedule_listener() {
438   assert(m_lock.is_locked());
439   m_pending_updates = true;
440   if (m_shutting_down || m_image_ids_invalid || m_notify_listener_in_progress) {
441     return;
442   }
443
444   dout(20) << dendl;
445
446   m_async_op_tracker.start_op();
447   Context *ctx = new FunctionContext([this](int r) {
448       notify_listener();
449       m_async_op_tracker.finish_op();
450     });
451
452   m_notify_listener_in_progress = true;
453   m_threads->work_queue->queue(ctx, 0);
454 }
455
456 template <typename I>
457 void PoolWatcher<I>::notify_listener() {
458   dout(10) << dendl;
459
460   std::string mirror_uuid;
461   ImageIds added_image_ids;
462   ImageIds removed_image_ids;
463   {
464     Mutex::Locker locker(m_lock);
465     assert(m_notify_listener_in_progress);
466
467     // if the mirror uuid is updated, treat it as the removal of all
468     // images in the pool
469     if (m_mirror_uuid != m_pending_mirror_uuid) {
470       if (!m_mirror_uuid.empty()) {
471         dout(0) << "mirror uuid updated:"
472                 << "old=" << m_mirror_uuid << ", "
473                 << "new=" << m_pending_mirror_uuid << dendl;
474       }
475
476       mirror_uuid = m_mirror_uuid;
477       removed_image_ids = std::move(m_image_ids);
478       m_image_ids.clear();
479     }
480   }
481
482   if (!removed_image_ids.empty()) {
483     m_listener.handle_update(mirror_uuid, {}, std::move(removed_image_ids));
484     removed_image_ids.clear();
485   }
486
487   {
488     Mutex::Locker locker(m_lock);
489     assert(m_notify_listener_in_progress);
490
491     // if the watch failed while we didn't own the lock, we are going
492     // to need to perform a full refresh
493     if (m_image_ids_invalid) {
494       m_notify_listener_in_progress = false;
495       return;
496     }
497
498     // merge add/remove notifications into pending set (a given image
499     // can only be in one set or another)
500     for (auto &image_id : m_pending_removed_image_ids) {
501       dout(20) << "image_id=" << image_id << dendl;
502       m_pending_image_ids.erase(image_id);
503     }
504
505     for (auto &image_id : m_pending_added_image_ids) {
506       dout(20) << "image_id=" << image_id << dendl;
507       m_pending_image_ids.erase(image_id);
508       m_pending_image_ids.insert(image_id);
509     }
510     m_pending_added_image_ids.clear();
511
512     // compute added/removed images
513     for (auto &image_id : m_image_ids) {
514       auto it = m_pending_image_ids.find(image_id);
515       if (it == m_pending_image_ids.end() || it->id != image_id.id) {
516         removed_image_ids.insert(image_id);
517       }
518     }
519     for (auto &image_id : m_pending_image_ids) {
520       auto it = m_image_ids.find(image_id);
521       if (it == m_image_ids.end() || it->id != image_id.id) {
522         added_image_ids.insert(image_id);
523       }
524     }
525
526     m_pending_updates = false;
527     m_image_ids = m_pending_image_ids;
528
529     m_mirror_uuid = m_pending_mirror_uuid;
530     mirror_uuid = m_mirror_uuid;
531   }
532
533   m_listener.handle_update(mirror_uuid, std::move(added_image_ids),
534                            std::move(removed_image_ids));
535
536   {
537     Mutex::Locker locker(m_lock);
538     m_notify_listener_in_progress = false;
539     if (m_pending_updates) {
540       schedule_listener();
541     }
542   }
543 }
544
545 } // namespace mirror
546 } // namespace rbd
547
548 template class rbd::mirror::PoolWatcher<librbd::ImageCtx>;