1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "LeaderWatcher.h"
5 #include "common/Timer.h"
6 #include "common/debug.h"
7 #include "common/errno.h"
8 #include "cls/rbd/cls_rbd_client.h"
9 #include "include/stringify.h"
10 #include "librbd/Utils.h"
11 #include "librbd/watcher/Types.h"
14 #define dout_context g_ceph_context
15 #define dout_subsys ceph_subsys_rbd_mirror
17 #define dout_prefix *_dout << "rbd::mirror::LeaderWatcher: " \
18 << this << " " << __func__ << ": "
22 using namespace leader_watcher;
24 using librbd::util::create_async_context_callback;
25 using librbd::util::create_context_callback;
26 using librbd::util::create_rados_callback;
// Constructor. Passes io_ctx, the threads' work queue and RBD_MIRROR_LEADER to
// the Watcher base, names m_lock after the pool, records this client's RADOS
// instance id as m_notifier_id, and allocates the LeaderLock, handing it the
// rbd_blacklist_expire_seconds config value as its last argument.
29 LeaderWatcher<I>::LeaderWatcher(Threads<I> *threads, librados::IoCtx &io_ctx,
31 : Watcher(io_ctx, threads->work_queue, RBD_MIRROR_LEADER),
32 m_threads(threads), m_listener(listener),
33 m_lock("rbd::mirror::LeaderWatcher " + io_ctx.get_pool_name()),
34 m_notifier_id(librados::Rados(io_ctx).get_instance_id()),
35 m_leader_lock(new LeaderLock(m_ioctx, m_work_queue, m_oid, this, true,
36 m_cct->_conf->get_val<int64_t>(
37 "rbd_blacklist_expire_seconds"))) {
// Destructor. Asserts that m_status_watcher, m_instances and m_timer_task
// have all been reset to nullptr before the object is destroyed.
41 LeaderWatcher<I>::~LeaderWatcher() {
42 assert(m_status_watcher == nullptr);
43 assert(m_instances == nullptr);
44 assert(m_timer_task == nullptr);
// Returns m_notifier_id rendered as a string via stringify().
50 std::string LeaderWatcher<I>::get_instance_id() {
51 return stringify(m_notifier_id);
// Blocking init: returns the result of waiting on init_ctx.
// NOTE(review): the declaration of init_ctx and the call that starts the
// initialization are not visible in this excerpt -- confirm against the file.
55 int LeaderWatcher<I>::init() {
58 return init_ctx.wait();
// Asynchronous init. Logs the notifier id, then under m_lock stores on_finish
// in m_on_finish (asserting no other completion is pending) and starts the
// sequence with create_leader_object().
62 void LeaderWatcher<I>::init(Context *on_finish) {
63 dout(20) << "notifier_id=" << m_notifier_id << dendl;
65 Mutex::Locker locker(m_lock);
67 assert(m_on_finish == nullptr);
68 m_on_finish = on_finish;
70 create_leader_object();
// Submits an ObjectWriteOperation on m_oid via aio_operate(); the completion
// is routed to handle_create_leader_object(). Caller must hold m_lock.
// NOTE(review): the contents of `op` and the use of `r` are not visible here.
74 void LeaderWatcher<I>::create_leader_object() {
77 assert(m_lock.is_locked());
79 librados::ObjectWriteOperation op;
82 librados::AioCompletion *aio_comp = create_rados_callback<
83 LeaderWatcher<I>, &LeaderWatcher<I>::handle_create_leader_object>(this);
84 int r = m_ioctx.aio_operate(m_oid, aio_comp, &op);
// Completion handler for create_leader_object(). The visible code logs an
// error naming m_oid, moves the pending m_on_finish into a local while holding
// m_lock, and completes that callback with r.
// NOTE(review): the branch conditions around these statements are not visible
// in this excerpt.
90 void LeaderWatcher<I>::handle_create_leader_object(int r) {
91 dout(20) << "r=" << r << dendl;
93 Context *on_finish = nullptr;
95 Mutex::Locker locker(m_lock);
102 derr << "error creating " << m_oid << " object: " << cpp_strerror(r)
105 std::swap(on_finish, m_on_finish);
107 on_finish->complete(r);
// Calls the base librbd::Watcher::register_watch() with a callback that
// invokes handle_register_watch(), wrapped by create_async_context_callback()
// on m_work_queue. Caller must hold m_lock.
110 template <typename I>
111 void LeaderWatcher<I>::register_watch() {
114 assert(m_lock.is_locked());
116 Context *ctx = create_async_context_callback(
117 m_work_queue, create_context_callback<
118 LeaderWatcher<I>, &LeaderWatcher<I>::handle_register_watch>(this));
120 librbd::Watcher::register_watch(ctx);
// Completion handler for register_watch(). Takes timer_lock and then m_lock.
// The visible code logs a registration error for m_oid, schedules a leader
// lock acquisition with delay factor 0, moves m_on_finish into a local and
// completes it with r.
// NOTE(review): which of these run on success vs. failure is not visible here.
123 template <typename I>
124 void LeaderWatcher<I>::handle_register_watch(int r) {
125 dout(20) << "r=" << r << dendl;
127 Context *on_finish = nullptr;
129 Mutex::Locker timer_locker(m_threads->timer_lock);
130 Mutex::Locker locker(m_lock);
133 derr << "error registering leader watcher for " << m_oid << " object: "
134 << cpp_strerror(r) << dendl;
136 schedule_acquire_leader_lock(0);
139 std::swap(on_finish, m_on_finish);
141 on_finish->complete(r);
// Blocking shut down: starts the asynchronous shut_down(Context*) with a
// C_SaferCond and waits for it to complete.
// NOTE(review): what is done with `r` is not visible in this excerpt.
144 template <typename I>
145 void LeaderWatcher<I>::shut_down() {
146 C_SaferCond shut_down_ctx;
147 shut_down(&shut_down_ctx);
148 int r = shut_down_ctx.wait();
// Asynchronous shut down. Under timer_lock and m_lock, records on_finish in
// m_on_shut_down_finish (asserting no shut down is already pending) and begins
// the sequence with shut_down_leader_lock().
152 template <typename I>
153 void LeaderWatcher<I>::shut_down(Context *on_finish) {
156 Mutex::Locker timer_locker(m_threads->timer_lock);
157 Mutex::Locker locker(m_lock);
159 assert(m_on_shut_down_finish == nullptr);
160 m_on_shut_down_finish = on_finish;
162 shut_down_leader_lock();
// Asks m_leader_lock to shut down; completion is delivered to
// handle_shut_down_leader_lock() through m_work_queue. Caller must hold m_lock.
165 template <typename I>
166 void LeaderWatcher<I>::shut_down_leader_lock() {
169 assert(m_lock.is_locked());
171 Context *ctx = create_async_context_callback(
172 m_work_queue, create_context_callback<
173 LeaderWatcher<I>, &LeaderWatcher<I>::handle_shut_down_leader_lock>(this));
175 m_leader_lock->shut_down(ctx);
// Completion handler for shut_down_leader_lock(). Takes m_lock and logs an
// error if the leader lock failed to shut down.
// NOTE(review): the error condition and the next shut down step are not
// visible in this excerpt.
178 template <typename I>
179 void LeaderWatcher<I>::handle_shut_down_leader_lock(int r) {
180 dout(20) << "r=" << r << dendl;
182 Mutex::Locker locker(m_lock);
185 derr << "error shutting down leader lock: " << cpp_strerror(r) << dendl;
// Calls the base librbd::Watcher::unregister_watch(); completion is delivered
// to handle_unregister_watch() through m_work_queue. Caller must hold m_lock.
191 template <typename I>
192 void LeaderWatcher<I>::unregister_watch() {
195 assert(m_lock.is_locked());
197 Context *ctx = create_async_context_callback(
198 m_work_queue, create_context_callback<
199 LeaderWatcher<I>, &LeaderWatcher<I>::handle_unregister_watch>(this));
201 librbd::Watcher::unregister_watch(ctx);
// Completion handler for unregister_watch(). Logs an error naming m_oid when
// unregistering failed.
// NOTE(review): the error condition and any follow-up call are not visible in
// this excerpt.
204 template <typename I>
205 void LeaderWatcher<I>::handle_unregister_watch(int r) {
206 dout(20) << "r=" << r << dendl;
209 derr << "error unregistering leader watcher for " << m_oid << " object: "
210 << cpp_strerror(r) << dendl;
// Under timer_lock and m_lock, schedules a "wait for tasks" timer task with
// delay factor 0, leader=false and shutting_down=true; the task runs
// handle_wait_for_tasks().
215 template <typename I>
216 void LeaderWatcher<I>::wait_for_tasks() {
219 Mutex::Locker timer_locker(m_threads->timer_lock);
220 Mutex::Locker locker(m_lock);
221 schedule_timer_task("wait for tasks", 0, false,
222 &LeaderWatcher<I>::handle_wait_for_tasks, true);
// Timer callback scheduled by wait_for_tasks(). Expects timer_lock and m_lock
// to be held and a shut down to be pending. Finishes the tracked timer op,
// then queues a context on m_work_queue that reads m_on_shut_down_finish
// under m_lock and completes it with 0.
225 template <typename I>
226 void LeaderWatcher<I>::handle_wait_for_tasks() {
229 assert(m_threads->timer_lock.is_locked());
230 assert(m_lock.is_locked());
231 assert(m_on_shut_down_finish != nullptr);
233 assert(!m_timer_op_tracker.empty());
234 m_timer_op_tracker.finish_op();
236 auto ctx = new FunctionContext([this](int r) {
239 // ensure lock isn't held when completing shut down
240 Mutex::Locker locker(m_lock);
241 assert(m_on_shut_down_finish != nullptr);
242 on_finish = m_on_shut_down_finish;
244 on_finish->complete(0);
246 m_work_queue->queue(ctx, 0);
// Public accessor: takes m_lock and returns the result of is_leader(m_lock).
249 template <typename I>
250 bool LeaderWatcher<I>::is_leader() const {
251 Mutex::Locker locker(m_lock);
253 return is_leader(m_lock);
// Variant for callers that already hold m_lock (asserted). Queries
// m_leader_lock->is_leader() and logs the result at level 20.
// NOTE(review): the return statement is not visible in this excerpt.
256 template <typename I>
257 bool LeaderWatcher<I>::is_leader(Mutex &lock) const {
258 assert(m_lock.is_locked());
260 bool leader = m_leader_lock->is_leader();
261 dout(20) << leader << dendl;
// Public accessor: takes m_lock and returns the result of
// is_releasing_leader(m_lock).
265 template <typename I>
266 bool LeaderWatcher<I>::is_releasing_leader() const {
267 Mutex::Locker locker(m_lock);
269 return is_releasing_leader(m_lock);
// Variant for callers that already hold m_lock (asserted). Queries
// m_leader_lock->is_releasing_leader() and logs the result at level 20.
// NOTE(review): the return statement is not visible in this excerpt.
272 template <typename I>
273 bool LeaderWatcher<I>::is_releasing_leader(Mutex &lock) const {
274 assert(m_lock.is_locked());
276 bool releasing = m_leader_lock->is_releasing_leader();
277 dout(20) << releasing << dendl;
// Writes the current leader's instance id to *instance_id while holding
// m_lock. If this instance is the leader or is releasing leadership, its own
// m_notifier_id is used; otherwise, when m_locker has a non-empty cookie, the
// locker's entity number is used.
// NOTE(review): the returned bool values are not visible in this excerpt.
281 template <typename I>
282 bool LeaderWatcher<I>::get_leader_instance_id(std::string *instance_id) const {
285 Mutex::Locker locker(m_lock);
287 if (is_leader(m_lock) || is_releasing_leader(m_lock)) {
288 *instance_id = stringify(m_notifier_id);
292 if (!m_locker.cookie.empty()) {
293 *instance_id = stringify(m_locker.entity.num());
// Under m_lock, checks whether this instance is the leader and calls
// release_leader_lock().
// NOTE(review): the body of the not-leader branch is not visible; presumably
// it returns early -- confirm.
300 template <typename I>
301 void LeaderWatcher<I>::release_leader() {
304 Mutex::Locker locker(m_lock);
305 if (!is_leader(m_lock)) {
309 release_leader_lock();
// Under m_lock, clears *instance_ids and, if m_instances exists, fills it via
// m_instances->list().
312 template <typename I>
313 void LeaderWatcher<I>::list_instances(std::vector<std::string> *instance_ids) {
316 Mutex::Locker locker(m_lock);
318 instance_ids->clear();
319 if (m_instances != nullptr) {
320 m_instances->list(instance_ids);
// Cancels the pending m_timer_task on the shared timer and clears the
// pointer. Caller must hold timer_lock and m_lock.
// NOTE(review): the body of the nullptr check and the use of `canceled` are
// not visible in this excerpt.
324 template <typename I>
325 void LeaderWatcher<I>::cancel_timer_task() {
326 assert(m_threads->timer_lock.is_locked());
327 assert(m_lock.is_locked());
329 if (m_timer_task == nullptr) {
333 dout(20) << m_timer_task << dendl;
334 bool canceled = m_threads->timer->cancel_event(m_timer_task);
336 m_timer_task = nullptr;
// Arms m_timer_task on the shared timer. Caller must hold timer_lock and
// m_lock.
//
//   name           - label used only in the scheduling log message
//   delay_factor   - multiplied by rbd_mirror_leader_heartbeat_interval to get
//                    the delay (logged as seconds)
//   leader         - forwarded to execute_timer_task() with the callback
//   timer_callback - member function to run when the task fires
//   shutting_down  - when false and a shut down is pending
//                    (m_on_shut_down_finish set) a guard branch is taken
//                    (NOTE(review): its body is not visible in this excerpt)
//
// When the task fires (timer_lock held) it clears m_timer_task. If no earlier
// timer op is outstanding it runs the callback via execute_timer_task() under
// m_lock; otherwise it records leader/timer_callback on m_timer_gate, creating
// the gate and registering it with m_timer_op_tracker.wait_for_ops() if needed.
339 template <typename I>
340 void LeaderWatcher<I>::schedule_timer_task(const std::string &name,
341 int delay_factor, bool leader,
342 TimerCallback timer_callback,
343 bool shutting_down) {
344 assert(m_threads->timer_lock.is_locked());
345 assert(m_lock.is_locked());
347 if (!shutting_down && m_on_shut_down_finish != nullptr) {
353 m_timer_task = new FunctionContext(
354 [this, leader, timer_callback](int r) {
355 assert(m_threads->timer_lock.is_locked());
356 m_timer_task = nullptr;
358 if (m_timer_op_tracker.empty()) {
359 Mutex::Locker locker(m_lock);
360 execute_timer_task(leader, timer_callback);
364 // old timer task is still running -- do not start next
365 // task until the previous task completes
366 if (m_timer_gate == nullptr) {
367 m_timer_gate = new C_TimerGate(this);
368 m_timer_op_tracker.wait_for_ops(m_timer_gate);
370 m_timer_gate->leader = leader;
371 m_timer_gate->timer_callback = timer_callback;
374 int after = delay_factor * m_cct->_conf->get_val<int64_t>(
375 "rbd_mirror_leader_heartbeat_interval");
377 dout(20) << "scheduling " << name << " after " << after << " sec (task "
378 << m_timer_task << ")" << dendl;
379 m_threads->timer->add_event_after(after, m_timer_task);
// Runs a timer callback. Requires timer_lock and m_lock held and no timer op
// outstanding. Compares the current leader state with the `leader` value
// captured at scheduling time, then starts a tracked timer op and invokes
// timer_callback on this object.
// NOTE(review): the body of the leader-mismatch branch is not visible here.
382 template <typename I>
383 void LeaderWatcher<I>::execute_timer_task(bool leader,
384 TimerCallback timer_callback) {
387 assert(m_threads->timer_lock.is_locked());
388 assert(m_lock.is_locked());
389 assert(m_timer_op_tracker.empty());
391 if (is_leader(m_lock) != leader) {
395 m_timer_op_tracker.start_op();
396 (this->*timer_callback)();
// Hook run after an attempt to acquire the leader lock. The visible code logs
// either "already locked" or an acquire error and completes on_finish with r;
// otherwise it stores on_finish in m_on_finish under m_lock and starts
// init_status_watcher().
// NOTE(review): the conditions selecting these paths are not visible here.
399 template <typename I>
400 void LeaderWatcher<I>::handle_post_acquire_leader_lock(int r,
401 Context *on_finish) {
402 dout(20) << "r=" << r << dendl;
406 dout(20) << "already locked" << dendl;
408 derr << "error acquiring leader lock: " << cpp_strerror(r) << dendl;
410 on_finish->complete(r);
414 Mutex::Locker locker(m_lock);
415 assert(m_on_finish == nullptr);
416 m_on_finish = on_finish;
419 init_status_watcher();
// Hook run before the leader lock is released. Under m_lock, stores on_finish
// in m_on_finish (asserting none is pending).
// NOTE(review): the follow-up call that starts the release sequence is not
// visible in this excerpt.
422 template <typename I>
423 void LeaderWatcher<I>::handle_pre_release_leader_lock(Context *on_finish) {
426 Mutex::Locker locker(m_lock);
427 assert(m_on_finish == nullptr);
428 m_on_finish = on_finish;
// Hook run after the leader lock release attempt. The visible code either
// completes on_finish with r directly, or stores it in m_on_finish under
// m_lock and calls notify_lock_released().
// NOTE(review): the condition selecting the direct-completion path is not
// visible in this excerpt.
434 template <typename I>
435 void LeaderWatcher<I>::handle_post_release_leader_lock(int r,
436 Context *on_finish) {
437 dout(20) << "r=" << r << dendl;
440 on_finish->complete(r);
444 Mutex::Locker locker(m_lock);
445 assert(m_on_finish == nullptr);
446 m_on_finish = on_finish;
448 notify_lock_released();
// Asks m_leader_lock to break the lock held by m_locker; completion is
// delivered to handle_break_leader_lock() through m_work_queue. Requires
// timer_lock and m_lock held and a timer op in progress.
// NOTE(review): the handling of an empty m_locker.cookie is not visible here.
451 template <typename I>
452 void LeaderWatcher<I>::break_leader_lock() {
455 assert(m_threads->timer_lock.is_locked());
456 assert(m_lock.is_locked());
457 assert(!m_timer_op_tracker.empty());
459 if (m_locker.cookie.empty()) {
464 Context *ctx = create_async_context_callback(
465 m_work_queue, create_context_callback<
466 LeaderWatcher<I>, &LeaderWatcher<I>::handle_break_leader_lock>(this));
468 m_leader_lock->break_lock(m_locker, true, ctx);
// Completion handler for break_leader_lock(). Takes timer_lock then m_lock and
// expects a timer op to be in progress.
//  - If the leader lock has been shut down, the timer op is finished.
//  - If r is an error other than -ENOENT, the error is logged, a new acquire
//    attempt is scheduled with delay factor 1 and the timer op is finished.
//  - Otherwise the attempt counter is reset and acquire_leader_lock() is
//    called.
// NOTE(review): the early returns separating these paths are not visible in
// this excerpt.
471 template <typename I>
472 void LeaderWatcher<I>::handle_break_leader_lock(int r) {
473 dout(20) << "r=" << r << dendl;
475 Mutex::Locker timer_locker(m_threads->timer_lock);
476 Mutex::Locker locker(m_lock);
477 assert(!m_timer_op_tracker.empty());
479 if (m_leader_lock->is_shutdown()) {
480 dout(20) << "canceling due to shutdown" << dendl;
481 m_timer_op_tracker.finish_op();
485 if (r < 0 && r != -ENOENT) {
486 derr << "error breaking leader lock: " << cpp_strerror(r) << dendl;
487 schedule_acquire_leader_lock(1);
488 m_timer_op_tracker.finish_op();
493 m_acquire_attempts = 0;
494 acquire_leader_lock();
// Schedules a "get locker" timer task (leader=false, shutting_down=false)
// that runs get_locker() after delay_factor heartbeat intervals. Requires
// timer_lock and m_lock held.
// NOTE(review): the use of reset_leader is not visible here; the reset of
// m_acquire_attempts is presumably guarded by it -- confirm.
497 template <typename I>
498 void LeaderWatcher<I>::schedule_get_locker(bool reset_leader,
499 uint32_t delay_factor) {
502 assert(m_threads->timer_lock.is_locked());
503 assert(m_lock.is_locked());
507 m_acquire_attempts = 0;
510 schedule_timer_task("get locker", delay_factor, false,
511 &LeaderWatcher<I>::get_locker, false);
// Timer callback: asks m_leader_lock for the current locker. The result is
// written into the C_GetLocker context's `locker` field, and that context is
// completed through m_work_queue. Requires timer_lock and m_lock held and a
// timer op in progress.
514 template <typename I>
515 void LeaderWatcher<I>::get_locker() {
518 assert(m_threads->timer_lock.is_locked());
519 assert(m_lock.is_locked());
520 assert(!m_timer_op_tracker.empty());
522 C_GetLocker *get_locker_ctx = new C_GetLocker(this);
523 Context *ctx = create_async_context_callback(m_work_queue, get_locker_ctx);
525 m_leader_lock->get_locker(&get_locker_ctx->locker, ctx);
// Handles the result of get_locker(). Takes timer_lock then m_lock and
// expects a timer op to be in progress. Paths visible in this excerpt:
//  - leader lock shut down, or this instance already leader: finish the
//    timer op;
//  - a path that resets m_acquire_attempts and calls acquire_leader_lock();
//  - an error path that logs, reschedules get_locker with delay factor 1 and
//    finishes the timer op;
//  - otherwise compare the reported locker with m_locker, possibly reset the
//    attempt counter, log when the attempt count reaches
//    rbd_mirror_leader_max_acquire_attempts_before_break, and schedule
//    another acquire attempt.
// When the locker changed, the listener is told the leader instance id from
// a context queued on m_work_queue, which then finishes the timer op.
// NOTE(review): several conditions, returns and assignments of this function
// are not visible in this excerpt -- confirm details against the full file.
528 template <typename I>
529 void LeaderWatcher<I>::handle_get_locker(int r,
530 librbd::managed_lock::Locker& locker) {
531 dout(20) << "r=" << r << dendl;
533 Mutex::Locker timer_locker(m_threads->timer_lock);
534 Mutex::Locker mutex_locker(m_lock);
535 assert(!m_timer_op_tracker.empty());
537 if (m_leader_lock->is_shutdown()) {
538 dout(20) << "canceling due to shutdown" << dendl;
539 m_timer_op_tracker.finish_op();
543 if (is_leader(m_lock)) {
545 m_timer_op_tracker.finish_op();
551 m_acquire_attempts = 0;
552 acquire_leader_lock();
555 derr << "error retrieving leader locker: " << cpp_strerror(r) << dendl;
556 schedule_get_locker(true, 1);
557 m_timer_op_tracker.finish_op();
561 bool notify_listener = false;
562 if (m_locker != locker) {
564 notify_listener = true;
565 if (m_acquire_attempts > 1) {
566 dout(10) << "new lock owner detected -- resetting heartbeat counter"
568 m_acquire_attempts = 0;
572 if (m_acquire_attempts >= m_cct->_conf->get_val<int64_t>(
573 "rbd_mirror_leader_max_acquire_attempts_before_break")) {
574 dout(0) << "breaking leader lock after " << m_acquire_attempts << " "
575 << "failed attempts to acquire" << dendl;
580 schedule_acquire_leader_lock(1);
582 if (!notify_listener) {
583 m_timer_op_tracker.finish_op();
587 auto ctx = new FunctionContext(
589 std::string instance_id;
590 if (get_leader_instance_id(&instance_id)) {
591 m_listener->update_leader_handler(instance_id);
593 Mutex::Locker timer_locker(m_threads->timer_lock);
594 Mutex::Locker locker(m_lock);
595 m_timer_op_tracker.finish_op();
597 m_work_queue->queue(ctx, 0);
// Schedules an "acquire leader lock" timer task (leader=false,
// shutting_down=false) that runs acquire_leader_lock(). Requires timer_lock
// and m_lock held.
// NOTE(review): the delay expression is only partly visible; it involves the
// rbd_mirror_leader_max_missed_heartbeats config value, presumably multiplied
// by delay_factor -- confirm.
600 template <typename I>
601 void LeaderWatcher<I>::schedule_acquire_leader_lock(uint32_t delay_factor) {
604 assert(m_threads->timer_lock.is_locked());
605 assert(m_lock.is_locked());
607 schedule_timer_task("acquire leader lock",
609 m_cct->_conf->get_val<int64_t>("rbd_mirror_leader_max_missed_heartbeats"),
610 false, &LeaderWatcher<I>::acquire_leader_lock, false);
// Timer callback: increments and logs m_acquire_attempts, then calls
// m_leader_lock->try_acquire_lock(); completion is delivered to
// handle_acquire_leader_lock() through m_work_queue. Requires timer_lock and
// m_lock held and a timer op in progress.
613 template <typename I>
614 void LeaderWatcher<I>::acquire_leader_lock() {
615 assert(m_threads->timer_lock.is_locked());
616 assert(m_lock.is_locked());
617 assert(!m_timer_op_tracker.empty());
619 ++m_acquire_attempts;
620 dout(20) << "acquire_attempts=" << m_acquire_attempts << dendl;
622 Context *ctx = create_async_context_callback(
623 m_work_queue, create_context_callback<
624 LeaderWatcher<I>, &LeaderWatcher<I>::handle_acquire_leader_lock>(this));
625 m_leader_lock->try_acquire_lock(ctx);
// Completion handler for acquire_leader_lock(). Takes timer_lock then m_lock
// and expects a timer op to be in progress. Visible paths: finish the timer
// op when the leader lock was shut down; log "already locked" or an acquire
// error; reset m_acquire_attempts; and, after an error on notify, release the
// leader lock and finish the timer op.
// NOTE(review): the conditions and returns joining these paths are not
// visible in this excerpt.
628 template <typename I>
629 void LeaderWatcher<I>::handle_acquire_leader_lock(int r) {
630 dout(20) << "r=" << r << dendl;
632 Mutex::Locker timer_locker(m_threads->timer_lock);
633 Mutex::Locker locker(m_lock);
634 assert(!m_timer_op_tracker.empty());
636 if (m_leader_lock->is_shutdown()) {
637 dout(20) << "canceling due to shutdown" << dendl;
638 m_timer_op_tracker.finish_op();
644 dout(20) << "already locked" << dendl;
646 derr << "error acquiring lock: " << cpp_strerror(r) << dendl;
654 m_acquire_attempts = 0;
657 dout(5) << "releasing due to error on notify" << dendl;
658 release_leader_lock();
659 m_timer_op_tracker.finish_op();
// Asks m_leader_lock to release the lock; completion is delivered to
// handle_release_leader_lock() through m_work_queue. Caller must hold m_lock.
666 template <typename I>
667 void LeaderWatcher<I>::release_leader_lock() {
670 assert(m_lock.is_locked());
672 Context *ctx = create_async_context_callback(
673 m_work_queue, create_context_callback<
674 LeaderWatcher<I>, &LeaderWatcher<I>::handle_release_leader_lock>(this));
676 m_leader_lock->release_lock(ctx);
// Completion handler for release_leader_lock(). Takes timer_lock then m_lock,
// logs a release error, and schedules a new acquire attempt with delay
// factor 1.
// NOTE(review): the error condition and any early return are not visible in
// this excerpt.
679 template <typename I>
680 void LeaderWatcher<I>::handle_release_leader_lock(int r) {
681 dout(20) << "r=" << r << dendl;
683 Mutex::Locker timer_locker(m_threads->timer_lock);
684 Mutex::Locker locker(m_lock);
687 derr << "error releasing lock: " << cpp_strerror(r) << dendl;
691 schedule_acquire_leader_lock(1);
// Creates m_status_watcher (must not already exist) and initializes it;
// completion is delivered to handle_init_status_watcher(). Caller must hold
// m_lock.
694 template <typename I>
695 void LeaderWatcher<I>::init_status_watcher() {
698 assert(m_lock.is_locked());
699 assert(m_status_watcher == nullptr);
701 m_status_watcher = MirrorStatusWatcher<I>::create(m_ioctx, m_work_queue);
703 Context *ctx = create_context_callback<
704 LeaderWatcher<I>, &LeaderWatcher<I>::handle_init_status_watcher>(this);
706 m_status_watcher->init(ctx);
// Completion handler for init_status_watcher(). On the visible error path it
// logs the failure, destroys and clears m_status_watcher, moves m_on_finish
// into a local under m_lock and completes it with r.
// NOTE(review): the success path is not visible in this excerpt.
709 template <typename I>
710 void LeaderWatcher<I>::handle_init_status_watcher(int r) {
711 dout(20) << "r=" << r << dendl;
713 Context *on_finish = nullptr;
715 Mutex::Locker locker(m_lock);
722 derr << "error initializing mirror status watcher: " << cpp_strerror(r)
724 m_status_watcher->destroy();
725 m_status_watcher = nullptr;
726 assert(m_on_finish != nullptr);
727 std::swap(m_on_finish, on_finish);
729 on_finish->complete(r);
// Shuts down the existing m_status_watcher; completion is delivered to
// handle_shut_down_status_watcher() through m_work_queue. Caller must hold
// m_lock.
732 template <typename I>
733 void LeaderWatcher<I>::shut_down_status_watcher() {
736 assert(m_lock.is_locked());
737 assert(m_status_watcher != nullptr);
739 Context *ctx = create_async_context_callback(
740 m_work_queue, create_context_callback<LeaderWatcher<I>,
741 &LeaderWatcher<I>::handle_shut_down_status_watcher>(this));
743 m_status_watcher->shut_down(ctx);
// Completion handler for shut_down_status_watcher(). Under m_lock it destroys
// and clears m_status_watcher, logs a shut down error, inspects m_ret_val and
// the leader state, then moves m_on_finish into a local and completes it
// with r.
// NOTE(review): the statements inside the m_ret_val and not-leader branches
// (which may adjust r) are not visible in this excerpt.
746 template <typename I>
747 void LeaderWatcher<I>::handle_shut_down_status_watcher(int r) {
748 dout(20) << "r=" << r << dendl;
750 Context *on_finish = nullptr;
752 Mutex::Locker locker(m_lock);
754 m_status_watcher->destroy();
755 m_status_watcher = nullptr;
758 derr << "error shutting mirror status watcher down: " << cpp_strerror(r)
762 if (m_ret_val != 0) {
766 if (!is_leader(m_lock)) {
767 // ignore on releasing
771 assert(m_on_finish != nullptr);
772 std::swap(m_on_finish, on_finish);
774 on_finish->complete(r);
// Creates m_instances (must not already exist) and initializes it; completion
// is delivered to handle_init_instances(). Caller must hold m_lock.
777 template <typename I>
778 void LeaderWatcher<I>::init_instances() {
781 assert(m_lock.is_locked());
782 assert(m_instances == nullptr);
784 m_instances = Instances<I>::create(m_threads, m_ioctx);
786 Context *ctx = create_context_callback<
787 LeaderWatcher<I>, &LeaderWatcher<I>::handle_init_instances>(this);
789 m_instances->init(ctx);
// Completion handler for init_instances(). On the visible error path it logs
// the failure, destroys and clears m_instances, and unwinds by shutting down
// the status watcher.
// NOTE(review): the error condition and the success path are not visible in
// this excerpt.
792 template <typename I>
793 void LeaderWatcher<I>::handle_init_instances(int r) {
794 dout(20) << "r=" << r << dendl;
796 Mutex::Locker locker(m_lock);
799 derr << "error initializing instances: " << cpp_strerror(r) << dendl;
801 m_instances->destroy();
802 m_instances = nullptr;
803 shut_down_status_watcher();
// Shuts down the existing m_instances; completion is delivered to
// handle_shut_down_instances() through m_work_queue. Caller must hold m_lock.
810 template <typename I>
811 void LeaderWatcher<I>::shut_down_instances() {
814 assert(m_lock.is_locked());
815 assert(m_instances != nullptr);
817 Context *ctx = create_async_context_callback(
818 m_work_queue, create_context_callback<LeaderWatcher<I>,
819 &LeaderWatcher<I>::handle_shut_down_instances>(this));
821 m_instances->shut_down(ctx);
// Completion handler for shut_down_instances(). Under m_lock, destroys and
// clears m_instances, then continues by shutting down the status watcher.
824 template <typename I>
825 void LeaderWatcher<I>::handle_shut_down_instances(int r) {
826 dout(20) << "r=" << r << dendl;
829 Mutex::Locker locker(m_lock);
831 m_instances->destroy();
832 m_instances = nullptr;
834 shut_down_status_watcher();
// Notifies m_listener of a leadership change from a context queued on
// m_work_queue. If this instance is the leader, the listener's
// post_acquire_handler() is invoked; otherwise its pre_release_handler() is.
// The listener is handed a callback that leads to handle_notify_listener().
// Caller must hold m_lock.
// NOTE(review): the lambda headers wrapping the listener calls are not
// visible in this excerpt.
837 template <typename I>
838 void LeaderWatcher<I>::notify_listener() {
841 assert(m_lock.is_locked());
843 Context *ctx = create_async_context_callback(
844 m_work_queue, create_context_callback<
845 LeaderWatcher<I>, &LeaderWatcher<I>::handle_notify_listener>(this));
847 if (is_leader(m_lock)) {
848 ctx = new FunctionContext(
850 m_listener->post_acquire_handler(ctx);
853 ctx = new FunctionContext(
855 m_listener->pre_release_handler(ctx);
858 m_work_queue->queue(ctx, 0);
// Runs after the listener has processed notify_listener(). Under m_lock, logs
// a listener error; then, if this instance is the leader, broadcasts
// lock-acquired, otherwise shuts down m_instances.
// NOTE(review): the error condition and anything done with r on error are not
// visible in this excerpt.
861 template <typename I>
862 void LeaderWatcher<I>::handle_notify_listener(int r) {
863 dout(20) << "r=" << r << dendl;
865 Mutex::Locker locker(m_lock);
868 derr << "error notifying listener: " << cpp_strerror(r) << dendl;
872 if (is_leader(m_lock)) {
873 notify_lock_acquired();
875 shut_down_instances();
// Encodes a NotifyMessage carrying LockAcquiredPayload into `bl` and sends it
// with send_notify(); completion is delivered to
// handle_notify_lock_acquired(). Caller must hold m_lock.
// NOTE(review): the declaration of `bl` is not visible in this excerpt.
879 template <typename I>
880 void LeaderWatcher<I>::notify_lock_acquired() {
883 assert(m_lock.is_locked());
885 Context *ctx = create_context_callback<
886 LeaderWatcher<I>, &LeaderWatcher<I>::handle_notify_lock_acquired>(this);
889 ::encode(NotifyMessage{LockAcquiredPayload{}}, bl);
891 send_notify(bl, nullptr, ctx);
// Completion handler for notify_lock_acquired(). Under m_lock, logs errors
// other than -ETIMEDOUT, then moves m_on_finish into a local and completes it
// with 0 regardless of r.
// NOTE(review): the rest of the error branch is not visible in this excerpt.
894 template <typename I>
895 void LeaderWatcher<I>::handle_notify_lock_acquired(int r) {
896 dout(20) << "r=" << r << dendl;
898 Context *on_finish = nullptr;
900 Mutex::Locker locker(m_lock);
901 if (r < 0 && r != -ETIMEDOUT) {
902 derr << "error notifying leader lock acquired: " << cpp_strerror(r)
907 assert(m_on_finish != nullptr);
908 std::swap(m_on_finish, on_finish);
910 on_finish->complete(0);
// Encodes a NotifyMessage carrying LockReleasedPayload into `bl` and sends it
// with send_notify(); completion is delivered to
// handle_notify_lock_released(). Caller must hold m_lock.
// NOTE(review): the declaration of `bl` is not visible in this excerpt.
913 template <typename I>
914 void LeaderWatcher<I>::notify_lock_released() {
917 assert(m_lock.is_locked());
919 Context *ctx = create_context_callback<
920 LeaderWatcher<I>, &LeaderWatcher<I>::handle_notify_lock_released>(this);
923 ::encode(NotifyMessage{LockReleasedPayload{}}, bl);
925 send_notify(bl, nullptr, ctx);
// Completion handler for notify_lock_released(). Under m_lock, logs errors
// other than -ETIMEDOUT, then moves m_on_finish into a local and completes it
// with r.
// NOTE(review): the rest of the error branch is not visible in this excerpt.
928 template <typename I>
929 void LeaderWatcher<I>::handle_notify_lock_released(int r) {
930 dout(20) << "r=" << r << dendl;
932 Context *on_finish = nullptr;
934 Mutex::Locker locker(m_lock);
935 if (r < 0 && r != -ETIMEDOUT) {
936 derr << "error notifying leader lock released: " << cpp_strerror(r)
940 assert(m_on_finish != nullptr);
941 std::swap(m_on_finish, on_finish);
943 on_finish->complete(r);
// Timer callback that sends a heartbeat notification. Requires timer_lock and
// m_lock held and a timer op in progress. If this instance is not the leader,
// the timer op is finished. Otherwise the previous acks are cleared and a
// NotifyMessage carrying HeartbeatPayload is sent, with responses collected
// in m_heartbeat_response and completion delivered to
// handle_notify_heartbeat().
// NOTE(review): the early return after the not-leader branch and the
// declaration of `bl` are not visible in this excerpt.
946 template <typename I>
947 void LeaderWatcher<I>::notify_heartbeat() {
950 assert(m_threads->timer_lock.is_locked());
951 assert(m_lock.is_locked());
952 assert(!m_timer_op_tracker.empty());
954 if (!is_leader(m_lock)) {
955 dout(5) << "not leader, canceling" << dendl;
956 m_timer_op_tracker.finish_op();
960 Context *ctx = create_context_callback<
961 LeaderWatcher<I>, &LeaderWatcher<I>::handle_notify_heartbeat>(this);
964 ::encode(NotifyMessage{HeartbeatPayload{}}, bl);
966 m_heartbeat_response.acks.clear();
967 send_notify(bl, &m_heartbeat_response, ctx);
// Completion handler for notify_heartbeat(). Takes timer_lock then m_lock and
// finishes the timer op that notify_heartbeat() was running under.
//  - If the leader lock was shut down, or this instance is no longer leader,
//    the heartbeat is abandoned.
//  - If r is an error other than -ETIMEDOUT, the error is logged and the
//    leader lock is released.
//  - Otherwise the ack/timeout counts are logged, m_instances is notified of
//    each acking instance id, and the next "heartbeat" task is scheduled one
//    interval later (leader=true).
// NOTE(review): the returns separating these paths, and the body of the
// own-notifier-id check inside the loop, are not visible in this excerpt.
970 template <typename I>
971 void LeaderWatcher<I>::handle_notify_heartbeat(int r) {
972 dout(20) << "r=" << r << dendl;
974 Mutex::Locker timer_locker(m_threads->timer_lock);
975 Mutex::Locker locker(m_lock);
976 assert(!m_timer_op_tracker.empty());
978 m_timer_op_tracker.finish_op();
979 if (m_leader_lock->is_shutdown()) {
980 dout(20) << "canceling due to shutdown" << dendl;
982 } else if (!is_leader(m_lock)) {
986 if (r < 0 && r != -ETIMEDOUT) {
987 derr << "error notifying heartbeat: " << cpp_strerror(r)
988 << ", releasing leader" << dendl;
989 release_leader_lock();
993 dout(20) << m_heartbeat_response.acks.size() << " acks received, "
994 << m_heartbeat_response.timeouts.size() << " timed out" << dendl;
996 for (auto &it: m_heartbeat_response.acks) {
997 uint64_t notifier_id = it.first.gid;
998 if (notifier_id == m_notifier_id) {
1002 std::string instance_id = stringify(notifier_id);
1003 m_instances->notify(instance_id);
1006 schedule_timer_task("heartbeat", 1, true,
1007 &LeaderWatcher<I>::notify_heartbeat, false);
// Handles a heartbeat notification received from another instance. Under
// timer_lock and m_lock: if this instance is itself the leader, the heartbeat
// is logged and ignored; otherwise the pending timer task is cancelled, the
// attempt counter is reset and a new acquire attempt is scheduled with delay
// factor 1. The notification is then acknowledged by completing
// on_notify_ack with 0.
1010 template <typename I>
1011 void LeaderWatcher<I>::handle_heartbeat(Context *on_notify_ack) {
1015 Mutex::Locker timer_locker(m_threads->timer_lock);
1016 Mutex::Locker locker(m_lock);
1017 if (is_leader(m_lock)) {
1018 dout(5) << "got another leader heartbeat, ignoring" << dendl;
1020 cancel_timer_task();
1021 m_acquire_attempts = 0;
1022 schedule_acquire_leader_lock(1);
1026 on_notify_ack->complete(0);
// Handles a lock-acquired notification received from another instance. Under
// timer_lock and m_lock: if this instance is itself the leader, the message
// is logged and ignored; otherwise the pending timer task is cancelled and
// get_locker is scheduled immediately (reset_leader=true, delay factor 0).
// The notification is then acknowledged by completing on_notify_ack with 0.
1029 template <typename I>
1030 void LeaderWatcher<I>::handle_lock_acquired(Context *on_notify_ack) {
1034 Mutex::Locker timer_locker(m_threads->timer_lock);
1035 Mutex::Locker locker(m_lock);
1036 if (is_leader(m_lock)) {
1037 dout(5) << "got another leader lock_acquired, ignoring" << dendl;
1039 cancel_timer_task();
1040 schedule_get_locker(true, 0);
1044 on_notify_ack->complete(0);
// Handles a lock-released notification received from another instance. Under
// timer_lock and m_lock: if this instance is itself the leader, the message
// is logged and ignored; otherwise the pending timer task is cancelled and
// get_locker is scheduled immediately (reset_leader=true, delay factor 0).
// The notification is then acknowledged by completing on_notify_ack with 0.
1047 template <typename I>
1048 void LeaderWatcher<I>::handle_lock_released(Context *on_notify_ack) {
1052 Mutex::Locker timer_locker(m_threads->timer_lock);
1053 Mutex::Locker locker(m_lock);
1054 if (is_leader(m_lock)) {
1055 dout(5) << "got another leader lock_released, ignoring" << dendl;
1057 cancel_timer_task();
1058 schedule_get_locker(true, 0);
1062 on_notify_ack->complete(0);
// Watch notification entry point. Creates a C_NotifyAck context for this
// notify_id/handle, detects notifications sent by this instance itself
// (logged as ignored), decodes the payload from bl into a NotifyMessage, and
// dispatches it to the matching handle_payload() overload through
// HandlePayloadVisitor.
// NOTE(review): what happens to ctx on the self-notification and decode-error
// paths is not visible in this excerpt.
1065 template <typename I>
1066 void LeaderWatcher<I>::handle_notify(uint64_t notify_id, uint64_t handle,
1067 uint64_t notifier_id, bufferlist &bl) {
1068 dout(20) << "notify_id=" << notify_id << ", handle=" << handle << ", "
1069 << "notifier_id=" << notifier_id << dendl;
1071 Context *ctx = new C_NotifyAck(this, notify_id, handle);
1073 if (notifier_id == m_notifier_id) {
1074 dout(20) << "our own notification, ignoring" << dendl;
1079 NotifyMessage notify_message;
1081 bufferlist::iterator iter = bl.begin();
1082 ::decode(notify_message, iter);
1083 } catch (const buffer::error &err) {
1084 derr << ": error decoding image notification: " << err.what() << dendl;
1089 apply_visitor(HandlePayloadVisitor(this, ctx), notify_message.payload);
// Payload dispatch for HeartbeatPayload: logs and forwards to
// handle_heartbeat(). The payload itself is not inspected.
1092 template <typename I>
1093 void LeaderWatcher<I>::handle_payload(const HeartbeatPayload &payload,
1094 Context *on_notify_ack) {
1095 dout(20) << "heartbeat" << dendl;
1097 handle_heartbeat(on_notify_ack);
// Payload dispatch for LockAcquiredPayload: logs and forwards to
// handle_lock_acquired(). The payload itself is not inspected.
1100 template <typename I>
1101 void LeaderWatcher<I>::handle_payload(const LockAcquiredPayload &payload,
1102 Context *on_notify_ack) {
1103 dout(20) << "lock_acquired" << dendl;
1105 handle_lock_acquired(on_notify_ack);
// Payload dispatch for LockReleasedPayload: logs and forwards to
// handle_lock_released(). The payload itself is not inspected.
1108 template <typename I>
1109 void LeaderWatcher<I>::handle_payload(const LockReleasedPayload &payload,
1110 Context *on_notify_ack) {
1111 dout(20) << "lock_released" << dendl;
1113 handle_lock_released(on_notify_ack);
// Payload dispatch for UnknownPayload: logs and acknowledges the notification
// by completing on_notify_ack with 0, taking no other action.
1116 template <typename I>
1117 void LeaderWatcher<I>::handle_payload(const UnknownPayload &payload,
1118 Context *on_notify_ack) {
1119 dout(20) << "unknown" << dendl;
1121 on_notify_ack->complete(0);
1124 } // namespace mirror
1127 template class rbd::mirror::LeaderWatcher<librbd::ImageCtx>;