X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fmsg%2Fasync%2FAsyncMessenger.h;fp=src%2Fceph%2Fsrc%2Fmsg%2Fasync%2FAsyncMessenger.h;h=0000000000000000000000000000000000000000;hb=7da45d65be36d36b880cc55c5036e96c24b53f00;hp=7ebc7777c93e611f1ca49efa78f5edc4654a0b61;hpb=691462d09d0987b47e112d6ee8740375df3c51b2;p=stor4nfv.git diff --git a/src/ceph/src/msg/async/AsyncMessenger.h b/src/ceph/src/msg/async/AsyncMessenger.h deleted file mode 100644 index 7ebc777..0000000 --- a/src/ceph/src/msg/async/AsyncMessenger.h +++ /dev/null @@ -1,450 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2014 UnitedStack - * - * Author: Haomai Wang - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - -#ifndef CEPH_ASYNCMESSENGER_H -#define CEPH_ASYNCMESSENGER_H - -#include "include/types.h" -#include "include/xlist.h" - -#include -using namespace std; -#include "include/unordered_map.h" -#include "include/unordered_set.h" - -#include "common/Mutex.h" -#include "common/Cond.h" -#include "common/Thread.h" - -#include "include/Spinlock.h" - -#include "msg/SimplePolicyMessenger.h" -#include "msg/DispatchQueue.h" -#include "include/assert.h" -#include "AsyncConnection.h" -#include "Event.h" - - -class AsyncMessenger; - -/** - * If the Messenger binds to a specific address, the Processor runs - * and listens for incoming connections. - */ -class Processor { - AsyncMessenger *msgr; - NetHandler net; - Worker *worker; - ServerSocket listen_socket; - EventCallbackRef listen_handler; - - class C_processor_accept; - - public: - Processor(AsyncMessenger *r, Worker *w, CephContext *c); - ~Processor() { delete listen_handler; }; - - void stop(); - int bind(const entity_addr_t &bind_addr, - const set& avoid_ports, - entity_addr_t* bound_addr); - void start(); - void accept(); -}; - -/* - * AsyncMessenger is represented for maintaining a set of asynchronous connections, - * it may own a bind address and the accepted connections will be managed by - * AsyncMessenger. - * - */ - -class AsyncMessenger : public SimplePolicyMessenger { - // First we have the public Messenger interface implementation... -public: - /** - * Initialize the AsyncMessenger! - * - * @param cct The CephContext to use - * @param name The name to assign ourselves - * _nonce A unique ID to use for this AsyncMessenger. It should not - * be a value that will be repeated if the daemon restarts. - */ - AsyncMessenger(CephContext *cct, entity_name_t name, const std::string &type, - string mname, uint64_t _nonce); - - /** - * Destroy the AsyncMessenger. Pretty simple since all the work is done - * elsewhere. - */ - ~AsyncMessenger() override; - - /** @defgroup Accessors - * @{ - */ - void set_addr_unknowns(const entity_addr_t &addr) override; - void set_addr(const entity_addr_t &addr) override; - - int get_dispatch_queue_len() override { - return dispatch_queue.get_queue_len(); - } - - double get_dispatch_queue_max_age(utime_t now) override { - return dispatch_queue.get_max_age(now); - } - /** @} Accessors */ - - /** - * @defgroup Configuration functions - * @{ - */ - void set_cluster_protocol(int p) override { - assert(!started && !did_bind); - cluster_protocol = p; - } - - int bind(const entity_addr_t& bind_addr) override; - int rebind(const set& avoid_ports) override; - int client_bind(const entity_addr_t& bind_addr) override; - - /** @} Configuration functions */ - - /** - * @defgroup Startup/Shutdown - * @{ - */ - int start() override; - void wait() override; - int shutdown() override; - - /** @} // Startup/Shutdown */ - - /** - * @defgroup Messaging - * @{ - */ - int send_message(Message *m, const entity_inst_t& dest) override { - Mutex::Locker l(lock); - - return _send_message(m, dest); - } - - /** @} // Messaging */ - - /** - * @defgroup Connection Management - * @{ - */ - ConnectionRef get_connection(const entity_inst_t& dest) override; - ConnectionRef get_loopback_connection() override; - void mark_down(const entity_addr_t& addr) override; - void mark_down_all() override { - shutdown_connections(true); - } - /** @} // Connection Management */ - - /** - * @defgroup Inner classes - * @{ - */ - - /** - * @} // Inner classes - */ - -protected: - /** - * @defgroup Messenger Interfaces - * @{ - */ - /** - * Start up the DispatchQueue thread once we have somebody to dispatch to. - */ - void ready() override; - /** @} // Messenger Interfaces */ - -private: - - /** - * @defgroup Utility functions - * @{ - */ - - /** - * Create a connection associated with the given entity (of the given type). - * Initiate the connection. (This function returning does not guarantee - * connection success.) - * - * @param addr The address of the entity to connect to. - * @param type The peer type of the entity at the address. - * - * @return a pointer to the newly-created connection. Caller does not own a - * reference; take one if you need it. - */ - AsyncConnectionRef create_connect(const entity_addr_t& addr, int type); - - /** - * Queue up a Message for delivery to the entity specified - * by addr and dest_type. - * submit_message() is responsible for creating - * new AsyncConnection (and closing old ones) as necessary. - * - * @param m The Message to queue up. This function eats a reference. - * @param con The existing Connection to use, or NULL if you don't know of one. - * @param dest_addr The address to send the Message to. - * @param dest_type The peer type of the address we're sending to - * just drop silently under failure. - */ - void submit_message(Message *m, AsyncConnectionRef con, - const entity_addr_t& dest_addr, int dest_type); - - int _send_message(Message *m, const entity_inst_t& dest); - void _finish_bind(const entity_addr_t& bind_addr, - const entity_addr_t& listen_addr); - - private: - static const uint64_t ReapDeadConnectionThreshold = 5; - - NetworkStack *stack; - std::vector processors; - friend class Processor; - DispatchQueue dispatch_queue; - - // the worker run messenger's cron jobs - Worker *local_worker; - - std::string ms_type; - - /// overall lock used for AsyncMessenger data structures - Mutex lock; - // AsyncMessenger stuff - /// approximately unique ID set by the Constructor for use in entity_addr_t - uint64_t nonce; - - /// true, specifying we haven't learned our addr; set false when we find it. - // maybe this should be protected by the lock? - bool need_addr; - - /** - * set to bind address if bind was called before NetworkStack was ready to - * bind - */ - entity_addr_t pending_bind_addr; - - /** - * false; set to true if a pending bind exists - */ - bool pending_bind = false; - - /** - * The following aren't lock-protected since you shouldn't be able to race - * the only writers. - */ - - /** - * false; set to true if the AsyncMessenger bound to a specific address; - * and set false again by Accepter::stop(). - */ - bool did_bind; - /// counter for the global seq our connection protocol uses - __u32 global_seq; - /// lock to protect the global_seq - ceph_spinlock_t global_seq_lock; - - /** - * hash map of addresses to Asyncconnection - * - * NOTE: a Asyncconnection* with state CLOSED may still be in the map but is considered - * invalid and can be replaced by anyone holding the msgr lock - */ - ceph::unordered_map conns; - - /** - * list of connection are in teh process of accepting - * - * These are not yet in the conns map. - */ - set accepting_conns; - - /** - * list of connection are closed which need to be clean up - * - * Because AsyncMessenger and AsyncConnection follow a lock rule that - * we can lock AsyncMesenger::lock firstly then lock AsyncConnection::lock - * but can't reversed. This rule is aimed to avoid dead lock. - * So if AsyncConnection want to unregister itself from AsyncMessenger, - * we pick up this idea that just queue itself to this set and do lazy - * deleted for AsyncConnection. "_lookup_conn" must ensure not return a - * AsyncConnection in this set. - */ - Mutex deleted_lock; - set deleted_conns; - - EventCallbackRef reap_handler; - - /// internal cluster protocol version, if any, for talking to entities of the same type. - int cluster_protocol; - - Cond stop_cond; - bool stopped; - - AsyncConnectionRef _lookup_conn(const entity_addr_t& k) { - assert(lock.is_locked()); - ceph::unordered_map::iterator p = conns.find(k); - if (p == conns.end()) - return NULL; - - // lazy delete, see "deleted_conns" - Mutex::Locker l(deleted_lock); - if (deleted_conns.erase(p->second)) { - p->second->get_perf_counter()->dec(l_msgr_active_connections); - conns.erase(p); - return NULL; - } - - return p->second; - } - - void _init_local_connection() { - assert(lock.is_locked()); - local_connection->peer_addr = my_inst.addr; - local_connection->peer_type = my_inst.name.type(); - local_connection->set_features(CEPH_FEATURES_ALL); - ms_deliver_handle_fast_connect(local_connection.get()); - } - - void shutdown_connections(bool queue_reset); - -public: - - /// con used for sending messages to ourselves - ConnectionRef local_connection; - - /** - * @defgroup AsyncMessenger internals - * @{ - */ - /** - * This wraps _lookup_conn. - */ - AsyncConnectionRef lookup_conn(const entity_addr_t& k) { - Mutex::Locker l(lock); - return _lookup_conn(k); - } - - int accept_conn(AsyncConnectionRef conn) { - Mutex::Locker l(lock); - auto it = conns.find(conn->peer_addr); - if (it != conns.end()) { - AsyncConnectionRef existing = it->second; - - // lazy delete, see "deleted_conns" - // If conn already in, we will return 0 - Mutex::Locker l(deleted_lock); - if (deleted_conns.erase(existing)) { - existing->get_perf_counter()->dec(l_msgr_active_connections); - conns.erase(it); - } else if (conn != existing) { - return -1; - } - } - conns[conn->peer_addr] = conn; - conn->get_perf_counter()->inc(l_msgr_active_connections); - accepting_conns.erase(conn); - return 0; - } - - void learned_addr(const entity_addr_t &peer_addr_for_me); - void add_accept(Worker *w, ConnectedSocket cli_socket, entity_addr_t &addr); - NetworkStack *get_stack() { - return stack; - } - - /** - * This wraps ms_deliver_get_authorizer. We use it for AsyncConnection. - */ - AuthAuthorizer *get_authorizer(int peer_type, bool force_new) { - return ms_deliver_get_authorizer(peer_type, force_new); - } - - /** - * This wraps ms_deliver_verify_authorizer; we use it for AsyncConnection. - */ - bool verify_authorizer(Connection *con, int peer_type, int protocol, bufferlist& auth, bufferlist& auth_reply, - bool& isvalid, CryptoKey& session_key) { - return ms_deliver_verify_authorizer(con, peer_type, protocol, auth, - auth_reply, isvalid, session_key); - } - /** - * Increment the global sequence for this AsyncMessenger and return it. - * This is for the connect protocol, although it doesn't hurt if somebody - * else calls it. - * - * @return a global sequence ID that nobody else has seen. - */ - __u32 get_global_seq(__u32 old=0) { - ceph_spin_lock(&global_seq_lock); - if (old > global_seq) - global_seq = old; - __u32 ret = ++global_seq; - ceph_spin_unlock(&global_seq_lock); - return ret; - } - /** - * Get the protocol version we support for the given peer type: either - * a peer protocol (if it matches our own), the protocol version for the - * peer (if we're connecting), or our protocol version (if we're accepting). - */ - int get_proto_version(int peer_type, bool connect) const; - - /** - * Fill in the address and peer type for the local connection, which - * is used for delivering messages back to ourself. - */ - void init_local_connection() { - Mutex::Locker l(lock); - _init_local_connection(); - } - - /** - * Unregister connection from `conns` - * - * See "deleted_conns" - */ - void unregister_conn(AsyncConnectionRef conn) { - Mutex::Locker l(deleted_lock); - deleted_conns.insert(conn); - - if (deleted_conns.size() >= ReapDeadConnectionThreshold) { - local_worker->center.dispatch_event_external(reap_handler); - } - } - - /** - * Reap dead connection from `deleted_conns` - * - * @return the number of dead connections - * - * See "deleted_conns" - */ - int reap_dead(); - - /** - * @} // AsyncMessenger Internals - */ -} ; - -#endif /* CEPH_ASYNCMESSENGER_H */