Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / test / msgr / test_msgr.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
7  *
8  * Author: Haomai Wang <haomaiwang@gmail.com>
9  *
10  * This is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU Lesser General Public
12  * License version 2.1, as published by the Free Software
13  * Foundation.  See file COPYING.
14  *
15  */
16
17 #include <atomic>
18 #include <iostream>
19 #include <unistd.h>
20 #include <stdlib.h>
21 #include <time.h>
22 #include "common/Mutex.h"
23 #include "common/Cond.h"
24 #include "common/ceph_argparse.h"
25 #include "global/global_init.h"
26 #include "msg/Dispatcher.h"
27 #include "msg/msg_types.h"
28 #include "msg/Message.h"
29 #include "msg/Messenger.h"
30 #include "msg/Connection.h"
31 #include "messages/MPing.h"
32 #include "messages/MCommand.h"
33
34 #include <boost/random/mersenne_twister.hpp>
35 #include <boost/random/uniform_int.hpp>
36 #include <boost/random/binomial_distribution.hpp>
37 #include <gtest/gtest.h>
38
39 typedef boost::mt11213b gen_type;
40
41 #include "common/dout.h"
42 #include "include/assert.h"
43
44 #define dout_subsys ceph_subsys_ms
45 #undef dout_prefix
46 #define dout_prefix *_dout << " ceph_test_msgr "
47
48
49 #if GTEST_HAS_PARAM_TEST
50
51 #define CHECK_AND_WAIT_TRUE(expr) do {  \
52   int n = 1000;                         \
53   while (--n) {                         \
54     if (expr)                           \
55       break;                            \
56     usleep(1000);                       \
57   }                                     \
58 } while(0);
59
60 class MessengerTest : public ::testing::TestWithParam<const char*> {
61  public:
62   Messenger *server_msgr;
63   Messenger *client_msgr;
64
65   MessengerTest(): server_msgr(NULL), client_msgr(NULL) {}
66   void SetUp() override {
67     lderr(g_ceph_context) << __func__ << " start set up " << GetParam() << dendl;
68     server_msgr = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::OSD(0), "server", getpid(), 0);
69     client_msgr = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::CLIENT(-1), "client", getpid(), 0);
70     server_msgr->set_default_policy(Messenger::Policy::stateless_server(0));
71     client_msgr->set_default_policy(Messenger::Policy::lossy_client(0));
72   }
73   void TearDown() override {
74     ASSERT_EQ(server_msgr->get_dispatch_queue_len(), 0);
75     ASSERT_EQ(client_msgr->get_dispatch_queue_len(), 0);
76     delete server_msgr;
77     delete client_msgr;
78   }
79
80 };
81
82
83 class FakeDispatcher : public Dispatcher {
84  public:
85   struct Session : public RefCountedObject {
86     atomic<uint64_t> count;
87     ConnectionRef con;
88
89     explicit Session(ConnectionRef c): RefCountedObject(g_ceph_context), count(0), con(c) {
90     }
91     uint64_t get_count() { return count; }
92   };
93
94   Mutex lock;
95   Cond cond;
96   bool is_server;
97   bool got_new;
98   bool got_remote_reset;
99   bool got_connect;
100   bool loopback;
101
102   explicit FakeDispatcher(bool s): Dispatcher(g_ceph_context), lock("FakeDispatcher::lock"),
103                           is_server(s), got_new(false), got_remote_reset(false),
104                           got_connect(false), loopback(false) {}
105   bool ms_can_fast_dispatch_any() const override { return true; }
106   bool ms_can_fast_dispatch(const Message *m) const override {
107     switch (m->get_type()) {
108     case CEPH_MSG_PING:
109       return true;
110     default:
111       return false;
112     }
113   }
114
115   void ms_handle_fast_connect(Connection *con) override {
116     lock.Lock();
117     lderr(g_ceph_context) << __func__ << " " << con << dendl;
118     Session *s = static_cast<Session*>(con->get_priv());
119     if (!s) {
120       s = new Session(con);
121       con->set_priv(s->get());
122       lderr(g_ceph_context) << __func__ << " con: " << con << " count: " << s->count << dendl;
123     }
124     s->put();
125     got_connect = true;
126     cond.Signal();
127     lock.Unlock();
128   }
129   void ms_handle_fast_accept(Connection *con) override {
130     Session *s = static_cast<Session*>(con->get_priv());
131     if (!s) {
132       s = new Session(con);
133       con->set_priv(s->get());
134     }
135     s->put();
136   }
137   bool ms_dispatch(Message *m) override {
138     Session *s = static_cast<Session*>(m->get_connection()->get_priv());
139     if (!s) {
140       s = new Session(m->get_connection());
141       m->get_connection()->set_priv(s->get());
142     }
143     s->put();
144     s->count++;
145     lderr(g_ceph_context) << __func__ << " conn: " << m->get_connection() << " session " << s << " count: " << s->count << dendl;
146     if (is_server) {
147       reply_message(m);
148     }
149     Mutex::Locker l(lock);
150     got_new = true;
151     cond.Signal();
152     m->put();
153     return true;
154   }
155   bool ms_handle_reset(Connection *con) override {
156     Mutex::Locker l(lock);
157     lderr(g_ceph_context) << __func__ << " " << con << dendl;
158     Session *s = static_cast<Session*>(con->get_priv());
159     if (s) {
160       s->con.reset(NULL);  // break con <-> session ref cycle
161       con->set_priv(NULL);   // break ref <-> session cycle, if any
162       s->put();
163     }
164     return true;
165   }
166   void ms_handle_remote_reset(Connection *con) override {
167     Mutex::Locker l(lock);
168     lderr(g_ceph_context) << __func__ << " " << con << dendl;
169     Session *s = static_cast<Session*>(con->get_priv());
170     if (s) {
171       s->con.reset(NULL);  // break con <-> session ref cycle
172       con->set_priv(NULL);   // break ref <-> session cycle, if any
173       s->put();
174     }
175     got_remote_reset = true;
176     cond.Signal();
177   }
178   bool ms_handle_refused(Connection *con) override {
179     return false;
180   }
181   void ms_fast_dispatch(Message *m) override {
182     Session *s = static_cast<Session*>(m->get_connection()->get_priv());
183     if (!s) {
184       s = new Session(m->get_connection());
185       m->get_connection()->set_priv(s->get());
186     }
187     s->put();
188     s->count++;
189     lderr(g_ceph_context) << __func__ << " conn: " << m->get_connection() << " session " << s << " count: " << s->count << dendl;
190     if (is_server) {
191       if (loopback)
192         assert(m->get_source().is_osd());
193       else
194         reply_message(m);
195     } else if (loopback) {
196       assert(m->get_source().is_client());
197     }
198     m->put();
199     Mutex::Locker l(lock);
200     got_new = true;
201     cond.Signal();
202   }
203
204   bool ms_verify_authorizer(Connection *con, int peer_type, int protocol,
205                             bufferlist& authorizer, bufferlist& authorizer_reply,
206                             bool& isvalid, CryptoKey& session_key) override {
207     isvalid = true;
208     return true;
209   }
210
211   void reply_message(Message *m) {
212     MPing *rm = new MPing();
213     m->get_connection()->send_message(rm);
214   }
215 };
216
217 typedef FakeDispatcher::Session Session;
218
219 TEST_P(MessengerTest, SimpleTest) {
220   FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
221   entity_addr_t bind_addr;
222   bind_addr.parse("127.0.0.1");
223   server_msgr->bind(bind_addr);
224   server_msgr->add_dispatcher_head(&srv_dispatcher);
225   server_msgr->start();
226
227   client_msgr->add_dispatcher_head(&cli_dispatcher);
228   client_msgr->start();
229
230   // 1. simple round trip
231   MPing *m = new MPing();
232   ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst());
233   {
234     ASSERT_EQ(conn->send_message(m), 0);
235     Mutex::Locker l(cli_dispatcher.lock);
236     while (!cli_dispatcher.got_new)
237       cli_dispatcher.cond.Wait(cli_dispatcher.lock);
238     cli_dispatcher.got_new = false;
239   }
240   ASSERT_TRUE(conn->is_connected());
241   ASSERT_TRUE((static_cast<Session*>(conn->get_priv()))->get_count() == 1);
242   ASSERT_TRUE(conn->peer_is_osd());
243
244   // 2. test rebind port
245   set<int> avoid_ports;
246   for (int i = 0; i < 10 ; i++)
247     avoid_ports.insert(server_msgr->get_myaddr().get_port() + i);
248   server_msgr->rebind(avoid_ports);
249   ASSERT_TRUE(avoid_ports.count(server_msgr->get_myaddr().get_port()) == 0);
250
251   conn = client_msgr->get_connection(server_msgr->get_myinst());
252   {
253     m = new MPing();
254     ASSERT_EQ(conn->send_message(m), 0);
255     Mutex::Locker l(cli_dispatcher.lock);
256     while (!cli_dispatcher.got_new)
257       cli_dispatcher.cond.Wait(cli_dispatcher.lock);
258     cli_dispatcher.got_new = false;
259   }
260   ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
261
262   // 3. test markdown connection
263   conn->mark_down();
264   ASSERT_FALSE(conn->is_connected());
265
266   // 4. test failed connection
267   server_msgr->shutdown();
268   server_msgr->wait();
269
270   m = new MPing();
271   conn->send_message(m);
272   CHECK_AND_WAIT_TRUE(!conn->is_connected());
273   ASSERT_FALSE(conn->is_connected());
274
275   // 5. loopback connection
276   srv_dispatcher.loopback = true;
277   conn = client_msgr->get_loopback_connection();
278   {
279     m = new MPing();
280     ASSERT_EQ(conn->send_message(m), 0);
281     Mutex::Locker l(cli_dispatcher.lock);
282     while (!cli_dispatcher.got_new)
283       cli_dispatcher.cond.Wait(cli_dispatcher.lock);
284     cli_dispatcher.got_new = false;
285   }
286   srv_dispatcher.loopback = false;
287   ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
288   client_msgr->shutdown();
289   client_msgr->wait();
290   server_msgr->shutdown();
291   server_msgr->wait();
292 }
293
294 TEST_P(MessengerTest, NameAddrTest) {
295   FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
296   entity_addr_t bind_addr;
297   bind_addr.parse("127.0.0.1");
298   server_msgr->bind(bind_addr);
299   server_msgr->add_dispatcher_head(&srv_dispatcher);
300   server_msgr->start();
301
302   client_msgr->add_dispatcher_head(&cli_dispatcher);
303   client_msgr->start();
304
305   MPing *m = new MPing();
306   ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst());
307   {
308     ASSERT_EQ(conn->send_message(m), 0);
309     Mutex::Locker l(cli_dispatcher.lock);
310     while (!cli_dispatcher.got_new)
311       cli_dispatcher.cond.Wait(cli_dispatcher.lock);
312     cli_dispatcher.got_new = false;
313   }
314   ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
315   ASSERT_TRUE(conn->get_peer_addr() == server_msgr->get_myaddr());
316   ConnectionRef server_conn = server_msgr->get_connection(client_msgr->get_myinst());
317   // Make should server_conn is the one we already accepted from client,
318   // so it means client_msgr has the same addr when server connection has
319   ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
320   server_msgr->shutdown();
321   client_msgr->shutdown();
322   server_msgr->wait();
323   client_msgr->wait();
324 }
325
326 TEST_P(MessengerTest, FeatureTest) {
327   FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
328   entity_addr_t bind_addr;
329   bind_addr.parse("127.0.0.1");
330   uint64_t all_feature_supported, feature_required, feature_supported = 0;
331   for (int i = 0; i < 10; i++)
332     feature_supported |= 1ULL << i;
333   feature_required = feature_supported | 1ULL << 13;
334   all_feature_supported = feature_required | 1ULL << 14;
335
336   Messenger::Policy p = server_msgr->get_policy(entity_name_t::TYPE_CLIENT);
337   p.features_required = feature_required;
338   server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p);
339   server_msgr->bind(bind_addr);
340   server_msgr->add_dispatcher_head(&srv_dispatcher);
341   server_msgr->start();
342
343   // 1. Suppose if only support less than required
344   p = client_msgr->get_policy(entity_name_t::TYPE_OSD);
345   p.features_supported = feature_supported;
346   client_msgr->set_policy(entity_name_t::TYPE_OSD, p);
347   client_msgr->add_dispatcher_head(&cli_dispatcher);
348   client_msgr->start();
349
350   MPing *m = new MPing();
351   ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst());
352   conn->send_message(m);
353   CHECK_AND_WAIT_TRUE(!conn->is_connected());
354   // should failed build a connection
355   ASSERT_FALSE(conn->is_connected());
356
357   client_msgr->shutdown();
358   client_msgr->wait();
359
360   // 2. supported met required
361   p = client_msgr->get_policy(entity_name_t::TYPE_OSD);
362   p.features_supported = all_feature_supported;
363   client_msgr->set_policy(entity_name_t::TYPE_OSD, p);
364   client_msgr->start();
365
366   conn = client_msgr->get_connection(server_msgr->get_myinst());
367   {
368     m = new MPing();
369     ASSERT_EQ(conn->send_message(m), 0);
370     Mutex::Locker l(cli_dispatcher.lock);
371     while (!cli_dispatcher.got_new)
372       cli_dispatcher.cond.Wait(cli_dispatcher.lock);
373     cli_dispatcher.got_new = false;
374   }
375   ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
376
377   server_msgr->shutdown();
378   client_msgr->shutdown();
379   server_msgr->wait();
380   client_msgr->wait();
381 }
382
383 TEST_P(MessengerTest, TimeoutTest) {
384   g_ceph_context->_conf->set_val("ms_tcp_read_timeout", "1");
385   FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
386   entity_addr_t bind_addr;
387   bind_addr.parse("127.0.0.1");
388   server_msgr->bind(bind_addr);
389   server_msgr->add_dispatcher_head(&srv_dispatcher);
390   server_msgr->start();
391
392   client_msgr->add_dispatcher_head(&cli_dispatcher);
393   client_msgr->start();
394
395   // 1. build the connection
396   MPing *m = new MPing();
397   ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst());
398   {
399     ASSERT_EQ(conn->send_message(m), 0);
400     Mutex::Locker l(cli_dispatcher.lock);
401     while (!cli_dispatcher.got_new)
402       cli_dispatcher.cond.Wait(cli_dispatcher.lock);
403     cli_dispatcher.got_new = false;
404   }
405   ASSERT_TRUE(conn->is_connected());
406   ASSERT_TRUE((static_cast<Session*>(conn->get_priv()))->get_count() == 1);
407   ASSERT_TRUE(conn->peer_is_osd());
408
409   // 2. wait for idle
410   usleep(2500*1000);
411   ASSERT_FALSE(conn->is_connected());
412
413   server_msgr->shutdown();
414   server_msgr->wait();
415
416   client_msgr->shutdown();
417   client_msgr->wait();
418   g_ceph_context->_conf->set_val("ms_tcp_read_timeout", "900");
419 }
420
421 TEST_P(MessengerTest, StatefulTest) {
422   Message *m;
423   FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
424   entity_addr_t bind_addr;
425   bind_addr.parse("127.0.0.1");
426   Messenger::Policy p = Messenger::Policy::stateful_server(0);
427   server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p);
428   p = Messenger::Policy::lossless_client(0);
429   client_msgr->set_policy(entity_name_t::TYPE_OSD, p);
430
431   server_msgr->bind(bind_addr);
432   server_msgr->add_dispatcher_head(&srv_dispatcher);
433   server_msgr->start();
434   client_msgr->add_dispatcher_head(&cli_dispatcher);
435   client_msgr->start();
436
437   // 1. test for server standby
438   ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst());
439   {
440     m = new MPing();
441     ASSERT_EQ(conn->send_message(m), 0);
442     Mutex::Locker l(cli_dispatcher.lock);
443     while (!cli_dispatcher.got_new)
444       cli_dispatcher.cond.Wait(cli_dispatcher.lock);
445     cli_dispatcher.got_new = false;
446   }
447   ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
448   conn->mark_down();
449   ASSERT_FALSE(conn->is_connected());
450   ConnectionRef server_conn = server_msgr->get_connection(client_msgr->get_myinst());
451   // don't lose state
452   ASSERT_TRUE(static_cast<Session*>(server_conn->get_priv())->get_count() == 1);
453
454   srv_dispatcher.got_new = false;
455   conn = client_msgr->get_connection(server_msgr->get_myinst());
456   {
457     m = new MPing();
458     ASSERT_EQ(conn->send_message(m), 0);
459     Mutex::Locker l(cli_dispatcher.lock);
460     while (!cli_dispatcher.got_new)
461       cli_dispatcher.cond.Wait(cli_dispatcher.lock);
462     cli_dispatcher.got_new = false;
463   }
464   ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
465   server_conn = server_msgr->get_connection(client_msgr->get_myinst());
466   {
467     Mutex::Locker l(srv_dispatcher.lock);
468     while (!srv_dispatcher.got_remote_reset)
469       srv_dispatcher.cond.Wait(srv_dispatcher.lock);
470   }
471
472   // 2. test for client reconnect
473   ASSERT_FALSE(cli_dispatcher.got_remote_reset);
474   cli_dispatcher.got_connect = false;
475   cli_dispatcher.got_new = false;
476   cli_dispatcher.got_remote_reset = false;
477   server_conn->mark_down();
478   ASSERT_FALSE(server_conn->is_connected());
479   // ensure client detect server socket closed
480   {
481     Mutex::Locker l(cli_dispatcher.lock);
482     while (!cli_dispatcher.got_remote_reset)
483       cli_dispatcher.cond.Wait(cli_dispatcher.lock);
484     cli_dispatcher.got_remote_reset = false;
485   }
486   {
487     Mutex::Locker l(cli_dispatcher.lock);
488     while (!cli_dispatcher.got_connect)
489       cli_dispatcher.cond.Wait(cli_dispatcher.lock);
490     cli_dispatcher.got_connect = false;
491   }
492   CHECK_AND_WAIT_TRUE(conn->is_connected());
493   ASSERT_TRUE(conn->is_connected());
494
495   {
496     m = new MPing();
497     ASSERT_EQ(conn->send_message(m), 0);
498     ASSERT_TRUE(conn->is_connected());
499     Mutex::Locker l(cli_dispatcher.lock);
500     while (!cli_dispatcher.got_new)
501       cli_dispatcher.cond.Wait(cli_dispatcher.lock);
502     cli_dispatcher.got_new = false;
503   }
504   // resetcheck happen
505   ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv())->get_count());
506   server_conn = server_msgr->get_connection(client_msgr->get_myinst());
507   ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv())->get_count());
508   cli_dispatcher.got_remote_reset = false;
509
510   server_msgr->shutdown();
511   client_msgr->shutdown();
512   server_msgr->wait();
513   client_msgr->wait();
514 }
515
516 TEST_P(MessengerTest, StatelessTest) {
517   Message *m;
518   FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
519   entity_addr_t bind_addr;
520   bind_addr.parse("127.0.0.1");
521   Messenger::Policy p = Messenger::Policy::stateless_server(0);
522   server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p);
523   p = Messenger::Policy::lossy_client(0);
524   client_msgr->set_policy(entity_name_t::TYPE_OSD, p);
525
526   server_msgr->bind(bind_addr);
527   server_msgr->add_dispatcher_head(&srv_dispatcher);
528   server_msgr->start();
529   client_msgr->add_dispatcher_head(&cli_dispatcher);
530   client_msgr->start();
531
532   // 1. test for server lose state
533   ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst());
534   {
535     m = new MPing();
536     ASSERT_EQ(conn->send_message(m), 0);
537     Mutex::Locker l(cli_dispatcher.lock);
538     while (!cli_dispatcher.got_new)
539       cli_dispatcher.cond.Wait(cli_dispatcher.lock);
540     cli_dispatcher.got_new = false;
541   }
542   ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
543   conn->mark_down();
544   ASSERT_FALSE(conn->is_connected());
545
546   srv_dispatcher.got_new = false;
547   conn = client_msgr->get_connection(server_msgr->get_myinst());
548   {
549     m = new MPing();
550     ASSERT_EQ(conn->send_message(m), 0);
551     Mutex::Locker l(cli_dispatcher.lock);
552     while (!cli_dispatcher.got_new)
553       cli_dispatcher.cond.Wait(cli_dispatcher.lock);
554     cli_dispatcher.got_new = false;
555   }
556   ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
557   ConnectionRef server_conn = server_msgr->get_connection(client_msgr->get_myinst());
558   // server lose state
559   {
560     Mutex::Locker l(srv_dispatcher.lock);
561     while (!srv_dispatcher.got_new)
562       srv_dispatcher.cond.Wait(srv_dispatcher.lock);
563   }
564   ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv())->get_count());
565
566   // 2. test for client lossy
567   server_conn->mark_down();
568   ASSERT_FALSE(server_conn->is_connected());
569   conn->send_keepalive();
570   CHECK_AND_WAIT_TRUE(!conn->is_connected());
571   ASSERT_FALSE(conn->is_connected());
572   conn = client_msgr->get_connection(server_msgr->get_myinst());
573   {
574     m = new MPing();
575     ASSERT_EQ(conn->send_message(m), 0);
576     Mutex::Locker l(cli_dispatcher.lock);
577     while (!cli_dispatcher.got_new)
578       cli_dispatcher.cond.Wait(cli_dispatcher.lock);
579     cli_dispatcher.got_new = false;
580   }
581   ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
582
583   server_msgr->shutdown();
584   client_msgr->shutdown();
585   server_msgr->wait();
586   client_msgr->wait();
587 }
588
589 TEST_P(MessengerTest, ClientStandbyTest) {
590   Message *m;
591   FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
592   entity_addr_t bind_addr;
593   bind_addr.parse("127.0.0.1");
594   Messenger::Policy p = Messenger::Policy::stateful_server(0);
595   server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p);
596   p = Messenger::Policy::lossless_peer(0);
597   client_msgr->set_policy(entity_name_t::TYPE_OSD, p);
598
599   server_msgr->bind(bind_addr);
600   server_msgr->add_dispatcher_head(&srv_dispatcher);
601   server_msgr->start();
602   client_msgr->add_dispatcher_head(&cli_dispatcher);
603   client_msgr->start();
604
605   // 1. test for client standby, resetcheck
606   ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst());
607   {
608     m = new MPing();
609     ASSERT_EQ(conn->send_message(m), 0);
610     Mutex::Locker l(cli_dispatcher.lock);
611     while (!cli_dispatcher.got_new)
612       cli_dispatcher.cond.Wait(cli_dispatcher.lock);
613     cli_dispatcher.got_new = false;
614   }
615   ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
616   ConnectionRef server_conn = server_msgr->get_connection(client_msgr->get_myinst());
617   ASSERT_FALSE(cli_dispatcher.got_remote_reset);
618   cli_dispatcher.got_connect = false;
619   server_conn->mark_down();
620   ASSERT_FALSE(server_conn->is_connected());
621   // client should be standby
622   usleep(300*1000);
623   // client should be standby, so we use original connection
624   {
625     // Try send message to verify got remote reset callback
626     m = new MPing();
627     ASSERT_EQ(conn->send_message(m), 0);
628     {
629       Mutex::Locker l(cli_dispatcher.lock);
630       while (!cli_dispatcher.got_remote_reset)
631         cli_dispatcher.cond.Wait(cli_dispatcher.lock);
632       cli_dispatcher.got_remote_reset = false;
633       while (!cli_dispatcher.got_connect)
634         cli_dispatcher.cond.Wait(cli_dispatcher.lock);
635       cli_dispatcher.got_connect = false;
636     }
637     CHECK_AND_WAIT_TRUE(conn->is_connected());
638     ASSERT_TRUE(conn->is_connected());
639     m = new MPing();
640     ASSERT_EQ(conn->send_message(m), 0);
641     Mutex::Locker l(cli_dispatcher.lock);
642     while (!cli_dispatcher.got_new)
643       cli_dispatcher.cond.Wait(cli_dispatcher.lock);
644     cli_dispatcher.got_new = false;
645   }
646   ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
647   server_conn = server_msgr->get_connection(client_msgr->get_myinst());
648   ASSERT_TRUE(static_cast<Session*>(server_conn->get_priv())->get_count() == 1);
649
650   server_msgr->shutdown();
651   client_msgr->shutdown();
652   server_msgr->wait();
653   client_msgr->wait();
654 }
655
656 TEST_P(MessengerTest, AuthTest) {
657   g_ceph_context->_conf->set_val("auth_cluster_required", "cephx");
658   g_ceph_context->_conf->set_val("auth_service_required", "cephx");
659   g_ceph_context->_conf->set_val("auth_client_required", "cephx");
660   FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
661   entity_addr_t bind_addr;
662   bind_addr.parse("127.0.0.1");
663   server_msgr->bind(bind_addr);
664   server_msgr->add_dispatcher_head(&srv_dispatcher);
665   server_msgr->start();
666
667   client_msgr->add_dispatcher_head(&cli_dispatcher);
668   client_msgr->start();
669
670   // 1. simple auth round trip
671   MPing *m = new MPing();
672   ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst());
673   {
674     ASSERT_EQ(conn->send_message(m), 0);
675     Mutex::Locker l(cli_dispatcher.lock);
676     while (!cli_dispatcher.got_new)
677       cli_dispatcher.cond.Wait(cli_dispatcher.lock);
678     cli_dispatcher.got_new = false;
679   }
680   ASSERT_TRUE(conn->is_connected());
681   ASSERT_TRUE((static_cast<Session*>(conn->get_priv()))->get_count() == 1);
682
683   // 2. mix auth
684   g_ceph_context->_conf->set_val("auth_cluster_required", "none");
685   g_ceph_context->_conf->set_val("auth_service_required", "none");
686   g_ceph_context->_conf->set_val("auth_client_required", "none");
687   conn->mark_down();
688   ASSERT_FALSE(conn->is_connected());
689   conn = client_msgr->get_connection(server_msgr->get_myinst());
690   {
691     MPing *m = new MPing();
692     ASSERT_EQ(conn->send_message(m), 0);
693     Mutex::Locker l(cli_dispatcher.lock);
694     while (!cli_dispatcher.got_new)
695       cli_dispatcher.cond.Wait(cli_dispatcher.lock);
696     cli_dispatcher.got_new = false;
697   }
698   ASSERT_TRUE(conn->is_connected());
699   ASSERT_TRUE((static_cast<Session*>(conn->get_priv()))->get_count() == 1);
700
701   server_msgr->shutdown();
702   client_msgr->shutdown();
703   server_msgr->wait();
704   client_msgr->wait();
705 }
706
707 TEST_P(MessengerTest, MessageTest) {
708   FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
709   entity_addr_t bind_addr;
710   bind_addr.parse("127.0.0.1");
711   Messenger::Policy p = Messenger::Policy::stateful_server(0);
712   server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p);
713   p = Messenger::Policy::lossless_peer(0);
714   client_msgr->set_policy(entity_name_t::TYPE_OSD, p);
715
716   server_msgr->bind(bind_addr);
717   server_msgr->add_dispatcher_head(&srv_dispatcher);
718   server_msgr->start();
719   client_msgr->add_dispatcher_head(&cli_dispatcher);
720   client_msgr->start();
721
722
723   // 1. A very large "front"(as well as "payload")
724   // Because a external message need to invade Messenger::decode_message,
725   // here we only use existing message class(MCommand)
726   ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst());
727   {
728     uuid_d uuid;
729     uuid.generate_random();
730     vector<string> cmds;
731     string s("abcdefghijklmnopqrstuvwxyz");
732     for (int i = 0; i < 1024*30; i++)
733       cmds.push_back(s);
734     MCommand *m = new MCommand(uuid);
735     m->cmd = cmds;
736     conn->send_message(m);
737     utime_t t;
738     t += 1000*1000*500;
739     Mutex::Locker l(cli_dispatcher.lock);
740     while (!cli_dispatcher.got_new)
741       cli_dispatcher.cond.WaitInterval(cli_dispatcher.lock, t);
742     ASSERT_TRUE(cli_dispatcher.got_new);
743     cli_dispatcher.got_new = false;
744   }
745
746   // 2. A very large "data"
747   {
748     bufferlist bl;
749     string s("abcdefghijklmnopqrstuvwxyz");
750     for (int i = 0; i < 1024*30; i++)
751       bl.append(s);
752     MPing *m = new MPing();
753     m->set_data(bl);
754     conn->send_message(m);
755     utime_t t;
756     t += 1000*1000*500;
757     Mutex::Locker l(cli_dispatcher.lock);
758     while (!cli_dispatcher.got_new)
759       cli_dispatcher.cond.WaitInterval(cli_dispatcher.lock, t);
760     ASSERT_TRUE(cli_dispatcher.got_new);
761     cli_dispatcher.got_new = false;
762   }
763   server_msgr->shutdown();
764   client_msgr->shutdown();
765   server_msgr->wait();
766   client_msgr->wait();
767 }
768
769
770 class SyntheticWorkload;
771
772 struct Payload {
773   enum Who : uint8_t {
774     PING = 0,
775     PONG = 1,
776   };
777   uint8_t who;
778   uint64_t seq;
779   bufferlist data;
780
781   Payload(Who who, uint64_t seq, const bufferlist& data)
782     : who(who), seq(seq), data(data)
783   {}
784   Payload() = default;
785   DENC(Payload, v, p) {
786     DENC_START(1, 1, p);
787     denc(v.who, p);
788     denc(v.seq, p);
789     denc(v.data, p);
790     DENC_FINISH(p);
791   }
792 };
793 WRITE_CLASS_DENC(Payload)
794
795 ostream& operator<<(ostream& out, const Payload &pl)
796 {
797   return out << "reply=" << pl.who << " i = " << pl.seq;
798 }
799
800 class SyntheticDispatcher : public Dispatcher {
801  public:
802   Mutex lock;
803   Cond cond;
804   bool is_server;
805   bool got_new;
806   bool got_remote_reset;
807   bool got_connect;
808   map<ConnectionRef, list<uint64_t> > conn_sent;
809   map<uint64_t, bufferlist> sent;
810   atomic<uint64_t> index;
811   SyntheticWorkload *workload;
812
813   SyntheticDispatcher(bool s, SyntheticWorkload *wl):
814       Dispatcher(g_ceph_context), lock("SyntheticDispatcher::lock"), is_server(s), got_new(false),
815       got_remote_reset(false), got_connect(false), index(0), workload(wl) {}
816   bool ms_can_fast_dispatch_any() const override { return true; }
817   bool ms_can_fast_dispatch(const Message *m) const override {
818     switch (m->get_type()) {
819     case CEPH_MSG_PING:
820     case MSG_COMMAND:
821       return true;
822     default:
823       return false;
824     }
825   }
826
827   void ms_handle_fast_connect(Connection *con) override {
828     Mutex::Locker l(lock);
829     list<uint64_t> c = conn_sent[con];
830     for (list<uint64_t>::iterator it = c.begin();
831          it != c.end(); ++it)
832       sent.erase(*it);
833     conn_sent.erase(con);
834     got_connect = true;
835     cond.Signal();
836   }
837   void ms_handle_fast_accept(Connection *con) override {
838     Mutex::Locker l(lock);
839     list<uint64_t> c = conn_sent[con];
840     for (list<uint64_t>::iterator it = c.begin();
841          it != c.end(); ++it)
842       sent.erase(*it);
843     conn_sent.erase(con);
844     cond.Signal();
845   }
846   bool ms_dispatch(Message *m) override {
847     ceph_abort();
848   }
849   bool ms_handle_reset(Connection *con) override;
850   void ms_handle_remote_reset(Connection *con) override {
851     Mutex::Locker l(lock);
852     list<uint64_t> c = conn_sent[con];
853     for (list<uint64_t>::iterator it = c.begin();
854          it != c.end(); ++it)
855       sent.erase(*it);
856     conn_sent.erase(con);
857     got_remote_reset = true;
858   }
859   bool ms_handle_refused(Connection *con) override {
860     return false;
861   }
862   void ms_fast_dispatch(Message *m) override {
863     // MSG_COMMAND is used to disorganize regular message flow
864     if (m->get_type() == MSG_COMMAND) {
865       m->put();
866       return ;
867     }
868
869     Payload pl;
870     auto p = m->get_data().begin();
871     ::decode(pl, p);
872     if (pl.who == Payload::PING) {
873       lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << pl << dendl;
874       reply_message(m, pl);
875       m->put();
876       Mutex::Locker l(lock);
877       got_new = true;
878       cond.Signal();
879     } else {
880       Mutex::Locker l(lock);
881       if (sent.count(pl.seq)) {
882         lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << pl << dendl;
883         ASSERT_EQ(conn_sent[m->get_connection()].front(), pl.seq);
884         ASSERT_TRUE(pl.data.contents_equal(sent[pl.seq]));
885         conn_sent[m->get_connection()].pop_front();
886         sent.erase(pl.seq);
887       }
888       m->put();
889       got_new = true;
890       cond.Signal();
891     }
892   }
893
894   bool ms_verify_authorizer(Connection *con, int peer_type, int protocol,
895                             bufferlist& authorizer, bufferlist& authorizer_reply,
896                             bool& isvalid, CryptoKey& session_key) override {
897     isvalid = true;
898     return true;
899   }
900
901   void reply_message(const Message *m, Payload& pl) {
902     pl.who = Payload::PONG;
903     bufferlist bl;
904     ::encode(pl, bl);
905     MPing *rm = new MPing();
906     rm->set_data(bl);
907     m->get_connection()->send_message(rm);
908     lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << " reply m=" << m << " i=" << pl.seq << dendl;
909   }
910
911   void send_message_wrap(ConnectionRef con, const bufferlist& data) {
912     Message *m = new MPing();
913     Payload pl{Payload::PING, index++, data};
914     bufferlist bl;
915     ::encode(pl, bl);
916     m->set_data(bl);
917     if (!con->get_messenger()->get_default_policy().lossy) {
918       Mutex::Locker l(lock);
919       sent[pl.seq] = pl.data;
920       conn_sent[con].push_back(pl.seq);
921     }
922     lderr(g_ceph_context) << __func__ << " conn=" << con.get() << " send m=" << m << " i=" << pl.seq << dendl;
923     ASSERT_EQ(0, con->send_message(m));
924   }
925
926   uint64_t get_pending() {
927     Mutex::Locker l(lock);
928     return sent.size();
929   }
930
931   void clear_pending(ConnectionRef con) {
932     Mutex::Locker l(lock);
933
934     for (list<uint64_t>::iterator it = conn_sent[con].begin();
935          it != conn_sent[con].end(); ++it)
936       sent.erase(*it);
937     conn_sent.erase(con);
938   }
939
940   void print() {
941     for (auto && p : conn_sent) {
942       if (!p.second.empty()) {
943         lderr(g_ceph_context) << __func__ << " " << p.first << " wait " << p.second.size() << dendl;
944       }
945     }
946   }
947 };
948
949
950 class SyntheticWorkload {
951   Mutex lock;
952   Cond cond;
953   set<Messenger*> available_servers;
954   set<Messenger*> available_clients;
955   map<ConnectionRef, pair<Messenger*, Messenger*> > available_connections;
956   SyntheticDispatcher dispatcher;
957   gen_type rng;
958   vector<bufferlist> rand_data;
959
960  public:
961   static const unsigned max_in_flight = 64;
962   static const unsigned max_connections = 128;
963   static const unsigned max_message_len = 1024 * 1024 * 4;
964
965   SyntheticWorkload(int servers, int clients, string type, int random_num,
966                     Messenger::Policy srv_policy, Messenger::Policy cli_policy):
967       lock("SyntheticWorkload::lock"), dispatcher(false, this), rng(time(NULL)) {
968     Messenger *msgr;
969     int base_port = 16800;
970     entity_addr_t bind_addr;
971     char addr[64];
972     for (int i = 0; i < servers; ++i) {
973       msgr = Messenger::create(g_ceph_context, type, entity_name_t::OSD(0),
974                                "server", getpid()+i, 0);
975       snprintf(addr, sizeof(addr), "127.0.0.1:%d", base_port+i);
976       bind_addr.parse(addr);
977       msgr->bind(bind_addr);
978       msgr->add_dispatcher_head(&dispatcher);
979
980       assert(msgr);
981       msgr->set_default_policy(srv_policy);
982       available_servers.insert(msgr);
983       msgr->start();
984     }
985
986     for (int i = 0; i < clients; ++i) {
987       msgr = Messenger::create(g_ceph_context, type, entity_name_t::CLIENT(-1),
988                                "client", getpid()+i+servers, 0);
989       if (cli_policy.standby) {
990         snprintf(addr, sizeof(addr), "127.0.0.1:%d", base_port+i+servers);
991         bind_addr.parse(addr);
992         msgr->bind(bind_addr);
993       }
994       msgr->add_dispatcher_head(&dispatcher);
995
996       assert(msgr);
997       msgr->set_default_policy(cli_policy);
998       available_clients.insert(msgr);
999       msgr->start();
1000     }
1001
1002     for (int i = 0; i < random_num; i++) {
1003       bufferlist bl;
1004       boost::uniform_int<> u(32, max_message_len);
1005       uint64_t value_len = u(rng);
1006       bufferptr bp(value_len);
1007       bp.zero();
1008       for (uint64_t j = 0; j < value_len-sizeof(i); ) {
1009         memcpy(bp.c_str()+j, &i, sizeof(i));
1010         j += 4096;
1011       }
1012
1013       bl.append(bp);
1014       rand_data.push_back(bl);
1015     }
1016   }
1017
1018   ConnectionRef _get_random_connection() {
1019     while (dispatcher.get_pending() > max_in_flight) {
1020       lock.Unlock();
1021       usleep(500);
1022       lock.Lock();
1023     }
1024     assert(lock.is_locked());
1025     boost::uniform_int<> choose(0, available_connections.size() - 1);
1026     int index = choose(rng);
1027     map<ConnectionRef, pair<Messenger*, Messenger*> >::iterator i = available_connections.begin();
1028     for (; index > 0; --index, ++i) ;
1029     return i->first;
1030   }
1031
1032   bool can_create_connection() {
1033     return available_connections.size() < max_connections;
1034   }
1035
1036   void generate_connection() {
1037     Mutex::Locker l(lock);
1038     if (!can_create_connection())
1039       return ;
1040
1041     Messenger *server, *client;
1042     {
1043       boost::uniform_int<> choose(0, available_servers.size() - 1);
1044       int index = choose(rng);
1045       set<Messenger*>::iterator i = available_servers.begin();
1046       for (; index > 0; --index, ++i) ;
1047       server = *i;
1048     }
1049     {
1050       boost::uniform_int<> choose(0, available_clients.size() - 1);
1051       int index = choose(rng);
1052       set<Messenger*>::iterator i = available_clients.begin();
1053       for (; index > 0; --index, ++i) ;
1054       client = *i;
1055     }
1056
1057     pair<Messenger*, Messenger*> p;
1058     {
1059       boost::uniform_int<> choose(0, available_servers.size() - 1);
1060       if (server->get_default_policy().server) {
1061         p = make_pair(client, server);
1062       } else {
1063         ConnectionRef conn = client->get_connection(server->get_myinst());
1064         if (available_connections.count(conn) || choose(rng) % 2)
1065           p = make_pair(client, server);
1066         else
1067           p = make_pair(server, client);
1068       }
1069     }
1070     ConnectionRef conn = p.first->get_connection(p.second->get_myinst());
1071     available_connections[conn] = p;
1072   }
1073
1074   void send_message() {
1075     Mutex::Locker l(lock);
1076     ConnectionRef conn = _get_random_connection();
1077     boost::uniform_int<> true_false(0, 99);
1078     int val = true_false(rng);
1079     if (val >= 95) {
1080       uuid_d uuid;
1081       uuid.generate_random();
1082       MCommand *m = new MCommand(uuid);
1083       vector<string> cmds;
1084       cmds.push_back("command");
1085       m->cmd = cmds;
1086       m->set_priority(200);
1087       conn->send_message(m);
1088     } else {
1089       boost::uniform_int<> u(0, rand_data.size()-1);
1090       dispatcher.send_message_wrap(conn, rand_data[u(rng)]);
1091     }
1092   }
1093
1094   void drop_connection() {
1095     Mutex::Locker l(lock);
1096     if (available_connections.size() < 10)
1097       return;
1098     ConnectionRef conn = _get_random_connection();
1099     dispatcher.clear_pending(conn);
1100     conn->mark_down();
1101     pair<Messenger*, Messenger*> &p = available_connections[conn];
1102     // it's a lossless policy, so we need to mark down each side
1103     if (!p.first->get_default_policy().server && !p.second->get_default_policy().server) {
1104       ASSERT_EQ(conn->get_messenger(), p.first);
1105       ConnectionRef peer = p.second->get_connection(p.first->get_myinst());
1106       peer->mark_down();
1107       dispatcher.clear_pending(peer);
1108       available_connections.erase(peer);
1109     }
1110     ASSERT_EQ(available_connections.erase(conn), 1U);
1111   }
1112
1113   void print_internal_state(bool detail=false) {
1114     Mutex::Locker l(lock);
1115     lderr(g_ceph_context) << "available_connections: " << available_connections.size()
1116          << " inflight messages: " << dispatcher.get_pending() << dendl;
1117     if (detail && !available_connections.empty()) {
1118       dispatcher.print();
1119     }
1120   }
1121
1122   void wait_for_done() {
1123     int64_t tick_us = 1000 * 100; // 100ms
1124     int64_t timeout_us = 5 * 60 * 1000 * 1000; // 5 mins
1125     int i = 0;
1126     while (dispatcher.get_pending()) {
1127       usleep(tick_us);
1128       timeout_us -= tick_us;
1129       if (i++ % 50 == 0)
1130         print_internal_state(true);
1131       if (timeout_us < 0)
1132         assert(0 == " loop time exceed 5 mins, it looks we stuck into some problems!");
1133     }
1134     for (set<Messenger*>::iterator it = available_servers.begin();
1135          it != available_servers.end(); ++it) {
1136       (*it)->shutdown();
1137       (*it)->wait();
1138       ASSERT_EQ((*it)->get_dispatch_queue_len(), 0);
1139       delete (*it);
1140     }
1141     available_servers.clear();
1142
1143     for (set<Messenger*>::iterator it = available_clients.begin();
1144          it != available_clients.end(); ++it) {
1145       (*it)->shutdown();
1146       (*it)->wait();
1147       ASSERT_EQ((*it)->get_dispatch_queue_len(), 0);
1148       delete (*it);
1149     }
1150     available_clients.clear();
1151   }
1152
1153   void handle_reset(Connection *con) {
1154     Mutex::Locker l(lock);
1155     available_connections.erase(con);
1156     dispatcher.clear_pending(con);
1157   }
1158 };
1159
1160 bool SyntheticDispatcher::ms_handle_reset(Connection *con) {
1161   workload->handle_reset(con);
1162   return true;
1163 }
1164
1165 TEST_P(MessengerTest, SyntheticStressTest) {
1166   SyntheticWorkload test_msg(8, 32, GetParam(), 100,
1167                              Messenger::Policy::stateful_server(0),
1168                              Messenger::Policy::lossless_client(0));
1169   for (int i = 0; i < 100; ++i) {
1170     if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl;
1171     test_msg.generate_connection();
1172   }
1173   gen_type rng(time(NULL));
1174   for (int i = 0; i < 5000; ++i) {
1175     if (!(i % 10)) {
1176       lderr(g_ceph_context) << "Op " << i << ": " << dendl;
1177       test_msg.print_internal_state();
1178     }
1179     boost::uniform_int<> true_false(0, 99);
1180     int val = true_false(rng);
1181     if (val > 90) {
1182       test_msg.generate_connection();
1183     } else if (val > 80) {
1184       test_msg.drop_connection();
1185     } else if (val > 10) {
1186       test_msg.send_message();
1187     } else {
1188       usleep(rand() % 1000 + 500);
1189     }
1190   }
1191   test_msg.wait_for_done();
1192 }
1193
1194 TEST_P(MessengerTest, SyntheticStressTest1) {
1195   SyntheticWorkload test_msg(16, 32, GetParam(), 100,
1196                              Messenger::Policy::lossless_peer_reuse(0),
1197                              Messenger::Policy::lossless_peer_reuse(0));
1198   for (int i = 0; i < 10; ++i) {
1199     if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl;
1200     test_msg.generate_connection();
1201   }
1202   gen_type rng(time(NULL));
1203   for (int i = 0; i < 10000; ++i) {
1204     if (!(i % 10)) {
1205       lderr(g_ceph_context) << "Op " << i << ": " << dendl;
1206       test_msg.print_internal_state();
1207     }
1208     boost::uniform_int<> true_false(0, 99);
1209     int val = true_false(rng);
1210     if (val > 80) {
1211       test_msg.generate_connection();
1212     } else if (val > 60) {
1213       test_msg.drop_connection();
1214     } else if (val > 10) {
1215       test_msg.send_message();
1216     } else {
1217       usleep(rand() % 1000 + 500);
1218     }
1219   }
1220   test_msg.wait_for_done();
1221 }
1222
1223
1224 TEST_P(MessengerTest, SyntheticInjectTest) {
1225   uint64_t dispatch_throttle_bytes = g_ceph_context->_conf->ms_dispatch_throttle_bytes;
1226   g_ceph_context->_conf->set_val("ms_inject_socket_failures", "30");
1227   g_ceph_context->_conf->set_val("ms_inject_internal_delays", "0.1");
1228   g_ceph_context->_conf->set_val("ms_dispatch_throttle_bytes", "16777216");
1229   SyntheticWorkload test_msg(8, 32, GetParam(), 100,
1230                              Messenger::Policy::stateful_server(0),
1231                              Messenger::Policy::lossless_client(0));
1232   for (int i = 0; i < 100; ++i) {
1233     if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl;
1234     test_msg.generate_connection();
1235   }
1236   gen_type rng(time(NULL));
1237   for (int i = 0; i < 1000; ++i) {
1238     if (!(i % 10)) {
1239       lderr(g_ceph_context) << "Op " << i << ": " << dendl;
1240       test_msg.print_internal_state();
1241     }
1242     boost::uniform_int<> true_false(0, 99);
1243     int val = true_false(rng);
1244     if (val > 90) {
1245       test_msg.generate_connection();
1246     } else if (val > 80) {
1247       test_msg.drop_connection();
1248     } else if (val > 10) {
1249       test_msg.send_message();
1250     } else {
1251       usleep(rand() % 500 + 100);
1252     }
1253   }
1254   test_msg.wait_for_done();
1255   g_ceph_context->_conf->set_val("ms_inject_socket_failures", "0");
1256   g_ceph_context->_conf->set_val("ms_inject_internal_delays", "0");
1257   g_ceph_context->_conf->set_val(
1258       "ms_dispatch_throttle_bytes", std::to_string(dispatch_throttle_bytes));
1259 }
1260
1261 TEST_P(MessengerTest, SyntheticInjectTest2) {
1262   g_ceph_context->_conf->set_val("ms_inject_socket_failures", "30");
1263   g_ceph_context->_conf->set_val("ms_inject_internal_delays", "0.1");
1264   SyntheticWorkload test_msg(8, 16, GetParam(), 100,
1265                              Messenger::Policy::lossless_peer_reuse(0),
1266                              Messenger::Policy::lossless_peer_reuse(0));
1267   for (int i = 0; i < 100; ++i) {
1268     if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl;
1269     test_msg.generate_connection();
1270   }
1271   gen_type rng(time(NULL));
1272   for (int i = 0; i < 1000; ++i) {
1273     if (!(i % 10)) {
1274       lderr(g_ceph_context) << "Op " << i << ": " << dendl;
1275       test_msg.print_internal_state();
1276     }
1277     boost::uniform_int<> true_false(0, 99);
1278     int val = true_false(rng);
1279     if (val > 90) {
1280       test_msg.generate_connection();
1281     } else if (val > 80) {
1282       test_msg.drop_connection();
1283     } else if (val > 10) {
1284       test_msg.send_message();
1285     } else {
1286       usleep(rand() % 500 + 100);
1287     }
1288   }
1289   test_msg.wait_for_done();
1290   g_ceph_context->_conf->set_val("ms_inject_socket_failures", "0");
1291   g_ceph_context->_conf->set_val("ms_inject_internal_delays", "0");
1292 }
1293
1294 TEST_P(MessengerTest, SyntheticInjectTest3) {
1295   g_ceph_context->_conf->set_val("ms_inject_socket_failures", "600");
1296   g_ceph_context->_conf->set_val("ms_inject_internal_delays", "0.1");
1297   SyntheticWorkload test_msg(8, 16, GetParam(), 100,
1298                              Messenger::Policy::stateless_server(0),
1299                              Messenger::Policy::lossy_client(0));
1300   for (int i = 0; i < 100; ++i) {
1301     if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl;
1302     test_msg.generate_connection();
1303   }
1304   gen_type rng(time(NULL));
1305   for (int i = 0; i < 1000; ++i) {
1306     if (!(i % 10)) {
1307       lderr(g_ceph_context) << "Op " << i << ": " << dendl;
1308       test_msg.print_internal_state();
1309     }
1310     boost::uniform_int<> true_false(0, 99);
1311     int val = true_false(rng);
1312     if (val > 90) {
1313       test_msg.generate_connection();
1314     } else if (val > 80) {
1315       test_msg.drop_connection();
1316     } else if (val > 10) {
1317       test_msg.send_message();
1318     } else {
1319       usleep(rand() % 500 + 100);
1320     }
1321   }
1322   test_msg.wait_for_done();
1323   g_ceph_context->_conf->set_val("ms_inject_socket_failures", "0");
1324   g_ceph_context->_conf->set_val("ms_inject_internal_delays", "0");
1325 }
1326
1327
1328 TEST_P(MessengerTest, SyntheticInjectTest4) {
1329   g_ceph_context->_conf->set_val("ms_inject_socket_failures", "30");
1330   g_ceph_context->_conf->set_val("ms_inject_internal_delays", "0.1");
1331   g_ceph_context->_conf->set_val("ms_inject_delay_probability", "1");
1332   g_ceph_context->_conf->set_val("ms_inject_delay_type", "client osd", false);
1333   g_ceph_context->_conf->set_val("ms_inject_delay_max", "5");
1334   SyntheticWorkload test_msg(16, 32, GetParam(), 100,
1335                              Messenger::Policy::lossless_peer(0),
1336                              Messenger::Policy::lossless_peer(0));
1337   for (int i = 0; i < 100; ++i) {
1338     if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl;
1339     test_msg.generate_connection();
1340   }
1341   gen_type rng(time(NULL));
1342   for (int i = 0; i < 1000; ++i) {
1343     if (!(i % 10)) {
1344       lderr(g_ceph_context) << "Op " << i << ": " << dendl;
1345       test_msg.print_internal_state();
1346     }
1347     boost::uniform_int<> true_false(0, 99);
1348     int val = true_false(rng);
1349     if (val > 95) {
1350       test_msg.generate_connection();
1351     } else if (val > 80) {
1352       // test_msg.drop_connection();
1353     } else if (val > 10) {
1354       test_msg.send_message();
1355     } else {
1356       usleep(rand() % 500 + 100);
1357     }
1358   }
1359   test_msg.wait_for_done();
1360   g_ceph_context->_conf->set_val("ms_inject_socket_failures", "0");
1361   g_ceph_context->_conf->set_val("ms_inject_internal_delays", "0");
1362   g_ceph_context->_conf->set_val("ms_inject_delay_probability", "0");
1363   g_ceph_context->_conf->set_val("ms_inject_delay_type", "", false);
1364   g_ceph_context->_conf->set_val("ms_inject_delay_max", "0");
1365 }
1366
1367
1368 class MarkdownDispatcher : public Dispatcher {
1369   Mutex lock;
1370   set<ConnectionRef> conns;
1371   bool last_mark;
1372  public:
1373   std::atomic<uint64_t> count = { 0 };
1374   explicit MarkdownDispatcher(bool s): Dispatcher(g_ceph_context), lock("MarkdownDispatcher::lock"),
1375                               last_mark(false) {}
1376   bool ms_can_fast_dispatch_any() const override { return false; }
1377   bool ms_can_fast_dispatch(const Message *m) const override {
1378     switch (m->get_type()) {
1379     case CEPH_MSG_PING:
1380       return true;
1381     default:
1382       return false;
1383     }
1384   }
1385
1386   void ms_handle_fast_connect(Connection *con) override {
1387     lderr(g_ceph_context) << __func__ << " " << con << dendl;
1388     Mutex::Locker l(lock);
1389     conns.insert(con);
1390   }
1391   void ms_handle_fast_accept(Connection *con) override {
1392     Mutex::Locker l(lock);
1393     conns.insert(con);
1394   }
1395   bool ms_dispatch(Message *m) override {
1396     lderr(g_ceph_context) << __func__ << " conn: " << m->get_connection() << dendl;
1397     Mutex::Locker l(lock);
1398     count++;
1399     conns.insert(m->get_connection());
1400     if (conns.size() < 2 && !last_mark) {
1401       m->put();
1402       return true;
1403     }
1404
1405     last_mark = true;
1406     usleep(rand() % 500);
1407     for (set<ConnectionRef>::iterator it = conns.begin(); it != conns.end(); ++it) {
1408       if ((*it) != m->get_connection().get()) {
1409         (*it)->mark_down();
1410         conns.erase(it);
1411         break;
1412       }
1413     }
1414     if (conns.empty())
1415       last_mark = false;
1416     m->put();
1417     return true;
1418   }
1419   bool ms_handle_reset(Connection *con) override {
1420     lderr(g_ceph_context) << __func__ << " " << con << dendl;
1421     Mutex::Locker l(lock);
1422     conns.erase(con);
1423     usleep(rand() % 500);
1424     return true;
1425   }
1426   void ms_handle_remote_reset(Connection *con) override {
1427     Mutex::Locker l(lock);
1428     conns.erase(con);
1429     lderr(g_ceph_context) << __func__ << " " << con << dendl;
1430   }
1431   bool ms_handle_refused(Connection *con) override {
1432     return false;
1433   }
1434   void ms_fast_dispatch(Message *m) override {
1435     ceph_abort();
1436   }
1437   bool ms_verify_authorizer(Connection *con, int peer_type, int protocol,
1438                             bufferlist& authorizer, bufferlist& authorizer_reply,
1439                             bool& isvalid, CryptoKey& session_key) override {
1440     isvalid = true;
1441     return true;
1442   }
1443 };
1444
1445
1446 // Markdown with external lock
1447 TEST_P(MessengerTest, MarkdownTest) {
1448   Messenger *server_msgr2 = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::OSD(0), "server", getpid(), 0);
1449   MarkdownDispatcher cli_dispatcher(false), srv_dispatcher(true);
1450   entity_addr_t bind_addr;
1451   bind_addr.parse("127.0.0.1:16800");
1452   server_msgr->bind(bind_addr);
1453   server_msgr->add_dispatcher_head(&srv_dispatcher);
1454   server_msgr->start();
1455   bind_addr.parse("127.0.0.1:16801");
1456   server_msgr2->bind(bind_addr);
1457   server_msgr2->add_dispatcher_head(&srv_dispatcher);
1458   server_msgr2->start();
1459
1460   client_msgr->add_dispatcher_head(&cli_dispatcher);
1461   client_msgr->start();
1462
1463   int i = 1000;
1464   uint64_t last = 0;
1465   bool equal = false;
1466   uint64_t equal_count = 0;
1467   while (i--) {
1468     ConnectionRef conn1 = client_msgr->get_connection(server_msgr->get_myinst());
1469     ConnectionRef conn2 = client_msgr->get_connection(server_msgr2->get_myinst());
1470     MPing *m = new MPing();
1471     ASSERT_EQ(conn1->send_message(m), 0);
1472     m = new MPing();
1473     ASSERT_EQ(conn2->send_message(m), 0);
1474     CHECK_AND_WAIT_TRUE(srv_dispatcher.count > last + 1);
1475     if (srv_dispatcher.count == last) {
1476       lderr(g_ceph_context) << __func__ << " last is " << last << dendl;
1477       equal = true;
1478       equal_count++;
1479     } else {
1480       equal = false;
1481       equal_count = 0;
1482     }
1483     last = srv_dispatcher.count;
1484     if (equal_count)
1485       usleep(1000*500);
1486     ASSERT_FALSE(equal && equal_count > 3);
1487   }
1488   server_msgr->shutdown();
1489   client_msgr->shutdown();
1490   server_msgr2->shutdown();
1491   server_msgr->wait();
1492   client_msgr->wait();
1493   server_msgr2->wait();
1494   delete server_msgr2;
1495 }
1496
1497 INSTANTIATE_TEST_CASE_P(
1498   Messenger,
1499   MessengerTest,
1500   ::testing::Values(
1501     "async+posix",
1502     "simple"
1503   )
1504 );
1505
1506 #else
1507
1508 // Google Test may not support value-parameterized tests with some
1509 // compilers. If we use conditional compilation to compile out all
1510 // code referring to the gtest_main library, MSVC linker will not link
1511 // that library at all and consequently complain about missing entry
1512 // point defined in that library (fatal error LNK1561: entry point
1513 // must be defined). This dummy test keeps gtest_main linked in.
1514 TEST(DummyTest, ValueParameterizedTestsAreNotSupportedOnThisPlatform) {}
1515
1516 #endif
1517
1518
1519 int main(int argc, char **argv) {
1520   vector<const char*> args;
1521   argv_to_vec(argc, (const char **)argv, args);
1522   env_to_vec(args);
1523
1524   auto cct = global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT, CODE_ENVIRONMENT_UTILITY, 0);
1525   g_ceph_context->_conf->set_val("auth_cluster_required", "none");
1526   g_ceph_context->_conf->set_val("auth_service_required", "none");
1527   g_ceph_context->_conf->set_val("auth_client_required", "none");
1528   g_ceph_context->_conf->set_val("enable_experimental_unrecoverable_data_corrupting_features", "ms-type-async");
1529   g_ceph_context->_conf->set_val("ms_die_on_bad_msg", "true");
1530   g_ceph_context->_conf->set_val("ms_die_on_old_message", "true");
1531   g_ceph_context->_conf->set_val("ms_max_backoff", "1");
1532   common_init_finish(g_ceph_context);
1533
1534   ::testing::InitGoogleTest(&argc, argv);
1535   return RUN_ALL_TESTS();
1536 }
1537
1538 /*
1539  * Local Variables:
1540  * compile-command: "cd ../.. ; make -j4 ceph_test_msgr && valgrind --tool=memcheck ./ceph_test_msgr"
1541  * End:
1542  */