1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "tools/rbd_mirror/PoolWatcher.h"
5 #include "include/rbd_types.h"
6 #include "cls/rbd/cls_rbd_client.h"
7 #include "common/debug.h"
8 #include "common/errno.h"
9 #include "common/Timer.h"
10 #include "librbd/ImageCtx.h"
11 #include "librbd/internal.h"
12 #include "librbd/MirroringWatcher.h"
13 #include "librbd/Utils.h"
14 #include "librbd/api/Image.h"
15 #include "librbd/api/Mirror.h"
16 #include "tools/rbd_mirror/Threads.h"
17 #include "tools/rbd_mirror/pool_watcher/RefreshImagesRequest.h"
18 #include <boost/bind.hpp>
// Logging setup for this file: dout()/derr output uses the global
// CephContext and the rbd_mirror debug subsystem, and each message is
// prefixed with "rbd::mirror::PoolWatcher: " followed by `this`.
// NOTE(review): the continuation line of dout_prefix (after the trailing
// backslash) is not present in this listing -- confirm against the source.
18 #define dout_context g_ceph_context
19 #define dout_subsys ceph_subsys_rbd_mirror
20 #define dout_prefix *_dout << "rbd::mirror::PoolWatcher: " << this << " " \
28 using std::unique_ptr;
30 using librbd::util::create_context_callback;
31 using librbd::util::create_rados_callback;
// Thin adapter: subclasses librbd::MirroringWatcher<I> and forwards each of
// its callbacks to the owning PoolWatcher.
37 class PoolWatcher<I>::MirroringWatcher : public librbd::MirroringWatcher<I> {
// Work-queue type deduced from Threads<I>::work_queue with the pointer and
// any cv/ref qualifiers stripped, so it follows whatever Threads declares.
39 using ContextWQ = typename std::decay<
40 typename std::remove_pointer<
41 decltype(Threads<I>::work_queue)>::type>::type;
// pool_watcher is kept as a plain back-pointer; the PoolWatcher creates and
// deletes this object, so it is expected to outlive it.
43 MirroringWatcher(librados::IoCtx &io_ctx, ContextWQ *work_queue,
44 PoolWatcher *pool_watcher)
45 : librbd::MirroringWatcher<I>(io_ctx, work_queue),
46 m_pool_watcher(pool_watcher) {
// Forwarded unchanged; PoolWatcher::handle_rewatch_complete decides how to
// recover (blacklist tracking and/or rescheduling a refresh).
49 void handle_rewatch_complete(int r) override {
50 m_pool_watcher->handle_rewatch_complete(r);
// The new mode value is not inspected here: any mode change triggers a full
// refresh rather than an incremental update.
53 void handle_mode_updated(cls::rbd::MirrorMode mirror_mode) override {
54 // invalidate all image state and refresh the pool contents
55 m_pool_watcher->schedule_refresh_images(5);
// Per-image state change: collapsed to an enabled flag (only
// MIRROR_IMAGE_STATE_ENABLED counts as enabled) before forwarding.
58 void handle_image_updated(cls::rbd::MirrorImageState state,
59 const std::string &remote_image_id,
60 const std::string &global_image_id) override {
61 bool enabled = (state == cls::rbd::MIRROR_IMAGE_STATE_ENABLED);
62 m_pool_watcher->handle_image_updated(remote_image_id, global_image_id,
// Back-pointer to the owning PoolWatcher (never deleted by this object).
67 PoolWatcher *m_pool_watcher;
// Creates a watcher for the pool behind remote_io_ctx. m_lock is given a
// name unique to this instance. The MirroringWatcher helper is allocated
// here (and deleted in the destructor); it shares the same IoCtx and is
// handed the threads' work queue (presumably used to run its callbacks).
71 PoolWatcher<I>::PoolWatcher(Threads<I> *threads, librados::IoCtx &remote_io_ctx,
73 : m_threads(threads), m_remote_io_ctx(remote_io_ctx), m_listener(listener),
74 m_lock(librbd::util::unique_lock_name("rbd::mirror::PoolWatcher", this)) {
75 m_mirroring_watcher = new MirroringWatcher(m_remote_io_ctx,
76 m_threads->work_queue, this);
// Frees the MirroringWatcher allocated in the constructor.
// NOTE(review): ownership is manual new/delete; a std::unique_ptr member
// (declared in the header, not visible here) would make it explicit.
// Presumably shut_down() must have completed first so no watch callback can
// still reach this object -- confirm.
80 PoolWatcher<I>::~PoolWatcher() {
81 delete m_mirroring_watcher;
// Thread-safe query for whether an -EBLACKLISTED error has been observed.
// m_lock is taken because the completion handlers set m_blacklisted while
// holding the same lock. (The return statement is not shown in this
// listing; presumably it returns m_blacklisted.)
85 bool PoolWatcher<I>::is_blacklisted() const {
86 Mutex::Locker locker(m_lock);
// Starts watching the pool. on_finish is stashed in m_on_init_finish and is
// completed later by one of the asynchronous completion handlers, once the
// initial registration/refresh succeeds or fails.
91 void PoolWatcher<I>::init(Context *on_finish) {
95 Mutex::Locker locker(m_lock);
96 m_on_init_finish = on_finish;
98 assert(!m_refresh_in_progress);
// The initial setup counts as an in-progress refresh, so concurrent
// schedule_refresh_images() calls are deferred until it finishes.
99 m_refresh_in_progress = true;
102 // start async updates for mirror image directory
// Begins shutdown: marks the watcher as shutting down, cancels any pending
// timer-driven refresh, starts unregistering the watch, and completes
// on_finish once every tracked async op has finished. May only be called
// once (asserted).
106 template <typename I>
107 void PoolWatcher<I>::shut_down(Context *on_finish) {
// Lock order is timer_lock -> m_lock, the same order used by
// schedule_refresh_images().
111 Mutex::Locker timer_locker(m_threads->timer_lock);
112 Mutex::Locker locker(m_lock);
114 assert(!m_shutting_down);
115 m_shutting_down = true;
116 if (m_timer_ctx != nullptr) {
117 m_threads->timer->cancel_event(m_timer_ctx);
118 m_timer_ctx = nullptr;
122 // in-progress unregister tracked as async op
123 unregister_watcher();
125 m_async_op_tracker.wait_for_ops(on_finish);
// Registers the watch on the mirroring directory object. Requires (asserted)
// a refresh in progress and the cached image ids marked invalid. If the
// watcher is not in the unregistered state, a different path is taken (the
// body of that branch is not shown in this listing).
128 template <typename I>
129 void PoolWatcher<I>::register_watcher() {
131 Mutex::Locker locker(m_lock);
132 assert(m_image_ids_invalid);
133 assert(m_refresh_in_progress);
136 // if the watch registration is in-flight, let the watcher
137 // handle the transition -- only (re-)register if it's not registered
138 if (!m_mirroring_watcher->is_unregistered()) {
143 // first time registering or the watch failed
// Balanced by finish_op() in handle_register_watcher().
145 m_async_op_tracker.start_op();
147 Context *ctx = create_context_callback<
148 PoolWatcher, &PoolWatcher<I>::handle_register_watcher>(this);
149 m_mirroring_watcher->register_watch(ctx);
// Completion for register_watcher(). Visible outcomes:
//   -EBLACKLISTED: record the blacklisting and take m_on_init_finish so a
//                  pending init() completes with the error;
//   -ENOENT:       mirroring directory missing, schedule_refresh_images(30);
//   other errors:  log, then schedule_refresh_images(10).
// The success branch and the condition guarding the reset of
// m_refresh_in_progress are not shown in this listing. The async op started
// by register_watcher() is finished here on every path shown.
152 template <typename I>
153 void PoolWatcher<I>::handle_register_watcher(int r) {
154 dout(5) << "r=" << r << dendl;
157 Mutex::Locker locker(m_lock);
158 assert(m_image_ids_invalid);
159 assert(m_refresh_in_progress);
161 m_refresh_in_progress = false;
// Becomes non-null only when this handler must complete a pending init().
165 Context *on_init_finish = nullptr;
168 } else if (r == -EBLACKLISTED) {
169 dout(0) << "detected client is blacklisted" << dendl;
171 Mutex::Locker locker(m_lock);
172 m_blacklisted = true;
173 std::swap(on_init_finish, m_on_init_finish);
174 } else if (r == -ENOENT) {
175 dout(5) << "mirroring directory does not exist" << dendl;
176 schedule_refresh_images(30);
178 derr << "unexpected error registering mirroring directory watch: "
179 << cpp_strerror(r) << dendl;
180 schedule_refresh_images(10);
183 m_async_op_tracker.finish_op();
184 if (on_init_finish != nullptr) {
185 on_init_finish->complete(r);
// Asynchronously unregisters the mirroring watch. The operation is tracked
// by m_async_op_tracker, so shut_down()'s wait_for_ops() also waits for it.
// Failures are logged; no other error handling is visible here.
189 template <typename I>
190 void PoolWatcher<I>::unregister_watcher() {
193 m_async_op_tracker.start_op();
194 Context *ctx = new FunctionContext([this](int r) {
195 dout(5) << "unregister_watcher: r=" << r << dendl;
197 derr << "error unregistering watcher for "
198 << m_mirroring_watcher->get_oid() << " object: " << cpp_strerror(r)
201 m_async_op_tracker.finish_op();
204 m_mirroring_watcher->unregister_watch(ctx);
// Starts a full listing of mirrored images via RefreshImagesRequest, which
// fills m_refresh_image_ids; handle_refresh_images() is the completion.
// Requires (asserted) a refresh in progress and cached image ids invalid.
207 template <typename I>
208 void PoolWatcher<I>::refresh_images() {
212 Mutex::Locker locker(m_lock);
213 assert(m_image_ids_invalid);
214 assert(m_refresh_in_progress);
216 // clear all pending notification events since we need to perform
217 // a full image list refresh
218 m_pending_added_image_ids.clear();
219 m_pending_removed_image_ids.clear();
// Tracked so that shut_down() waits for the refresh to complete.
222 m_async_op_tracker.start_op();
223 m_refresh_image_ids.clear();
224 Context *ctx = create_context_callback<
225 PoolWatcher, &PoolWatcher<I>::handle_refresh_images>(this);
226 auto req = pool_watcher::RefreshImagesRequest<I>::create(m_remote_io_ctx,
227 &m_refresh_image_ids,
// Completion for the full image listing started by refresh_images().
// Visible outcomes:
//   success:       the listing is moved into m_pending_image_ids;
//   -EBLACKLISTED: record the blacklisting, end the refresh, and complete a
//                  pending init() with the error;
//   -ENOENT:       no mirroring directory, so the pending image set is
//                  cleared;
//   failure:       logged, and a refresh is rescheduled via
//                  schedule_refresh_images(10).
// NOTE(review): parts of the branch structure (including how the success and
// -ENOENT paths continue) are not shown in this listing.
232 template <typename I>
233 void PoolWatcher<I>::handle_refresh_images(int r) {
234 dout(5) << "r=" << r << dendl;
236 bool retry_refresh = false;
237 Context *on_init_finish = nullptr;
239 Mutex::Locker locker(m_lock);
240 assert(m_image_ids_invalid);
241 assert(m_refresh_in_progress);
244 m_pending_image_ids = std::move(m_refresh_image_ids);
245 } else if (r == -EBLACKLISTED) {
246 dout(0) << "detected client is blacklisted during image refresh" << dendl;
248 m_blacklisted = true;
249 m_refresh_in_progress = false;
250 std::swap(on_init_finish, m_on_init_finish);
251 } else if (r == -ENOENT) {
252 dout(5) << "mirroring directory not found" << dendl;
253 m_pending_image_ids.clear();
256 m_refresh_in_progress = false;
257 retry_refresh = true;
262 derr << "failed to retrieve mirroring directory: " << cpp_strerror(r)
264 schedule_refresh_images(10);
270 m_async_op_tracker.finish_op();
271 if (on_init_finish != nullptr) {
// In this handler only the -EBLACKLISTED branch takes m_on_init_finish.
272 assert(r == -EBLACKLISTED);
273 on_init_finish->complete(r);
// Reads the pool's mirror uuid by issuing an async cls mirror_uuid_get read
// against the RBD_MIRRORING object. The reply is buffered in m_out_bl for
// handle_get_mirror_uuid(). (How the aio_operate() return value is checked
// is not shown in this listing.)
277 template <typename I>
278 void PoolWatcher<I>::get_mirror_uuid() {
281 librados::ObjectReadOperation op;
282 librbd::cls_client::mirror_uuid_get_start(&op);
285 librados::AioCompletion *aio_comp = create_rados_callback<
286 PoolWatcher, &PoolWatcher<I>::handle_get_mirror_uuid>(this);
287 int r = m_remote_io_ctx.aio_operate(RBD_MIRRORING, aio_comp, &op, &m_out_bl);
// Completion for get_mirror_uuid(); always ends the in-progress refresh.
// The reply is decoded into m_pending_mirror_uuid (reset to "" first). An
// empty decoded uuid is special-cased (that handling is not shown in this
// listing).
// If another refresh was requested while this one ran (m_deferred_refresh),
// the notification is skipped and a new refresh is scheduled immediately.
// Otherwise the cached image ids are marked valid and a pending init() is
// completed.
// -EBLACKLISTED records the blacklisting and completes a pending init() with
// r. -ENOENT completes a pending init() with r and retries. Other errors
// retry via schedule_refresh_images(10).
// NOTE(review): the -ENOENT path goes through retry_refresh, so it is also
// logged via derr as "failed to retrieve mirror uuid" even though it was
// just reported at dout(5) as an expected condition.
// NOTE(review): the -EBLACKLISTED message says "during image refresh"
// although this is the mirror-uuid read.
292 template <typename I>
293 void PoolWatcher<I>::handle_get_mirror_uuid(int r) {
294 dout(5) << "r=" << r << dendl;
296 bool deferred_refresh = false;
297 bool retry_refresh = false;
298 Context *on_init_finish = nullptr;
300 Mutex::Locker locker(m_lock);
301 assert(m_image_ids_invalid);
302 assert(m_refresh_in_progress);
303 m_refresh_in_progress = false;
305 m_pending_mirror_uuid = "";
307 bufferlist::iterator it = m_out_bl.begin();
308 r = librbd::cls_client::mirror_uuid_get_finish(
309 &it, &m_pending_mirror_uuid);
311 if (r >= 0 && m_pending_mirror_uuid.empty()) {
315 if (m_deferred_refresh) {
316 // need to refresh -- skip the notification
317 deferred_refresh = true;
319 dout(10) << "mirror_uuid=" << m_pending_mirror_uuid << dendl;
320 m_image_ids_invalid = false;
321 std::swap(on_init_finish, m_on_init_finish);
323 } else if (r == -EBLACKLISTED) {
324 dout(0) << "detected client is blacklisted during image refresh" << dendl;
326 m_blacklisted = true;
327 std::swap(on_init_finish, m_on_init_finish);
328 } else if (r == -ENOENT) {
329 dout(5) << "mirroring uuid not found" << dendl;
330 std::swap(on_init_finish, m_on_init_finish);
331 retry_refresh = true;
333 retry_refresh = true;
337 if (deferred_refresh) {
338 dout(5) << "scheduling deferred refresh" << dendl;
339 schedule_refresh_images(0);
340 } else if (retry_refresh) {
341 derr << "failed to retrieve mirror uuid: " << cpp_strerror(r)
343 schedule_refresh_images(10);
346 m_async_op_tracker.finish_op();
347 if (on_init_finish != nullptr) {
348 on_init_finish->complete(r);
// Arms a timer event that runs process_refresh_images() after `interval`
// (presumably seconds, as taken by the timer's add_event_after).
// The function takes timer_lock and then m_lock, so callers presumably must
// not already hold m_lock.
// When shutting down, when a refresh is running, or when an event is already
// pending, the special-case branch runs instead (its exit is not shown in
// this listing). If a refresh is running, that branch sets m_deferred_refresh
// so another refresh follows it.
// Scheduling marks the cached image ids invalid.
352 template <typename I>
353 void PoolWatcher<I>::schedule_refresh_images(double interval) {
354 Mutex::Locker timer_locker(m_threads->timer_lock);
355 Mutex::Locker locker(m_lock);
356 if (m_shutting_down || m_refresh_in_progress || m_timer_ctx != nullptr) {
357 if (m_refresh_in_progress && !m_deferred_refresh) {
358 dout(5) << "deferring refresh until in-flight refresh completes" << dendl;
359 m_deferred_refresh = true;
364 m_image_ids_invalid = true;
365 m_timer_ctx = m_threads->timer->add_event_after(
367 new FunctionContext([this](int r) {
368 process_refresh_images();
// Completion of a re-watch attempt on the mirroring directory.
// -EBLACKLISTED is recorded under m_lock; -ENOENT means the mirroring
// directory was deleted; other errors are logged. A full refresh is then
// scheduled via schedule_refresh_images(5).
// NOTE(review): the remaining lines of the blacklisted branch are not shown
// in this listing, so whether that path also reaches the reschedule cannot
// be confirmed here.
372 template <typename I>
373 void PoolWatcher<I>::handle_rewatch_complete(int r) {
374 dout(5) << "r=" << r << dendl;
376 if (r == -EBLACKLISTED) {
377 dout(0) << "detected client is blacklisted" << dendl;
379 Mutex::Locker locker(m_lock);
380 m_blacklisted = true;
382 } else if (r == -ENOENT) {
383 dout(5) << "mirroring directory deleted" << dendl;
385 derr << "unexpected error re-registering mirroring directory watch: "
386 << cpp_strerror(r) << dendl;
389 schedule_refresh_images(5);
// Records an incremental per-image notification under m_lock. The image is
// first erased from both pending sets, so it ends up in exactly one of them:
// pending-added or pending-removed, chosen by `enabled`. (The branch that
// selects the set, and any follow-up such as scheduling the listener, is not
// shown in this listing.)
392 template <typename I>
393 void PoolWatcher<I>::handle_image_updated(const std::string &remote_image_id,
394 const std::string &global_image_id,
396 dout(10) << "remote_image_id=" << remote_image_id << ", "
397 << "global_image_id=" << global_image_id << ", "
398 << "enabled=" << enabled << dendl;
400 Mutex::Locker locker(m_lock);
401 ImageId image_id(global_image_id, remote_image_id);
402 m_pending_added_image_ids.erase(image_id);
403 m_pending_removed_image_ids.erase(image_id);
406 m_pending_added_image_ids.insert(image_id);
409 m_pending_removed_image_ids.insert(image_id);
// Timer callback armed by schedule_refresh_images(). It runs with timer_lock
// held (asserted) and:
//   - clears the timer handle;
//   - marks a refresh as in progress and clears any deferred request;
//   - queues the real work on the work queue so it runs outside the timer
//     lock.
// The queued callback finishes the async op started here. Its other
// statements are not shown in this listing.
414 template <typename I>
415 void PoolWatcher<I>::process_refresh_images() {
416 assert(m_threads->timer_lock.is_locked());
417 assert(m_timer_ctx != nullptr);
418 m_timer_ctx = nullptr;
421 Mutex::Locker locker(m_lock);
422 assert(!m_refresh_in_progress);
423 m_refresh_in_progress = true;
424 m_deferred_refresh = false;
427 // execute outside of the timer's lock
428 m_async_op_tracker.start_op();
429 Context *ctx = new FunctionContext([this](int r) {
431 m_async_op_tracker.finish_op();
433 m_threads->work_queue->queue(ctx, 0);
// Requests a listener notification; the caller must hold m_lock (asserted).
// m_pending_updates is always set, so updates requested while a notification
// cannot start are not lost.
// While shutting down, while the cached ids are invalid, or while a
// notification is already in flight, the early-exit branch is taken (its
// body is not shown in this listing).
// Otherwise a callback tracked by m_async_op_tracker is queued on the work
// queue, and m_notify_listener_in_progress is set.
436 template <typename I>
437 void PoolWatcher<I>::schedule_listener() {
438 assert(m_lock.is_locked());
439 m_pending_updates = true;
440 if (m_shutting_down || m_image_ids_invalid || m_notify_listener_in_progress) {
446 m_async_op_tracker.start_op();
447 Context *ctx = new FunctionContext([this](int r) {
449 m_async_op_tracker.finish_op();
452 m_notify_listener_in_progress = true;
453 m_threads->work_queue->queue(ctx, 0);
// Delivers accumulated image changes to the listener. m_lock appears to be
// held in separate scopes, with the listener called between them:
//  1. If the mirror uuid changed, every previously known image is reported
//     as removed under the old uuid.
//  2. If the cached ids became invalid meanwhile, the in-progress flag is
//     cleared and the notification is presumably abandoned until a full
//     refresh rebuilds the state. Otherwise pending add/remove notifications
//     are folded into m_pending_image_ids, the difference against
//     m_image_ids is computed and published, and the listener is told.
//  3. m_notify_listener_in_progress is cleared. If more updates arrived
//     (m_pending_updates), the follow-up is not shown in this listing.
456 template <typename I>
457 void PoolWatcher<I>::notify_listener() {
460 std::string mirror_uuid;
461 ImageIds added_image_ids;
462 ImageIds removed_image_ids;
464 Mutex::Locker locker(m_lock);
465 assert(m_notify_listener_in_progress);
467 // if the mirror uuid is updated, treat it as the removal of all
468 // images in the pool
469 if (m_mirror_uuid != m_pending_mirror_uuid) {
470 if (!m_mirror_uuid.empty()) {
// NOTE(review): there is no space between "updated:" and "old=", so this
// renders as "mirror uuid updated:old=...".
471 dout(0) << "mirror uuid updated:"
472 << "old=" << m_mirror_uuid << ", "
473 << "new=" << m_pending_mirror_uuid << dendl;
476 mirror_uuid = m_mirror_uuid;
477 removed_image_ids = std::move(m_image_ids);
482 if (!removed_image_ids.empty()) {
483 m_listener.handle_update(mirror_uuid, {}, std::move(removed_image_ids));
// The set was moved from; clear() returns it to a known-empty state before
// it is reused below to collect removals.
484 removed_image_ids.clear();
488 Mutex::Locker locker(m_lock);
489 assert(m_notify_listener_in_progress);
491 // if the watch failed while we didn't own the lock, we are going
492 // to need to perform a full refresh
493 if (m_image_ids_invalid) {
494 m_notify_listener_in_progress = false;
498 // merge add/remove notifications into pending set (a given image
499 // can only be in one set or another)
500 for (auto &image_id : m_pending_removed_image_ids) {
501 dout(20) << "image_id=" << image_id << dendl;
502 m_pending_image_ids.erase(image_id);
505 for (auto &image_id : m_pending_added_image_ids) {
506 dout(20) << "image_id=" << image_id << dendl;
// erase() before insert(): if ImageIds is std::set-like, insert() keeps an
// existing equivalent element, so erasing first makes the newer entry win.
507 m_pending_image_ids.erase(image_id);
508 m_pending_image_ids.insert(image_id);
510 m_pending_added_image_ids.clear();
512 // compute added/removed images
// find() hits are still compared on .id, so lookup presumably matches on
// the global image id only (confirm in ImageId's ordering). An image whose
// remote id changed therefore shows up as both a removal and an addition.
513 for (auto &image_id : m_image_ids) {
514 auto it = m_pending_image_ids.find(image_id);
515 if (it == m_pending_image_ids.end() || it->id != image_id.id) {
516 removed_image_ids.insert(image_id);
519 for (auto &image_id : m_pending_image_ids) {
520 auto it = m_image_ids.find(image_id);
521 if (it == m_image_ids.end() || it->id != image_id.id) {
522 added_image_ids.insert(image_id);
526 m_pending_updates = false;
527 m_image_ids = m_pending_image_ids;
529 m_mirror_uuid = m_pending_mirror_uuid;
530 mirror_uuid = m_mirror_uuid;
533 m_listener.handle_update(mirror_uuid, std::move(added_image_ids),
534 std::move(removed_image_ids));
537 Mutex::Locker locker(m_lock);
538 m_notify_listener_in_progress = false;
539 if (m_pending_updates) {
545 } // namespace mirror
// Explicit instantiation for the concrete librbd::ImageCtx image type.
548 template class rbd::mirror::PoolWatcher<librbd::ImageCtx>;