Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / test / rbd_mirror / test_LeaderWatcher.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "include/rados/librados.hpp"
5 #include "librbd/internal.h"
6 #include "librbd/Utils.h"
7 #include "librbd/api/Mirror.h"
8 #include "test/librbd/test_support.h"
9 #include "test/rbd_mirror/test_fixture.h"
10 #include "tools/rbd_mirror/LeaderWatcher.h"
11 #include "tools/rbd_mirror/Threads.h"
12
13 #include "test/librados/test.h"
14 #include "gtest/gtest.h"
15
16 using librbd::util::unique_lock_name;
17 using rbd::mirror::LeaderWatcher;
18
19 void register_test_leader_watcher() {
20 }
21
22 class TestLeaderWatcher : public ::rbd::mirror::TestFixture {
23 public:
24   class Listener : public rbd::mirror::LeaderWatcher<>::Listener {
25   public:
26     Listener()
27       : m_test_lock(unique_lock_name("LeaderWatcher::m_test_lock", this)) {
28     }
29
30     void on_acquire(int r, Context *ctx) {
31       Mutex::Locker locker(m_test_lock);
32       m_on_acquire_r = r;
33       m_on_acquire = ctx;
34     }
35
36     void on_release(int r, Context *ctx) {
37       Mutex::Locker locker(m_test_lock);
38       m_on_release_r = r;
39       m_on_release = ctx;
40     }
41
42     int acquire_count() const {
43       Mutex::Locker locker(m_test_lock);
44       return m_acquire_count;
45     }
46
47     int release_count() const {
48       Mutex::Locker locker(m_test_lock);
49       return m_release_count;
50     }
51
52     void post_acquire_handler(Context *on_finish) override {
53       Mutex::Locker locker(m_test_lock);
54       m_acquire_count++;
55       on_finish->complete(m_on_acquire_r);
56       m_on_acquire_r = 0;
57       if (m_on_acquire != nullptr) {
58         m_on_acquire->complete(0);
59         m_on_acquire = nullptr;
60       }
61     }
62
63     void pre_release_handler(Context *on_finish) override {
64       Mutex::Locker locker(m_test_lock);
65       m_release_count++;
66       on_finish->complete(m_on_release_r);
67       m_on_release_r = 0;
68       if (m_on_release != nullptr) {
69         m_on_release->complete(0);
70         m_on_release = nullptr;
71       }
72     }
73
74     void update_leader_handler(const std::string &leader_instance_id) override {
75     }
76
77   private:
78     mutable Mutex m_test_lock;
79     int m_acquire_count = 0;
80     int m_release_count = 0;
81     int m_on_acquire_r = 0;
82     int m_on_release_r = 0;
83     Context *m_on_acquire = nullptr;
84     Context *m_on_release = nullptr;
85   };
86
87   struct Connection {
88     librados::Rados cluster;
89     librados::IoCtx io_ctx;
90   };
91
92   std::list<std::unique_ptr<Connection> > m_connections;
93
94   void SetUp() override {
95     TestFixture::SetUp();
96     EXPECT_EQ(0, librbd::api::Mirror<>::mode_set(m_local_io_ctx,
97                                                  RBD_MIRROR_MODE_POOL));
98
99     if (is_librados_test_stub()) {
100       // speed testing up a little
101       EXPECT_EQ(0, _rados->conf_set("rbd_mirror_leader_heartbeat_interval",
102                                     "1"));
103     }
104   }
105
106   bool is_librados_test_stub() {
107     std::string fsid;
108     EXPECT_EQ(0, _rados->cluster_fsid(&fsid));
109     return fsid == "00000000-1111-2222-3333-444444444444";
110   }
111
112   librados::IoCtx &create_connection(bool no_heartbeats = false) {
113     m_connections.push_back(std::unique_ptr<Connection>(new Connection()));
114     Connection *c = m_connections.back().get();
115
116     EXPECT_EQ("", connect_cluster_pp(c->cluster));
117     if (no_heartbeats) {
118       EXPECT_EQ(0, c->cluster.conf_set("rbd_mirror_leader_heartbeat_interval",
119                                        "3600"));
120     } else if (is_librados_test_stub()) {
121       EXPECT_EQ(0, c->cluster.conf_set("rbd_mirror_leader_heartbeat_interval",
122                                        "1"));
123     }
124     EXPECT_EQ(0, c->cluster.ioctx_create(_local_pool_name.c_str(), c->io_ctx));
125
126     return c->io_ctx;
127   }
128 };
129
130 TEST_F(TestLeaderWatcher, InitShutdown)
131 {
132   Listener listener;
133   LeaderWatcher<> leader_watcher(m_threads, m_local_io_ctx, &listener);
134
135   C_SaferCond on_init_acquire;
136   listener.on_acquire(0, &on_init_acquire);
137   ASSERT_EQ(0, leader_watcher.init());
138   ASSERT_EQ(0, on_init_acquire.wait());
139   ASSERT_TRUE(leader_watcher.is_leader());
140
141   leader_watcher.shut_down();
142   ASSERT_EQ(1, listener.acquire_count());
143   ASSERT_EQ(1, listener.release_count());
144   ASSERT_FALSE(leader_watcher.is_leader());
145 }
146
147 TEST_F(TestLeaderWatcher, Release)
148 {
149   Listener listener;
150   LeaderWatcher<> leader_watcher(m_threads, m_local_io_ctx, &listener);
151
152   C_SaferCond on_init_acquire;
153   listener.on_acquire(0, &on_init_acquire);
154   ASSERT_EQ(0, leader_watcher.init());
155   ASSERT_EQ(0, on_init_acquire.wait());
156   ASSERT_TRUE(leader_watcher.is_leader());
157
158   C_SaferCond on_release;
159   C_SaferCond on_acquire;
160   listener.on_release(0, &on_release);
161   listener.on_acquire(0, &on_acquire);
162   leader_watcher.release_leader();
163   ASSERT_EQ(0, on_release.wait());
164   ASSERT_FALSE(leader_watcher.is_leader());
165
166   // wait for lock re-acquired due to no another locker
167   ASSERT_EQ(0, on_acquire.wait());
168   ASSERT_TRUE(leader_watcher.is_leader());
169
170   C_SaferCond on_release2;
171   listener.on_release(0, &on_release2);
172   leader_watcher.release_leader();
173   ASSERT_EQ(0, on_release2.wait());
174
175   leader_watcher.shut_down();
176   ASSERT_EQ(2, listener.acquire_count());
177   ASSERT_EQ(2, listener.release_count());
178 }
179
180 TEST_F(TestLeaderWatcher, ListenerError)
181 {
182   Listener listener;
183   LeaderWatcher<> leader_watcher(m_threads, m_local_io_ctx, &listener);
184
185   // make listener return error on acquire
186   C_SaferCond on_init_acquire, on_init_release;
187   listener.on_acquire(-EINVAL, &on_init_acquire);
188   listener.on_release(0, &on_init_release);
189   ASSERT_EQ(0, leader_watcher.init());
190   ASSERT_EQ(0, on_init_acquire.wait());
191   ASSERT_EQ(0, on_init_release.wait());
192   ASSERT_FALSE(leader_watcher.is_leader());
193
194   // wait for lock re-acquired due to no another locker
195   C_SaferCond on_acquire;
196   listener.on_acquire(0, &on_acquire);
197   ASSERT_EQ(0, on_acquire.wait());
198   ASSERT_TRUE(leader_watcher.is_leader());
199
200   // make listener return error on release
201   C_SaferCond on_release;
202   listener.on_release(-EINVAL, &on_release);
203   leader_watcher.release_leader();
204   ASSERT_EQ(0, on_release.wait());
205   ASSERT_FALSE(leader_watcher.is_leader());
206
207   leader_watcher.shut_down();
208   ASSERT_EQ(2, listener.acquire_count());
209   ASSERT_EQ(2, listener.release_count());
210   ASSERT_FALSE(leader_watcher.is_leader());
211 }
212
213 TEST_F(TestLeaderWatcher, Two)
214 {
215   Listener listener1;
216   LeaderWatcher<> leader_watcher1(m_threads, create_connection(), &listener1);
217
218   C_SaferCond on_init_acquire;
219   listener1.on_acquire(0, &on_init_acquire);
220   ASSERT_EQ(0, leader_watcher1.init());
221   ASSERT_EQ(0, on_init_acquire.wait());
222
223   Listener listener2;
224   LeaderWatcher<> leader_watcher2(m_threads, create_connection(), &listener2);
225
226   ASSERT_EQ(0, leader_watcher2.init());
227   ASSERT_TRUE(leader_watcher1.is_leader());
228   ASSERT_FALSE(leader_watcher2.is_leader());
229
230   C_SaferCond on_release;
231   C_SaferCond on_acquire;
232   listener1.on_release(0, &on_release);
233   listener2.on_acquire(0, &on_acquire);
234   leader_watcher1.release_leader();
235   ASSERT_EQ(0, on_release.wait());
236   ASSERT_FALSE(leader_watcher1.is_leader());
237
238   // wait for lock acquired by another watcher
239   ASSERT_EQ(0, on_acquire.wait());
240   ASSERT_TRUE(leader_watcher2.is_leader());
241
242   leader_watcher1.shut_down();
243   leader_watcher2.shut_down();
244
245   ASSERT_EQ(1, listener1.acquire_count());
246   ASSERT_EQ(1, listener1.release_count());
247   ASSERT_EQ(1, listener2.acquire_count());
248   ASSERT_EQ(1, listener2.release_count());
249 }
250
251 TEST_F(TestLeaderWatcher, Break)
252 {
253   Listener listener1, listener2;
254   LeaderWatcher<> leader_watcher1(m_threads,
255                                   create_connection(true /* no heartbeats */),
256                                   &listener1);
257   LeaderWatcher<> leader_watcher2(m_threads, create_connection(), &listener2);
258
259   C_SaferCond on_init_acquire;
260   listener1.on_acquire(0, &on_init_acquire);
261   ASSERT_EQ(0, leader_watcher1.init());
262   ASSERT_EQ(0, on_init_acquire.wait());
263
264   C_SaferCond on_acquire;
265   listener2.on_acquire(0, &on_acquire);
266   ASSERT_EQ(0, leader_watcher2.init());
267   ASSERT_FALSE(leader_watcher2.is_leader());
268
269   // wait for lock broken due to no heartbeats and re-acquired
270   ASSERT_EQ(0, on_acquire.wait());
271   ASSERT_TRUE(leader_watcher2.is_leader());
272
273   leader_watcher1.shut_down();
274   leader_watcher2.shut_down();
275 }
276
277 TEST_F(TestLeaderWatcher, Stress)
278 {
279   const int WATCHERS_COUNT = 20;
280   std::list<LeaderWatcher<> *> leader_watchers;
281   Listener listener;
282
283   for (int i = 0; i < WATCHERS_COUNT; i++) {
284     auto leader_watcher =
285       new LeaderWatcher<>(m_threads, create_connection(), &listener);
286     leader_watchers.push_back(leader_watcher);
287   }
288
289   C_SaferCond on_init_acquire;
290   listener.on_acquire(0, &on_init_acquire);
291   for (auto &leader_watcher : leader_watchers) {
292     ASSERT_EQ(0, leader_watcher->init());
293   }
294   ASSERT_EQ(0, on_init_acquire.wait());
295
296   while (true) {
297     C_SaferCond on_acquire;
298     listener.on_acquire(0, &on_acquire);
299     std::unique_ptr<LeaderWatcher<> > leader_watcher;
300     for (auto it = leader_watchers.begin(); it != leader_watchers.end(); ) {
301       if ((*it)->is_leader()) {
302         ASSERT_FALSE(leader_watcher);
303         leader_watcher.reset(*it);
304         it = leader_watchers.erase(it);
305       } else {
306         it++;
307       }
308     }
309
310     ASSERT_TRUE(leader_watcher);
311     leader_watcher->shut_down();
312     if (leader_watchers.empty()) {
313       break;
314     }
315     ASSERT_EQ(0, on_acquire.wait());
316   }
317 }