// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab #include "librados/AioCompletionImpl.h" #include "librbd/ManagedLock.h" #include "test/librados/test.h" #include "test/librados_test_stub/MockTestMemIoCtxImpl.h" #include "test/librados_test_stub/MockTestMemRadosClient.h" #include "test/librbd/mock/MockImageCtx.h" #include "test/rbd_mirror/test_mock_fixture.h" #include "tools/rbd_mirror/InstanceReplayer.h" #include "tools/rbd_mirror/ImageSyncThrottler.h" #include "tools/rbd_mirror/InstanceWatcher.h" #include "tools/rbd_mirror/Threads.h" namespace librbd { namespace { struct MockTestImageCtx : public MockImageCtx { MockTestImageCtx(librbd::ImageCtx &image_ctx) : librbd::MockImageCtx(image_ctx) { } }; } // anonymous namespace template <> struct ManagedLock { static ManagedLock* s_instance; static ManagedLock *create(librados::IoCtx& ioctx, ContextWQ *work_queue, const std::string& oid, librbd::Watcher *watcher, managed_lock::Mode mode, bool blacklist_on_break_lock, uint32_t blacklist_expire_seconds) { assert(s_instance != nullptr); return s_instance; } ManagedLock() { assert(s_instance == nullptr); s_instance = this; } ~ManagedLock() { assert(s_instance == this); s_instance = nullptr; } MOCK_METHOD0(destroy, void()); MOCK_METHOD1(shut_down, void(Context *)); MOCK_METHOD1(acquire_lock, void(Context *)); MOCK_METHOD2(get_locker, void(managed_lock::Locker *, Context *)); MOCK_METHOD3(break_lock, void(const managed_lock::Locker &, bool, Context *)); }; ManagedLock *ManagedLock::s_instance = nullptr; } // namespace librbd namespace rbd { namespace mirror { template <> struct Threads { Mutex &timer_lock; SafeTimer *timer; ContextWQ *work_queue; Threads(Threads *threads) : timer_lock(threads->timer_lock), timer(threads->timer), work_queue(threads->work_queue) { } }; template <> struct InstanceReplayer { MOCK_METHOD3(acquire_image, void(InstanceWatcher *, const std::string &, Context *)); MOCK_METHOD2(release_image, void(const std::string &, Context *)); MOCK_METHOD3(remove_peer_image, void(const std::string&, const std::string&, Context *)); }; template <> struct ImageSyncThrottler { static ImageSyncThrottler* s_instance; static ImageSyncThrottler *create() { assert(s_instance != nullptr); return s_instance; } ImageSyncThrottler() { assert(s_instance == nullptr); s_instance = this; } virtual ~ImageSyncThrottler() { assert(s_instance == this); s_instance = nullptr; } MOCK_METHOD0(destroy, void()); MOCK_METHOD1(drain, void(int)); MOCK_METHOD2(start_op, void(const std::string &, Context *)); MOCK_METHOD1(finish_op, void(const std::string &)); }; ImageSyncThrottler* ImageSyncThrottler::s_instance = nullptr; } // namespace mirror } // namespace rbd // template definitions #include "tools/rbd_mirror/InstanceWatcher.cc" namespace rbd { namespace mirror { using ::testing::_; using ::testing::InSequence; using ::testing::Invoke; using ::testing::Return; using ::testing::StrEq; using ::testing::WithArg; class TestMockInstanceWatcher : public TestMockFixture { public: typedef librbd::ManagedLock MockManagedLock; typedef InstanceReplayer MockInstanceReplayer; typedef InstanceWatcher MockInstanceWatcher; typedef Threads MockThreads; std::string m_instance_id; std::string m_oid; MockThreads *m_mock_threads; void SetUp() override { TestFixture::SetUp(); m_local_io_ctx.remove(RBD_MIRROR_LEADER); EXPECT_EQ(0, m_local_io_ctx.create(RBD_MIRROR_LEADER, true)); m_instance_id = stringify(m_local_io_ctx.get_instance_id()); m_oid = RBD_MIRROR_INSTANCE_PREFIX + m_instance_id; m_mock_threads = new MockThreads(m_threads); } void TearDown() override { delete m_mock_threads; TestMockFixture::TearDown(); } void expect_register_watch(librados::MockTestMemIoCtxImpl &mock_io_ctx) { EXPECT_CALL(mock_io_ctx, aio_watch(m_oid, _, _, _)); } void expect_register_watch(librados::MockTestMemIoCtxImpl &mock_io_ctx, const std::string &instance_id) { std::string oid = RBD_MIRROR_INSTANCE_PREFIX + instance_id; EXPECT_CALL(mock_io_ctx, aio_watch(oid, _, _, _)); } void expect_unregister_watch(librados::MockTestMemIoCtxImpl &mock_io_ctx) { EXPECT_CALL(mock_io_ctx, aio_unwatch(_, _)); } void expect_register_instance(librados::MockTestMemIoCtxImpl &mock_io_ctx, int r) { EXPECT_CALL(mock_io_ctx, exec(RBD_MIRROR_LEADER, _, StrEq("rbd"), StrEq("mirror_instances_add"), _, _, _)) .WillOnce(Return(r)); } void expect_unregister_instance(librados::MockTestMemIoCtxImpl &mock_io_ctx, int r) { EXPECT_CALL(mock_io_ctx, exec(RBD_MIRROR_LEADER, _, StrEq("rbd"), StrEq("mirror_instances_remove"), _, _, _)) .WillOnce(Return(r)); } void expect_acquire_lock(MockManagedLock &mock_managed_lock, int r) { EXPECT_CALL(mock_managed_lock, acquire_lock(_)) .WillOnce(CompleteContext(r)); } void expect_release_lock(MockManagedLock &mock_managed_lock, int r) { EXPECT_CALL(mock_managed_lock, shut_down(_)).WillOnce(CompleteContext(r)); } void expect_destroy_lock(MockManagedLock &mock_managed_lock, Context *ctx = nullptr) { EXPECT_CALL(mock_managed_lock, destroy()) .WillOnce(Invoke([ctx]() { if (ctx != nullptr) { ctx->complete(0); } })); } void expect_get_locker(MockManagedLock &mock_managed_lock, const librbd::managed_lock::Locker &locker, int r) { EXPECT_CALL(mock_managed_lock, get_locker(_, _)) .WillOnce(Invoke([r, locker](librbd::managed_lock::Locker *out, Context *ctx) { if (r == 0) { *out = locker; } ctx->complete(r); })); } void expect_break_lock(MockManagedLock &mock_managed_lock, const librbd::managed_lock::Locker &locker, int r) { EXPECT_CALL(mock_managed_lock, break_lock(locker, true, _)) .WillOnce(WithArg<2>(CompleteContext(r))); } }; TEST_F(TestMockInstanceWatcher, InitShutdown) { MockManagedLock mock_managed_lock; librados::MockTestMemIoCtxImpl &mock_io_ctx(get_mock_io_ctx(m_local_io_ctx)); auto instance_watcher = new MockInstanceWatcher( m_local_io_ctx, m_mock_threads->work_queue, nullptr, m_instance_id); InSequence seq; // Init expect_register_instance(mock_io_ctx, 0); expect_register_watch(mock_io_ctx); expect_acquire_lock(mock_managed_lock, 0); ASSERT_EQ(0, instance_watcher->init()); // Shutdown expect_release_lock(mock_managed_lock, 0); expect_unregister_watch(mock_io_ctx); expect_unregister_instance(mock_io_ctx, 0); instance_watcher->shut_down(); expect_destroy_lock(mock_managed_lock); delete instance_watcher; } TEST_F(TestMockInstanceWatcher, InitError) { MockManagedLock mock_managed_lock; librados::MockTestMemIoCtxImpl &mock_io_ctx(get_mock_io_ctx(m_local_io_ctx)); auto instance_watcher = new MockInstanceWatcher( m_local_io_ctx, m_mock_threads->work_queue, nullptr, m_instance_id); InSequence seq; expect_register_instance(mock_io_ctx, 0); expect_register_watch(mock_io_ctx); expect_acquire_lock(mock_managed_lock, -EINVAL); expect_unregister_watch(mock_io_ctx); expect_unregister_instance(mock_io_ctx, 0); ASSERT_EQ(-EINVAL, instance_watcher->init()); expect_destroy_lock(mock_managed_lock); delete instance_watcher; } TEST_F(TestMockInstanceWatcher, ShutdownError) { MockManagedLock mock_managed_lock; librados::MockTestMemIoCtxImpl &mock_io_ctx(get_mock_io_ctx(m_local_io_ctx)); auto instance_watcher = new MockInstanceWatcher( m_local_io_ctx, m_mock_threads->work_queue, nullptr, m_instance_id); InSequence seq; // Init expect_register_instance(mock_io_ctx, 0); expect_register_watch(mock_io_ctx); expect_acquire_lock(mock_managed_lock, 0); ASSERT_EQ(0, instance_watcher->init()); // Shutdown expect_release_lock(mock_managed_lock, -EINVAL); expect_unregister_watch(mock_io_ctx); expect_unregister_instance(mock_io_ctx, 0); instance_watcher->shut_down(); expect_destroy_lock(mock_managed_lock); delete instance_watcher; } TEST_F(TestMockInstanceWatcher, Remove) { MockManagedLock mock_managed_lock; librados::MockTestMemIoCtxImpl &mock_io_ctx(get_mock_io_ctx(m_local_io_ctx)); librbd::managed_lock::Locker locker{entity_name_t::CLIENT(1), "auto 123", "1.2.3.4:0/0", 123}; InSequence seq; expect_get_locker(mock_managed_lock, locker, 0); expect_break_lock(mock_managed_lock, locker, 0); expect_unregister_instance(mock_io_ctx, 0); C_SaferCond on_destroy; expect_destroy_lock(mock_managed_lock, &on_destroy); C_SaferCond on_remove; MockInstanceWatcher::remove_instance(m_local_io_ctx, m_mock_threads->work_queue, "instance_id", &on_remove); ASSERT_EQ(0, on_remove.wait()); ASSERT_EQ(0, on_destroy.wait()); } TEST_F(TestMockInstanceWatcher, RemoveNoent) { MockManagedLock mock_managed_lock; librados::MockTestMemIoCtxImpl &mock_io_ctx(get_mock_io_ctx(m_local_io_ctx)); InSequence seq; expect_get_locker(mock_managed_lock, librbd::managed_lock::Locker(), -ENOENT); expect_unregister_instance(mock_io_ctx, 0); C_SaferCond on_destroy; expect_destroy_lock(mock_managed_lock, &on_destroy); C_SaferCond on_remove; MockInstanceWatcher::remove_instance(m_local_io_ctx, m_mock_threads->work_queue, "instance_id", &on_remove); ASSERT_EQ(0, on_remove.wait()); ASSERT_EQ(0, on_destroy.wait()); } TEST_F(TestMockInstanceWatcher, ImageAcquireRelease) { MockManagedLock mock_managed_lock; librados::IoCtx& io_ctx1 = m_local_io_ctx; std::string instance_id1 = m_instance_id; librados::MockTestMemIoCtxImpl &mock_io_ctx1(get_mock_io_ctx(io_ctx1)); MockInstanceReplayer mock_instance_replayer1; auto instance_watcher1 = MockInstanceWatcher::create( io_ctx1, m_mock_threads->work_queue, &mock_instance_replayer1); librados::Rados cluster; librados::IoCtx io_ctx2; EXPECT_EQ("", connect_cluster_pp(cluster)); EXPECT_EQ(0, cluster.ioctx_create(_local_pool_name.c_str(), io_ctx2)); std::string instance_id2 = stringify(io_ctx2.get_instance_id()); librados::MockTestMemIoCtxImpl &mock_io_ctx2(get_mock_io_ctx(io_ctx2)); MockInstanceReplayer mock_instance_replayer2; auto instance_watcher2 = MockInstanceWatcher::create( io_ctx2, m_mock_threads->work_queue, &mock_instance_replayer2); InSequence seq; // Init instance watcher 1 expect_register_instance(mock_io_ctx1, 0); expect_register_watch(mock_io_ctx1, instance_id1); expect_acquire_lock(mock_managed_lock, 0); ASSERT_EQ(0, instance_watcher1->init()); // Init instance watcher 2 expect_register_instance(mock_io_ctx2, 0); expect_register_watch(mock_io_ctx2, instance_id2); expect_acquire_lock(mock_managed_lock, 0); ASSERT_EQ(0, instance_watcher2->init()); // Acquire Image on the the same instance EXPECT_CALL(mock_instance_replayer1, acquire_image(instance_watcher1, "gid", _)) .WillOnce(WithArg<2>(CompleteContext(0))); C_SaferCond on_acquire1; instance_watcher1->notify_image_acquire(instance_id1, "gid", &on_acquire1); ASSERT_EQ(0, on_acquire1.wait()); // Acquire Image on the other instance EXPECT_CALL(mock_instance_replayer2, acquire_image(instance_watcher2, "gid", _)) .WillOnce(WithArg<2>(CompleteContext(0))); C_SaferCond on_acquire2; instance_watcher1->notify_image_acquire(instance_id2, "gid", &on_acquire2); ASSERT_EQ(0, on_acquire2.wait()); // Release Image on the the same instance EXPECT_CALL(mock_instance_replayer1, release_image("gid", _)) .WillOnce(WithArg<1>(CompleteContext(0))); C_SaferCond on_release1; instance_watcher1->notify_image_release(instance_id1, "gid", &on_release1); ASSERT_EQ(0, on_release1.wait()); // Release Image on the other instance EXPECT_CALL(mock_instance_replayer2, release_image("gid", _)) .WillOnce(WithArg<1>(CompleteContext(0))); C_SaferCond on_release2; instance_watcher1->notify_image_release(instance_id2, "gid", &on_release2); ASSERT_EQ(0, on_release2.wait()); // Shutdown instance watcher 1 expect_release_lock(mock_managed_lock, 0); expect_unregister_watch(mock_io_ctx1); expect_unregister_instance(mock_io_ctx1, 0); instance_watcher1->shut_down(); expect_destroy_lock(mock_managed_lock); delete instance_watcher1; // Shutdown instance watcher 2 expect_release_lock(mock_managed_lock, 0); expect_unregister_watch(mock_io_ctx2); expect_unregister_instance(mock_io_ctx2, 0); instance_watcher2->shut_down(); expect_destroy_lock(mock_managed_lock); delete instance_watcher2; } TEST_F(TestMockInstanceWatcher, PeerImageRemoved) { MockManagedLock mock_managed_lock; librados::IoCtx& io_ctx1 = m_local_io_ctx; std::string instance_id1 = m_instance_id; librados::MockTestMemIoCtxImpl &mock_io_ctx1(get_mock_io_ctx(io_ctx1)); MockInstanceReplayer mock_instance_replayer1; auto instance_watcher1 = MockInstanceWatcher::create( io_ctx1, m_mock_threads->work_queue, &mock_instance_replayer1); librados::Rados cluster; librados::IoCtx io_ctx2; EXPECT_EQ("", connect_cluster_pp(cluster)); EXPECT_EQ(0, cluster.ioctx_create(_local_pool_name.c_str(), io_ctx2)); std::string instance_id2 = stringify(io_ctx2.get_instance_id()); librados::MockTestMemIoCtxImpl &mock_io_ctx2(get_mock_io_ctx(io_ctx2)); MockInstanceReplayer mock_instance_replayer2; auto instance_watcher2 = MockInstanceWatcher::create( io_ctx2, m_mock_threads->work_queue, &mock_instance_replayer2); InSequence seq; // Init instance watcher 1 expect_register_instance(mock_io_ctx1, 0); expect_register_watch(mock_io_ctx1, instance_id1); expect_acquire_lock(mock_managed_lock, 0); ASSERT_EQ(0, instance_watcher1->init()); // Init instance watcher 2 expect_register_instance(mock_io_ctx2, 0); expect_register_watch(mock_io_ctx2, instance_id2); expect_acquire_lock(mock_managed_lock, 0); ASSERT_EQ(0, instance_watcher2->init()); // Peer Image Removed on the same instance EXPECT_CALL(mock_instance_replayer1, remove_peer_image("gid", "uuid", _)) .WillOnce(WithArg<2>(CompleteContext(0))); C_SaferCond on_removed1; instance_watcher1->notify_peer_image_removed(instance_id1, "gid", "uuid", &on_removed1); ASSERT_EQ(0, on_removed1.wait()); // Peer Image Removed on the other instance EXPECT_CALL(mock_instance_replayer2, remove_peer_image("gid", "uuid", _)) .WillOnce(WithArg<2>(CompleteContext(0))); C_SaferCond on_removed2; instance_watcher1->notify_peer_image_removed(instance_id2, "gid", "uuid", &on_removed2); ASSERT_EQ(0, on_removed2.wait()); // Shutdown instance watcher 1 expect_release_lock(mock_managed_lock, 0); expect_unregister_watch(mock_io_ctx1); expect_unregister_instance(mock_io_ctx1, 0); instance_watcher1->shut_down(); expect_destroy_lock(mock_managed_lock); delete instance_watcher1; // Shutdown instance watcher 2 expect_release_lock(mock_managed_lock, 0); expect_unregister_watch(mock_io_ctx2); expect_unregister_instance(mock_io_ctx2, 0); instance_watcher2->shut_down(); expect_destroy_lock(mock_managed_lock); delete instance_watcher2; } TEST_F(TestMockInstanceWatcher, ImageAcquireReleaseCancel) { MockManagedLock mock_managed_lock; librados::MockTestMemIoCtxImpl &mock_io_ctx(get_mock_io_ctx(m_local_io_ctx)); auto instance_watcher = new MockInstanceWatcher( m_local_io_ctx, m_mock_threads->work_queue, nullptr, m_instance_id); InSequence seq; // Init expect_register_instance(mock_io_ctx, 0); expect_register_watch(mock_io_ctx); expect_acquire_lock(mock_managed_lock, 0); ASSERT_EQ(0, instance_watcher->init()); // Send Acquire Image and cancel EXPECT_CALL(mock_io_ctx, aio_notify(_, _, _, _, _)) .WillOnce(Invoke( [this, instance_watcher, &mock_io_ctx]( const std::string& o, librados::AioCompletionImpl *c, bufferlist& bl, uint64_t timeout_ms, bufferlist *pbl) { c->get(); auto ctx = new FunctionContext( [instance_watcher, &mock_io_ctx, c, pbl](int r) { instance_watcher->cancel_notify_requests("other"); ::encode(librbd::watcher::NotifyResponse(), *pbl); mock_io_ctx.get_mock_rados_client()-> finish_aio_completion(c, -ETIMEDOUT); }); m_threads->work_queue->queue(ctx, 0); })); C_SaferCond on_acquire; instance_watcher->notify_image_acquire("other", "gid", &on_acquire); ASSERT_EQ(-ECANCELED, on_acquire.wait()); // Send Release Image and cancel EXPECT_CALL(mock_io_ctx, aio_notify(_, _, _, _, _)) .WillOnce(Invoke( [this, instance_watcher, &mock_io_ctx]( const std::string& o, librados::AioCompletionImpl *c, bufferlist& bl, uint64_t timeout_ms, bufferlist *pbl) { c->get(); auto ctx = new FunctionContext( [instance_watcher, &mock_io_ctx, c, pbl](int r) { instance_watcher->cancel_notify_requests("other"); ::encode(librbd::watcher::NotifyResponse(), *pbl); mock_io_ctx.get_mock_rados_client()-> finish_aio_completion(c, -ETIMEDOUT); }); m_threads->work_queue->queue(ctx, 0); })); C_SaferCond on_release; instance_watcher->notify_image_release("other", "gid", &on_release); ASSERT_EQ(-ECANCELED, on_release.wait()); // Shutdown expect_release_lock(mock_managed_lock, 0); expect_unregister_watch(mock_io_ctx); expect_unregister_instance(mock_io_ctx, 0); instance_watcher->shut_down(); expect_destroy_lock(mock_managed_lock); delete instance_watcher; } TEST_F(TestMockInstanceWatcher, PeerImageRemovedCancel) { MockManagedLock mock_managed_lock; librados::MockTestMemIoCtxImpl &mock_io_ctx(get_mock_io_ctx(m_local_io_ctx)); auto instance_watcher = new MockInstanceWatcher( m_local_io_ctx, m_mock_threads->work_queue, nullptr, m_instance_id); InSequence seq; // Init expect_register_instance(mock_io_ctx, 0); expect_register_watch(mock_io_ctx); expect_acquire_lock(mock_managed_lock, 0); ASSERT_EQ(0, instance_watcher->init()); // Send Acquire Image and cancel EXPECT_CALL(mock_io_ctx, aio_notify(_, _, _, _, _)) .WillOnce(Invoke( [this, instance_watcher, &mock_io_ctx]( const std::string& o, librados::AioCompletionImpl *c, bufferlist& bl, uint64_t timeout_ms, bufferlist *pbl) { c->get(); auto ctx = new FunctionContext( [instance_watcher, &mock_io_ctx, c, pbl](int r) { instance_watcher->cancel_notify_requests("other"); ::encode(librbd::watcher::NotifyResponse(), *pbl); mock_io_ctx.get_mock_rados_client()-> finish_aio_completion(c, -ETIMEDOUT); }); m_threads->work_queue->queue(ctx, 0); })); C_SaferCond on_acquire; instance_watcher->notify_peer_image_removed("other", "gid", "uuid", &on_acquire); ASSERT_EQ(-ECANCELED, on_acquire.wait()); // Shutdown expect_release_lock(mock_managed_lock, 0); expect_unregister_watch(mock_io_ctx); expect_unregister_instance(mock_io_ctx, 0); instance_watcher->shut_down(); expect_destroy_lock(mock_managed_lock); delete instance_watcher; } class TestMockInstanceWatcher_NotifySync : public TestMockInstanceWatcher { public: typedef ImageSyncThrottler MockImageSyncThrottler; MockManagedLock mock_managed_lock; MockImageSyncThrottler mock_image_sync_throttler; std::string instance_id1; std::string instance_id2; librados::Rados cluster; librados::IoCtx io_ctx2; MockInstanceWatcher *instance_watcher1; MockInstanceWatcher *instance_watcher2; void SetUp() override { TestMockInstanceWatcher::SetUp(); instance_id1 = m_instance_id; librados::IoCtx& io_ctx1 = m_local_io_ctx; librados::MockTestMemIoCtxImpl &mock_io_ctx1(get_mock_io_ctx(io_ctx1)); instance_watcher1 = MockInstanceWatcher::create(io_ctx1, m_mock_threads->work_queue, nullptr); EXPECT_EQ("", connect_cluster_pp(cluster)); EXPECT_EQ(0, cluster.ioctx_create(_local_pool_name.c_str(), io_ctx2)); instance_id2 = stringify(io_ctx2.get_instance_id()); librados::MockTestMemIoCtxImpl &mock_io_ctx2(get_mock_io_ctx(io_ctx2)); instance_watcher2 = MockInstanceWatcher::create(io_ctx2, m_mock_threads->work_queue, nullptr); InSequence seq; // Init instance watcher 1 (leader) expect_register_instance(mock_io_ctx1, 0); expect_register_watch(mock_io_ctx1, instance_id1); expect_acquire_lock(mock_managed_lock, 0); EXPECT_EQ(0, instance_watcher1->init()); instance_watcher1->handle_acquire_leader(); // Init instance watcher 2 expect_register_instance(mock_io_ctx2, 0); expect_register_watch(mock_io_ctx2, instance_id2); expect_acquire_lock(mock_managed_lock, 0); EXPECT_EQ(0, instance_watcher2->init()); instance_watcher2->handle_update_leader(instance_id1); } void TearDown() override { librados::IoCtx& io_ctx1 = m_local_io_ctx; librados::MockTestMemIoCtxImpl &mock_io_ctx1(get_mock_io_ctx(io_ctx1)); librados::MockTestMemIoCtxImpl &mock_io_ctx2(get_mock_io_ctx(io_ctx2)); InSequence seq; expect_throttler_destroy(); instance_watcher1->handle_release_leader(); // Shutdown instance watcher 1 expect_release_lock(mock_managed_lock, 0); expect_unregister_watch(mock_io_ctx1); expect_unregister_instance(mock_io_ctx1, 0); instance_watcher1->shut_down(); expect_destroy_lock(mock_managed_lock); delete instance_watcher1; // Shutdown instance watcher 2 expect_release_lock(mock_managed_lock, 0); expect_unregister_watch(mock_io_ctx2); expect_unregister_instance(mock_io_ctx2, 0); instance_watcher2->shut_down(); expect_destroy_lock(mock_managed_lock); delete instance_watcher2; TestMockInstanceWatcher::TearDown(); } void expect_throttler_destroy( std::vector *throttler_queue = nullptr) { EXPECT_CALL(mock_image_sync_throttler, drain(-ESTALE)) .WillOnce(Invoke([throttler_queue] (int r) { if (throttler_queue != nullptr) { for (auto ctx : *throttler_queue) { ctx->complete(r); } } })); EXPECT_CALL(mock_image_sync_throttler, destroy()); } void expect_throttler_start_op(const std::string &sync_id, Context *on_call = nullptr, Context **on_start_ctx = nullptr) { EXPECT_CALL(mock_image_sync_throttler, start_op(sync_id, _)) .WillOnce(Invoke([on_call, on_start_ctx] (const std::string &, Context *ctx) { if (on_call != nullptr) { on_call->complete(0); } if (on_start_ctx != nullptr) { *on_start_ctx = ctx; } else { ctx->complete(0); } })); } void expect_throttler_finish_op(const std::string &sync_id, Context *on_finish) { EXPECT_CALL(mock_image_sync_throttler, finish_op("sync_id")) .WillOnce(Invoke([on_finish](const std::string &) { on_finish->complete(0); })); } }; TEST_F(TestMockInstanceWatcher_NotifySync, StartStopOnLeader) { InSequence seq; expect_throttler_start_op("sync_id"); C_SaferCond on_start; instance_watcher1->notify_sync_request("sync_id", &on_start); ASSERT_EQ(0, on_start.wait()); C_SaferCond on_finish; expect_throttler_finish_op("sync_id", &on_finish); instance_watcher1->notify_sync_complete("sync_id"); ASSERT_EQ(0, on_finish.wait()); } TEST_F(TestMockInstanceWatcher_NotifySync, CancelStartedOnLeader) { InSequence seq; expect_throttler_start_op("sync_id"); C_SaferCond on_start; instance_watcher1->notify_sync_request("sync_id", &on_start); ASSERT_EQ(0, on_start.wait()); ASSERT_FALSE(instance_watcher1->cancel_sync_request("sync_id")); C_SaferCond on_finish; expect_throttler_finish_op("sync_id", &on_finish); instance_watcher1->notify_sync_complete("sync_id"); ASSERT_EQ(0, on_finish.wait()); } TEST_F(TestMockInstanceWatcher_NotifySync, StartStopOnNonLeader) { InSequence seq; expect_throttler_start_op("sync_id"); C_SaferCond on_start; instance_watcher2->notify_sync_request("sync_id", &on_start); ASSERT_EQ(0, on_start.wait()); C_SaferCond on_finish; expect_throttler_finish_op("sync_id", &on_finish); instance_watcher2->notify_sync_complete("sync_id"); ASSERT_EQ(0, on_finish.wait()); } TEST_F(TestMockInstanceWatcher_NotifySync, CancelStartedOnNonLeader) { InSequence seq; expect_throttler_start_op("sync_id"); C_SaferCond on_start; instance_watcher2->notify_sync_request("sync_id", &on_start); ASSERT_EQ(0, on_start.wait()); ASSERT_FALSE(instance_watcher2->cancel_sync_request("sync_id")); C_SaferCond on_finish; expect_throttler_finish_op("sync_id", &on_finish); instance_watcher2->notify_sync_complete("sync_id"); ASSERT_EQ(0, on_finish.wait()); } TEST_F(TestMockInstanceWatcher_NotifySync, CancelWaitingOnNonLeader) { InSequence seq; C_SaferCond on_start_op_called; Context *on_start_ctx; expect_throttler_start_op("sync_id", &on_start_op_called, &on_start_ctx); C_SaferCond on_start; instance_watcher2->notify_sync_request("sync_id", &on_start); ASSERT_EQ(0, on_start_op_called.wait()); ASSERT_TRUE(instance_watcher2->cancel_sync_request("sync_id")); // emulate watcher timeout on_start_ctx->complete(-ETIMEDOUT); ASSERT_EQ(-ECANCELED, on_start.wait()); } TEST_F(TestMockInstanceWatcher_NotifySync, InFlightPrevNotification) { // start sync when previous notification is still in flight InSequence seq; expect_throttler_start_op("sync_id"); C_SaferCond on_start1; instance_watcher2->notify_sync_request("sync_id", &on_start1); ASSERT_EQ(0, on_start1.wait()); C_SaferCond on_start2; EXPECT_CALL(mock_image_sync_throttler, finish_op("sync_id")) .WillOnce(Invoke([this, &on_start2](const std::string &) { instance_watcher2->notify_sync_request("sync_id", &on_start2); })); expect_throttler_start_op("sync_id"); instance_watcher2->notify_sync_complete("sync_id"); ASSERT_EQ(0, on_start2.wait()); C_SaferCond on_finish; expect_throttler_finish_op("sync_id", &on_finish); instance_watcher2->notify_sync_complete("sync_id"); ASSERT_EQ(0, on_finish.wait()); } TEST_F(TestMockInstanceWatcher_NotifySync, NoInFlightReleaseAcquireLeader) { InSequence seq; expect_throttler_destroy(); instance_watcher1->handle_release_leader(); instance_watcher1->handle_acquire_leader(); } TEST_F(TestMockInstanceWatcher_NotifySync, StartedOnLeaderReleaseLeader) { InSequence seq; expect_throttler_destroy(); instance_watcher1->handle_release_leader(); instance_watcher2->handle_acquire_leader(); expect_throttler_start_op("sync_id"); C_SaferCond on_start; instance_watcher2->notify_sync_request("sync_id", &on_start); ASSERT_EQ(0, on_start.wait()); expect_throttler_destroy(); instance_watcher2->handle_release_leader(); instance_watcher2->notify_sync_complete("sync_id"); instance_watcher1->handle_acquire_leader(); } TEST_F(TestMockInstanceWatcher_NotifySync, WaitingOnLeaderReleaseLeader) { InSequence seq; C_SaferCond on_start_op_called; Context *on_start_ctx; expect_throttler_start_op("sync_id", &on_start_op_called, &on_start_ctx); C_SaferCond on_start; instance_watcher1->notify_sync_request("sync_id", &on_start); ASSERT_EQ(0, on_start_op_called.wait()); std::vector throttler_queue = {on_start_ctx}; expect_throttler_destroy(&throttler_queue); instance_watcher1->handle_release_leader(); instance_watcher2->handle_acquire_leader(); instance_watcher1->handle_update_leader(instance_id2); expect_throttler_start_op("sync_id"); ASSERT_EQ(0, on_start.wait()); C_SaferCond on_finish; expect_throttler_finish_op("sync_id", &on_finish); instance_watcher1->notify_sync_complete("sync_id"); ASSERT_EQ(0, on_finish.wait()); expect_throttler_destroy(); instance_watcher2->handle_release_leader(); instance_watcher1->handle_acquire_leader(); } TEST_F(TestMockInstanceWatcher_NotifySync, StartedOnNonLeaderAcquireLeader) { InSequence seq; expect_throttler_destroy(); instance_watcher1->handle_release_leader(); instance_watcher2->handle_acquire_leader(); instance_watcher1->handle_update_leader(instance_id2); expect_throttler_start_op("sync_id"); C_SaferCond on_start; instance_watcher1->notify_sync_request("sync_id", &on_start); ASSERT_EQ(0, on_start.wait()); expect_throttler_destroy(); instance_watcher2->handle_release_leader(); instance_watcher1->handle_acquire_leader(); instance_watcher2->handle_update_leader(instance_id2); instance_watcher1->notify_sync_complete("sync_id"); } TEST_F(TestMockInstanceWatcher_NotifySync, WaitingOnNonLeaderAcquireLeader) { InSequence seq; C_SaferCond on_start_op_called; Context *on_start_ctx; expect_throttler_start_op("sync_id", &on_start_op_called, &on_start_ctx); C_SaferCond on_start; instance_watcher2->notify_sync_request("sync_id", &on_start); ASSERT_EQ(0, on_start_op_called.wait()); std::vector throttler_queue = {on_start_ctx}; expect_throttler_destroy(&throttler_queue); instance_watcher1->handle_release_leader(); EXPECT_CALL(mock_image_sync_throttler, start_op("sync_id", _)) .WillOnce(WithArg<1>(CompleteContext(0))); instance_watcher2->handle_acquire_leader(); instance_watcher1->handle_update_leader(instance_id2); ASSERT_EQ(0, on_start.wait()); C_SaferCond on_finish; expect_throttler_finish_op("sync_id", &on_finish); instance_watcher2->notify_sync_complete("sync_id"); ASSERT_EQ(0, on_finish.wait()); expect_throttler_destroy(); instance_watcher2->handle_release_leader(); instance_watcher1->handle_acquire_leader(); } } // namespace mirror } // namespace rbd