Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / test / msgr / test_async_networkstack.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2016 XSky <haomai@xsky.com>
7  *
8  * Author: Haomai Wang <haomaiwang@gmail.com>
9  *
10  * This is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU Lesser General Public
12  * License version 2.1, as published by the Free Software
13  * Foundation.  See file COPYING.
14  *
15  */
16
17 #include <algorithm>
18 #include <atomic>
19 #include <iostream>
20 #include <random>
21 #include <string>
22 #include <set>
23 #include <vector>
24 #include <gtest/gtest.h>
25
26 #include "acconfig.h"
27 #include "include/Context.h"
28
29 #include "msg/async/Event.h"
30 #include "msg/async/Stack.h"
31
32
33 #if GTEST_HAS_PARAM_TEST
34
35 class NetworkWorkerTest : public ::testing::TestWithParam<const char*> {
36  public:
37   std::shared_ptr<NetworkStack> stack;
38   string addr, port_addr;
39
40   NetworkWorkerTest() {}
41   void SetUp() override {
42     cerr << __func__ << " start set up " << GetParam() << std::endl;
43     if (strncmp(GetParam(), "dpdk", 4)) {
44       g_ceph_context->_conf->set_val("ms_type", "async+posix", false);
45       addr = "127.0.0.1:15000";
46       port_addr = "127.0.0.1:15001";
47     } else {
48       g_ceph_context->_conf->set_val("ms_type", "async+dpdk", false);
49       g_ceph_context->_conf->set_val("ms_dpdk_debug_allow_loopback", "true", false);
50       g_ceph_context->_conf->set_val("ms_async_op_threads", "2", false);
51       g_ceph_context->_conf->set_val("ms_dpdk_coremask", "0x7", false);
52       g_ceph_context->_conf->set_val("ms_dpdk_host_ipv4_addr", "172.16.218.3", false);
53       g_ceph_context->_conf->set_val("ms_dpdk_gateway_ipv4_addr", "172.16.218.2", false);
54       g_ceph_context->_conf->set_val("ms_dpdk_netmask_ipv4_addr", "255.255.255.0", false);
55       addr = "172.16.218.3:15000";
56       port_addr = "172.16.218.3:15001";
57     }
58     stack = NetworkStack::create(g_ceph_context, GetParam());
59     stack->start();
60   }
61   void TearDown() override {
62     stack->stop();
63   }
64   string get_addr() const {
65     return addr;
66   }
67   string get_ip_different_port() const {
68     return port_addr;
69   }
70   string get_different_ip() const {
71     return "10.0.123.100:4323";
72   }
73   EventCenter *get_center(unsigned i) {
74     return &stack->get_worker(i)->center;
75   }
76   Worker *get_worker(unsigned i) {
77     return stack->get_worker(i);
78   }
79   template<typename func>
80   class C_dispatch : public EventCallback {
81     Worker *worker;
82     func f;
83     std::atomic_bool done;
84    public:
85     C_dispatch(Worker *w, func &&_f): worker(w), f(std::move(_f)), done(false) {}
86     void do_request(int id) override {
87       f(worker);
88       done = true;
89     }
90     void wait() {
91       int us = 1000 * 1000 * 1000;
92       while (!done) {
93         ASSERT_TRUE(us > 0);
94         usleep(100);
95         us -= 100;
96       }
97     }
98   };
99   template<typename func>
100   void exec_events(func &&f) {
101     std::vector<C_dispatch<func>*> dis;
102     for (unsigned i = 0; i < stack->get_num_worker(); ++i) {
103       Worker *w = stack->get_worker(i);
104       C_dispatch<func> *e = new C_dispatch<func>(w, std::move(f));
105       stack->get_worker(i)->center.dispatch_event_external(e);
106       dis.push_back(e);
107     }
108
109     for (auto &&e : dis) {
110       e->wait();
111       delete e;
112     }
113   }
114 };
115
116 class C_poll : public EventCallback {
117   EventCenter *center;
118   std::atomic<bool> woken;
119   static const int sleepus = 500;
120
121  public:
122   C_poll(EventCenter *c): center(c), woken(false) {}
123   void do_request(int r) override {
124     woken = true;
125   }
126   bool poll(int milliseconds) {
127     auto start = ceph::coarse_real_clock::now();
128     while (!woken) {
129       center->process_events(sleepus);
130       usleep(sleepus);
131       auto r = std::chrono::duration_cast<std::chrono::milliseconds>(
132               ceph::coarse_real_clock::now() - start);
133       if (r >= std::chrono::milliseconds(milliseconds))
134         break;
135     }
136     return woken;
137   }
138   void reset() {
139     woken = false;
140   }
141 };
142
143 TEST_P(NetworkWorkerTest, SimpleTest) {
144   entity_addr_t bind_addr;
145   ASSERT_TRUE(bind_addr.parse(get_addr().c_str()));
146   std::atomic_bool accepted(false);
147   std::atomic_bool *accepted_p = &accepted;
148
149   exec_events([this, accepted_p, bind_addr](Worker *worker) mutable {
150     entity_addr_t cli_addr;
151     SocketOptions options;
152     ServerSocket bind_socket;
153     EventCenter *center = &worker->center;
154     ssize_t r = 0;
155     if (stack->support_local_listen_table() || worker->id == 0)
156       r = worker->listen(bind_addr, options, &bind_socket);
157     ASSERT_EQ(0, r);
158
159     ConnectedSocket cli_socket, srv_socket;
160     if (worker->id == 0) {
161       r = worker->connect(bind_addr, options, &cli_socket);
162       ASSERT_EQ(0, r);
163     }
164
165     bool is_my_accept = false;
166     if (bind_socket) {
167       C_poll cb(center);
168       center->create_file_event(bind_socket.fd(), EVENT_READABLE, &cb);
169       if (cb.poll(500)) {
170         *accepted_p = true;
171         is_my_accept = true;
172       }
173       ASSERT_TRUE(*accepted_p);
174       center->delete_file_event(bind_socket.fd(), EVENT_READABLE);
175     }
176
177     if (is_my_accept) {
178       r = bind_socket.accept(&srv_socket, options, &cli_addr, worker);
179       ASSERT_EQ(0, r);
180       ASSERT_TRUE(srv_socket.fd() > 0);
181     }
182
183     if (worker->id == 0) {
184       C_poll cb(center);
185       center->create_file_event(cli_socket.fd(), EVENT_READABLE, &cb);
186       r = cli_socket.is_connected();
187       if (r == 0) {
188         ASSERT_EQ(true, cb.poll(500));
189         r = cli_socket.is_connected();
190       }
191       ASSERT_EQ(1, r);
192       center->delete_file_event(cli_socket.fd(), EVENT_READABLE);
193     }
194
195     const char *message = "this is a new message";
196     int len = strlen(message);
197     bufferlist bl;
198     bl.append(message, len);
199     if (worker->id == 0) {
200       r = cli_socket.send(bl, false);
201       ASSERT_EQ(len, r);
202     }
203
204     char buf[1024];
205     C_poll cb(center);
206     if (is_my_accept) {
207       center->create_file_event(srv_socket.fd(), EVENT_READABLE, &cb);
208       {
209         r = srv_socket.read(buf, sizeof(buf));
210         while (r == -EAGAIN) {
211           ASSERT_TRUE(cb.poll(500));
212           r = srv_socket.read(buf, sizeof(buf));
213           cb.reset();
214         }
215         ASSERT_EQ(len, r);
216         ASSERT_EQ(0, memcmp(buf, message, len));
217       }
218       bind_socket.abort_accept();
219     }
220     if (worker->id == 0) {
221       cli_socket.shutdown();
222       // ack delay is 200 ms
223     }
224
225     bl.clear();
226     bl.append(message, len);
227     if (worker->id == 0) {
228       r = cli_socket.send(bl, false);
229       ASSERT_EQ(-EPIPE, r);
230     }
231     if (is_my_accept) {
232       cb.reset();
233       ASSERT_TRUE(cb.poll(500));
234       r = srv_socket.read(buf, sizeof(buf));
235       if (r == -EAGAIN) {
236         cb.reset();
237         ASSERT_TRUE(cb.poll(1000*500));
238         r = srv_socket.read(buf, sizeof(buf));
239       }
240       ASSERT_EQ(0, r);
241       center->delete_file_event(srv_socket.fd(), EVENT_READABLE);
242       srv_socket.close();
243     }
244   });
245 }
246
247 TEST_P(NetworkWorkerTest, ConnectFailedTest) {
248   entity_addr_t bind_addr;
249   ASSERT_TRUE(bind_addr.parse(get_addr().c_str()));
250
251   exec_events([this, bind_addr](Worker *worker) mutable {
252     EventCenter *center = &worker->center;
253     entity_addr_t cli_addr;
254     SocketOptions options;
255     ServerSocket bind_socket;
256     int r = 0;
257     if (stack->support_local_listen_table() || worker->id == 0)
258       r = worker->listen(bind_addr, options, &bind_socket);
259     ASSERT_EQ(0, r);
260
261     ConnectedSocket cli_socket1, cli_socket2;
262     if (worker->id == 0) {
263       ASSERT_TRUE(cli_addr.parse(get_ip_different_port().c_str()));
264       r = worker->connect(cli_addr, options, &cli_socket1);
265       ASSERT_EQ(0, r);
266       C_poll cb(center);
267       center->create_file_event(cli_socket1.fd(), EVENT_READABLE, &cb);
268       r = cli_socket1.is_connected();
269       if (r == 0) {
270         ASSERT_TRUE(cb.poll(500));
271         r = cli_socket1.is_connected();
272       }
273       ASSERT_TRUE(r == -ECONNREFUSED || r == -ECONNRESET);
274     }
275
276     if (worker->id == 1) {
277       ASSERT_TRUE(cli_addr.parse(get_different_ip().c_str()));
278       r = worker->connect(cli_addr, options, &cli_socket2);
279       ASSERT_EQ(0, r);
280       C_poll cb(center);
281       center->create_file_event(cli_socket2.fd(), EVENT_READABLE, &cb);
282       r = cli_socket2.is_connected();
283       if (r == 0) {
284         cb.poll(500);
285         r = cli_socket2.is_connected();
286       }
287       ASSERT_TRUE(r != 1);
288       center->delete_file_event(cli_socket2.fd(), EVENT_READABLE);
289     }
290   });
291 }
292
293 TEST_P(NetworkWorkerTest, ListenTest) {
294   Worker *worker = get_worker(0);
295   entity_addr_t bind_addr;
296   ASSERT_TRUE(bind_addr.parse(get_addr().c_str()));
297   SocketOptions options;
298   ServerSocket bind_socket1, bind_socket2;
299   int r = worker->listen(bind_addr, options, &bind_socket1);
300   ASSERT_EQ(0, r);
301
302   r = worker->listen(bind_addr, options, &bind_socket2);
303   ASSERT_EQ(-EADDRINUSE, r);
304 }
305
306 TEST_P(NetworkWorkerTest, AcceptAndCloseTest) {
307   entity_addr_t bind_addr;
308   ASSERT_TRUE(bind_addr.parse(get_addr().c_str()));
309   std::atomic_bool accepted(false);
310   std::atomic_bool *accepted_p = &accepted;
311   std::atomic_int unbind_count(stack->get_num_worker());
312   std::atomic_int *count_p = &unbind_count;
313   exec_events([this, bind_addr, accepted_p, count_p](Worker *worker) mutable {
314     SocketOptions options;
315     EventCenter *center = &worker->center;
316     entity_addr_t cli_addr;
317     int r = 0;
318     {
319       ServerSocket bind_socket;
320       if (stack->support_local_listen_table() || worker->id == 0)
321         r = worker->listen(bind_addr, options, &bind_socket);
322       ASSERT_EQ(0, r);
323
324       ConnectedSocket srv_socket, cli_socket;
325       if (bind_socket) {
326         r = bind_socket.accept(&srv_socket, options, &cli_addr, worker);
327         ASSERT_EQ(-EAGAIN, r);
328       }
329
330       C_poll cb(center);
331       if (worker->id == 0) {
332         center->create_file_event(bind_socket.fd(), EVENT_READABLE, &cb);
333         r = worker->connect(bind_addr, options, &cli_socket);
334         ASSERT_EQ(0, r);
335         ASSERT_TRUE(cb.poll(500));
336       }
337
338       if (bind_socket) {
339         cb.reset();
340         cb.poll(500);
341         ConnectedSocket srv_socket2;
342         do {
343           r = bind_socket.accept(&srv_socket2, options, &cli_addr, worker);
344           usleep(100);
345         } while (r == -EAGAIN && !*accepted_p);
346         if (r == 0)
347           *accepted_p = true;
348         ASSERT_TRUE(*accepted_p);
349         // srv_socket2 closed
350         center->delete_file_event(bind_socket.fd(), EVENT_READABLE);
351       }
352
353       if (worker->id == 0) {
354         char buf[100];
355         cb.reset();
356         center->create_file_event(cli_socket.fd(), EVENT_READABLE, &cb);
357         int i = 3;
358         while (!i--) {
359           ASSERT_TRUE(cb.poll(500));
360           r = cli_socket.read(buf, sizeof(buf));
361           if (r == 0)
362             break;
363         }
364         ASSERT_EQ(0, r);
365         center->delete_file_event(cli_socket.fd(), EVENT_READABLE);
366       }
367
368       if (bind_socket)
369         center->create_file_event(bind_socket.fd(), EVENT_READABLE, &cb);
370       if (worker->id == 0) {
371         *accepted_p = false;
372         r = worker->connect(bind_addr, options, &cli_socket);
373         ASSERT_EQ(0, r);
374         cb.reset();
375         ASSERT_TRUE(cb.poll(500));
376         cli_socket.close();
377       }
378
379       if (bind_socket) {
380         do {
381           r = bind_socket.accept(&srv_socket, options, &cli_addr, worker);
382           usleep(100);
383         } while (r == -EAGAIN && !*accepted_p);
384         if (r == 0)
385           *accepted_p = true;
386         ASSERT_TRUE(*accepted_p);
387         center->delete_file_event(bind_socket.fd(), EVENT_READABLE);
388       }
389       // unbind
390     }
391
392     --*count_p;
393     while (*count_p > 0)
394       usleep(100);
395
396     ConnectedSocket cli_socket;
397     r = worker->connect(bind_addr, options, &cli_socket);
398     ASSERT_EQ(0, r);
399     {
400       C_poll cb(center);
401       center->create_file_event(cli_socket.fd(), EVENT_READABLE, &cb);
402       r = cli_socket.is_connected();
403       if (r == 0) {
404         ASSERT_TRUE(cb.poll(500));
405         r = cli_socket.is_connected();
406       }
407       ASSERT_TRUE(r == -ECONNREFUSED || r == -ECONNRESET);
408     }
409   });
410 }
411
412 TEST_P(NetworkWorkerTest, ComplexTest) {
413   entity_addr_t bind_addr;
414   std::atomic_bool listen_done(false);
415   std::atomic_bool *listen_p = &listen_done;
416   std::atomic_bool accepted(false);
417   std::atomic_bool *accepted_p = &accepted;
418   std::atomic_bool done(false);
419   std::atomic_bool *done_p = &done;
420   ASSERT_TRUE(bind_addr.parse(get_addr().c_str()));
421   exec_events([this, bind_addr, listen_p, accepted_p, done_p](Worker *worker) mutable {
422     entity_addr_t cli_addr;
423     EventCenter *center = &worker->center;
424     SocketOptions options;
425     ServerSocket bind_socket;
426     int r = 0;
427     if (stack->support_local_listen_table() || worker->id == 0) {
428       r = worker->listen(bind_addr, options, &bind_socket);
429       ASSERT_EQ(0, r);
430       *listen_p = true;
431     }
432     ConnectedSocket cli_socket, srv_socket;
433     if (worker->id == 1) {
434       while (!*listen_p) {
435         usleep(50);
436         r = worker->connect(bind_addr, options, &cli_socket);
437         ASSERT_EQ(0, r);
438       }
439     }
440
441     if (bind_socket) {
442       C_poll cb(center);
443       center->create_file_event(bind_socket.fd(), EVENT_READABLE, &cb);
444       int count = 3;
445       while (count--) {
446         if (cb.poll(500)) {
447           r = bind_socket.accept(&srv_socket, options, &cli_addr, worker);
448           ASSERT_EQ(0, r);
449           *accepted_p = true;
450           break;
451         }
452       }
453       ASSERT_TRUE(*accepted_p);
454       center->delete_file_event(bind_socket.fd(), EVENT_READABLE);
455     }
456
457     if (worker->id == 1) {
458       C_poll cb(center);
459       center->create_file_event(cli_socket.fd(), EVENT_WRITABLE, &cb);
460       r = cli_socket.is_connected();
461       if (r == 0) {
462         ASSERT_TRUE(cb.poll(500));
463         r = cli_socket.is_connected();
464       }
465       ASSERT_EQ(1, r);
466       center->delete_file_event(cli_socket.fd(), EVENT_WRITABLE);
467     }
468
469     const size_t message_size = 10240;
470     size_t count = 100;
471     string message(message_size, '!');
472     for (size_t i = 0; i < message_size; i += 100)
473       message[i] = ',';
474     size_t len = message_size * count;
475     C_poll cb(center);
476     if (worker->id == 1)
477       center->create_file_event(cli_socket.fd(), EVENT_WRITABLE, &cb);
478     if (srv_socket)
479       center->create_file_event(srv_socket.fd(), EVENT_READABLE, &cb);
480     size_t left = len;
481     len *= 2;
482     string read_string;
483     int again_count = 0;
484     int c = 2;
485     bufferlist bl;
486     for (size_t i = 0; i < count; ++i)
487       bl.push_back(bufferptr((char*)message.data(), message_size));
488     while (!*done_p) {
489       again_count = 0;
490       if (worker->id == 1) {
491         if (c > 0) {
492           ssize_t r = 0;
493           usleep(100);
494           if (left > 0) {
495             r = cli_socket.send(bl, false);
496             ASSERT_TRUE(r >= 0 || r == -EAGAIN);
497             if (r > 0)
498               left -= r;
499             if (r == -EAGAIN)
500               ++again_count;
501           }
502           if (left == 0) {
503             --c;
504             left = message_size * count;
505             ASSERT_EQ(0U, bl.length());
506             for (size_t i = 0; i < count; ++i)
507               bl.push_back(bufferptr((char*)message.data(), message_size));
508           }
509         }
510       }
511
512       if (srv_socket) {
513         char buf[1000];
514         if (len > 0) {
515           r = srv_socket.read(buf, sizeof(buf));
516           ASSERT_TRUE(r > 0 || r == -EAGAIN);
517           if (r > 0) {
518             read_string.append(buf, r);
519             len -= r;
520           } else if (r == -EAGAIN) {
521             ++again_count;
522           }
523         }
524         if (len == 0) {
525           for (size_t i = 0; i < read_string.size(); i += message_size)
526             ASSERT_EQ(0, memcmp(read_string.c_str()+i, message.c_str(), message_size));
527           *done_p = true;
528         }
529       }
530       if (again_count) {
531         cb.reset();
532         cb.poll(500);
533       }
534     }
535     if (worker->id == 1)
536       center->delete_file_event(cli_socket.fd(), EVENT_WRITABLE);
537     if (srv_socket)
538       center->delete_file_event(srv_socket.fd(), EVENT_READABLE);
539
540     if (bind_socket)
541       bind_socket.abort_accept();
542     if (srv_socket)
543       srv_socket.close();
544     if (worker->id == 1)
545       cli_socket.close();
546   });
547 }
548
549 class StressFactory {
550   struct Client;
551   struct Server;
552   struct ThreadData {
553     Worker *worker;
554     std::set<Client*> clients;
555     std::set<Server*> servers;
556     ~ThreadData() {
557       for (auto && i : clients)
558         delete i;
559       for (auto && i : servers)
560         delete i;
561     }
562   };
563
564   struct RandomString {
565     size_t slen;
566     vector<std::string> strs;
567     std::random_device rd;
568     std::default_random_engine rng;
569
570     RandomString(size_t s): slen(s), rng(rd()) {}
571     void prepare(size_t n) {
572       static const char alphabet[] =
573           "abcdefghijklmnopqrstuvwxyz"
574           "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
575           "0123456789";
576
577       std::uniform_int_distribution<> dist(
578               0, sizeof(alphabet) / sizeof(*alphabet) - 2);
579
580       strs.reserve(n);
581       std::generate_n(
582         std::back_inserter(strs), strs.capacity(), [&] {
583           std::string str;
584           str.reserve(slen);
585           std::generate_n(std::back_inserter(str), slen, [&]() {
586             return alphabet[dist(rng)];
587           });
588          return str;
589         }
590       );
591     }
592     std::string &get_random_string() {
593       std::uniform_int_distribution<> dist(
594               0, strs.size() - 1);
595       return strs[dist(rng)];
596     }
597   };
598   struct Message {
599     size_t idx;
600     size_t len;
601     std::string content;
602
603     explicit Message(RandomString &rs, size_t i, size_t l): idx(i) {
604       size_t slen = rs.slen;
605       len = std::max(slen, l);
606
607       std::vector<std::string> strs;
608       strs.reserve(len / slen);
609       std::generate_n(
610         std::back_inserter(strs), strs.capacity(), [&] {
611           return rs.get_random_string();
612         }
613       );
614       len = slen * strs.size();
615       content.reserve(len);
616       for (auto &&s : strs)
617         content.append(s);
618     }
619     bool verify(const char *b, size_t len = 0) const {
620       return content.compare(0, len, b, 0, len) == 0;
621     }
622   };
623
624   template <typename T>
625   class C_delete : public EventCallback {
626     T *ctxt;
627    public:
628     C_delete(T *c): ctxt(c) {}
629     void do_request(int id) override {
630       delete ctxt;
631       delete this;
632     }
633   };
634
635   class Client {
636     StressFactory *factory;
637     EventCenter *center;
638     ConnectedSocket socket;
639     std::deque<StressFactory::Message*> acking;
640     std::deque<StressFactory::Message*> writings;
641     std::string buffer;
642     size_t index = 0;
643     size_t left;
644     bool write_enabled = false;
645     size_t read_offset = 0, write_offset = 0;
646     bool first = true;
647     bool dead = false;
648     StressFactory::Message homeless_message;
649
650     class Client_read_handle : public EventCallback {
651       Client *c;
652      public:
653       Client_read_handle(Client *_c): c(_c) {}
654       void do_request(int id) override {
655         c->do_read_request();
656       }
657     } read_ctxt;
658
659     class Client_write_handle : public EventCallback {
660       Client *c;
661      public:
662       Client_write_handle(Client *_c): c(_c) {}
663       void do_request(int id) override {
664         c->do_write_request();
665       }
666     } write_ctxt;
667
668    public:
669     Client(StressFactory *f, EventCenter *cen, ConnectedSocket s, size_t c)
670         : factory(f), center(cen), socket(std::move(s)), left(c), homeless_message(factory->rs, -1, 1024),
671           read_ctxt(this), write_ctxt(this) {
672       center->create_file_event(
673               socket.fd(), EVENT_READABLE, &read_ctxt);
674       center->dispatch_event_external(&read_ctxt);
675     }
676     void close() {
677       ASSERT_FALSE(write_enabled);
678       dead = true;
679       socket.shutdown();
680       center->delete_file_event(socket.fd(), EVENT_READABLE);
681       center->dispatch_event_external(new C_delete<Client>(this));
682     }
683
684     void do_read_request() {
685       if (dead)
686         return ;
687       ASSERT_TRUE(socket.is_connected() >= 0);
688       if (!socket.is_connected())
689         return ;
690       ASSERT_TRUE(!acking.empty() || first);
691       if (first) {
692         first = false;
693         center->dispatch_event_external(&write_ctxt);
694         if (acking.empty())
695           return ;
696       }
697       StressFactory::Message *m = acking.front();
698       int r = 0;
699       if (buffer.empty())
700         buffer.resize(m->len);
701       bool must_no = false;
702       while (true) {
703         r = socket.read((char*)buffer.data() + read_offset,
704                         m->len - read_offset);
705         ASSERT_TRUE(r == -EAGAIN || r > 0);
706         if (r == -EAGAIN)
707           break;
708         read_offset += r;
709
710         std::cerr << " client " << this << " receive " << m->idx << " len " << r << " content: "  << std::endl;
711         ASSERT_FALSE(must_no);
712         if ((m->len - read_offset) == 0) {
713           ASSERT_TRUE(m->verify(buffer.data(), 0));
714           delete m;
715           acking.pop_front();
716           read_offset = 0;
717           buffer.clear();
718           if (acking.empty()) {
719             m = &homeless_message;
720             must_no = true;
721           } else {
722             m = acking.front();
723             buffer.resize(m->len);
724           }
725         }
726       }
727       if (acking.empty()) {
728         center->dispatch_event_external(&write_ctxt);
729         return ;
730       }
731     }
732
733     void do_write_request() {
734       if (dead)
735         return ;
736       ASSERT_TRUE(socket.is_connected() > 0);
737
738       while (left > 0 && factory->queue_depth > writings.size() + acking.size()) {
739         StressFactory::Message *m = new StressFactory::Message(
740                 factory->rs, ++index,
741                 factory->rd() % factory->max_message_length);
742         std::cerr << " client " << this << " generate message " << m->idx << " length " << m->len << std::endl;
743         ASSERT_EQ(m->len, m->content.size());
744         writings.push_back(m);
745         --left;
746         --factory->message_left;
747       }
748
749       while (!writings.empty()) {
750         StressFactory::Message *m = writings.front();
751         bufferlist bl;
752         bl.append(m->content.data() + write_offset, m->content.size() - write_offset);
753         ssize_t r = socket.send(bl, false);
754         if (r == 0)
755           break;
756         std::cerr << " client " << this << " send " << m->idx << " len " << r << " content: " << std::endl;
757         ASSERT_TRUE(r >= 0);
758         write_offset += r;
759         if (write_offset == m->content.size()) {
760           write_offset = 0;
761           writings.pop_front();
762           acking.push_back(m);
763         }
764       }
765       if (writings.empty() && write_enabled) {
766         center->delete_file_event(socket.fd(), EVENT_WRITABLE);
767         write_enabled = false;
768       } else if (!writings.empty() && !write_enabled) {
769         ASSERT_EQ(0, center->create_file_event(
770                   socket.fd(), EVENT_WRITABLE, &write_ctxt));
771         write_enabled = true;
772       }
773     }
774
775     bool finish() const {
776       return left == 0 && acking.empty() && writings.empty();
777     }
778   };
779   friend class Client;
780
781   class Server {
782     StressFactory *factory;
783     EventCenter *center;
784     ConnectedSocket socket;
785     std::deque<std::string> buffers;
786     bool write_enabled = false;
787     bool dead = false;
788
789     class Server_read_handle : public EventCallback {
790       Server *s;
791      public:
792       Server_read_handle(Server *_s): s(_s) {}
793       void do_request(int id) override {
794         s->do_read_request();
795       }
796     } read_ctxt;
797
798     class Server_write_handle : public EventCallback {
799       Server *s;
800      public:
801       Server_write_handle(Server *_s): s(_s) {}
802       void do_request(int id) override {
803         s->do_write_request();
804       }
805     } write_ctxt;
806
807    public:
808     Server(StressFactory *f, EventCenter *c, ConnectedSocket s):
809         factory(f), center(c), socket(std::move(s)), read_ctxt(this), write_ctxt(this) {
810       center->create_file_event(socket.fd(), EVENT_READABLE, &read_ctxt);
811       center->dispatch_event_external(&read_ctxt);
812     }
813     void close() {
814       ASSERT_FALSE(write_enabled);
815       socket.shutdown();
816       center->delete_file_event(socket.fd(), EVENT_READABLE);
817       center->dispatch_event_external(new C_delete<Server>(this));
818     }
819     void do_read_request() {
820       if (dead)
821         return ;
822       int r = 0;
823       while (true) {
824         char buf[4096];
825         bufferptr data;
826         if (factory->zero_copy_read) {
827           r = socket.zero_copy_read(data);
828         } else {
829           r = socket.read(buf, sizeof(buf));
830         }
831         ASSERT_TRUE(r == -EAGAIN || (r >= 0 && (size_t)r <= sizeof(buf)));
832         if (r == 0) {
833           ASSERT_TRUE(buffers.empty());
834           dead = true;
835           return ;
836         } else if (r == -EAGAIN)
837           break;
838         if (factory->zero_copy_read) {
839           buffers.emplace_back(data.c_str(), 0, data.length());
840         } else {
841           buffers.emplace_back(buf, 0, r);
842         }
843         std::cerr << " server " << this << " receive " << r << " content: " << std::endl;
844       }
845       if (!buffers.empty() && !write_enabled)
846         center->dispatch_event_external(&write_ctxt);
847     }
848
849     void do_write_request() {
850       if (dead)
851         return ;
852
853       while (!buffers.empty()) {
854         bufferlist bl;
855         auto it = buffers.begin();
856         for (size_t i = 0; i < buffers.size(); ++i) {
857           bl.push_back(bufferptr((char*)it->data(), it->size()));
858           ++it;
859         }
860
861         ssize_t r = socket.send(bl, false);
862         std::cerr << " server " << this << " send " << r << std::endl;
863         if (r == 0)
864           break;
865         ASSERT_TRUE(r >= 0);
866         while (r > 0) {
867           ASSERT_TRUE(!buffers.empty());
868           string &buffer = buffers.front();
869           if (r >= (int)buffer.size()) {
870             r -= (int)buffer.size();
871             buffers.pop_front();
872           } else {
873            std::cerr << " server " << this << " sent " << r << std::endl;
874             buffer = buffer.substr(r, buffer.size());
875             break;
876           }
877         }
878       }
879       if (buffers.empty()) {
880         if (write_enabled) {
881           center->delete_file_event(socket.fd(), EVENT_WRITABLE);
882           write_enabled = false;
883         }
884       } else if (!write_enabled) {
885         ASSERT_EQ(0, center->create_file_event(
886                   socket.fd(), EVENT_WRITABLE, &write_ctxt));
887         write_enabled = true;
888       }
889     }
890
891     bool finish() {
892      return dead;
893     }
894   };
895   friend class Server;
896
897   class C_accept : public EventCallback {
898     StressFactory *factory;
899     ServerSocket bind_socket;
900     ThreadData *t_data;
901     Worker *worker;
902
903    public:
904     C_accept(StressFactory *f, ServerSocket s, ThreadData *data, Worker *w)
905         : factory(f), bind_socket(std::move(s)), t_data(data), worker(w) {}
906     void do_request(int id) override {
907       while (true) {
908         entity_addr_t cli_addr;
909         ConnectedSocket srv_socket;
910         SocketOptions options;
911         int r = bind_socket.accept(&srv_socket, options, &cli_addr, worker);
912         if (r == -EAGAIN) {
913           break;
914         }
915         ASSERT_EQ(0, r);
916         ASSERT_TRUE(srv_socket.fd() > 0);
917         Server *cb = new Server(factory, &t_data->worker->center, std::move(srv_socket));
918         t_data->servers.insert(cb);
919       }
920     }
921   };
922   friend class C_accept;
923
924  public:
925   static const size_t min_client_send_messages = 100;
926   static const size_t max_client_send_messages = 1000;
927   std::shared_ptr<NetworkStack> stack;
928   RandomString rs;
929   std::random_device rd;
930   const size_t client_num, queue_depth, max_message_length;
931   atomic_int message_count, message_left;
932   entity_addr_t bind_addr;
933   std::atomic_bool already_bind = {false};
934   bool zero_copy_read;
935   SocketOptions options;
936
937   explicit StressFactory(std::shared_ptr<NetworkStack> s, const string &addr,
938                          size_t cli, size_t qd, size_t mc, size_t l, bool zero_copy)
939       : stack(s), rs(128), client_num(cli), queue_depth(qd),
940         max_message_length(l), message_count(mc), message_left(mc),
941         zero_copy_read(zero_copy) {
942     bind_addr.parse(addr.c_str());
943     rs.prepare(100);
944   }
945   ~StressFactory() {
946   }
947
948   void add_client(ThreadData *t_data) {
949     static Mutex lock("add_client_lock");
950     Mutex::Locker l(lock);
951     ConnectedSocket sock;
952     int r = t_data->worker->connect(bind_addr, options, &sock);
953     std::default_random_engine rng(rd());
954     std::uniform_int_distribution<> dist(
955             min_client_send_messages, max_client_send_messages);
956     ASSERT_EQ(0, r);
957     int c = dist(rng);
958     if (c > message_count.load())
959       c = message_count.load();
960     Client *cb = new Client(this, &t_data->worker->center, std::move(sock), c);
961     t_data->clients.insert(cb);
962     message_count -= c;
963   }
964
965   void drop_client(ThreadData *t_data, Client *c) {
966     c->close();
967     ASSERT_EQ(1U, t_data->clients.erase(c));
968   }
969
970   void drop_server(ThreadData *t_data, Server *s) {
971     s->close();
972     ASSERT_EQ(1U, t_data->servers.erase(s));
973   }
974
975   void start(Worker *worker) {
976     int r = 0;
977     ThreadData t_data;
978     t_data.worker = worker;
979     ServerSocket bind_socket;
980     if (stack->support_local_listen_table() || worker->id == 0) {
981       r = worker->listen(bind_addr, options, &bind_socket);
982       ASSERT_EQ(0, r);
983       already_bind = true;
984     }
985     while (!already_bind)
986       usleep(50);
987     C_accept *accept_handler = nullptr;
988     int bind_fd = 0;
989     if (bind_socket) {
990       bind_fd = bind_socket.fd();
991       accept_handler = new C_accept(this, std::move(bind_socket), &t_data, worker);
992       ASSERT_EQ(0, worker->center.create_file_event(
993                   bind_fd, EVENT_READABLE, accept_handler));
994     }
995
996     int echo_throttle = message_count;
997     while (message_count > 0 || !t_data.clients.empty() || !t_data.servers.empty()) {
998       if (message_count > 0  && t_data.clients.size() < client_num && t_data.servers.size() < client_num)
999         add_client(&t_data);
1000       for (auto &&c : t_data.clients) {
1001         if (c->finish()) {
1002           drop_client(&t_data, c);
1003           break;
1004         }
1005       }
1006       for (auto &&s : t_data.servers) {
1007         if (s->finish()) {
1008           drop_server(&t_data, s);
1009           break;
1010         }
1011       }
1012
1013       worker->center.process_events(1);
1014       if (echo_throttle > message_left) {
1015         std::cerr << " clients " << t_data.clients.size() << " servers " << t_data.servers.size()
1016                   << " message count " << message_left << std::endl;
1017         echo_throttle -= 100;
1018       }
1019     }
1020     if (bind_fd)
1021       worker->center.delete_file_event(bind_fd, EVENT_READABLE);
1022     delete accept_handler;
1023   }
1024 };
1025
1026 TEST_P(NetworkWorkerTest, StressTest) {
1027   StressFactory factory(stack, get_addr(), 16, 16, 10000, 1024,
1028                         strncmp(GetParam(), "dpdk", 4) == 0);
1029   StressFactory *f = &factory;
1030   exec_events([f](Worker *worker) mutable {
1031     f->start(worker);
1032   });
1033   ASSERT_EQ(0, factory.message_left);
1034 }
1035
1036
1037 INSTANTIATE_TEST_CASE_P(
1038   NetworkStack,
1039   NetworkWorkerTest,
1040   ::testing::Values(
1041 #ifdef HAVE_DPDK
1042     "dpdk",
1043 #endif
1044     "posix"
1045   )
1046 );
1047
1048 #else
1049
1050 // Google Test may not support value-parameterized tests with some
1051 // compilers. If we use conditional compilation to compile out all
1052 // code referring to the gtest_main library, MSVC linker will not link
1053 // that library at all and consequently complain about missing entry
1054 // point defined in that library (fatal error LNK1561: entry point
1055 // must be defined). This dummy test keeps gtest_main linked in.
1056 TEST(DummyTest, ValueParameterizedTestsAreNotSupportedOnThisPlatform) {}
1057
1058 #endif
1059
1060
1061 /*
1062  * Local Variables:
1063  * compile-command: "cd ../.. ; make ceph_test_async_networkstack &&
1064  *    ./ceph_test_async_networkstack
1065  *
1066  * End:
1067  */