Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / test / msgr / test_async_driver.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
7  *
8  * Author: Haomai Wang <haomaiwang@gmail.com>
9  *
10  * This is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU Lesser General Public
12  * License version 2.1, as published by the Free Software
13  * Foundation.  See file COPYING.
14  *
15  */
16
17 #ifdef __APPLE__
18 #include <AvailabilityMacros.h>
19 #endif
20
21 #include <fcntl.h>
22 #include <sys/socket.h>
23 #include <pthread.h>
24 #include <stdint.h>
25 #include <arpa/inet.h>
26 #include "include/Context.h"
27 #include "common/Mutex.h"
28 #include "common/Cond.h"
29 #include "global/global_init.h"
30 #include "common/ceph_argparse.h"
31 #include "msg/async/Event.h"
32
33 #include <atomic>
34
35 // We use epoll, kqueue, evport, select in descending order by performance.
36 #if defined(__linux__)
37 #define HAVE_EPOLL 1
38 #endif
39
40 #if (defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)) || defined(__FreeBSD__) || defined(__OpenBSD__) || defined (__NetBSD__)
41 #define HAVE_KQUEUE 1
42 #endif
43
44 #ifdef __sun
45 #include <sys/feature_tests.h>
46 #ifdef _DTRACE_VERSION
47 #define HAVE_EVPORT 1
48 #endif
49 #endif
50
51 #ifdef HAVE_EPOLL
52 #include "msg/async/EventEpoll.h"
53 #endif
54 #ifdef HAVE_KQUEUE
55 #include "msg/async/EventKqueue.h"
56 #endif
57 #include "msg/async/EventSelect.h"
58
59 #include <gtest/gtest.h>
60
61
62 #if GTEST_HAS_PARAM_TEST
63
64 class EventDriverTest : public ::testing::TestWithParam<const char*> {
65  public:
66   EventDriver *driver;
67
68   EventDriverTest(): driver(0) {}
69   void SetUp() override {
70     cerr << __func__ << " start set up " << GetParam() << std::endl;
71 #ifdef HAVE_EPOLL
72     if (strcmp(GetParam(), "epoll"))
73       driver = new EpollDriver(g_ceph_context);
74 #endif
75 #ifdef HAVE_KQUEUE
76     if (strcmp(GetParam(), "kqueue"))
77       driver = new KqueueDriver(g_ceph_context);
78 #endif
79     if (strcmp(GetParam(), "select"))
80       driver = new SelectDriver(g_ceph_context);
81     driver->init(NULL, 100);
82   }
83   void TearDown() override {
84     delete driver;
85   }
86 };
87
88 int set_nonblock(int sd)
89 {
90   int flags;
91
92   /* Set the socket nonblocking.
93    * Note that fcntl(2) for F_GETFL and F_SETFL can't be
94    * interrupted by a signal. */
95   if ((flags = fcntl(sd, F_GETFL)) < 0 ) {
96     return -1;
97   }
98   if (fcntl(sd, F_SETFL, flags | O_NONBLOCK) < 0) {
99     return -1;
100   }
101   return 0;
102 }
103
104
105 TEST_P(EventDriverTest, PipeTest) {
106   int fds[2];
107   vector<FiredFileEvent> fired_events;
108   int r;
109   struct timeval tv;
110   tv.tv_sec = 0;
111   tv.tv_usec = 1;
112
113   r = pipe(fds);
114   ASSERT_EQ(r, 0);
115   r = driver->add_event(fds[0], EVENT_NONE, EVENT_READABLE);
116   ASSERT_EQ(r, 0);
117   r = driver->event_wait(fired_events, &tv);
118   ASSERT_EQ(r, 0);
119
120   char c = 'A';
121   r = write(fds[1], &c, sizeof(c));
122   ASSERT_EQ(r, 1);
123   r = driver->event_wait(fired_events, &tv);
124   ASSERT_EQ(r, 1);
125   ASSERT_EQ(fired_events[0].fd, fds[0]);
126
127
128   fired_events.clear();
129   r = write(fds[1], &c, sizeof(c));
130   ASSERT_EQ(r, 1);
131   r = driver->event_wait(fired_events, &tv);
132   ASSERT_EQ(r, 1);
133   ASSERT_EQ(fired_events[0].fd, fds[0]);
134
135   fired_events.clear();
136   driver->del_event(fds[0], EVENT_READABLE, EVENT_READABLE);
137   r = write(fds[1], &c, sizeof(c));
138   ASSERT_EQ(r, 1);
139   r = driver->event_wait(fired_events, &tv);
140   ASSERT_EQ(r, 0);
141 }
142
143 void* echoclient(void *arg)
144 {
145   intptr_t port = (intptr_t)arg;
146   struct sockaddr_in sa;
147   memset(&sa, 0, sizeof(sa));
148   sa.sin_family = AF_INET;
149   sa.sin_port = htons(port);
150   char addr[] = "127.0.0.1";
151   int r = inet_pton(AF_INET, addr, &sa.sin_addr);
152   assert(r == 1);
153
154   int connect_sd = ::socket(AF_INET, SOCK_STREAM, 0);
155   if (connect_sd >= 0) {
156     r = connect(connect_sd, (struct sockaddr*)&sa, sizeof(sa));
157     assert(r == 0);
158     int t = 0;
159   
160     do {
161       char c[] = "banner";
162       r = write(connect_sd, c, sizeof(c));
163       char d[100];
164       r = read(connect_sd, d, sizeof(d));
165       if (r == 0)
166         break;
167       if (t++ == 30)
168         break;
169     } while (1);
170     ::close(connect_sd);
171   }
172   return 0;
173 }
174
175 TEST_P(EventDriverTest, NetworkSocketTest) {
176   int listen_sd = ::socket(AF_INET, SOCK_STREAM, 0);
177   ASSERT_TRUE(listen_sd > 0);
178   int on = 1;
179   int r = ::setsockopt(listen_sd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
180   ASSERT_EQ(r, 0);
181   r = set_nonblock(listen_sd);
182   ASSERT_EQ(r, 0);
183   struct sockaddr_in sa;
184   long port = 0;
185   for (port = 38788; port < 40000; port++) {
186     memset(&sa,0,sizeof(sa));
187     sa.sin_family = AF_INET;
188     sa.sin_port = htons(port);
189     sa.sin_addr.s_addr = htonl(INADDR_ANY);
190
191     r = ::bind(listen_sd, (struct sockaddr *)&sa, sizeof(sa));
192     if (r == 0) {
193       break;
194     }
195   }
196   ASSERT_EQ(r, 0);
197   r = listen(listen_sd, 511);
198   ASSERT_EQ(r, 0);
199
200   vector<FiredFileEvent> fired_events;
201   struct timeval tv;
202   tv.tv_sec = 0;
203   tv.tv_usec = 1;
204   r = driver->add_event(listen_sd, EVENT_NONE, EVENT_READABLE);
205   ASSERT_EQ(r, 0);
206   r = driver->event_wait(fired_events, &tv);
207   ASSERT_EQ(r, 0);
208
209   fired_events.clear();
210   pthread_t thread1;
211   r = pthread_create(&thread1, NULL, echoclient, (void*)(intptr_t)port);
212   ASSERT_EQ(r, 0);
213   tv.tv_sec = 5;
214   tv.tv_usec = 0;
215   r = driver->event_wait(fired_events, &tv);
216   ASSERT_EQ(r, 1);
217   ASSERT_EQ(fired_events[0].fd, listen_sd);
218
219   fired_events.clear();
220   int client_sd = ::accept(listen_sd, NULL, NULL);
221   ASSERT_TRUE(client_sd > 0);
222   r = driver->add_event(client_sd, EVENT_NONE, EVENT_READABLE);
223   ASSERT_EQ(r, 0);
224
225   do {
226     fired_events.clear();
227     tv.tv_sec = 5;
228     tv.tv_usec = 0;
229     r = driver->event_wait(fired_events, &tv);
230     ASSERT_EQ(1, r);
231     ASSERT_EQ(EVENT_READABLE, fired_events[0].mask);
232
233     fired_events.clear();
234     char data[100];
235     r = ::read(client_sd, data, sizeof(data));
236     if (r == 0)
237       break;
238     ASSERT_GT(r, 0);
239     r = driver->add_event(client_sd, EVENT_READABLE, EVENT_WRITABLE);
240     ASSERT_EQ(0, r);
241     r = driver->event_wait(fired_events, &tv);
242     ASSERT_EQ(1, r);
243     ASSERT_EQ(fired_events[0].mask, EVENT_WRITABLE);
244     r = write(client_sd, data, strlen(data));
245     ASSERT_EQ((int)strlen(data), r);
246     driver->del_event(client_sd, EVENT_READABLE|EVENT_WRITABLE,
247                       EVENT_WRITABLE);
248   } while (1);
249
250   ::close(client_sd);
251   ::close(listen_sd);
252 }
253
254 class FakeEvent : public EventCallback {
255
256  public:
257   void do_request(int fd_or_id) override {}
258 };
259
260 TEST(EventCenterTest, FileEventExpansion) {
261   vector<int> sds;
262   EventCenter center(g_ceph_context);
263   center.init(100, 0, "posix");
264   center.set_owner();
265   EventCallbackRef e(new FakeEvent());
266   for (int i = 0; i < 300; i++) {
267     int sd = ::socket(AF_INET, SOCK_STREAM, 0);
268     center.create_file_event(sd, EVENT_READABLE, e);
269     sds.push_back(sd);
270   }
271
272   for (vector<int>::iterator it = sds.begin(); it != sds.end(); ++it)
273     center.delete_file_event(*it, EVENT_READABLE);
274 }
275
276
277 class Worker : public Thread {
278   CephContext *cct;
279   bool done;
280
281  public:
282   EventCenter center;
283   explicit Worker(CephContext *c, int idx): cct(c), done(false), center(c) {
284     center.init(100, idx, "posix");
285   }
286   void stop() {
287     done = true; 
288     center.wakeup();
289   }
290   void* entry() override {
291     center.set_owner();
292     while (!done)
293       center.process_events(1000000);
294     return 0;
295   }
296 };
297
298 class CountEvent: public EventCallback {
299   std::atomic<unsigned> *count;
300   Mutex *lock;
301   Cond *cond;
302
303  public:
304   CountEvent(std::atomic<unsigned> *atomic, Mutex *l, Cond *c): count(atomic), lock(l), cond(c) {}
305   void do_request(int id) override {
306     lock->Lock();
307     (*count)--;
308     cond->Signal();
309     lock->Unlock();
310   }
311 };
312
313 TEST(EventCenterTest, DispatchTest) {
314   Worker worker1(g_ceph_context, 1), worker2(g_ceph_context, 2);
315   std::atomic<unsigned> count = { 0 };
316   Mutex lock("DispatchTest::lock");
317   Cond cond;
318   worker1.create("worker_1");
319   worker2.create("worker_2");
320   for (int i = 0; i < 10000; ++i) {
321     count++;
322     worker1.center.dispatch_event_external(EventCallbackRef(new CountEvent(&count, &lock, &cond)));
323     count++;
324     worker2.center.dispatch_event_external(EventCallbackRef(new CountEvent(&count, &lock, &cond)));
325     Mutex::Locker l(lock);
326     while (count)
327       cond.Wait(lock);
328   }
329   worker1.stop();
330   worker2.stop();
331   worker1.join();
332   worker2.join();
333 }
334
335 INSTANTIATE_TEST_CASE_P(
336   AsyncMessenger,
337   EventDriverTest,
338   ::testing::Values(
339 #ifdef HAVE_EPOLL
340     "epoll",
341 #endif
342 #ifdef HAVE_KQUEUE
343     "kqueue",
344 #endif
345     "select"
346   )
347 );
348
349 #else
350
351 // Google Test may not support value-parameterized tests with some
352 // compilers. If we use conditional compilation to compile out all
353 // code referring to the gtest_main library, MSVC linker will not link
354 // that library at all and consequently complain about missing entry
355 // point defined in that library (fatal error LNK1561: entry point
356 // must be defined). This dummy test keeps gtest_main linked in.
357 TEST(DummyTest, ValueParameterizedTestsAreNotSupportedOnThisPlatform) {}
358
359 #endif
360
361
362 /*
363  * Local Variables:
364  * compile-command: "cd ../.. ; make ceph_test_async_driver && 
365  *    ./ceph_test_async_driver
366  *
367  * End:
368  */