Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / test / direct_messenger / test_direct_messenger.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include <condition_variable>
5 #include <mutex>
6 #include <thread>
7
8 #include <gtest/gtest.h>
9
10 #include "global/global_init.h"
11 #include "common/ceph_argparse.h"
12
13 #include "DirectMessenger.h"
14 #include "msg/FastStrategy.h"
15 #include "msg/QueueStrategy.h"
16 #include "messages/MPing.h"
17
18
19 /// mock dispatcher that calls the given callback
20 class MockDispatcher : public Dispatcher {
21   std::function<void(Message*)> callback;
22  public:
23   MockDispatcher(CephContext *cct, std::function<void(Message*)> callback)
24     : Dispatcher(cct), callback(std::move(callback)) {}
25   bool ms_handle_reset(Connection *con) override { return false; }
26   void ms_handle_remote_reset(Connection *con) override {}
27   bool ms_handle_refused(Connection *con) override { return false; }
28   bool ms_dispatch(Message *m) override {
29     callback(m);
30     m->put();
31     return true;
32   }
33 };
34
35 /// test synchronous dispatch of messenger and connection interfaces
36 TEST(DirectMessenger, SyncDispatch)
37 {
38   auto cct = g_ceph_context;
39
40   // use FastStrategy for synchronous dispatch
41   DirectMessenger client(cct, entity_name_t::CLIENT(1),
42                          "client", 0, new FastStrategy());
43   DirectMessenger server(cct, entity_name_t::CLIENT(2),
44                          "server", 0, new FastStrategy());
45
46   ASSERT_EQ(0, client.set_direct_peer(&server));
47   ASSERT_EQ(0, server.set_direct_peer(&client));
48
49   bool got_request = false;
50   bool got_reply = false;
51
52   MockDispatcher client_dispatcher(cct, [&] (Message *m) {
53     got_reply = true;
54   });
55   client.add_dispatcher_head(&client_dispatcher);
56
57   MockDispatcher server_dispatcher(cct, [&] (Message *m) {
58     got_request = true;
59     ASSERT_EQ(0, m->get_connection()->send_message(new MPing()));
60   });
61   server.add_dispatcher_head(&server_dispatcher);
62
63   ASSERT_EQ(0, client.start());
64   ASSERT_EQ(0, server.start());
65
66   // test DirectMessenger::send_message()
67   ASSERT_EQ(0, client.send_message(new MPing(), server.get_myinst()));
68   ASSERT_TRUE(got_request);
69   ASSERT_TRUE(got_reply);
70
71   // test DirectConnection::send_message()
72   {
73     got_request = false;
74     got_reply = false;
75     auto conn = client.get_connection(server.get_myinst());
76     ASSERT_EQ(0, conn->send_message(new MPing()));
77     ASSERT_TRUE(got_request);
78     ASSERT_TRUE(got_reply);
79   }
80
81   // test DirectMessenger::send_message() with loopback address
82   got_request = false;
83   got_reply = false;
84   ASSERT_EQ(0, client.send_message(new MPing(), client.get_myinst()));
85   ASSERT_FALSE(got_request); // server should never see this
86   ASSERT_TRUE(got_reply);
87
88   // test DirectConnection::send_message() with loopback address
89   {
90     got_request = false;
91     got_reply = false;
92     auto conn = client.get_connection(client.get_myinst());
93     ASSERT_EQ(0, conn->send_message(new MPing()));
94     ASSERT_FALSE(got_request); // server should never see this
95     ASSERT_TRUE(got_reply);
96   }
97
98   // test DirectConnection::send_message() with loopback connection
99   {
100     got_request = false;
101     got_reply = false;
102     auto conn = client.get_loopback_connection();
103     ASSERT_EQ(0, conn->send_message(new MPing()));
104     ASSERT_FALSE(got_request); // server should never see this
105     ASSERT_TRUE(got_reply);
106   }
107
108   ASSERT_EQ(0, client.shutdown());
109   client.wait();
110
111   ASSERT_EQ(0, server.shutdown());
112   server.wait();
113 }
114
115 /// test asynchronous dispatch of messenger and connection interfaces
116 TEST(DirectMessenger, AsyncDispatch)
117 {
118   auto cct = g_ceph_context;
119
120   // use QueueStrategy for async replies
121   DirectMessenger client(cct, entity_name_t::CLIENT(1),
122                          "client", 0, new QueueStrategy(1));
123   DirectMessenger server(cct, entity_name_t::CLIENT(2),
124                          "server", 0, new FastStrategy());
125
126   ASSERT_EQ(0, client.set_direct_peer(&server));
127   ASSERT_EQ(0, server.set_direct_peer(&client));
128
129   // condition variable to wait on ping reply
130   std::mutex mutex;
131   std::condition_variable cond;
132   bool done = false;
133
134   auto wait_for_reply = [&] {
135     std::unique_lock<std::mutex> lock(mutex);
136     while (!done) {
137       cond.wait(lock);
138     }
139     done = false; // clear for reuse
140   };
141
142   // client dispatcher signals the condition variable on reply
143   MockDispatcher client_dispatcher(cct, [&] (Message *m) {
144     std::lock_guard<std::mutex> lock(mutex);
145     done = true;
146     cond.notify_one();
147   });
148   client.add_dispatcher_head(&client_dispatcher);
149
150   MockDispatcher server_dispatcher(cct, [&] (Message *m) {
151     // hold the lock over the call to send_message() to prove that the client's
152     // dispatch is asynchronous. if it isn't, it will deadlock
153     std::lock_guard<std::mutex> lock(mutex);
154     ASSERT_EQ(0, m->get_connection()->send_message(new MPing()));
155   });
156   server.add_dispatcher_head(&server_dispatcher);
157
158   ASSERT_EQ(0, client.start());
159   ASSERT_EQ(0, server.start());
160
161   // test DirectMessenger::send_message()
162   ASSERT_EQ(0, client.send_message(new MPing(), server.get_myinst()));
163   wait_for_reply();
164
165   // test DirectConnection::send_message()
166   {
167     auto conn = client.get_connection(server.get_myinst());
168     ASSERT_EQ(0, conn->send_message(new MPing()));
169   }
170   wait_for_reply();
171
172   // test DirectMessenger::send_message() with loopback address
173   {
174     // hold the lock to test that loopback dispatch is asynchronous
175     std::lock_guard<std::mutex> lock(mutex);
176     ASSERT_EQ(0, client.send_message(new MPing(), client.get_myinst()));
177   }
178   wait_for_reply();
179
180   // test DirectConnection::send_message() with loopback address
181   {
182     auto conn = client.get_connection(client.get_myinst());
183     // hold the lock to test that loopback dispatch is asynchronous
184     std::lock_guard<std::mutex> lock(mutex);
185     ASSERT_EQ(0, conn->send_message(new MPing()));
186   }
187   wait_for_reply();
188
189   // test DirectConnection::send_message() with loopback connection
190   {
191     auto conn = client.get_loopback_connection();
192     // hold the lock to test that loopback dispatch is asynchronous
193     std::lock_guard<std::mutex> lock(mutex);
194     ASSERT_EQ(0, conn->send_message(new MPing()));
195   }
196   wait_for_reply();
197
198   ASSERT_EQ(0, client.shutdown());
199   client.wait();
200
201   ASSERT_EQ(0, server.shutdown());
202   server.wait();
203 }
204
205 /// test that wait() blocks until shutdown()
206 TEST(DirectMessenger, WaitShutdown)
207 {
208   auto cct = g_ceph_context;
209
210   // test wait() with both Queue- and FastStrategy
211   DirectMessenger client(cct, entity_name_t::CLIENT(1),
212                          "client", 0, new QueueStrategy(1));
213   DirectMessenger server(cct, entity_name_t::CLIENT(2),
214                          "server", 0, new FastStrategy());
215
216   ASSERT_EQ(0, client.set_direct_peer(&server));
217   ASSERT_EQ(0, server.set_direct_peer(&client));
218
219   ASSERT_EQ(0, client.start());
220   ASSERT_EQ(0, server.start());
221
222   std::atomic<bool> client_waiting{false};
223   std::atomic<bool> server_waiting{false};
224
225   // spawn threads to wait() on each of the messengers
226   std::thread client_thread([&] {
227     client_waiting = true;
228     client.wait();
229     client_waiting = false;
230   });
231   std::thread server_thread([&] {
232     server_waiting = true;
233     server.wait();
234     server_waiting = false;
235   });
236
237   // give them time to start
238   std::this_thread::sleep_for(std::chrono::milliseconds(50));
239
240   ASSERT_TRUE(client_waiting);
241   ASSERT_TRUE(server_waiting);
242
243   // call shutdown to unblock the waiting threads
244   ASSERT_EQ(0, client.shutdown());
245   ASSERT_EQ(0, server.shutdown());
246
247   client_thread.join();
248   server_thread.join();
249
250   ASSERT_FALSE(client_waiting);
251   ASSERT_FALSE(server_waiting);
252 }
253
254 /// test connection and messenger interfaces after mark_down()
255 TEST(DirectMessenger, MarkDown)
256 {
257   auto cct = g_ceph_context;
258
259   DirectMessenger client(cct, entity_name_t::CLIENT(1),
260                          "client", 0, new FastStrategy());
261   DirectMessenger server(cct, entity_name_t::CLIENT(2),
262                          "server", 0, new FastStrategy());
263
264   ASSERT_EQ(0, client.set_direct_peer(&server));
265   ASSERT_EQ(0, server.set_direct_peer(&client));
266
267   ASSERT_EQ(0, client.start());
268   ASSERT_EQ(0, server.start());
269
270   auto client_to_server = client.get_connection(server.get_myinst());
271   auto server_to_client = server.get_connection(client.get_myinst());
272
273   ASSERT_TRUE(client_to_server->is_connected());
274   ASSERT_TRUE(server_to_client->is_connected());
275
276   // mark_down() breaks the connection on both sides
277   client_to_server->mark_down();
278
279   ASSERT_FALSE(client_to_server->is_connected());
280   ASSERT_EQ(-ENOTCONN, client_to_server->send_message(new MPing()));
281   ASSERT_EQ(-ENOTCONN, client.send_message(new MPing(), server.get_myinst()));
282
283   ASSERT_FALSE(server_to_client->is_connected());
284   ASSERT_EQ(-ENOTCONN, server_to_client->send_message(new MPing()));
285   ASSERT_EQ(-ENOTCONN, server.send_message(new MPing(), client.get_myinst()));
286
287   ASSERT_EQ(0, client.shutdown());
288   client.wait();
289
290   ASSERT_EQ(0, server.shutdown());
291   server.wait();
292 }
293
294 /// test connection and messenger interfaces after shutdown()
295 TEST(DirectMessenger, SendShutdown)
296 {
297   auto cct = g_ceph_context;
298
299   // put client on the heap so we can free it early
300   std::unique_ptr<DirectMessenger> client{
301     new DirectMessenger(cct, entity_name_t::CLIENT(1),
302                         "client", 0, new FastStrategy())};
303   DirectMessenger server(cct, entity_name_t::CLIENT(2),
304                          "server", 0, new FastStrategy());
305
306   ASSERT_EQ(0, client->set_direct_peer(&server));
307   ASSERT_EQ(0, server.set_direct_peer(client.get()));
308
309   ASSERT_EQ(0, client->start());
310   ASSERT_EQ(0, server.start());
311
312   const auto client_inst = client->get_myinst();
313   const auto server_inst = server.get_myinst();
314
315   auto client_to_server = client->get_connection(server_inst);
316   auto server_to_client = server.get_connection(client_inst);
317
318   ASSERT_TRUE(client_to_server->is_connected());
319   ASSERT_TRUE(server_to_client->is_connected());
320
321   // shut down the client to break connections
322   ASSERT_EQ(0, client->shutdown());
323   client->wait();
324
325   ASSERT_FALSE(client_to_server->is_connected());
326   ASSERT_EQ(-ENOTCONN, client_to_server->send_message(new MPing()));
327   ASSERT_EQ(-ENOTCONN, client->send_message(new MPing(), server_inst));
328
329   // free the client connection/messenger to test that calls to the server no
330   // longer try to dereference them
331   client_to_server.reset();
332   client.reset();
333
334   ASSERT_FALSE(server_to_client->is_connected());
335   ASSERT_EQ(-ENOTCONN, server_to_client->send_message(new MPing()));
336   ASSERT_EQ(-ENOTCONN, server.send_message(new MPing(), client_inst));
337
338   ASSERT_EQ(0, server.shutdown());
339   server.wait();
340 }
341
342 /// test connection and messenger interfaces after bind()
343 TEST(DirectMessenger, Bind)
344 {
345   auto cct = g_ceph_context;
346
347   DirectMessenger client(cct, entity_name_t::CLIENT(1),
348                          "client", 0, new FastStrategy());
349   DirectMessenger server(cct, entity_name_t::CLIENT(2),
350                          "server", 0, new FastStrategy());
351
352   entity_addr_t client_addr;
353   client_addr.set_family(AF_INET);
354   client_addr.set_port(1);
355
356   // client bind succeeds before set_direct_peer()
357   ASSERT_EQ(0, client.bind(client_addr));
358
359   ASSERT_EQ(0, client.set_direct_peer(&server));
360   ASSERT_EQ(0, server.set_direct_peer(&client));
361
362   // server bind fails after set_direct_peer()
363   entity_addr_t empty_addr;
364   ASSERT_EQ(-EINVAL, server.bind(empty_addr));
365
366   ASSERT_EQ(0, client.start());
367   ASSERT_EQ(0, server.start());
368
369   auto client_to_server = client.get_connection(server.get_myinst());
370   auto server_to_client = server.get_connection(client.get_myinst());
371
372   ASSERT_TRUE(client_to_server->is_connected());
373   ASSERT_TRUE(server_to_client->is_connected());
374
375   // no address in connection to server
376   ASSERT_EQ(empty_addr, client_to_server->get_peer_addr());
377   // bind address is reflected in connection to client
378   ASSERT_EQ(client_addr, server_to_client->get_peer_addr());
379
380   // mark_down() with bind address breaks the connection
381   server.mark_down(client_addr);
382
383   ASSERT_FALSE(client_to_server->is_connected());
384   ASSERT_FALSE(server_to_client->is_connected());
385
386   ASSERT_EQ(0, client.shutdown());
387   client.wait();
388
389   ASSERT_EQ(0, server.shutdown());
390   server.wait();
391 }
392
393 /// test connection and messenger interfaces before calls to set_direct_peer()
394 TEST(DirectMessenger, StartWithoutPeer)
395 {
396   auto cct = g_ceph_context;
397
398   DirectMessenger client(cct, entity_name_t::CLIENT(1),
399                          "client", 0, new FastStrategy());
400   DirectMessenger server(cct, entity_name_t::CLIENT(2),
401                          "server", 0, new FastStrategy());
402
403   // can't start until set_direct_peer()
404   ASSERT_EQ(-EINVAL, client.start());
405   ASSERT_EQ(-EINVAL, server.start());
406
407   ASSERT_EQ(0, client.set_direct_peer(&server));
408
409   // only client can start
410   ASSERT_EQ(0, client.start());
411   ASSERT_EQ(-EINVAL, server.start());
412
413   // client has a connection but can't send
414   auto conn = client.get_connection(server.get_myinst());
415   ASSERT_NE(nullptr, conn);
416   ASSERT_FALSE(conn->is_connected());
417   ASSERT_EQ(-ENOTCONN, conn->send_message(new MPing()));
418   ASSERT_EQ(-ENOTCONN, client.send_message(new MPing(), server.get_myinst()));
419
420   ASSERT_EQ(0, client.shutdown());
421   client.wait();
422 }
423
424 int main(int argc, char **argv)
425 {
426   // command-line arguments
427   vector<const char*> args;
428   argv_to_vec(argc, (const char **)argv, args);
429   env_to_vec(args);
430
431   auto cct = global_init(nullptr, args, CEPH_ENTITY_TYPE_ANY,
432                          CODE_ENVIRONMENT_DAEMON, 0);
433   common_init_finish(cct.get());
434
435   ::testing::InitGoogleTest(&argc, argv);
436   return RUN_ALL_TESTS();
437 }