X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Ftest%2Fmsgr%2Ftest_async_driver.cc;fp=src%2Fceph%2Fsrc%2Ftest%2Fmsgr%2Ftest_async_driver.cc;h=0000000000000000000000000000000000000000;hb=7da45d65be36d36b880cc55c5036e96c24b53f00;hp=08399dd47d7a65ea4d0e7722aea79599141bca0b;hpb=691462d09d0987b47e112d6ee8740375df3c51b2;p=stor4nfv.git diff --git a/src/ceph/src/test/msgr/test_async_driver.cc b/src/ceph/src/test/msgr/test_async_driver.cc deleted file mode 100644 index 08399dd..0000000 --- a/src/ceph/src/test/msgr/test_async_driver.cc +++ /dev/null @@ -1,368 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2014 UnitedStack - * - * Author: Haomai Wang - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - -#ifdef __APPLE__ -#include -#endif - -#include -#include -#include -#include -#include -#include "include/Context.h" -#include "common/Mutex.h" -#include "common/Cond.h" -#include "global/global_init.h" -#include "common/ceph_argparse.h" -#include "msg/async/Event.h" - -#include - -// We use epoll, kqueue, evport, select in descending order by performance. -#if defined(__linux__) -#define HAVE_EPOLL 1 -#endif - -#if (defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)) || defined(__FreeBSD__) || defined(__OpenBSD__) || defined (__NetBSD__) -#define HAVE_KQUEUE 1 -#endif - -#ifdef __sun -#include -#ifdef _DTRACE_VERSION -#define HAVE_EVPORT 1 -#endif -#endif - -#ifdef HAVE_EPOLL -#include "msg/async/EventEpoll.h" -#endif -#ifdef HAVE_KQUEUE -#include "msg/async/EventKqueue.h" -#endif -#include "msg/async/EventSelect.h" - -#include - - -#if GTEST_HAS_PARAM_TEST - -class EventDriverTest : public ::testing::TestWithParam { - public: - EventDriver *driver; - - EventDriverTest(): driver(0) {} - void SetUp() override { - cerr << __func__ << " start set up " << GetParam() << std::endl; -#ifdef HAVE_EPOLL - if (strcmp(GetParam(), "epoll")) - driver = new EpollDriver(g_ceph_context); -#endif -#ifdef HAVE_KQUEUE - if (strcmp(GetParam(), "kqueue")) - driver = new KqueueDriver(g_ceph_context); -#endif - if (strcmp(GetParam(), "select")) - driver = new SelectDriver(g_ceph_context); - driver->init(NULL, 100); - } - void TearDown() override { - delete driver; - } -}; - -int set_nonblock(int sd) -{ - int flags; - - /* Set the socket nonblocking. - * Note that fcntl(2) for F_GETFL and F_SETFL can't be - * interrupted by a signal. */ - if ((flags = fcntl(sd, F_GETFL)) < 0 ) { - return -1; - } - if (fcntl(sd, F_SETFL, flags | O_NONBLOCK) < 0) { - return -1; - } - return 0; -} - - -TEST_P(EventDriverTest, PipeTest) { - int fds[2]; - vector fired_events; - int r; - struct timeval tv; - tv.tv_sec = 0; - tv.tv_usec = 1; - - r = pipe(fds); - ASSERT_EQ(r, 0); - r = driver->add_event(fds[0], EVENT_NONE, EVENT_READABLE); - ASSERT_EQ(r, 0); - r = driver->event_wait(fired_events, &tv); - ASSERT_EQ(r, 0); - - char c = 'A'; - r = write(fds[1], &c, sizeof(c)); - ASSERT_EQ(r, 1); - r = driver->event_wait(fired_events, &tv); - ASSERT_EQ(r, 1); - ASSERT_EQ(fired_events[0].fd, fds[0]); - - - fired_events.clear(); - r = write(fds[1], &c, sizeof(c)); - ASSERT_EQ(r, 1); - r = driver->event_wait(fired_events, &tv); - ASSERT_EQ(r, 1); - ASSERT_EQ(fired_events[0].fd, fds[0]); - - fired_events.clear(); - driver->del_event(fds[0], EVENT_READABLE, EVENT_READABLE); - r = write(fds[1], &c, sizeof(c)); - ASSERT_EQ(r, 1); - r = driver->event_wait(fired_events, &tv); - ASSERT_EQ(r, 0); -} - -void* echoclient(void *arg) -{ - intptr_t port = (intptr_t)arg; - struct sockaddr_in sa; - memset(&sa, 0, sizeof(sa)); - sa.sin_family = AF_INET; - sa.sin_port = htons(port); - char addr[] = "127.0.0.1"; - int r = inet_pton(AF_INET, addr, &sa.sin_addr); - assert(r == 1); - - int connect_sd = ::socket(AF_INET, SOCK_STREAM, 0); - if (connect_sd >= 0) { - r = connect(connect_sd, (struct sockaddr*)&sa, sizeof(sa)); - assert(r == 0); - int t = 0; - - do { - char c[] = "banner"; - r = write(connect_sd, c, sizeof(c)); - char d[100]; - r = read(connect_sd, d, sizeof(d)); - if (r == 0) - break; - if (t++ == 30) - break; - } while (1); - ::close(connect_sd); - } - return 0; -} - -TEST_P(EventDriverTest, NetworkSocketTest) { - int listen_sd = ::socket(AF_INET, SOCK_STREAM, 0); - ASSERT_TRUE(listen_sd > 0); - int on = 1; - int r = ::setsockopt(listen_sd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)); - ASSERT_EQ(r, 0); - r = set_nonblock(listen_sd); - ASSERT_EQ(r, 0); - struct sockaddr_in sa; - long port = 0; - for (port = 38788; port < 40000; port++) { - memset(&sa,0,sizeof(sa)); - sa.sin_family = AF_INET; - sa.sin_port = htons(port); - sa.sin_addr.s_addr = htonl(INADDR_ANY); - - r = ::bind(listen_sd, (struct sockaddr *)&sa, sizeof(sa)); - if (r == 0) { - break; - } - } - ASSERT_EQ(r, 0); - r = listen(listen_sd, 511); - ASSERT_EQ(r, 0); - - vector fired_events; - struct timeval tv; - tv.tv_sec = 0; - tv.tv_usec = 1; - r = driver->add_event(listen_sd, EVENT_NONE, EVENT_READABLE); - ASSERT_EQ(r, 0); - r = driver->event_wait(fired_events, &tv); - ASSERT_EQ(r, 0); - - fired_events.clear(); - pthread_t thread1; - r = pthread_create(&thread1, NULL, echoclient, (void*)(intptr_t)port); - ASSERT_EQ(r, 0); - tv.tv_sec = 5; - tv.tv_usec = 0; - r = driver->event_wait(fired_events, &tv); - ASSERT_EQ(r, 1); - ASSERT_EQ(fired_events[0].fd, listen_sd); - - fired_events.clear(); - int client_sd = ::accept(listen_sd, NULL, NULL); - ASSERT_TRUE(client_sd > 0); - r = driver->add_event(client_sd, EVENT_NONE, EVENT_READABLE); - ASSERT_EQ(r, 0); - - do { - fired_events.clear(); - tv.tv_sec = 5; - tv.tv_usec = 0; - r = driver->event_wait(fired_events, &tv); - ASSERT_EQ(1, r); - ASSERT_EQ(EVENT_READABLE, fired_events[0].mask); - - fired_events.clear(); - char data[100]; - r = ::read(client_sd, data, sizeof(data)); - if (r == 0) - break; - ASSERT_GT(r, 0); - r = driver->add_event(client_sd, EVENT_READABLE, EVENT_WRITABLE); - ASSERT_EQ(0, r); - r = driver->event_wait(fired_events, &tv); - ASSERT_EQ(1, r); - ASSERT_EQ(fired_events[0].mask, EVENT_WRITABLE); - r = write(client_sd, data, strlen(data)); - ASSERT_EQ((int)strlen(data), r); - driver->del_event(client_sd, EVENT_READABLE|EVENT_WRITABLE, - EVENT_WRITABLE); - } while (1); - - ::close(client_sd); - ::close(listen_sd); -} - -class FakeEvent : public EventCallback { - - public: - void do_request(int fd_or_id) override {} -}; - -TEST(EventCenterTest, FileEventExpansion) { - vector sds; - EventCenter center(g_ceph_context); - center.init(100, 0, "posix"); - center.set_owner(); - EventCallbackRef e(new FakeEvent()); - for (int i = 0; i < 300; i++) { - int sd = ::socket(AF_INET, SOCK_STREAM, 0); - center.create_file_event(sd, EVENT_READABLE, e); - sds.push_back(sd); - } - - for (vector::iterator it = sds.begin(); it != sds.end(); ++it) - center.delete_file_event(*it, EVENT_READABLE); -} - - -class Worker : public Thread { - CephContext *cct; - bool done; - - public: - EventCenter center; - explicit Worker(CephContext *c, int idx): cct(c), done(false), center(c) { - center.init(100, idx, "posix"); - } - void stop() { - done = true; - center.wakeup(); - } - void* entry() override { - center.set_owner(); - while (!done) - center.process_events(1000000); - return 0; - } -}; - -class CountEvent: public EventCallback { - std::atomic *count; - Mutex *lock; - Cond *cond; - - public: - CountEvent(std::atomic *atomic, Mutex *l, Cond *c): count(atomic), lock(l), cond(c) {} - void do_request(int id) override { - lock->Lock(); - (*count)--; - cond->Signal(); - lock->Unlock(); - } -}; - -TEST(EventCenterTest, DispatchTest) { - Worker worker1(g_ceph_context, 1), worker2(g_ceph_context, 2); - std::atomic count = { 0 }; - Mutex lock("DispatchTest::lock"); - Cond cond; - worker1.create("worker_1"); - worker2.create("worker_2"); - for (int i = 0; i < 10000; ++i) { - count++; - worker1.center.dispatch_event_external(EventCallbackRef(new CountEvent(&count, &lock, &cond))); - count++; - worker2.center.dispatch_event_external(EventCallbackRef(new CountEvent(&count, &lock, &cond))); - Mutex::Locker l(lock); - while (count) - cond.Wait(lock); - } - worker1.stop(); - worker2.stop(); - worker1.join(); - worker2.join(); -} - -INSTANTIATE_TEST_CASE_P( - AsyncMessenger, - EventDriverTest, - ::testing::Values( -#ifdef HAVE_EPOLL - "epoll", -#endif -#ifdef HAVE_KQUEUE - "kqueue", -#endif - "select" - ) -); - -#else - -// Google Test may not support value-parameterized tests with some -// compilers. If we use conditional compilation to compile out all -// code referring to the gtest_main library, MSVC linker will not link -// that library at all and consequently complain about missing entry -// point defined in that library (fatal error LNK1561: entry point -// must be defined). This dummy test keeps gtest_main linked in. -TEST(DummyTest, ValueParameterizedTestsAreNotSupportedOnThisPlatform) {} - -#endif - - -/* - * Local Variables: - * compile-command: "cd ../.. ; make ceph_test_async_driver && - * ./ceph_test_async_driver - * - * End: - */