X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Ftest%2Fdirect_messenger%2FDirectMessenger.cc;fp=src%2Fceph%2Fsrc%2Ftest%2Fdirect_messenger%2FDirectMessenger.cc;h=0000000000000000000000000000000000000000;hb=7da45d65be36d36b880cc55c5036e96c24b53f00;hp=ea6439e18d3f02a60e50c4ce31d674b9dc40ee1f;hpb=691462d09d0987b47e112d6ee8740375df3c51b2;p=stor4nfv.git diff --git a/src/ceph/src/test/direct_messenger/DirectMessenger.cc b/src/ceph/src/test/direct_messenger/DirectMessenger.cc deleted file mode 100644 index ea6439e..0000000 --- a/src/ceph/src/test/direct_messenger/DirectMessenger.cc +++ /dev/null @@ -1,252 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2004-2006 Sage Weil - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - -#include "DirectMessenger.h" -#include "msg/DispatchStrategy.h" - - -class DirectConnection : public Connection { - /// sent messages are dispatched here - DispatchStrategy *const dispatchers; - - /// the connection that will be attached to outgoing messages, so that replies - /// can be dispatched back to the sender. the pointer is atomic for - /// thread-safety between mark_down() and send_message(). no reference is held - /// on this Connection to avoid cyclical refs. we don't need a reference - /// because its owning DirectMessenger will mark both connections down (and - /// clear this pointer) before dropping its own reference - std::atomic reply_connection{nullptr}; - - public: - DirectConnection(CephContext *cct, DirectMessenger *m, - DispatchStrategy *dispatchers) - : Connection(cct, m), - dispatchers(dispatchers) - {} - - /// sets the Connection that will receive replies to outgoing messages - void set_direct_reply_connection(ConnectionRef conn); - - /// return true if a peer connection exists - bool is_connected() override; - - /// pass the given message directly to our dispatchers - int send_message(Message *m) override; - - /// release our pointer to the peer connection. later calls to is_connected() - /// will return false, and send_message() will fail with -ENOTCONN - void mark_down() override; - - /// noop - keepalive messages are not needed within a process - void send_keepalive() override {} - - /// noop - reconnect/recovery semantics are not needed within a process - void mark_disposable() override {} -}; - -void DirectConnection::set_direct_reply_connection(ConnectionRef conn) -{ - reply_connection.store(conn.get()); -} - -bool DirectConnection::is_connected() -{ - // true between calls to set_direct_reply_connection() and mark_down() - return reply_connection.load() != nullptr; -} - -int DirectConnection::send_message(Message *m) -{ - // read reply_connection atomically and take a reference - ConnectionRef conn = reply_connection.load(); - if (!conn) { - m->put(); - return -ENOTCONN; - } - // attach reply_connection to the Message, so that calls to - // m->get_connection()->send_message() can be dispatched back to the sender - m->set_connection(conn); - - dispatchers->ds_dispatch(m); - return 0; -} - -void DirectConnection::mark_down() -{ - Connection *conn = reply_connection.load(); - if (!conn) { - return; // already marked down - } - if (!reply_connection.compare_exchange_weak(conn, nullptr)) { - return; // lost the race to mark down - } - // called only once to avoid loops - conn->mark_down(); -} - - -static ConnectionRef create_loopback(DirectMessenger *m, - entity_name_t name, - DispatchStrategy *dispatchers) -{ - auto loopback = boost::intrusive_ptr( - new DirectConnection(m->cct, m, dispatchers)); - // loopback replies go to itself - loopback->set_direct_reply_connection(loopback); - loopback->set_peer_type(name.type()); - loopback->set_features(CEPH_FEATURES_ALL); - return loopback; -} - -DirectMessenger::DirectMessenger(CephContext *cct, entity_name_t name, - string mname, uint64_t nonce, - DispatchStrategy *dispatchers) - : SimplePolicyMessenger(cct, name, mname, nonce), - dispatchers(dispatchers), - loopback_connection(create_loopback(this, name, dispatchers)) -{ - dispatchers->set_messenger(this); -} - -DirectMessenger::~DirectMessenger() -{ -} - -int DirectMessenger::set_direct_peer(DirectMessenger *peer) -{ - if (get_myinst() == peer->get_myinst()) { - return -EADDRINUSE; // must have a different entity instance - } - peer_inst = peer->get_myinst(); - - // allocate a Connection that dispatches to the peer messenger - auto direct_connection = boost::intrusive_ptr( - new DirectConnection(cct, peer, peer->dispatchers.get())); - - direct_connection->set_peer_addr(peer_inst.addr); - direct_connection->set_peer_type(peer_inst.name.type()); - direct_connection->set_features(CEPH_FEATURES_ALL); - - // if set_direct_peer() was already called on the peer messenger, we can - // finish by attaching their connections. if not, the later call to - // peer->set_direct_peer() will attach their connection to ours - auto connection = peer->get_connection(get_myinst()); - if (connection) { - auto p = static_cast(connection.get()); - - p->set_direct_reply_connection(direct_connection); - direct_connection->set_direct_reply_connection(p); - } - - peer_connection = std::move(direct_connection); - return 0; -} - -int DirectMessenger::bind(const entity_addr_t &bind_addr) -{ - if (peer_connection) { - return -EINVAL; // can't change address after sharing it with the peer - } - set_myaddr(bind_addr); - loopback_connection->set_peer_addr(bind_addr); - return 0; -} - -int DirectMessenger::client_bind(const entity_addr_t &bind_addr) -{ - // same as bind - return bind(bind_addr); -} - -int DirectMessenger::start() -{ - if (!peer_connection) { - return -EINVAL; // did not connect to a peer - } - if (started) { - return -EINVAL; // already started - } - - dispatchers->start(); - return SimplePolicyMessenger::start(); -} - -int DirectMessenger::shutdown() -{ - if (!started) { - return -EINVAL; // not started - } - - mark_down_all(); - peer_connection.reset(); - loopback_connection.reset(); - - dispatchers->shutdown(); - SimplePolicyMessenger::shutdown(); - sem.Put(); // signal wait() - return 0; -} - -void DirectMessenger::wait() -{ - sem.Get(); // wait on signal from shutdown() - dispatchers->wait(); -} - -ConnectionRef DirectMessenger::get_connection(const entity_inst_t& dst) -{ - if (dst == peer_inst) { - return peer_connection; - } - if (dst == get_myinst()) { - return loopback_connection; - } - return nullptr; -} - -ConnectionRef DirectMessenger::get_loopback_connection() -{ - return loopback_connection; -} - -int DirectMessenger::send_message(Message *m, const entity_inst_t& dst) -{ - auto conn = get_connection(dst); - if (!conn) { - m->put(); - return -ENOTCONN; - } - return conn->send_message(m); -} - -void DirectMessenger::mark_down(const entity_addr_t& addr) -{ - ConnectionRef conn; - if (addr == peer_inst.addr) { - conn = peer_connection; - } else if (addr == get_myaddr()) { - conn = loopback_connection; - } - if (conn) { - conn->mark_down(); - } -} - -void DirectMessenger::mark_down_all() -{ - if (peer_connection) { - peer_connection->mark_down(); - } - loopback_connection->mark_down(); -}