Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / test / rbd_mirror / test_mock_InstanceWatcher.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "librados/AioCompletionImpl.h"
5 #include "librbd/ManagedLock.h"
6 #include "test/librados/test.h"
7 #include "test/librados_test_stub/MockTestMemIoCtxImpl.h"
8 #include "test/librados_test_stub/MockTestMemRadosClient.h"
9 #include "test/librbd/mock/MockImageCtx.h"
10 #include "test/rbd_mirror/test_mock_fixture.h"
11 #include "tools/rbd_mirror/InstanceReplayer.h"
12 #include "tools/rbd_mirror/ImageSyncThrottler.h"
13 #include "tools/rbd_mirror/InstanceWatcher.h"
14 #include "tools/rbd_mirror/Threads.h"
15
16 namespace librbd {
17
18 namespace {
19
20 struct MockTestImageCtx : public MockImageCtx {
21   MockTestImageCtx(librbd::ImageCtx &image_ctx)
22     : librbd::MockImageCtx(image_ctx) {
23   }
24 };
25
26 } // anonymous namespace
27
28 template <>
29 struct ManagedLock<MockTestImageCtx> {
30   static ManagedLock* s_instance;
31
32   static ManagedLock *create(librados::IoCtx& ioctx, ContextWQ *work_queue,
33                              const std::string& oid, librbd::Watcher *watcher,
34                              managed_lock::Mode  mode,
35                              bool blacklist_on_break_lock,
36                              uint32_t blacklist_expire_seconds) {
37     assert(s_instance != nullptr);
38     return s_instance;
39   }
40
41   ManagedLock() {
42     assert(s_instance == nullptr);
43     s_instance = this;
44   }
45
46   ~ManagedLock() {
47     assert(s_instance == this);
48     s_instance = nullptr;
49   }
50
51   MOCK_METHOD0(destroy, void());
52   MOCK_METHOD1(shut_down, void(Context *));
53   MOCK_METHOD1(acquire_lock, void(Context *));
54   MOCK_METHOD2(get_locker, void(managed_lock::Locker *, Context *));
55   MOCK_METHOD3(break_lock, void(const managed_lock::Locker &, bool, Context *));
56 };
57
58 ManagedLock<MockTestImageCtx> *ManagedLock<MockTestImageCtx>::s_instance = nullptr;
59
60 } // namespace librbd
61
62 namespace rbd {
63 namespace mirror {
64
65 template <>
66 struct Threads<librbd::MockTestImageCtx> {
67   Mutex &timer_lock;
68   SafeTimer *timer;
69   ContextWQ *work_queue;
70
71   Threads(Threads<librbd::ImageCtx> *threads)
72     : timer_lock(threads->timer_lock), timer(threads->timer),
73       work_queue(threads->work_queue) {
74   }
75 };
76
77 template <>
78 struct InstanceReplayer<librbd::MockTestImageCtx> {
79   MOCK_METHOD3(acquire_image, void(InstanceWatcher<librbd::MockTestImageCtx> *,
80                                    const std::string &, Context *));
81   MOCK_METHOD2(release_image, void(const std::string &, Context *));
82   MOCK_METHOD3(remove_peer_image, void(const std::string&, const std::string&,
83                                        Context *));
84 };
85
86 template <>
87 struct ImageSyncThrottler<librbd::MockTestImageCtx> {
88   static ImageSyncThrottler* s_instance;
89
90   static ImageSyncThrottler *create() {
91     assert(s_instance != nullptr);
92     return s_instance;
93   }
94
95   ImageSyncThrottler() {
96     assert(s_instance == nullptr);
97     s_instance = this;
98   }
99
100   virtual ~ImageSyncThrottler() {
101     assert(s_instance == this);
102     s_instance = nullptr;
103   }
104
105   MOCK_METHOD0(destroy, void());
106   MOCK_METHOD1(drain, void(int));
107   MOCK_METHOD2(start_op, void(const std::string &, Context *));
108   MOCK_METHOD1(finish_op, void(const std::string &));
109 };
110
111 ImageSyncThrottler<librbd::MockTestImageCtx>* ImageSyncThrottler<librbd::MockTestImageCtx>::s_instance = nullptr;
112
113 } // namespace mirror
114 } // namespace rbd
115
116 // template definitions
117 #include "tools/rbd_mirror/InstanceWatcher.cc"
118
119 namespace rbd {
120 namespace mirror {
121
122 using ::testing::_;
123 using ::testing::InSequence;
124 using ::testing::Invoke;
125 using ::testing::Return;
126 using ::testing::StrEq;
127 using ::testing::WithArg;
128
129 class TestMockInstanceWatcher : public TestMockFixture {
130 public:
131   typedef librbd::ManagedLock<librbd::MockTestImageCtx> MockManagedLock;
132   typedef InstanceReplayer<librbd::MockTestImageCtx> MockInstanceReplayer;
133   typedef InstanceWatcher<librbd::MockTestImageCtx> MockInstanceWatcher;
134   typedef Threads<librbd::MockTestImageCtx> MockThreads;
135
136   std::string m_instance_id;
137   std::string m_oid;
138   MockThreads *m_mock_threads;
139
140   void SetUp() override {
141     TestFixture::SetUp();
142     m_local_io_ctx.remove(RBD_MIRROR_LEADER);
143     EXPECT_EQ(0, m_local_io_ctx.create(RBD_MIRROR_LEADER, true));
144
145     m_instance_id = stringify(m_local_io_ctx.get_instance_id());
146     m_oid = RBD_MIRROR_INSTANCE_PREFIX + m_instance_id;
147
148     m_mock_threads = new MockThreads(m_threads);
149   }
150
151   void TearDown() override {
152     delete m_mock_threads;
153     TestMockFixture::TearDown();
154   }
155
156   void expect_register_watch(librados::MockTestMemIoCtxImpl &mock_io_ctx) {
157     EXPECT_CALL(mock_io_ctx, aio_watch(m_oid, _, _, _));
158   }
159
160   void expect_register_watch(librados::MockTestMemIoCtxImpl &mock_io_ctx,
161                              const std::string &instance_id) {
162     std::string oid = RBD_MIRROR_INSTANCE_PREFIX + instance_id;
163     EXPECT_CALL(mock_io_ctx, aio_watch(oid, _, _, _));
164   }
165
166   void expect_unregister_watch(librados::MockTestMemIoCtxImpl &mock_io_ctx) {
167     EXPECT_CALL(mock_io_ctx, aio_unwatch(_, _));
168   }
169
170   void expect_register_instance(librados::MockTestMemIoCtxImpl &mock_io_ctx,
171                                 int r) {
172     EXPECT_CALL(mock_io_ctx, exec(RBD_MIRROR_LEADER, _, StrEq("rbd"),
173                                   StrEq("mirror_instances_add"), _, _, _))
174       .WillOnce(Return(r));
175   }
176
177   void expect_unregister_instance(librados::MockTestMemIoCtxImpl &mock_io_ctx,
178                                   int r) {
179     EXPECT_CALL(mock_io_ctx, exec(RBD_MIRROR_LEADER, _, StrEq("rbd"),
180                                   StrEq("mirror_instances_remove"), _, _, _))
181       .WillOnce(Return(r));
182   }
183
184   void expect_acquire_lock(MockManagedLock &mock_managed_lock, int r) {
185     EXPECT_CALL(mock_managed_lock, acquire_lock(_))
186       .WillOnce(CompleteContext(r));
187   }
188
189   void expect_release_lock(MockManagedLock &mock_managed_lock, int r) {
190     EXPECT_CALL(mock_managed_lock, shut_down(_)).WillOnce(CompleteContext(r));
191   }
192
193   void expect_destroy_lock(MockManagedLock &mock_managed_lock,
194                            Context *ctx = nullptr) {
195     EXPECT_CALL(mock_managed_lock, destroy())
196       .WillOnce(Invoke([ctx]() {
197             if (ctx != nullptr) {
198               ctx->complete(0);
199             }
200           }));
201   }
202
203   void expect_get_locker(MockManagedLock &mock_managed_lock,
204                          const librbd::managed_lock::Locker &locker, int r) {
205     EXPECT_CALL(mock_managed_lock, get_locker(_, _))
206       .WillOnce(Invoke([r, locker](librbd::managed_lock::Locker *out,
207                                    Context *ctx) {
208                          if (r == 0) {
209                            *out = locker;
210                          }
211                          ctx->complete(r);
212                        }));
213   }
214
215   void expect_break_lock(MockManagedLock &mock_managed_lock,
216                          const librbd::managed_lock::Locker &locker, int r) {
217     EXPECT_CALL(mock_managed_lock, break_lock(locker, true, _))
218       .WillOnce(WithArg<2>(CompleteContext(r)));
219   }
220 };
221
222 TEST_F(TestMockInstanceWatcher, InitShutdown) {
223   MockManagedLock mock_managed_lock;
224   librados::MockTestMemIoCtxImpl &mock_io_ctx(get_mock_io_ctx(m_local_io_ctx));
225
226   auto instance_watcher = new MockInstanceWatcher(
227     m_local_io_ctx, m_mock_threads->work_queue, nullptr, m_instance_id);
228   InSequence seq;
229
230   // Init
231   expect_register_instance(mock_io_ctx, 0);
232   expect_register_watch(mock_io_ctx);
233   expect_acquire_lock(mock_managed_lock, 0);
234   ASSERT_EQ(0, instance_watcher->init());
235
236   // Shutdown
237   expect_release_lock(mock_managed_lock, 0);
238   expect_unregister_watch(mock_io_ctx);
239   expect_unregister_instance(mock_io_ctx, 0);
240   instance_watcher->shut_down();
241
242   expect_destroy_lock(mock_managed_lock);
243   delete instance_watcher;
244 }
245
246 TEST_F(TestMockInstanceWatcher, InitError) {
247   MockManagedLock mock_managed_lock;
248   librados::MockTestMemIoCtxImpl &mock_io_ctx(get_mock_io_ctx(m_local_io_ctx));
249
250   auto instance_watcher = new MockInstanceWatcher(
251     m_local_io_ctx, m_mock_threads->work_queue, nullptr, m_instance_id);
252   InSequence seq;
253
254   expect_register_instance(mock_io_ctx, 0);
255   expect_register_watch(mock_io_ctx);
256   expect_acquire_lock(mock_managed_lock, -EINVAL);
257   expect_unregister_watch(mock_io_ctx);
258   expect_unregister_instance(mock_io_ctx, 0);
259
260   ASSERT_EQ(-EINVAL, instance_watcher->init());
261
262   expect_destroy_lock(mock_managed_lock);
263   delete instance_watcher;
264 }
265
266 TEST_F(TestMockInstanceWatcher, ShutdownError) {
267   MockManagedLock mock_managed_lock;
268   librados::MockTestMemIoCtxImpl &mock_io_ctx(get_mock_io_ctx(m_local_io_ctx));
269
270   auto instance_watcher = new MockInstanceWatcher(
271     m_local_io_ctx, m_mock_threads->work_queue, nullptr, m_instance_id);
272   InSequence seq;
273
274   // Init
275   expect_register_instance(mock_io_ctx, 0);
276   expect_register_watch(mock_io_ctx);
277   expect_acquire_lock(mock_managed_lock, 0);
278   ASSERT_EQ(0, instance_watcher->init());
279
280   // Shutdown
281   expect_release_lock(mock_managed_lock, -EINVAL);
282   expect_unregister_watch(mock_io_ctx);
283   expect_unregister_instance(mock_io_ctx, 0);
284   instance_watcher->shut_down();
285
286   expect_destroy_lock(mock_managed_lock);
287   delete instance_watcher;
288 }
289
290
291 TEST_F(TestMockInstanceWatcher, Remove) {
292   MockManagedLock mock_managed_lock;
293   librados::MockTestMemIoCtxImpl &mock_io_ctx(get_mock_io_ctx(m_local_io_ctx));
294   librbd::managed_lock::Locker
295     locker{entity_name_t::CLIENT(1), "auto 123", "1.2.3.4:0/0", 123};
296
297   InSequence seq;
298
299   expect_get_locker(mock_managed_lock, locker, 0);
300   expect_break_lock(mock_managed_lock, locker, 0);
301   expect_unregister_instance(mock_io_ctx, 0);
302   C_SaferCond on_destroy;
303   expect_destroy_lock(mock_managed_lock, &on_destroy);
304
305   C_SaferCond on_remove;
306   MockInstanceWatcher::remove_instance(m_local_io_ctx,
307                                        m_mock_threads->work_queue,
308                                        "instance_id", &on_remove);
309   ASSERT_EQ(0, on_remove.wait());
310   ASSERT_EQ(0, on_destroy.wait());
311 }
312
313 TEST_F(TestMockInstanceWatcher, RemoveNoent) {
314   MockManagedLock mock_managed_lock;
315   librados::MockTestMemIoCtxImpl &mock_io_ctx(get_mock_io_ctx(m_local_io_ctx));
316
317   InSequence seq;
318
319   expect_get_locker(mock_managed_lock, librbd::managed_lock::Locker(), -ENOENT);
320   expect_unregister_instance(mock_io_ctx, 0);
321   C_SaferCond on_destroy;
322   expect_destroy_lock(mock_managed_lock, &on_destroy);
323
324   C_SaferCond on_remove;
325   MockInstanceWatcher::remove_instance(m_local_io_ctx,
326                                        m_mock_threads->work_queue,
327                                        "instance_id", &on_remove);
328   ASSERT_EQ(0, on_remove.wait());
329   ASSERT_EQ(0, on_destroy.wait());
330 }
331
332 TEST_F(TestMockInstanceWatcher, ImageAcquireRelease) {
333   MockManagedLock mock_managed_lock;
334
335   librados::IoCtx& io_ctx1 = m_local_io_ctx;
336   std::string instance_id1 = m_instance_id;
337   librados::MockTestMemIoCtxImpl &mock_io_ctx1(get_mock_io_ctx(io_ctx1));
338   MockInstanceReplayer mock_instance_replayer1;
339   auto instance_watcher1 = MockInstanceWatcher::create(
340       io_ctx1, m_mock_threads->work_queue, &mock_instance_replayer1);
341
342   librados::Rados cluster;
343   librados::IoCtx io_ctx2;
344   EXPECT_EQ("", connect_cluster_pp(cluster));
345   EXPECT_EQ(0, cluster.ioctx_create(_local_pool_name.c_str(), io_ctx2));
346   std::string instance_id2 = stringify(io_ctx2.get_instance_id());
347   librados::MockTestMemIoCtxImpl &mock_io_ctx2(get_mock_io_ctx(io_ctx2));
348   MockInstanceReplayer mock_instance_replayer2;
349   auto instance_watcher2 = MockInstanceWatcher::create(
350     io_ctx2, m_mock_threads->work_queue, &mock_instance_replayer2);
351
352   InSequence seq;
353
354   // Init instance watcher 1
355   expect_register_instance(mock_io_ctx1, 0);
356   expect_register_watch(mock_io_ctx1, instance_id1);
357   expect_acquire_lock(mock_managed_lock, 0);
358   ASSERT_EQ(0, instance_watcher1->init());
359
360   // Init instance watcher 2
361   expect_register_instance(mock_io_ctx2, 0);
362   expect_register_watch(mock_io_ctx2, instance_id2);
363   expect_acquire_lock(mock_managed_lock, 0);
364   ASSERT_EQ(0, instance_watcher2->init());
365
366   // Acquire Image on the the same instance
367   EXPECT_CALL(mock_instance_replayer1, acquire_image(instance_watcher1, "gid",
368                                                      _))
369       .WillOnce(WithArg<2>(CompleteContext(0)));
370   C_SaferCond on_acquire1;
371   instance_watcher1->notify_image_acquire(instance_id1, "gid", &on_acquire1);
372   ASSERT_EQ(0, on_acquire1.wait());
373
374   // Acquire Image on the other instance
375   EXPECT_CALL(mock_instance_replayer2, acquire_image(instance_watcher2, "gid",
376                                                      _))
377       .WillOnce(WithArg<2>(CompleteContext(0)));
378   C_SaferCond on_acquire2;
379   instance_watcher1->notify_image_acquire(instance_id2, "gid", &on_acquire2);
380   ASSERT_EQ(0, on_acquire2.wait());
381
382   // Release Image on the the same instance
383   EXPECT_CALL(mock_instance_replayer1, release_image("gid", _))
384       .WillOnce(WithArg<1>(CompleteContext(0)));
385   C_SaferCond on_release1;
386   instance_watcher1->notify_image_release(instance_id1, "gid", &on_release1);
387   ASSERT_EQ(0, on_release1.wait());
388
389   // Release Image on the other instance
390   EXPECT_CALL(mock_instance_replayer2, release_image("gid", _))
391       .WillOnce(WithArg<1>(CompleteContext(0)));
392   C_SaferCond on_release2;
393   instance_watcher1->notify_image_release(instance_id2, "gid", &on_release2);
394   ASSERT_EQ(0, on_release2.wait());
395
396   // Shutdown instance watcher 1
397   expect_release_lock(mock_managed_lock, 0);
398   expect_unregister_watch(mock_io_ctx1);
399   expect_unregister_instance(mock_io_ctx1, 0);
400   instance_watcher1->shut_down();
401
402   expect_destroy_lock(mock_managed_lock);
403   delete instance_watcher1;
404
405   // Shutdown instance watcher 2
406   expect_release_lock(mock_managed_lock, 0);
407   expect_unregister_watch(mock_io_ctx2);
408   expect_unregister_instance(mock_io_ctx2, 0);
409   instance_watcher2->shut_down();
410
411   expect_destroy_lock(mock_managed_lock);
412   delete instance_watcher2;
413 }
414
415 TEST_F(TestMockInstanceWatcher, PeerImageRemoved) {
416   MockManagedLock mock_managed_lock;
417
418   librados::IoCtx& io_ctx1 = m_local_io_ctx;
419   std::string instance_id1 = m_instance_id;
420   librados::MockTestMemIoCtxImpl &mock_io_ctx1(get_mock_io_ctx(io_ctx1));
421   MockInstanceReplayer mock_instance_replayer1;
422   auto instance_watcher1 = MockInstanceWatcher::create(
423       io_ctx1, m_mock_threads->work_queue, &mock_instance_replayer1);
424
425   librados::Rados cluster;
426   librados::IoCtx io_ctx2;
427   EXPECT_EQ("", connect_cluster_pp(cluster));
428   EXPECT_EQ(0, cluster.ioctx_create(_local_pool_name.c_str(), io_ctx2));
429   std::string instance_id2 = stringify(io_ctx2.get_instance_id());
430   librados::MockTestMemIoCtxImpl &mock_io_ctx2(get_mock_io_ctx(io_ctx2));
431   MockInstanceReplayer mock_instance_replayer2;
432   auto instance_watcher2 = MockInstanceWatcher::create(
433     io_ctx2, m_mock_threads->work_queue, &mock_instance_replayer2);
434
435   InSequence seq;
436
437   // Init instance watcher 1
438   expect_register_instance(mock_io_ctx1, 0);
439   expect_register_watch(mock_io_ctx1, instance_id1);
440   expect_acquire_lock(mock_managed_lock, 0);
441   ASSERT_EQ(0, instance_watcher1->init());
442
443   // Init instance watcher 2
444   expect_register_instance(mock_io_ctx2, 0);
445   expect_register_watch(mock_io_ctx2, instance_id2);
446   expect_acquire_lock(mock_managed_lock, 0);
447   ASSERT_EQ(0, instance_watcher2->init());
448
449   // Peer Image Removed on the same instance
450   EXPECT_CALL(mock_instance_replayer1, remove_peer_image("gid", "uuid", _))
451       .WillOnce(WithArg<2>(CompleteContext(0)));
452   C_SaferCond on_removed1;
453   instance_watcher1->notify_peer_image_removed(instance_id1, "gid", "uuid",
454                                                &on_removed1);
455   ASSERT_EQ(0, on_removed1.wait());
456
457   // Peer Image Removed on the other instance
458   EXPECT_CALL(mock_instance_replayer2, remove_peer_image("gid", "uuid", _))
459       .WillOnce(WithArg<2>(CompleteContext(0)));
460   C_SaferCond on_removed2;
461   instance_watcher1->notify_peer_image_removed(instance_id2, "gid", "uuid",
462                                                &on_removed2);
463   ASSERT_EQ(0, on_removed2.wait());
464
465   // Shutdown instance watcher 1
466   expect_release_lock(mock_managed_lock, 0);
467   expect_unregister_watch(mock_io_ctx1);
468   expect_unregister_instance(mock_io_ctx1, 0);
469   instance_watcher1->shut_down();
470
471   expect_destroy_lock(mock_managed_lock);
472   delete instance_watcher1;
473
474   // Shutdown instance watcher 2
475   expect_release_lock(mock_managed_lock, 0);
476   expect_unregister_watch(mock_io_ctx2);
477   expect_unregister_instance(mock_io_ctx2, 0);
478   instance_watcher2->shut_down();
479
480   expect_destroy_lock(mock_managed_lock);
481   delete instance_watcher2;
482 }
483
484 TEST_F(TestMockInstanceWatcher, ImageAcquireReleaseCancel) {
485   MockManagedLock mock_managed_lock;
486   librados::MockTestMemIoCtxImpl &mock_io_ctx(get_mock_io_ctx(m_local_io_ctx));
487
488   auto instance_watcher = new MockInstanceWatcher(
489     m_local_io_ctx, m_mock_threads->work_queue, nullptr, m_instance_id);
490   InSequence seq;
491
492   // Init
493   expect_register_instance(mock_io_ctx, 0);
494   expect_register_watch(mock_io_ctx);
495   expect_acquire_lock(mock_managed_lock, 0);
496   ASSERT_EQ(0, instance_watcher->init());
497
498   // Send Acquire Image and cancel
499   EXPECT_CALL(mock_io_ctx, aio_notify(_, _, _, _, _))
500     .WillOnce(Invoke(
501                   [this, instance_watcher, &mock_io_ctx](
502                     const std::string& o, librados::AioCompletionImpl *c,
503                     bufferlist& bl, uint64_t timeout_ms, bufferlist *pbl) {
504                     c->get();
505                     auto ctx = new FunctionContext(
506                       [instance_watcher, &mock_io_ctx, c, pbl](int r) {
507                         instance_watcher->cancel_notify_requests("other");
508                         ::encode(librbd::watcher::NotifyResponse(), *pbl);
509                         mock_io_ctx.get_mock_rados_client()->
510                             finish_aio_completion(c, -ETIMEDOUT);
511                       });
512                     m_threads->work_queue->queue(ctx, 0);
513                   }));
514
515   C_SaferCond on_acquire;
516   instance_watcher->notify_image_acquire("other", "gid", &on_acquire);
517   ASSERT_EQ(-ECANCELED, on_acquire.wait());
518
519   // Send Release Image and cancel
520   EXPECT_CALL(mock_io_ctx, aio_notify(_, _, _, _, _))
521     .WillOnce(Invoke(
522                   [this, instance_watcher, &mock_io_ctx](
523                     const std::string& o, librados::AioCompletionImpl *c,
524                     bufferlist& bl, uint64_t timeout_ms, bufferlist *pbl) {
525                     c->get();
526                     auto ctx = new FunctionContext(
527                       [instance_watcher, &mock_io_ctx, c, pbl](int r) {
528                         instance_watcher->cancel_notify_requests("other");
529                         ::encode(librbd::watcher::NotifyResponse(), *pbl);
530                         mock_io_ctx.get_mock_rados_client()->
531                             finish_aio_completion(c, -ETIMEDOUT);
532                       });
533                     m_threads->work_queue->queue(ctx, 0);
534                   }));
535
536   C_SaferCond on_release;
537   instance_watcher->notify_image_release("other", "gid", &on_release);
538   ASSERT_EQ(-ECANCELED, on_release.wait());
539
540   // Shutdown
541   expect_release_lock(mock_managed_lock, 0);
542   expect_unregister_watch(mock_io_ctx);
543   expect_unregister_instance(mock_io_ctx, 0);
544   instance_watcher->shut_down();
545
546   expect_destroy_lock(mock_managed_lock);
547   delete instance_watcher;
548 }
549
550 TEST_F(TestMockInstanceWatcher, PeerImageRemovedCancel) {
551   MockManagedLock mock_managed_lock;
552   librados::MockTestMemIoCtxImpl &mock_io_ctx(get_mock_io_ctx(m_local_io_ctx));
553
554   auto instance_watcher = new MockInstanceWatcher(
555     m_local_io_ctx, m_mock_threads->work_queue, nullptr, m_instance_id);
556   InSequence seq;
557
558   // Init
559   expect_register_instance(mock_io_ctx, 0);
560   expect_register_watch(mock_io_ctx);
561   expect_acquire_lock(mock_managed_lock, 0);
562   ASSERT_EQ(0, instance_watcher->init());
563
564   // Send Acquire Image and cancel
565   EXPECT_CALL(mock_io_ctx, aio_notify(_, _, _, _, _))
566     .WillOnce(Invoke(
567                   [this, instance_watcher, &mock_io_ctx](
568                     const std::string& o, librados::AioCompletionImpl *c,
569                     bufferlist& bl, uint64_t timeout_ms, bufferlist *pbl) {
570                     c->get();
571                     auto ctx = new FunctionContext(
572                       [instance_watcher, &mock_io_ctx, c, pbl](int r) {
573                         instance_watcher->cancel_notify_requests("other");
574                         ::encode(librbd::watcher::NotifyResponse(), *pbl);
575                         mock_io_ctx.get_mock_rados_client()->
576                             finish_aio_completion(c, -ETIMEDOUT);
577                       });
578                     m_threads->work_queue->queue(ctx, 0);
579                   }));
580
581   C_SaferCond on_acquire;
582   instance_watcher->notify_peer_image_removed("other", "gid", "uuid",
583                                               &on_acquire);
584   ASSERT_EQ(-ECANCELED, on_acquire.wait());
585
586   // Shutdown
587   expect_release_lock(mock_managed_lock, 0);
588   expect_unregister_watch(mock_io_ctx);
589   expect_unregister_instance(mock_io_ctx, 0);
590   instance_watcher->shut_down();
591
592   expect_destroy_lock(mock_managed_lock);
593   delete instance_watcher;
594 }
595
596
597 class TestMockInstanceWatcher_NotifySync : public TestMockInstanceWatcher {
598 public:
599   typedef ImageSyncThrottler<librbd::MockTestImageCtx> MockImageSyncThrottler;
600
601   MockManagedLock mock_managed_lock;
602   MockImageSyncThrottler mock_image_sync_throttler;
603   std::string instance_id1;
604   std::string instance_id2;
605
606   librados::Rados cluster;
607   librados::IoCtx io_ctx2;
608
609   MockInstanceWatcher *instance_watcher1;
610   MockInstanceWatcher *instance_watcher2;
611
612   void SetUp() override {
613     TestMockInstanceWatcher::SetUp();
614
615     instance_id1 = m_instance_id;
616     librados::IoCtx& io_ctx1 = m_local_io_ctx;
617     librados::MockTestMemIoCtxImpl &mock_io_ctx1(get_mock_io_ctx(io_ctx1));
618     instance_watcher1 = MockInstanceWatcher::create(io_ctx1,
619                                                     m_mock_threads->work_queue,
620                                                     nullptr);
621     EXPECT_EQ("", connect_cluster_pp(cluster));
622     EXPECT_EQ(0, cluster.ioctx_create(_local_pool_name.c_str(), io_ctx2));
623     instance_id2 = stringify(io_ctx2.get_instance_id());
624     librados::MockTestMemIoCtxImpl &mock_io_ctx2(get_mock_io_ctx(io_ctx2));
625     instance_watcher2 = MockInstanceWatcher::create(io_ctx2,
626                                                     m_mock_threads->work_queue,
627                                                     nullptr);
628     InSequence seq;
629
630     // Init instance watcher 1 (leader)
631     expect_register_instance(mock_io_ctx1, 0);
632     expect_register_watch(mock_io_ctx1, instance_id1);
633     expect_acquire_lock(mock_managed_lock, 0);
634     EXPECT_EQ(0, instance_watcher1->init());
635     instance_watcher1->handle_acquire_leader();
636
637     // Init instance watcher 2
638     expect_register_instance(mock_io_ctx2, 0);
639     expect_register_watch(mock_io_ctx2, instance_id2);
640     expect_acquire_lock(mock_managed_lock, 0);
641     EXPECT_EQ(0, instance_watcher2->init());
642     instance_watcher2->handle_update_leader(instance_id1);
643   }
644
645   void TearDown() override {
646     librados::IoCtx& io_ctx1 = m_local_io_ctx;
647     librados::MockTestMemIoCtxImpl &mock_io_ctx1(get_mock_io_ctx(io_ctx1));
648     librados::MockTestMemIoCtxImpl &mock_io_ctx2(get_mock_io_ctx(io_ctx2));
649
650     InSequence seq;
651
652     expect_throttler_destroy();
653     instance_watcher1->handle_release_leader();
654
655     // Shutdown instance watcher 1
656     expect_release_lock(mock_managed_lock, 0);
657     expect_unregister_watch(mock_io_ctx1);
658     expect_unregister_instance(mock_io_ctx1, 0);
659     instance_watcher1->shut_down();
660
661     expect_destroy_lock(mock_managed_lock);
662     delete instance_watcher1;
663
664     // Shutdown instance watcher 2
665     expect_release_lock(mock_managed_lock, 0);
666     expect_unregister_watch(mock_io_ctx2);
667     expect_unregister_instance(mock_io_ctx2, 0);
668     instance_watcher2->shut_down();
669
670     expect_destroy_lock(mock_managed_lock);
671     delete instance_watcher2;
672
673     TestMockInstanceWatcher::TearDown();
674   }
675
676   void expect_throttler_destroy(
677       std::vector<Context *> *throttler_queue = nullptr) {
678     EXPECT_CALL(mock_image_sync_throttler, drain(-ESTALE))
679         .WillOnce(Invoke([throttler_queue] (int r) {
680               if (throttler_queue != nullptr) {
681                 for (auto ctx : *throttler_queue) {
682                   ctx->complete(r);
683                 }
684               }
685             }));
686     EXPECT_CALL(mock_image_sync_throttler, destroy());
687   }
688
689   void expect_throttler_start_op(const std::string &sync_id,
690                                  Context *on_call = nullptr,
691                                  Context **on_start_ctx = nullptr) {
692     EXPECT_CALL(mock_image_sync_throttler, start_op(sync_id, _))
693         .WillOnce(Invoke([on_call, on_start_ctx] (const std::string &,
694                                                   Context *ctx) {
695                            if (on_call != nullptr) {
696                              on_call->complete(0);
697                            }
698                            if (on_start_ctx != nullptr) {
699                              *on_start_ctx = ctx;
700                            } else {
701                              ctx->complete(0);
702                            }
703                          }));
704   }
705
706   void expect_throttler_finish_op(const std::string &sync_id,
707                                   Context *on_finish) {
708     EXPECT_CALL(mock_image_sync_throttler, finish_op("sync_id"))
709         .WillOnce(Invoke([on_finish](const std::string &) {
710               on_finish->complete(0);
711             }));
712   }
713 };
714
715 TEST_F(TestMockInstanceWatcher_NotifySync, StartStopOnLeader) {
716   InSequence seq;
717
718   expect_throttler_start_op("sync_id");
719   C_SaferCond on_start;
720   instance_watcher1->notify_sync_request("sync_id", &on_start);
721   ASSERT_EQ(0, on_start.wait());
722
723   C_SaferCond on_finish;
724   expect_throttler_finish_op("sync_id", &on_finish);
725   instance_watcher1->notify_sync_complete("sync_id");
726   ASSERT_EQ(0, on_finish.wait());
727 }
728
729 TEST_F(TestMockInstanceWatcher_NotifySync, CancelStartedOnLeader) {
730   InSequence seq;
731
732   expect_throttler_start_op("sync_id");
733   C_SaferCond on_start;
734   instance_watcher1->notify_sync_request("sync_id", &on_start);
735   ASSERT_EQ(0, on_start.wait());
736
737   ASSERT_FALSE(instance_watcher1->cancel_sync_request("sync_id"));
738
739   C_SaferCond on_finish;
740   expect_throttler_finish_op("sync_id", &on_finish);
741   instance_watcher1->notify_sync_complete("sync_id");
742   ASSERT_EQ(0, on_finish.wait());
743 }
744
745 TEST_F(TestMockInstanceWatcher_NotifySync, StartStopOnNonLeader) {
746   InSequence seq;
747
748   expect_throttler_start_op("sync_id");
749   C_SaferCond on_start;
750   instance_watcher2->notify_sync_request("sync_id", &on_start);
751   ASSERT_EQ(0, on_start.wait());
752
753   C_SaferCond on_finish;
754   expect_throttler_finish_op("sync_id", &on_finish);
755   instance_watcher2->notify_sync_complete("sync_id");
756   ASSERT_EQ(0, on_finish.wait());
757 }
758
759 TEST_F(TestMockInstanceWatcher_NotifySync, CancelStartedOnNonLeader) {
760   InSequence seq;
761
762   expect_throttler_start_op("sync_id");
763   C_SaferCond on_start;
764   instance_watcher2->notify_sync_request("sync_id", &on_start);
765   ASSERT_EQ(0, on_start.wait());
766
767   ASSERT_FALSE(instance_watcher2->cancel_sync_request("sync_id"));
768
769   C_SaferCond on_finish;
770   expect_throttler_finish_op("sync_id", &on_finish);
771   instance_watcher2->notify_sync_complete("sync_id");
772   ASSERT_EQ(0, on_finish.wait());
773 }
774
775 TEST_F(TestMockInstanceWatcher_NotifySync, CancelWaitingOnNonLeader) {
776   InSequence seq;
777
778   C_SaferCond on_start_op_called;
779   Context *on_start_ctx;
780   expect_throttler_start_op("sync_id", &on_start_op_called,
781                                           &on_start_ctx);
782   C_SaferCond on_start;
783   instance_watcher2->notify_sync_request("sync_id", &on_start);
784   ASSERT_EQ(0, on_start_op_called.wait());
785
786   ASSERT_TRUE(instance_watcher2->cancel_sync_request("sync_id"));
787   // emulate watcher timeout
788   on_start_ctx->complete(-ETIMEDOUT);
789   ASSERT_EQ(-ECANCELED, on_start.wait());
790 }
791
792 TEST_F(TestMockInstanceWatcher_NotifySync, InFlightPrevNotification) {
793   // start sync when previous notification is still in flight
794
795   InSequence seq;
796
797   expect_throttler_start_op("sync_id");
798   C_SaferCond on_start1;
799   instance_watcher2->notify_sync_request("sync_id", &on_start1);
800   ASSERT_EQ(0, on_start1.wait());
801
802   C_SaferCond on_start2;
803   EXPECT_CALL(mock_image_sync_throttler, finish_op("sync_id"))
804       .WillOnce(Invoke([this, &on_start2](const std::string &) {
805             instance_watcher2->notify_sync_request("sync_id", &on_start2);
806           }));
807   expect_throttler_start_op("sync_id");
808   instance_watcher2->notify_sync_complete("sync_id");
809
810   ASSERT_EQ(0, on_start2.wait());
811   C_SaferCond on_finish;
812   expect_throttler_finish_op("sync_id", &on_finish);
813   instance_watcher2->notify_sync_complete("sync_id");
814   ASSERT_EQ(0, on_finish.wait());
815 }
816
817 TEST_F(TestMockInstanceWatcher_NotifySync, NoInFlightReleaseAcquireLeader) {
818   InSequence seq;
819
820   expect_throttler_destroy();
821   instance_watcher1->handle_release_leader();
822   instance_watcher1->handle_acquire_leader();
823 }
824
825 TEST_F(TestMockInstanceWatcher_NotifySync, StartedOnLeaderReleaseLeader) {
826   InSequence seq;
827
828   expect_throttler_destroy();
829   instance_watcher1->handle_release_leader();
830   instance_watcher2->handle_acquire_leader();
831
832   expect_throttler_start_op("sync_id");
833   C_SaferCond on_start;
834   instance_watcher2->notify_sync_request("sync_id", &on_start);
835   ASSERT_EQ(0, on_start.wait());
836   expect_throttler_destroy();
837   instance_watcher2->handle_release_leader();
838   instance_watcher2->notify_sync_complete("sync_id");
839
840   instance_watcher1->handle_acquire_leader();
841 }
842
843 TEST_F(TestMockInstanceWatcher_NotifySync, WaitingOnLeaderReleaseLeader) {
844   InSequence seq;
845
846   C_SaferCond on_start_op_called;
847   Context *on_start_ctx;
848   expect_throttler_start_op("sync_id", &on_start_op_called,
849                                           &on_start_ctx);
850   C_SaferCond on_start;
851   instance_watcher1->notify_sync_request("sync_id", &on_start);
852   ASSERT_EQ(0, on_start_op_called.wait());
853
854   std::vector<Context *> throttler_queue = {on_start_ctx};
855   expect_throttler_destroy(&throttler_queue);
856   instance_watcher1->handle_release_leader();
857   instance_watcher2->handle_acquire_leader();
858   instance_watcher1->handle_update_leader(instance_id2);
859
860   expect_throttler_start_op("sync_id");
861   ASSERT_EQ(0, on_start.wait());
862   C_SaferCond on_finish;
863   expect_throttler_finish_op("sync_id", &on_finish);
864   instance_watcher1->notify_sync_complete("sync_id");
865   ASSERT_EQ(0, on_finish.wait());
866
867   expect_throttler_destroy();
868   instance_watcher2->handle_release_leader();
869   instance_watcher1->handle_acquire_leader();
870 }
871
872 TEST_F(TestMockInstanceWatcher_NotifySync, StartedOnNonLeaderAcquireLeader) {
873   InSequence seq;
874
875   expect_throttler_destroy();
876   instance_watcher1->handle_release_leader();
877   instance_watcher2->handle_acquire_leader();
878   instance_watcher1->handle_update_leader(instance_id2);
879
880   expect_throttler_start_op("sync_id");
881   C_SaferCond on_start;
882   instance_watcher1->notify_sync_request("sync_id", &on_start);
883   ASSERT_EQ(0, on_start.wait());
884
885   expect_throttler_destroy();
886   instance_watcher2->handle_release_leader();
887   instance_watcher1->handle_acquire_leader();
888   instance_watcher2->handle_update_leader(instance_id2);
889
890   instance_watcher1->notify_sync_complete("sync_id");
891 }
892
893 TEST_F(TestMockInstanceWatcher_NotifySync, WaitingOnNonLeaderAcquireLeader) {
894   InSequence seq;
895
896   C_SaferCond on_start_op_called;
897   Context *on_start_ctx;
898   expect_throttler_start_op("sync_id", &on_start_op_called,
899                                           &on_start_ctx);
900   C_SaferCond on_start;
901   instance_watcher2->notify_sync_request("sync_id", &on_start);
902   ASSERT_EQ(0, on_start_op_called.wait());
903
904   std::vector<Context *> throttler_queue = {on_start_ctx};
905   expect_throttler_destroy(&throttler_queue);
906   instance_watcher1->handle_release_leader();
907
908   EXPECT_CALL(mock_image_sync_throttler, start_op("sync_id", _))
909       .WillOnce(WithArg<1>(CompleteContext(0)));
910   instance_watcher2->handle_acquire_leader();
911   instance_watcher1->handle_update_leader(instance_id2);
912
913   ASSERT_EQ(0, on_start.wait());
914
915   C_SaferCond on_finish;
916   expect_throttler_finish_op("sync_id", &on_finish);
917   instance_watcher2->notify_sync_complete("sync_id");
918   ASSERT_EQ(0, on_finish.wait());
919
920   expect_throttler_destroy();
921   instance_watcher2->handle_release_leader();
922   instance_watcher1->handle_acquire_leader();
923 }
924
925 } // namespace mirror
926 } // namespace rbd