Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / tools / rbd_mirror / LeaderWatcher.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "LeaderWatcher.h"
5 #include "common/Timer.h"
6 #include "common/debug.h"
7 #include "common/errno.h"
8 #include "cls/rbd/cls_rbd_client.h"
9 #include "include/stringify.h"
10 #include "librbd/Utils.h"
11 #include "librbd/watcher/Types.h"
12 #include "Threads.h"
13
14 #define dout_context g_ceph_context
15 #define dout_subsys ceph_subsys_rbd_mirror
16 #undef dout_prefix
17 #define dout_prefix *_dout << "rbd::mirror::LeaderWatcher: " \
18                            << this << " " << __func__ << ": "
19 namespace rbd {
20 namespace mirror {
21
22 using namespace leader_watcher;
23
24 using librbd::util::create_async_context_callback;
25 using librbd::util::create_context_callback;
26 using librbd::util::create_rados_callback;
27
28 template <typename I>
29 LeaderWatcher<I>::LeaderWatcher(Threads<I> *threads, librados::IoCtx &io_ctx,
30                                 Listener *listener)
31   : Watcher(io_ctx, threads->work_queue, RBD_MIRROR_LEADER),
32     m_threads(threads), m_listener(listener),
33     m_lock("rbd::mirror::LeaderWatcher " + io_ctx.get_pool_name()),
34     m_notifier_id(librados::Rados(io_ctx).get_instance_id()),
35     m_leader_lock(new LeaderLock(m_ioctx, m_work_queue, m_oid, this, true,
36                                  m_cct->_conf->get_val<int64_t>(
37                                    "rbd_blacklist_expire_seconds"))) {
38 }
39
40 template <typename I>
41 LeaderWatcher<I>::~LeaderWatcher() {
42   assert(m_status_watcher == nullptr);
43   assert(m_instances == nullptr);
44   assert(m_timer_task == nullptr);
45
46   delete m_leader_lock;
47 }
48
49 template <typename I>
50 std::string LeaderWatcher<I>::get_instance_id() {
51   return stringify(m_notifier_id);
52 }
53
54 template <typename I>
55 int LeaderWatcher<I>::init() {
56   C_SaferCond init_ctx;
57   init(&init_ctx);
58   return init_ctx.wait();
59 }
60
61 template <typename I>
62 void LeaderWatcher<I>::init(Context *on_finish) {
63   dout(20) << "notifier_id=" << m_notifier_id << dendl;
64
65   Mutex::Locker locker(m_lock);
66
67   assert(m_on_finish == nullptr);
68   m_on_finish = on_finish;
69
70   create_leader_object();
71 }
72
73 template <typename I>
74 void LeaderWatcher<I>::create_leader_object() {
75   dout(20) << dendl;
76
77   assert(m_lock.is_locked());
78
79   librados::ObjectWriteOperation op;
80   op.create(false);
81
82   librados::AioCompletion *aio_comp = create_rados_callback<
83     LeaderWatcher<I>, &LeaderWatcher<I>::handle_create_leader_object>(this);
84   int r = m_ioctx.aio_operate(m_oid, aio_comp, &op);
85   assert(r == 0);
86   aio_comp->release();
87 }
88
89 template <typename I>
90 void LeaderWatcher<I>::handle_create_leader_object(int r) {
91   dout(20) << "r=" << r << dendl;
92
93   Context *on_finish = nullptr;
94   {
95     Mutex::Locker locker(m_lock);
96
97     if (r == 0) {
98       register_watch();
99       return;
100     }
101
102     derr << "error creating " << m_oid << " object: " << cpp_strerror(r)
103          << dendl;
104
105     std::swap(on_finish, m_on_finish);
106   }
107   on_finish->complete(r);
108 }
109
110 template <typename I>
111 void LeaderWatcher<I>::register_watch() {
112   dout(20) << dendl;
113
114   assert(m_lock.is_locked());
115
116   Context *ctx = create_async_context_callback(
117     m_work_queue, create_context_callback<
118       LeaderWatcher<I>, &LeaderWatcher<I>::handle_register_watch>(this));
119
120   librbd::Watcher::register_watch(ctx);
121 }
122
123 template <typename I>
124 void LeaderWatcher<I>::handle_register_watch(int r) {
125   dout(20) << "r=" << r << dendl;
126
127   Context *on_finish = nullptr;
128   {
129     Mutex::Locker timer_locker(m_threads->timer_lock);
130     Mutex::Locker locker(m_lock);
131
132     if (r < 0) {
133       derr << "error registering leader watcher for " << m_oid << " object: "
134            << cpp_strerror(r) << dendl;
135     } else {
136       schedule_acquire_leader_lock(0);
137     }
138
139     std::swap(on_finish, m_on_finish);
140   }
141   on_finish->complete(r);
142 }
143
144 template <typename I>
145 void LeaderWatcher<I>::shut_down() {
146   C_SaferCond shut_down_ctx;
147   shut_down(&shut_down_ctx);
148   int r = shut_down_ctx.wait();
149   assert(r == 0);
150 }
151
152 template <typename I>
153 void LeaderWatcher<I>::shut_down(Context *on_finish) {
154   dout(20) << dendl;
155
156   Mutex::Locker timer_locker(m_threads->timer_lock);
157   Mutex::Locker locker(m_lock);
158
159   assert(m_on_shut_down_finish == nullptr);
160   m_on_shut_down_finish = on_finish;
161   cancel_timer_task();
162   shut_down_leader_lock();
163 }
164
165 template <typename I>
166 void LeaderWatcher<I>::shut_down_leader_lock() {
167   dout(20) << dendl;
168
169   assert(m_lock.is_locked());
170
171   Context *ctx = create_async_context_callback(
172     m_work_queue, create_context_callback<
173       LeaderWatcher<I>, &LeaderWatcher<I>::handle_shut_down_leader_lock>(this));
174
175   m_leader_lock->shut_down(ctx);
176 }
177
178 template <typename I>
179 void LeaderWatcher<I>::handle_shut_down_leader_lock(int r) {
180   dout(20) << "r=" << r << dendl;
181
182   Mutex::Locker locker(m_lock);
183
184   if (r < 0) {
185     derr << "error shutting down leader lock: " << cpp_strerror(r) << dendl;
186   }
187
188   unregister_watch();
189 }
190
191 template <typename I>
192 void LeaderWatcher<I>::unregister_watch() {
193   dout(20) << dendl;
194
195   assert(m_lock.is_locked());
196
197   Context *ctx = create_async_context_callback(
198     m_work_queue, create_context_callback<
199       LeaderWatcher<I>, &LeaderWatcher<I>::handle_unregister_watch>(this));
200
201   librbd::Watcher::unregister_watch(ctx);
202 }
203
204 template <typename I>
205 void LeaderWatcher<I>::handle_unregister_watch(int r) {
206   dout(20) << "r=" << r << dendl;
207
208   if (r < 0) {
209     derr << "error unregistering leader watcher for " << m_oid << " object: "
210          << cpp_strerror(r) << dendl;
211   }
212   wait_for_tasks();
213 }
214
215 template <typename I>
216 void LeaderWatcher<I>::wait_for_tasks() {
217   dout(20) << dendl;
218
219   Mutex::Locker timer_locker(m_threads->timer_lock);
220   Mutex::Locker locker(m_lock);
221   schedule_timer_task("wait for tasks", 0, false,
222                       &LeaderWatcher<I>::handle_wait_for_tasks, true);
223 }
224
225 template <typename I>
226 void LeaderWatcher<I>::handle_wait_for_tasks() {
227   dout(20) << dendl;
228
229   assert(m_threads->timer_lock.is_locked());
230   assert(m_lock.is_locked());
231   assert(m_on_shut_down_finish != nullptr);
232
233   assert(!m_timer_op_tracker.empty());
234   m_timer_op_tracker.finish_op();
235
236   auto ctx = new FunctionContext([this](int r) {
237       Context *on_finish;
238       {
239         // ensure lock isn't held when completing shut down
240         Mutex::Locker locker(m_lock);
241         assert(m_on_shut_down_finish != nullptr);
242         on_finish = m_on_shut_down_finish;
243       }
244       on_finish->complete(0);
245     });
246   m_work_queue->queue(ctx, 0);
247 }
248
249 template <typename I>
250 bool LeaderWatcher<I>::is_leader() const {
251   Mutex::Locker locker(m_lock);
252
253   return is_leader(m_lock);
254 }
255
256 template <typename I>
257 bool LeaderWatcher<I>::is_leader(Mutex &lock) const {
258   assert(m_lock.is_locked());
259
260   bool leader = m_leader_lock->is_leader();
261   dout(20) << leader << dendl;
262   return leader;
263 }
264
265 template <typename I>
266 bool LeaderWatcher<I>::is_releasing_leader() const {
267   Mutex::Locker locker(m_lock);
268
269   return is_releasing_leader(m_lock);
270 }
271
272 template <typename I>
273 bool LeaderWatcher<I>::is_releasing_leader(Mutex &lock) const {
274   assert(m_lock.is_locked());
275
276   bool releasing = m_leader_lock->is_releasing_leader();
277   dout(20) << releasing << dendl;
278   return releasing;
279 }
280
281 template <typename I>
282 bool LeaderWatcher<I>::get_leader_instance_id(std::string *instance_id) const {
283   dout(20) << dendl;
284
285   Mutex::Locker locker(m_lock);
286
287   if (is_leader(m_lock) || is_releasing_leader(m_lock)) {
288     *instance_id = stringify(m_notifier_id);
289     return true;
290   }
291
292   if (!m_locker.cookie.empty()) {
293     *instance_id = stringify(m_locker.entity.num());
294     return true;
295   }
296
297   return false;
298 }
299
300 template <typename I>
301 void LeaderWatcher<I>::release_leader() {
302   dout(20) << dendl;
303
304   Mutex::Locker locker(m_lock);
305   if (!is_leader(m_lock)) {
306     return;
307   }
308
309   release_leader_lock();
310 }
311
312 template <typename I>
313 void LeaderWatcher<I>::list_instances(std::vector<std::string> *instance_ids) {
314   dout(20) << dendl;
315
316   Mutex::Locker locker(m_lock);
317
318   instance_ids->clear();
319   if (m_instances != nullptr) {
320     m_instances->list(instance_ids);
321   }
322 }
323
324 template <typename I>
325 void LeaderWatcher<I>::cancel_timer_task() {
326   assert(m_threads->timer_lock.is_locked());
327   assert(m_lock.is_locked());
328
329   if (m_timer_task == nullptr) {
330     return;
331   }
332
333   dout(20) << m_timer_task << dendl;
334   bool canceled = m_threads->timer->cancel_event(m_timer_task);
335   assert(canceled);
336   m_timer_task = nullptr;
337 }
338
339 template <typename I>
340 void LeaderWatcher<I>::schedule_timer_task(const std::string &name,
341                                            int delay_factor, bool leader,
342                                            TimerCallback timer_callback,
343                                            bool shutting_down) {
344   assert(m_threads->timer_lock.is_locked());
345   assert(m_lock.is_locked());
346
347   if (!shutting_down && m_on_shut_down_finish != nullptr) {
348     return;
349   }
350
351   cancel_timer_task();
352
353   m_timer_task = new FunctionContext(
354     [this, leader, timer_callback](int r) {
355       assert(m_threads->timer_lock.is_locked());
356       m_timer_task = nullptr;
357
358       if (m_timer_op_tracker.empty()) {
359         Mutex::Locker locker(m_lock);
360         execute_timer_task(leader, timer_callback);
361         return;
362       }
363
364       // old timer task is still running -- do not start next
365       // task until the previous task completes
366       if (m_timer_gate == nullptr) {
367         m_timer_gate = new C_TimerGate(this);
368         m_timer_op_tracker.wait_for_ops(m_timer_gate);
369       }
370       m_timer_gate->leader = leader;
371       m_timer_gate->timer_callback = timer_callback;
372     });
373
374   int after = delay_factor * m_cct->_conf->get_val<int64_t>(
375     "rbd_mirror_leader_heartbeat_interval");
376
377   dout(20) << "scheduling " << name << " after " << after << " sec (task "
378            << m_timer_task << ")" << dendl;
379   m_threads->timer->add_event_after(after, m_timer_task);
380 }
381
382 template <typename I>
383 void LeaderWatcher<I>::execute_timer_task(bool leader,
384                                           TimerCallback timer_callback) {
385   dout(20) << dendl;
386
387   assert(m_threads->timer_lock.is_locked());
388   assert(m_lock.is_locked());
389   assert(m_timer_op_tracker.empty());
390
391   if (is_leader(m_lock) != leader) {
392     return;
393   }
394
395   m_timer_op_tracker.start_op();
396   (this->*timer_callback)();
397 }
398
399 template <typename I>
400 void LeaderWatcher<I>::handle_post_acquire_leader_lock(int r,
401                                                        Context *on_finish) {
402   dout(20) << "r=" << r << dendl;
403
404   if (r < 0) {
405     if (r == -EAGAIN) {
406       dout(20) << "already locked" << dendl;
407     } else {
408       derr << "error acquiring leader lock: " << cpp_strerror(r) << dendl;
409     }
410     on_finish->complete(r);
411     return;
412   }
413
414   Mutex::Locker locker(m_lock);
415   assert(m_on_finish == nullptr);
416   m_on_finish = on_finish;
417   m_ret_val = 0;
418
419   init_status_watcher();
420 }
421
422 template <typename I>
423 void LeaderWatcher<I>::handle_pre_release_leader_lock(Context *on_finish) {
424   dout(20) << dendl;
425
426   Mutex::Locker locker(m_lock);
427   assert(m_on_finish == nullptr);
428   m_on_finish = on_finish;
429   m_ret_val = 0;
430
431   notify_listener();
432 }
433
434 template <typename I>
435 void LeaderWatcher<I>::handle_post_release_leader_lock(int r,
436                                                        Context *on_finish) {
437   dout(20) << "r=" << r << dendl;
438
439   if (r < 0) {
440     on_finish->complete(r);
441     return;
442   }
443
444   Mutex::Locker locker(m_lock);
445   assert(m_on_finish == nullptr);
446   m_on_finish = on_finish;
447
448   notify_lock_released();
449 }
450
451 template <typename I>
452 void LeaderWatcher<I>::break_leader_lock() {
453   dout(20) << dendl;
454
455   assert(m_threads->timer_lock.is_locked());
456   assert(m_lock.is_locked());
457   assert(!m_timer_op_tracker.empty());
458
459   if (m_locker.cookie.empty()) {
460     get_locker();
461     return;
462   }
463
464   Context *ctx = create_async_context_callback(
465     m_work_queue, create_context_callback<
466       LeaderWatcher<I>, &LeaderWatcher<I>::handle_break_leader_lock>(this));
467
468   m_leader_lock->break_lock(m_locker, true, ctx);
469 }
470
471 template <typename I>
472 void LeaderWatcher<I>::handle_break_leader_lock(int r) {
473   dout(20) << "r=" << r << dendl;
474
475   Mutex::Locker timer_locker(m_threads->timer_lock);
476   Mutex::Locker locker(m_lock);
477   assert(!m_timer_op_tracker.empty());
478
479   if (m_leader_lock->is_shutdown()) {
480     dout(20) << "canceling due to shutdown" << dendl;
481     m_timer_op_tracker.finish_op();
482     return;
483   }
484
485   if (r < 0 && r != -ENOENT) {
486     derr << "error beaking leader lock: " << cpp_strerror(r)  << dendl;
487     schedule_acquire_leader_lock(1);
488     m_timer_op_tracker.finish_op();
489     return;
490   }
491
492   m_locker = {};
493   m_acquire_attempts = 0;
494   acquire_leader_lock();
495 }
496
497 template <typename I>
498 void LeaderWatcher<I>::schedule_get_locker(bool reset_leader,
499                                            uint32_t delay_factor) {
500   dout(20) << dendl;
501
502   assert(m_threads->timer_lock.is_locked());
503   assert(m_lock.is_locked());
504
505   if (reset_leader) {
506     m_locker = {};
507     m_acquire_attempts = 0;
508   }
509
510   schedule_timer_task("get locker", delay_factor, false,
511                       &LeaderWatcher<I>::get_locker, false);
512 }
513
514 template <typename I>
515 void LeaderWatcher<I>::get_locker() {
516   dout(20) << dendl;
517
518   assert(m_threads->timer_lock.is_locked());
519   assert(m_lock.is_locked());
520   assert(!m_timer_op_tracker.empty());
521
522   C_GetLocker *get_locker_ctx = new C_GetLocker(this);
523   Context *ctx = create_async_context_callback(m_work_queue, get_locker_ctx);
524
525   m_leader_lock->get_locker(&get_locker_ctx->locker, ctx);
526 }
527
528 template <typename I>
529 void LeaderWatcher<I>::handle_get_locker(int r,
530                                          librbd::managed_lock::Locker& locker) {
531   dout(20) << "r=" << r << dendl;
532
533   Mutex::Locker timer_locker(m_threads->timer_lock);
534   Mutex::Locker mutex_locker(m_lock);
535   assert(!m_timer_op_tracker.empty());
536
537   if (m_leader_lock->is_shutdown()) {
538     dout(20) << "canceling due to shutdown" << dendl;
539     m_timer_op_tracker.finish_op();
540     return;
541   }
542
543   if (is_leader(m_lock)) {
544     m_locker = {};
545     m_timer_op_tracker.finish_op();
546     return;
547   }
548
549   if (r == -ENOENT) {
550     m_locker = {};
551     m_acquire_attempts = 0;
552     acquire_leader_lock();
553     return;
554   } else if (r < 0) {
555     derr << "error retrieving leader locker: " << cpp_strerror(r) << dendl;
556     schedule_get_locker(true, 1);
557     m_timer_op_tracker.finish_op();
558     return;
559   }
560
561   bool notify_listener = false;
562   if (m_locker != locker) {
563     m_locker = locker;
564     notify_listener = true;
565     if (m_acquire_attempts > 1) {
566       dout(10) << "new lock owner detected -- resetting heartbeat counter"
567                << dendl;
568       m_acquire_attempts = 0;
569     }
570   }
571
572   if (m_acquire_attempts >= m_cct->_conf->get_val<int64_t>(
573         "rbd_mirror_leader_max_acquire_attempts_before_break")) {
574     dout(0) << "breaking leader lock after " << m_acquire_attempts << " "
575             << "failed attempts to acquire" << dendl;
576     break_leader_lock();
577     return;
578   }
579
580   schedule_acquire_leader_lock(1);
581
582   if (!notify_listener) {
583     m_timer_op_tracker.finish_op();
584     return;
585   }
586
587   auto ctx = new FunctionContext(
588     [this](int r) {
589       std::string instance_id;
590       if (get_leader_instance_id(&instance_id)) {
591         m_listener->update_leader_handler(instance_id);
592       }
593       Mutex::Locker timer_locker(m_threads->timer_lock);
594       Mutex::Locker locker(m_lock);
595       m_timer_op_tracker.finish_op();
596     });
597   m_work_queue->queue(ctx, 0);
598 }
599
600 template <typename I>
601 void LeaderWatcher<I>::schedule_acquire_leader_lock(uint32_t delay_factor) {
602   dout(20) << dendl;
603
604   assert(m_threads->timer_lock.is_locked());
605   assert(m_lock.is_locked());
606
607   schedule_timer_task("acquire leader lock",
608                       delay_factor *
609                         m_cct->_conf->get_val<int64_t>("rbd_mirror_leader_max_missed_heartbeats"),
610                       false, &LeaderWatcher<I>::acquire_leader_lock, false);
611 }
612
613 template <typename I>
614 void LeaderWatcher<I>::acquire_leader_lock() {
615   assert(m_threads->timer_lock.is_locked());
616   assert(m_lock.is_locked());
617   assert(!m_timer_op_tracker.empty());
618
619   ++m_acquire_attempts;
620   dout(20) << "acquire_attempts=" << m_acquire_attempts << dendl;
621
622   Context *ctx = create_async_context_callback(
623     m_work_queue, create_context_callback<
624       LeaderWatcher<I>, &LeaderWatcher<I>::handle_acquire_leader_lock>(this));
625   m_leader_lock->try_acquire_lock(ctx);
626 }
627
628 template <typename I>
629 void LeaderWatcher<I>::handle_acquire_leader_lock(int r) {
630   dout(20) << "r=" << r << dendl;
631
632   Mutex::Locker timer_locker(m_threads->timer_lock);
633   Mutex::Locker locker(m_lock);
634   assert(!m_timer_op_tracker.empty());
635
636   if (m_leader_lock->is_shutdown()) {
637     dout(20) << "canceling due to shutdown" << dendl;
638     m_timer_op_tracker.finish_op();
639     return;
640   }
641
642   if (r < 0) {
643     if (r == -EAGAIN) {
644       dout(20) << "already locked" << dendl;
645     } else {
646       derr << "error acquiring lock: " << cpp_strerror(r) << dendl;
647     }
648
649     get_locker();
650     return;
651   }
652
653   m_locker = {};
654   m_acquire_attempts = 0;
655
656   if (m_ret_val) {
657     dout(5) << "releasing due to error on notify" << dendl;
658     release_leader_lock();
659     m_timer_op_tracker.finish_op();
660     return;
661   }
662
663   notify_heartbeat();
664 }
665
666 template <typename I>
667 void LeaderWatcher<I>::release_leader_lock() {
668   dout(20) << dendl;
669
670   assert(m_lock.is_locked());
671
672   Context *ctx = create_async_context_callback(
673     m_work_queue, create_context_callback<
674       LeaderWatcher<I>, &LeaderWatcher<I>::handle_release_leader_lock>(this));
675
676   m_leader_lock->release_lock(ctx);
677 }
678
679 template <typename I>
680 void LeaderWatcher<I>::handle_release_leader_lock(int r) {
681   dout(20) << "r=" << r << dendl;
682
683   Mutex::Locker timer_locker(m_threads->timer_lock);
684   Mutex::Locker locker(m_lock);
685
686   if (r < 0) {
687     derr << "error releasing lock: " << cpp_strerror(r) << dendl;
688     return;
689   }
690
691   schedule_acquire_leader_lock(1);
692 }
693
694 template <typename I>
695 void LeaderWatcher<I>::init_status_watcher() {
696   dout(20) << dendl;
697
698   assert(m_lock.is_locked());
699   assert(m_status_watcher == nullptr);
700
701   m_status_watcher = MirrorStatusWatcher<I>::create(m_ioctx, m_work_queue);
702
703   Context *ctx = create_context_callback<
704     LeaderWatcher<I>, &LeaderWatcher<I>::handle_init_status_watcher>(this);
705
706   m_status_watcher->init(ctx);
707 }
708
709 template <typename I>
710 void LeaderWatcher<I>::handle_init_status_watcher(int r) {
711   dout(20) << "r=" << r << dendl;
712
713   Context *on_finish = nullptr;
714   {
715     Mutex::Locker locker(m_lock);
716
717     if (r == 0) {
718       init_instances();
719       return;
720     }
721
722     derr << "error initializing mirror status watcher: " << cpp_strerror(r)
723          << dendl;
724     m_status_watcher->destroy();
725     m_status_watcher = nullptr;
726     assert(m_on_finish != nullptr);
727     std::swap(m_on_finish, on_finish);
728   }
729   on_finish->complete(r);
730 }
731
732 template <typename I>
733 void LeaderWatcher<I>::shut_down_status_watcher() {
734   dout(20) << dendl;
735
736   assert(m_lock.is_locked());
737   assert(m_status_watcher != nullptr);
738
739   Context *ctx = create_async_context_callback(
740     m_work_queue, create_context_callback<LeaderWatcher<I>,
741       &LeaderWatcher<I>::handle_shut_down_status_watcher>(this));
742
743   m_status_watcher->shut_down(ctx);
744 }
745
746 template <typename I>
747 void LeaderWatcher<I>::handle_shut_down_status_watcher(int r) {
748   dout(20) << "r=" << r << dendl;
749
750   Context *on_finish = nullptr;
751   {
752     Mutex::Locker locker(m_lock);
753
754     m_status_watcher->destroy();
755     m_status_watcher = nullptr;
756
757     if (r < 0) {
758       derr << "error shutting mirror status watcher down: " << cpp_strerror(r)
759            << dendl;
760     }
761
762     if (m_ret_val != 0) {
763       r = m_ret_val;
764     }
765
766     if (!is_leader(m_lock)) {
767       // ignore on releasing
768       r = 0;
769     }
770
771     assert(m_on_finish != nullptr);
772     std::swap(m_on_finish, on_finish);
773   }
774   on_finish->complete(r);
775 }
776
777 template <typename I>
778 void LeaderWatcher<I>::init_instances() {
779   dout(20) << dendl;
780
781   assert(m_lock.is_locked());
782   assert(m_instances == nullptr);
783
784   m_instances = Instances<I>::create(m_threads, m_ioctx);
785
786   Context *ctx = create_context_callback<
787     LeaderWatcher<I>, &LeaderWatcher<I>::handle_init_instances>(this);
788
789   m_instances->init(ctx);
790 }
791
792 template <typename I>
793 void LeaderWatcher<I>::handle_init_instances(int r) {
794   dout(20) << "r=" << r << dendl;
795
796   Mutex::Locker locker(m_lock);
797
798   if (r < 0) {
799     derr << "error initializing instances: " << cpp_strerror(r) << dendl;
800     m_ret_val = r;
801     m_instances->destroy();
802     m_instances = nullptr;
803     shut_down_status_watcher();
804     return;
805   }
806
807   notify_listener();
808 }
809
810 template <typename I>
811 void LeaderWatcher<I>::shut_down_instances() {
812   dout(20) << dendl;
813
814   assert(m_lock.is_locked());
815   assert(m_instances != nullptr);
816
817   Context *ctx = create_async_context_callback(
818     m_work_queue, create_context_callback<LeaderWatcher<I>,
819       &LeaderWatcher<I>::handle_shut_down_instances>(this));
820
821   m_instances->shut_down(ctx);
822 }
823
824 template <typename I>
825 void LeaderWatcher<I>::handle_shut_down_instances(int r) {
826   dout(20) << "r=" << r << dendl;
827   assert(r == 0);
828
829   Mutex::Locker locker(m_lock);
830
831   m_instances->destroy();
832   m_instances = nullptr;
833
834   shut_down_status_watcher();
835 }
836
837 template <typename I>
838 void LeaderWatcher<I>::notify_listener() {
839   dout(20) << dendl;
840
841   assert(m_lock.is_locked());
842
843   Context *ctx = create_async_context_callback(
844     m_work_queue, create_context_callback<
845       LeaderWatcher<I>, &LeaderWatcher<I>::handle_notify_listener>(this));
846
847   if (is_leader(m_lock)) {
848     ctx = new FunctionContext(
849       [this, ctx](int r) {
850         m_listener->post_acquire_handler(ctx);
851       });
852   } else {
853     ctx = new FunctionContext(
854       [this, ctx](int r) {
855         m_listener->pre_release_handler(ctx);
856       });
857   }
858   m_work_queue->queue(ctx, 0);
859 }
860
861 template <typename I>
862 void LeaderWatcher<I>::handle_notify_listener(int r) {
863   dout(20) << "r=" << r << dendl;
864
865   Mutex::Locker locker(m_lock);
866
867   if (r < 0) {
868     derr << "error notifying listener: " << cpp_strerror(r) << dendl;
869     m_ret_val = r;
870   }
871
872   if (is_leader(m_lock)) {
873     notify_lock_acquired();
874   } else {
875     shut_down_instances();
876   }
877 }
878
879 template <typename I>
880 void LeaderWatcher<I>::notify_lock_acquired() {
881   dout(20) << dendl;
882
883   assert(m_lock.is_locked());
884
885   Context *ctx = create_context_callback<
886     LeaderWatcher<I>, &LeaderWatcher<I>::handle_notify_lock_acquired>(this);
887
888   bufferlist bl;
889   ::encode(NotifyMessage{LockAcquiredPayload{}}, bl);
890
891   send_notify(bl, nullptr, ctx);
892 }
893
894 template <typename I>
895 void LeaderWatcher<I>::handle_notify_lock_acquired(int r) {
896   dout(20) << "r=" << r << dendl;
897
898   Context *on_finish = nullptr;
899   {
900     Mutex::Locker locker(m_lock);
901     if (r < 0 && r != -ETIMEDOUT) {
902       derr << "error notifying leader lock acquired: " << cpp_strerror(r)
903            << dendl;
904       m_ret_val = r;
905     }
906
907     assert(m_on_finish != nullptr);
908     std::swap(m_on_finish, on_finish);
909   }
910   on_finish->complete(0);
911 }
912
913 template <typename I>
914 void LeaderWatcher<I>::notify_lock_released() {
915   dout(20) << dendl;
916
917   assert(m_lock.is_locked());
918
919   Context *ctx = create_context_callback<
920     LeaderWatcher<I>, &LeaderWatcher<I>::handle_notify_lock_released>(this);
921
922   bufferlist bl;
923   ::encode(NotifyMessage{LockReleasedPayload{}}, bl);
924
925   send_notify(bl, nullptr, ctx);
926 }
927
928 template <typename I>
929 void LeaderWatcher<I>::handle_notify_lock_released(int r) {
930   dout(20) << "r=" << r << dendl;
931
932   Context *on_finish = nullptr;
933   {
934     Mutex::Locker locker(m_lock);
935     if (r < 0 && r != -ETIMEDOUT) {
936       derr << "error notifying leader lock released: " << cpp_strerror(r)
937            << dendl;
938     }
939
940     assert(m_on_finish != nullptr);
941     std::swap(m_on_finish, on_finish);
942   }
943   on_finish->complete(r);
944 }
945
946 template <typename I>
947 void LeaderWatcher<I>::notify_heartbeat() {
948   dout(20) << dendl;
949
950   assert(m_threads->timer_lock.is_locked());
951   assert(m_lock.is_locked());
952   assert(!m_timer_op_tracker.empty());
953
954   if (!is_leader(m_lock)) {
955     dout(5) << "not leader, canceling" << dendl;
956     m_timer_op_tracker.finish_op();
957     return;
958   }
959
960   Context *ctx = create_context_callback<
961     LeaderWatcher<I>, &LeaderWatcher<I>::handle_notify_heartbeat>(this);
962
963   bufferlist bl;
964   ::encode(NotifyMessage{HeartbeatPayload{}}, bl);
965
966   m_heartbeat_response.acks.clear();
967   send_notify(bl, &m_heartbeat_response, ctx);
968 }
969
970 template <typename I>
971 void LeaderWatcher<I>::handle_notify_heartbeat(int r) {
972   dout(20) << "r=" << r << dendl;
973
974   Mutex::Locker timer_locker(m_threads->timer_lock);
975   Mutex::Locker locker(m_lock);
976   assert(!m_timer_op_tracker.empty());
977
978   m_timer_op_tracker.finish_op();
979   if (m_leader_lock->is_shutdown()) {
980     dout(20) << "canceling due to shutdown" << dendl;
981     return;
982   } else if (!is_leader(m_lock)) {
983     return;
984   }
985
986   if (r < 0 && r != -ETIMEDOUT) {
987     derr << "error notifying hearbeat: " << cpp_strerror(r)
988          <<  ", releasing leader" << dendl;
989     release_leader_lock();
990     return;
991   }
992
993   dout(20) << m_heartbeat_response.acks.size() << " acks received, "
994            << m_heartbeat_response.timeouts.size() << " timed out" << dendl;
995
996   for (auto &it: m_heartbeat_response.acks) {
997     uint64_t notifier_id = it.first.gid;
998     if (notifier_id == m_notifier_id) {
999       continue;
1000     }
1001
1002     std::string instance_id = stringify(notifier_id);
1003     m_instances->notify(instance_id);
1004   }
1005
1006   schedule_timer_task("heartbeat", 1, true,
1007                       &LeaderWatcher<I>::notify_heartbeat, false);
1008 }
1009
1010 template <typename I>
1011 void LeaderWatcher<I>::handle_heartbeat(Context *on_notify_ack) {
1012   dout(20) << dendl;
1013
1014   {
1015     Mutex::Locker timer_locker(m_threads->timer_lock);
1016     Mutex::Locker locker(m_lock);
1017     if (is_leader(m_lock)) {
1018       dout(5) << "got another leader heartbeat, ignoring" << dendl;
1019     } else {
1020       cancel_timer_task();
1021       m_acquire_attempts = 0;
1022       schedule_acquire_leader_lock(1);
1023     }
1024   }
1025
1026   on_notify_ack->complete(0);
1027 }
1028
1029 template <typename I>
1030 void LeaderWatcher<I>::handle_lock_acquired(Context *on_notify_ack) {
1031   dout(20) << dendl;
1032
1033   {
1034     Mutex::Locker timer_locker(m_threads->timer_lock);
1035     Mutex::Locker locker(m_lock);
1036     if (is_leader(m_lock)) {
1037       dout(5) << "got another leader lock_acquired, ignoring" << dendl;
1038     } else {
1039       cancel_timer_task();
1040       schedule_get_locker(true, 0);
1041     }
1042   }
1043
1044   on_notify_ack->complete(0);
1045 }
1046
1047 template <typename I>
1048 void LeaderWatcher<I>::handle_lock_released(Context *on_notify_ack) {
1049   dout(20) << dendl;
1050
1051   {
1052     Mutex::Locker timer_locker(m_threads->timer_lock);
1053     Mutex::Locker locker(m_lock);
1054     if (is_leader(m_lock)) {
1055       dout(5) << "got another leader lock_released, ignoring" << dendl;
1056     } else {
1057       cancel_timer_task();
1058       schedule_get_locker(true, 0);
1059     }
1060   }
1061
1062   on_notify_ack->complete(0);
1063 }
1064
1065 template <typename I>
1066 void LeaderWatcher<I>::handle_notify(uint64_t notify_id, uint64_t handle,
1067                                      uint64_t notifier_id, bufferlist &bl) {
1068   dout(20) << "notify_id=" << notify_id << ", handle=" << handle << ", "
1069            << "notifier_id=" << notifier_id << dendl;
1070
1071   Context *ctx = new C_NotifyAck(this, notify_id, handle);
1072
1073   if (notifier_id == m_notifier_id) {
1074     dout(20) << "our own notification, ignoring" << dendl;
1075     ctx->complete(0);
1076     return;
1077   }
1078
1079   NotifyMessage notify_message;
1080   try {
1081     bufferlist::iterator iter = bl.begin();
1082     ::decode(notify_message, iter);
1083   } catch (const buffer::error &err) {
1084     derr << ": error decoding image notification: " << err.what() << dendl;
1085     ctx->complete(0);
1086     return;
1087   }
1088
1089   apply_visitor(HandlePayloadVisitor(this, ctx), notify_message.payload);
1090 }
1091
1092 template <typename I>
1093 void LeaderWatcher<I>::handle_payload(const HeartbeatPayload &payload,
1094                                       Context *on_notify_ack) {
1095   dout(20) << "heartbeat" << dendl;
1096
1097   handle_heartbeat(on_notify_ack);
1098 }
1099
1100 template <typename I>
1101 void LeaderWatcher<I>::handle_payload(const LockAcquiredPayload &payload,
1102                                       Context *on_notify_ack) {
1103   dout(20) << "lock_acquired" << dendl;
1104
1105   handle_lock_acquired(on_notify_ack);
1106 }
1107
1108 template <typename I>
1109 void LeaderWatcher<I>::handle_payload(const LockReleasedPayload &payload,
1110                                       Context *on_notify_ack) {
1111   dout(20) << "lock_released" << dendl;
1112
1113   handle_lock_released(on_notify_ack);
1114 }
1115
1116 template <typename I>
1117 void LeaderWatcher<I>::handle_payload(const UnknownPayload &payload,
1118                                       Context *on_notify_ack) {
1119   dout(20) << "unknown" << dendl;
1120
1121   on_notify_ack->complete(0);
1122 }
1123
1124 } // namespace mirror
1125 } // namespace rbd
1126
1127 template class rbd::mirror::LeaderWatcher<librbd::ImageCtx>;