1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
8 * Author: Haomai Wang <haomaiwang@gmail.com>
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
22 #include "common/Mutex.h"
23 #include "common/Cond.h"
24 #include "common/ceph_argparse.h"
25 #include "global/global_init.h"
26 #include "msg/Dispatcher.h"
27 #include "msg/msg_types.h"
28 #include "msg/Message.h"
29 #include "msg/Messenger.h"
30 #include "msg/Connection.h"
31 #include "messages/MPing.h"
32 #include "messages/MCommand.h"
34 #include <boost/random/mersenne_twister.hpp>
35 #include <boost/random/uniform_int.hpp>
36 #include <boost/random/binomial_distribution.hpp>
37 #include <gtest/gtest.h>
39 typedef boost::mt11213b gen_type;
41 #include "common/dout.h"
42 #include "include/assert.h"
44 #define dout_subsys ceph_subsys_ms
46 #define dout_prefix *_dout << " ceph_test_msgr "
49 #if GTEST_HAS_PARAM_TEST
51 #define CHECK_AND_WAIT_TRUE(expr) do { \
// Value-parameterized gtest fixture. GetParam() yields the messenger type
// string that SetUp() passes to Messenger::create() for both endpoints.
60 class MessengerTest : public ::testing::TestWithParam<const char*> {
62 Messenger *server_msgr;
63 Messenger *client_msgr;
65 MessengerTest(): server_msgr(NULL), client_msgr(NULL) {}
// Builds one server messenger (entity OSD(0), name "server") and one client
// messenger (entity CLIENT(-1), name "client"), then installs the default
// policies: stateless_server for the server, lossy_client for the client.
66 void SetUp() override {
67 lderr(g_ceph_context) << __func__ << " start set up " << GetParam() << dendl;
68 server_msgr = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::OSD(0), "server", getpid(), 0);
69 client_msgr = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::CLIENT(-1), "client", getpid(), 0);
70 server_msgr->set_default_policy(Messenger::Policy::stateless_server(0));
71 client_msgr->set_default_policy(Messenger::Policy::lossy_client(0));
// Asserts that both messengers report a dispatch queue length of 0.
73 void TearDown() override {
74 ASSERT_EQ(server_msgr->get_dispatch_queue_len(), 0);
75 ASSERT_EQ(client_msgr->get_dispatch_queue_len(), 0);
// Minimal Dispatcher used by the functional tests. It attaches a Session to
// each connection (via set_priv) and exposes flags such as got_new,
// got_connect and got_remote_reset that the tests wait on under `lock`.
83 class FakeDispatcher : public Dispatcher {
// Per-connection state stored as the connection's priv object. `count` is an
// atomic counter that tests read through get_count().
85 struct Session : public RefCountedObject {
86 atomic<uint64_t> count;
89 explicit Session(ConnectionRef c): RefCountedObject(g_ceph_context), count(0), con(c) {
91 uint64_t get_count() { return count; }
98 bool got_remote_reset;
// `s` selects the role: true for the server-side dispatcher, false for the
// client side. All notification flags start out false.
102 explicit FakeDispatcher(bool s): Dispatcher(g_ceph_context), lock("FakeDispatcher::lock"),
103 is_server(s), got_new(false), got_remote_reset(false),
104 got_connect(false), loopback(false) {}
105 bool ms_can_fast_dispatch_any() const override { return true; }
// Decides per message type (the switch cases are not visible in this excerpt).
106 bool ms_can_fast_dispatch(const Message *m) const override {
107 switch (m->get_type()) {
// Connect callback: looks up the connection's Session and attaches a new one
// (presumably only when none exists yet; the guard is not visible here).
115 void ms_handle_fast_connect(Connection *con) override {
117 lderr(g_ceph_context) << __func__ << " " << con << dendl;
118 Session *s = static_cast<Session*>(con->get_priv());
120 s = new Session(con);
121 con->set_priv(s->get());
122 lderr(g_ceph_context) << __func__ << " con: " << con << " count: " << s->count << dendl;
// Accept callback: same Session lookup/attach sequence as on connect.
129 void ms_handle_fast_accept(Connection *con) override {
130 Session *s = static_cast<Session*>(con->get_priv());
132 s = new Session(con);
133 con->set_priv(s->get());
// Regular (non-fast) dispatch path: resolves the Session of the message's
// connection, logs it, and then takes `lock` for the flag update.
137 bool ms_dispatch(Message *m) override {
138 Session *s = static_cast<Session*>(m->get_connection()->get_priv());
140 s = new Session(m->get_connection());
141 m->get_connection()->set_priv(s->get());
145 lderr(g_ceph_context) << __func__ << " conn: " << m->get_connection() << " session " << s << " count: " << s->count << dendl;
149 Mutex::Locker l(lock);
// Local reset: detaches the Session from the connection in both directions.
155 bool ms_handle_reset(Connection *con) override {
156 Mutex::Locker l(lock);
157 lderr(g_ceph_context) << __func__ << " " << con << dendl;
158 Session *s = static_cast<Session*>(con->get_priv());
160 s->con.reset(NULL); // break con <-> session ref cycle
161 con->set_priv(NULL); // break ref <-> session cycle, if any
// Remote reset: same detach as ms_handle_reset, plus sets got_remote_reset so
// that waiting tests can observe the event.
166 void ms_handle_remote_reset(Connection *con) override {
167 Mutex::Locker l(lock);
168 lderr(g_ceph_context) << __func__ << " " << con << dendl;
169 Session *s = static_cast<Session*>(con->get_priv());
171 s->con.reset(NULL); // break con <-> session ref cycle
172 con->set_priv(NULL); // break ref <-> session cycle, if any
175 got_remote_reset = true;
178 bool ms_handle_refused(Connection *con) override {
// Fast dispatch path: resolves the Session, then sanity-checks the message
// source. One branch asserts an OSD source and the loopback branch asserts a
// client source (the first branch's condition is not visible here).
181 void ms_fast_dispatch(Message *m) override {
182 Session *s = static_cast<Session*>(m->get_connection()->get_priv());
184 s = new Session(m->get_connection());
185 m->get_connection()->set_priv(s->get());
189 lderr(g_ceph_context) << __func__ << " conn: " << m->get_connection() << " session " << s << " count: " << s->count << dendl;
192 assert(m->get_source().is_osd());
195 } else if (loopback) {
196 assert(m->get_source().is_client());
199 Mutex::Locker l(lock);
204 bool ms_verify_authorizer(Connection *con, int peer_type, int protocol,
205 bufferlist& authorizer, bufferlist& authorizer_reply,
206 bool& isvalid, CryptoKey& session_key) override {
// Answers `m` by sending a fresh MPing back over the connection it came from.
211 void reply_message(Message *m) {
212 MPing *rm = new MPing();
213 m->get_connection()->send_message(rm);
217 typedef FakeDispatcher::Session Session;
// Smoke test over a server bound to 127.0.0.1 and a client messenger. Walks
// through five numbered scenarios: round trip, rebind, mark-down, failed
// connection and loopback. After each send the test blocks on
// cli_dispatcher.cond until got_new is set, then clears the flag.
219 TEST_P(MessengerTest, SimpleTest) {
220 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
221 entity_addr_t bind_addr;
222 bind_addr.parse("127.0.0.1");
223 server_msgr->bind(bind_addr);
224 server_msgr->add_dispatcher_head(&srv_dispatcher);
225 server_msgr->start();
227 client_msgr->add_dispatcher_head(&cli_dispatcher);
228 client_msgr->start();
230 // 1. simple round trip
231 MPing *m = new MPing();
232 ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst());
234 ASSERT_EQ(conn->send_message(m), 0);
235 Mutex::Locker l(cli_dispatcher.lock);
236 while (!cli_dispatcher.got_new)
237 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
238 cli_dispatcher.got_new = false;
240 ASSERT_TRUE(conn->is_connected());
241 ASSERT_TRUE((static_cast<Session*>(conn->get_priv()))->get_count() == 1);
242 ASSERT_TRUE(conn->peer_is_osd());
244 // 2. test rebind port
// The current port and the nine ports after it are excluded, so the rebind
// must land on a port outside that set.
245 set<int> avoid_ports;
246 for (int i = 0; i < 10 ; i++)
247 avoid_ports.insert(server_msgr->get_myaddr().get_port() + i);
248 server_msgr->rebind(avoid_ports);
249 ASSERT_TRUE(avoid_ports.count(server_msgr->get_myaddr().get_port()) == 0);
251 conn = client_msgr->get_connection(server_msgr->get_myinst());
254 ASSERT_EQ(conn->send_message(m), 0);
255 Mutex::Locker l(cli_dispatcher.lock);
256 while (!cli_dispatcher.got_new)
257 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
258 cli_dispatcher.got_new = false;
260 ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
262 // 3. test markdown connection
264 ASSERT_FALSE(conn->is_connected());
266 // 4. test failed connection
// The server is shut down first, so the send below is expected to leave the
// connection disconnected.
267 server_msgr->shutdown();
271 conn->send_message(m);
272 CHECK_AND_WAIT_TRUE(!conn->is_connected());
273 ASSERT_FALSE(conn->is_connected());
275 // 5. loopback connection
// srv_dispatcher.loopback is raised only for the duration of this scenario.
276 srv_dispatcher.loopback = true;
277 conn = client_msgr->get_loopback_connection();
280 ASSERT_EQ(conn->send_message(m), 0);
281 Mutex::Locker l(cli_dispatcher.lock);
282 while (!cli_dispatcher.got_new)
283 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
284 cli_dispatcher.got_new = false;
286 srv_dispatcher.loopback = false;
287 ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
288 client_msgr->shutdown();
290 server_msgr->shutdown();
// Checks address bookkeeping after one round trip: the client connection's
// peer address must equal the server messenger's own address.
294 TEST_P(MessengerTest, NameAddrTest) {
295 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
296 entity_addr_t bind_addr;
297 bind_addr.parse("127.0.0.1");
298 server_msgr->bind(bind_addr);
299 server_msgr->add_dispatcher_head(&srv_dispatcher);
300 server_msgr->start();
302 client_msgr->add_dispatcher_head(&cli_dispatcher);
303 client_msgr->start();
305 MPing *m = new MPing();
306 ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst());
308 ASSERT_EQ(conn->send_message(m), 0);
309 Mutex::Locker l(cli_dispatcher.lock);
310 while (!cli_dispatcher.got_new)
311 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
312 cli_dispatcher.got_new = false;
314 ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
315 ASSERT_TRUE(conn->get_peer_addr() == server_msgr->get_myaddr());
316 ConnectionRef server_conn = server_msgr->get_connection(client_msgr->get_myinst());
317 // Make sure server_conn is the one we already accepted from the client,
318 // which means client_msgr has the same addr that the server connection has
// NOTE(review): the assertion below inspects `conn`, not `server_conn`, so it
// repeats the earlier check. Confirm whether `server_conn` was intended.
319 ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
320 server_msgr->shutdown();
321 client_msgr->shutdown();
// Exercises feature negotiation through Messenger::Policy.
//   feature_supported     = bits 0..9
//   feature_required      = feature_supported | bit 13
//   all_feature_supported = feature_required  | bit 14
// The server requires feature_required from clients. The client first offers
// only feature_supported, then all_feature_supported.
326 TEST_P(MessengerTest, FeatureTest) {
327 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
328 entity_addr_t bind_addr;
329 bind_addr.parse("127.0.0.1");
330 uint64_t all_feature_supported, feature_required, feature_supported = 0;
331 for (int i = 0; i < 10; i++)
332 feature_supported |= 1ULL << i;
333 feature_required = feature_supported | 1ULL << 13;
334 all_feature_supported = feature_required | 1ULL << 14;
336 Messenger::Policy p = server_msgr->get_policy(entity_name_t::TYPE_CLIENT);
337 p.features_required = feature_required;
338 server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p);
339 server_msgr->bind(bind_addr);
340 server_msgr->add_dispatcher_head(&srv_dispatcher);
341 server_msgr->start();
343 // 1. The client supports fewer features than the server requires
344 p = client_msgr->get_policy(entity_name_t::TYPE_OSD);
345 p.features_supported = feature_supported;
346 client_msgr->set_policy(entity_name_t::TYPE_OSD, p);
347 client_msgr->add_dispatcher_head(&cli_dispatcher);
348 client_msgr->start();
350 MPing *m = new MPing();
351 ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst());
352 conn->send_message(m);
353 CHECK_AND_WAIT_TRUE(!conn->is_connected());
354 // should fail to build a connection
355 ASSERT_FALSE(conn->is_connected());
357 client_msgr->shutdown();
360 // 2. The supported features meet the required set
361 p = client_msgr->get_policy(entity_name_t::TYPE_OSD);
362 p.features_supported = all_feature_supported;
363 client_msgr->set_policy(entity_name_t::TYPE_OSD, p);
364 client_msgr->start();
366 conn = client_msgr->get_connection(server_msgr->get_myinst());
369 ASSERT_EQ(conn->send_message(m), 0);
370 Mutex::Locker l(cli_dispatcher.lock);
371 while (!cli_dispatcher.got_new)
372 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
373 cli_dispatcher.got_new = false;
375 ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
377 server_msgr->shutdown();
378 client_msgr->shutdown();
// Runs with the "ms_tcp_read_timeout" option set to "1" and sets it to "900"
// again at the end. After an initial round trip the test expects the
// connection to no longer be connected (presumably after an idle wait that is
// not visible in this excerpt).
383 TEST_P(MessengerTest, TimeoutTest) {
384 g_ceph_context->_conf->set_val("ms_tcp_read_timeout", "1");
385 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
386 entity_addr_t bind_addr;
387 bind_addr.parse("127.0.0.1");
388 server_msgr->bind(bind_addr);
389 server_msgr->add_dispatcher_head(&srv_dispatcher);
390 server_msgr->start();
392 client_msgr->add_dispatcher_head(&cli_dispatcher);
393 client_msgr->start();
395 // 1. build the connection
396 MPing *m = new MPing();
397 ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst());
399 ASSERT_EQ(conn->send_message(m), 0);
400 Mutex::Locker l(cli_dispatcher.lock);
401 while (!cli_dispatcher.got_new)
402 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
403 cli_dispatcher.got_new = false;
405 ASSERT_TRUE(conn->is_connected());
406 ASSERT_TRUE((static_cast<Session*>(conn->get_priv()))->get_count() == 1);
407 ASSERT_TRUE(conn->peer_is_osd());
411 ASSERT_FALSE(conn->is_connected());
413 server_msgr->shutdown();
416 client_msgr->shutdown();
// Set the option to "900" so later tests do not run with the short timeout.
418 g_ceph_context->_conf->set_val("ms_tcp_read_timeout", "900");
// Pairs a stateful_server policy (server side, for TYPE_CLIENT peers) with a
// lossless_client policy (client side, for TYPE_OSD peers). Covers two
// scenarios: the server going standby, and the client reconnecting after the
// server marks its side of the connection down.
421 TEST_P(MessengerTest, StatefulTest) {
423 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
424 entity_addr_t bind_addr;
425 bind_addr.parse("127.0.0.1");
426 Messenger::Policy p = Messenger::Policy::stateful_server(0);
427 server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p);
428 p = Messenger::Policy::lossless_client(0);
429 client_msgr->set_policy(entity_name_t::TYPE_OSD, p);
431 server_msgr->bind(bind_addr);
432 server_msgr->add_dispatcher_head(&srv_dispatcher);
433 server_msgr->start();
434 client_msgr->add_dispatcher_head(&cli_dispatcher);
435 client_msgr->start();
437 // 1. test for server standby
438 ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst());
441 ASSERT_EQ(conn->send_message(m), 0);
442 Mutex::Locker l(cli_dispatcher.lock);
443 while (!cli_dispatcher.got_new)
444 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
445 cli_dispatcher.got_new = false;
447 ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
449 ASSERT_FALSE(conn->is_connected());
450 ConnectionRef server_conn = server_msgr->get_connection(client_msgr->get_myinst());
452 ASSERT_TRUE(static_cast<Session*>(server_conn->get_priv())->get_count() == 1);
454 srv_dispatcher.got_new = false;
455 conn = client_msgr->get_connection(server_msgr->get_myinst());
458 ASSERT_EQ(conn->send_message(m), 0);
459 Mutex::Locker l(cli_dispatcher.lock);
460 while (!cli_dispatcher.got_new)
461 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
462 cli_dispatcher.got_new = false;
464 ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
465 server_conn = server_msgr->get_connection(client_msgr->get_myinst());
// Block until the server-side dispatcher has seen a remote reset.
467 Mutex::Locker l(srv_dispatcher.lock);
468 while (!srv_dispatcher.got_remote_reset)
469 srv_dispatcher.cond.Wait(srv_dispatcher.lock);
472 // 2. test for client reconnect
473 ASSERT_FALSE(cli_dispatcher.got_remote_reset);
474 cli_dispatcher.got_connect = false;
475 cli_dispatcher.got_new = false;
476 cli_dispatcher.got_remote_reset = false;
477 server_conn->mark_down();
478 ASSERT_FALSE(server_conn->is_connected());
479 // ensure client detect server socket closed
481 Mutex::Locker l(cli_dispatcher.lock);
482 while (!cli_dispatcher.got_remote_reset)
483 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
484 cli_dispatcher.got_remote_reset = false;
// Then wait for the client-side connect notification.
487 Mutex::Locker l(cli_dispatcher.lock);
488 while (!cli_dispatcher.got_connect)
489 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
490 cli_dispatcher.got_connect = false;
492 CHECK_AND_WAIT_TRUE(conn->is_connected());
493 ASSERT_TRUE(conn->is_connected());
497 ASSERT_EQ(conn->send_message(m), 0);
498 ASSERT_TRUE(conn->is_connected());
499 Mutex::Locker l(cli_dispatcher.lock);
500 while (!cli_dispatcher.got_new)
501 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
502 cli_dispatcher.got_new = false;
// Both ends are expected to report a session count of exactly 1 again.
505 ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv())->get_count());
506 server_conn = server_msgr->get_connection(client_msgr->get_myinst());
507 ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv())->get_count());
508 cli_dispatcher.got_remote_reset = false;
510 server_msgr->shutdown();
511 client_msgr->shutdown();
// Pairs a stateless_server policy (server side, for TYPE_CLIENT peers) with a
// lossy_client policy (client side, for TYPE_OSD peers). Covers two
// scenarios: the server losing state, and the lossy client dropping its
// connection once the server marks its side down.
516 TEST_P(MessengerTest, StatelessTest) {
518 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
519 entity_addr_t bind_addr;
520 bind_addr.parse("127.0.0.1");
521 Messenger::Policy p = Messenger::Policy::stateless_server(0);
522 server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p);
523 p = Messenger::Policy::lossy_client(0);
524 client_msgr->set_policy(entity_name_t::TYPE_OSD, p);
526 server_msgr->bind(bind_addr);
527 server_msgr->add_dispatcher_head(&srv_dispatcher);
528 server_msgr->start();
529 client_msgr->add_dispatcher_head(&cli_dispatcher);
530 client_msgr->start();
532 // 1. test for server lose state
533 ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst());
536 ASSERT_EQ(conn->send_message(m), 0);
537 Mutex::Locker l(cli_dispatcher.lock);
538 while (!cli_dispatcher.got_new)
539 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
540 cli_dispatcher.got_new = false;
542 ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
544 ASSERT_FALSE(conn->is_connected());
546 srv_dispatcher.got_new = false;
547 conn = client_msgr->get_connection(server_msgr->get_myinst());
550 ASSERT_EQ(conn->send_message(m), 0);
551 Mutex::Locker l(cli_dispatcher.lock);
552 while (!cli_dispatcher.got_new)
553 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
554 cli_dispatcher.got_new = false;
556 ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
557 ConnectionRef server_conn = server_msgr->get_connection(client_msgr->get_myinst());
// Wait until the server-side dispatcher has flagged a new message.
560 Mutex::Locker l(srv_dispatcher.lock);
561 while (!srv_dispatcher.got_new)
562 srv_dispatcher.cond.Wait(srv_dispatcher.lock);
564 ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv())->get_count());
566 // 2. test for client lossy
// A keepalive is sent after the server-side mark_down; the client connection
// is then expected to become disconnected.
567 server_conn->mark_down();
568 ASSERT_FALSE(server_conn->is_connected());
569 conn->send_keepalive();
570 CHECK_AND_WAIT_TRUE(!conn->is_connected());
571 ASSERT_FALSE(conn->is_connected());
572 conn = client_msgr->get_connection(server_msgr->get_myinst());
575 ASSERT_EQ(conn->send_message(m), 0);
576 Mutex::Locker l(cli_dispatcher.lock);
577 while (!cli_dispatcher.got_new)
578 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
579 cli_dispatcher.got_new = false;
581 ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
583 server_msgr->shutdown();
584 client_msgr->shutdown();
// Pairs a stateful_server policy (server side, for TYPE_CLIENT peers) with a
// lossless_peer policy (client side, for TYPE_OSD peers). After the server
// marks its connection down, the test sends on the original client connection
// and waits for a remote-reset notification followed by a connect
// notification.
589 TEST_P(MessengerTest, ClientStandbyTest) {
591 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
592 entity_addr_t bind_addr;
593 bind_addr.parse("127.0.0.1");
594 Messenger::Policy p = Messenger::Policy::stateful_server(0);
595 server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p);
596 p = Messenger::Policy::lossless_peer(0);
597 client_msgr->set_policy(entity_name_t::TYPE_OSD, p);
599 server_msgr->bind(bind_addr);
600 server_msgr->add_dispatcher_head(&srv_dispatcher);
601 server_msgr->start();
602 client_msgr->add_dispatcher_head(&cli_dispatcher);
603 client_msgr->start();
605 // 1. test for client standby, resetcheck
606 ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst());
609 ASSERT_EQ(conn->send_message(m), 0);
610 Mutex::Locker l(cli_dispatcher.lock);
611 while (!cli_dispatcher.got_new)
612 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
613 cli_dispatcher.got_new = false;
615 ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
616 ConnectionRef server_conn = server_msgr->get_connection(client_msgr->get_myinst());
617 ASSERT_FALSE(cli_dispatcher.got_remote_reset);
618 cli_dispatcher.got_connect = false;
619 server_conn->mark_down();
620 ASSERT_FALSE(server_conn->is_connected());
621 // client should be standby
623 // client should be standby, so we use original connection
625 // Try send message to verify got remote reset callback
627 ASSERT_EQ(conn->send_message(m), 0);
// Under a single lock hold: wait for the remote reset first, then the connect.
629 Mutex::Locker l(cli_dispatcher.lock);
630 while (!cli_dispatcher.got_remote_reset)
631 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
632 cli_dispatcher.got_remote_reset = false;
633 while (!cli_dispatcher.got_connect)
634 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
635 cli_dispatcher.got_connect = false;
637 CHECK_AND_WAIT_TRUE(conn->is_connected());
638 ASSERT_TRUE(conn->is_connected());
640 ASSERT_EQ(conn->send_message(m), 0);
641 Mutex::Locker l(cli_dispatcher.lock);
642 while (!cli_dispatcher.got_new)
643 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
644 cli_dispatcher.got_new = false;
646 ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
647 server_conn = server_msgr->get_connection(client_msgr->get_myinst());
648 ASSERT_TRUE(static_cast<Session*>(server_conn->get_priv())->get_count() == 1);
650 server_msgr->shutdown();
651 client_msgr->shutdown();
// Performs one round trip with the three auth_*_required options set to
// "cephx", then sets them to "none" and performs a second round trip on a
// freshly obtained connection.
656 TEST_P(MessengerTest, AuthTest) {
657 g_ceph_context->_conf->set_val("auth_cluster_required", "cephx");
658 g_ceph_context->_conf->set_val("auth_service_required", "cephx");
659 g_ceph_context->_conf->set_val("auth_client_required", "cephx");
660 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
661 entity_addr_t bind_addr;
662 bind_addr.parse("127.0.0.1");
663 server_msgr->bind(bind_addr);
664 server_msgr->add_dispatcher_head(&srv_dispatcher);
665 server_msgr->start();
667 client_msgr->add_dispatcher_head(&cli_dispatcher);
668 client_msgr->start();
670 // 1. simple auth round trip
671 MPing *m = new MPing();
672 ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst());
674 ASSERT_EQ(conn->send_message(m), 0);
675 Mutex::Locker l(cli_dispatcher.lock);
676 while (!cli_dispatcher.got_new)
677 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
678 cli_dispatcher.got_new = false;
680 ASSERT_TRUE(conn->is_connected());
681 ASSERT_TRUE((static_cast<Session*>(conn->get_priv()))->get_count() == 1);
// Switch every auth requirement to "none" for the second round trip.
684 g_ceph_context->_conf->set_val("auth_cluster_required", "none");
685 g_ceph_context->_conf->set_val("auth_service_required", "none");
686 g_ceph_context->_conf->set_val("auth_client_required", "none");
// The old connection is expected to be down here (the step that takes it down
// is not visible in this excerpt).
688 ASSERT_FALSE(conn->is_connected());
689 conn = client_msgr->get_connection(server_msgr->get_myinst());
691 MPing *m = new MPing();
692 ASSERT_EQ(conn->send_message(m), 0);
693 Mutex::Locker l(cli_dispatcher.lock);
694 while (!cli_dispatcher.got_new)
695 cli_dispatcher.cond.Wait(cli_dispatcher.lock);
696 cli_dispatcher.got_new = false;
698 ASSERT_TRUE(conn->is_connected());
699 ASSERT_TRUE((static_cast<Session*>(conn->get_priv()))->get_count() == 1);
701 server_msgr->shutdown();
702 client_msgr->shutdown();
// Sends two oversized messages over a stateful_server / lossless_peer pair:
// an MCommand (large "front") and an MPing (large "data"). Each wait uses
// WaitInterval rather than an unbounded Wait.
707 TEST_P(MessengerTest, MessageTest) {
708 FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
709 entity_addr_t bind_addr;
710 bind_addr.parse("127.0.0.1");
711 Messenger::Policy p = Messenger::Policy::stateful_server(0);
712 server_msgr->set_policy(entity_name_t::TYPE_CLIENT, p);
713 p = Messenger::Policy::lossless_peer(0);
714 client_msgr->set_policy(entity_name_t::TYPE_OSD, p);
716 server_msgr->bind(bind_addr);
717 server_msgr->add_dispatcher_head(&srv_dispatcher);
718 server_msgr->start();
719 client_msgr->add_dispatcher_head(&cli_dispatcher);
720 client_msgr->start();
723 // 1. A very large "front" (as well as "payload")
724 // Because an external message type would need changes inside
725 // Messenger::decode_message, we only use an existing message class (MCommand)
726 ConnectionRef conn = client_msgr->get_connection(server_msgr->get_myinst());
729 uuid.generate_random();
// The 26-letter seed string is the building block for the large payload; the
// loop runs 1024*30 times (its body is not visible in this excerpt).
731 string s("abcdefghijklmnopqrstuvwxyz");
732 for (int i = 0; i < 1024*30; i++)
734 MCommand *m = new MCommand(uuid);
736 conn->send_message(m);
739 Mutex::Locker l(cli_dispatcher.lock);
740 while (!cli_dispatcher.got_new)
741 cli_dispatcher.cond.WaitInterval(cli_dispatcher.lock, t);
742 ASSERT_TRUE(cli_dispatcher.got_new);
743 cli_dispatcher.got_new = false;
746 // 2. A very large "data"
749 string s("abcdefghijklmnopqrstuvwxyz");
750 for (int i = 0; i < 1024*30; i++)
752 MPing *m = new MPing();
754 conn->send_message(m);
757 Mutex::Locker l(cli_dispatcher.lock);
758 while (!cli_dispatcher.got_new)
759 cli_dispatcher.cond.WaitInterval(cli_dispatcher.lock, t);
760 ASSERT_TRUE(cli_dispatcher.got_new);
761 cli_dispatcher.got_new = false;
763 server_msgr->shutdown();
764 client_msgr->shutdown();
770 class SyntheticWorkload;
// Member-wise constructor: stores the sender role (`who`), the sequence
// number (`seq`) and a copy of the data buffer.
781 Payload(Who who, uint64_t seq, const bufferlist& data)
782 : who(who), seq(seq), data(data)
// Encode/decode support declared through the DENC macros (the field list is
// not visible in this excerpt).
785 DENC(Payload, v, p) {
793 WRITE_CLASS_DENC(Payload)
// Stream formatter for log output: writes "reply=<who> i = <seq>" and returns
// the stream. The payload data itself is not printed.
795 ostream& operator<<(ostream& out, const Payload &pl)
797 return out << "reply=" << pl.who << " i = " << pl.seq;
// Dispatcher shared by every messenger in the synthetic workload. It tracks
// messages that are still awaiting a reply so the workload can throttle
// itself and verify replies.
800 class SyntheticDispatcher : public Dispatcher {
806 bool got_remote_reset;
// conn_sent: per-connection list of outstanding sequence numbers, in send order.
// sent:      sequence number -> data that was sent with it.
// index:     atomic source of sequence numbers (see send_message_wrap).
808 map<ConnectionRef, list<uint64_t> > conn_sent;
809 map<uint64_t, bufferlist> sent;
810 atomic<uint64_t> index;
811 SyntheticWorkload *workload;
813 SyntheticDispatcher(bool s, SyntheticWorkload *wl):
814 Dispatcher(g_ceph_context), lock("SyntheticDispatcher::lock"), is_server(s), got_new(false),
815 got_remote_reset(false), got_connect(false), index(0), workload(wl) {}
816 bool ms_can_fast_dispatch_any() const override { return true; }
817 bool ms_can_fast_dispatch(const Message *m) const override {
818 switch (m->get_type()) {
// On connect: walks a copy of the connection's outstanding list (loop body
// not visible in this excerpt) and then drops the connection's entry.
827 void ms_handle_fast_connect(Connection *con) override {
828 Mutex::Locker l(lock);
829 list<uint64_t> c = conn_sent[con];
830 for (list<uint64_t>::iterator it = c.begin();
833 conn_sent.erase(con);
// On accept: same bookkeeping as ms_handle_fast_connect.
837 void ms_handle_fast_accept(Connection *con) override {
838 Mutex::Locker l(lock);
839 list<uint64_t> c = conn_sent[con];
840 for (list<uint64_t>::iterator it = c.begin();
843 conn_sent.erase(con);
846 bool ms_dispatch(Message *m) override {
// Defined out of line, after SyntheticWorkload.
849 bool ms_handle_reset(Connection *con) override;
// On remote reset: same bookkeeping as above, plus sets got_remote_reset.
850 void ms_handle_remote_reset(Connection *con) override {
851 Mutex::Locker l(lock);
852 list<uint64_t> c = conn_sent[con];
853 for (list<uint64_t>::iterator it = c.begin();
856 conn_sent.erase(con);
857 got_remote_reset = true;
859 bool ms_handle_refused(Connection *con) override {
// Handles an incoming message. A PING payload is answered through
// reply_message(). Otherwise, if the sequence number is known in `sent`, the
// reply must match the oldest outstanding entry of its connection and carry
// identical data; that entry is then popped.
862 void ms_fast_dispatch(Message *m) override {
863 // MSG_COMMAND is used to disorganize regular message flow
864 if (m->get_type() == MSG_COMMAND) {
870 auto p = m->get_data().begin();
872 if (pl.who == Payload::PING) {
873 lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << pl << dendl;
874 reply_message(m, pl);
876 Mutex::Locker l(lock);
880 Mutex::Locker l(lock);
881 if (sent.count(pl.seq)) {
882 lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << pl << dendl;
883 ASSERT_EQ(conn_sent[m->get_connection()].front(), pl.seq);
884 ASSERT_TRUE(pl.data.contents_equal(sent[pl.seq]));
885 conn_sent[m->get_connection()].pop_front();
894 bool ms_verify_authorizer(Connection *con, int peer_type, int protocol,
895 bufferlist& authorizer, bufferlist& authorizer_reply,
896 bool& isvalid, CryptoKey& session_key) override {
// Marks the payload as a PONG and sends an MPing back on the connection `m`
// arrived on. Note that `pl` is modified in place.
901 void reply_message(const Message *m, Payload& pl) {
902 pl.who = Payload::PONG;
905 MPing *rm = new MPing();
907 m->get_connection()->send_message(rm);
908 lderr(g_ceph_context) << __func__ << " conn=" << m->get_connection() << " reply m=" << m << " i=" << pl.seq << dendl;
// Sends `data` as a PING payload with the next sequence number. The message
// is recorded as outstanding only when the connection's messenger does not
// use a lossy default policy. The send itself must return 0.
911 void send_message_wrap(ConnectionRef con, const bufferlist& data) {
912 Message *m = new MPing();
913 Payload pl{Payload::PING, index++, data};
917 if (!con->get_messenger()->get_default_policy().lossy) {
918 Mutex::Locker l(lock);
919 sent[pl.seq] = pl.data;
920 conn_sent[con].push_back(pl.seq);
922 lderr(g_ceph_context) << __func__ << " conn=" << con.get() << " send m=" << m << " i=" << pl.seq << dendl;
923 ASSERT_EQ(0, con->send_message(m));
// Takes `lock` before reading the pending state (the returned expression is
// not visible in this excerpt).
926 uint64_t get_pending() {
927 Mutex::Locker l(lock);
// Walks the outstanding sequence numbers of `con` (loop body not visible in
// this excerpt) and then removes the connection's entry from conn_sent.
931 void clear_pending(ConnectionRef con) {
932 Mutex::Locker l(lock);
934 for (list<uint64_t>::iterator it = conn_sent[con].begin();
935 it != conn_sent[con].end(); ++it)
937 conn_sent.erase(con);
// Logs every connection that still has outstanding messages.
941 for (auto && p : conn_sent) {
942 if (!p.second.empty()) {
943 lderr(g_ceph_context) << __func__ << " " << p.first << " wait " << p.second.size() << dendl;
950 class SyntheticWorkload {
953 set<Messenger*> available_servers;
954 set<Messenger*> available_clients;
955 map<ConnectionRef, pair<Messenger*, Messenger*> > available_connections;
956 SyntheticDispatcher dispatcher;
958 vector<bufferlist> rand_data;
961 static const unsigned max_in_flight = 64;
962 static const unsigned max_connections = 128;
963 static const unsigned max_message_len = 1024 * 1024 * 4;
965 SyntheticWorkload(int servers, int clients, string type, int random_num,
966 Messenger::Policy srv_policy, Messenger::Policy cli_policy):
967 lock("SyntheticWorkload::lock"), dispatcher(false, this), rng(time(NULL)) {
969 int base_port = 16800;
970 entity_addr_t bind_addr;
972 for (int i = 0; i < servers; ++i) {
973 msgr = Messenger::create(g_ceph_context, type, entity_name_t::OSD(0),
974 "server", getpid()+i, 0);
975 snprintf(addr, sizeof(addr), "127.0.0.1:%d", base_port+i);
976 bind_addr.parse(addr);
977 msgr->bind(bind_addr);
978 msgr->add_dispatcher_head(&dispatcher);
981 msgr->set_default_policy(srv_policy);
982 available_servers.insert(msgr);
986 for (int i = 0; i < clients; ++i) {
987 msgr = Messenger::create(g_ceph_context, type, entity_name_t::CLIENT(-1),
988 "client", getpid()+i+servers, 0);
989 if (cli_policy.standby) {
990 snprintf(addr, sizeof(addr), "127.0.0.1:%d", base_port+i+servers);
991 bind_addr.parse(addr);
992 msgr->bind(bind_addr);
994 msgr->add_dispatcher_head(&dispatcher);
997 msgr->set_default_policy(cli_policy);
998 available_clients.insert(msgr);
1002 for (int i = 0; i < random_num; i++) {
1004 boost::uniform_int<> u(32, max_message_len);
1005 uint64_t value_len = u(rng);
1006 bufferptr bp(value_len);
1008 for (uint64_t j = 0; j < value_len-sizeof(i); ) {
1009 memcpy(bp.c_str()+j, &i, sizeof(i));
1014 rand_data.push_back(bl);
1018 ConnectionRef _get_random_connection() {
1019 while (dispatcher.get_pending() > max_in_flight) {
1024 assert(lock.is_locked());
1025 boost::uniform_int<> choose(0, available_connections.size() - 1);
1026 int index = choose(rng);
1027 map<ConnectionRef, pair<Messenger*, Messenger*> >::iterator i = available_connections.begin();
1028 for (; index > 0; --index, ++i) ;
1032 bool can_create_connection() {
1033 return available_connections.size() < max_connections;
1036 void generate_connection() {
1037 Mutex::Locker l(lock);
1038 if (!can_create_connection())
1041 Messenger *server, *client;
1043 boost::uniform_int<> choose(0, available_servers.size() - 1);
1044 int index = choose(rng);
1045 set<Messenger*>::iterator i = available_servers.begin();
1046 for (; index > 0; --index, ++i) ;
1050 boost::uniform_int<> choose(0, available_clients.size() - 1);
1051 int index = choose(rng);
1052 set<Messenger*>::iterator i = available_clients.begin();
1053 for (; index > 0; --index, ++i) ;
1057 pair<Messenger*, Messenger*> p;
1059 boost::uniform_int<> choose(0, available_servers.size() - 1);
1060 if (server->get_default_policy().server) {
1061 p = make_pair(client, server);
1063 ConnectionRef conn = client->get_connection(server->get_myinst());
1064 if (available_connections.count(conn) || choose(rng) % 2)
1065 p = make_pair(client, server);
1067 p = make_pair(server, client);
1070 ConnectionRef conn = p.first->get_connection(p.second->get_myinst());
1071 available_connections[conn] = p;
1074 void send_message() {
1075 Mutex::Locker l(lock);
1076 ConnectionRef conn = _get_random_connection();
1077 boost::uniform_int<> true_false(0, 99);
1078 int val = true_false(rng);
1081 uuid.generate_random();
1082 MCommand *m = new MCommand(uuid);
1083 vector<string> cmds;
1084 cmds.push_back("command");
1086 m->set_priority(200);
1087 conn->send_message(m);
1089 boost::uniform_int<> u(0, rand_data.size()-1);
1090 dispatcher.send_message_wrap(conn, rand_data[u(rng)]);
1094 void drop_connection() {
1095 Mutex::Locker l(lock);
1096 if (available_connections.size() < 10)
1098 ConnectionRef conn = _get_random_connection();
1099 dispatcher.clear_pending(conn);
1101 pair<Messenger*, Messenger*> &p = available_connections[conn];
1102 // it's a lossless policy, so we need to mark down each side
1103 if (!p.first->get_default_policy().server && !p.second->get_default_policy().server) {
1104 ASSERT_EQ(conn->get_messenger(), p.first);
1105 ConnectionRef peer = p.second->get_connection(p.first->get_myinst());
1107 dispatcher.clear_pending(peer);
1108 available_connections.erase(peer);
1110 ASSERT_EQ(available_connections.erase(conn), 1U);
1113 void print_internal_state(bool detail=false) {
1114 Mutex::Locker l(lock);
1115 lderr(g_ceph_context) << "available_connections: " << available_connections.size()
1116 << " inflight messages: " << dispatcher.get_pending() << dendl;
1117 if (detail && !available_connections.empty()) {
1122 void wait_for_done() {
1123 int64_t tick_us = 1000 * 100; // 100ms
1124 int64_t timeout_us = 5 * 60 * 1000 * 1000; // 5 mins
1126 while (dispatcher.get_pending()) {
1128 timeout_us -= tick_us;
1130 print_internal_state(true);
1132 assert(0 == " loop time exceed 5 mins, it looks we stuck into some problems!");
1134 for (set<Messenger*>::iterator it = available_servers.begin();
1135 it != available_servers.end(); ++it) {
1138 ASSERT_EQ((*it)->get_dispatch_queue_len(), 0);
1141 available_servers.clear();
1143 for (set<Messenger*>::iterator it = available_clients.begin();
1144 it != available_clients.end(); ++it) {
1147 ASSERT_EQ((*it)->get_dispatch_queue_len(), 0);
1150 available_clients.clear();
// Reset notification, forwarded from SyntheticDispatcher::ms_handle_reset():
// with `lock` taken, stop tracking `con` in available_connections and clear its
// pending state in the dispatcher.
1153 void handle_reset(Connection *con) {
1154 Mutex::Locker l(lock);
1155 available_connections.erase(con);
1156 dispatcher.clear_pending(con);
// Reset callback of SyntheticDispatcher: passes the connection on to
// workload->handle_reset().
// NOTE(review): the return statement is not shown in this listing.
1160 bool SyntheticDispatcher::ms_handle_reset(Connection *con) {
1161 workload->handle_reset(con);
// Randomized stress run over a SyntheticWorkload built with
// Policy::stateful_server(0) and Policy::lossless_client(0): seed 100
// connections, perform 5000 randomly selected operations, then wait_for_done().
// NOTE(review): this listing omits some lines of the test (see the gaps in the
// embedded numbering), including the first condition of the op selector and
// whatever guards the "Op" log and the sleep.
1165 TEST_P(MessengerTest, SyntheticStressTest) {
// NOTE(review): the meaning of the numeric arguments (8, 32, 100) is defined by
// the SyntheticWorkload constructor, which is not part of this block.
1166 SyntheticWorkload test_msg(8, 32, GetParam(), 100,
1167 Messenger::Policy::stateful_server(0),
1168 Messenger::Policy::lossless_client(0));
1169 for (int i = 0; i < 100; ++i) {
// Progress line for every 10th seeded connection.
1170 if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl;
1171 test_msg.generate_connection();
// Generator for the op selector, seeded from time(NULL).
1173 gen_type rng(time(NULL));
1174 for (int i = 0; i < 5000; ++i) {
1176 lderr(g_ceph_context) << "Op " << i << ": " << dendl;
1177 test_msg.print_internal_state();
// val is drawn uniformly from [0, 99]. Of the shown branches, val > 80 drops a
// connection and val > 10 sends a message (tested in that order).
1179 boost::uniform_int<> true_false(0, 99);
1180 int val = true_false(rng);
1182 test_msg.generate_connection();
1183 } else if (val > 80) {
1184 test_msg.drop_connection();
1185 } else if (val > 10) {
1186 test_msg.send_message();
// Sleeps for 500..1499 microseconds.
1188 usleep(rand() % 1000 + 500);
1191 test_msg.wait_for_done();
// Stress run with Policy::lossless_peer_reuse(0) on both sides: seed 10
// connections, perform 10000 randomly selected operations, then
// wait_for_done().
// NOTE(review): this listing omits some lines of the test (see the gaps in the
// embedded numbering), including the first condition of the op selector and
// whatever guards the "Op" log and the sleep.
1194 TEST_P(MessengerTest, SyntheticStressTest1) {
// NOTE(review): the meaning of the numeric arguments (16, 32, 100) is defined
// by the SyntheticWorkload constructor, which is not part of this block.
1195 SyntheticWorkload test_msg(16, 32, GetParam(), 100,
1196 Messenger::Policy::lossless_peer_reuse(0),
1197 Messenger::Policy::lossless_peer_reuse(0));
1198 for (int i = 0; i < 10; ++i) {
// With only 10 seeds this progress line fires once, for i == 0.
1199 if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl;
1200 test_msg.generate_connection();
// Generator for the op selector, seeded from time(NULL).
1202 gen_type rng(time(NULL));
1203 for (int i = 0; i < 10000; ++i) {
1205 lderr(g_ceph_context) << "Op " << i << ": " << dendl;
1206 test_msg.print_internal_state();
// val is drawn uniformly from [0, 99]. Of the shown branches, val > 60 drops a
// connection and val > 10 sends a message (tested in that order).
1208 boost::uniform_int<> true_false(0, 99);
1209 int val = true_false(rng);
1211 test_msg.generate_connection();
1212 } else if (val > 60) {
1213 test_msg.drop_connection();
1214 } else if (val > 10) {
1215 test_msg.send_message();
// Sleeps for 500..1499 microseconds.
1217 usleep(rand() % 1000 + 500);
1220 test_msg.wait_for_done();
// Stress run with failure injection enabled through the global config:
// ms_inject_socket_failures=30, ms_inject_internal_delays=0.1 and
// ms_dispatch_throttle_bytes=16777216. Uses Policy::stateful_server(0) and
// Policy::lossless_client(0), seeds 100 connections and performs 1000 randomly
// selected operations. The three settings are reset at the end of the test.
// NOTE(review): this listing omits some lines of the test (see the gaps in the
// embedded numbering), including the first condition of the op selector and
// whatever guards the "Op" log and the sleep.
1224 TEST_P(MessengerTest, SyntheticInjectTest) {
// Remember the current throttle value so it can be written back below.
1225 uint64_t dispatch_throttle_bytes = g_ceph_context->_conf->ms_dispatch_throttle_bytes;
1226 g_ceph_context->_conf->set_val("ms_inject_socket_failures", "30");
1227 g_ceph_context->_conf->set_val("ms_inject_internal_delays", "0.1");
1228 g_ceph_context->_conf->set_val("ms_dispatch_throttle_bytes", "16777216");
// NOTE(review): the meaning of the numeric arguments (8, 32, 100) is defined by
// the SyntheticWorkload constructor, which is not part of this block.
1229 SyntheticWorkload test_msg(8, 32, GetParam(), 100,
1230 Messenger::Policy::stateful_server(0),
1231 Messenger::Policy::lossless_client(0));
1232 for (int i = 0; i < 100; ++i) {
// Progress line for every 10th seeded connection.
1233 if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl;
1234 test_msg.generate_connection();
// Generator for the op selector, seeded from time(NULL).
1236 gen_type rng(time(NULL));
1237 for (int i = 0; i < 1000; ++i) {
1239 lderr(g_ceph_context) << "Op " << i << ": " << dendl;
1240 test_msg.print_internal_state();
// val is drawn uniformly from [0, 99]. Of the shown branches, val > 80 drops a
// connection and val > 10 sends a message (tested in that order).
1242 boost::uniform_int<> true_false(0, 99);
1243 int val = true_false(rng);
1245 test_msg.generate_connection();
1246 } else if (val > 80) {
1247 test_msg.drop_connection();
1248 } else if (val > 10) {
1249 test_msg.send_message();
// Sleeps for 100..599 microseconds.
1251 usleep(rand() % 500 + 100);
1254 test_msg.wait_for_done();
// Switch injection off again and write back the saved throttle value.
1255 g_ceph_context->_conf->set_val("ms_inject_socket_failures", "0");
1256 g_ceph_context->_conf->set_val("ms_inject_internal_delays", "0");
1257 g_ceph_context->_conf->set_val(
1258 "ms_dispatch_throttle_bytes", std::to_string(dispatch_throttle_bytes));
// Failure-injection run (ms_inject_socket_failures=30,
// ms_inject_internal_delays=0.1) with Policy::lossless_peer_reuse(0) on both
// sides: seed 100 connections, perform 1000 randomly selected operations, wait
// for completion, then set both injection options back to "0".
// NOTE(review): this listing omits some lines of the test (see the gaps in the
// embedded numbering), including the first condition of the op selector and
// whatever guards the "Op" log and the sleep.
1261 TEST_P(MessengerTest, SyntheticInjectTest2) {
1262 g_ceph_context->_conf->set_val("ms_inject_socket_failures", "30");
1263 g_ceph_context->_conf->set_val("ms_inject_internal_delays", "0.1");
// NOTE(review): the meaning of the numeric arguments (8, 16, 100) is defined by
// the SyntheticWorkload constructor, which is not part of this block.
1264 SyntheticWorkload test_msg(8, 16, GetParam(), 100,
1265 Messenger::Policy::lossless_peer_reuse(0),
1266 Messenger::Policy::lossless_peer_reuse(0));
1267 for (int i = 0; i < 100; ++i) {
// Progress line for every 10th seeded connection.
1268 if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl;
1269 test_msg.generate_connection();
// Generator for the op selector, seeded from time(NULL).
1271 gen_type rng(time(NULL));
1272 for (int i = 0; i < 1000; ++i) {
1274 lderr(g_ceph_context) << "Op " << i << ": " << dendl;
1275 test_msg.print_internal_state();
// val is drawn uniformly from [0, 99]. Of the shown branches, val > 80 drops a
// connection and val > 10 sends a message (tested in that order).
1277 boost::uniform_int<> true_false(0, 99);
1278 int val = true_false(rng);
1280 test_msg.generate_connection();
1281 } else if (val > 80) {
1282 test_msg.drop_connection();
1283 } else if (val > 10) {
1284 test_msg.send_message();
// Sleeps for 100..599 microseconds.
1286 usleep(rand() % 500 + 100);
1289 test_msg.wait_for_done();
// Switch injection off again.
1290 g_ceph_context->_conf->set_val("ms_inject_socket_failures", "0");
1291 g_ceph_context->_conf->set_val("ms_inject_internal_delays", "0");
// Failure-injection run (ms_inject_socket_failures=600,
// ms_inject_internal_delays=0.1) with Policy::stateless_server(0) and
// Policy::lossy_client(0): seed 100 connections, perform 1000 randomly selected
// operations, wait for completion, then set both injection options back to "0".
// NOTE(review): this listing omits some lines of the test (see the gaps in the
// embedded numbering), including the first condition of the op selector and
// whatever guards the "Op" log and the sleep.
1294 TEST_P(MessengerTest, SyntheticInjectTest3) {
1295 g_ceph_context->_conf->set_val("ms_inject_socket_failures", "600");
1296 g_ceph_context->_conf->set_val("ms_inject_internal_delays", "0.1");
// NOTE(review): the meaning of the numeric arguments (8, 16, 100) is defined by
// the SyntheticWorkload constructor, which is not part of this block.
1297 SyntheticWorkload test_msg(8, 16, GetParam(), 100,
1298 Messenger::Policy::stateless_server(0),
1299 Messenger::Policy::lossy_client(0));
1300 for (int i = 0; i < 100; ++i) {
// Progress line for every 10th seeded connection.
1301 if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl;
1302 test_msg.generate_connection();
// Generator for the op selector, seeded from time(NULL).
1304 gen_type rng(time(NULL));
1305 for (int i = 0; i < 1000; ++i) {
1307 lderr(g_ceph_context) << "Op " << i << ": " << dendl;
1308 test_msg.print_internal_state();
// val is drawn uniformly from [0, 99]. Of the shown branches, val > 80 drops a
// connection and val > 10 sends a message (tested in that order).
1310 boost::uniform_int<> true_false(0, 99);
1311 int val = true_false(rng);
1313 test_msg.generate_connection();
1314 } else if (val > 80) {
1315 test_msg.drop_connection();
1316 } else if (val > 10) {
1317 test_msg.send_message();
// Sleeps for 100..599 microseconds.
1319 usleep(rand() % 500 + 100);
1322 test_msg.wait_for_done();
// Switch injection off again.
1323 g_ceph_context->_conf->set_val("ms_inject_socket_failures", "0");
1324 g_ceph_context->_conf->set_val("ms_inject_internal_delays", "0");
// Failure- and delay-injection run with Policy::lossless_peer(0) on both sides.
// Besides socket failures (30) and internal delays (0.1) it sets
// ms_inject_delay_probability=1, ms_inject_delay_type="client osd" and
// ms_inject_delay_max=5. Seeds 100 connections, performs 1000 randomly selected
// operations, waits for completion, then resets all five options.
// NOTE(review): this listing omits some lines of the test (see the gaps in the
// embedded numbering), including the first condition of the op selector and
// whatever guards the "Op" log and the sleep.
1328 TEST_P(MessengerTest, SyntheticInjectTest4) {
1329 g_ceph_context->_conf->set_val("ms_inject_socket_failures", "30");
1330 g_ceph_context->_conf->set_val("ms_inject_internal_delays", "0.1");
1331 g_ceph_context->_conf->set_val("ms_inject_delay_probability", "1");
// NOTE(review): the extra `false` argument is interpreted by set_val(), whose
// declaration is not part of this file section.
1332 g_ceph_context->_conf->set_val("ms_inject_delay_type", "client osd", false);
1333 g_ceph_context->_conf->set_val("ms_inject_delay_max", "5");
// NOTE(review): the meaning of the numeric arguments (16, 32, 100) is defined
// by the SyntheticWorkload constructor, which is not part of this block.
1334 SyntheticWorkload test_msg(16, 32, GetParam(), 100,
1335 Messenger::Policy::lossless_peer(0),
1336 Messenger::Policy::lossless_peer(0));
1337 for (int i = 0; i < 100; ++i) {
// Progress line for every 10th seeded connection.
1338 if (!(i % 10)) lderr(g_ceph_context) << "seeding connection " << i << dendl;
1339 test_msg.generate_connection();
// Generator for the op selector, seeded from time(NULL).
1341 gen_type rng(time(NULL));
1342 for (int i = 0; i < 1000; ++i) {
1344 lderr(g_ceph_context) << "Op " << i << ": " << dendl;
1345 test_msg.print_internal_state();
// val is drawn uniformly from [0, 99]. The val > 80 branch is a no-op here
// because its drop_connection() call is commented out; val > 10 sends a
// message.
1347 boost::uniform_int<> true_false(0, 99);
1348 int val = true_false(rng);
1350 test_msg.generate_connection();
1351 } else if (val > 80) {
1352 // test_msg.drop_connection();
1353 } else if (val > 10) {
1354 test_msg.send_message();
// Sleeps for 100..599 microseconds.
1356 usleep(rand() % 500 + 100);
1359 test_msg.wait_for_done();
// Switch all injection options off again.
1360 g_ceph_context->_conf->set_val("ms_inject_socket_failures", "0");
1361 g_ceph_context->_conf->set_val("ms_inject_internal_delays", "0");
1362 g_ceph_context->_conf->set_val("ms_inject_delay_probability", "0");
1363 g_ceph_context->_conf->set_val("ms_inject_delay_type", "", false);
1364 g_ceph_context->_conf->set_val("ms_inject_delay_max", "0");
// Dispatcher used by MarkdownTest. It records the connections it has received
// messages on in `conns` and exposes the atomic counter `count`, which
// MarkdownTest polls.
// NOTE(review): large parts of this class are missing from this listing (see
// the gaps in the embedded numbering): member declarations such as `lock` and
// `last_mark`, the rest of the constructor, and most handler bodies. The notes
// below cover only what is shown.
1368 class MarkdownDispatcher : public Dispatcher {
// Connections seen by ms_dispatch().
1370 set<ConnectionRef> conns;
// Starts at 0; where it is incremented is not shown here.
1373 std::atomic<uint64_t> count = { 0 };
1374 explicit MarkdownDispatcher(bool s): Dispatcher(g_ceph_context), lock("MarkdownDispatcher::lock"),
// Always reports false for blanket fast dispatch.
1376 bool ms_can_fast_dispatch_any() const override { return false; }
// Decides per message type; the cases of the switch are not shown.
1377 bool ms_can_fast_dispatch(const Message *m) const override {
1378 switch (m->get_type()) {
// Logs the connection and takes `lock`; the rest of the body is not shown.
1386 void ms_handle_fast_connect(Connection *con) override {
1387 lderr(g_ceph_context) << __func__ << " " << con << dendl;
1388 Mutex::Locker l(lock);
// Takes `lock`; the rest of the body is not shown.
1391 void ms_handle_fast_accept(Connection *con) override {
1392 Mutex::Locker l(lock);
// Logs the message's connection, takes `lock` and adds the connection to
// `conns`.
1395 bool ms_dispatch(Message *m) override {
1396 lderr(g_ceph_context) << __func__ << " conn: " << m->get_connection() << dendl;
1397 Mutex::Locker l(lock);
1399 conns.insert(m->get_connection());
// Special case while fewer than two connections are known and `last_mark` is
// false; what happens in this branch is not shown.
1400 if (conns.size() < 2 && !last_mark) {
// Sleeps for 0..499 microseconds.
1406 usleep(rand() % 500);
// Walks the recorded connections and selects every one other than the
// connection this message arrived on; the action taken on them is not shown
// (presumably they are marked down -- TODO confirm).
1407 for (set<ConnectionRef>::iterator it = conns.begin(); it != conns.end(); ++it) {
1408 if ((*it) != m->get_connection().get()) {
// Logs the connection, takes `lock` and sleeps for 0..499 microseconds; the
// remaining statements and the return value are not shown.
1419 bool ms_handle_reset(Connection *con) override {
1420 lderr(g_ceph_context) << __func__ << " " << con << dendl;
1421 Mutex::Locker l(lock);
1423 usleep(rand() % 500);
// Takes `lock` and logs the connection.
1426 void ms_handle_remote_reset(Connection *con) override {
1427 Mutex::Locker l(lock);
1429 lderr(g_ceph_context) << __func__ << " " << con << dendl;
// Body not shown.
1431 bool ms_handle_refused(Connection *con) override {
// Body not shown.
1434 void ms_fast_dispatch(Message *m) override {
// Body not shown.
1437 bool ms_verify_authorizer(Connection *con, int peer_type, int protocol,
1438 bufferlist& authorizer, bufferlist& authorizer_reply,
1439 bool& isvalid, CryptoKey& session_key) override {
1446 // Markdown with external lock
// Runs two server messengers that share one MarkdownDispatcher and one client
// messenger with its own MarkdownDispatcher. Pings are sent over a connection
// to each server and the test watches srv_dispatcher.count make progress.
// NOTE(review): this listing omits some lines of the test (see the gaps in the
// embedded numbering), including the loop header and the declarations of
// `last` and `equal`; the notes below cover only what is shown.
1447 TEST_P(MessengerTest, MarkdownTest) {
// Second server messenger, created locally and deleted at the end of the test.
1448 Messenger *server_msgr2 = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::OSD(0), "server", getpid(), 0);
1449 MarkdownDispatcher cli_dispatcher(false), srv_dispatcher(true);
// server_msgr is bound to 127.0.0.1:16800 and server_msgr2 to 127.0.0.1:16801;
// both get srv_dispatcher.
1450 entity_addr_t bind_addr;
1451 bind_addr.parse("127.0.0.1:16800");
1452 server_msgr->bind(bind_addr);
1453 server_msgr->add_dispatcher_head(&srv_dispatcher);
1454 server_msgr->start();
1455 bind_addr.parse("127.0.0.1:16801");
1456 server_msgr2->bind(bind_addr);
1457 server_msgr2->add_dispatcher_head(&srv_dispatcher);
1458 server_msgr2->start();
1460 client_msgr->add_dispatcher_head(&cli_dispatcher);
1461 client_msgr->start();
1466 uint64_t equal_count = 0;
// One client connection per server; a ping is sent on each and both sends must
// return 0.
1468 ConnectionRef conn1 = client_msgr->get_connection(server_msgr->get_myinst());
1469 ConnectionRef conn2 = client_msgr->get_connection(server_msgr2->get_myinst());
1470 MPing *m = new MPing();
1471 ASSERT_EQ(conn1->send_message(m), 0);
1473 ASSERT_EQ(conn2->send_message(m), 0);
// Give the server side a chance to advance `count` past last + 1.
1474 CHECK_AND_WAIT_TRUE(srv_dispatcher.count > last + 1);
// No progress since the previous round: log it. The bookkeeping of `equal` and
// `equal_count` that follows is not shown.
1475 if (srv_dispatcher.count == last) {
1476 lderr(g_ceph_context) << __func__ << " last is " << last << dendl;
1483 last = srv_dispatcher.count;
// Fails once `equal` is set and equal_count exceeds 3.
1486 ASSERT_FALSE(equal && equal_count > 3);
// Shut down all three messengers, wait for each of them, then free the
// locally created one.
1488 server_msgr->shutdown();
1489 client_msgr->shutdown();
1490 server_msgr2->shutdown();
1491 server_msgr->wait();
1492 client_msgr->wait();
1493 server_msgr2->wait();
1494 delete server_msgr2;
1497 INSTANTIATE_TEST_CASE_P(
1508 // Google Test may not support value-parameterized tests with some
1509 // compilers. If we use conditional compilation to compile out all
1510 // code referring to the gtest_main library, MSVC linker will not link
1511 // that library at all and consequently complain about missing entry
1512 // point defined in that library (fatal error LNK1561: entry point
1513 // must be defined). This dummy test keeps gtest_main linked in.
// The body is intentionally empty; the test only needs to exist.
1514 TEST(DummyTest, ValueParameterizedTestsAreNotSupportedOnThisPlatform) {}
// Test binary entry point: initialise the global Ceph context, override a few
// config options for the messenger tests, then hand control to Google Test.
// Returns the result of RUN_ALL_TESTS().
// NOTE(review): this listing omits at least one line between argv_to_vec() and
// global_init() (see the gap in the embedded numbering).
1519 int main(int argc, char **argv) {
// Copy the command-line arguments into `args` for global_init().
1520 vector<const char*> args;
1521 argv_to_vec(argc, (const char **)argv, args);
// Initialise as a client entity in the utility code environment.
1524 auto cct = global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT, CODE_ENVIRONMENT_UTILITY, 0);
// Set all three auth requirements to "none".
1525 g_ceph_context->_conf->set_val("auth_cluster_required", "none");
1526 g_ceph_context->_conf->set_val("auth_service_required", "none");
1527 g_ceph_context->_conf->set_val("auth_client_required", "none");
// Allow the "ms-type-async" experimental feature.
1528 g_ceph_context->_conf->set_val("enable_experimental_unrecoverable_data_corrupting_features", "ms-type-async");
// Set the die-on-bad-message and die-on-old-message options to "true" and
// ms_max_backoff to "1".
1529 g_ceph_context->_conf->set_val("ms_die_on_bad_msg", "true");
1530 g_ceph_context->_conf->set_val("ms_die_on_old_message", "true");
1531 g_ceph_context->_conf->set_val("ms_max_backoff", "1");
1532 common_init_finish(g_ceph_context);
// Google Test parses its own flags from argc/argv, then all tests are run.
1534 ::testing::InitGoogleTest(&argc, argv);
1535 return RUN_ALL_TESTS();
1540 * compile-command: "cd ../.. ; make -j4 ceph_test_msgr && valgrind --tool=memcheck ./ceph_test_msgr"