1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "InstanceWatcher.h"
5 #include "include/stringify.h"
6 #include "common/debug.h"
7 #include "common/errno.h"
8 #include "cls/rbd/cls_rbd_client.h"
9 #include "librbd/ManagedLock.h"
10 #include "librbd/Utils.h"
11 #include "InstanceReplayer.h"
12 #include "ImageSyncThrottler.h"
14 #define dout_context g_ceph_context
15 #define dout_subsys ceph_subsys_rbd_mirror
17 #define dout_prefix *_dout << "rbd::mirror::InstanceWatcher: "
22 using namespace instance_watcher;
24 using librbd::util::create_async_context_callback;
25 using librbd::util::create_context_callback;
26 using librbd::util::create_rados_callback;
27 using librbd::util::unique_lock_name;
// Completion context for the asynchronous "list mirror instances" read.
// On completion it decodes the reply buffer (out_bl) into *instance_ids and
// then forwards the resulting return code to on_finish.
31 struct C_GetInstances : public Context {
// caller-owned output vector filled in by finish()
32 std::vector<std::string> *instance_ids;
36 C_GetInstances(std::vector<std::string> *instance_ids, Context *on_finish)
37 : instance_ids(instance_ids), on_finish(on_finish) {
40 void finish(int r) override {
41 dout(20) << "C_GetInstances: " << this << " " << __func__ << ": r=" << r
// decode the cls reply; r is replaced by the decode result
45 bufferlist::iterator it = out_bl.begin();
46 r = librbd::cls_client::mirror_instances_list_finish(&it, instance_ids);
// NOTE(review): -ENOENT is special-cased here -- presumably treated as
// "no instances registered"; confirm against the branch body.
47 } else if (r == -ENOENT) {
50 on_finish->complete(r);
// Context that embeds its own InstanceWatcher for `instance_id` and uses it
// to run the watcher's remove() sequence, reporting the result to on_finish.
55 struct C_RemoveInstanceRequest : public Context {
// held by value: the watcher lives exactly as long as this request
56 InstanceWatcher<I> instance_watcher;
59 C_RemoveInstanceRequest(librados::IoCtx &io_ctx, ContextWQ *work_queue,
60 const std::string &instance_id, Context *on_finish)
// the embedded watcher is built with a null instance replayer
61 : instance_watcher(io_ctx, work_queue, nullptr, instance_id),
62 on_finish(on_finish) {
66 dout(20) << "C_RemoveInstanceRequest: " << this << " " << __func__ << dendl;
// this context is the completion for the removal
68 instance_watcher.remove(this);
71 void finish(int r) override {
72 dout(20) << "C_RemoveInstanceRequest: " << this << " " << __func__ << ": r="
76 on_finish->complete(r);
80 } // anonymous namespace
// One outstanding notification from this watcher to another instance.
// When constructed with an empty instance_id the request is addressed to
// "whoever is the leader" (send_to_leader) and its target is resolved at
// send time from InstanceWatcher::m_leader_instance_id.
// The request registers itself in m_notify_ops / m_notify_op_tracker on
// construction and deregisters in finish().
83 struct InstanceWatcher<I>::C_NotifyInstanceRequest : public Context {
84 InstanceWatcher<I> *instance_watcher;
// current target instance; rewritten when following a leader change
85 std::string instance_id;
// notifier bound to the target's instance object
90 std::unique_ptr<librbd::watcher::Notifier> notifier;
// acks collected by the notifier, inspected in finish()
91 librbd::watcher::NotifyResponse response;
// when set, the next send attempt completes with -ECANCELED instead
92 bool canceling = false;
// NOTE(review): `bl` is a named rvalue reference, so `bl(bl)` in the
// initializer list below copies rather than moves; std::move(bl) looks like
// the intent -- confirm against the member declaration.
94 C_NotifyInstanceRequest(InstanceWatcher<I> *instance_watcher,
95 const std::string &instance_id, uint64_t request_id,
96 bufferlist &&bl, Context *on_finish)
97 : instance_watcher(instance_watcher), instance_id(instance_id),
98 request_id(request_id), bl(bl), on_finish(on_finish),
99 send_to_leader(instance_id.empty()) {
100 dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__
101 << ": instance_watcher=" << instance_watcher << ", instance_id="
102 << instance_id << ", request_id=" << request_id << dendl;
// caller must already hold the watcher's m_lock
104 assert(instance_watcher->m_lock.is_locked());
// fixed target: the notifier can be bound to the target object right away
106 if (!send_to_leader) {
107 assert((!instance_id.empty()));
108 notifier.reset(new librbd::watcher::Notifier(
109 instance_watcher->m_work_queue,
110 instance_watcher->m_ioctx,
111 RBD_MIRROR_INSTANCE_PREFIX + instance_id));
// register as in-flight so shutdown can wait for / cancel this request
114 instance_watcher->m_notify_op_tracker.start_op();
115 auto result = instance_watcher->m_notify_ops.insert(
116 std::make_pair(instance_id, this)).second;
121 dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__ << dendl;
123 assert(instance_watcher->m_lock.is_locked());
// a canceled request is completed via the work queue, not sent
126 dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__
127 << ": canceling" << dendl;
128 instance_watcher->m_work_queue->queue(this, -ECANCELED);
132 if (send_to_leader) {
// no leader known yet: park the request until one is announced
133 if (instance_watcher->m_leader_instance_id.empty()) {
134 dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__
135 << ": suspending" << dendl;
136 instance_watcher->suspend_notify_request(this);
// leader differs from the last target: re-key the m_notify_ops entry
// and rebuild the notifier for the new leader's object
140 if (instance_watcher->m_leader_instance_id != instance_id) {
141 auto count = instance_watcher->m_notify_ops.erase(
142 std::make_pair(instance_id, this));
145 instance_id = instance_watcher->m_leader_instance_id;
147 auto result = instance_watcher->m_notify_ops.insert(
148 std::make_pair(instance_id, this)).second;
151 notifier.reset(new librbd::watcher::Notifier(
152 instance_watcher->m_work_queue,
153 instance_watcher->m_ioctx,
154 RBD_MIRROR_INSTANCE_PREFIX + instance_id));
158 dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__
159 << ": sending to " << instance_id << dendl;
// this context is the notify completion; acks land in `response`
160 notifier->notify(bl, &response, this);
164 dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__ << dendl;
166 assert(instance_watcher->m_lock.is_locked());
// take the request off the suspended set, if it was parked there
169 instance_watcher->unsuspend_notify_request(this);
173 void finish(int r) override {
173 dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__ << ": r="
// acks are scanned on success and also after a timeout
176 if (r == 0 || r == -ETIMEDOUT) {
178 for (auto &it : response.acks) {
179 auto &bl = it.second;
180 if (it.second.length() == 0) {
181 dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__
182 << ": no payload in ack, ignoring" << dendl;
186 auto iter = bl.begin();
187 NotifyAckPayload ack;
// an ack is only accepted if it echoes our own instance id ...
189 if (ack.instance_id != instance_watcher->get_instance_id()) {
190 derr << "C_NotifyInstanceRequest: " << this << " " << __func__
191 << ": ack instance_id (" << ack.instance_id << ") "
192 << "does not match, ignoring" << dendl;
// ... and the request id of this request
195 if (ack.request_id != request_id) {
196 derr << "C_NotifyInstanceRequest: " << this << " " << __func__
197 << ": ack request_id (" << ack.request_id << ") "
198 << "does not match, ignoring" << dendl;
204 } catch (const buffer::error &err) {
205 derr << "C_NotifyInstanceRequest: " << this << " " << __func__
206 << ": failed to decode ack: " << err.what() << dendl;
// timeout path: logs a resend and takes m_lock
212 if (r == -ETIMEDOUT) {
213 derr << "C_NotifyInstanceRequest: " << this << " " << __func__
214 << ": resending after timeout" << dendl;
215 Mutex::Locker locker(instance_watcher->m_lock);
// -ESTALE on a leader-addressed request: logs a resend and takes m_lock
222 if (r == -ESTALE && send_to_leader) {
223 derr << "C_NotifyInstanceRequest: " << this << " " << __func__
224 << ": resending due to leader change" << dendl;
225 Mutex::Locker locker(instance_watcher->m_lock);
// terminal path: report to the caller, then deregister under m_lock
232 on_finish->complete(r);
235 Mutex::Locker locker(instance_watcher->m_lock);
236 auto result = instance_watcher->m_notify_ops.erase(
237 std::make_pair(instance_id, this));
239 instance_watcher->m_notify_op_tracker.finish_op();
// NOTE(review): complete() is overridden; presumably it calls finish()
// without the default self-delete so the request can be re-sent -- confirm.
245 void complete(int r) override {
// Per-sync bookkeeping context for a sync request sent to the leader.
// It is completed in two phases, distinguished by whether on_start is
// still set: first when the request notification finishes, later when the
// sync itself is reported complete.
250 template <typename I>
251 struct InstanceWatcher<I>::C_SyncRequest : public Context {
252 InstanceWatcher<I> *instance_watcher;
// set by handle_sync_start(); completed when the sync finishes
255 Context *on_complete = nullptr;
// the in-flight notification to the leader, or nullptr once it finished
256 C_NotifyInstanceRequest *req = nullptr;
258 C_SyncRequest(InstanceWatcher<I> *instance_watcher,
259 const std::string &sync_id, Context *on_start)
260 : instance_watcher(instance_watcher), sync_id(sync_id),
262 dout(20) << "C_SyncRequest: " << this << " " << __func__ << ": sync_id="
266 void finish(int r) override {
267 dout(20) << "C_SyncRequest: " << this << " " << __func__ << ": r="
// on_start still pending => this completion belongs to the request phase;
// otherwise it is the sync-complete phase
270 if (on_start != nullptr) {
271 instance_watcher->handle_notify_sync_request(this, r);
273 instance_watcher->handle_notify_sync_complete(this, r);
// NOTE(review): complete() is overridden -- presumably to control when the
// context is deleted across the two phases; confirm against its body.
279 void complete(int r) override {
285 #define dout_prefix *_dout << "rbd::mirror::InstanceWatcher: " \
286 << this << " " << __func__ << ": "
// Asynchronously lists the registered mirror instances.
// The read is issued against the RBD_MIRROR_LEADER object; a C_GetInstances
// context receives the reply and reports to on_finish.
//   io_ctx       - pool context to issue the read on
//   instance_ids - output vector handed to C_GetInstances
//   on_finish    - completed by C_GetInstances
287 template <typename I>
288 void InstanceWatcher<I>::get_instances(librados::IoCtx &io_ctx,
289 std::vector<std::string> *instance_ids,
290 Context *on_finish) {
291 librados::ObjectReadOperation op;
292 librbd::cls_client::mirror_instances_list_start(&op);
293 C_GetInstances *ctx = new C_GetInstances(instance_ids, on_finish);
294 librados::AioCompletion *aio_comp = create_rados_callback(ctx);
// the reply is written into the context's own out_bl buffer
296 int r = io_ctx.aio_operate(RBD_MIRROR_LEADER, aio_comp, &op, &ctx->out_bl);
// Removes the instance identified by instance_id by delegating the work to
// a heap-allocated C_RemoveInstanceRequest.
301 template <typename I>
302 void InstanceWatcher<I>::remove_instance(librados::IoCtx &io_ctx,
303 ContextWQ *work_queue,
304 const std::string &instance_id,
305 Context *on_finish) {
306 auto req = new C_RemoveInstanceRequest<I>(io_ctx, work_queue, instance_id,
// Factory: builds a watcher whose instance id is the stringified rados
// instance id of io_ctx.
311 template <typename I>
312 InstanceWatcher<I> *InstanceWatcher<I>::create(
313 librados::IoCtx &io_ctx, ContextWQ *work_queue,
314 InstanceReplayer<I> *instance_replayer) {
315 return new InstanceWatcher<I>(io_ctx, work_queue, instance_replayer,
316 stringify(io_ctx.get_instance_id()));
// Constructs a watcher on the object RBD_MIRROR_INSTANCE_PREFIX + instance_id
// and creates (but does not acquire) an exclusive managed lock on that same
// object (m_oid).
319 template <typename I>
320 InstanceWatcher<I>::InstanceWatcher(librados::IoCtx &io_ctx,
321 ContextWQ *work_queue,
322 InstanceReplayer<I> *instance_replayer,
323 const std::string &instance_id)
324 : Watcher(io_ctx, work_queue, RBD_MIRROR_INSTANCE_PREFIX + instance_id),
325 m_instance_replayer(instance_replayer), m_instance_id(instance_id),
326 m_lock(unique_lock_name("rbd::mirror::InstanceWatcher::m_lock", this)),
// the last argument comes from the rbd_blacklist_expire_seconds option
// NOTE(review): meaning of the `true` flag is not evident here -- confirm
// against ManagedLock::create.
327 m_instance_lock(librbd::ManagedLock<I>::create(
328 m_ioctx, m_work_queue, m_oid, this, librbd::managed_lock::EXCLUSIVE, true,
329 m_cct->_conf->get_val<int64_t>("rbd_blacklist_expire_seconds"))) {
// Destructor. Asserts that no notify ops, suspended ops, in-flight sync
// requests or sync throttler remain, then destroys the managed lock.
332 template <typename I>
333 InstanceWatcher<I>::~InstanceWatcher() {
334 assert(m_notify_ops.empty());
335 assert(m_notify_op_tracker.empty());
336 assert(m_suspended_ops.empty());
337 assert(m_inflight_sync_reqs.empty());
338 assert(m_image_sync_throttler == nullptr);
339 m_instance_lock->destroy();
// Blocking variant of init: waits on a C_SaferCond and returns its result.
// NOTE(review): init_ctx is presumably handed to init(Context*) before the
// wait -- confirm.
342 template <typename I>
343 int InstanceWatcher<I>::init() {
344 C_SaferCond init_ctx;
346 return init_ctx.wait();
// Asynchronous init. Records on_finish as the single pending completion
// (m_on_finish) under m_lock; only one init/shut_down/remove may be pending
// at a time, which the assert enforces.
349 template <typename I>
350 void InstanceWatcher<I>::init(Context *on_finish) {
351 dout(20) << "instance_id=" << m_instance_id << dendl;
353 Mutex::Locker locker(m_lock);
355 assert(m_on_finish == nullptr);
356 m_on_finish = on_finish;
// Blocking variant of shut_down: starts the asynchronous shut down and
// waits for it to complete.
362 template <typename I>
363 void InstanceWatcher<I>::shut_down() {
364 C_SaferCond shut_down_ctx;
365 shut_down(&shut_down_ctx);
366 int r = shut_down_ctx.wait();
// Asynchronous shut down. Records on_finish as the pending completion under
// m_lock; asserts that no other init/shut_down/remove is in progress.
370 template <typename I>
371 void InstanceWatcher<I>::shut_down(Context *on_finish) {
374 Mutex::Locker locker(m_lock);
376 assert(m_on_finish == nullptr);
377 m_on_finish = on_finish;
// Asynchronously removes this instance. Records on_finish as the pending
// completion and starts the removal chain by looking up the current holder
// of the instance lock.
383 template <typename I>
384 void InstanceWatcher<I>::remove(Context *on_finish) {
387 Mutex::Locker locker(m_lock);
389 assert(m_on_finish == nullptr);
390 m_on_finish = on_finish;
394 get_instance_locker();
// Asks instance `instance_id` to acquire the image `global_image_id`.
// A request addressed to this very instance is handled locally; otherwise
// an ImageAcquirePayload tagged with a fresh request id is encoded and
// wrapped in a C_NotifyInstanceRequest. on_notify_ack receives the outcome.
// Must not be called while init/shut_down/remove is pending (asserted).
397 template <typename I>
398 void InstanceWatcher<I>::notify_image_acquire(
399 const std::string &instance_id, const std::string &global_image_id,
400 Context *on_notify_ack) {
401 dout(20) << "instance_id=" << instance_id << ", global_image_id="
402 << global_image_id << dendl;
404 Mutex::Locker locker(m_lock);
406 assert(m_on_finish == nullptr);
// short-circuit: no notification needed when the target is ourselves
408 if (instance_id == m_instance_id) {
409 handle_image_acquire(global_image_id, on_notify_ack);
411 uint64_t request_id = ++m_request_seq;
413 ::encode(NotifyMessage{ImageAcquirePayload{request_id, global_image_id}},
415 auto req = new C_NotifyInstanceRequest(this, instance_id, request_id,
416 std::move(bl), on_notify_ack);
// Asks instance `instance_id` to release the image `global_image_id`.
// A request addressed to this very instance is handled locally; otherwise
// an ImageReleasePayload tagged with a fresh request id is encoded and
// wrapped in a C_NotifyInstanceRequest. on_notify_ack receives the outcome.
// Must not be called while init/shut_down/remove is pending (asserted).
421 template <typename I>
422 void InstanceWatcher<I>::notify_image_release(
423 const std::string &instance_id, const std::string &global_image_id,
424 Context *on_notify_ack) {
425 dout(20) << "instance_id=" << instance_id << ", global_image_id="
426 << global_image_id << dendl;
428 Mutex::Locker locker(m_lock);
430 assert(m_on_finish == nullptr);
// short-circuit: no notification needed when the target is ourselves
432 if (instance_id == m_instance_id) {
433 handle_image_release(global_image_id, on_notify_ack);
435 uint64_t request_id = ++m_request_seq;
437 ::encode(NotifyMessage{ImageReleasePayload{request_id, global_image_id}},
439 auto req = new C_NotifyInstanceRequest(this, instance_id, request_id,
440 std::move(bl), on_notify_ack);
// Tells instance `instance_id` that the peer identified by peer_mirror_uuid
// removed the image `global_image_id`.
// A request addressed to this very instance is handled locally; otherwise
// a PeerImageRemovedPayload tagged with a fresh request id is encoded and
// wrapped in a C_NotifyInstanceRequest. on_notify_ack receives the outcome.
// Must not be called while init/shut_down/remove is pending (asserted).
445 template <typename I>
446 void InstanceWatcher<I>::notify_peer_image_removed(
447 const std::string &instance_id, const std::string &global_image_id,
448 const std::string &peer_mirror_uuid, Context *on_notify_ack) {
449 dout(20) << "instance_id=" << instance_id << ", "
450 << "global_image_id=" << global_image_id << ", "
451 << "peer_mirror_uuid=" << peer_mirror_uuid << dendl;
453 Mutex::Locker locker(m_lock);
454 assert(m_on_finish == nullptr);
// short-circuit: no notification needed when the target is ourselves
456 if (instance_id == m_instance_id) {
457 handle_peer_image_removed(global_image_id, peer_mirror_uuid, on_notify_ack);
459 uint64_t request_id = ++m_request_seq;
461 ::encode(NotifyMessage{PeerImageRemovedPayload{request_id, global_image_id,
462 peer_mirror_uuid}}, bl);
463 auto req = new C_NotifyInstanceRequest(this, instance_id, request_id,
464 std::move(bl), on_notify_ack);
// Sends a sync request for `sync_id` to the leader.
// A C_SyncRequest is registered in m_inflight_sync_reqs (sync_id must not
// already be registered) and a leader-addressed notification is sent.
// on_sync_start is handed to the C_SyncRequest.
469 template <typename I>
470 void InstanceWatcher<I>::notify_sync_request(const std::string &sync_id,
471 Context *on_sync_start) {
472 dout(20) << "sync_id=" << sync_id << dendl;
474 Mutex::Locker locker(m_lock);
476 assert(m_inflight_sync_reqs.count(sync_id) == 0);
478 uint64_t request_id = ++m_request_seq;
481 ::encode(NotifyMessage{SyncRequestPayload{request_id, sync_id}}, bl);
483 auto sync_ctx = new C_SyncRequest(this, sync_id, on_sync_start);
// empty instance id => the notification is addressed to the leader
484 sync_ctx->req = new C_NotifyInstanceRequest(this, "", request_id,
485 std::move(bl), sync_ctx);
487 m_inflight_sync_reqs[sync_id] = sync_ctx;
488 sync_ctx->req->send();
// Attempts to cancel a pending sync request for `sync_id`.
// Looks the request up in m_inflight_sync_reqs and, while its request
// notification is still outstanding, cancels that notification.
// NOTE(review): the bool result presumably reports whether a cancel was
// issued (unknown sync_id or on_start already cleared => false) -- confirm.
491 template <typename I>
492 bool InstanceWatcher<I>::cancel_sync_request(const std::string &sync_id) {
493 dout(20) << "sync_id=" << sync_id << dendl;
495 Mutex::Locker locker(m_lock);
497 auto it = m_inflight_sync_reqs.find(sync_id);
498 if (it == m_inflight_sync_reqs.end()) {
502 auto sync_ctx = it->second;
// on_start is cleared once the request phase has finished
504 if (sync_ctx->on_start == nullptr) {
508 assert(sync_ctx->req != nullptr);
509 sync_ctx->req->cancel();
// Notifies `instance_id` that its sync `sync_id` may start.
// The completion callback finishes the throttler op for sync_id once the
// notification completes, unless the result is -ESTALE or the throttler no
// longer exists.
513 template <typename I>
514 void InstanceWatcher<I>::notify_sync_start(const std::string &instance_id,
515 const std::string &sync_id) {
516 dout(20) << "sync_id=" << sync_id << dendl;
518 Mutex::Locker locker(m_lock);
520 uint64_t request_id = ++m_request_seq;
523 ::encode(NotifyMessage{SyncStartPayload{request_id, sync_id}}, bl);
525 auto ctx = new FunctionContext(
526 [this, sync_id] (int r) {
527 dout(20) << "finish: sync_id=" << sync_id << ", r=" << r << dendl;
// the lambda runs later, so m_lock is re-taken here
528 Mutex::Locker locker(m_lock);
529 if (r != -ESTALE && m_image_sync_throttler != nullptr) {
530 m_image_sync_throttler->finish_op(sync_id);
533 auto req = new C_NotifyInstanceRequest(this, instance_id, request_id,
// Marks the sync `sync_id` as complete on the requesting side.
// The sync must be registered and its request notification must already
// have finished (req == nullptr). The entry is dropped from
// m_inflight_sync_reqs and its context is queued with result 0.
538 template <typename I>
539 void InstanceWatcher<I>::notify_sync_complete(const std::string &sync_id) {
540 dout(20) << "sync_id=" << sync_id << dendl;
542 Mutex::Locker locker(m_lock);
544 auto it = m_inflight_sync_reqs.find(sync_id);
545 assert(it != m_inflight_sync_reqs.end());
547 auto sync_ctx = it->second;
548 assert(sync_ctx->req == nullptr);
550 m_inflight_sync_reqs.erase(it);
// completed via the work queue rather than inline under m_lock
551 m_work_queue->queue(sync_ctx, 0);
// Request-phase completion of a C_SyncRequest.
// Under m_lock the on_start callback is detached from sync_ctx and the
// notification pointer is cleared; the callback is then completed with
// -ECANCELED if the request was canceled, otherwise with 0. A canceled
// request is also finalized through notify_sync_complete().
554 template <typename I>
555 void InstanceWatcher<I>::handle_notify_sync_request(C_SyncRequest *sync_ctx,
557 dout(20) << "sync_id=" << sync_ctx->sync_id << ", r=" << r << dendl;
559 Context *on_start = nullptr;
561 Mutex::Locker locker(m_lock);
563 assert(sync_ctx->req != nullptr);
564 assert(sync_ctx->on_start != nullptr);
// NOTE(review): presumably forces r to -ECANCELED when a cancel raced with
// completion -- confirm against the branch body.
566 if (sync_ctx->req->canceling) {
// take ownership of on_start so it is completed exactly once
570 std::swap(sync_ctx->on_start, on_start);
571 sync_ctx->req = nullptr;
// any result other than -ECANCELED is reported to on_start as success
574 on_start->complete(r == -ECANCELED ? r : 0);
576 if (r == -ECANCELED) {
577 notify_sync_complete(sync_ctx->sync_id);
// Complete-phase completion of a C_SyncRequest: forwards r to the
// on_complete callback if one was installed.
581 template <typename I>
582 void InstanceWatcher<I>::handle_notify_sync_complete(C_SyncRequest *sync_ctx,
584 dout(20) << "sync_id=" << sync_ctx->sync_id << ", r=" << r << dendl;
586 if (sync_ctx->on_complete != nullptr) {
587 sync_ctx->on_complete->complete(r);
// Prints the image sync throttler's status to f / ss, if a throttler
// currently exists. Takes m_lock.
591 template <typename I>
592 void InstanceWatcher<I>::print_sync_status(Formatter *f, stringstream *ss) {
595 Mutex::Locker locker(m_lock);
596 if (m_image_sync_throttler != nullptr) {
597 m_image_sync_throttler->print_status(f, ss);
// Called when this instance becomes the leader: creates the image sync
// throttler (none may exist yet), records itself as leader and resumes any
// suspended leader-addressed notifications.
601 template <typename I>
602 void InstanceWatcher<I>::handle_acquire_leader() {
605 Mutex::Locker locker(m_lock);
607 assert(m_image_sync_throttler == nullptr);
608 m_image_sync_throttler = ImageSyncThrottler<I>::create();
610 m_leader_instance_id = m_instance_id;
611 unsuspend_notify_requests();
// Called when this instance stops being the leader: forgets the leader id,
// drains the image sync throttler with -ESTALE and destroys it.
614 template <typename I>
615 void InstanceWatcher<I>::handle_release_leader() {
618 Mutex::Locker locker(m_lock);
620 assert(m_image_sync_throttler != nullptr);
622 m_leader_instance_id.clear();
624 m_image_sync_throttler->drain(-ESTALE);
625 m_image_sync_throttler->destroy();
626 m_image_sync_throttler = nullptr;
// Records a new leader instance id. If the id is non-empty, suspended
// leader-addressed notifications are resumed.
629 template <typename I>
630 void InstanceWatcher<I>::handle_update_leader(
631 const std::string &leader_instance_id) {
632 dout(20) << "leader_instance_id=" << leader_instance_id << dendl;
634 Mutex::Locker locker(m_lock);
636 m_leader_instance_id = leader_instance_id;
638 if (!m_leader_instance_id.empty()) {
639 unsuspend_notify_requests();
// Walks all in-flight notifications and selects those that target
// `instance_id` directly (leader-addressed requests are skipped).
// NOTE(review): the selected requests are presumably canceled -- confirm
// against the loop body.
643 template <typename I>
644 void InstanceWatcher<I>::cancel_notify_requests(
645 const std::string &instance_id) {
646 dout(20) << "instance_id=" << instance_id << dendl;
648 Mutex::Locker locker(m_lock);
// NOTE(review): `auto op` copies each (instance id, request) pair
650 for (auto op : m_notify_ops) {
651 if (op.first == instance_id && !op.second->send_to_leader) {
// Adds m_instance_id to the mirror instances record kept on the
// RBD_MIRROR_LEADER object. Requires m_lock; completion is delivered to
// handle_register_instance().
657 template <typename I>
658 void InstanceWatcher<I>::register_instance() {
659 assert(m_lock.is_locked());
663 librados::ObjectWriteOperation op;
664 librbd::cls_client::mirror_instances_add(&op, m_instance_id);
665 librados::AioCompletion *aio_comp = create_rados_callback<
666 InstanceWatcher<I>, &InstanceWatcher<I>::handle_register_instance>(this);
668 int r = m_ioctx.aio_operate(RBD_MIRROR_LEADER, aio_comp, &op);
// Completion of register_instance().
// NOTE(review): presumably continues with create_instance_object() on
// success and otherwise logs the error and completes the pending
// m_on_finish with r -- confirm the branch conditions.
673 template <typename I>
674 void InstanceWatcher<I>::handle_register_instance(int r) {
675 dout(20) << "r=" << r << dendl;
677 Context *on_finish = nullptr;
679 Mutex::Locker locker(m_lock);
682 create_instance_object();
686 derr << "error registering instance: " << cpp_strerror(r) << dendl;
// detach the pending completion so it is invoked exactly once
688 std::swap(on_finish, m_on_finish);
690 on_finish->complete(r);
// Issues an asynchronous write operation on this instance's object (m_oid).
// Requires m_lock; completion is delivered to
// handle_create_instance_object().
694 template <typename I>
695 void InstanceWatcher<I>::create_instance_object() {
698 assert(m_lock.is_locked());
700 librados::ObjectWriteOperation op;
703 librados::AioCompletion *aio_comp = create_rados_callback<
705 &InstanceWatcher<I>::handle_create_instance_object>(this);
706 int r = m_ioctx.aio_operate(m_oid, aio_comp, &op);
// Completion of create_instance_object().
// NOTE(review): on failure this presumably logs the error and rolls back
// via unregister_instance() -- confirm the branch condition.
711 template <typename I>
712 void InstanceWatcher<I>::handle_create_instance_object(int r) {
713 dout(20) << "r=" << r << dendl;
715 Mutex::Locker locker(m_lock);
718 derr << "error creating " << m_oid << " object: " << cpp_strerror(r)
722 unregister_instance();
// Registers the watch through the librbd::Watcher base class.
// Requires m_lock; handle_register_watch() is invoked asynchronously via
// the work queue when registration completes.
729 template <typename I>
730 void InstanceWatcher<I>::register_watch() {
733 assert(m_lock.is_locked());
735 Context *ctx = create_async_context_callback(
736 m_work_queue, create_context_callback<
737 InstanceWatcher<I>, &InstanceWatcher<I>::handle_register_watch>(this));
739 librbd::Watcher::register_watch(ctx);
// Completion of register_watch().
// NOTE(review): on failure this presumably logs the error and rolls back
// via remove_instance_object() -- confirm the branch condition.
742 template <typename I>
743 void InstanceWatcher<I>::handle_register_watch(int r) {
744 dout(20) << "r=" << r << dendl;
746 Mutex::Locker locker(m_lock);
749 derr << "error registering instance watcher for " << m_oid << " object: "
750 << cpp_strerror(r) << dendl;
753 remove_instance_object();
// Starts acquiring the instance's managed lock. Requires m_lock;
// handle_acquire_lock() is invoked asynchronously via the work queue.
760 template <typename I>
761 void InstanceWatcher<I>::acquire_lock() {
764 assert(m_lock.is_locked());
766 Context *ctx = create_async_context_callback(
767 m_work_queue, create_context_callback<
768 InstanceWatcher<I>, &InstanceWatcher<I>::handle_acquire_lock>(this));
770 m_instance_lock->acquire_lock(ctx);
// Completion of acquire_lock(). Detaches the pending m_on_finish under
// m_lock and completes it with r.
// NOTE(review): the error log presumably applies only when r < 0, and the
// failure path may start a rollback instead of completing -- confirm.
773 template <typename I>
774 void InstanceWatcher<I>::handle_acquire_lock(int r) {
775 dout(20) << "r=" << r << dendl;
777 Context *on_finish = nullptr;
779 Mutex::Locker locker(m_lock);
783 derr << "error acquiring instance lock: " << cpp_strerror(r) << dendl;
790 std::swap(on_finish, m_on_finish);
793 on_finish->complete(r);
// Releases the instance lock by shutting down the managed lock.
// Requires m_lock; handle_release_lock() is invoked asynchronously via the
// work queue.
796 template <typename I>
797 void InstanceWatcher<I>::release_lock() {
800 assert(m_lock.is_locked());
802 Context *ctx = create_async_context_callback(
803 m_work_queue, create_context_callback<
804 InstanceWatcher<I>, &InstanceWatcher<I>::handle_release_lock>(this));
806 m_instance_lock->shut_down(ctx);
// Completion of release_lock(). Logs an error if the release failed.
// NOTE(review): presumably continues the shut down sequence regardless of
// r -- confirm.
809 template <typename I>
810 void InstanceWatcher<I>::handle_release_lock(int r) {
811 dout(20) << "r=" << r << dendl;
813 Mutex::Locker locker(m_lock);
816 derr << "error releasing instance lock: " << cpp_strerror(r) << dendl;
// Unregisters the watch through the librbd::Watcher base class.
// Requires m_lock; handle_unregister_watch() is invoked asynchronously via
// the work queue.
822 template <typename I>
823 void InstanceWatcher<I>::unregister_watch() {
826 assert(m_lock.is_locked());
828 Context *ctx = create_async_context_callback(
829 m_work_queue, create_context_callback<
830 InstanceWatcher<I>, &InstanceWatcher<I>::handle_unregister_watch>(this));
832 librbd::Watcher::unregister_watch(ctx);
// Completion of unregister_watch(). A failure is only logged; the teardown
// continues with remove_instance_object() under m_lock.
835 template <typename I>
836 void InstanceWatcher<I>::handle_unregister_watch(int r) {
837 dout(20) << "r=" << r << dendl;
840 derr << "error unregistering instance watcher for " << m_oid << " object: "
841 << cpp_strerror(r) << dendl;
844 Mutex::Locker locker(m_lock);
845 remove_instance_object();
// Issues an asynchronous write operation on this instance's object (m_oid)
// as part of teardown. Requires m_lock; completion is delivered to
// handle_remove_instance_object().
848 template <typename I>
849 void InstanceWatcher<I>::remove_instance_object() {
850 assert(m_lock.is_locked());
854 librados::ObjectWriteOperation op;
857 librados::AioCompletion *aio_comp = create_rados_callback<
859 &InstanceWatcher<I>::handle_remove_instance_object>(this);
860 int r = m_ioctx.aio_operate(m_oid, aio_comp, &op);
// Completion of remove_instance_object(). Errors are logged; the teardown
// then continues with unregister_instance() under m_lock.
865 template <typename I>
866 void InstanceWatcher<I>::handle_remove_instance_object(int r) {
867 dout(20) << "r=" << r << dendl;
// NOTE(review): -ENOENT during a remove() is special-cased -- presumably
// tolerated because the object may already be gone; confirm.
869 if (m_removing && r == -ENOENT) {
874 derr << "error removing " << m_oid << " object: " << cpp_strerror(r)
878 Mutex::Locker locker(m_lock);
879 unregister_instance();
// Removes m_instance_id from the mirror instances record kept on the
// RBD_MIRROR_LEADER object. Requires m_lock; completion is delivered to
// handle_unregister_instance().
882 template <typename I>
883 void InstanceWatcher<I>::unregister_instance() {
886 assert(m_lock.is_locked());
888 librados::ObjectWriteOperation op;
889 librbd::cls_client::mirror_instances_remove(&op, m_instance_id);
890 librados::AioCompletion *aio_comp = create_rados_callback<
891 InstanceWatcher<I>, &InstanceWatcher<I>::handle_unregister_instance>(this);
893 int r = m_ioctx.aio_operate(RBD_MIRROR_LEADER, aio_comp, &op);
// Completion of unregister_instance(). A failure is only logged; the
// teardown continues by waiting for outstanding notify ops.
898 template <typename I>
899 void InstanceWatcher<I>::handle_unregister_instance(int r) {
900 dout(20) << "r=" << r << dendl;
903 derr << "error unregistering instance: " << cpp_strerror(r) << dendl;
906 Mutex::Locker locker(m_lock);
907 wait_for_notify_ops();
// Waits until the notify op tracker reports no outstanding ops, then
// invokes handle_wait_for_notify_ops() asynchronously. Requires m_lock.
910 template <typename I>
911 void InstanceWatcher<I>::wait_for_notify_ops() {
914 assert(m_lock.is_locked());
// NOTE(review): each in-flight op is visited first -- presumably to cancel
// it so the wait below can finish; confirm against the loop body.
916 for (auto op : m_notify_ops) {
920 Context *ctx = create_async_context_callback(
921 m_work_queue, create_context_callback<
922 InstanceWatcher<I>, &InstanceWatcher<I>::handle_wait_for_notify_ops>(this));
924 m_notify_op_tracker.wait_for_ops(ctx);
// Final step of the teardown chain: all notify ops have drained (asserted),
// so the pending m_on_finish is detached under m_lock and completed with r.
927 template <typename I>
928 void InstanceWatcher<I>::handle_wait_for_notify_ops(int r) {
929 dout(20) << "r=" << r << dendl;
933 Context *on_finish = nullptr;
935 Mutex::Locker locker(m_lock);
937 assert(m_notify_ops.empty());
939 std::swap(on_finish, m_on_finish);
946 on_finish->complete(r);
// Queries the current holder of the instance lock into m_instance_locker.
// Requires m_lock; handle_get_instance_locker() is invoked asynchronously
// via the work queue.
949 template <typename I>
950 void InstanceWatcher<I>::get_instance_locker() {
953 assert(m_lock.is_locked());
955 Context *ctx = create_async_context_callback(
956 m_work_queue, create_context_callback<
957 InstanceWatcher<I>, &InstanceWatcher<I>::handle_get_instance_locker>(this));
959 m_instance_lock->get_locker(&m_instance_locker, ctx);
// Completion of get_instance_locker().
// NOTE(review): on error this presumably logs and skips straight to
// remove_instance_object(); on success it goes on to break the lock via
// break_instance_lock() -- confirm the branch conditions.
962 template <typename I>
963 void InstanceWatcher<I>::handle_get_instance_locker(int r) {
964 dout(20) << "r=" << r << dendl;
966 Mutex::Locker locker(m_lock);
970 derr << "error retrieving instance locker: " << cpp_strerror(r) << dendl;
972 remove_instance_object();
976 break_instance_lock();
// Breaks the instance lock held by m_instance_locker. Requires m_lock;
// handle_break_instance_lock() is invoked asynchronously via the work queue.
// NOTE(review): meaning of the `true` flag is not evident here -- confirm
// against ManagedLock::break_lock.
979 template <typename I>
980 void InstanceWatcher<I>::break_instance_lock() {
983 assert(m_lock.is_locked());
985 Context *ctx = create_async_context_callback(
986 m_work_queue, create_context_callback<
987 InstanceWatcher<I>, &InstanceWatcher<I>::handle_break_instance_lock>(this));
989 m_instance_lock->break_lock(m_instance_locker, true, ctx);
// Completion of break_instance_lock(). Both the logged-error path and the
// normal path end in remove_instance_object().
// NOTE(review): the two calls are presumably in separate branches --
// confirm.
992 template <typename I>
993 void InstanceWatcher<I>::handle_break_instance_lock(int r) {
994 dout(20) << "r=" << r << dendl;
996 Mutex::Locker locker(m_lock);
1000 derr << "error breaking instance lock: " << cpp_strerror(r) << dendl;
1002 remove_instance_object();
1006 remove_instance_object();
// Parks a notification request in m_suspended_ops. Requires m_lock.
1009 template <typename I>
1010 void InstanceWatcher<I>::suspend_notify_request(C_NotifyInstanceRequest *req) {
1011 dout(20) << req << dendl;
1013 assert(m_lock.is_locked());
1015 auto result = m_suspended_ops.insert(req).second;
// Removes a notification request from m_suspended_ops. Requires m_lock.
// NOTE(review): the bool result presumably reports whether the request was
// actually suspended -- confirm.
1019 template <typename I>
1020 bool InstanceWatcher<I>::unsuspend_notify_request(
1021 C_NotifyInstanceRequest *req) {
1022 dout(20) << req << dendl;
1024 assert(m_lock.is_locked());
1026 auto result = m_suspended_ops.erase(req);
// Resumes all suspended notification requests. Requires m_lock.
// The suspended set is swapped into a local first, so m_suspended_ops is
// already empty while the loop runs and a request that gets suspended again
// does not modify the set being iterated.
1035 template <typename I>
1036 void InstanceWatcher<I>::unsuspend_notify_requests() {
1039 assert(m_lock.is_locked());
1041 std::set<C_NotifyInstanceRequest *> suspended_ops;
1042 std::swap(m_suspended_ops, suspended_ops);
1044 for (auto op : suspended_ops) {
// Registers an incoming request (instance_id, request_id) in m_requests so
// that duplicate notifications can be detected.
// If an entry already exists, its stale ack context is deleted and the
// entry is replaced by one carrying the new on_notify_ack.
// The context built here calls complete_request() asynchronously; callers
// in this file check the returned value against nullptr before dispatching.
// NOTE(review): presumably nullptr is returned for duplicates and the new
// context only for first-time requests -- confirm the branch structure.
1049 template <typename I>
1050 Context *InstanceWatcher<I>::prepare_request(const std::string &instance_id,
1051 uint64_t request_id,
1052 C_NotifyAck *on_notify_ack) {
1053 dout(20) << "instance_id=" << instance_id << ", request_id=" << request_id
1056 Mutex::Locker locker(m_lock);
1058 Context *ctx = nullptr;
1059 Request request(instance_id, request_id);
1060 auto it = m_requests.find(request);
1062 if (it != m_requests.end()) {
1063 dout(20) << "duplicate for in-progress request" << dendl;
// drop the ack context saved for the earlier notification
1064 delete it->on_notify_ack;
1065 m_requests.erase(it);
1067 ctx = create_async_context_callback(
1068 m_work_queue, new FunctionContext(
1069 [this, instance_id, request_id] (int r) {
1070 complete_request(instance_id, request_id, r);
// (re)insert the entry with the latest ack context
1074 request.on_notify_ack = on_notify_ack;
1075 m_requests.insert(request);
// Finishes a request previously registered by prepare_request(): removes
// its entry from m_requests (which must exist), encodes a NotifyAckPayload
// carrying r into the ack context's output buffer and completes the ack.
1079 template <typename I>
1080 void InstanceWatcher<I>::complete_request(const std::string &instance_id,
1081 uint64_t request_id, int r) {
1082 dout(20) << "instance_id=" << instance_id << ", request_id=" << request_id
1085 C_NotifyAck *on_notify_ack;
1087 Mutex::Locker locker(m_lock);
1088 Request request(instance_id, request_id);
1089 auto it = m_requests.find(request);
1090 assert(it != m_requests.end());
1091 on_notify_ack = it->on_notify_ack;
1092 m_requests.erase(it);
// the request's result travels inside the ack payload; the ack context
// itself is always completed with 0
1095 ::encode(NotifyAckPayload(instance_id, request_id, r), on_notify_ack->out);
1096 on_notify_ack->complete(0);
// Watch callback for notifications arriving on this instance's object.
// Decodes bl into a NotifyMessage and dispatches its payload to the
// matching handle_payload() overload through HandlePayloadVisitor. The
// sender is identified by the stringified notifier_id.
1099 template <typename I>
1100 void InstanceWatcher<I>::handle_notify(uint64_t notify_id, uint64_t handle,
1101 uint64_t notifier_id, bufferlist &bl) {
1102 dout(20) << "notify_id=" << notify_id << ", handle=" << handle << ", "
1103 << "notifier_id=" << notifier_id << dendl;
// ack context for this notification; ownership passes to the handler
1105 auto ctx = new C_NotifyAck(this, notify_id, handle);
1107 NotifyMessage notify_message;
1109 bufferlist::iterator iter = bl.begin();
1110 ::decode(notify_message, iter);
1111 } catch (const buffer::error &err) {
1112 derr << "error decoding image notification: " << err.what() << dendl;
1117 apply_visitor(HandlePayloadVisitor(this, stringify(notifier_id), ctx),
1118 notify_message.payload);
// Schedules acquisition of `global_image_id` on the work queue.
// The queued callback asks the instance replayer to acquire the image and
// passes on_finish to it. The work is bracketed by
// m_notify_op_tracker start_op/finish_op so teardown can wait for it.
1121 template <typename I>
1122 void InstanceWatcher<I>::handle_image_acquire(
1123 const std::string &global_image_id, Context *on_finish) {
1124 dout(20) << "global_image_id=" << global_image_id << dendl;
1126 auto ctx = new FunctionContext(
1127 [this, global_image_id, on_finish] (int r) {
1128 m_instance_replayer->acquire_image(this, global_image_id, on_finish);
1129 m_notify_op_tracker.finish_op();
1132 m_notify_op_tracker.start_op();
1133 m_work_queue->queue(ctx, 0);
// Schedules release of `global_image_id` on the work queue.
// The queued callback asks the instance replayer to release the image and
// passes on_finish to it. The work is bracketed by
// m_notify_op_tracker start_op/finish_op so teardown can wait for it.
1136 template <typename I>
1137 void InstanceWatcher<I>::handle_image_release(
1138 const std::string &global_image_id, Context *on_finish) {
1139 dout(20) << "global_image_id=" << global_image_id << dendl;
1141 auto ctx = new FunctionContext(
1142 [this, global_image_id, on_finish] (int r) {
1143 m_instance_replayer->release_image(global_image_id, on_finish);
1144 m_notify_op_tracker.finish_op();
1147 m_notify_op_tracker.start_op();
1148 m_work_queue->queue(ctx, 0);
// Schedules handling of a "peer image removed" event on the work queue.
// The queued callback forwards global_image_id, peer_mirror_uuid and
// on_finish to the instance replayer. The work is bracketed by
// m_notify_op_tracker start_op/finish_op so teardown can wait for it.
1151 template <typename I>
1152 void InstanceWatcher<I>::handle_peer_image_removed(
1153 const std::string &global_image_id, const std::string &peer_mirror_uuid,
1154 Context *on_finish) {
1155 dout(20) << "global_image_id=" << global_image_id << ", "
1156 << "peer_mirror_uuid=" << peer_mirror_uuid << dendl;
1158 auto ctx = new FunctionContext(
1159 [this, peer_mirror_uuid, global_image_id, on_finish] (int r) {
1160 m_instance_replayer->remove_peer_image(global_image_id,
1161 peer_mirror_uuid, on_finish);
1162 m_notify_op_tracker.finish_op();
1165 m_notify_op_tracker.start_op();
1166 m_work_queue->queue(ctx, 0);
// Leader-side handling of a sync request from `instance_id`.
// Without a throttler (i.e. this instance is not the leader) the request is
// answered with -ESTALE. Otherwise the sync is handed to the throttler;
// when the throttler fires on_start, the requester is told to start via
// notify_sync_start() and on_finish is completed with the result.
1169 template <typename I>
1170 void InstanceWatcher<I>::handle_sync_request(const std::string &instance_id,
1171 const std::string &sync_id,
1172 Context *on_finish) {
1173 dout(20) << "instance_id=" << instance_id << ", sync_id=" << sync_id << dendl;
1175 Mutex::Locker locker(m_lock);
1177 if (m_image_sync_throttler == nullptr) {
1178 dout(20) << "sync request for non-leader" << dendl;
1179 m_work_queue->queue(on_finish, -ESTALE);
// the async wrapper makes the callback run on the work queue
1183 Context *on_start = create_async_context_callback(
1184 m_work_queue, new FunctionContext(
1185 [this, instance_id, sync_id, on_finish] (int r) {
1186 dout(20) << "handle_sync_request: finish: instance_id=" << instance_id
1187 << ", sync_id=" << sync_id << ", r=" << r << dendl;
// NOTE(review): presumably only sent when r == 0 -- confirm
1189 notify_sync_start(instance_id, sync_id);
1191 on_finish->complete(r);
1193 m_image_sync_throttler->start_op(sync_id, on_start);
// Requester-side handling of a "sync start" notification from the leader.
// If sync_id is not in flight, on_finish is queued with 0. Otherwise
// on_finish is stored as the sync's on_complete callback; a previously
// stored callback (duplicate notification) is queued with -ESTALE first.
1196 template <typename I>
1197 void InstanceWatcher<I>::handle_sync_start(const std::string &instance_id,
1198 const std::string &sync_id,
1199 Context *on_finish) {
1200 dout(20) << "instance_id=" << instance_id << ", sync_id=" << sync_id << dendl;
1202 Mutex::Locker locker(m_lock);
1204 auto it = m_inflight_sync_reqs.find(sync_id);
1205 if (it == m_inflight_sync_reqs.end()) {
1206 dout(20) << "not found" << dendl;
1207 m_work_queue->queue(on_finish, 0);
1211 auto sync_ctx = it->second;
1213 if (sync_ctx->on_complete != nullptr) {
1214 dout(20) << "duplicate request" << dendl;
1215 m_work_queue->queue(sync_ctx->on_complete, -ESTALE);
1218 sync_ctx->on_complete = on_finish;
// Payload handler for image-acquire notifications: registers the request
// with prepare_request() and, when a completion context is returned, starts
// the acquisition through handle_image_acquire().
1221 template <typename I>
1222 void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
1223 const ImageAcquirePayload &payload,
1224 C_NotifyAck *on_notify_ack) {
1225 dout(20) << "image_acquire: instance_id=" << instance_id << ", "
1226 << "request_id=" << payload.request_id << dendl;
1228 auto on_finish = prepare_request(instance_id, payload.request_id,
// nullptr means there is nothing to dispatch for this notification
1230 if (on_finish != nullptr) {
1231 handle_image_acquire(payload.global_image_id, on_finish);
// Payload handler for image-release notifications: registers the request
// with prepare_request() and, when a completion context is returned, starts
// the release through handle_image_release().
1235 template <typename I>
1236 void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
1237 const ImageReleasePayload &payload,
1238 C_NotifyAck *on_notify_ack) {
1239 dout(20) << "image_release: instance_id=" << instance_id << ", "
1240 << "request_id=" << payload.request_id << dendl;
1242 auto on_finish = prepare_request(instance_id, payload.request_id,
// nullptr means there is nothing to dispatch for this notification
1244 if (on_finish != nullptr) {
1245 handle_image_release(payload.global_image_id, on_finish);
// Payload handler for peer-image-removed notifications: registers the
// request with prepare_request() and, when a completion context is
// returned, forwards the event to handle_peer_image_removed().
1249 template <typename I>
1250 void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
1251 const PeerImageRemovedPayload &payload,
1252 C_NotifyAck *on_notify_ack) {
1253 dout(20) << "remove_peer_image: instance_id=" << instance_id << ", "
1254 << "request_id=" << payload.request_id << dendl;
1256 auto on_finish = prepare_request(instance_id, payload.request_id,
// nullptr means there is nothing to dispatch for this notification
1258 if (on_finish != nullptr) {
1259 handle_peer_image_removed(payload.global_image_id, payload.peer_mirror_uuid,
// Payload handler for sync-request notifications: registers the request
// with prepare_request() and passes it on to handle_sync_request().
// NOTE(review): the nullptr check presumably returns early -- confirm.
1264 template <typename I>
1265 void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
1266 const SyncRequestPayload &payload,
1267 C_NotifyAck *on_notify_ack) {
1268 dout(20) << "sync_request: instance_id=" << instance_id << ", "
1269 << "request_id=" << payload.request_id << dendl;
1271 auto on_finish = prepare_request(instance_id, payload.request_id,
1273 if (on_finish == nullptr) {
1277 handle_sync_request(instance_id, payload.sync_id, on_finish);
// Payload handler for sync-start notifications: registers the request with
// prepare_request() and passes it on to handle_sync_start().
// NOTE(review): the nullptr check presumably returns early -- confirm.
1280 template <typename I>
1281 void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
1282 const SyncStartPayload &payload,
1283 C_NotifyAck *on_notify_ack) {
1284 dout(20) << "sync_start: instance_id=" << instance_id << ", "
1285 << "request_id=" << payload.request_id << dendl;
1287 auto on_finish = prepare_request(instance_id, payload.request_id,
1289 if (on_finish == nullptr) {
1293 handle_sync_start(instance_id, payload.sync_id, on_finish);
// Payload handler for unrecognized notifications: nothing is processed and
// the notification is acknowledged immediately with 0.
1296 template <typename I>
1297 void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
1298 const UnknownPayload &payload,
1299 C_NotifyAck *on_notify_ack) {
1300 dout(20) << "unknown: instance_id=" << instance_id << dendl;
1302 on_notify_ack->complete(0);
1305 } // namespace mirror
1308 template class rbd::mirror::InstanceWatcher<librbd::ImageCtx>;