initial code repo
[stor4nfv.git] / src / ceph / src / test / msgr / test_async_networkstack.cc
diff --git a/src/ceph/src/test/msgr/test_async_networkstack.cc b/src/ceph/src/test/msgr/test_async_networkstack.cc
new file mode 100644 (file)
index 0000000..e876cc3
--- /dev/null
@@ -0,0 +1,1067 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 XSky <haomai@xsky.com>
+ *
+ * Author: Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#include <algorithm>
+#include <atomic>
+#include <iostream>
+#include <random>
+#include <string>
+#include <set>
+#include <vector>
+#include <gtest/gtest.h>
+
+#include "acconfig.h"
+#include "include/Context.h"
+
+#include "msg/async/Event.h"
+#include "msg/async/Stack.h"
+
+
+#if GTEST_HAS_PARAM_TEST
+
+class NetworkWorkerTest : public ::testing::TestWithParam<const char*> {
+ public:
+  std::shared_ptr<NetworkStack> stack;
+  string addr, port_addr;
+
+  NetworkWorkerTest() {}
+  void SetUp() override {
+    cerr << __func__ << " start set up " << GetParam() << std::endl;
+    if (strncmp(GetParam(), "dpdk", 4)) {
+      g_ceph_context->_conf->set_val("ms_type", "async+posix", false);
+      addr = "127.0.0.1:15000";
+      port_addr = "127.0.0.1:15001";
+    } else {
+      g_ceph_context->_conf->set_val("ms_type", "async+dpdk", false);
+      g_ceph_context->_conf->set_val("ms_dpdk_debug_allow_loopback", "true", false);
+      g_ceph_context->_conf->set_val("ms_async_op_threads", "2", false);
+      g_ceph_context->_conf->set_val("ms_dpdk_coremask", "0x7", false);
+      g_ceph_context->_conf->set_val("ms_dpdk_host_ipv4_addr", "172.16.218.3", false);
+      g_ceph_context->_conf->set_val("ms_dpdk_gateway_ipv4_addr", "172.16.218.2", false);
+      g_ceph_context->_conf->set_val("ms_dpdk_netmask_ipv4_addr", "255.255.255.0", false);
+      addr = "172.16.218.3:15000";
+      port_addr = "172.16.218.3:15001";
+    }
+    stack = NetworkStack::create(g_ceph_context, GetParam());
+    stack->start();
+  }
+  void TearDown() override {
+    stack->stop();
+  }
+  string get_addr() const {
+    return addr;
+  }
+  string get_ip_different_port() const {
+    return port_addr;
+  }
+  string get_different_ip() const {
+    return "10.0.123.100:4323";
+  }
+  EventCenter *get_center(unsigned i) {
+    return &stack->get_worker(i)->center;
+  }
+  Worker *get_worker(unsigned i) {
+    return stack->get_worker(i);
+  }
+  template<typename func>
+  class C_dispatch : public EventCallback {
+    Worker *worker;
+    func f;
+    std::atomic_bool done;
+   public:
+    C_dispatch(Worker *w, func &&_f): worker(w), f(std::move(_f)), done(false) {}
+    void do_request(int id) override {
+      f(worker);
+      done = true;
+    }
+    void wait() {
+      int us = 1000 * 1000 * 1000;
+      while (!done) {
+        ASSERT_TRUE(us > 0);
+        usleep(100);
+        us -= 100;
+      }
+    }
+  };
+  template<typename func>
+  void exec_events(func &&f) {
+    std::vector<C_dispatch<func>*> dis;
+    for (unsigned i = 0; i < stack->get_num_worker(); ++i) {
+      Worker *w = stack->get_worker(i);
+      C_dispatch<func> *e = new C_dispatch<func>(w, std::move(f));
+      stack->get_worker(i)->center.dispatch_event_external(e);
+      dis.push_back(e);
+    }
+
+    for (auto &&e : dis) {
+      e->wait();
+      delete e;
+    }
+  }
+};
+
+class C_poll : public EventCallback {
+  EventCenter *center;
+  std::atomic<bool> woken;
+  static const int sleepus = 500;
+
+ public:
+  C_poll(EventCenter *c): center(c), woken(false) {}
+  void do_request(int r) override {
+    woken = true;
+  }
+  bool poll(int milliseconds) {
+    auto start = ceph::coarse_real_clock::now();
+    while (!woken) {
+      center->process_events(sleepus);
+      usleep(sleepus);
+      auto r = std::chrono::duration_cast<std::chrono::milliseconds>(
+              ceph::coarse_real_clock::now() - start);
+      if (r >= std::chrono::milliseconds(milliseconds))
+        break;
+    }
+    return woken;
+  }
+  void reset() {
+    woken = false;
+  }
+};
+
+TEST_P(NetworkWorkerTest, SimpleTest) {  // one listen/connect/accept/send/read/shutdown round trip
+  entity_addr_t bind_addr;
+  ASSERT_TRUE(bind_addr.parse(get_addr().c_str()));
+  std::atomic_bool accepted(false);  // shared by all workers: set once a listener sees the connection
+  std::atomic_bool *accepted_p = &accepted;
+
+  exec_events([this, accepted_p, bind_addr](Worker *worker) mutable {  // exec_events runs this body on every worker
+    entity_addr_t cli_addr;
+    SocketOptions options;
+    ServerSocket bind_socket;
+    EventCenter *center = &worker->center;
+    ssize_t r = 0;
+    if (stack->support_local_listen_table() || worker->id == 0)  // only worker 0 listens unless the stack has per-worker listen tables
+      r = worker->listen(bind_addr, options, &bind_socket);
+    ASSERT_EQ(0, r);
+
+    ConnectedSocket cli_socket, srv_socket;
+    if (worker->id == 0) {  // worker 0 is the only client
+      r = worker->connect(bind_addr, options, &cli_socket);
+      ASSERT_EQ(0, r);
+    }
+
+    bool is_my_accept = false;  // true on the worker whose listener became readable
+    if (bind_socket) {
+      C_poll cb(center);
+      center->create_file_event(bind_socket.fd(), EVENT_READABLE, &cb);
+      if (cb.poll(500)) {
+        *accepted_p = true;
+        is_my_accept = true;
+      }
+      ASSERT_TRUE(*accepted_p);  // passes if this or any other worker saw the connection
+      center->delete_file_event(bind_socket.fd(), EVENT_READABLE);
+    }
+
+    if (is_my_accept) {
+      r = bind_socket.accept(&srv_socket, options, &cli_addr, worker);
+      ASSERT_EQ(0, r);
+      ASSERT_TRUE(srv_socket.fd() > 0);
+    }
+
+    if (worker->id == 0) {  // wait for the client side of the connection to complete
+      C_poll cb(center);
+      center->create_file_event(cli_socket.fd(), EVENT_READABLE, &cb);
+      r = cli_socket.is_connected();
+      if (r == 0) {
+        ASSERT_EQ(true, cb.poll(500));
+        r = cli_socket.is_connected();
+      }
+      ASSERT_EQ(1, r);
+      center->delete_file_event(cli_socket.fd(), EVENT_READABLE);
+    }
+
+    const char *message = "this is a new message";
+    int len = strlen(message);
+    bufferlist bl;
+    bl.append(message, len);
+    if (worker->id == 0) {
+      r = cli_socket.send(bl, false);
+      ASSERT_EQ(len, r);  // the whole message is expected to go out in one send
+    }
+
+    char buf[1024];
+    C_poll cb(center);
+    if (is_my_accept) {
+      center->create_file_event(srv_socket.fd(), EVENT_READABLE, &cb);
+      {
+        r = srv_socket.read(buf, sizeof(buf));
+        while (r == -EAGAIN) {  // retry until the data has arrived
+          ASSERT_TRUE(cb.poll(500));
+          r = srv_socket.read(buf, sizeof(buf));
+          cb.reset();
+        }
+        ASSERT_EQ(len, r);
+        ASSERT_EQ(0, memcmp(buf, message, len));
+      }
+      bind_socket.abort_accept();
+    }
+    if (worker->id == 0) {
+      cli_socket.shutdown();
+      // ack delay is 200 ms
+    }
+
+    bl.clear();
+    bl.append(message, len);
+    if (worker->id == 0) {
+      r = cli_socket.send(bl, false);
+      ASSERT_EQ(-EPIPE, r);  // sending after shutdown() must fail
+    }
+    if (is_my_accept) {
+      cb.reset();
+      ASSERT_TRUE(cb.poll(500));
+      r = srv_socket.read(buf, sizeof(buf));
+      if (r == -EAGAIN) {
+        cb.reset();
+        ASSERT_TRUE(cb.poll(1000*500));  // NOTE(review): poll() takes milliseconds, so this allows 500000 ms - confirm intended
+        r = srv_socket.read(buf, sizeof(buf));
+      }
+      ASSERT_EQ(0, r);  // the test expects a zero-length read after the client's shutdown
+      center->delete_file_event(srv_socket.fd(), EVENT_READABLE);
+      srv_socket.close();
+    }
+  });
+}
+
+TEST_P(NetworkWorkerTest, ConnectFailedTest) {
+  entity_addr_t bind_addr;
+  ASSERT_TRUE(bind_addr.parse(get_addr().c_str()));
+
+  exec_events([this, bind_addr](Worker *worker) mutable {
+    EventCenter *center = &worker->center;
+    entity_addr_t cli_addr;
+    SocketOptions options;
+    ServerSocket bind_socket;
+    int r = 0;
+    if (stack->support_local_listen_table() || worker->id == 0)
+      r = worker->listen(bind_addr, options, &bind_socket);
+    ASSERT_EQ(0, r);
+
+    ConnectedSocket cli_socket1, cli_socket2;
+    if (worker->id == 0) {
+      ASSERT_TRUE(cli_addr.parse(get_ip_different_port().c_str()));
+      r = worker->connect(cli_addr, options, &cli_socket1);
+      ASSERT_EQ(0, r);
+      C_poll cb(center);
+      center->create_file_event(cli_socket1.fd(), EVENT_READABLE, &cb);
+      r = cli_socket1.is_connected();
+      if (r == 0) {
+        ASSERT_TRUE(cb.poll(500));
+        r = cli_socket1.is_connected();
+      }
+      ASSERT_TRUE(r == -ECONNREFUSED || r == -ECONNRESET);
+    }
+
+    if (worker->id == 1) {
+      ASSERT_TRUE(cli_addr.parse(get_different_ip().c_str()));
+      r = worker->connect(cli_addr, options, &cli_socket2);
+      ASSERT_EQ(0, r);
+      C_poll cb(center);
+      center->create_file_event(cli_socket2.fd(), EVENT_READABLE, &cb);
+      r = cli_socket2.is_connected();
+      if (r == 0) {
+        cb.poll(500);
+        r = cli_socket2.is_connected();
+      }
+      ASSERT_TRUE(r != 1);
+      center->delete_file_event(cli_socket2.fd(), EVENT_READABLE);
+    }
+  });
+}
+
+// Listening twice on the same address from one worker: the first call
+// succeeds, the second must report EADDRINUSE.
+TEST_P(NetworkWorkerTest, ListenTest) {
+  Worker *worker = get_worker(0);
+  entity_addr_t bind_addr;
+  ASSERT_TRUE(bind_addr.parse(get_addr().c_str()));
+
+  SocketOptions options;
+  ServerSocket bind_socket1, bind_socket2;
+
+  const int first = worker->listen(bind_addr, options, &bind_socket1);
+  ASSERT_EQ(0, first);
+
+  const int second = worker->listen(bind_addr, options, &bind_socket2);
+  ASSERT_EQ(-EADDRINUSE, second);
+}
+
+TEST_P(NetworkWorkerTest, AcceptAndCloseTest) {
+  entity_addr_t bind_addr;
+  ASSERT_TRUE(bind_addr.parse(get_addr().c_str()));
+  std::atomic_bool accepted(false);
+  std::atomic_bool *accepted_p = &accepted;
+  std::atomic_int unbind_count(stack->get_num_worker());
+  std::atomic_int *count_p = &unbind_count;
+  exec_events([this, bind_addr, accepted_p, count_p](Worker *worker) mutable {
+    SocketOptions options;
+    EventCenter *center = &worker->center;
+    entity_addr_t cli_addr;
+    int r = 0;
+    {
+      ServerSocket bind_socket;
+      if (stack->support_local_listen_table() || worker->id == 0)
+        r = worker->listen(bind_addr, options, &bind_socket);
+      ASSERT_EQ(0, r);
+
+      ConnectedSocket srv_socket, cli_socket;
+      if (bind_socket) {
+        r = bind_socket.accept(&srv_socket, options, &cli_addr, worker);
+        ASSERT_EQ(-EAGAIN, r);
+      }
+
+      C_poll cb(center);
+      if (worker->id == 0) {
+        center->create_file_event(bind_socket.fd(), EVENT_READABLE, &cb);
+        r = worker->connect(bind_addr, options, &cli_socket);
+        ASSERT_EQ(0, r);
+        ASSERT_TRUE(cb.poll(500));
+      }
+
+      if (bind_socket) {
+        cb.reset();
+        cb.poll(500);
+        ConnectedSocket srv_socket2;
+        do {
+          r = bind_socket.accept(&srv_socket2, options, &cli_addr, worker);
+          usleep(100);
+        } while (r == -EAGAIN && !*accepted_p);
+        if (r == 0)
+          *accepted_p = true;
+        ASSERT_TRUE(*accepted_p);
+        // srv_socket2 closed
+        center->delete_file_event(bind_socket.fd(), EVENT_READABLE);
+      }
+
+      if (worker->id == 0) {
+        char buf[100];
+        cb.reset();
+        center->create_file_event(cli_socket.fd(), EVENT_READABLE, &cb);
+        int i = 3;
+        while (!i--) {
+          ASSERT_TRUE(cb.poll(500));
+          r = cli_socket.read(buf, sizeof(buf));
+          if (r == 0)
+            break;
+        }
+        ASSERT_EQ(0, r);
+        center->delete_file_event(cli_socket.fd(), EVENT_READABLE);
+      }
+
+      if (bind_socket)
+        center->create_file_event(bind_socket.fd(), EVENT_READABLE, &cb);
+      if (worker->id == 0) {
+        *accepted_p = false;
+        r = worker->connect(bind_addr, options, &cli_socket);
+        ASSERT_EQ(0, r);
+        cb.reset();
+        ASSERT_TRUE(cb.poll(500));
+        cli_socket.close();
+      }
+
+      if (bind_socket) {
+        do {
+          r = bind_socket.accept(&srv_socket, options, &cli_addr, worker);
+          usleep(100);
+        } while (r == -EAGAIN && !*accepted_p);
+        if (r == 0)
+          *accepted_p = true;
+        ASSERT_TRUE(*accepted_p);
+        center->delete_file_event(bind_socket.fd(), EVENT_READABLE);
+      }
+      // unbind
+    }
+
+    --*count_p;
+    while (*count_p > 0)
+      usleep(100);
+
+    ConnectedSocket cli_socket;
+    r = worker->connect(bind_addr, options, &cli_socket);
+    ASSERT_EQ(0, r);
+    {
+      C_poll cb(center);
+      center->create_file_event(cli_socket.fd(), EVENT_READABLE, &cb);
+      r = cli_socket.is_connected();
+      if (r == 0) {
+        ASSERT_TRUE(cb.poll(500));
+        r = cli_socket.is_connected();
+      }
+      ASSERT_TRUE(r == -ECONNREFUSED || r == -ECONNRESET);
+    }
+  });
+}
+
+TEST_P(NetworkWorkerTest, ComplexTest) {
+  entity_addr_t bind_addr;
+  std::atomic_bool listen_done(false);
+  std::atomic_bool *listen_p = &listen_done;
+  std::atomic_bool accepted(false);
+  std::atomic_bool *accepted_p = &accepted;
+  std::atomic_bool done(false);
+  std::atomic_bool *done_p = &done;
+  ASSERT_TRUE(bind_addr.parse(get_addr().c_str()));
+  exec_events([this, bind_addr, listen_p, accepted_p, done_p](Worker *worker) mutable {
+    entity_addr_t cli_addr;
+    EventCenter *center = &worker->center;
+    SocketOptions options;
+    ServerSocket bind_socket;
+    int r = 0;
+    if (stack->support_local_listen_table() || worker->id == 0) {
+      r = worker->listen(bind_addr, options, &bind_socket);
+      ASSERT_EQ(0, r);
+      *listen_p = true;
+    }
+    ConnectedSocket cli_socket, srv_socket;
+    if (worker->id == 1) {
+      while (!*listen_p) {
+        usleep(50);
+        r = worker->connect(bind_addr, options, &cli_socket);
+        ASSERT_EQ(0, r);
+      }
+    }
+
+    if (bind_socket) {
+      C_poll cb(center);
+      center->create_file_event(bind_socket.fd(), EVENT_READABLE, &cb);
+      int count = 3;
+      while (count--) {
+        if (cb.poll(500)) {
+          r = bind_socket.accept(&srv_socket, options, &cli_addr, worker);
+          ASSERT_EQ(0, r);
+          *accepted_p = true;
+          break;
+        }
+      }
+      ASSERT_TRUE(*accepted_p);
+      center->delete_file_event(bind_socket.fd(), EVENT_READABLE);
+    }
+
+    if (worker->id == 1) {
+      C_poll cb(center);
+      center->create_file_event(cli_socket.fd(), EVENT_WRITABLE, &cb);
+      r = cli_socket.is_connected();
+      if (r == 0) {
+        ASSERT_TRUE(cb.poll(500));
+        r = cli_socket.is_connected();
+      }
+      ASSERT_EQ(1, r);
+      center->delete_file_event(cli_socket.fd(), EVENT_WRITABLE);
+    }
+
+    const size_t message_size = 10240;
+    size_t count = 100;
+    string message(message_size, '!');
+    for (size_t i = 0; i < message_size; i += 100)
+      message[i] = ',';
+    size_t len = message_size * count;
+    C_poll cb(center);
+    if (worker->id == 1)
+      center->create_file_event(cli_socket.fd(), EVENT_WRITABLE, &cb);
+    if (srv_socket)
+      center->create_file_event(srv_socket.fd(), EVENT_READABLE, &cb);
+    size_t left = len;
+    len *= 2;
+    string read_string;
+    int again_count = 0;
+    int c = 2;
+    bufferlist bl;
+    for (size_t i = 0; i < count; ++i)
+      bl.push_back(bufferptr((char*)message.data(), message_size));
+    while (!*done_p) {
+      again_count = 0;
+      if (worker->id == 1) {
+        if (c > 0) {
+          ssize_t r = 0;
+          usleep(100);
+          if (left > 0) {
+            r = cli_socket.send(bl, false);
+            ASSERT_TRUE(r >= 0 || r == -EAGAIN);
+            if (r > 0)
+              left -= r;
+            if (r == -EAGAIN)
+              ++again_count;
+          }
+          if (left == 0) {
+            --c;
+            left = message_size * count;
+            ASSERT_EQ(0U, bl.length());
+            for (size_t i = 0; i < count; ++i)
+              bl.push_back(bufferptr((char*)message.data(), message_size));
+          }
+        }
+      }
+
+      if (srv_socket) {
+        char buf[1000];
+        if (len > 0) {
+          r = srv_socket.read(buf, sizeof(buf));
+          ASSERT_TRUE(r > 0 || r == -EAGAIN);
+          if (r > 0) {
+            read_string.append(buf, r);
+            len -= r;
+          } else if (r == -EAGAIN) {
+            ++again_count;
+          }
+        }
+        if (len == 0) {
+          for (size_t i = 0; i < read_string.size(); i += message_size)
+            ASSERT_EQ(0, memcmp(read_string.c_str()+i, message.c_str(), message_size));
+          *done_p = true;
+        }
+      }
+      if (again_count) {
+        cb.reset();
+        cb.poll(500);
+      }
+    }
+    if (worker->id == 1)
+      center->delete_file_event(cli_socket.fd(), EVENT_WRITABLE);
+    if (srv_socket)
+      center->delete_file_event(srv_socket.fd(), EVENT_READABLE);
+
+    if (bind_socket)
+      bind_socket.abort_accept();
+    if (srv_socket)
+      srv_socket.close();
+    if (worker->id == 1)
+      cli_socket.close();
+  });
+}
+
+class StressFactory {
+  struct Client;
+  struct Server;
+  // Per-worker state: the worker itself plus the clients and servers created
+  // on it. Whatever is still in the sets at destruction is deleted.
+  struct ThreadData {
+    Worker *worker;
+    std::set<Client*> clients;
+    std::set<Server*> servers;
+    ~ThreadData() {
+      for (Client *leftover : clients)
+        delete leftover;
+      for (Server *leftover : servers)
+        delete leftover;
+    }
+  };
+
+  struct RandomString {
+    size_t slen;
+    vector<std::string> strs;
+    std::random_device rd;
+    std::default_random_engine rng;
+
+    RandomString(size_t s): slen(s), rng(rd()) {}
+    void prepare(size_t n) {
+      static const char alphabet[] =
+          "abcdefghijklmnopqrstuvwxyz"
+          "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
+          "0123456789";
+
+      std::uniform_int_distribution<> dist(
+              0, sizeof(alphabet) / sizeof(*alphabet) - 2);
+
+      strs.reserve(n);
+      std::generate_n(
+        std::back_inserter(strs), strs.capacity(), [&] {
+          std::string str;
+          str.reserve(slen);
+          std::generate_n(std::back_inserter(str), slen, [&]() {
+            return alphabet[dist(rng)];
+          });
+         return str;
+        }
+      );
+    }
+    std::string &get_random_string() {
+      std::uniform_int_distribution<> dist(
+              0, strs.size() - 1);
+      return strs[dist(rng)];
+    }
+  };
+  struct Message {
+    size_t idx;
+    size_t len;
+    std::string content;
+
+    explicit Message(RandomString &rs, size_t i, size_t l): idx(i) {
+      size_t slen = rs.slen;
+      len = std::max(slen, l);
+
+      std::vector<std::string> strs;
+      strs.reserve(len / slen);
+      std::generate_n(
+        std::back_inserter(strs), strs.capacity(), [&] {
+          return rs.get_random_string();
+        }
+      );
+      len = slen * strs.size();
+      content.reserve(len);
+      for (auto &&s : strs)
+        content.append(s);
+    }
+    bool verify(const char *b, size_t len = 0) const {
+      return content.compare(0, len, b, 0, len) == 0;
+    }
+  };
+
+  // One-shot event: when it runs it deletes the object it was given and then
+  // itself.
+  template <typename T>
+  class C_delete : public EventCallback {
+    T *ctxt;
+   public:
+    C_delete(T *c): ctxt(c) {}
+    void do_request(int id) override {
+      T *doomed = ctxt;
+      delete doomed;
+      delete this;
+    }
+  };
+
+  class Client {  // sends `left` random messages over one socket and checks each one comes back
+    StressFactory *factory;
+    EventCenter *center;
+    ConnectedSocket socket;
+    std::deque<StressFactory::Message*> acking;  // fully sent, reply not yet fully read
+    std::deque<StressFactory::Message*> writings;  // generated, not yet fully sent
+    std::string buffer;  // receive buffer for the message at the front of `acking`
+    size_t index = 0;
+    size_t left;  // messages this client still has to generate
+    bool write_enabled = false;  // true while an EVENT_WRITABLE file event is registered
+    size_t read_offset = 0, write_offset = 0;
+    bool first = true;
+    bool dead = false;
+    StressFactory::Message homeless_message;  // stand-in target used when nothing is awaiting a reply
+
+    class Client_read_handle : public EventCallback {
+      Client *c;
+     public:
+      Client_read_handle(Client *_c): c(_c) {}
+      void do_request(int id) override {
+        c->do_read_request();
+      }
+    } read_ctxt;
+
+    class Client_write_handle : public EventCallback {
+      Client *c;
+     public:
+      Client_write_handle(Client *_c): c(_c) {}
+      void do_request(int id) override {
+        c->do_write_request();
+      }
+    } write_ctxt;
+
+   public:
+    Client(StressFactory *f, EventCenter *cen, ConnectedSocket s, size_t c)
+        : factory(f), center(cen), socket(std::move(s)), left(c), homeless_message(factory->rs, -1, 1024),
+          read_ctxt(this), write_ctxt(this) {
+      center->create_file_event(
+              socket.fd(), EVENT_READABLE, &read_ctxt);
+      center->dispatch_event_external(&read_ctxt);  // queue an initial read pass
+    }
+    void close() {  // shut the socket down and queue deletion of this object through the event center
+      ASSERT_FALSE(write_enabled);
+      dead = true;
+      socket.shutdown();
+      center->delete_file_event(socket.fd(), EVENT_READABLE);
+      center->dispatch_event_external(new C_delete<Client>(this));
+    }
+
+    void do_read_request() {  // drain readable data and match it against the messages awaiting replies
+      if (dead)
+        return ;
+      ASSERT_TRUE(socket.is_connected() >= 0);
+      if (!socket.is_connected())
+        return ;
+      ASSERT_TRUE(!acking.empty() || first);
+      if (first) {  // first pass on a connected socket: start the writer
+        first = false;
+        center->dispatch_event_external(&write_ctxt);
+        if (acking.empty())
+          return ;
+      }
+      StressFactory::Message *m = acking.front();
+      int r = 0;
+      if (buffer.empty())
+        buffer.resize(m->len);
+      bool must_no = false;  // set once nothing is outstanding; any further data then fails the test
+      while (true) {
+        r = socket.read((char*)buffer.data() + read_offset,
+                        m->len - read_offset);
+        ASSERT_TRUE(r == -EAGAIN || r > 0);
+        if (r == -EAGAIN)
+          break;
+        read_offset += r;
+
+        std::cerr << " client " << this << " receive " << m->idx << " len " << r << " content: "  << std::endl;
+        ASSERT_FALSE(must_no);
+        if ((m->len - read_offset) == 0) {  // the whole reply for `m` has arrived
+          ASSERT_TRUE(m->verify(buffer.data(), 0));  // NOTE(review): a length of 0 makes verify() compare zero bytes - confirm intended
+          delete m;
+          acking.pop_front();
+          read_offset = 0;
+          buffer.clear();
+          if (acking.empty()) {
+            m = &homeless_message;
+            must_no = true;
+          } else {
+            m = acking.front();
+            buffer.resize(m->len);
+          }
+        }
+      }
+      if (acking.empty()) {  // everything acknowledged: let the writer produce more
+        center->dispatch_event_external(&write_ctxt);
+        return ;
+      }
+    }
+
+    void do_write_request() {  // generate messages up to the queue depth and send as much as the socket takes
+      if (dead)
+        return ;
+      ASSERT_TRUE(socket.is_connected() > 0);
+
+      while (left > 0 && factory->queue_depth > writings.size() + acking.size()) {
+        StressFactory::Message *m = new StressFactory::Message(
+                factory->rs, ++index,
+                factory->rd() % factory->max_message_length);
+        std::cerr << " client " << this << " generate message " << m->idx << " length " << m->len << std::endl;
+        ASSERT_EQ(m->len, m->content.size());
+        writings.push_back(m);
+        --left;
+        --factory->message_left;
+      }
+
+      while (!writings.empty()) {
+        StressFactory::Message *m = writings.front();
+        bufferlist bl;
+        bl.append(m->content.data() + write_offset, m->content.size() - write_offset);  // unsent tail of the front message
+        ssize_t r = socket.send(bl, false);
+        if (r == 0)  // nothing was sent: stop and rely on the writable event
+          break;
+        std::cerr << " client " << this << " send " << m->idx << " len " << r << " content: " << std::endl;
+        ASSERT_TRUE(r >= 0);
+        write_offset += r;
+        if (write_offset == m->content.size()) {  // fully sent: now wait for its reply
+          write_offset = 0;
+          writings.pop_front();
+          acking.push_back(m);
+        }
+      }
+      if (writings.empty() && write_enabled) {  // keep the writable event registered only while data is pending
+        center->delete_file_event(socket.fd(), EVENT_WRITABLE);
+        write_enabled = false;
+      } else if (!writings.empty() && !write_enabled) {
+        ASSERT_EQ(0, center->create_file_event(
+                  socket.fd(), EVENT_WRITABLE, &write_ctxt));
+        write_enabled = true;
+      }
+    }
+
+    bool finish() const {  // true once every message has been generated, sent and acknowledged
+      return left == 0 && acking.empty() && writings.empty();
+    }
+  };
+  friend class Client;
+
+  class Server {
+    StressFactory *factory;
+    EventCenter *center;
+    ConnectedSocket socket;
+    std::deque<std::string> buffers;
+    bool write_enabled = false;
+    bool dead = false;
+
+    class Server_read_handle : public EventCallback {
+      Server *s;
+     public:
+      Server_read_handle(Server *_s): s(_s) {}
+      void do_request(int id) override {
+        s->do_read_request();
+      }
+    } read_ctxt;
+
+    class Server_write_handle : public EventCallback {
+      Server *s;
+     public:
+      Server_write_handle(Server *_s): s(_s) {}
+      void do_request(int id) override {
+        s->do_write_request();
+      }
+    } write_ctxt;
+
+   public:
+    Server(StressFactory *f, EventCenter *c, ConnectedSocket s):
+        factory(f), center(c), socket(std::move(s)), read_ctxt(this), write_ctxt(this) {
+      center->create_file_event(socket.fd(), EVENT_READABLE, &read_ctxt);
+      center->dispatch_event_external(&read_ctxt);
+    }
+    void close() {
+      ASSERT_FALSE(write_enabled);
+      socket.shutdown();
+      center->delete_file_event(socket.fd(), EVENT_READABLE);
+      center->dispatch_event_external(new C_delete<Server>(this));
+    }
+    void do_read_request() {
+      if (dead)
+        return ;
+      int r = 0;
+      while (true) {
+        char buf[4096];
+        bufferptr data;
+        if (factory->zero_copy_read) {
+          r = socket.zero_copy_read(data);
+        } else {
+          r = socket.read(buf, sizeof(buf));
+        }
+        ASSERT_TRUE(r == -EAGAIN || (r >= 0 && (size_t)r <= sizeof(buf)));
+        if (r == 0) {
+          ASSERT_TRUE(buffers.empty());
+          dead = true;
+          return ;
+        } else if (r == -EAGAIN)
+          break;
+        if (factory->zero_copy_read) {
+          buffers.emplace_back(data.c_str(), 0, data.length());
+        } else {
+          buffers.emplace_back(buf, 0, r);
+        }
+        std::cerr << " server " << this << " receive " << r << " content: " << std::endl;
+      }
+      if (!buffers.empty() && !write_enabled)
+        center->dispatch_event_external(&write_ctxt);
+    }
+
+    void do_write_request() {
+      if (dead)
+        return ;
+
+      while (!buffers.empty()) {
+        bufferlist bl;
+        auto it = buffers.begin();
+        for (size_t i = 0; i < buffers.size(); ++i) {
+          bl.push_back(bufferptr((char*)it->data(), it->size()));
+          ++it;
+        }
+
+        ssize_t r = socket.send(bl, false);
+        std::cerr << " server " << this << " send " << r << std::endl;
+        if (r == 0)
+          break;
+        ASSERT_TRUE(r >= 0);
+        while (r > 0) {
+          ASSERT_TRUE(!buffers.empty());
+          string &buffer = buffers.front();
+          if (r >= (int)buffer.size()) {
+            r -= (int)buffer.size();
+            buffers.pop_front();
+          } else {
+           std::cerr << " server " << this << " sent " << r << std::endl;
+            buffer = buffer.substr(r, buffer.size());
+            break;
+          }
+        }
+      }
+      if (buffers.empty()) {
+        if (write_enabled) {
+          center->delete_file_event(socket.fd(), EVENT_WRITABLE);
+          write_enabled = false;
+        }
+      } else if (!write_enabled) {
+        ASSERT_EQ(0, center->create_file_event(
+                  socket.fd(), EVENT_WRITABLE, &write_ctxt));
+        write_enabled = true;
+      }
+    }
+
+    bool finish() {
+     return dead;
+    }
+  };
+  friend class Server;
+
+  class C_accept : public EventCallback {
+    StressFactory *factory;
+    ServerSocket bind_socket;
+    ThreadData *t_data;
+    Worker *worker;
+
+   public:
+    C_accept(StressFactory *f, ServerSocket s, ThreadData *data, Worker *w)
+        : factory(f), bind_socket(std::move(s)), t_data(data), worker(w) {}
+    void do_request(int id) override {
+      while (true) {
+        entity_addr_t cli_addr;
+        ConnectedSocket srv_socket;
+        SocketOptions options;
+        int r = bind_socket.accept(&srv_socket, options, &cli_addr, worker);
+        if (r == -EAGAIN) {
+          break;
+        }
+        ASSERT_EQ(0, r);
+        ASSERT_TRUE(srv_socket.fd() > 0);
+        Server *cb = new Server(factory, &t_data->worker->center, std::move(srv_socket));
+        t_data->servers.insert(cb);
+      }
+    }
+  };
+  friend class C_accept;
+
+ public:
+  static const size_t min_client_send_messages = 100;  // lower bound of the per-client message quota drawn in add_client()
+  static const size_t max_client_send_messages = 1000;  // upper bound of that quota
+  std::shared_ptr<NetworkStack> stack;
+  RandomString rs;  // pool of random strings that messages are built from
+  std::random_device rd;
+  const size_t client_num, queue_depth, max_message_length;
+  atomic_int message_count, message_left;  // message_count: not yet assigned to a client; message_left: not yet generated
+  entity_addr_t bind_addr;
+  std::atomic_bool already_bind = {false};  // set by the first worker that listens in start()
+  bool zero_copy_read;
+  SocketOptions options;
+
+  explicit StressFactory(std::shared_ptr<NetworkStack> s, const string &addr,
+                         size_t cli, size_t qd, size_t mc, size_t l, bool zero_copy)
+      : stack(s), rs(128), client_num(cli), queue_depth(qd),
+        max_message_length(l), message_count(mc), message_left(mc),
+        zero_copy_read(zero_copy) {
+    bind_addr.parse(addr.c_str());  // NOTE(review): the result of parse() is not checked here
+    rs.prepare(100);  // 100 pool strings of 128 characters each
+  }
+  ~StressFactory() {
+  }
+
+  void add_client(ThreadData *t_data) {
+    static Mutex lock("add_client_lock");
+    Mutex::Locker l(lock);
+    ConnectedSocket sock;
+    int r = t_data->worker->connect(bind_addr, options, &sock);
+    std::default_random_engine rng(rd());
+    std::uniform_int_distribution<> dist(
+            min_client_send_messages, max_client_send_messages);
+    ASSERT_EQ(0, r);
+    int c = dist(rng);
+    if (c > message_count.load())
+      c = message_count.load();
+    Client *cb = new Client(this, &t_data->worker->center, std::move(sock), c);
+    t_data->clients.insert(cb);
+    message_count -= c;
+  }
+
+  // Close `c` and remove it from the worker's client set; it must be a member.
+  void drop_client(ThreadData *t_data, Client *c) {
+    c->close();
+    const auto removed = t_data->clients.erase(c);
+    ASSERT_EQ(1U, removed);
+  }
+
+  // Close `s` and remove it from the worker's server set; it must be a member.
+  void drop_server(ThreadData *t_data, Server *s) {
+    s->close();
+    const auto removed = t_data->servers.erase(s);
+    ASSERT_EQ(1U, removed);
+  }
+
+  void start(Worker *worker) {  // per-worker driver: listen if applicable, then run the event loop until all work is finished
+    int r = 0;
+    ThreadData t_data;
+    t_data.worker = worker;
+    ServerSocket bind_socket;
+    if (stack->support_local_listen_table() || worker->id == 0) {
+      r = worker->listen(bind_addr, options, &bind_socket);
+      ASSERT_EQ(0, r);
+      already_bind = true;
+    }
+    while (!already_bind)  // workers that do not listen wait here until one that does has bound
+      usleep(50);
+    C_accept *accept_handler = nullptr;
+    int bind_fd = 0;  // 0 doubles as "this worker has no listener"
+    if (bind_socket) {
+      bind_fd = bind_socket.fd();  // saved because bind_socket is moved into the handler on the next line
+      accept_handler = new C_accept(this, std::move(bind_socket), &t_data, worker);
+      ASSERT_EQ(0, worker->center.create_file_event(
+                  bind_fd, EVENT_READABLE, accept_handler));
+    }
+
+    int echo_throttle = message_count;  // progress is printed roughly every 100 messages
+    while (message_count > 0 || !t_data.clients.empty() || !t_data.servers.empty()) {
+      if (message_count > 0  && t_data.clients.size() < client_num && t_data.servers.size() < client_num)
+        add_client(&t_data);
+      for (auto &&c : t_data.clients) {
+        if (c->finish()) {
+          drop_client(&t_data, c);
+          break;  // the set was modified, so stop iterating; at most one client is dropped per pass
+        }
+      }
+      for (auto &&s : t_data.servers) {
+        if (s->finish()) {
+          drop_server(&t_data, s);
+          break;  // same as above: at most one server is dropped per pass
+        }
+      }
+
+      worker->center.process_events(1);
+      if (echo_throttle > message_left) {
+        std::cerr << " clients " << t_data.clients.size() << " servers " << t_data.servers.size()
+                  << " message count " << message_left << std::endl;
+        echo_throttle -= 100;
+      }
+    }
+    if (bind_fd)
+      worker->center.delete_file_event(bind_fd, EVENT_READABLE);
+    delete accept_handler;  // a null handler is fine here
+  }
+};
+
+// Run the StressFactory echo workload on every worker: 16 clients per worker,
+// queue depth 16, 10000 messages of at most 1024 bytes. Zero-copy reads are
+// used only for the dpdk flavour. Every message must have been generated by
+// the end.
+TEST_P(NetworkWorkerTest, StressTest) {
+  const bool zero_copy = strncmp(GetParam(), "dpdk", 4) == 0;
+  StressFactory factory(stack, get_addr(), 16, 16, 10000, 1024, zero_copy);
+  StressFactory *factory_p = &factory;
+  exec_events([factory_p](Worker *w) mutable {
+    factory_p->start(w);
+  });
+  ASSERT_EQ(0, factory.message_left);
+}
+
+
+INSTANTIATE_TEST_CASE_P(  // instantiate the NetworkWorkerTest cases once per value listed below
+  NetworkStack,
+  NetworkWorkerTest,
+  ::testing::Values(
+#ifdef HAVE_DPDK
+    "dpdk",  // listed only when HAVE_DPDK is defined
+#endif
+    "posix"
+  )
+);
+
+#else
+
+// Google Test may not support value-parameterized tests with some
+// compilers. If we use conditional compilation to compile out all
+// code referring to the gtest_main library, MSVC linker will not link
+// that library at all and consequently complain about missing entry
+// point defined in that library (fatal error LNK1561: entry point
+// must be defined). This dummy test keeps gtest_main linked in.
+TEST(DummyTest, ValueParameterizedTestsAreNotSupportedOnThisPlatform) {}  // intentionally empty placeholder
+
+#endif
+
+
+/*
+ * Local Variables:
+ * compile-command: "cd ../.. ; make ceph_test_async_networkstack &&
+ *    ./ceph_test_async_networkstack"
+ *
+ * End:
+ */