// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab #include #include #include #include #include "global/global_init.h" #include "common/ceph_argparse.h" #include "DirectMessenger.h" #include "msg/FastStrategy.h" #include "msg/QueueStrategy.h" #include "messages/MPing.h" /// mock dispatcher that calls the given callback class MockDispatcher : public Dispatcher { std::function callback; public: MockDispatcher(CephContext *cct, std::function callback) : Dispatcher(cct), callback(std::move(callback)) {} bool ms_handle_reset(Connection *con) override { return false; } void ms_handle_remote_reset(Connection *con) override {} bool ms_handle_refused(Connection *con) override { return false; } bool ms_dispatch(Message *m) override { callback(m); m->put(); return true; } }; /// test synchronous dispatch of messenger and connection interfaces TEST(DirectMessenger, SyncDispatch) { auto cct = g_ceph_context; // use FastStrategy for synchronous dispatch DirectMessenger client(cct, entity_name_t::CLIENT(1), "client", 0, new FastStrategy()); DirectMessenger server(cct, entity_name_t::CLIENT(2), "server", 0, new FastStrategy()); ASSERT_EQ(0, client.set_direct_peer(&server)); ASSERT_EQ(0, server.set_direct_peer(&client)); bool got_request = false; bool got_reply = false; MockDispatcher client_dispatcher(cct, [&] (Message *m) { got_reply = true; }); client.add_dispatcher_head(&client_dispatcher); MockDispatcher server_dispatcher(cct, [&] (Message *m) { got_request = true; ASSERT_EQ(0, m->get_connection()->send_message(new MPing())); }); server.add_dispatcher_head(&server_dispatcher); ASSERT_EQ(0, client.start()); ASSERT_EQ(0, server.start()); // test DirectMessenger::send_message() ASSERT_EQ(0, client.send_message(new MPing(), server.get_myinst())); ASSERT_TRUE(got_request); ASSERT_TRUE(got_reply); // test DirectConnection::send_message() { got_request = false; got_reply = false; auto conn = client.get_connection(server.get_myinst()); ASSERT_EQ(0, conn->send_message(new MPing())); ASSERT_TRUE(got_request); ASSERT_TRUE(got_reply); } // test DirectMessenger::send_message() with loopback address got_request = false; got_reply = false; ASSERT_EQ(0, client.send_message(new MPing(), client.get_myinst())); ASSERT_FALSE(got_request); // server should never see this ASSERT_TRUE(got_reply); // test DirectConnection::send_message() with loopback address { got_request = false; got_reply = false; auto conn = client.get_connection(client.get_myinst()); ASSERT_EQ(0, conn->send_message(new MPing())); ASSERT_FALSE(got_request); // server should never see this ASSERT_TRUE(got_reply); } // test DirectConnection::send_message() with loopback connection { got_request = false; got_reply = false; auto conn = client.get_loopback_connection(); ASSERT_EQ(0, conn->send_message(new MPing())); ASSERT_FALSE(got_request); // server should never see this ASSERT_TRUE(got_reply); } ASSERT_EQ(0, client.shutdown()); client.wait(); ASSERT_EQ(0, server.shutdown()); server.wait(); } /// test asynchronous dispatch of messenger and connection interfaces TEST(DirectMessenger, AsyncDispatch) { auto cct = g_ceph_context; // use QueueStrategy for async replies DirectMessenger client(cct, entity_name_t::CLIENT(1), "client", 0, new QueueStrategy(1)); DirectMessenger server(cct, entity_name_t::CLIENT(2), "server", 0, new FastStrategy()); ASSERT_EQ(0, client.set_direct_peer(&server)); ASSERT_EQ(0, server.set_direct_peer(&client)); // condition variable to wait on ping reply std::mutex mutex; std::condition_variable cond; bool done = false; auto wait_for_reply = [&] { std::unique_lock lock(mutex); while (!done) { cond.wait(lock); } done = false; // clear for reuse }; // client dispatcher signals the condition variable on reply MockDispatcher client_dispatcher(cct, [&] (Message *m) { std::lock_guard lock(mutex); done = true; cond.notify_one(); }); client.add_dispatcher_head(&client_dispatcher); MockDispatcher server_dispatcher(cct, [&] (Message *m) { // hold the lock over the call to send_message() to prove that the client's // dispatch is asynchronous. if it isn't, it will deadlock std::lock_guard lock(mutex); ASSERT_EQ(0, m->get_connection()->send_message(new MPing())); }); server.add_dispatcher_head(&server_dispatcher); ASSERT_EQ(0, client.start()); ASSERT_EQ(0, server.start()); // test DirectMessenger::send_message() ASSERT_EQ(0, client.send_message(new MPing(), server.get_myinst())); wait_for_reply(); // test DirectConnection::send_message() { auto conn = client.get_connection(server.get_myinst()); ASSERT_EQ(0, conn->send_message(new MPing())); } wait_for_reply(); // test DirectMessenger::send_message() with loopback address { // hold the lock to test that loopback dispatch is asynchronous std::lock_guard lock(mutex); ASSERT_EQ(0, client.send_message(new MPing(), client.get_myinst())); } wait_for_reply(); // test DirectConnection::send_message() with loopback address { auto conn = client.get_connection(client.get_myinst()); // hold the lock to test that loopback dispatch is asynchronous std::lock_guard lock(mutex); ASSERT_EQ(0, conn->send_message(new MPing())); } wait_for_reply(); // test DirectConnection::send_message() with loopback connection { auto conn = client.get_loopback_connection(); // hold the lock to test that loopback dispatch is asynchronous std::lock_guard lock(mutex); ASSERT_EQ(0, conn->send_message(new MPing())); } wait_for_reply(); ASSERT_EQ(0, client.shutdown()); client.wait(); ASSERT_EQ(0, server.shutdown()); server.wait(); } /// test that wait() blocks until shutdown() TEST(DirectMessenger, WaitShutdown) { auto cct = g_ceph_context; // test wait() with both Queue- and FastStrategy DirectMessenger client(cct, entity_name_t::CLIENT(1), "client", 0, new QueueStrategy(1)); DirectMessenger server(cct, entity_name_t::CLIENT(2), "server", 0, new FastStrategy()); ASSERT_EQ(0, client.set_direct_peer(&server)); ASSERT_EQ(0, server.set_direct_peer(&client)); ASSERT_EQ(0, client.start()); ASSERT_EQ(0, server.start()); std::atomic client_waiting{false}; std::atomic server_waiting{false}; // spawn threads to wait() on each of the messengers std::thread client_thread([&] { client_waiting = true; client.wait(); client_waiting = false; }); std::thread server_thread([&] { server_waiting = true; server.wait(); server_waiting = false; }); // give them time to start std::this_thread::sleep_for(std::chrono::milliseconds(50)); ASSERT_TRUE(client_waiting); ASSERT_TRUE(server_waiting); // call shutdown to unblock the waiting threads ASSERT_EQ(0, client.shutdown()); ASSERT_EQ(0, server.shutdown()); client_thread.join(); server_thread.join(); ASSERT_FALSE(client_waiting); ASSERT_FALSE(server_waiting); } /// test connection and messenger interfaces after mark_down() TEST(DirectMessenger, MarkDown) { auto cct = g_ceph_context; DirectMessenger client(cct, entity_name_t::CLIENT(1), "client", 0, new FastStrategy()); DirectMessenger server(cct, entity_name_t::CLIENT(2), "server", 0, new FastStrategy()); ASSERT_EQ(0, client.set_direct_peer(&server)); ASSERT_EQ(0, server.set_direct_peer(&client)); ASSERT_EQ(0, client.start()); ASSERT_EQ(0, server.start()); auto client_to_server = client.get_connection(server.get_myinst()); auto server_to_client = server.get_connection(client.get_myinst()); ASSERT_TRUE(client_to_server->is_connected()); ASSERT_TRUE(server_to_client->is_connected()); // mark_down() breaks the connection on both sides client_to_server->mark_down(); ASSERT_FALSE(client_to_server->is_connected()); ASSERT_EQ(-ENOTCONN, client_to_server->send_message(new MPing())); ASSERT_EQ(-ENOTCONN, client.send_message(new MPing(), server.get_myinst())); ASSERT_FALSE(server_to_client->is_connected()); ASSERT_EQ(-ENOTCONN, server_to_client->send_message(new MPing())); ASSERT_EQ(-ENOTCONN, server.send_message(new MPing(), client.get_myinst())); ASSERT_EQ(0, client.shutdown()); client.wait(); ASSERT_EQ(0, server.shutdown()); server.wait(); } /// test connection and messenger interfaces after shutdown() TEST(DirectMessenger, SendShutdown) { auto cct = g_ceph_context; // put client on the heap so we can free it early std::unique_ptr client{ new DirectMessenger(cct, entity_name_t::CLIENT(1), "client", 0, new FastStrategy())}; DirectMessenger server(cct, entity_name_t::CLIENT(2), "server", 0, new FastStrategy()); ASSERT_EQ(0, client->set_direct_peer(&server)); ASSERT_EQ(0, server.set_direct_peer(client.get())); ASSERT_EQ(0, client->start()); ASSERT_EQ(0, server.start()); const auto client_inst = client->get_myinst(); const auto server_inst = server.get_myinst(); auto client_to_server = client->get_connection(server_inst); auto server_to_client = server.get_connection(client_inst); ASSERT_TRUE(client_to_server->is_connected()); ASSERT_TRUE(server_to_client->is_connected()); // shut down the client to break connections ASSERT_EQ(0, client->shutdown()); client->wait(); ASSERT_FALSE(client_to_server->is_connected()); ASSERT_EQ(-ENOTCONN, client_to_server->send_message(new MPing())); ASSERT_EQ(-ENOTCONN, client->send_message(new MPing(), server_inst)); // free the client connection/messenger to test that calls to the server no // longer try to dereference them client_to_server.reset(); client.reset(); ASSERT_FALSE(server_to_client->is_connected()); ASSERT_EQ(-ENOTCONN, server_to_client->send_message(new MPing())); ASSERT_EQ(-ENOTCONN, server.send_message(new MPing(), client_inst)); ASSERT_EQ(0, server.shutdown()); server.wait(); } /// test connection and messenger interfaces after bind() TEST(DirectMessenger, Bind) { auto cct = g_ceph_context; DirectMessenger client(cct, entity_name_t::CLIENT(1), "client", 0, new FastStrategy()); DirectMessenger server(cct, entity_name_t::CLIENT(2), "server", 0, new FastStrategy()); entity_addr_t client_addr; client_addr.set_family(AF_INET); client_addr.set_port(1); // client bind succeeds before set_direct_peer() ASSERT_EQ(0, client.bind(client_addr)); ASSERT_EQ(0, client.set_direct_peer(&server)); ASSERT_EQ(0, server.set_direct_peer(&client)); // server bind fails after set_direct_peer() entity_addr_t empty_addr; ASSERT_EQ(-EINVAL, server.bind(empty_addr)); ASSERT_EQ(0, client.start()); ASSERT_EQ(0, server.start()); auto client_to_server = client.get_connection(server.get_myinst()); auto server_to_client = server.get_connection(client.get_myinst()); ASSERT_TRUE(client_to_server->is_connected()); ASSERT_TRUE(server_to_client->is_connected()); // no address in connection to server ASSERT_EQ(empty_addr, client_to_server->get_peer_addr()); // bind address is reflected in connection to client ASSERT_EQ(client_addr, server_to_client->get_peer_addr()); // mark_down() with bind address breaks the connection server.mark_down(client_addr); ASSERT_FALSE(client_to_server->is_connected()); ASSERT_FALSE(server_to_client->is_connected()); ASSERT_EQ(0, client.shutdown()); client.wait(); ASSERT_EQ(0, server.shutdown()); server.wait(); } /// test connection and messenger interfaces before calls to set_direct_peer() TEST(DirectMessenger, StartWithoutPeer) { auto cct = g_ceph_context; DirectMessenger client(cct, entity_name_t::CLIENT(1), "client", 0, new FastStrategy()); DirectMessenger server(cct, entity_name_t::CLIENT(2), "server", 0, new FastStrategy()); // can't start until set_direct_peer() ASSERT_EQ(-EINVAL, client.start()); ASSERT_EQ(-EINVAL, server.start()); ASSERT_EQ(0, client.set_direct_peer(&server)); // only client can start ASSERT_EQ(0, client.start()); ASSERT_EQ(-EINVAL, server.start()); // client has a connection but can't send auto conn = client.get_connection(server.get_myinst()); ASSERT_NE(nullptr, conn); ASSERT_FALSE(conn->is_connected()); ASSERT_EQ(-ENOTCONN, conn->send_message(new MPing())); ASSERT_EQ(-ENOTCONN, client.send_message(new MPing(), server.get_myinst())); ASSERT_EQ(0, client.shutdown()); client.wait(); } int main(int argc, char **argv) { // command-line arguments vector args; argv_to_vec(argc, (const char **)argv, args); env_to_vec(args); auto cct = global_init(nullptr, args, CEPH_ENTITY_TYPE_ANY, CODE_ENVIRONMENT_DAEMON, 0); common_init_finish(cct.get()); ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); }