X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Ftest%2Fdirect_messenger%2Ftest_direct_messenger.cc;fp=src%2Fceph%2Fsrc%2Ftest%2Fdirect_messenger%2Ftest_direct_messenger.cc;h=0000000000000000000000000000000000000000;hb=7da45d65be36d36b880cc55c5036e96c24b53f00;hp=0540baa48cde85b18f25fb67e8388ca746fe258a;hpb=691462d09d0987b47e112d6ee8740375df3c51b2;p=stor4nfv.git diff --git a/src/ceph/src/test/direct_messenger/test_direct_messenger.cc b/src/ceph/src/test/direct_messenger/test_direct_messenger.cc deleted file mode 100644 index 0540baa..0000000 --- a/src/ceph/src/test/direct_messenger/test_direct_messenger.cc +++ /dev/null @@ -1,437 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab - -#include -#include -#include - -#include - -#include "global/global_init.h" -#include "common/ceph_argparse.h" - -#include "DirectMessenger.h" -#include "msg/FastStrategy.h" -#include "msg/QueueStrategy.h" -#include "messages/MPing.h" - - -/// mock dispatcher that calls the given callback -class MockDispatcher : public Dispatcher { - std::function callback; - public: - MockDispatcher(CephContext *cct, std::function callback) - : Dispatcher(cct), callback(std::move(callback)) {} - bool ms_handle_reset(Connection *con) override { return false; } - void ms_handle_remote_reset(Connection *con) override {} - bool ms_handle_refused(Connection *con) override { return false; } - bool ms_dispatch(Message *m) override { - callback(m); - m->put(); - return true; - } -}; - -/// test synchronous dispatch of messenger and connection interfaces -TEST(DirectMessenger, SyncDispatch) -{ - auto cct = g_ceph_context; - - // use FastStrategy for synchronous dispatch - DirectMessenger client(cct, entity_name_t::CLIENT(1), - "client", 0, new FastStrategy()); - DirectMessenger server(cct, entity_name_t::CLIENT(2), - "server", 0, new FastStrategy()); - - ASSERT_EQ(0, client.set_direct_peer(&server)); - ASSERT_EQ(0, server.set_direct_peer(&client)); - - bool got_request = false; - bool got_reply = false; - - MockDispatcher client_dispatcher(cct, [&] (Message *m) { - got_reply = true; - }); - client.add_dispatcher_head(&client_dispatcher); - - MockDispatcher server_dispatcher(cct, [&] (Message *m) { - got_request = true; - ASSERT_EQ(0, m->get_connection()->send_message(new MPing())); - }); - server.add_dispatcher_head(&server_dispatcher); - - ASSERT_EQ(0, client.start()); - ASSERT_EQ(0, server.start()); - - // test DirectMessenger::send_message() - ASSERT_EQ(0, client.send_message(new MPing(), server.get_myinst())); - ASSERT_TRUE(got_request); - ASSERT_TRUE(got_reply); - - // test DirectConnection::send_message() - { - got_request = false; - got_reply = false; - auto conn = client.get_connection(server.get_myinst()); - ASSERT_EQ(0, conn->send_message(new MPing())); - ASSERT_TRUE(got_request); - ASSERT_TRUE(got_reply); - } - - // test DirectMessenger::send_message() with loopback address - got_request = false; - got_reply = false; - ASSERT_EQ(0, client.send_message(new MPing(), client.get_myinst())); - ASSERT_FALSE(got_request); // server should never see this - ASSERT_TRUE(got_reply); - - // test DirectConnection::send_message() with loopback address - { - got_request = false; - got_reply = false; - auto conn = client.get_connection(client.get_myinst()); - ASSERT_EQ(0, conn->send_message(new MPing())); - ASSERT_FALSE(got_request); // server should never see this - ASSERT_TRUE(got_reply); - } - - // test DirectConnection::send_message() with loopback connection - { - got_request = false; - got_reply = false; - auto conn = client.get_loopback_connection(); - ASSERT_EQ(0, conn->send_message(new MPing())); - ASSERT_FALSE(got_request); // server should never see this - ASSERT_TRUE(got_reply); - } - - ASSERT_EQ(0, client.shutdown()); - client.wait(); - - ASSERT_EQ(0, server.shutdown()); - server.wait(); -} - -/// test asynchronous dispatch of messenger and connection interfaces -TEST(DirectMessenger, AsyncDispatch) -{ - auto cct = g_ceph_context; - - // use QueueStrategy for async replies - DirectMessenger client(cct, entity_name_t::CLIENT(1), - "client", 0, new QueueStrategy(1)); - DirectMessenger server(cct, entity_name_t::CLIENT(2), - "server", 0, new FastStrategy()); - - ASSERT_EQ(0, client.set_direct_peer(&server)); - ASSERT_EQ(0, server.set_direct_peer(&client)); - - // condition variable to wait on ping reply - std::mutex mutex; - std::condition_variable cond; - bool done = false; - - auto wait_for_reply = [&] { - std::unique_lock lock(mutex); - while (!done) { - cond.wait(lock); - } - done = false; // clear for reuse - }; - - // client dispatcher signals the condition variable on reply - MockDispatcher client_dispatcher(cct, [&] (Message *m) { - std::lock_guard lock(mutex); - done = true; - cond.notify_one(); - }); - client.add_dispatcher_head(&client_dispatcher); - - MockDispatcher server_dispatcher(cct, [&] (Message *m) { - // hold the lock over the call to send_message() to prove that the client's - // dispatch is asynchronous. if it isn't, it will deadlock - std::lock_guard lock(mutex); - ASSERT_EQ(0, m->get_connection()->send_message(new MPing())); - }); - server.add_dispatcher_head(&server_dispatcher); - - ASSERT_EQ(0, client.start()); - ASSERT_EQ(0, server.start()); - - // test DirectMessenger::send_message() - ASSERT_EQ(0, client.send_message(new MPing(), server.get_myinst())); - wait_for_reply(); - - // test DirectConnection::send_message() - { - auto conn = client.get_connection(server.get_myinst()); - ASSERT_EQ(0, conn->send_message(new MPing())); - } - wait_for_reply(); - - // test DirectMessenger::send_message() with loopback address - { - // hold the lock to test that loopback dispatch is asynchronous - std::lock_guard lock(mutex); - ASSERT_EQ(0, client.send_message(new MPing(), client.get_myinst())); - } - wait_for_reply(); - - // test DirectConnection::send_message() with loopback address - { - auto conn = client.get_connection(client.get_myinst()); - // hold the lock to test that loopback dispatch is asynchronous - std::lock_guard lock(mutex); - ASSERT_EQ(0, conn->send_message(new MPing())); - } - wait_for_reply(); - - // test DirectConnection::send_message() with loopback connection - { - auto conn = client.get_loopback_connection(); - // hold the lock to test that loopback dispatch is asynchronous - std::lock_guard lock(mutex); - ASSERT_EQ(0, conn->send_message(new MPing())); - } - wait_for_reply(); - - ASSERT_EQ(0, client.shutdown()); - client.wait(); - - ASSERT_EQ(0, server.shutdown()); - server.wait(); -} - -/// test that wait() blocks until shutdown() -TEST(DirectMessenger, WaitShutdown) -{ - auto cct = g_ceph_context; - - // test wait() with both Queue- and FastStrategy - DirectMessenger client(cct, entity_name_t::CLIENT(1), - "client", 0, new QueueStrategy(1)); - DirectMessenger server(cct, entity_name_t::CLIENT(2), - "server", 0, new FastStrategy()); - - ASSERT_EQ(0, client.set_direct_peer(&server)); - ASSERT_EQ(0, server.set_direct_peer(&client)); - - ASSERT_EQ(0, client.start()); - ASSERT_EQ(0, server.start()); - - std::atomic client_waiting{false}; - std::atomic server_waiting{false}; - - // spawn threads to wait() on each of the messengers - std::thread client_thread([&] { - client_waiting = true; - client.wait(); - client_waiting = false; - }); - std::thread server_thread([&] { - server_waiting = true; - server.wait(); - server_waiting = false; - }); - - // give them time to start - std::this_thread::sleep_for(std::chrono::milliseconds(50)); - - ASSERT_TRUE(client_waiting); - ASSERT_TRUE(server_waiting); - - // call shutdown to unblock the waiting threads - ASSERT_EQ(0, client.shutdown()); - ASSERT_EQ(0, server.shutdown()); - - client_thread.join(); - server_thread.join(); - - ASSERT_FALSE(client_waiting); - ASSERT_FALSE(server_waiting); -} - -/// test connection and messenger interfaces after mark_down() -TEST(DirectMessenger, MarkDown) -{ - auto cct = g_ceph_context; - - DirectMessenger client(cct, entity_name_t::CLIENT(1), - "client", 0, new FastStrategy()); - DirectMessenger server(cct, entity_name_t::CLIENT(2), - "server", 0, new FastStrategy()); - - ASSERT_EQ(0, client.set_direct_peer(&server)); - ASSERT_EQ(0, server.set_direct_peer(&client)); - - ASSERT_EQ(0, client.start()); - ASSERT_EQ(0, server.start()); - - auto client_to_server = client.get_connection(server.get_myinst()); - auto server_to_client = server.get_connection(client.get_myinst()); - - ASSERT_TRUE(client_to_server->is_connected()); - ASSERT_TRUE(server_to_client->is_connected()); - - // mark_down() breaks the connection on both sides - client_to_server->mark_down(); - - ASSERT_FALSE(client_to_server->is_connected()); - ASSERT_EQ(-ENOTCONN, client_to_server->send_message(new MPing())); - ASSERT_EQ(-ENOTCONN, client.send_message(new MPing(), server.get_myinst())); - - ASSERT_FALSE(server_to_client->is_connected()); - ASSERT_EQ(-ENOTCONN, server_to_client->send_message(new MPing())); - ASSERT_EQ(-ENOTCONN, server.send_message(new MPing(), client.get_myinst())); - - ASSERT_EQ(0, client.shutdown()); - client.wait(); - - ASSERT_EQ(0, server.shutdown()); - server.wait(); -} - -/// test connection and messenger interfaces after shutdown() -TEST(DirectMessenger, SendShutdown) -{ - auto cct = g_ceph_context; - - // put client on the heap so we can free it early - std::unique_ptr client{ - new DirectMessenger(cct, entity_name_t::CLIENT(1), - "client", 0, new FastStrategy())}; - DirectMessenger server(cct, entity_name_t::CLIENT(2), - "server", 0, new FastStrategy()); - - ASSERT_EQ(0, client->set_direct_peer(&server)); - ASSERT_EQ(0, server.set_direct_peer(client.get())); - - ASSERT_EQ(0, client->start()); - ASSERT_EQ(0, server.start()); - - const auto client_inst = client->get_myinst(); - const auto server_inst = server.get_myinst(); - - auto client_to_server = client->get_connection(server_inst); - auto server_to_client = server.get_connection(client_inst); - - ASSERT_TRUE(client_to_server->is_connected()); - ASSERT_TRUE(server_to_client->is_connected()); - - // shut down the client to break connections - ASSERT_EQ(0, client->shutdown()); - client->wait(); - - ASSERT_FALSE(client_to_server->is_connected()); - ASSERT_EQ(-ENOTCONN, client_to_server->send_message(new MPing())); - ASSERT_EQ(-ENOTCONN, client->send_message(new MPing(), server_inst)); - - // free the client connection/messenger to test that calls to the server no - // longer try to dereference them - client_to_server.reset(); - client.reset(); - - ASSERT_FALSE(server_to_client->is_connected()); - ASSERT_EQ(-ENOTCONN, server_to_client->send_message(new MPing())); - ASSERT_EQ(-ENOTCONN, server.send_message(new MPing(), client_inst)); - - ASSERT_EQ(0, server.shutdown()); - server.wait(); -} - -/// test connection and messenger interfaces after bind() -TEST(DirectMessenger, Bind) -{ - auto cct = g_ceph_context; - - DirectMessenger client(cct, entity_name_t::CLIENT(1), - "client", 0, new FastStrategy()); - DirectMessenger server(cct, entity_name_t::CLIENT(2), - "server", 0, new FastStrategy()); - - entity_addr_t client_addr; - client_addr.set_family(AF_INET); - client_addr.set_port(1); - - // client bind succeeds before set_direct_peer() - ASSERT_EQ(0, client.bind(client_addr)); - - ASSERT_EQ(0, client.set_direct_peer(&server)); - ASSERT_EQ(0, server.set_direct_peer(&client)); - - // server bind fails after set_direct_peer() - entity_addr_t empty_addr; - ASSERT_EQ(-EINVAL, server.bind(empty_addr)); - - ASSERT_EQ(0, client.start()); - ASSERT_EQ(0, server.start()); - - auto client_to_server = client.get_connection(server.get_myinst()); - auto server_to_client = server.get_connection(client.get_myinst()); - - ASSERT_TRUE(client_to_server->is_connected()); - ASSERT_TRUE(server_to_client->is_connected()); - - // no address in connection to server - ASSERT_EQ(empty_addr, client_to_server->get_peer_addr()); - // bind address is reflected in connection to client - ASSERT_EQ(client_addr, server_to_client->get_peer_addr()); - - // mark_down() with bind address breaks the connection - server.mark_down(client_addr); - - ASSERT_FALSE(client_to_server->is_connected()); - ASSERT_FALSE(server_to_client->is_connected()); - - ASSERT_EQ(0, client.shutdown()); - client.wait(); - - ASSERT_EQ(0, server.shutdown()); - server.wait(); -} - -/// test connection and messenger interfaces before calls to set_direct_peer() -TEST(DirectMessenger, StartWithoutPeer) -{ - auto cct = g_ceph_context; - - DirectMessenger client(cct, entity_name_t::CLIENT(1), - "client", 0, new FastStrategy()); - DirectMessenger server(cct, entity_name_t::CLIENT(2), - "server", 0, new FastStrategy()); - - // can't start until set_direct_peer() - ASSERT_EQ(-EINVAL, client.start()); - ASSERT_EQ(-EINVAL, server.start()); - - ASSERT_EQ(0, client.set_direct_peer(&server)); - - // only client can start - ASSERT_EQ(0, client.start()); - ASSERT_EQ(-EINVAL, server.start()); - - // client has a connection but can't send - auto conn = client.get_connection(server.get_myinst()); - ASSERT_NE(nullptr, conn); - ASSERT_FALSE(conn->is_connected()); - ASSERT_EQ(-ENOTCONN, conn->send_message(new MPing())); - ASSERT_EQ(-ENOTCONN, client.send_message(new MPing(), server.get_myinst())); - - ASSERT_EQ(0, client.shutdown()); - client.wait(); -} - -int main(int argc, char **argv) -{ - // command-line arguments - vector args; - argv_to_vec(argc, (const char **)argv, args); - env_to_vec(args); - - auto cct = global_init(nullptr, args, CEPH_ENTITY_TYPE_ANY, - CODE_ENVIRONMENT_DAEMON, 0); - common_init_finish(cct.get()); - - ::testing::InitGoogleTest(&argc, argv); - return RUN_ALL_TESTS(); -}