X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Ftest%2Frbd_mirror%2Ftest_LeaderWatcher.cc;fp=src%2Fceph%2Fsrc%2Ftest%2Frbd_mirror%2Ftest_LeaderWatcher.cc;h=c1699892a731e1e36777b2c05fca6f3279e3ca64;hb=812ff6ca9fcd3e629e49d4328905f33eee8ca3f5;hp=0000000000000000000000000000000000000000;hpb=15280273faafb77777eab341909a3f495cf248d9;p=stor4nfv.git diff --git a/src/ceph/src/test/rbd_mirror/test_LeaderWatcher.cc b/src/ceph/src/test/rbd_mirror/test_LeaderWatcher.cc new file mode 100644 index 0000000..c169989 --- /dev/null +++ b/src/ceph/src/test/rbd_mirror/test_LeaderWatcher.cc @@ -0,0 +1,317 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "include/rados/librados.hpp" +#include "librbd/internal.h" +#include "librbd/Utils.h" +#include "librbd/api/Mirror.h" +#include "test/librbd/test_support.h" +#include "test/rbd_mirror/test_fixture.h" +#include "tools/rbd_mirror/LeaderWatcher.h" +#include "tools/rbd_mirror/Threads.h" + +#include "test/librados/test.h" +#include "gtest/gtest.h" + +using librbd::util::unique_lock_name; +using rbd::mirror::LeaderWatcher; + +void register_test_leader_watcher() { +} + +class TestLeaderWatcher : public ::rbd::mirror::TestFixture { +public: + class Listener : public rbd::mirror::LeaderWatcher<>::Listener { + public: + Listener() + : m_test_lock(unique_lock_name("LeaderWatcher::m_test_lock", this)) { + } + + void on_acquire(int r, Context *ctx) { + Mutex::Locker locker(m_test_lock); + m_on_acquire_r = r; + m_on_acquire = ctx; + } + + void on_release(int r, Context *ctx) { + Mutex::Locker locker(m_test_lock); + m_on_release_r = r; + m_on_release = ctx; + } + + int acquire_count() const { + Mutex::Locker locker(m_test_lock); + return m_acquire_count; + } + + int release_count() const { + Mutex::Locker locker(m_test_lock); + return m_release_count; + } + + void post_acquire_handler(Context *on_finish) override { + Mutex::Locker locker(m_test_lock); + m_acquire_count++; + on_finish->complete(m_on_acquire_r); + m_on_acquire_r = 0; + if (m_on_acquire != nullptr) { + m_on_acquire->complete(0); + m_on_acquire = nullptr; + } + } + + void pre_release_handler(Context *on_finish) override { + Mutex::Locker locker(m_test_lock); + m_release_count++; + on_finish->complete(m_on_release_r); + m_on_release_r = 0; + if (m_on_release != nullptr) { + m_on_release->complete(0); + m_on_release = nullptr; + } + } + + void update_leader_handler(const std::string &leader_instance_id) override { + } + + private: + mutable Mutex m_test_lock; + int m_acquire_count = 0; + int m_release_count = 0; + int m_on_acquire_r = 0; + int m_on_release_r = 0; + Context *m_on_acquire = nullptr; + Context *m_on_release = nullptr; + }; + + struct Connection { + librados::Rados cluster; + librados::IoCtx io_ctx; + }; + + std::list > m_connections; + + void SetUp() override { + TestFixture::SetUp(); + EXPECT_EQ(0, librbd::api::Mirror<>::mode_set(m_local_io_ctx, + RBD_MIRROR_MODE_POOL)); + + if (is_librados_test_stub()) { + // speed testing up a little + EXPECT_EQ(0, _rados->conf_set("rbd_mirror_leader_heartbeat_interval", + "1")); + } + } + + bool is_librados_test_stub() { + std::string fsid; + EXPECT_EQ(0, _rados->cluster_fsid(&fsid)); + return fsid == "00000000-1111-2222-3333-444444444444"; + } + + librados::IoCtx &create_connection(bool no_heartbeats = false) { + m_connections.push_back(std::unique_ptr(new Connection())); + Connection *c = m_connections.back().get(); + + EXPECT_EQ("", connect_cluster_pp(c->cluster)); + if (no_heartbeats) { + EXPECT_EQ(0, c->cluster.conf_set("rbd_mirror_leader_heartbeat_interval", + "3600")); + } else if (is_librados_test_stub()) { + EXPECT_EQ(0, c->cluster.conf_set("rbd_mirror_leader_heartbeat_interval", + "1")); + } + EXPECT_EQ(0, c->cluster.ioctx_create(_local_pool_name.c_str(), c->io_ctx)); + + return c->io_ctx; + } +}; + +TEST_F(TestLeaderWatcher, InitShutdown) +{ + Listener listener; + LeaderWatcher<> leader_watcher(m_threads, m_local_io_ctx, &listener); + + C_SaferCond on_init_acquire; + listener.on_acquire(0, &on_init_acquire); + ASSERT_EQ(0, leader_watcher.init()); + ASSERT_EQ(0, on_init_acquire.wait()); + ASSERT_TRUE(leader_watcher.is_leader()); + + leader_watcher.shut_down(); + ASSERT_EQ(1, listener.acquire_count()); + ASSERT_EQ(1, listener.release_count()); + ASSERT_FALSE(leader_watcher.is_leader()); +} + +TEST_F(TestLeaderWatcher, Release) +{ + Listener listener; + LeaderWatcher<> leader_watcher(m_threads, m_local_io_ctx, &listener); + + C_SaferCond on_init_acquire; + listener.on_acquire(0, &on_init_acquire); + ASSERT_EQ(0, leader_watcher.init()); + ASSERT_EQ(0, on_init_acquire.wait()); + ASSERT_TRUE(leader_watcher.is_leader()); + + C_SaferCond on_release; + C_SaferCond on_acquire; + listener.on_release(0, &on_release); + listener.on_acquire(0, &on_acquire); + leader_watcher.release_leader(); + ASSERT_EQ(0, on_release.wait()); + ASSERT_FALSE(leader_watcher.is_leader()); + + // wait for lock re-acquired due to no another locker + ASSERT_EQ(0, on_acquire.wait()); + ASSERT_TRUE(leader_watcher.is_leader()); + + C_SaferCond on_release2; + listener.on_release(0, &on_release2); + leader_watcher.release_leader(); + ASSERT_EQ(0, on_release2.wait()); + + leader_watcher.shut_down(); + ASSERT_EQ(2, listener.acquire_count()); + ASSERT_EQ(2, listener.release_count()); +} + +TEST_F(TestLeaderWatcher, ListenerError) +{ + Listener listener; + LeaderWatcher<> leader_watcher(m_threads, m_local_io_ctx, &listener); + + // make listener return error on acquire + C_SaferCond on_init_acquire, on_init_release; + listener.on_acquire(-EINVAL, &on_init_acquire); + listener.on_release(0, &on_init_release); + ASSERT_EQ(0, leader_watcher.init()); + ASSERT_EQ(0, on_init_acquire.wait()); + ASSERT_EQ(0, on_init_release.wait()); + ASSERT_FALSE(leader_watcher.is_leader()); + + // wait for lock re-acquired due to no another locker + C_SaferCond on_acquire; + listener.on_acquire(0, &on_acquire); + ASSERT_EQ(0, on_acquire.wait()); + ASSERT_TRUE(leader_watcher.is_leader()); + + // make listener return error on release + C_SaferCond on_release; + listener.on_release(-EINVAL, &on_release); + leader_watcher.release_leader(); + ASSERT_EQ(0, on_release.wait()); + ASSERT_FALSE(leader_watcher.is_leader()); + + leader_watcher.shut_down(); + ASSERT_EQ(2, listener.acquire_count()); + ASSERT_EQ(2, listener.release_count()); + ASSERT_FALSE(leader_watcher.is_leader()); +} + +TEST_F(TestLeaderWatcher, Two) +{ + Listener listener1; + LeaderWatcher<> leader_watcher1(m_threads, create_connection(), &listener1); + + C_SaferCond on_init_acquire; + listener1.on_acquire(0, &on_init_acquire); + ASSERT_EQ(0, leader_watcher1.init()); + ASSERT_EQ(0, on_init_acquire.wait()); + + Listener listener2; + LeaderWatcher<> leader_watcher2(m_threads, create_connection(), &listener2); + + ASSERT_EQ(0, leader_watcher2.init()); + ASSERT_TRUE(leader_watcher1.is_leader()); + ASSERT_FALSE(leader_watcher2.is_leader()); + + C_SaferCond on_release; + C_SaferCond on_acquire; + listener1.on_release(0, &on_release); + listener2.on_acquire(0, &on_acquire); + leader_watcher1.release_leader(); + ASSERT_EQ(0, on_release.wait()); + ASSERT_FALSE(leader_watcher1.is_leader()); + + // wait for lock acquired by another watcher + ASSERT_EQ(0, on_acquire.wait()); + ASSERT_TRUE(leader_watcher2.is_leader()); + + leader_watcher1.shut_down(); + leader_watcher2.shut_down(); + + ASSERT_EQ(1, listener1.acquire_count()); + ASSERT_EQ(1, listener1.release_count()); + ASSERT_EQ(1, listener2.acquire_count()); + ASSERT_EQ(1, listener2.release_count()); +} + +TEST_F(TestLeaderWatcher, Break) +{ + Listener listener1, listener2; + LeaderWatcher<> leader_watcher1(m_threads, + create_connection(true /* no heartbeats */), + &listener1); + LeaderWatcher<> leader_watcher2(m_threads, create_connection(), &listener2); + + C_SaferCond on_init_acquire; + listener1.on_acquire(0, &on_init_acquire); + ASSERT_EQ(0, leader_watcher1.init()); + ASSERT_EQ(0, on_init_acquire.wait()); + + C_SaferCond on_acquire; + listener2.on_acquire(0, &on_acquire); + ASSERT_EQ(0, leader_watcher2.init()); + ASSERT_FALSE(leader_watcher2.is_leader()); + + // wait for lock broken due to no heartbeats and re-acquired + ASSERT_EQ(0, on_acquire.wait()); + ASSERT_TRUE(leader_watcher2.is_leader()); + + leader_watcher1.shut_down(); + leader_watcher2.shut_down(); +} + +TEST_F(TestLeaderWatcher, Stress) +{ + const int WATCHERS_COUNT = 20; + std::list *> leader_watchers; + Listener listener; + + for (int i = 0; i < WATCHERS_COUNT; i++) { + auto leader_watcher = + new LeaderWatcher<>(m_threads, create_connection(), &listener); + leader_watchers.push_back(leader_watcher); + } + + C_SaferCond on_init_acquire; + listener.on_acquire(0, &on_init_acquire); + for (auto &leader_watcher : leader_watchers) { + ASSERT_EQ(0, leader_watcher->init()); + } + ASSERT_EQ(0, on_init_acquire.wait()); + + while (true) { + C_SaferCond on_acquire; + listener.on_acquire(0, &on_acquire); + std::unique_ptr > leader_watcher; + for (auto it = leader_watchers.begin(); it != leader_watchers.end(); ) { + if ((*it)->is_leader()) { + ASSERT_FALSE(leader_watcher); + leader_watcher.reset(*it); + it = leader_watchers.erase(it); + } else { + it++; + } + } + + ASSERT_TRUE(leader_watcher); + leader_watcher->shut_down(); + if (leader_watchers.empty()) { + break; + } + ASSERT_EQ(0, on_acquire.wait()); + } +}