1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2016 XSky <haomai@xsky.com>
8 * Author: Haomai Wang <haomaiwang@gmail.com>
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
24 #include <gtest/gtest.h>
27 #include "include/Context.h"
29 #include "msg/async/Event.h"
30 #include "msg/async/Stack.h"
33 #if GTEST_HAS_PARAM_TEST
// Value-parameterized gtest fixture. GetParam() is the stack name that
// SetUp() passes to NetworkStack::create().
35 class NetworkWorkerTest : public ::testing::TestWithParam<const char*> {
// Network stack under test; assigned in SetUp().
37 std::shared_ptr<NetworkStack> stack;
// "ip:port" strings assigned in SetUp(); in both configurations port_addr is
// the same IP as addr with port 15001 instead of 15000.
38 string addr, port_addr;
40 NetworkWorkerTest() {}
// Applies the ms_* configuration for the requested stack, selects the test
// addresses and creates the stack.
41 void SetUp() override {
42 cerr << __func__ << " start set up " << GetParam() << std::endl;
// strncmp() is non-zero when the parameter does NOT begin with "dpdk", so the
// statements that follow configure the posix stack on loopback.
43 if (strncmp(GetParam(), "dpdk", 4)) {
44 g_ceph_context->_conf->set_val("ms_type", "async+posix", false);
45 addr = "127.0.0.1:15000";
46 port_addr = "127.0.0.1:15001";
// DPDK configuration (presumably the else branch; the `else` line itself is
// not visible in this view). The addresses are fixed to 172.16.218.3.
48 g_ceph_context->_conf->set_val("ms_type", "async+dpdk", false);
49 g_ceph_context->_conf->set_val("ms_dpdk_debug_allow_loopback", "true", false);
50 g_ceph_context->_conf->set_val("ms_async_op_threads", "2", false);
51 g_ceph_context->_conf->set_val("ms_dpdk_coremask", "0x7", false);
52 g_ceph_context->_conf->set_val("ms_dpdk_host_ipv4_addr", "172.16.218.3", false);
53 g_ceph_context->_conf->set_val("ms_dpdk_gateway_ipv4_addr", "172.16.218.2", false);
54 g_ceph_context->_conf->set_val("ms_dpdk_netmask_ipv4_addr", "255.255.255.0", false);
55 addr = "172.16.218.3:15000";
56 port_addr = "172.16.218.3:15001";
58 stack = NetworkStack::create(g_ceph_context, GetParam());
61 void TearDown() override {
// Address accessors. The bodies of get_addr() and get_ip_different_port() are
// not visible here; presumably they return addr and port_addr -- confirm.
64 string get_addr() const {
67 string get_ip_different_port() const {
// Fixed address on a different IP; ConnectFailedTest connects to it.
70 string get_different_ip() const {
71 return "10.0.123.100:4323";
// Event center of worker i of the stack.
73 EventCenter *get_center(unsigned i) {
74 return &stack->get_worker(i)->center;
// Worker i of the stack.
76 Worker *get_worker(unsigned i) {
77 return stack->get_worker(i);
// Event callback that carries a callable plus the Worker it was created for.
// `done` starts false; where it is set and waited on is not visible here.
79 template<typename func>
80 class C_dispatch : public EventCallback {
83 std::atomic_bool done;
85 C_dispatch(Worker *w, func &&_f): worker(w), f(std::move(_f)), done(false) {}
86 void do_request(int id) override {
// Counter initialised to 10^9. NOTE(review): presumably a microsecond budget
// for waiting on `done` -- the code that consumes it is not visible here.
91 int us = 1000 * 1000 * 1000;
// Creates one C_dispatch per worker of the stack and hands it to that
// worker's event center through dispatch_event_external().
99 template<typename func>
100 void exec_events(func &&f) {
101 std::vector<C_dispatch<func>*> dis;
102 for (unsigned i = 0; i < stack->get_num_worker(); ++i) {
103 Worker *w = stack->get_worker(i);
// NOTE(review): std::move(f) is evaluated on every iteration, so from the
// second worker onwards the callback is built from a moved-from callable.
// That is only safe while moving `func` leaves the source intact (e.g. its
// captures are pointers or other types whose move is a plain copy) -- confirm.
104 C_dispatch<func> *e = new C_dispatch<func>(w, std::move(f));
105 stack->get_worker(i)->center.dispatch_event_external(e);
// Second pass over the collected callbacks; its body is not visible here.
109 for (auto &&e : dis) {
// Event callback used by the tests to drive an EventCenter by hand while
// waiting for an event, bounded by a wall-clock timeout.
116 class C_poll : public EventCallback {
// Starts false (see the constructor); where it is set/cleared is not visible
// in this view.
118 std::atomic<bool> woken;
// Argument passed to each process_events() call in poll().
119 static const int sleepus = 500;
122 C_poll(EventCenter *c): center(c), woken(false) {}
123 void do_request(int r) override {
// Runs center->process_events(sleepus) and compares the elapsed
// coarse_real_clock time against `milliseconds`. The loop condition and the
// returned values are not visible here; callers assert on the result, so it
// presumably reports whether the event arrived before the timeout -- confirm.
126 bool poll(int milliseconds) {
127 auto start = ceph::coarse_real_clock::now();
129 center->process_events(sleepus);
131 auto r = std::chrono::duration_cast<std::chrono::milliseconds>(
132 ceph::coarse_real_clock::now() - start);
// Timeout check: elapsed time has reached the caller's limit.
133 if (r >= std::chrono::milliseconds(milliseconds))
// Listen / connect / accept / send / read round trip, followed by a client
// shutdown. The lambda is handed to exec_events(), which dispatches it to
// every worker; worker 0 plays the client role.
143 TEST_P(NetworkWorkerTest, SimpleTest) {
144 entity_addr_t bind_addr;
145 ASSERT_TRUE(bind_addr.parse(get_addr().c_str()));
// Shared between the per-worker lambda copies through a pointer capture.
146 std::atomic_bool accepted(false);
147 std::atomic_bool *accepted_p = &accepted;
149 exec_events([this, accepted_p, bind_addr](Worker *worker) mutable {
150 entity_addr_t cli_addr;
151 SocketOptions options;
152 ServerSocket bind_socket;
153 EventCenter *center = &worker->center;
// Every worker listens when the stack supports a local listen table;
// otherwise only worker 0 does.
155 if (stack->support_local_listen_table() || worker->id == 0)
156 r = worker->listen(bind_addr, options, &bind_socket);
159 ConnectedSocket cli_socket, srv_socket;
// Worker 0 connects to the listening address.
160 if (worker->id == 0) {
161 r = worker->connect(bind_addr, options, &cli_socket);
165 bool is_my_accept = false;
// Watch the listening socket for readability, then accept into srv_socket.
// The statements between these lines (including where *accepted_p is set)
// are not visible in this view.
168 center->create_file_event(bind_socket.fd(), EVENT_READABLE, &cb);
173 ASSERT_TRUE(*accepted_p);
174 center->delete_file_event(bind_socket.fd(), EVENT_READABLE);
178 r = bind_socket.accept(&srv_socket, options, &cli_addr, worker);
180 ASSERT_TRUE(srv_socket.fd() > 0);
// Client side: poll (up to 500 ms) and re-query the connection state.
183 if (worker->id == 0) {
185 center->create_file_event(cli_socket.fd(), EVENT_READABLE, &cb);
186 r = cli_socket.is_connected();
188 ASSERT_EQ(true, cb.poll(500));
189 r = cli_socket.is_connected();
192 center->delete_file_event(cli_socket.fd(), EVENT_READABLE);
// Payload sent by the client and compared byte-for-byte on the server side.
195 const char *message = "this is a new message";
196 int len = strlen(message);
198 bl.append(message, len);
199 if (worker->id == 0) {
200 r = cli_socket.send(bl, false);
// Server side: retry the read while it reports -EAGAIN, polling up to 500 ms
// between attempts.
207 center->create_file_event(srv_socket.fd(), EVENT_READABLE, &cb);
209 r = srv_socket.read(buf, sizeof(buf));
210 while (r == -EAGAIN) {
211 ASSERT_TRUE(cb.poll(500));
212 r = srv_socket.read(buf, sizeof(buf));
216 ASSERT_EQ(0, memcmp(buf, message, len));
218 bind_socket.abort_accept();
220 if (worker->id == 0) {
221 cli_socket.shutdown();
222 // ack delay is 200 ms
226 bl.append(message, len);
// After shutdown() a send on the client socket is expected to fail with
// -EPIPE.
227 if (worker->id == 0) {
228 r = cli_socket.send(bl, false);
229 ASSERT_EQ(-EPIPE, r);
233 ASSERT_TRUE(cb.poll(500));
234 r = srv_socket.read(buf, sizeof(buf));
// NOTE(review): C_poll::poll() names its parameter `milliseconds`, so
// 1000*500 is a 500 000 ms limit here -- confirm this is intended and not a
// microsecond value.
237 ASSERT_TRUE(cb.poll(1000*500));
238 r = srv_socket.read(buf, sizeof(buf));
241 center->delete_file_event(srv_socket.fd(), EVENT_READABLE);
// Connection attempts that are not expected to succeed: worker 0 targets the
// address from get_ip_different_port(), worker 1 the address from
// get_different_ip().
247 TEST_P(NetworkWorkerTest, ConnectFailedTest) {
248 entity_addr_t bind_addr;
249 ASSERT_TRUE(bind_addr.parse(get_addr().c_str()));
251 exec_events([this, bind_addr](Worker *worker) mutable {
252 EventCenter *center = &worker->center;
253 entity_addr_t cli_addr;
254 SocketOptions options;
255 ServerSocket bind_socket;
// Listen on bind_addr (all workers with a local listen table, else worker 0).
257 if (stack->support_local_listen_table() || worker->id == 0)
258 r = worker->listen(bind_addr, options, &bind_socket);
261 ConnectedSocket cli_socket1, cli_socket2;
// Worker 0: after polling, is_connected() must report -ECONNREFUSED or
// -ECONNRESET.
262 if (worker->id == 0) {
263 ASSERT_TRUE(cli_addr.parse(get_ip_different_port().c_str()));
264 r = worker->connect(cli_addr, options, &cli_socket1);
267 center->create_file_event(cli_socket1.fd(), EVENT_READABLE, &cb);
268 r = cli_socket1.is_connected();
270 ASSERT_TRUE(cb.poll(500));
271 r = cli_socket1.is_connected();
273 ASSERT_TRUE(r == -ECONNREFUSED || r == -ECONNRESET);
// Worker 1: connect to a different IP. The assertions on the resulting state
// are not visible in this view.
276 if (worker->id == 1) {
277 ASSERT_TRUE(cli_addr.parse(get_different_ip().c_str()));
278 r = worker->connect(cli_addr, options, &cli_socket2);
281 center->create_file_event(cli_socket2.fd(), EVENT_READABLE, &cb);
282 r = cli_socket2.is_connected();
285 r = cli_socket2.is_connected();
288 center->delete_file_event(cli_socket2.fd(), EVENT_READABLE);
// Listening twice on the same address from worker 0: the second listen()
// must return -EADDRINUSE. The check on the first result is not visible here.
293 TEST_P(NetworkWorkerTest, ListenTest) {
294 Worker *worker = get_worker(0);
295 entity_addr_t bind_addr;
296 ASSERT_TRUE(bind_addr.parse(get_addr().c_str()));
297 SocketOptions options;
298 ServerSocket bind_socket1, bind_socket2;
299 int r = worker->listen(bind_addr, options, &bind_socket1);
302 r = worker->listen(bind_addr, options, &bind_socket2);
303 ASSERT_EQ(-EADDRINUSE, r);
// Accept/close scenarios executed on every worker: accept with nothing
// pending, accept into a short-lived socket, accept again, and finally a
// connect that must be refused or reset.
306 TEST_P(NetworkWorkerTest, AcceptAndCloseTest) {
307 entity_addr_t bind_addr;
308 ASSERT_TRUE(bind_addr.parse(get_addr().c_str()));
309 std::atomic_bool accepted(false);
310 std::atomic_bool *accepted_p = &accepted;
// Counter initialised to the number of workers; how it is decremented and
// waited on is not visible in this view.
311 std::atomic_int unbind_count(stack->get_num_worker());
312 std::atomic_int *count_p = &unbind_count;
313 exec_events([this, bind_addr, accepted_p, count_p](Worker *worker) mutable {
314 SocketOptions options;
315 EventCenter *center = &worker->center;
316 entity_addr_t cli_addr;
319 ServerSocket bind_socket;
320 if (stack->support_local_listen_table() || worker->id == 0)
321 r = worker->listen(bind_addr, options, &bind_socket);
324 ConnectedSocket srv_socket, cli_socket;
// No client has connected yet, so accept() is expected to return -EAGAIN.
326 r = bind_socket.accept(&srv_socket, options, &cli_addr, worker);
327 ASSERT_EQ(-EAGAIN, r);
// Worker 0 connects and polls up to 500 ms.
331 if (worker->id == 0) {
332 center->create_file_event(bind_socket.fd(), EVENT_READABLE, &cb);
333 r = worker->connect(bind_addr, options, &cli_socket);
335 ASSERT_TRUE(cb.poll(500));
// Accept into a socket that is local to this step; the accept is retried
// while it returns -EAGAIN and no worker has set *accepted_p.
341 ConnectedSocket srv_socket2;
343 r = bind_socket.accept(&srv_socket2, options, &cli_addr, worker);
345 } while (r == -EAGAIN && !*accepted_p);
348 ASSERT_TRUE(*accepted_p);
349 // srv_socket2 closed
350 center->delete_file_event(bind_socket.fd(), EVENT_READABLE);
// Client side: poll and read from cli_socket. The assertion on the read
// result is not visible in this view.
353 if (worker->id == 0) {
356 center->create_file_event(cli_socket.fd(), EVENT_READABLE, &cb);
359 ASSERT_TRUE(cb.poll(500));
360 r = cli_socket.read(buf, sizeof(buf));
365 center->delete_file_event(cli_socket.fd(), EVENT_READABLE);
// Second round: connect again and accept into srv_socket with the same
// retry condition as above.
369 center->create_file_event(bind_socket.fd(), EVENT_READABLE, &cb);
370 if (worker->id == 0) {
372 r = worker->connect(bind_addr, options, &cli_socket);
375 ASSERT_TRUE(cb.poll(500));
381 r = bind_socket.accept(&srv_socket, options, &cli_addr, worker);
383 } while (r == -EAGAIN && !*accepted_p);
386 ASSERT_TRUE(*accepted_p);
387 center->delete_file_event(bind_socket.fd(), EVENT_READABLE);
// Final step: a fresh connect to bind_addr must end in -ECONNREFUSED or
// -ECONNRESET. Presumably the listener has been torn down by this point; the
// lines doing so are not visible here.
396 ConnectedSocket cli_socket;
397 r = worker->connect(bind_addr, options, &cli_socket);
401 center->create_file_event(cli_socket.fd(), EVENT_READABLE, &cb);
402 r = cli_socket.is_connected();
404 ASSERT_TRUE(cb.poll(500));
405 r = cli_socket.is_connected();
407 ASSERT_TRUE(r == -ECONNREFUSED || r == -ECONNRESET);
// Bulk transfer test: worker 1 is the client and sends `count` copies of a
// message_size-byte message; the accepting side reads everything back and
// compares it chunk by chunk.
412 TEST_P(NetworkWorkerTest, ComplexTest) {
413 entity_addr_t bind_addr;
// Flags shared by all per-worker lambda copies through pointer captures.
414 std::atomic_bool listen_done(false);
415 std::atomic_bool *listen_p = &listen_done;
416 std::atomic_bool accepted(false);
417 std::atomic_bool *accepted_p = &accepted;
418 std::atomic_bool done(false);
419 std::atomic_bool *done_p = &done;
420 ASSERT_TRUE(bind_addr.parse(get_addr().c_str()));
421 exec_events([this, bind_addr, listen_p, accepted_p, done_p](Worker *worker) mutable {
422 entity_addr_t cli_addr;
423 EventCenter *center = &worker->center;
424 SocketOptions options;
425 ServerSocket bind_socket;
427 if (stack->support_local_listen_table() || worker->id == 0) {
428 r = worker->listen(bind_addr, options, &bind_socket);
432 ConnectedSocket cli_socket, srv_socket;
// Unlike SimpleTest, the connecting worker here is worker 1.
433 if (worker->id == 1) {
436 r = worker->connect(bind_addr, options, &cli_socket);
443 center->create_file_event(bind_socket.fd(), EVENT_READABLE, &cb);
447 r = bind_socket.accept(&srv_socket, options, &cli_addr, worker);
453 ASSERT_TRUE(*accepted_p);
454 center->delete_file_event(bind_socket.fd(), EVENT_READABLE);
// Client waits for writability (up to 500 ms) and re-checks the connection.
457 if (worker->id == 1) {
459 center->create_file_event(cli_socket.fd(), EVENT_WRITABLE, &cb);
460 r = cli_socket.is_connected();
462 ASSERT_TRUE(cb.poll(500));
463 r = cli_socket.is_connected();
466 center->delete_file_event(cli_socket.fd(), EVENT_WRITABLE);
// Message starts as message_size '!' characters; the loop below visits every
// 100th index (its body is not visible in this view).
469 const size_t message_size = 10240;
471 string message(message_size, '!');
472 for (size_t i = 0; i < message_size; i += 100)
// Total number of bytes to transfer (`count` is declared on a line that is
// not visible here).
474 size_t len = message_size * count;
477 center->create_file_event(cli_socket.fd(), EVENT_WRITABLE, &cb);
479 center->create_file_event(srv_socket.fd(), EVENT_READABLE, &cb);
// Queue `count` buffer pointers built from the message bytes.
486 for (size_t i = 0; i < count; ++i)
487 bl.push_back(bufferptr((char*)message.data(), message_size));
// A send may make progress (r >= 0) or report -EAGAIN; anything else fails.
490 if (worker->id == 1) {
495 r = cli_socket.send(bl, false);
496 ASSERT_TRUE(r >= 0 || r == -EAGAIN);
// The buffer list must be empty at this point; it is then refilled with the
// same payload for another round.
504 left = message_size * count;
505 ASSERT_EQ(0U, bl.length());
506 for (size_t i = 0; i < count; ++i)
507 bl.push_back(bufferptr((char*)message.data(), message_size));
// Reader: accumulate received bytes into read_string; -EAGAIN is tolerated.
515 r = srv_socket.read(buf, sizeof(buf));
516 ASSERT_TRUE(r > 0 || r == -EAGAIN);
518 read_string.append(buf, r);
520 } else if (r == -EAGAIN) {
// Compare each message_size-byte chunk with the original message.
// NOTE(review): assumes read_string.size() is a multiple of message_size,
// otherwise the last memcmp would run past the end -- confirm.
525 for (size_t i = 0; i < read_string.size(); i += message_size)
526 ASSERT_EQ(0, memcmp(read_string.c_str()+i, message.c_str(), message_size));
536 center->delete_file_event(cli_socket.fd(), EVENT_WRITABLE);
538 center->delete_file_event(srv_socket.fd(), EVENT_READABLE);
541 bind_socket.abort_accept();
// Harness for StressTest: owns the Client/Server objects created per worker
// thread and the shared message counters declared further down the class.
549 class StressFactory {
// Sets of live clients and servers. They are accessed through ThreadData
// (t_data->clients / t_data->servers) elsewhere in this class; the enclosing
// struct header is not visible in this view.
554 std::set<Client*> clients;
555 std::set<Server*> servers;
// Loops over every remaining client and server; the loop bodies are not
// visible here (presumably cleanup in a destructor -- confirm).
557 for (auto && i : clients)
559 for (auto && i : servers)
// Pool of random strings, each slen characters long, from which message
// content is assembled.
564 struct RandomString {
566 vector<std::string> strs;
// rd is declared before rng, so it is already constructed when the
// constructor seeds rng with rd().
567 std::random_device rd;
568 std::default_random_engine rng;
570 RandomString(size_t s): slen(s), rng(rd()) {}
// Fills strs with random strings. How `n` is used is not visible in this
// view; the number generated is strs.capacity(), so presumably the missing
// lines reserve n entries first -- confirm.
571 void prepare(size_t n) {
572 static const char alphabet[] =
573 "abcdefghijklmnopqrstuvwxyz"
574 "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
// sizeof(alphabet) counts the terminating NUL, hence "- 2" for the index of
// the last real character (the distribution's upper bound is inclusive).
577 std::uniform_int_distribution<> dist(
578 0, sizeof(alphabet) / sizeof(*alphabet) - 2);
582 std::back_inserter(strs), strs.capacity(), [&] {
// Each generated string is slen characters drawn from the alphabet.
585 std::generate_n(std::back_inserter(str), slen, [&]() {
586 return alphabet[dist(rng)];
// Returns a reference to a randomly chosen pooled string (the distribution
// bounds are on lines not visible here).
592 std::string &get_random_string() {
593 std::uniform_int_distribution<> dist(
595 return strs[dist(rng)];
// Builds a message with index i whose content is made of random strings from
// rs. The requested length l is raised to at least one string (slen).
603 explicit Message(RandomString &rs, size_t i, size_t l): idx(i) {
604 size_t slen = rs.slen;
605 len = std::max(slen, l);
607 std::vector<std::string> strs;
// The number of strings drawn below is strs.capacity(), which reserve() only
// guarantees to be at least len / slen, so len is recomputed afterwards from
// the actual strs.size().
608 strs.reserve(len / slen);
610 std::back_inserter(strs), strs.capacity(), [&] {
611 return rs.get_random_string();
614 len = slen * strs.size();
615 content.reserve(len);
616 for (auto &&s : strs)
// Compares the first `len` characters of content with b.
// NOTE(review): with len == 0 (the default, and the value passed by
// Client::do_read_request) zero characters are compared, so this always
// returns true and no payload is actually checked -- confirm intent.
// NOTE(review): the five-argument compare() overload takes a std::string,
// so b appears to be converted through a NUL-terminated temporary.
619 bool verify(const char *b, size_t len = 0) const {
620 return content.compare(0, len, b, 0, len) == 0;
// Event callback holding a T*; Client and Server dispatch it from their
// teardown path with `this`. The do_request() body is not visible here
// (presumably it deletes ctxt and itself -- confirm).
624 template <typename T>
625 class C_delete : public EventCallback {
628 C_delete(T *c): ctxt(c) {}
629 void do_request(int id) override {
// Client state: back-pointer to the factory, the connected socket, and two
// queues -- `writings` for messages still being sent and `acking` for
// messages the client reads back in do_read_request().
636 StressFactory *factory;
638 ConnectedSocket socket;
639 std::deque<StressFactory::Message*> acking;
640 std::deque<StressFactory::Message*> writings;
// True while an EVENT_WRITABLE file event is registered for the socket.
644 bool write_enabled = false;
// Progress within the message currently being read / written.
645 size_t read_offset = 0, write_offset = 0;
// Stand-in message used by do_read_request() when `acking` is empty.
648 StressFactory::Message homeless_message;
// Adapter: forwards the event to Client::do_read_request().
650 class Client_read_handle : public EventCallback {
653 Client_read_handle(Client *_c): c(_c) {}
654 void do_request(int id) override {
655 c->do_read_request();
// Adapter: forwards the event to Client::do_write_request().
659 class Client_write_handle : public EventCallback {
662 Client_write_handle(Client *_c): c(_c) {}
663 void do_request(int id) override {
664 c->do_write_request();
// Takes ownership of the socket, registers a readable event and also queues
// the read handler once immediately. `c` is the number of messages this
// client still has to generate (stored in `left`).
// Note: homeless_message is built with index -1, which becomes the largest
// size_t value because Message takes its index as size_t.
669 Client(StressFactory *f, EventCenter *cen, ConnectedSocket s, size_t c)
670 : factory(f), center(cen), socket(std::move(s)), left(c), homeless_message(factory->rs, -1, 1024),
671 read_ctxt(this), write_ctxt(this) {
672 center->create_file_event(
673 socket.fd(), EVENT_READABLE, &read_ctxt);
674 center->dispatch_event_external(&read_ctxt);
// Teardown fragment (the enclosing method header is not visible here): the
// writable event must already be off; the readable event is removed and
// destruction is deferred to the event center through C_delete.
677 ASSERT_FALSE(write_enabled);
680 center->delete_file_event(socket.fd(), EVENT_READABLE);
681 center->dispatch_event_external(new C_delete<Client>(this));
// Read handler: reads the reply for the message at the front of `acking`
// into `buffer`, and triggers the write handler at the points shown below.
684 void do_read_request() {
// A negative is_connected() is a failure; zero means not connected yet (the
// action taken in that case is not visible in this view).
687 ASSERT_TRUE(socket.is_connected() >= 0);
688 if (!socket.is_connected())
690 ASSERT_TRUE(!acking.empty() || first);
693 center->dispatch_event_external(&write_ctxt);
697 StressFactory::Message *m = acking.front();
// The receive buffer is sized to the expected message.
700 buffer.resize(m->len);
701 bool must_no = false;
// Read the remainder of the current message; only -EAGAIN or a positive
// byte count is acceptable.
703 r = socket.read((char*)buffer.data() + read_offset,
704 m->len - read_offset);
705 ASSERT_TRUE(r == -EAGAIN || r > 0);
710 std::cerr << " client " << this << " receive " << m->idx << " len " << r << " content: " << std::endl;
711 ASSERT_FALSE(must_no);
// Whole message received.
// NOTE(review): verify() is called with length 0, which compares no
// characters, so this assertion cannot fail on content -- confirm intent.
712 if ((m->len - read_offset) == 0) {
713 ASSERT_TRUE(m->verify(buffer.data(), 0));
// With nothing left to acknowledge, fall back to homeless_message as the
// expected message.
718 if (acking.empty()) {
719 m = &homeless_message;
723 buffer.resize(m->len);
// Nothing outstanding: schedule the write handler.
727 if (acking.empty()) {
728 center->dispatch_event_external(&write_ctxt);
// Write handler: generates new messages while the queue-depth limit allows,
// sends queued messages, and toggles the EVENT_WRITABLE registration
// depending on whether unsent data remains.
733 void do_write_request() {
736 ASSERT_TRUE(socket.is_connected() > 0);
// Generate while this client still has messages to produce and the number of
// in-flight messages (writings + acking) is below factory->queue_depth.
738 while (left > 0 && factory->queue_depth > writings.size() + acking.size()) {
// Requested length is random, below factory->max_message_length; Message
// itself raises it to at least one pooled string.
739 StressFactory::Message *m = new StressFactory::Message(
740 factory->rs, ++index,
741 factory->rd() % factory->max_message_length);
742 std::cerr << " client " << this << " generate message " << m->idx << " length " << m->len << std::endl;
743 ASSERT_EQ(m->len, m->content.size());
744 writings.push_back(m);
// Global count of messages still to be generated.
746 --factory->message_left;
// Send the unsent tail of the front message; a message leaves `writings`
// once write_offset reaches its full size.
749 while (!writings.empty()) {
750 StressFactory::Message *m = writings.front();
752 bl.append(m->content.data() + write_offset, m->content.size() - write_offset);
753 ssize_t r = socket.send(bl, false);
756 std::cerr << " client " << this << " send " << m->idx << " len " << r << " content: " << std::endl;
759 if (write_offset == m->content.size()) {
761 writings.pop_front();
// Keep the writable event registered only while there is data left to send.
765 if (writings.empty() && write_enabled) {
766 center->delete_file_event(socket.fd(), EVENT_WRITABLE);
767 write_enabled = false;
768 } else if (!writings.empty() && !write_enabled) {
769 ASSERT_EQ(0, center->create_file_event(
770 socket.fd(), EVENT_WRITABLE, &write_ctxt));
771 write_enabled = true;
// True once this client has no messages left to generate and both queues
// (acking and writings) are empty.
775 bool finish() const {
776 return left == 0 && acking.empty() && writings.empty();
// Server state: back-pointer to the factory, the accepted socket, and the
// received data (`buffers`) that do_write_request() sends back.
782 StressFactory *factory;
784 ConnectedSocket socket;
785 std::deque<std::string> buffers;
// True while an EVENT_WRITABLE file event is registered for the socket.
786 bool write_enabled = false;
// Adapter: forwards the event to Server::do_read_request().
789 class Server_read_handle : public EventCallback {
792 Server_read_handle(Server *_s): s(_s) {}
793 void do_request(int id) override {
794 s->do_read_request();
// Adapter: forwards the event to Server::do_write_request().
798 class Server_write_handle : public EventCallback {
801 Server_write_handle(Server *_s): s(_s) {}
802 void do_request(int id) override {
803 s->do_write_request();
// Takes ownership of the accepted socket, registers a readable event and
// queues the read handler once immediately.
808 Server(StressFactory *f, EventCenter *c, ConnectedSocket s):
809 factory(f), center(c), socket(std::move(s)), read_ctxt(this), write_ctxt(this) {
810 center->create_file_event(socket.fd(), EVENT_READABLE, &read_ctxt);
811 center->dispatch_event_external(&read_ctxt);
// Teardown fragment (the enclosing method header is not visible here): the
// writable event must already be off; the readable event is removed and
// destruction is deferred to the event center through C_delete.
814 ASSERT_FALSE(write_enabled);
816 center->delete_file_event(socket.fd(), EVENT_READABLE);
817 center->dispatch_event_external(new C_delete<Server>(this));
// Read handler: pulls data from the socket, stores it in `buffers`, and
// schedules the write handler when there is something to send back.
819 void do_read_request() {
// Two read paths selected by the factory: zero-copy into `data`, or a plain
// read into the local buffer.
826 if (factory->zero_copy_read) {
827 r = socket.zero_copy_read(data);
829 r = socket.read(buf, sizeof(buf));
// Accept -EAGAIN or a byte count that fits in buf.
831 ASSERT_TRUE(r == -EAGAIN || (r >= 0 && (size_t)r <= sizeof(buf)));
833 ASSERT_TRUE(buffers.empty());
836 } else if (r == -EAGAIN)
// Store what was received.
// NOTE(review): the (pointer, 0, count) form appears to go through a
// NUL-terminated-string interpretation of the pointer before taking the
// [0, count) range, so it relies on the data being terminated and free of
// embedded NUL bytes -- confirm, or construct from (pointer, count).
838 if (factory->zero_copy_read) {
839 buffers.emplace_back(data.c_str(), 0, data.length());
841 buffers.emplace_back(buf, 0, r);
843 std::cerr << " server " << this << " receive " << r << " content: " << std::endl;
// Only schedule the write handler directly when no writable event is
// registered for the socket.
845 if (!buffers.empty() && !write_enabled)
846 center->dispatch_event_external(&write_ctxt);
// Write handler: sends the buffered data back, drops what was fully sent,
// and toggles the EVENT_WRITABLE registration.
849 void do_write_request() {
853 while (!buffers.empty()) {
// Build one buffer list that references every queued string.
855 auto it = buffers.begin();
856 for (size_t i = 0; i < buffers.size(); ++i) {
857 bl.push_back(bufferptr((char*)it->data(), it->size()));
861 ssize_t r = socket.send(bl, false);
862 std::cerr << " server " << this << " send " << r << std::endl;
// Account for the r bytes sent: a front buffer that was sent completely is
// subtracted from r (its removal is on a line not visible here); a partially
// sent one keeps only its unsent tail.
867 ASSERT_TRUE(!buffers.empty());
868 string &buffer = buffers.front();
869 if (r >= (int)buffer.size()) {
870 r -= (int)buffer.size();
873 std::cerr << " server " << this << " sent " << r << std::endl;
874 buffer = buffer.substr(r, buffer.size());
// Everything sent: drop the writable event. Otherwise make sure one is
// registered so the rest is sent when the socket becomes writable.
879 if (buffers.empty()) {
881 center->delete_file_event(socket.fd(), EVENT_WRITABLE);
882 write_enabled = false;
884 } else if (!write_enabled) {
885 ASSERT_EQ(0, center->create_file_event(
886 socket.fd(), EVENT_WRITABLE, &write_ctxt));
887 write_enabled = true;
// Accept handler for the listening socket: accepts a connection and wraps it
// in a new Server that is recorded in the thread's `servers` set.
897 class C_accept : public EventCallback {
898 StressFactory *factory;
// Owned listening socket (moved in by the constructor).
899 ServerSocket bind_socket;
904 C_accept(StressFactory *f, ServerSocket s, ThreadData *data, Worker *w)
905 : factory(f), bind_socket(std::move(s)), t_data(data), worker(w) {}
906 void do_request(int id) override {
908 entity_addr_t cli_addr;
909 ConnectedSocket srv_socket;
910 SocketOptions options;
911 int r = bind_socket.accept(&srv_socket, options, &cli_addr, worker);
// Handling of a failed or -EAGAIN accept is on lines not visible here.
916 ASSERT_TRUE(srv_socket.fd() > 0);
// The Server is heap-allocated and tracked through t_data->servers.
917 Server *cb = new Server(factory, &t_data->worker->center, std::move(srv_socket));
918 t_data->servers.insert(cb);
922 friend class C_accept;
// Bounds for the random number of messages assigned to a client in
// add_client().
925 static const size_t min_client_send_messages = 100;
926 static const size_t max_client_send_messages = 1000;
927 std::shared_ptr<NetworkStack> stack;
929 std::random_device rd;
// client_num: per-thread cap on clients/servers (see start());
// queue_depth: in-flight message limit per client;
// max_message_length: bound for the random requested message length.
930 const size_t client_num, queue_depth, max_message_length;
// Both counters are initialised to the total message count `mc`.
931 atomic_int message_count, message_left;
932 entity_addr_t bind_addr;
// start() waits on this flag before proceeding; where it is set is not
// visible in this view.
933 std::atomic_bool already_bind = {false};
935 SocketOptions options;
// s: stack to exercise; addr: address to bind and connect to; cli: client
// cap; qd: queue depth; mc: total messages; l: max message length;
// zero_copy: whether servers use zero_copy_read().
// The random-string pool uses strings of 128 characters (rs(128)).
937 explicit StressFactory(std::shared_ptr<NetworkStack> s, const string &addr,
938 size_t cli, size_t qd, size_t mc, size_t l, bool zero_copy)
939 : stack(s), rs(128), client_num(cli), queue_depth(qd),
940 max_message_length(l), message_count(mc), message_left(mc),
941 zero_copy_read(zero_copy) {
// NOTE(review): the parse() result is ignored here, whereas the tests wrap
// the same call in ASSERT_TRUE -- a bad address would go unnoticed.
942 bind_addr.parse(addr.c_str());
// Connects a new client from this thread's worker to bind_addr and records
// it in t_data->clients. Calls are serialized by a function-local static
// mutex.
948 void add_client(ThreadData *t_data) {
949 static Mutex lock("add_client_lock");
950 Mutex::Locker l(lock);
951 ConnectedSocket sock;
952 int r = t_data->worker->connect(bind_addr, options, &sock);
// Random message budget for this client, drawn from
// [min_client_send_messages, max_client_send_messages] ...
953 std::default_random_engine rng(rd());
954 std::uniform_int_distribution<> dist(
955 min_client_send_messages, max_client_send_messages);
// ... and capped at the current value of message_count.
958 if (c > message_count.load())
959 c = message_count.load();
960 Client *cb = new Client(this, &t_data->worker->center, std::move(sock), c);
961 t_data->clients.insert(cb);
// Removes c from the thread's client set; it must be present exactly once.
// Any further teardown of c is on a line not visible in this view.
965 void drop_client(ThreadData *t_data, Client *c) {
967 ASSERT_EQ(1U, t_data->clients.erase(c));
// Removes s from the thread's server set; it must be present exactly once.
// Any further teardown of s is on a line not visible in this view.
970 void drop_server(ThreadData *t_data, Server *s) {
972 ASSERT_EQ(1U, t_data->servers.erase(s));
// Per-worker driver: sets up the listener and accept handler, then loops --
// adding clients, dropping clients/servers and processing events -- until no
// messages remain and all clients and servers are gone.
975 void start(Worker *worker) {
978 t_data.worker = worker;
979 ServerSocket bind_socket;
// Every worker listens when the stack has a local listen table, otherwise
// only worker 0.
980 if (stack->support_local_listen_table() || worker->id == 0) {
981 r = worker->listen(bind_addr, options, &bind_socket);
// Wait until already_bind is set (the loop body is not visible here).
985 while (!already_bind)
986 C_accept *accept_handler = nullptr;
// The fd is saved before bind_socket is moved into the handler, because it
// is needed afterwards to register and later remove the file event.
990 bind_fd = bind_socket.fd();
991 accept_handler = new C_accept(this, std::move(bind_socket), &t_data, worker);
992 ASSERT_EQ(0, worker->center.create_file_event(
993 bind_fd, EVENT_READABLE, accept_handler));
// Threshold for the progress log below; starts at the total message count.
996 int echo_throttle = message_count;
997 while (message_count > 0 || !t_data.clients.empty() || !t_data.servers.empty()) {
// Add clients only while messages remain and both sets are below client_num.
998 if (message_count > 0 && t_data.clients.size() < client_num && t_data.servers.size() < client_num)
// NOTE(review): drop_client()/drop_server() erase from the set being
// iterated, which invalidates the range-for iterator unless the loop is left
// right after the drop. The condition and any `break` are on lines not
// visible here -- confirm.
1000 for (auto &&c : t_data.clients) {
1002 drop_client(&t_data, c);
1006 for (auto &&s : t_data.servers) {
1008 drop_server(&t_data, s);
1013 worker->center.process_events(1);
// Progress log: printed when message_left falls below the threshold, which
// is then lowered by 100.
1014 if (echo_throttle > message_left) {
1015 std::cerr << " clients " << t_data.clients.size() << " servers " << t_data.servers.size()
1016 << " message count " << message_left << std::endl;
1017 echo_throttle -= 100;
// Unregister the listener event before freeing its handler.
1021 worker->center.delete_file_event(bind_fd, EVENT_READABLE);
1022 delete accept_handler;
// Runs the StressFactory on every worker: client cap 16, queue depth 16,
// 10000 messages in total, max message length 1024. Zero-copy reads are
// enabled only when the parameter begins with "dpdk".
1026 TEST_P(NetworkWorkerTest, StressTest) {
1027 StressFactory factory(stack, get_addr(), 16, 16, 10000, 1024,
1028 strncmp(GetParam(), "dpdk", 4) == 0);
1029 StressFactory *f = &factory;
// The lambda body is not visible here (presumably it calls f->start(worker)
// -- confirm).
1030 exec_events([f](Worker *worker) mutable {
// Every message must have been generated by the time the workers finish.
1033 ASSERT_EQ(0, factory.message_left);
// Instantiates the NetworkWorkerTest suite. The instantiation name and the
// parameter list (the stack names seen by GetParam()) are on lines that are
// not visible in this view.
1037 INSTANTIATE_TEST_CASE_P(
1050 // Google Test may not support value-parameterized tests with some
1051 // compilers. If we use conditional compilation to compile out all
1052 // code referring to the gtest_main library, MSVC linker will not link
1053 // that library at all and consequently complain about missing entry
1054 // point defined in that library (fatal error LNK1561: entry point
1055 // must be defined). This dummy test keeps gtest_main linked in.
// Empty on purpose: only the presence of this test matters, it keeps
// gtest_main linked in when value-parameterized tests are unavailable.
1056 TEST(DummyTest, ValueParameterizedTestsAreNotSupportedOnThisPlatform) {}
1063 * compile-command: "cd ../.. ; make ceph_test_async_networkstack &&
1064 * ./ceph_test_async_networkstack