Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / tools / rbd_mirror / InstanceWatcher.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "InstanceWatcher.h"
5 #include "include/stringify.h"
6 #include "common/debug.h"
7 #include "common/errno.h"
8 #include "cls/rbd/cls_rbd_client.h"
9 #include "librbd/ManagedLock.h"
10 #include "librbd/Utils.h"
11 #include "InstanceReplayer.h"
12 #include "ImageSyncThrottler.h"
13
14 #define dout_context g_ceph_context
15 #define dout_subsys ceph_subsys_rbd_mirror
16 #undef dout_prefix
17 #define dout_prefix *_dout << "rbd::mirror::InstanceWatcher: "
18
19 namespace rbd {
20 namespace mirror {
21
22 using namespace instance_watcher;
23
24 using librbd::util::create_async_context_callback;
25 using librbd::util::create_context_callback;
26 using librbd::util::create_rados_callback;
27 using librbd::util::unique_lock_name;
28
29 namespace {
30
31 struct C_GetInstances : public Context {
32   std::vector<std::string> *instance_ids;
33   Context *on_finish;
34   bufferlist out_bl;
35
36   C_GetInstances(std::vector<std::string> *instance_ids, Context *on_finish)
37     : instance_ids(instance_ids), on_finish(on_finish) {
38   }
39
40   void finish(int r) override {
41     dout(20) << "C_GetInstances: " << this << " " <<  __func__ << ": r=" << r
42              << dendl;
43
44     if (r == 0) {
45       bufferlist::iterator it = out_bl.begin();
46       r = librbd::cls_client::mirror_instances_list_finish(&it, instance_ids);
47     } else if (r == -ENOENT) {
48       r = 0;
49     }
50     on_finish->complete(r);
51   }
52 };
53
54 template <typename I>
55 struct C_RemoveInstanceRequest : public Context {
56   InstanceWatcher<I> instance_watcher;
57   Context *on_finish;
58
59   C_RemoveInstanceRequest(librados::IoCtx &io_ctx, ContextWQ *work_queue,
60                           const std::string &instance_id, Context *on_finish)
61     : instance_watcher(io_ctx, work_queue, nullptr, instance_id),
62       on_finish(on_finish) {
63   }
64
65   void send() {
66     dout(20) << "C_RemoveInstanceRequest: " << this << " " << __func__ << dendl;
67
68     instance_watcher.remove(this);
69   }
70
71   void finish(int r) override {
72     dout(20) << "C_RemoveInstanceRequest: " << this << " " << __func__ << ": r="
73              << r << dendl;
74     assert(r == 0);
75
76     on_finish->complete(r);
77   }
78 };
79
80 } // anonymous namespace
81
82 template <typename I>
83 struct InstanceWatcher<I>::C_NotifyInstanceRequest : public Context {
84   InstanceWatcher<I> *instance_watcher;
85   std::string instance_id;
86   uint64_t request_id;
87   bufferlist bl;
88   Context *on_finish;
89   bool send_to_leader;
90   std::unique_ptr<librbd::watcher::Notifier> notifier;
91   librbd::watcher::NotifyResponse response;
92   bool canceling = false;
93
94   C_NotifyInstanceRequest(InstanceWatcher<I> *instance_watcher,
95                           const std::string &instance_id, uint64_t request_id,
96                           bufferlist &&bl, Context *on_finish)
97     : instance_watcher(instance_watcher), instance_id(instance_id),
98       request_id(request_id), bl(bl), on_finish(on_finish),
99       send_to_leader(instance_id.empty()) {
100     dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__
101              << ": instance_watcher=" << instance_watcher << ", instance_id="
102              << instance_id << ", request_id=" << request_id << dendl;
103
104     assert(instance_watcher->m_lock.is_locked());
105
106     if (!send_to_leader) {
107       assert((!instance_id.empty()));
108       notifier.reset(new librbd::watcher::Notifier(
109                          instance_watcher->m_work_queue,
110                          instance_watcher->m_ioctx,
111                          RBD_MIRROR_INSTANCE_PREFIX + instance_id));
112     }
113
114     instance_watcher->m_notify_op_tracker.start_op();
115     auto result = instance_watcher->m_notify_ops.insert(
116         std::make_pair(instance_id, this)).second;
117     assert(result);
118   }
119
120   void send() {
121     dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__ << dendl;
122
123     assert(instance_watcher->m_lock.is_locked());
124
125     if (canceling) {
126       dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__
127                << ": canceling" << dendl;
128       instance_watcher->m_work_queue->queue(this, -ECANCELED);
129       return;
130     }
131
132     if (send_to_leader) {
133       if (instance_watcher->m_leader_instance_id.empty()) {
134         dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__
135                  << ": suspending" << dendl;
136         instance_watcher->suspend_notify_request(this);
137         return;
138       }
139
140       if (instance_watcher->m_leader_instance_id != instance_id) {
141         auto count = instance_watcher->m_notify_ops.erase(
142             std::make_pair(instance_id, this));
143         assert(count > 0);
144
145         instance_id = instance_watcher->m_leader_instance_id;
146
147         auto result = instance_watcher->m_notify_ops.insert(
148             std::make_pair(instance_id, this)).second;
149         assert(result);
150
151         notifier.reset(new librbd::watcher::Notifier(
152                            instance_watcher->m_work_queue,
153                            instance_watcher->m_ioctx,
154                            RBD_MIRROR_INSTANCE_PREFIX + instance_id));
155       }
156     }
157
158     dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__
159              << ": sending to " << instance_id << dendl;
160     notifier->notify(bl, &response, this);
161   }
162
163   void cancel() {
164     dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__ << dendl;
165
166     assert(instance_watcher->m_lock.is_locked());
167
168     canceling = true;
169     instance_watcher->unsuspend_notify_request(this);
170   }
171
172   void finish(int r) override {
173     dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__ << ": r="
174              << r << dendl;
175
176     if (r == 0 || r == -ETIMEDOUT) {
177       bool found = false;
178       for (auto &it : response.acks) {
179         auto &bl = it.second;
180         if (it.second.length() == 0) {
181           dout(20) << "C_NotifyInstanceRequest: " << this << " " << __func__
182                    << ": no payload in ack, ignoring" << dendl;
183           continue;
184         }
185         try {
186           auto iter = bl.begin();
187           NotifyAckPayload ack;
188           ::decode(ack, iter);
189           if (ack.instance_id != instance_watcher->get_instance_id()) {
190             derr << "C_NotifyInstanceRequest: " << this << " " << __func__
191                  << ": ack instance_id (" << ack.instance_id << ") "
192                  << "does not match, ignoring" << dendl;
193             continue;
194           }
195           if (ack.request_id != request_id) {
196             derr << "C_NotifyInstanceRequest: " << this << " " << __func__
197                  << ": ack request_id (" << ack.request_id << ") "
198                  << "does not match, ignoring" << dendl;
199             continue;
200           }
201           r = ack.ret_val;
202           found = true;
203           break;
204         } catch (const buffer::error &err) {
205           derr << "C_NotifyInstanceRequest: " << this << " " << __func__
206                << ": failed to decode ack: " << err.what() << dendl;
207           continue;
208         }
209       }
210
211       if (!found) {
212         if (r == -ETIMEDOUT) {
213           derr << "C_NotifyInstanceRequest: " << this << " " << __func__
214                << ": resending after timeout" << dendl;
215           Mutex::Locker locker(instance_watcher->m_lock);
216           send();
217           return;
218         } else {
219           r = -EINVAL;
220         }
221       } else {
222         if (r == -ESTALE && send_to_leader) {
223           derr << "C_NotifyInstanceRequest: " << this << " " << __func__
224                << ": resending due to leader change" << dendl;
225           Mutex::Locker locker(instance_watcher->m_lock);
226           send();
227           return;
228         }
229       }
230     }
231
232     on_finish->complete(r);
233
234     {
235       Mutex::Locker locker(instance_watcher->m_lock);
236       auto result = instance_watcher->m_notify_ops.erase(
237         std::make_pair(instance_id, this));
238       assert(result > 0);
239       instance_watcher->m_notify_op_tracker.finish_op();
240     }
241
242     delete this;
243   }
244
245   void complete(int r) override {
246     finish(r);
247   }
248 };
249
250 template <typename I>
251 struct InstanceWatcher<I>::C_SyncRequest : public Context {
252   InstanceWatcher<I> *instance_watcher;
253   std::string sync_id;
254   Context *on_start;
255   Context *on_complete = nullptr;
256   C_NotifyInstanceRequest *req = nullptr;
257
258   C_SyncRequest(InstanceWatcher<I> *instance_watcher,
259                 const std::string &sync_id, Context *on_start)
260     : instance_watcher(instance_watcher), sync_id(sync_id),
261       on_start(on_start) {
262     dout(20) << "C_SyncRequest: " << this << " " << __func__ << ": sync_id="
263              << sync_id << dendl;
264   }
265
266   void finish(int r) override {
267     dout(20) << "C_SyncRequest: " << this << " " << __func__ << ": r="
268              << r << dendl;
269
270     if (on_start != nullptr) {
271       instance_watcher->handle_notify_sync_request(this, r);
272     } else {
273       instance_watcher->handle_notify_sync_complete(this, r);
274       delete this;
275     }
276   }
277
278   // called twice
279   void complete(int r) override {
280     finish(r);
281   }
282 };
283
284 #undef dout_prefix
285 #define dout_prefix *_dout << "rbd::mirror::InstanceWatcher: " \
286                            << this << " " << __func__ << ": "
287 template <typename I>
288 void InstanceWatcher<I>::get_instances(librados::IoCtx &io_ctx,
289                                        std::vector<std::string> *instance_ids,
290                                        Context *on_finish) {
291   librados::ObjectReadOperation op;
292   librbd::cls_client::mirror_instances_list_start(&op);
293   C_GetInstances *ctx = new C_GetInstances(instance_ids, on_finish);
294   librados::AioCompletion *aio_comp = create_rados_callback(ctx);
295
296   int r = io_ctx.aio_operate(RBD_MIRROR_LEADER, aio_comp, &op, &ctx->out_bl);
297   assert(r == 0);
298   aio_comp->release();
299 }
300
301 template <typename I>
302 void InstanceWatcher<I>::remove_instance(librados::IoCtx &io_ctx,
303                                          ContextWQ *work_queue,
304                                          const std::string &instance_id,
305                                          Context *on_finish) {
306   auto req = new C_RemoveInstanceRequest<I>(io_ctx, work_queue, instance_id,
307                                             on_finish);
308   req->send();
309 }
310
311 template <typename I>
312 InstanceWatcher<I> *InstanceWatcher<I>::create(
313     librados::IoCtx &io_ctx, ContextWQ *work_queue,
314     InstanceReplayer<I> *instance_replayer) {
315   return new InstanceWatcher<I>(io_ctx, work_queue, instance_replayer,
316                                 stringify(io_ctx.get_instance_id()));
317 }
318
319 template <typename I>
320 InstanceWatcher<I>::InstanceWatcher(librados::IoCtx &io_ctx,
321                                     ContextWQ *work_queue,
322                                     InstanceReplayer<I> *instance_replayer,
323                                     const std::string &instance_id)
324   : Watcher(io_ctx, work_queue, RBD_MIRROR_INSTANCE_PREFIX + instance_id),
325     m_instance_replayer(instance_replayer), m_instance_id(instance_id),
326     m_lock(unique_lock_name("rbd::mirror::InstanceWatcher::m_lock", this)),
327     m_instance_lock(librbd::ManagedLock<I>::create(
328       m_ioctx, m_work_queue, m_oid, this, librbd::managed_lock::EXCLUSIVE, true,
329       m_cct->_conf->get_val<int64_t>("rbd_blacklist_expire_seconds"))) {
330 }
331
332 template <typename I>
333 InstanceWatcher<I>::~InstanceWatcher() {
334   assert(m_notify_ops.empty());
335   assert(m_notify_op_tracker.empty());
336   assert(m_suspended_ops.empty());
337   assert(m_inflight_sync_reqs.empty());
338   assert(m_image_sync_throttler == nullptr);
339   m_instance_lock->destroy();
340 }
341
342 template <typename I>
343 int InstanceWatcher<I>::init() {
344   C_SaferCond init_ctx;
345   init(&init_ctx);
346   return init_ctx.wait();
347 }
348
349 template <typename I>
350 void InstanceWatcher<I>::init(Context *on_finish) {
351   dout(20) << "instance_id=" << m_instance_id << dendl;
352
353   Mutex::Locker locker(m_lock);
354
355   assert(m_on_finish == nullptr);
356   m_on_finish = on_finish;
357   m_ret_val = 0;
358
359   register_instance();
360 }
361
362 template <typename I>
363 void InstanceWatcher<I>::shut_down() {
364   C_SaferCond shut_down_ctx;
365   shut_down(&shut_down_ctx);
366   int r = shut_down_ctx.wait();
367   assert(r == 0);
368 }
369
370 template <typename I>
371 void InstanceWatcher<I>::shut_down(Context *on_finish) {
372   dout(20) << dendl;
373
374   Mutex::Locker locker(m_lock);
375
376   assert(m_on_finish == nullptr);
377   m_on_finish = on_finish;
378   m_ret_val = 0;
379
380   release_lock();
381 }
382
383 template <typename I>
384 void InstanceWatcher<I>::remove(Context *on_finish) {
385   dout(20) << dendl;
386
387   Mutex::Locker locker(m_lock);
388
389   assert(m_on_finish == nullptr);
390   m_on_finish = on_finish;
391   m_ret_val = 0;
392   m_removing = true;
393
394   get_instance_locker();
395 }
396
397 template <typename I>
398 void InstanceWatcher<I>::notify_image_acquire(
399     const std::string &instance_id, const std::string &global_image_id,
400     Context *on_notify_ack) {
401   dout(20) << "instance_id=" << instance_id << ", global_image_id="
402            << global_image_id << dendl;
403
404   Mutex::Locker locker(m_lock);
405
406   assert(m_on_finish == nullptr);
407
408   if (instance_id == m_instance_id) {
409     handle_image_acquire(global_image_id, on_notify_ack);
410   } else {
411     uint64_t request_id = ++m_request_seq;
412     bufferlist bl;
413     ::encode(NotifyMessage{ImageAcquirePayload{request_id, global_image_id}},
414              bl);
415     auto req = new C_NotifyInstanceRequest(this, instance_id, request_id,
416                                            std::move(bl), on_notify_ack);
417     req->send();
418   }
419 }
420
421 template <typename I>
422 void InstanceWatcher<I>::notify_image_release(
423     const std::string &instance_id, const std::string &global_image_id,
424     Context *on_notify_ack) {
425   dout(20) << "instance_id=" << instance_id << ", global_image_id="
426            << global_image_id << dendl;
427
428   Mutex::Locker locker(m_lock);
429
430   assert(m_on_finish == nullptr);
431
432   if (instance_id == m_instance_id) {
433     handle_image_release(global_image_id, on_notify_ack);
434   } else {
435     uint64_t request_id = ++m_request_seq;
436     bufferlist bl;
437     ::encode(NotifyMessage{ImageReleasePayload{request_id, global_image_id}},
438              bl);
439     auto req = new C_NotifyInstanceRequest(this, instance_id, request_id,
440                                            std::move(bl), on_notify_ack);
441     req->send();
442   }
443 }
444
445 template <typename I>
446 void InstanceWatcher<I>::notify_peer_image_removed(
447     const std::string &instance_id, const std::string &global_image_id,
448     const std::string &peer_mirror_uuid, Context *on_notify_ack) {
449   dout(20) << "instance_id=" << instance_id << ", "
450            << "global_image_id=" << global_image_id << ", "
451            << "peer_mirror_uuid=" << peer_mirror_uuid << dendl;
452
453   Mutex::Locker locker(m_lock);
454   assert(m_on_finish == nullptr);
455
456   if (instance_id == m_instance_id) {
457     handle_peer_image_removed(global_image_id, peer_mirror_uuid, on_notify_ack);
458   } else {
459     uint64_t request_id = ++m_request_seq;
460     bufferlist bl;
461     ::encode(NotifyMessage{PeerImageRemovedPayload{request_id, global_image_id,
462                                                    peer_mirror_uuid}}, bl);
463     auto req = new C_NotifyInstanceRequest(this, instance_id, request_id,
464                                            std::move(bl), on_notify_ack);
465     req->send();
466   }
467 }
468
469 template <typename I>
470 void InstanceWatcher<I>::notify_sync_request(const std::string &sync_id,
471                                              Context *on_sync_start) {
472   dout(20) << "sync_id=" << sync_id << dendl;
473
474   Mutex::Locker locker(m_lock);
475
476   assert(m_inflight_sync_reqs.count(sync_id) == 0);
477
478   uint64_t request_id = ++m_request_seq;
479
480   bufferlist bl;
481   ::encode(NotifyMessage{SyncRequestPayload{request_id, sync_id}}, bl);
482
483   auto sync_ctx = new C_SyncRequest(this, sync_id, on_sync_start);
484   sync_ctx->req = new C_NotifyInstanceRequest(this, "", request_id,
485                                               std::move(bl), sync_ctx);
486
487   m_inflight_sync_reqs[sync_id] = sync_ctx;
488   sync_ctx->req->send();
489 }
490
491 template <typename I>
492 bool InstanceWatcher<I>::cancel_sync_request(const std::string &sync_id) {
493   dout(20) << "sync_id=" << sync_id << dendl;
494
495   Mutex::Locker locker(m_lock);
496
497   auto it = m_inflight_sync_reqs.find(sync_id);
498   if (it == m_inflight_sync_reqs.end()) {
499     return false;
500   }
501
502   auto sync_ctx = it->second;
503
504   if (sync_ctx->on_start == nullptr) {
505     return false;
506   }
507
508   assert(sync_ctx->req != nullptr);
509   sync_ctx->req->cancel();
510   return true;
511 }
512
513 template <typename I>
514 void InstanceWatcher<I>::notify_sync_start(const std::string &instance_id,
515                                            const std::string &sync_id) {
516   dout(20) << "sync_id=" << sync_id << dendl;
517
518   Mutex::Locker locker(m_lock);
519
520   uint64_t request_id = ++m_request_seq;
521
522   bufferlist bl;
523   ::encode(NotifyMessage{SyncStartPayload{request_id, sync_id}}, bl);
524
525   auto ctx = new FunctionContext(
526     [this, sync_id] (int r) {
527       dout(20) << "finish: sync_id=" << sync_id << ", r=" << r << dendl;
528       Mutex::Locker locker(m_lock);
529       if (r != -ESTALE && m_image_sync_throttler != nullptr) {
530         m_image_sync_throttler->finish_op(sync_id);
531       }
532     });
533   auto req = new C_NotifyInstanceRequest(this, instance_id, request_id,
534                                          std::move(bl), ctx);
535   req->send();
536 }
537
538 template <typename I>
539 void InstanceWatcher<I>::notify_sync_complete(const std::string &sync_id) {
540   dout(20) << "sync_id=" << sync_id << dendl;
541
542   Mutex::Locker locker(m_lock);
543
544   auto it = m_inflight_sync_reqs.find(sync_id);
545   assert(it != m_inflight_sync_reqs.end());
546
547   auto sync_ctx = it->second;
548   assert(sync_ctx->req == nullptr);
549
550   m_inflight_sync_reqs.erase(it);
551   m_work_queue->queue(sync_ctx, 0);
552 }
553
554 template <typename I>
555 void InstanceWatcher<I>::handle_notify_sync_request(C_SyncRequest *sync_ctx,
556                                                     int r) {
557   dout(20) << "sync_id=" << sync_ctx->sync_id << ", r=" << r << dendl;
558
559   Context *on_start = nullptr;
560   {
561     Mutex::Locker locker(m_lock);
562
563     assert(sync_ctx->req != nullptr);
564     assert(sync_ctx->on_start != nullptr);
565
566     if (sync_ctx->req->canceling) {
567       r = -ECANCELED;
568     }
569
570     std::swap(sync_ctx->on_start, on_start);
571     sync_ctx->req = nullptr;
572   }
573
574   on_start->complete(r == -ECANCELED ? r : 0);
575
576   if (r == -ECANCELED) {
577     notify_sync_complete(sync_ctx->sync_id);
578   }
579 }
580
581 template <typename I>
582 void InstanceWatcher<I>::handle_notify_sync_complete(C_SyncRequest *sync_ctx,
583                                                      int r) {
584   dout(20) << "sync_id=" << sync_ctx->sync_id << ", r=" << r << dendl;
585
586   if (sync_ctx->on_complete != nullptr) {
587     sync_ctx->on_complete->complete(r);
588   }
589 }
590
591 template <typename I>
592 void InstanceWatcher<I>::print_sync_status(Formatter *f, stringstream *ss) {
593   dout(20) << dendl;
594
595   Mutex::Locker locker(m_lock);
596   if (m_image_sync_throttler != nullptr) {
597     m_image_sync_throttler->print_status(f, ss);
598   }
599 }
600
601 template <typename I>
602 void InstanceWatcher<I>::handle_acquire_leader() {
603   dout(20) << dendl;
604
605   Mutex::Locker locker(m_lock);
606
607   assert(m_image_sync_throttler == nullptr);
608   m_image_sync_throttler = ImageSyncThrottler<I>::create();
609
610   m_leader_instance_id = m_instance_id;
611   unsuspend_notify_requests();
612 }
613
614 template <typename I>
615 void InstanceWatcher<I>::handle_release_leader() {
616   dout(20) << dendl;
617
618   Mutex::Locker locker(m_lock);
619
620   assert(m_image_sync_throttler != nullptr);
621
622   m_leader_instance_id.clear();
623
624   m_image_sync_throttler->drain(-ESTALE);
625   m_image_sync_throttler->destroy();
626   m_image_sync_throttler = nullptr;
627 }
628
629 template <typename I>
630 void InstanceWatcher<I>::handle_update_leader(
631   const std::string &leader_instance_id) {
632   dout(20) << "leader_instance_id=" << leader_instance_id << dendl;
633
634   Mutex::Locker locker(m_lock);
635
636   m_leader_instance_id = leader_instance_id;
637
638   if (!m_leader_instance_id.empty()) {
639     unsuspend_notify_requests();
640   }
641 }
642
643 template <typename I>
644 void InstanceWatcher<I>::cancel_notify_requests(
645     const std::string &instance_id) {
646   dout(20) << "instance_id=" << instance_id << dendl;
647
648   Mutex::Locker locker(m_lock);
649
650   for (auto op : m_notify_ops) {
651     if (op.first == instance_id && !op.second->send_to_leader) {
652       op.second->cancel();
653     }
654   }
655 }
656
657 template <typename I>
658 void InstanceWatcher<I>::register_instance() {
659   assert(m_lock.is_locked());
660
661   dout(20) << dendl;
662
663   librados::ObjectWriteOperation op;
664   librbd::cls_client::mirror_instances_add(&op, m_instance_id);
665   librados::AioCompletion *aio_comp = create_rados_callback<
666     InstanceWatcher<I>, &InstanceWatcher<I>::handle_register_instance>(this);
667
668   int r = m_ioctx.aio_operate(RBD_MIRROR_LEADER, aio_comp, &op);
669   assert(r == 0);
670   aio_comp->release();
671 }
672
673 template <typename I>
674 void InstanceWatcher<I>::handle_register_instance(int r) {
675   dout(20) << "r=" << r << dendl;
676
677   Context *on_finish = nullptr;
678   {
679     Mutex::Locker locker(m_lock);
680
681     if (r == 0) {
682       create_instance_object();
683       return;
684     }
685
686     derr << "error registering instance: " << cpp_strerror(r) << dendl;
687
688     std::swap(on_finish, m_on_finish);
689   }
690   on_finish->complete(r);
691 }
692
693
694 template <typename I>
695 void InstanceWatcher<I>::create_instance_object() {
696   dout(20) << dendl;
697
698   assert(m_lock.is_locked());
699
700   librados::ObjectWriteOperation op;
701   op.create(true);
702
703   librados::AioCompletion *aio_comp = create_rados_callback<
704     InstanceWatcher<I>,
705     &InstanceWatcher<I>::handle_create_instance_object>(this);
706   int r = m_ioctx.aio_operate(m_oid, aio_comp, &op);
707   assert(r == 0);
708   aio_comp->release();
709 }
710
711 template <typename I>
712 void InstanceWatcher<I>::handle_create_instance_object(int r) {
713   dout(20) << "r=" << r << dendl;
714
715   Mutex::Locker locker(m_lock);
716
717   if (r < 0) {
718     derr << "error creating " << m_oid << " object: " << cpp_strerror(r)
719          << dendl;
720
721     m_ret_val = r;
722     unregister_instance();
723     return;
724   }
725
726   register_watch();
727 }
728
729 template <typename I>
730 void InstanceWatcher<I>::register_watch() {
731   dout(20) << dendl;
732
733   assert(m_lock.is_locked());
734
735   Context *ctx = create_async_context_callback(
736     m_work_queue, create_context_callback<
737     InstanceWatcher<I>, &InstanceWatcher<I>::handle_register_watch>(this));
738
739   librbd::Watcher::register_watch(ctx);
740 }
741
742 template <typename I>
743 void InstanceWatcher<I>::handle_register_watch(int r) {
744   dout(20) << "r=" << r << dendl;
745
746   Mutex::Locker locker(m_lock);
747
748   if (r < 0) {
749     derr << "error registering instance watcher for " << m_oid << " object: "
750          << cpp_strerror(r) << dendl;
751
752     m_ret_val = r;
753     remove_instance_object();
754     return;
755   }
756
757   acquire_lock();
758 }
759
760 template <typename I>
761 void InstanceWatcher<I>::acquire_lock() {
762   dout(20) << dendl;
763
764   assert(m_lock.is_locked());
765
766   Context *ctx = create_async_context_callback(
767     m_work_queue, create_context_callback<
768     InstanceWatcher<I>, &InstanceWatcher<I>::handle_acquire_lock>(this));
769
770   m_instance_lock->acquire_lock(ctx);
771 }
772
773 template <typename I>
774 void InstanceWatcher<I>::handle_acquire_lock(int r) {
775   dout(20) << "r=" << r << dendl;
776
777   Context *on_finish = nullptr;
778   {
779     Mutex::Locker locker(m_lock);
780
781     if (r < 0) {
782
783       derr << "error acquiring instance lock: " << cpp_strerror(r) << dendl;
784
785       m_ret_val = r;
786       unregister_watch();
787       return;
788     }
789
790     std::swap(on_finish, m_on_finish);
791   }
792
793   on_finish->complete(r);
794 }
795
796 template <typename I>
797 void InstanceWatcher<I>::release_lock() {
798   dout(20) << dendl;
799
800   assert(m_lock.is_locked());
801
802   Context *ctx = create_async_context_callback(
803     m_work_queue, create_context_callback<
804     InstanceWatcher<I>, &InstanceWatcher<I>::handle_release_lock>(this));
805
806   m_instance_lock->shut_down(ctx);
807 }
808
809 template <typename I>
810 void InstanceWatcher<I>::handle_release_lock(int r) {
811   dout(20) << "r=" << r << dendl;
812
813   Mutex::Locker locker(m_lock);
814
815   if (r < 0) {
816     derr << "error releasing instance lock: " << cpp_strerror(r) << dendl;
817   }
818
819   unregister_watch();
820 }
821
822 template <typename I>
823 void InstanceWatcher<I>::unregister_watch() {
824   dout(20) << dendl;
825
826   assert(m_lock.is_locked());
827
828   Context *ctx = create_async_context_callback(
829     m_work_queue, create_context_callback<
830       InstanceWatcher<I>, &InstanceWatcher<I>::handle_unregister_watch>(this));
831
832   librbd::Watcher::unregister_watch(ctx);
833 }
834
835 template <typename I>
836 void InstanceWatcher<I>::handle_unregister_watch(int r) {
837   dout(20) << "r=" << r << dendl;
838
839   if (r < 0) {
840     derr << "error unregistering instance watcher for " << m_oid << " object: "
841          << cpp_strerror(r) << dendl;
842   }
843
844   Mutex::Locker locker(m_lock);
845   remove_instance_object();
846 }
847
848 template <typename I>
849 void InstanceWatcher<I>::remove_instance_object() {
850   assert(m_lock.is_locked());
851
852   dout(20) << dendl;
853
854   librados::ObjectWriteOperation op;
855   op.remove();
856
857   librados::AioCompletion *aio_comp = create_rados_callback<
858     InstanceWatcher<I>,
859     &InstanceWatcher<I>::handle_remove_instance_object>(this);
860   int r = m_ioctx.aio_operate(m_oid, aio_comp, &op);
861   assert(r == 0);
862   aio_comp->release();
863 }
864
865 template <typename I>
866 void InstanceWatcher<I>::handle_remove_instance_object(int r) {
867   dout(20) << "r=" << r << dendl;
868
869   if (m_removing && r == -ENOENT) {
870     r = 0;
871   }
872
873   if (r < 0) {
874     derr << "error removing " << m_oid << " object: " << cpp_strerror(r)
875          << dendl;
876   }
877
878   Mutex::Locker locker(m_lock);
879   unregister_instance();
880 }
881
882 template <typename I>
883 void InstanceWatcher<I>::unregister_instance() {
884   dout(20) << dendl;
885
886   assert(m_lock.is_locked());
887
888   librados::ObjectWriteOperation op;
889   librbd::cls_client::mirror_instances_remove(&op, m_instance_id);
890   librados::AioCompletion *aio_comp = create_rados_callback<
891     InstanceWatcher<I>, &InstanceWatcher<I>::handle_unregister_instance>(this);
892
893   int r = m_ioctx.aio_operate(RBD_MIRROR_LEADER, aio_comp, &op);
894   assert(r == 0);
895   aio_comp->release();
896 }
897
898 template <typename I>
899 void InstanceWatcher<I>::handle_unregister_instance(int r) {
900   dout(20) << "r=" << r << dendl;
901
902   if (r < 0) {
903     derr << "error unregistering instance: " << cpp_strerror(r) << dendl;
904   }
905
906   Mutex::Locker locker(m_lock);
907   wait_for_notify_ops();
908 }
909
910 template <typename I>
911 void InstanceWatcher<I>::wait_for_notify_ops() {
912   dout(20) << dendl;
913
914   assert(m_lock.is_locked());
915
916   for (auto op : m_notify_ops) {
917     op.second->cancel();
918   }
919
920   Context *ctx = create_async_context_callback(
921     m_work_queue, create_context_callback<
922     InstanceWatcher<I>, &InstanceWatcher<I>::handle_wait_for_notify_ops>(this));
923
924   m_notify_op_tracker.wait_for_ops(ctx);
925 }
926
927 template <typename I>
928 void InstanceWatcher<I>::handle_wait_for_notify_ops(int r) {
929   dout(20) << "r=" << r << dendl;
930
931   assert(r == 0);
932
933   Context *on_finish = nullptr;
934   {
935     Mutex::Locker locker(m_lock);
936
937     assert(m_notify_ops.empty());
938
939     std::swap(on_finish, m_on_finish);
940     r = m_ret_val;
941
942     if (m_removing) {
943       m_removing = false;
944     }
945   }
946   on_finish->complete(r);
947 }
948
949 template <typename I>
950 void InstanceWatcher<I>::get_instance_locker() {
951   dout(20) << dendl;
952
953   assert(m_lock.is_locked());
954
955   Context *ctx = create_async_context_callback(
956     m_work_queue, create_context_callback<
957     InstanceWatcher<I>, &InstanceWatcher<I>::handle_get_instance_locker>(this));
958
959   m_instance_lock->get_locker(&m_instance_locker, ctx);
960 }
961
962 template <typename I>
963 void InstanceWatcher<I>::handle_get_instance_locker(int r) {
964   dout(20) << "r=" << r << dendl;
965
966   Mutex::Locker locker(m_lock);
967
968   if (r < 0) {
969     if (r != -ENOENT) {
970       derr << "error retrieving instance locker: " << cpp_strerror(r) << dendl;
971     }
972     remove_instance_object();
973     return;
974   }
975
976   break_instance_lock();
977 }
978
979 template <typename I>
980 void InstanceWatcher<I>::break_instance_lock() {
981   dout(20) << dendl;
982
983   assert(m_lock.is_locked());
984
985   Context *ctx = create_async_context_callback(
986     m_work_queue, create_context_callback<
987     InstanceWatcher<I>, &InstanceWatcher<I>::handle_break_instance_lock>(this));
988
989   m_instance_lock->break_lock(m_instance_locker, true, ctx);
990 }
991
992 template <typename I>
993 void InstanceWatcher<I>::handle_break_instance_lock(int r) {
994   dout(20) << "r=" << r << dendl;
995
996   Mutex::Locker locker(m_lock);
997
998   if (r < 0) {
999     if (r != -ENOENT) {
1000       derr << "error breaking instance lock: " << cpp_strerror(r) << dendl;
1001     }
1002     remove_instance_object();
1003     return;
1004   }
1005
1006   remove_instance_object();
1007 }
1008
1009 template <typename I>
1010 void InstanceWatcher<I>::suspend_notify_request(C_NotifyInstanceRequest *req) {
1011   dout(20) << req << dendl;
1012
1013   assert(m_lock.is_locked());
1014
1015   auto result = m_suspended_ops.insert(req).second;
1016   assert(result);
1017 }
1018
1019 template <typename I>
1020 bool InstanceWatcher<I>::unsuspend_notify_request(
1021   C_NotifyInstanceRequest *req) {
1022   dout(20) << req << dendl;
1023
1024   assert(m_lock.is_locked());
1025
1026   auto result = m_suspended_ops.erase(req);
1027   if (result == 0) {
1028     return false;
1029   }
1030
1031   req->send();
1032   return true;
1033 }
1034
1035 template <typename I>
1036 void InstanceWatcher<I>::unsuspend_notify_requests() {
1037   dout(20) << dendl;
1038
1039   assert(m_lock.is_locked());
1040
1041   std::set<C_NotifyInstanceRequest *> suspended_ops;
1042   std::swap(m_suspended_ops, suspended_ops);
1043
1044   for (auto op : suspended_ops) {
1045     op->send();
1046   }
1047 }
1048
1049 template <typename I>
1050 Context *InstanceWatcher<I>::prepare_request(const std::string &instance_id,
1051                                              uint64_t request_id,
1052                                              C_NotifyAck *on_notify_ack) {
1053   dout(20) << "instance_id=" << instance_id << ", request_id=" << request_id
1054            << dendl;
1055
1056   Mutex::Locker locker(m_lock);
1057
1058   Context *ctx = nullptr;
1059   Request request(instance_id, request_id);
1060   auto it = m_requests.find(request);
1061
1062   if (it != m_requests.end()) {
1063     dout(20) << "duplicate for in-progress request" << dendl;
1064     delete it->on_notify_ack;
1065     m_requests.erase(it);
1066   } else {
1067     ctx = create_async_context_callback(
1068         m_work_queue, new FunctionContext(
1069             [this, instance_id, request_id] (int r) {
1070               complete_request(instance_id, request_id, r);
1071             }));
1072   }
1073
1074   request.on_notify_ack = on_notify_ack;
1075   m_requests.insert(request);
1076   return ctx;
1077 }
1078
1079 template <typename I>
1080 void InstanceWatcher<I>::complete_request(const std::string &instance_id,
1081                                           uint64_t request_id, int r) {
1082   dout(20) << "instance_id=" << instance_id << ", request_id=" << request_id
1083            << dendl;
1084
1085   C_NotifyAck *on_notify_ack;
1086   {
1087     Mutex::Locker locker(m_lock);
1088     Request request(instance_id, request_id);
1089     auto it = m_requests.find(request);
1090     assert(it != m_requests.end());
1091     on_notify_ack = it->on_notify_ack;
1092     m_requests.erase(it);
1093   }
1094
1095   ::encode(NotifyAckPayload(instance_id, request_id, r), on_notify_ack->out);
1096   on_notify_ack->complete(0);
1097 }
1098
1099 template <typename I>
1100 void InstanceWatcher<I>::handle_notify(uint64_t notify_id, uint64_t handle,
1101                                        uint64_t notifier_id, bufferlist &bl) {
1102   dout(20) << "notify_id=" << notify_id << ", handle=" << handle << ", "
1103            << "notifier_id=" << notifier_id << dendl;
1104
1105   auto ctx = new C_NotifyAck(this, notify_id, handle);
1106
1107   NotifyMessage notify_message;
1108   try {
1109     bufferlist::iterator iter = bl.begin();
1110     ::decode(notify_message, iter);
1111   } catch (const buffer::error &err) {
1112     derr << "error decoding image notification: " << err.what() << dendl;
1113     ctx->complete(0);
1114     return;
1115   }
1116
1117   apply_visitor(HandlePayloadVisitor(this, stringify(notifier_id), ctx),
1118                 notify_message.payload);
1119 }
1120
1121 template <typename I>
1122 void InstanceWatcher<I>::handle_image_acquire(
1123     const std::string &global_image_id, Context *on_finish) {
1124   dout(20) << "global_image_id=" << global_image_id << dendl;
1125
1126   auto ctx = new FunctionContext(
1127       [this, global_image_id, on_finish] (int r) {
1128         m_instance_replayer->acquire_image(this, global_image_id, on_finish);
1129         m_notify_op_tracker.finish_op();
1130       });
1131
1132   m_notify_op_tracker.start_op();
1133   m_work_queue->queue(ctx, 0);
1134 }
1135
1136 template <typename I>
1137 void InstanceWatcher<I>::handle_image_release(
1138     const std::string &global_image_id, Context *on_finish) {
1139   dout(20) << "global_image_id=" << global_image_id << dendl;
1140
1141   auto ctx = new FunctionContext(
1142       [this, global_image_id, on_finish] (int r) {
1143         m_instance_replayer->release_image(global_image_id, on_finish);
1144         m_notify_op_tracker.finish_op();
1145       });
1146
1147   m_notify_op_tracker.start_op();
1148   m_work_queue->queue(ctx, 0);
1149 }
1150
1151 template <typename I>
1152 void InstanceWatcher<I>::handle_peer_image_removed(
1153     const std::string &global_image_id, const std::string &peer_mirror_uuid,
1154     Context *on_finish) {
1155   dout(20) << "global_image_id=" << global_image_id << ", "
1156            << "peer_mirror_uuid=" << peer_mirror_uuid << dendl;
1157
1158   auto ctx = new FunctionContext(
1159       [this, peer_mirror_uuid, global_image_id, on_finish] (int r) {
1160         m_instance_replayer->remove_peer_image(global_image_id,
1161                                                peer_mirror_uuid, on_finish);
1162         m_notify_op_tracker.finish_op();
1163       });
1164
1165   m_notify_op_tracker.start_op();
1166   m_work_queue->queue(ctx, 0);
1167 }
1168
1169 template <typename I>
1170 void InstanceWatcher<I>::handle_sync_request(const std::string &instance_id,
1171                                              const std::string &sync_id,
1172                                              Context *on_finish) {
1173   dout(20) << "instance_id=" << instance_id << ", sync_id=" << sync_id << dendl;
1174
1175   Mutex::Locker locker(m_lock);
1176
1177   if (m_image_sync_throttler == nullptr) {
1178     dout(20) << "sync request for non-leader" << dendl;
1179     m_work_queue->queue(on_finish, -ESTALE);
1180     return;
1181   }
1182
1183   Context *on_start = create_async_context_callback(
1184     m_work_queue, new FunctionContext(
1185       [this, instance_id, sync_id, on_finish] (int r) {
1186         dout(20) << "handle_sync_request: finish: instance_id=" << instance_id
1187                  << ", sync_id=" << sync_id << ", r=" << r << dendl;
1188         if (r == 0) {
1189           notify_sync_start(instance_id, sync_id);
1190         }
1191         on_finish->complete(r);
1192       }));
1193   m_image_sync_throttler->start_op(sync_id, on_start);
1194 }
1195
1196 template <typename I>
1197 void InstanceWatcher<I>::handle_sync_start(const std::string &instance_id,
1198                                            const std::string &sync_id,
1199                                            Context *on_finish) {
1200   dout(20) << "instance_id=" << instance_id << ", sync_id=" << sync_id << dendl;
1201
1202   Mutex::Locker locker(m_lock);
1203
1204   auto it = m_inflight_sync_reqs.find(sync_id);
1205   if (it == m_inflight_sync_reqs.end()) {
1206     dout(20) << "not found" << dendl;
1207     m_work_queue->queue(on_finish, 0);
1208     return;
1209   }
1210
1211   auto sync_ctx = it->second;
1212
1213   if (sync_ctx->on_complete != nullptr) {
1214     dout(20) << "duplicate request" << dendl;
1215     m_work_queue->queue(sync_ctx->on_complete, -ESTALE);
1216   }
1217
1218   sync_ctx->on_complete = on_finish;
1219 }
1220
1221 template <typename I>
1222 void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
1223                                         const ImageAcquirePayload &payload,
1224                                         C_NotifyAck *on_notify_ack) {
1225   dout(20) << "image_acquire: instance_id=" << instance_id << ", "
1226            << "request_id=" << payload.request_id << dendl;
1227
1228   auto on_finish = prepare_request(instance_id, payload.request_id,
1229                                    on_notify_ack);
1230   if (on_finish != nullptr) {
1231     handle_image_acquire(payload.global_image_id, on_finish);
1232   }
1233 }
1234
1235 template <typename I>
1236 void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
1237                                         const ImageReleasePayload &payload,
1238                                         C_NotifyAck *on_notify_ack) {
1239   dout(20) << "image_release: instance_id=" << instance_id << ", "
1240            << "request_id=" << payload.request_id << dendl;
1241
1242   auto on_finish = prepare_request(instance_id, payload.request_id,
1243                                    on_notify_ack);
1244   if (on_finish != nullptr) {
1245     handle_image_release(payload.global_image_id, on_finish);
1246   }
1247 }
1248
1249 template <typename I>
1250 void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
1251                                         const PeerImageRemovedPayload &payload,
1252                                         C_NotifyAck *on_notify_ack) {
1253   dout(20) << "remove_peer_image: instance_id=" << instance_id << ", "
1254            << "request_id=" << payload.request_id << dendl;
1255
1256   auto on_finish = prepare_request(instance_id, payload.request_id,
1257                                    on_notify_ack);
1258   if (on_finish != nullptr) {
1259     handle_peer_image_removed(payload.global_image_id, payload.peer_mirror_uuid,
1260                               on_finish);
1261   }
1262 }
1263
1264 template <typename I>
1265 void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
1266                                         const SyncRequestPayload &payload,
1267                                         C_NotifyAck *on_notify_ack) {
1268   dout(20) << "sync_request: instance_id=" << instance_id << ", "
1269            << "request_id=" << payload.request_id << dendl;
1270
1271   auto on_finish = prepare_request(instance_id, payload.request_id,
1272                                    on_notify_ack);
1273   if (on_finish == nullptr) {
1274     return;
1275   }
1276
1277   handle_sync_request(instance_id, payload.sync_id, on_finish);
1278 }
1279
1280 template <typename I>
1281 void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
1282                                         const SyncStartPayload &payload,
1283                                         C_NotifyAck *on_notify_ack) {
1284   dout(20) << "sync_start: instance_id=" << instance_id << ", "
1285            << "request_id=" << payload.request_id << dendl;
1286
1287   auto on_finish = prepare_request(instance_id, payload.request_id,
1288                                    on_notify_ack);
1289   if (on_finish == nullptr) {
1290     return;
1291   }
1292
1293   handle_sync_start(instance_id, payload.sync_id, on_finish);
1294 }
1295
1296 template <typename I>
1297 void InstanceWatcher<I>::handle_payload(const std::string &instance_id,
1298                                         const UnknownPayload &payload,
1299                                         C_NotifyAck *on_notify_ack) {
1300   dout(20) << "unknown: instance_id=" << instance_id << dendl;
1301
1302   on_notify_ack->complete(0);
1303 }
1304
1305 } // namespace mirror
1306 } // namespace rbd
1307
1308 template class rbd::mirror::InstanceWatcher<librbd::ImageCtx>;