diff --git a/src/ceph/src/test/msgr/test_async_networkstack.cc b/src/ceph/src/test/msgr/test_async_networkstack.cc
new file mode 100644
index 0000000..e876cc3
--- /dev/null
+++ b/src/ceph/src/test/msgr/test_async_networkstack.cc
@@ -0,0 +1,1067 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 XSky
+ *
+ * Author: Haomai Wang
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include <algorithm>
+#include <atomic>
+#include <deque>
+#include <set>
+#include <random>
+#include <string>
+#include <vector>
+#include <gtest/gtest.h>
+
+#include "acconfig.h"
+#include "include/Context.h"
+
+#include "msg/async/Event.h"
+#include "msg/async/Stack.h"
+
+
+#if GTEST_HAS_PARAM_TEST
+
+class NetworkWorkerTest : public ::testing::TestWithParam<const char*> {
+ public:
+  std::shared_ptr<NetworkStack> stack;
+  string addr, port_addr;
+
+  NetworkWorkerTest() {}
+  void SetUp() override {
+    cerr << __func__ << " start set up " << GetParam() << std::endl;
+    if (strncmp(GetParam(), "dpdk", 4)) {
+      g_ceph_context->_conf->set_val("ms_type", "async+posix", false);
+      addr = "127.0.0.1:15000";
+      port_addr = "127.0.0.1:15001";
+    } else {
+      g_ceph_context->_conf->set_val("ms_type", "async+dpdk", false);
+      g_ceph_context->_conf->set_val("ms_dpdk_debug_allow_loopback", "true", false);
+      g_ceph_context->_conf->set_val("ms_async_op_threads", "2", false);
+      g_ceph_context->_conf->set_val("ms_dpdk_coremask", "0x7", false);
+      g_ceph_context->_conf->set_val("ms_dpdk_host_ipv4_addr", "172.16.218.3", false);
+      g_ceph_context->_conf->set_val("ms_dpdk_gateway_ipv4_addr", "172.16.218.2", false);
+      g_ceph_context->_conf->set_val("ms_dpdk_netmask_ipv4_addr", "255.255.255.0", false);
+      addr = "172.16.218.3:15000";
+      port_addr = "172.16.218.3:15001";
+    }
+    stack = NetworkStack::create(g_ceph_context, GetParam());
+    stack->start();
+  }
+  void TearDown() override {
+    stack->stop();
+  }
+  string get_addr() const {
+    return addr;
+  }
+  string get_ip_different_port() const {
+    return port_addr;
+  }
+  string get_different_ip() const {
+    return "10.0.123.100:4323";
+  }
+  EventCenter *get_center(unsigned i) {
+    return &stack->get_worker(i)->center;
+  }
+  Worker *get_worker(unsigned i) {
+    return stack->get_worker(i);
+  }
+  template<typename func>
+  class C_dispatch : public EventCallback {
+    Worker *worker;
+    func f;
+    std::atomic_bool done;
+   public:
+    C_dispatch(Worker *w, func &&_f): worker(w), f(std::move(_f)), done(false) {}
+    void do_request(int id) override {
+      f(worker);
+      done = true;
+    }
+    void wait() {
+      int us = 1000 * 1000 * 1000;
+      while (!done) {
+        ASSERT_TRUE(us > 0);
+        usleep(100);
+        us -= 100;
+      }
+    }
+  };
+  template<typename func>
+  void exec_events(func &&f) {
+    std::vector<C_dispatch<func>*> dis;
+    for (unsigned i = 0; i < stack->get_num_worker(); ++i) {
+      Worker *w = stack->get_worker(i);
+      C_dispatch<func> *e = new C_dispatch<func>(w, std::move(f));
+      stack->get_worker(i)->center.dispatch_event_external(e);
+      dis.push_back(e);
+    }
+
+    for (auto &&e : dis) {
+      e->wait();
+      delete e;
+    }
+  }
+};
+
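+// Busy-polls an EventCenter until do_request() marks it woken or the given
+// timeout (in milliseconds) expires; the tests below use it to wait for
+// socket readable/writable events.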
+class C_poll : public EventCallback {
+  EventCenter *center;
+  std::atomic<bool> woken;
+  static const int sleepus = 500;
+
+ public:
+  C_poll(EventCenter *c): center(c), woken(false) {}
+  void do_request(int r) override {
+    woken = true;
+  }
+  bool poll(int milliseconds) {
+    auto start = ceph::coarse_real_clock::now();
+    while (!woken) {
+      center->process_events(sleepus);
+      usleep(sleepus);
+      auto r = std::chrono::duration_cast<std::chrono::milliseconds>(
+          ceph::coarse_real_clock::now() - start);
+      if (r >= std::chrono::milliseconds(milliseconds))
+        break;
+    }
+    return woken;
+  }
+  void reset() {
+    woken = false;
+  }
+};
+
+TEST_P(NetworkWorkerTest, SimpleTest) {
+  entity_addr_t bind_addr;
+  ASSERT_TRUE(bind_addr.parse(get_addr().c_str()));
+  std::atomic_bool accepted(false);
+  std::atomic_bool *accepted_p = &accepted;
+
+  exec_events([this, accepted_p, bind_addr](Worker *worker) mutable {
+    entity_addr_t cli_addr;
+    SocketOptions options;
+    ServerSocket bind_socket;
+    EventCenter *center = &worker->center;
+    ssize_t r = 0;
+    if (stack->support_local_listen_table() || worker->id == 0)
+      r = worker->listen(bind_addr, options, &bind_socket);
+    ASSERT_EQ(0, r);
+
+    ConnectedSocket cli_socket, srv_socket;
+    if (worker->id == 0) {
+      r = worker->connect(bind_addr, options, &cli_socket);
+      ASSERT_EQ(0, r);
+    }
+
+    bool is_my_accept = false;
+    if (bind_socket) {
+      C_poll cb(center);
+      center->create_file_event(bind_socket.fd(), EVENT_READABLE, &cb);
+      if (cb.poll(500)) {
+        *accepted_p = true;
+        is_my_accept = true;
+      }
+      ASSERT_TRUE(*accepted_p);
+      center->delete_file_event(bind_socket.fd(), EVENT_READABLE);
+    }
+
+    if (is_my_accept) {
+      r = bind_socket.accept(&srv_socket, options, &cli_addr, worker);
+      ASSERT_EQ(0, r);
+      ASSERT_TRUE(srv_socket.fd() > 0);
+    }
+
+    if (worker->id == 0) {
+      C_poll cb(center);
+      center->create_file_event(cli_socket.fd(), EVENT_READABLE, &cb);
+      r = cli_socket.is_connected();
+      if (r == 0) {
+        ASSERT_EQ(true, cb.poll(500));
+        r = cli_socket.is_connected();
+      }
+      ASSERT_EQ(1, r);
+      center->delete_file_event(cli_socket.fd(), EVENT_READABLE);
+    }
+
+    const char *message = "this is a new message";
+    int len = strlen(message);
+    bufferlist bl;
+    bl.append(message, len);
+    if (worker->id == 0) {
+      r = cli_socket.send(bl, false);
+      ASSERT_EQ(len, r);
+    }
+
+    char buf[1024];
+    C_poll cb(center);
+    if (is_my_accept) {
+      center->create_file_event(srv_socket.fd(), EVENT_READABLE, &cb);
+      {
+        r = srv_socket.read(buf, sizeof(buf));
+        while (r == -EAGAIN) {
+          ASSERT_TRUE(cb.poll(500));
+          r = srv_socket.read(buf, sizeof(buf));
+          cb.reset();
+        }
+        ASSERT_EQ(len, r);
+        ASSERT_EQ(0, memcmp(buf, message, len));
+      }
+      bind_socket.abort_accept();
+    }
+    if (worker->id == 0) {
+      cli_socket.shutdown();
+      // ack delay is 200 ms
+    }
+
+    bl.clear();
+    bl.append(message, len);
+    if (worker->id == 0) {
+      r = cli_socket.send(bl, false);
+      ASSERT_EQ(-EPIPE, r);
+    }
+    if (is_my_accept) {
+      cb.reset();
+      ASSERT_TRUE(cb.poll(500));
+      r = srv_socket.read(buf, sizeof(buf));
+      if (r == -EAGAIN) {
+        cb.reset();
+        ASSERT_TRUE(cb.poll(1000*500));
+        r = srv_socket.read(buf, sizeof(buf));
+      }
+      ASSERT_EQ(0, r);
+      center->delete_file_event(srv_socket.fd(), EVENT_READABLE);
+      srv_socket.close();
+    }
+  });
+}
+
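+// Connect to a bound-but-unused port and to an unroutable address: the
+// first must fail with ECONNREFUSED/ECONNRESET, the second must never
+// report an established connection.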
+TEST_P(NetworkWorkerTest, ConnectFailedTest) {
+  entity_addr_t bind_addr;
+  ASSERT_TRUE(bind_addr.parse(get_addr().c_str()));
+
+  exec_events([this, bind_addr](Worker *worker) mutable {
+    EventCenter *center = &worker->center;
+    entity_addr_t cli_addr;
+    SocketOptions options;
+    ServerSocket bind_socket;
+    int r = 0;
+    if (stack->support_local_listen_table() || worker->id == 0)
+      r = worker->listen(bind_addr, options, &bind_socket);
+    ASSERT_EQ(0, r);
+
+    ConnectedSocket cli_socket1, cli_socket2;
+    if (worker->id == 0) {
+      ASSERT_TRUE(cli_addr.parse(get_ip_different_port().c_str()));
+      r = worker->connect(cli_addr, options, &cli_socket1);
+      ASSERT_EQ(0, r);
+      C_poll cb(center);
+      center->create_file_event(cli_socket1.fd(), EVENT_READABLE, &cb);
+      r = cli_socket1.is_connected();
+      if (r == 0) {
+        ASSERT_TRUE(cb.poll(500));
+        r = cli_socket1.is_connected();
+      }
+      ASSERT_TRUE(r == -ECONNREFUSED || r == -ECONNRESET);
+    }
+
+    if (worker->id == 1) {
+      ASSERT_TRUE(cli_addr.parse(get_different_ip().c_str()));
+      r = worker->connect(cli_addr, options, &cli_socket2);
+      ASSERT_EQ(0, r);
+      C_poll cb(center);
+      center->create_file_event(cli_socket2.fd(), EVENT_READABLE, &cb);
+      r = cli_socket2.is_connected();
+      if (r == 0) {
+        cb.poll(500);
+        r = cli_socket2.is_connected();
+      }
+      ASSERT_TRUE(r != 1);
+      center->delete_file_event(cli_socket2.fd(), EVENT_READABLE);
+    }
+  });
+}
+
+TEST_P(NetworkWorkerTest, ListenTest) {
+  Worker *worker = get_worker(0);
+  entity_addr_t bind_addr;
+  ASSERT_TRUE(bind_addr.parse(get_addr().c_str()));
+  SocketOptions options;
+  ServerSocket bind_socket1, bind_socket2;
+  int r = worker->listen(bind_addr, options, &bind_socket1);
+  ASSERT_EQ(0, r);
+
+  r = worker->listen(bind_addr, options, &bind_socket2);
+  ASSERT_EQ(-EADDRINUSE, r);
+}
+
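+// Accept flow: accept() on an idle listen socket returns -EAGAIN, a pending
+// connection becomes acceptable once the readable event fires, and
+// connecting again after the listen socket is destroyed must be refused.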
+TEST_P(NetworkWorkerTest, AcceptAndCloseTest) {
+  entity_addr_t bind_addr;
+  ASSERT_TRUE(bind_addr.parse(get_addr().c_str()));
+  std::atomic_bool accepted(false);
+  std::atomic_bool *accepted_p = &accepted;
+  std::atomic_int unbind_count(stack->get_num_worker());
+  std::atomic_int *count_p = &unbind_count;
+  exec_events([this, bind_addr, accepted_p, count_p](Worker *worker) mutable {
+    SocketOptions options;
+    EventCenter *center = &worker->center;
+    entity_addr_t cli_addr;
+    int r = 0;
+    {
+      ServerSocket bind_socket;
+      if (stack->support_local_listen_table() || worker->id == 0)
+        r = worker->listen(bind_addr, options, &bind_socket);
+      ASSERT_EQ(0, r);
+
+      ConnectedSocket srv_socket, cli_socket;
+      if (bind_socket) {
+        r = bind_socket.accept(&srv_socket, options, &cli_addr, worker);
+        ASSERT_EQ(-EAGAIN, r);
+      }
+
+      C_poll cb(center);
+      if (worker->id == 0) {
+        center->create_file_event(bind_socket.fd(), EVENT_READABLE, &cb);
+        r = worker->connect(bind_addr, options, &cli_socket);
+        ASSERT_EQ(0, r);
+        ASSERT_TRUE(cb.poll(500));
+      }
+
+      if (bind_socket) {
+        cb.reset();
+        cb.poll(500);
+        ConnectedSocket srv_socket2;
+        do {
+          r = bind_socket.accept(&srv_socket2, options, &cli_addr, worker);
+          usleep(100);
+        } while (r == -EAGAIN && !*accepted_p);
+        if (r == 0)
+          *accepted_p = true;
+        ASSERT_TRUE(*accepted_p);
+        // srv_socket2 closed
+        center->delete_file_event(bind_socket.fd(), EVENT_READABLE);
+      }
+
+      if (worker->id == 0) {
+        char buf[100];
+        cb.reset();
+        center->create_file_event(cli_socket.fd(), EVENT_READABLE, &cb);
+        int i = 3;
+        while (!i--) {
+          ASSERT_TRUE(cb.poll(500));
+          r = cli_socket.read(buf, sizeof(buf));
+          if (r == 0)
+            break;
+        }
+        ASSERT_EQ(0, r);
+        center->delete_file_event(cli_socket.fd(), EVENT_READABLE);
+      }
+
+      if (bind_socket)
+        center->create_file_event(bind_socket.fd(), EVENT_READABLE, &cb);
+      if (worker->id == 0) {
+        *accepted_p = false;
+        r = worker->connect(bind_addr, options, &cli_socket);
+        ASSERT_EQ(0, r);
+        cb.reset();
+        ASSERT_TRUE(cb.poll(500));
+        cli_socket.close();
+      }
+
+      if (bind_socket) {
+        do {
+          r = bind_socket.accept(&srv_socket, options, &cli_addr, worker);
+          usleep(100);
+        } while (r == -EAGAIN && !*accepted_p);
+        if (r == 0)
+          *accepted_p = true;
+        ASSERT_TRUE(*accepted_p);
+        center->delete_file_event(bind_socket.fd(), EVENT_READABLE);
+      }
+      // unbind
+    }
+
+    --*count_p;
+    while (*count_p > 0)
+      usleep(100);
+
+    ConnectedSocket cli_socket;
+    r = worker->connect(bind_addr, options, &cli_socket);
+    ASSERT_EQ(0, r);
+    {
+      C_poll cb(center);
+      center->create_file_event(cli_socket.fd(), EVENT_READABLE, &cb);
+      r = cli_socket.is_connected();
+      if (r == 0) {
+        ASSERT_TRUE(cb.poll(500));
+        r = cli_socket.is_connected();
+      }
+      ASSERT_TRUE(r == -ECONNREFUSED || r == -ECONNRESET);
+    }
+  });
+}
+
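+// Bulk transfer: worker 1 streams two rounds of 100 x 10240-byte messages
+// to the accepting worker, driving both sides through -EAGAIN retries, then
+// verifies the received bytes match the pattern.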
+TEST_P(NetworkWorkerTest, ComplexTest) {
+  entity_addr_t bind_addr;
+  std::atomic_bool listen_done(false);
+  std::atomic_bool *listen_p = &listen_done;
+  std::atomic_bool accepted(false);
+  std::atomic_bool *accepted_p = &accepted;
+  std::atomic_bool done(false);
+  std::atomic_bool *done_p = &done;
+  ASSERT_TRUE(bind_addr.parse(get_addr().c_str()));
+  exec_events([this, bind_addr, listen_p, accepted_p, done_p](Worker *worker) mutable {
+    entity_addr_t cli_addr;
+    EventCenter *center = &worker->center;
+    SocketOptions options;
+    ServerSocket bind_socket;
+    int r = 0;
+    if (stack->support_local_listen_table() || worker->id == 0) {
+      r = worker->listen(bind_addr, options, &bind_socket);
+      ASSERT_EQ(0, r);
+      *listen_p = true;
+    }
+    ConnectedSocket cli_socket, srv_socket;
+    if (worker->id == 1) {
+      while (!*listen_p) {
+        usleep(50);
+        r = worker->connect(bind_addr, options, &cli_socket);
+        ASSERT_EQ(0, r);
+      }
+    }
+
+    if (bind_socket) {
+      C_poll cb(center);
+      center->create_file_event(bind_socket.fd(), EVENT_READABLE, &cb);
+      int count = 3;
+      while (count--) {
+        if (cb.poll(500)) {
+          r = bind_socket.accept(&srv_socket, options, &cli_addr, worker);
+          ASSERT_EQ(0, r);
+          *accepted_p = true;
+          break;
+        }
+      }
+      ASSERT_TRUE(*accepted_p);
+      center->delete_file_event(bind_socket.fd(), EVENT_READABLE);
+    }
+
+    if (worker->id == 1) {
+      C_poll cb(center);
+      center->create_file_event(cli_socket.fd(), EVENT_WRITABLE, &cb);
+      r = cli_socket.is_connected();
+      if (r == 0) {
+        ASSERT_TRUE(cb.poll(500));
+        r = cli_socket.is_connected();
+      }
+      ASSERT_EQ(1, r);
+      center->delete_file_event(cli_socket.fd(), EVENT_WRITABLE);
+    }
+
+    const size_t message_size = 10240;
+    size_t count = 100;
+    string message(message_size, '!');
+    for (size_t i = 0; i < message_size; i += 100)
+      message[i] = ',';
+    size_t len = message_size * count;
+    C_poll cb(center);
+    if (worker->id == 1)
+      center->create_file_event(cli_socket.fd(), EVENT_WRITABLE, &cb);
+    if (srv_socket)
+      center->create_file_event(srv_socket.fd(), EVENT_READABLE, &cb);
+    size_t left = len;
+    len *= 2;
+    string read_string;
+    int again_count = 0;
+    int c = 2;
+    bufferlist bl;
+    for (size_t i = 0; i < count; ++i)
+      bl.push_back(bufferptr((char*)message.data(), message_size));
+    while (!*done_p) {
+      again_count = 0;
+      if (worker->id == 1) {
+        if (c > 0) {
+          ssize_t r = 0;
+          usleep(100);
+          if (left > 0) {
+            r = cli_socket.send(bl, false);
+            ASSERT_TRUE(r >= 0 || r == -EAGAIN);
+            if (r > 0)
+              left -= r;
+            if (r == -EAGAIN)
+              ++again_count;
+          }
+          if (left == 0) {
+            --c;
+            left = message_size * count;
+            ASSERT_EQ(0U, bl.length());
+            for (size_t i = 0; i < count; ++i)
+              bl.push_back(bufferptr((char*)message.data(), message_size));
+          }
+        }
+      }
+
+      if (srv_socket) {
+        char buf[1000];
+        if (len > 0) {
+          r = srv_socket.read(buf, sizeof(buf));
+          ASSERT_TRUE(r > 0 || r == -EAGAIN);
+          if (r > 0) {
+            read_string.append(buf, r);
+            len -= r;
+          } else if (r == -EAGAIN) {
+            ++again_count;
+          }
+        }
+        if (len == 0) {
+          for (size_t i = 0; i < read_string.size(); i += message_size)
+            ASSERT_EQ(0, memcmp(read_string.c_str()+i, message.c_str(), message_size));
+          *done_p = true;
+        }
+      }
+      if (again_count) {
+        cb.reset();
+        cb.poll(500);
+      }
+    }
+    if (worker->id == 1)
+      center->delete_file_event(cli_socket.fd(), EVENT_WRITABLE);
+    if (srv_socket)
+      center->delete_file_event(srv_socket.fd(), EVENT_READABLE);
+
+    if (bind_socket)
+      bind_socket.abort_accept();
+    if (srv_socket)
+      srv_socket.close();
+    if (worker->id == 1)
+      cli_socket.close();
+  });
+}
+
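+// StressFactory spawns many client/server pairs on top of the stack and
+// pumps randomly sized messages through them until the global
+// message_count drains to zero.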
+class StressFactory {
+  struct Client;
+  struct Server;
+  struct ThreadData {
+    Worker *worker;
+    std::set<Client*> clients;
+    std::set<Server*> servers;
+    ~ThreadData() {
+      for (auto && i : clients)
+        delete i;
+      for (auto && i : servers)
+        delete i;
+    }
+  };
+
+  struct RandomString {
+    size_t slen;
+    vector<std::string> strs;
+    std::random_device rd;
+    std::default_random_engine rng;
+
+    RandomString(size_t s): slen(s), rng(rd()) {}
+    void prepare(size_t n) {
+      static const char alphabet[] =
+        "abcdefghijklmnopqrstuvwxyz"
+        "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
+        "0123456789";
+
+      std::uniform_int_distribution<> dist(
+          0, sizeof(alphabet) / sizeof(*alphabet) - 2);
+
+      strs.reserve(n);
+      std::generate_n(
+        std::back_inserter(strs), strs.capacity(), [&] {
+          std::string str;
+          str.reserve(slen);
+          std::generate_n(std::back_inserter(str), slen, [&]() {
+            return alphabet[dist(rng)];
+          });
+          return str;
+        }
+      );
+    }
+    std::string &get_random_string() {
+      std::uniform_int_distribution<> dist(
+          0, strs.size() - 1);
+      return strs[dist(rng)];
+    }
+  };
+  struct Message {
+    size_t idx;
+    size_t len;
+    std::string content;
+
+    explicit Message(RandomString &rs, size_t i, size_t l): idx(i) {
+      size_t slen = rs.slen;
+      len = std::max(slen, l);
+
+      std::vector<std::string> strs;
+      strs.reserve(len / slen);
+      std::generate_n(
+        std::back_inserter(strs), strs.capacity(), [&] {
+          return rs.get_random_string();
+        }
+      );
+      len = slen * strs.size();
+      content.reserve(len);
+      for (auto &&s : strs)
+        content.append(s);
+    }
+    bool verify(const char *b, size_t len = 0) const {
+      return content.compare(0, len, b, 0, len) == 0;
+    }
+  };
+
+  template <typename T>
+  class C_delete : public EventCallback {
+    T *ctxt;
+   public:
+    C_delete(T *c): ctxt(c) {}
+    void do_request(int id) override {
+      delete ctxt;
+      delete this;
+    }
+  };
+
+  class Client {
+    StressFactory *factory;
+    EventCenter *center;
+    ConnectedSocket socket;
+    std::deque<StressFactory::Message*> acking;
+    std::deque<StressFactory::Message*> writings;
+    std::string buffer;
+    size_t index = 0;
+    size_t left;
+    bool write_enabled = false;
+    size_t read_offset = 0, write_offset = 0;
+    bool first = true;
+    bool dead = false;
+    StressFactory::Message homeless_message;
+
+    class Client_read_handle : public EventCallback {
+      Client *c;
+     public:
+      Client_read_handle(Client *_c): c(_c) {}
+      void do_request(int id) override {
+        c->do_read_request();
+      }
+    } read_ctxt;
+
+    class Client_write_handle : public EventCallback {
+      Client *c;
+     public:
+      Client_write_handle(Client *_c): c(_c) {}
+      void do_request(int id) override {
+        c->do_write_request();
+      }
+    } write_ctxt;
+
+   public:
+    Client(StressFactory *f, EventCenter *cen, ConnectedSocket s, size_t c)
+        : factory(f), center(cen), socket(std::move(s)), left(c),
+          homeless_message(factory->rs, -1, 1024),
+          read_ctxt(this), write_ctxt(this) {
+      center->create_file_event(
+          socket.fd(), EVENT_READABLE, &read_ctxt);
+      center->dispatch_event_external(&read_ctxt);
+    }
+    void close() {
+      ASSERT_FALSE(write_enabled);
+      dead = true;
+      socket.shutdown();
+      center->delete_file_event(socket.fd(), EVENT_READABLE);
+      center->dispatch_event_external(new C_delete<Client>(this));
+    }
+
+    void do_read_request() {
+      if (dead)
+        return ;
+      ASSERT_TRUE(socket.is_connected() >= 0);
+      if (!socket.is_connected())
+        return ;
+      ASSERT_TRUE(!acking.empty() || first);
+      if (first) {
+        first = false;
+        center->dispatch_event_external(&write_ctxt);
+        if (acking.empty())
+          return ;
+      }
+      StressFactory::Message *m = acking.front();
+      int r = 0;
+      if (buffer.empty())
+        buffer.resize(m->len);
+      bool must_no = false;
+      while (true) {
+        r = socket.read((char*)buffer.data() + read_offset,
+                        m->len - read_offset);
+        ASSERT_TRUE(r == -EAGAIN || r > 0);
+        if (r == -EAGAIN)
+          break;
+        read_offset += r;
+
+        std::cerr << " client " << this << " receive " << m->idx << " len " << r << " content: " << std::endl;
+        ASSERT_FALSE(must_no);
+        if ((m->len - read_offset) == 0) {
+          ASSERT_TRUE(m->verify(buffer.data(), 0));
+          delete m;
+          acking.pop_front();
+          read_offset = 0;
+          buffer.clear();
+          if (acking.empty()) {
+            m = &homeless_message;
+            must_no = true;
+          } else {
+            m = acking.front();
+            buffer.resize(m->len);
+          }
+        }
+      }
+      if (acking.empty()) {
+        center->dispatch_event_external(&write_ctxt);
+        return ;
+      }
+    }
+
+    void do_write_request() {
+      if (dead)
+        return ;
+      ASSERT_TRUE(socket.is_connected() > 0);
+
+      while (left > 0 && factory->queue_depth > writings.size() + acking.size()) {
+        StressFactory::Message *m = new StressFactory::Message(
+            factory->rs, ++index,
+            factory->rd() % factory->max_message_length);
+        std::cerr << " client " << this << " generate message " << m->idx << " length " << m->len << std::endl;
+        ASSERT_EQ(m->len, m->content.size());
+        writings.push_back(m);
+        --left;
+        --factory->message_left;
+      }
+
+      while (!writings.empty()) {
+        StressFactory::Message *m = writings.front();
+        bufferlist bl;
+        bl.append(m->content.data() + write_offset, m->content.size() - write_offset);
+        ssize_t r = socket.send(bl, false);
+        if (r == 0)
+          break;
+        std::cerr << " client " << this << " send " << m->idx << " len " << r << " content: " << std::endl;
+        ASSERT_TRUE(r >= 0);
+        write_offset += r;
+        if (write_offset == m->content.size()) {
+          write_offset = 0;
+          writings.pop_front();
+          acking.push_back(m);
+        }
+      }
+      if (writings.empty() && write_enabled) {
+        center->delete_file_event(socket.fd(), EVENT_WRITABLE);
+        write_enabled = false;
+      } else if (!writings.empty() && !write_enabled) {
+        ASSERT_EQ(0, center->create_file_event(
+            socket.fd(), EVENT_WRITABLE, &write_ctxt));
+        write_enabled = true;
+      }
+    }
+
+    bool finish() const {
+      return left == 0 && acking.empty() && writings.empty();
+    }
+  };
+  friend class Client;
+
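+  // Echo peer: reads whatever arrives (optionally via zero_copy_read) and
+  // writes it back, toggling EVENT_WRITABLE registration for flow control.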
+  class Server {
+    StressFactory *factory;
+    EventCenter *center;
+    ConnectedSocket socket;
+    std::deque<std::string> buffers;
+    bool write_enabled = false;
+    bool dead = false;
+
+    class Server_read_handle : public EventCallback {
+      Server *s;
+     public:
+      Server_read_handle(Server *_s): s(_s) {}
+      void do_request(int id) override {
+        s->do_read_request();
+      }
+    } read_ctxt;
+
+    class Server_write_handle : public EventCallback {
+      Server *s;
+     public:
+      Server_write_handle(Server *_s): s(_s) {}
+      void do_request(int id) override {
+        s->do_write_request();
+      }
+    } write_ctxt;
+
+   public:
+    Server(StressFactory *f, EventCenter *c, ConnectedSocket s):
+        factory(f), center(c), socket(std::move(s)),
+        read_ctxt(this), write_ctxt(this) {
+      center->create_file_event(socket.fd(), EVENT_READABLE, &read_ctxt);
+      center->dispatch_event_external(&read_ctxt);
+    }
+    void close() {
+      ASSERT_FALSE(write_enabled);
+      socket.shutdown();
+      center->delete_file_event(socket.fd(), EVENT_READABLE);
+      center->dispatch_event_external(new C_delete<Server>(this));
+    }
+    void do_read_request() {
+      if (dead)
+        return ;
+      int r = 0;
+      while (true) {
+        char buf[4096];
+        bufferptr data;
+        if (factory->zero_copy_read) {
+          r = socket.zero_copy_read(data);
+        } else {
+          r = socket.read(buf, sizeof(buf));
+        }
+        ASSERT_TRUE(r == -EAGAIN || (r >= 0 && (size_t)r <= sizeof(buf)));
+        if (r == 0) {
+          ASSERT_TRUE(buffers.empty());
+          dead = true;
+          return ;
+        } else if (r == -EAGAIN)
+          break;
+        if (factory->zero_copy_read) {
+          buffers.emplace_back(data.c_str(), 0, data.length());
+        } else {
+          buffers.emplace_back(buf, 0, r);
+        }
+        std::cerr << " server " << this << " receive " << r << " content: " << std::endl;
+      }
+      if (!buffers.empty() && !write_enabled)
+        center->dispatch_event_external(&write_ctxt);
+    }
+
+    void do_write_request() {
+      if (dead)
+        return ;
+
+      while (!buffers.empty()) {
+        bufferlist bl;
+        auto it = buffers.begin();
+        for (size_t i = 0; i < buffers.size(); ++i) {
+          bl.push_back(bufferptr((char*)it->data(), it->size()));
+          ++it;
+        }
+
+        ssize_t r = socket.send(bl, false);
+        std::cerr << " server " << this << " send " << r << std::endl;
+        if (r == 0)
+          break;
+        ASSERT_TRUE(r >= 0);
+        while (r > 0) {
+          ASSERT_TRUE(!buffers.empty());
+          string &buffer = buffers.front();
+          if (r >= (int)buffer.size()) {
+            r -= (int)buffer.size();
+            buffers.pop_front();
+          } else {
+            std::cerr << " server " << this << " sent " << r << std::endl;
+            buffer = buffer.substr(r, buffer.size());
+            break;
+          }
+        }
+      }
+      if (buffers.empty()) {
+        if (write_enabled) {
+          center->delete_file_event(socket.fd(), EVENT_WRITABLE);
+          write_enabled = false;
+        }
+      } else if (!write_enabled) {
+        ASSERT_EQ(0, center->create_file_event(
+            socket.fd(), EVENT_WRITABLE, &write_ctxt));
+        write_enabled = true;
+      }
+    }
+
+    bool finish() {
+      return dead;
+    }
+  };
+  friend class Server;
+
+  class C_accept : public EventCallback {
+    StressFactory *factory;
+    ServerSocket bind_socket;
+    ThreadData *t_data;
+    Worker *worker;
+
+   public:
+    C_accept(StressFactory *f, ServerSocket s, ThreadData *data, Worker *w)
+        : factory(f), bind_socket(std::move(s)), t_data(data), worker(w) {}
+    void do_request(int id) override {
+      while (true) {
+        entity_addr_t cli_addr;
+        ConnectedSocket srv_socket;
+        SocketOptions options;
+        int r = bind_socket.accept(&srv_socket, options, &cli_addr, worker);
+        if (r == -EAGAIN) {
+          break;
+        }
+        ASSERT_EQ(0, r);
+        ASSERT_TRUE(srv_socket.fd() > 0);
+        Server *cb = new Server(factory, &t_data->worker->center, std::move(srv_socket));
+        t_data->servers.insert(cb);
+      }
+    }
+  };
+  friend class C_accept;
+
+ public:
+  static const size_t min_client_send_messages = 100;
+  static const size_t max_client_send_messages = 1000;
+  std::shared_ptr<NetworkStack> stack;
+  RandomString rs;
+  std::random_device rd;
+  const size_t client_num, queue_depth, max_message_length;
+  atomic_int message_count, message_left;
+  entity_addr_t bind_addr;
+  std::atomic_bool already_bind = {false};
+  bool zero_copy_read;
+  SocketOptions options;
+
+  explicit StressFactory(std::shared_ptr<NetworkStack> s, const string &addr,
+                         size_t cli, size_t qd, size_t mc, size_t l, bool zero_copy)
+      : stack(s), rs(128), client_num(cli), queue_depth(qd),
+        max_message_length(l), message_count(mc), message_left(mc),
+        zero_copy_read(zero_copy) {
+    bind_addr.parse(addr.c_str());
+    rs.prepare(100);
+  }
+  ~StressFactory() {
+  }
+
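+  // Connects one new client that will send a random number of messages,
+  // bounded by the remaining global message_count.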
+  void add_client(ThreadData *t_data) {
+    static Mutex lock("add_client_lock");
+    Mutex::Locker l(lock);
+    ConnectedSocket sock;
+    int r = t_data->worker->connect(bind_addr, options, &sock);
+    std::default_random_engine rng(rd());
+    std::uniform_int_distribution<> dist(
+        min_client_send_messages, max_client_send_messages);
+    ASSERT_EQ(0, r);
+    int c = dist(rng);
+    if (c > message_count.load())
+      c = message_count.load();
+    Client *cb = new Client(this, &t_data->worker->center, std::move(sock), c);
+    t_data->clients.insert(cb);
+    message_count -= c;
+  }
+
+  void drop_client(ThreadData *t_data, Client *c) {
+    c->close();
+    ASSERT_EQ(1U, t_data->clients.erase(c));
+  }
+
+  void drop_server(ThreadData *t_data, Server *s) {
+    s->close();
+    ASSERT_EQ(1U, t_data->servers.erase(s));
+  }
+
+  void start(Worker *worker) {
+    int r = 0;
+    ThreadData t_data;
+    t_data.worker = worker;
+    ServerSocket bind_socket;
+    if (stack->support_local_listen_table() || worker->id == 0) {
+      r = worker->listen(bind_addr, options, &bind_socket);
+      ASSERT_EQ(0, r);
+      already_bind = true;
+    }
+    while (!already_bind)
+      usleep(50);
+    C_accept *accept_handler = nullptr;
+    int bind_fd = 0;
+    if (bind_socket) {
+      bind_fd = bind_socket.fd();
+      accept_handler = new C_accept(this, std::move(bind_socket), &t_data, worker);
+      ASSERT_EQ(0, worker->center.create_file_event(
+          bind_fd, EVENT_READABLE, accept_handler));
+    }
+
+    int echo_throttle = message_count;
+    while (message_count > 0 || !t_data.clients.empty() || !t_data.servers.empty()) {
+      if (message_count > 0 && t_data.clients.size() < client_num && t_data.servers.size() < client_num)
+        add_client(&t_data);
+      for (auto &&c : t_data.clients) {
+        if (c->finish()) {
+          drop_client(&t_data, c);
+          break;
+        }
+      }
+      for (auto &&s : t_data.servers) {
+        if (s->finish()) {
+          drop_server(&t_data, s);
+          break;
+        }
+      }
+
+      worker->center.process_events(1);
+      if (echo_throttle > message_left) {
+        std::cerr << " clients " << t_data.clients.size() << " servers " << t_data.servers.size()
+                  << " message count " << message_left << std::endl;
+        echo_throttle -= 100;
+      }
+    }
+    if (bind_fd)
+      worker->center.delete_file_event(bind_fd, EVENT_READABLE);
+    delete accept_handler;
+  }
+};
+
+TEST_P(NetworkWorkerTest, StressTest) {
+  StressFactory factory(stack, get_addr(), 16, 16, 10000, 1024,
+                        strncmp(GetParam(), "dpdk", 4) == 0);
+  StressFactory *f = &factory;
+  exec_events([f](Worker *worker) mutable {
+    f->start(worker);
+  });
+  ASSERT_EQ(0, factory.message_left);
+}
+
+
+INSTANTIATE_TEST_CASE_P(
+  NetworkStack,
+  NetworkWorkerTest,
+  ::testing::Values(
+#ifdef HAVE_DPDK
+    "dpdk",
+#endif
+    "posix"
+  )
+);
+
+#else
+
+// Google Test may not support value-parameterized tests with some
+// compilers. If we use conditional compilation to compile out all
+// code referring to the gtest_main library, MSVC linker will not link
+// that library at all and consequently complain about missing entry
+// point defined in that library (fatal error LNK1561: entry point
+// must be defined). This dummy test keeps gtest_main linked in.
+TEST(DummyTest, ValueParameterizedTestsAreNotSupportedOnThisPlatform) {}
+
+#endif
+
+
+/*
+ * Local Variables:
+ * compile-command: "cd ../.. ; make ceph_test_async_networkstack &&
+ *    ./ceph_test_async_networkstack
+ *
+ * End:
+ */