1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "include/rados/librados.hpp"
5 #include "librbd/internal.h"
6 #include "librbd/Utils.h"
7 #include "librbd/api/Mirror.h"
8 #include "test/librbd/test_support.h"
9 #include "test/rbd_mirror/test_fixture.h"
10 #include "tools/rbd_mirror/LeaderWatcher.h"
11 #include "tools/rbd_mirror/Threads.h"
13 #include "test/librados/test.h"
14 #include "gtest/gtest.h"
16 using librbd::util::unique_lock_name;
17 using rbd::mirror::LeaderWatcher;
19 void register_test_leader_watcher() {
22 class TestLeaderWatcher : public ::rbd::mirror::TestFixture {
24 class Listener : public rbd::mirror::LeaderWatcher<>::Listener {
27 : m_test_lock(unique_lock_name("LeaderWatcher::m_test_lock", this)) {
30 void on_acquire(int r, Context *ctx) {
31 Mutex::Locker locker(m_test_lock);
36 void on_release(int r, Context *ctx) {
37 Mutex::Locker locker(m_test_lock);
42 int acquire_count() const {
43 Mutex::Locker locker(m_test_lock);
44 return m_acquire_count;
47 int release_count() const {
48 Mutex::Locker locker(m_test_lock);
49 return m_release_count;
52 void post_acquire_handler(Context *on_finish) override {
53 Mutex::Locker locker(m_test_lock);
55 on_finish->complete(m_on_acquire_r);
57 if (m_on_acquire != nullptr) {
58 m_on_acquire->complete(0);
59 m_on_acquire = nullptr;
63 void pre_release_handler(Context *on_finish) override {
64 Mutex::Locker locker(m_test_lock);
66 on_finish->complete(m_on_release_r);
68 if (m_on_release != nullptr) {
69 m_on_release->complete(0);
70 m_on_release = nullptr;
74 void update_leader_handler(const std::string &leader_instance_id) override {
78 mutable Mutex m_test_lock;
79 int m_acquire_count = 0;
80 int m_release_count = 0;
81 int m_on_acquire_r = 0;
82 int m_on_release_r = 0;
83 Context *m_on_acquire = nullptr;
84 Context *m_on_release = nullptr;
88 librados::Rados cluster;
89 librados::IoCtx io_ctx;
92 std::list<std::unique_ptr<Connection> > m_connections;
94 void SetUp() override {
96 EXPECT_EQ(0, librbd::api::Mirror<>::mode_set(m_local_io_ctx,
97 RBD_MIRROR_MODE_POOL));
99 if (is_librados_test_stub()) {
100 // speed testing up a little
101 EXPECT_EQ(0, _rados->conf_set("rbd_mirror_leader_heartbeat_interval",
106 bool is_librados_test_stub() {
108 EXPECT_EQ(0, _rados->cluster_fsid(&fsid));
109 return fsid == "00000000-1111-2222-3333-444444444444";
112 librados::IoCtx &create_connection(bool no_heartbeats = false) {
113 m_connections.push_back(std::unique_ptr<Connection>(new Connection()));
114 Connection *c = m_connections.back().get();
116 EXPECT_EQ("", connect_cluster_pp(c->cluster));
118 EXPECT_EQ(0, c->cluster.conf_set("rbd_mirror_leader_heartbeat_interval",
120 } else if (is_librados_test_stub()) {
121 EXPECT_EQ(0, c->cluster.conf_set("rbd_mirror_leader_heartbeat_interval",
124 EXPECT_EQ(0, c->cluster.ioctx_create(_local_pool_name.c_str(), c->io_ctx));
130 TEST_F(TestLeaderWatcher, InitShutdown)
133 LeaderWatcher<> leader_watcher(m_threads, m_local_io_ctx, &listener);
135 C_SaferCond on_init_acquire;
136 listener.on_acquire(0, &on_init_acquire);
137 ASSERT_EQ(0, leader_watcher.init());
138 ASSERT_EQ(0, on_init_acquire.wait());
139 ASSERT_TRUE(leader_watcher.is_leader());
141 leader_watcher.shut_down();
142 ASSERT_EQ(1, listener.acquire_count());
143 ASSERT_EQ(1, listener.release_count());
144 ASSERT_FALSE(leader_watcher.is_leader());
147 TEST_F(TestLeaderWatcher, Release)
150 LeaderWatcher<> leader_watcher(m_threads, m_local_io_ctx, &listener);
152 C_SaferCond on_init_acquire;
153 listener.on_acquire(0, &on_init_acquire);
154 ASSERT_EQ(0, leader_watcher.init());
155 ASSERT_EQ(0, on_init_acquire.wait());
156 ASSERT_TRUE(leader_watcher.is_leader());
158 C_SaferCond on_release;
159 C_SaferCond on_acquire;
160 listener.on_release(0, &on_release);
161 listener.on_acquire(0, &on_acquire);
162 leader_watcher.release_leader();
163 ASSERT_EQ(0, on_release.wait());
164 ASSERT_FALSE(leader_watcher.is_leader());
166 // wait for lock re-acquired due to no another locker
167 ASSERT_EQ(0, on_acquire.wait());
168 ASSERT_TRUE(leader_watcher.is_leader());
170 C_SaferCond on_release2;
171 listener.on_release(0, &on_release2);
172 leader_watcher.release_leader();
173 ASSERT_EQ(0, on_release2.wait());
175 leader_watcher.shut_down();
176 ASSERT_EQ(2, listener.acquire_count());
177 ASSERT_EQ(2, listener.release_count());
180 TEST_F(TestLeaderWatcher, ListenerError)
183 LeaderWatcher<> leader_watcher(m_threads, m_local_io_ctx, &listener);
185 // make listener return error on acquire
186 C_SaferCond on_init_acquire, on_init_release;
187 listener.on_acquire(-EINVAL, &on_init_acquire);
188 listener.on_release(0, &on_init_release);
189 ASSERT_EQ(0, leader_watcher.init());
190 ASSERT_EQ(0, on_init_acquire.wait());
191 ASSERT_EQ(0, on_init_release.wait());
192 ASSERT_FALSE(leader_watcher.is_leader());
194 // wait for lock re-acquired due to no another locker
195 C_SaferCond on_acquire;
196 listener.on_acquire(0, &on_acquire);
197 ASSERT_EQ(0, on_acquire.wait());
198 ASSERT_TRUE(leader_watcher.is_leader());
200 // make listener return error on release
201 C_SaferCond on_release;
202 listener.on_release(-EINVAL, &on_release);
203 leader_watcher.release_leader();
204 ASSERT_EQ(0, on_release.wait());
205 ASSERT_FALSE(leader_watcher.is_leader());
207 leader_watcher.shut_down();
208 ASSERT_EQ(2, listener.acquire_count());
209 ASSERT_EQ(2, listener.release_count());
210 ASSERT_FALSE(leader_watcher.is_leader());
213 TEST_F(TestLeaderWatcher, Two)
216 LeaderWatcher<> leader_watcher1(m_threads, create_connection(), &listener1);
218 C_SaferCond on_init_acquire;
219 listener1.on_acquire(0, &on_init_acquire);
220 ASSERT_EQ(0, leader_watcher1.init());
221 ASSERT_EQ(0, on_init_acquire.wait());
224 LeaderWatcher<> leader_watcher2(m_threads, create_connection(), &listener2);
226 ASSERT_EQ(0, leader_watcher2.init());
227 ASSERT_TRUE(leader_watcher1.is_leader());
228 ASSERT_FALSE(leader_watcher2.is_leader());
230 C_SaferCond on_release;
231 C_SaferCond on_acquire;
232 listener1.on_release(0, &on_release);
233 listener2.on_acquire(0, &on_acquire);
234 leader_watcher1.release_leader();
235 ASSERT_EQ(0, on_release.wait());
236 ASSERT_FALSE(leader_watcher1.is_leader());
238 // wait for lock acquired by another watcher
239 ASSERT_EQ(0, on_acquire.wait());
240 ASSERT_TRUE(leader_watcher2.is_leader());
242 leader_watcher1.shut_down();
243 leader_watcher2.shut_down();
245 ASSERT_EQ(1, listener1.acquire_count());
246 ASSERT_EQ(1, listener1.release_count());
247 ASSERT_EQ(1, listener2.acquire_count());
248 ASSERT_EQ(1, listener2.release_count());
251 TEST_F(TestLeaderWatcher, Break)
253 Listener listener1, listener2;
254 LeaderWatcher<> leader_watcher1(m_threads,
255 create_connection(true /* no heartbeats */),
257 LeaderWatcher<> leader_watcher2(m_threads, create_connection(), &listener2);
259 C_SaferCond on_init_acquire;
260 listener1.on_acquire(0, &on_init_acquire);
261 ASSERT_EQ(0, leader_watcher1.init());
262 ASSERT_EQ(0, on_init_acquire.wait());
264 C_SaferCond on_acquire;
265 listener2.on_acquire(0, &on_acquire);
266 ASSERT_EQ(0, leader_watcher2.init());
267 ASSERT_FALSE(leader_watcher2.is_leader());
269 // wait for lock broken due to no heartbeats and re-acquired
270 ASSERT_EQ(0, on_acquire.wait());
271 ASSERT_TRUE(leader_watcher2.is_leader());
273 leader_watcher1.shut_down();
274 leader_watcher2.shut_down();
277 TEST_F(TestLeaderWatcher, Stress)
279 const int WATCHERS_COUNT = 20;
280 std::list<LeaderWatcher<> *> leader_watchers;
283 for (int i = 0; i < WATCHERS_COUNT; i++) {
284 auto leader_watcher =
285 new LeaderWatcher<>(m_threads, create_connection(), &listener);
286 leader_watchers.push_back(leader_watcher);
289 C_SaferCond on_init_acquire;
290 listener.on_acquire(0, &on_init_acquire);
291 for (auto &leader_watcher : leader_watchers) {
292 ASSERT_EQ(0, leader_watcher->init());
294 ASSERT_EQ(0, on_init_acquire.wait());
297 C_SaferCond on_acquire;
298 listener.on_acquire(0, &on_acquire);
299 std::unique_ptr<LeaderWatcher<> > leader_watcher;
300 for (auto it = leader_watchers.begin(); it != leader_watchers.end(); ) {
301 if ((*it)->is_leader()) {
302 ASSERT_FALSE(leader_watcher);
303 leader_watcher.reset(*it);
304 it = leader_watchers.erase(it);
310 ASSERT_TRUE(leader_watcher);
311 leader_watcher->shut_down();
312 if (leader_watchers.empty()) {
315 ASSERT_EQ(0, on_acquire.wait());