X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fmsg%2Fsimple%2FSimpleMessenger.h;fp=src%2Fceph%2Fsrc%2Fmsg%2Fsimple%2FSimpleMessenger.h;h=0000000000000000000000000000000000000000;hb=7da45d65be36d36b880cc55c5036e96c24b53f00;hp=0a0512382eb3c337e2b8fb473fed3a6bde712466;hpb=691462d09d0987b47e112d6ee8740375df3c51b2;p=stor4nfv.git diff --git a/src/ceph/src/msg/simple/SimpleMessenger.h b/src/ceph/src/msg/simple/SimpleMessenger.h deleted file mode 100644 index 0a05123..0000000 --- a/src/ceph/src/msg/simple/SimpleMessenger.h +++ /dev/null @@ -1,413 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2004-2006 Sage Weil - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - -#ifndef CEPH_SIMPLEMESSENGER_H -#define CEPH_SIMPLEMESSENGER_H - -#include "include/types.h" -#include "include/xlist.h" - -#include -#include -using namespace std; -#include "include/unordered_map.h" -#include "include/unordered_set.h" - -#include "common/Mutex.h" -#include "include/Spinlock.h" -#include "common/Cond.h" -#include "common/Thread.h" -#include "common/Throttle.h" - -#include "msg/SimplePolicyMessenger.h" -#include "msg/Message.h" -#include "include/assert.h" - -#include "msg/DispatchQueue.h" -#include "Pipe.h" -#include "Accepter.h" - -/* - * This class handles transmission and reception of messages. Generally - * speaking, there are several major components: - * - * - Connection - * Each logical session is associated with a Connection. - * - Pipe - * Each network connection is handled through a pipe, which handles - * the input and output of each message. There is normally a 1:1 - * relationship between Pipe and Connection, but logical sessions may - * get handed off between Pipes when sockets reconnect or during - * connection races. - * - IncomingQueue - * Incoming messages are associated with an IncomingQueue, and there - * is one such queue associated with each Pipe. - * - DispatchQueue - * IncomingQueues get queued in the DIspatchQueue, which is responsible - * for doing a round-robin sweep and processing them via a worker thread. - * - SimpleMessenger - * It's the exterior class passed to the external message handler and - * most of the API details. - * - * Lock ordering: - * - * SimpleMessenger::lock - * Pipe::pipe_lock - * DispatchQueue::lock - * IncomingQueue::lock - */ - -class SimpleMessenger : public SimplePolicyMessenger { - // First we have the public Messenger interface implementation... -public: - /** - * Initialize the SimpleMessenger! - * - * @param cct The CephContext to use - * @param name The name to assign ourselves - * _nonce A unique ID to use for this SimpleMessenger. It should not - * be a value that will be repeated if the daemon restarts. - * features The local features bits for the local_connection - */ - SimpleMessenger(CephContext *cct, entity_name_t name, - string mname, uint64_t _nonce); - - /** - * Destroy the SimpleMessenger. Pretty simple since all the work is done - * elsewhere. - */ - ~SimpleMessenger() override; - - /** @defgroup Accessors - * @{ - */ - void set_addr_unknowns(const entity_addr_t& addr) override; - void set_addr(const entity_addr_t &addr) override; - - int get_dispatch_queue_len() override { - return dispatch_queue.get_queue_len(); - } - - double get_dispatch_queue_max_age(utime_t now) override { - return dispatch_queue.get_max_age(now); - } - /** @} Accessors */ - - /** - * @defgroup Configuration functions - * @{ - */ - void set_cluster_protocol(int p) override { - assert(!started && !did_bind); - cluster_protocol = p; - } - - int bind(const entity_addr_t& bind_addr) override; - int rebind(const set& avoid_ports) override; - int client_bind(const entity_addr_t& bind_addr) override; - - /** @} Configuration functions */ - - /** - * @defgroup Startup/Shutdown - * @{ - */ - int start() override; - void wait() override; - int shutdown() override; - - /** @} // Startup/Shutdown */ - - /** - * @defgroup Messaging - * @{ - */ - int send_message(Message *m, const entity_inst_t& dest) override { - return _send_message(m, dest); - } - - int send_message(Message *m, Connection *con) { - return _send_message(m, con); - } - - /** @} // Messaging */ - - /** - * @defgroup Connection Management - * @{ - */ - ConnectionRef get_connection(const entity_inst_t& dest) override; - ConnectionRef get_loopback_connection() override; - int send_keepalive(Connection *con); - void mark_down(const entity_addr_t& addr) override; - void mark_down(Connection *con); - void mark_disposable(Connection *con); - void mark_down_all() override; - /** @} // Connection Management */ -protected: - /** - * @defgroup Messenger Interfaces - * @{ - */ - /** - * Start up the DispatchQueue thread once we have somebody to dispatch to. - */ - void ready() override; - /** @} // Messenger Interfaces */ -private: - /** - * @defgroup Inner classes - * @{ - */ - -public: - Accepter accepter; - DispatchQueue dispatch_queue; - - friend class Accepter; - - /** - * Register a new pipe for accept - * - * @param sd socket - */ - Pipe *add_accept_pipe(int sd); - -private: - - /** - * A thread used to tear down Pipes when they're complete. - */ - class ReaperThread : public Thread { - SimpleMessenger *msgr; - public: - explicit ReaperThread(SimpleMessenger *m) : msgr(m) {} - void *entry() override { - msgr->reaper_entry(); - return 0; - } - } reaper_thread; - - /** - * @} // Inner classes - */ - - /** - * @defgroup Utility functions - * @{ - */ - - /** - * Create a Pipe associated with the given entity (of the given type). - * Initiate the connection. (This function returning does not guarantee - * connection success.) - * - * @param addr The address of the entity to connect to. - * @param type The peer type of the entity at the address. - * @param con An existing Connection to associate with the new Pipe. If - * NULL, it creates a new Connection. - * @param first an initial message to queue on the new pipe - * - * @return a pointer to the newly-created Pipe. Caller does not own a - * reference; take one if you need it. - */ - Pipe *connect_rank(const entity_addr_t& addr, int type, PipeConnection *con, - Message *first); - /** - * Send a message, lazily or not. - * This just glues send_message together and passes - * the input on to submit_message. - */ - int _send_message(Message *m, const entity_inst_t& dest); - /** - * Same as above, but for the Connection-based variants. - */ - int _send_message(Message *m, Connection *con); - /** - * Queue up a Message for delivery to the entity specified - * by addr and dest_type. - * submit_message() is responsible for creating - * new Pipes (and closing old ones) as necessary. - * - * @param m The Message to queue up. This function eats a reference. - * @param con The existing Connection to use, or NULL if you don't know of one. - * @param addr The address to send the Message to. - * @param dest_type The peer type of the address we're sending to - * just drop silently under failure. - * @param already_locked If false, submit_message() will acquire the - * SimpleMessenger lock before accessing shared data structures; otherwise - * it will assume the lock is held. NOTE: if you are making a request - * without locking, you MUST have filled in the con with a valid pointer. - */ - void submit_message(Message *m, PipeConnection *con, - const entity_addr_t& addr, int dest_type, - bool already_locked); - /** - * Look through the pipes in the pipe_reap_queue and tear them down. - */ - void reaper(); - /** - * @} // Utility functions - */ - - // SimpleMessenger stuff - /// approximately unique ID set by the Constructor for use in entity_addr_t - uint64_t nonce; - /// overall lock used for SimpleMessenger data structures - Mutex lock; - /// true, specifying we haven't learned our addr; set false when we find it. - // maybe this should be protected by the lock? - bool need_addr; - -public: - bool get_need_addr() const { return need_addr; } - -private: - /** - * false; set to true if the SimpleMessenger bound to a specific address; - * and set false again by Accepter::stop(). This isn't lock-protected - * since you shouldn't be able to race the only writers. - */ - bool did_bind; - /// counter for the global seq our connection protocol uses - __u32 global_seq; - /// lock to protect the global_seq - ceph_spinlock_t global_seq_lock; - - /** - * hash map of addresses to Pipes - * - * NOTE: a Pipe* with state CLOSED may still be in the map but is considered - * invalid and can be replaced by anyone holding the msgr lock - */ - ceph::unordered_map rank_pipe; - /** - * list of pipes are in teh process of accepting - * - * These are not yet in the rank_pipe map. - */ - set accepting_pipes; - /// a set of all the Pipes we have which are somehow active - set pipes; - /// a list of Pipes we want to tear down - list pipe_reap_queue; - - /// internal cluster protocol version, if any, for talking to entities of the same type. - int cluster_protocol; - - Cond stop_cond; - bool stopped = true; - - bool reaper_started, reaper_stop; - Cond reaper_cond; - - /// This Cond is slept on by wait() and signaled by dispatch_entry() - Cond wait_cond; - - friend class Pipe; - - Pipe *_lookup_pipe(const entity_addr_t& k) { - ceph::unordered_map::iterator p = rank_pipe.find(k); - if (p == rank_pipe.end()) - return NULL; - // see lock cribbing in Pipe::fault() - if (p->second->state_closed) - return NULL; - return p->second; - } - -public: - - int timeout; - - /// con used for sending messages to ourselves - ConnectionRef local_connection; - - /** - * @defgroup SimpleMessenger internals - * @{ - */ - - /** - * This wraps ms_deliver_get_authorizer. We use it for Pipe. - */ - AuthAuthorizer *get_authorizer(int peer_type, bool force_new); - /** - * This wraps ms_deliver_verify_authorizer; we use it for Pipe. - */ - bool verify_authorizer(Connection *con, int peer_type, int protocol, bufferlist& auth, bufferlist& auth_reply, - bool& isvalid,CryptoKey& session_key); - /** - * Increment the global sequence for this SimpleMessenger and return it. - * This is for the connect protocol, although it doesn't hurt if somebody - * else calls it. - * - * @return a global sequence ID that nobody else has seen. - */ - __u32 get_global_seq(__u32 old=0) { - ceph_spin_lock(&global_seq_lock); - if (old > global_seq) - global_seq = old; - __u32 ret = ++global_seq; - ceph_spin_unlock(&global_seq_lock); - return ret; - } - /** - * Get the protocol version we support for the given peer type: either - * a peer protocol (if it matches our own), the protocol version for the - * peer (if we're connecting), or our protocol version (if we're accepting). - */ - int get_proto_version(int peer_type, bool connect); - - /** - * Fill in the features, address and peer type for the local connection, which - * is used for delivering messages back to ourself. - */ - void init_local_connection(); - /** - * Tell the SimpleMessenger its full IP address. - * - * This is used by Pipes when connecting to other endpoints, and - * probably shouldn't be called by anybody else. - */ - void learned_addr(const entity_addr_t& peer_addr_for_me); - - /** - * This function is used by the reaper thread. As long as nobody - * has set reaper_stop, it calls the reaper function, then - * waits to be signaled when it needs to reap again (or when it needs - * to stop). - */ - void reaper_entry(); - /** - * Add a pipe to the pipe_reap_queue, to be torn down on - * the next call to reaper(). - * It should really only be the Pipe calling this, in our current - * implementation. - * - * @param pipe A Pipe which has stopped its threads and is - * ready to be torn down. - */ - void queue_reap(Pipe *pipe); - - /** - * Used to get whether this connection ready to send - */ - bool is_connected(Connection *con); - /** - * @} // SimpleMessenger Internals - */ -} ; - -#endif /* CEPH_SIMPLEMESSENGER_H */