1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
8 * Author: Haomai Wang <haomaiwang@gmail.com>
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
18 #include <AvailabilityMacros.h>
22 #include <sys/socket.h>
25 #include <arpa/inet.h>
26 #include "include/Context.h"
27 #include "common/Mutex.h"
28 #include "common/Cond.h"
29 #include "global/global_init.h"
30 #include "common/ceph_argparse.h"
31 #include "msg/async/Event.h"
35 // We use epoll, kqueue, evport, select in descending order by performance.
36 #if defined(__linux__)
40 #if (defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)) || defined(__FreeBSD__) || defined(__OpenBSD__) || defined (__NetBSD__)
45 #include <sys/feature_tests.h>
46 #ifdef _DTRACE_VERSION
52 #include "msg/async/EventEpoll.h"
55 #include "msg/async/EventKqueue.h"
57 #include "msg/async/EventSelect.h"
59 #include <gtest/gtest.h>
62 #if GTEST_HAS_PARAM_TEST
// Value-parameterized fixture: GetParam() is the name of the event driver
// backend ("epoll", "kqueue" or "select") that each TEST_P case runs against.
64 class EventDriverTest : public ::testing::TestWithParam<const char*> {
68 EventDriverTest(): driver(0) {}
// Creates the driver whose name equals the test parameter and initializes it.
69 void SetUp() override {
70 cerr << __func__ << " start set up " << GetParam() << std::endl;
// strcmp() returns 0 when the strings are equal, so every branch has to
// compare against 0. A bare `if (strcmp(...))` is true for every backend
// EXCEPT the requested one: the wrong driver ends up being tested and the
// earlier allocations are overwritten and leaked.
72 if (strcmp(GetParam(), "epoll") == 0)
73 driver = new EpollDriver(g_ceph_context);
76 if (strcmp(GetParam(), "kqueue") == 0)
77 driver = new KqueueDriver(g_ceph_context);
79 if (strcmp(GetParam(), "select") == 0)
80 driver = new SelectDriver(g_ceph_context);
// NOTE(review): driver is still the constructor's null value here if the
// parameter matched none of the names above; the instantiation list must
// only name backends handled by this function.
81 driver->init(NULL, 100);
83 void TearDown() override {
// Switches descriptor `sd` to non-blocking mode: reads the current file
// status flags with F_GETFL and writes them back with O_NONBLOCK added.
// Each fcntl() call is checked for a negative (failure) result.
88 int set_nonblock(int sd)
92 /* Set the socket nonblocking.
93 * Note that fcntl(2) for F_GETFL and F_SETFL can't be
94 * interrupted by a signal. */
95 if ((flags = fcntl(sd, F_GETFL)) < 0 ) {
98 if (fcntl(sd, F_SETFL, flags | O_NONBLOCK) < 0) {
// Checks readable-event reporting on fds[0] while data is written to fds[1]
// (presumably the two ends of a pipe -- confirm against the setup code).
105 TEST_P(EventDriverTest, PipeTest) {
107 vector<FiredFileEvent> fired_events;
// Start watching fds[0] for EVENT_READABLE.
115 r = driver->add_event(fds[0], EVENT_NONE, EVENT_READABLE);
// First wait happens before anything has been written.
117 r = driver->event_wait(fired_events, &tv);
// Write `c` to the other descriptor; the first fired event must be fds[0].
121 r = write(fds[1], &c, sizeof(c));
123 r = driver->event_wait(fired_events, &tv);
125 ASSERT_EQ(fired_events[0].fd, fds[0]);
// Repeat the write/wait cycle: fds[0] has to be reported again.
128 fired_events.clear();
129 r = write(fds[1], &c, sizeof(c));
131 r = driver->event_wait(fired_events, &tv);
133 ASSERT_EQ(fired_events[0].fd, fds[0]);
// Drop the readable registration, write once more and wait again
// (presumably expecting no event for fds[0] this time -- confirm).
135 fired_events.clear();
136 driver->del_event(fds[0], EVENT_READABLE, EVENT_READABLE);
137 r = write(fds[1], &c, sizeof(c));
139 r = driver->event_wait(fired_events, &tv);
// Thread entry point (pthread signature) acting as the TCP client side of
// the socket test. `arg` is not a real pointer: it carries the port number
// packed into an intptr_t.
143 void* echoclient(void *arg)
145 intptr_t port = (intptr_t)arg;
// Build the IPv4 address 127.0.0.1:<port>.
146 struct sockaddr_in sa;
147 memset(&sa, 0, sizeof(sa));
148 sa.sin_family = AF_INET;
149 sa.sin_port = htons(port);
150 char addr[] = "127.0.0.1";
151 int r = inet_pton(AF_INET, addr, &sa.sin_addr);
// Only attempt the exchange if the socket could be created.
154 int connect_sd = ::socket(AF_INET, SOCK_STREAM, 0);
155 if (connect_sd >= 0) {
156 r = connect(connect_sd, (struct sockaddr*)&sa, sizeof(sa));
// Send buffer `c`, then read into buffer `d`.
162 r = write(connect_sd, c, sizeof(c));
164 r = read(connect_sd, d, sizeof(d));
// Drives the event driver through a TCP exchange on a listening socket:
// readable on the listener, accept, readable on the accepted socket, then
// writable on the accepted socket.
175 TEST_P(EventDriverTest, NetworkSocketTest) {
176 int listen_sd = ::socket(AF_INET, SOCK_STREAM, 0);
// NOTE(review): socket() reports failure with -1 and 0 is a valid
// descriptor, so `>= 0` would be the strict check.
177 ASSERT_TRUE(listen_sd > 0);
179 int r = ::setsockopt(listen_sd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
181 r = set_nonblock(listen_sd);
183 struct sockaddr_in sa;
// Try ports 38788..39999 on INADDR_ANY (presumably stopping at the first
// successful bind -- confirm).
185 for (port = 38788; port < 40000; port++) {
186 memset(&sa,0,sizeof(sa));
187 sa.sin_family = AF_INET;
188 sa.sin_port = htons(port);
189 sa.sin_addr.s_addr = htonl(INADDR_ANY);
191 r = ::bind(listen_sd, (struct sockaddr *)&sa, sizeof(sa));
197 r = listen(listen_sd, 511);
200 vector<FiredFileEvent> fired_events;
// Watch the listener for readability; this first wait runs before the
// client thread has been started.
204 r = driver->add_event(listen_sd, EVENT_NONE, EVENT_READABLE);
206 r = driver->event_wait(fired_events, &tv);
209 fired_events.clear();
// Start the client thread, handing it the chosen port through the void*
// argument; the listener must then be the first fired descriptor.
211 r = pthread_create(&thread1, NULL, echoclient, (void*)(intptr_t)port);
215 r = driver->event_wait(fired_events, &tv);
217 ASSERT_EQ(fired_events[0].fd, listen_sd);
219 fired_events.clear();
// Accept the connection and watch the accepted socket for readability.
220 int client_sd = ::accept(listen_sd, NULL, NULL);
221 ASSERT_TRUE(client_sd > 0);
222 r = driver->add_event(client_sd, EVENT_NONE, EVENT_READABLE);
226 fired_events.clear();
229 r = driver->event_wait(fired_events, &tv);
231 ASSERT_EQ(EVENT_READABLE, fired_events[0].mask);
233 fired_events.clear();
235 r = ::read(client_sd, data, sizeof(data));
// Add EVENT_WRITABLE on top of the existing EVENT_READABLE registration and
// expect a writable event, then send `data` back and require a full write.
239 r = driver->add_event(client_sd, EVENT_READABLE, EVENT_WRITABLE);
241 r = driver->event_wait(fired_events, &tv);
243 ASSERT_EQ(fired_events[0].mask, EVENT_WRITABLE);
244 r = write(client_sd, data, strlen(data));
245 ASSERT_EQ((int)strlen(data), r);
// Remove both registrations for the accepted socket.
246 driver->del_event(client_sd, EVENT_READABLE|EVENT_WRITABLE,
// No-op EventCallback: do_request() ignores its argument and does nothing.
254 class FakeEvent : public EventCallback {
257 void do_request(int fd_or_id) override {}
// Registers readable file events for 300 newly created sockets on an
// EventCenter that was initialized with 100 as its first argument
// (presumably the initial event-table size, so the table has to grow --
// confirm), then deletes the readable event for every descriptor in `sds`.
260 TEST(EventCenterTest, FileEventExpansion) {
262 EventCenter center(g_ceph_context);
263 center.init(100, 0, "posix");
// One shared no-op callback is used for every registration.
265 EventCallbackRef e(new FakeEvent());
266 for (int i = 0; i < 300; i++) {
267 int sd = ::socket(AF_INET, SOCK_STREAM, 0);
268 center.create_file_event(sd, EVENT_READABLE, e);
272 for (vector<int>::iterator it = sds.begin(); it != sds.end(); ++it)
273 center.delete_file_event(*it, EVENT_READABLE);
// Thread that owns an EventCenter. The constructor initializes the center
// with (100, idx, "posix"); entry() calls center.process_events(1000000).
277 class Worker : public Thread {
283 explicit Worker(CephContext *c, int idx): cct(c), done(false), center(c) {
284 center.init(100, idx, "posix");
// Thread body.
290 void* entry() override {
293 center.process_events(1000000);
// EventCallback that carries a pointer to a shared atomic counter plus a
// Mutex/Cond pair, all supplied by the caller and not owned by the event.
298 class CountEvent: public EventCallback {
299 std::atomic<unsigned> *count;
304 CountEvent(std::atomic<unsigned> *atomic, Mutex *l, Cond *c): count(atomic), lock(l), cond(c) {}
// NOTE(review): presumably updates *count and signals cond under lock --
// confirm against the body.
305 void do_request(int id) override {
// Starts two Worker threads and, for 10000 iterations, hands a new
// CountEvent to each worker's EventCenter through dispatch_event_external().
313 TEST(EventCenterTest, DispatchTest) {
314 Worker worker1(g_ceph_context, 1), worker2(g_ceph_context, 2);
// Counter shared with every CountEvent; starts at zero.
315 std::atomic<unsigned> count = { 0 };
316 Mutex lock("DispatchTest::lock");
318 worker1.create("worker_1");
319 worker2.create("worker_2");
320 for (int i = 0; i < 10000; ++i) {
322 worker1.center.dispatch_event_external(EventCallbackRef(new CountEvent(&count, &lock, &cond)));
324 worker2.center.dispatch_event_external(EventCallbackRef(new CountEvent(&count, &lock, &cond)));
// Take the lock shared with the dispatched events (presumably to wait on
// cond until they have run -- confirm).
325 Mutex::Locker l(lock);
// GoogleTest instantiation of the value-parameterized cases (presumably
// EventDriverTest with the list of driver names -- confirm the arguments).
335 INSTANTIATE_TEST_CASE_P(
// NOTE(review): presumably the fallback branch of the
// `#if GTEST_HAS_PARAM_TEST` guard -- confirm.
351 // Google Test may not support value-parameterized tests with some
352 // compilers. If we use conditional compilation to compile out all
353 // code referring to the gtest_main library, MSVC linker will not link
354 // that library at all and consequently complain about missing entry
355 // point defined in that library (fatal error LNK1561: entry point
356 // must be defined). This dummy test keeps gtest_main linked in.
357 TEST(DummyTest, ValueParameterizedTestsAreNotSupportedOnThisPlatform) {}
364 * compile-command: "cd ../.. ; make ceph_test_async_driver &&
365 * ./ceph_test_async_driver