// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab /* * Ceph - scalable distributed file system * * Copyright (C) 2004-2006 Sage Weil * * This is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License version 2.1, as published by the Free Software * Foundation. See file COPYING. * */ #ifndef CEPH_SIMPLEMESSENGER_H #define CEPH_SIMPLEMESSENGER_H #include "include/types.h" #include "include/xlist.h" #include #include using namespace std; #include "include/unordered_map.h" #include "include/unordered_set.h" #include "common/Mutex.h" #include "include/Spinlock.h" #include "common/Cond.h" #include "common/Thread.h" #include "common/Throttle.h" #include "msg/SimplePolicyMessenger.h" #include "msg/Message.h" #include "include/assert.h" #include "msg/DispatchQueue.h" #include "Pipe.h" #include "Accepter.h" /* * This class handles transmission and reception of messages. Generally * speaking, there are several major components: * * - Connection * Each logical session is associated with a Connection. * - Pipe * Each network connection is handled through a pipe, which handles * the input and output of each message. There is normally a 1:1 * relationship between Pipe and Connection, but logical sessions may * get handed off between Pipes when sockets reconnect or during * connection races. * - IncomingQueue * Incoming messages are associated with an IncomingQueue, and there * is one such queue associated with each Pipe. * - DispatchQueue * IncomingQueues get queued in the DIspatchQueue, which is responsible * for doing a round-robin sweep and processing them via a worker thread. * - SimpleMessenger * It's the exterior class passed to the external message handler and * most of the API details. * * Lock ordering: * * SimpleMessenger::lock * Pipe::pipe_lock * DispatchQueue::lock * IncomingQueue::lock */ class SimpleMessenger : public SimplePolicyMessenger { // First we have the public Messenger interface implementation... public: /** * Initialize the SimpleMessenger! * * @param cct The CephContext to use * @param name The name to assign ourselves * _nonce A unique ID to use for this SimpleMessenger. It should not * be a value that will be repeated if the daemon restarts. * features The local features bits for the local_connection */ SimpleMessenger(CephContext *cct, entity_name_t name, string mname, uint64_t _nonce); /** * Destroy the SimpleMessenger. Pretty simple since all the work is done * elsewhere. */ ~SimpleMessenger() override; /** @defgroup Accessors * @{ */ void set_addr_unknowns(const entity_addr_t& addr) override; void set_addr(const entity_addr_t &addr) override; int get_dispatch_queue_len() override { return dispatch_queue.get_queue_len(); } double get_dispatch_queue_max_age(utime_t now) override { return dispatch_queue.get_max_age(now); } /** @} Accessors */ /** * @defgroup Configuration functions * @{ */ void set_cluster_protocol(int p) override { assert(!started && !did_bind); cluster_protocol = p; } int bind(const entity_addr_t& bind_addr) override; int rebind(const set& avoid_ports) override; int client_bind(const entity_addr_t& bind_addr) override; /** @} Configuration functions */ /** * @defgroup Startup/Shutdown * @{ */ int start() override; void wait() override; int shutdown() override; /** @} // Startup/Shutdown */ /** * @defgroup Messaging * @{ */ int send_message(Message *m, const entity_inst_t& dest) override { return _send_message(m, dest); } int send_message(Message *m, Connection *con) { return _send_message(m, con); } /** @} // Messaging */ /** * @defgroup Connection Management * @{ */ ConnectionRef get_connection(const entity_inst_t& dest) override; ConnectionRef get_loopback_connection() override; int send_keepalive(Connection *con); void mark_down(const entity_addr_t& addr) override; void mark_down(Connection *con); void mark_disposable(Connection *con); void mark_down_all() override; /** @} // Connection Management */ protected: /** * @defgroup Messenger Interfaces * @{ */ /** * Start up the DispatchQueue thread once we have somebody to dispatch to. */ void ready() override; /** @} // Messenger Interfaces */ private: /** * @defgroup Inner classes * @{ */ public: Accepter accepter; DispatchQueue dispatch_queue; friend class Accepter; /** * Register a new pipe for accept * * @param sd socket */ Pipe *add_accept_pipe(int sd); private: /** * A thread used to tear down Pipes when they're complete. */ class ReaperThread : public Thread { SimpleMessenger *msgr; public: explicit ReaperThread(SimpleMessenger *m) : msgr(m) {} void *entry() override { msgr->reaper_entry(); return 0; } } reaper_thread; /** * @} // Inner classes */ /** * @defgroup Utility functions * @{ */ /** * Create a Pipe associated with the given entity (of the given type). * Initiate the connection. (This function returning does not guarantee * connection success.) * * @param addr The address of the entity to connect to. * @param type The peer type of the entity at the address. * @param con An existing Connection to associate with the new Pipe. If * NULL, it creates a new Connection. * @param first an initial message to queue on the new pipe * * @return a pointer to the newly-created Pipe. Caller does not own a * reference; take one if you need it. */ Pipe *connect_rank(const entity_addr_t& addr, int type, PipeConnection *con, Message *first); /** * Send a message, lazily or not. * This just glues send_message together and passes * the input on to submit_message. */ int _send_message(Message *m, const entity_inst_t& dest); /** * Same as above, but for the Connection-based variants. */ int _send_message(Message *m, Connection *con); /** * Queue up a Message for delivery to the entity specified * by addr and dest_type. * submit_message() is responsible for creating * new Pipes (and closing old ones) as necessary. * * @param m The Message to queue up. This function eats a reference. * @param con The existing Connection to use, or NULL if you don't know of one. * @param addr The address to send the Message to. * @param dest_type The peer type of the address we're sending to * just drop silently under failure. * @param already_locked If false, submit_message() will acquire the * SimpleMessenger lock before accessing shared data structures; otherwise * it will assume the lock is held. NOTE: if you are making a request * without locking, you MUST have filled in the con with a valid pointer. */ void submit_message(Message *m, PipeConnection *con, const entity_addr_t& addr, int dest_type, bool already_locked); /** * Look through the pipes in the pipe_reap_queue and tear them down. */ void reaper(); /** * @} // Utility functions */ // SimpleMessenger stuff /// approximately unique ID set by the Constructor for use in entity_addr_t uint64_t nonce; /// overall lock used for SimpleMessenger data structures Mutex lock; /// true, specifying we haven't learned our addr; set false when we find it. // maybe this should be protected by the lock? bool need_addr; public: bool get_need_addr() const { return need_addr; } private: /** * false; set to true if the SimpleMessenger bound to a specific address; * and set false again by Accepter::stop(). This isn't lock-protected * since you shouldn't be able to race the only writers. */ bool did_bind; /// counter for the global seq our connection protocol uses __u32 global_seq; /// lock to protect the global_seq ceph_spinlock_t global_seq_lock; /** * hash map of addresses to Pipes * * NOTE: a Pipe* with state CLOSED may still be in the map but is considered * invalid and can be replaced by anyone holding the msgr lock */ ceph::unordered_map rank_pipe; /** * list of pipes are in teh process of accepting * * These are not yet in the rank_pipe map. */ set accepting_pipes; /// a set of all the Pipes we have which are somehow active set pipes; /// a list of Pipes we want to tear down list pipe_reap_queue; /// internal cluster protocol version, if any, for talking to entities of the same type. int cluster_protocol; Cond stop_cond; bool stopped = true; bool reaper_started, reaper_stop; Cond reaper_cond; /// This Cond is slept on by wait() and signaled by dispatch_entry() Cond wait_cond; friend class Pipe; Pipe *_lookup_pipe(const entity_addr_t& k) { ceph::unordered_map::iterator p = rank_pipe.find(k); if (p == rank_pipe.end()) return NULL; // see lock cribbing in Pipe::fault() if (p->second->state_closed) return NULL; return p->second; } public: int timeout; /// con used for sending messages to ourselves ConnectionRef local_connection; /** * @defgroup SimpleMessenger internals * @{ */ /** * This wraps ms_deliver_get_authorizer. We use it for Pipe. */ AuthAuthorizer *get_authorizer(int peer_type, bool force_new); /** * This wraps ms_deliver_verify_authorizer; we use it for Pipe. */ bool verify_authorizer(Connection *con, int peer_type, int protocol, bufferlist& auth, bufferlist& auth_reply, bool& isvalid,CryptoKey& session_key); /** * Increment the global sequence for this SimpleMessenger and return it. * This is for the connect protocol, although it doesn't hurt if somebody * else calls it. * * @return a global sequence ID that nobody else has seen. */ __u32 get_global_seq(__u32 old=0) { ceph_spin_lock(&global_seq_lock); if (old > global_seq) global_seq = old; __u32 ret = ++global_seq; ceph_spin_unlock(&global_seq_lock); return ret; } /** * Get the protocol version we support for the given peer type: either * a peer protocol (if it matches our own), the protocol version for the * peer (if we're connecting), or our protocol version (if we're accepting). */ int get_proto_version(int peer_type, bool connect); /** * Fill in the features, address and peer type for the local connection, which * is used for delivering messages back to ourself. */ void init_local_connection(); /** * Tell the SimpleMessenger its full IP address. * * This is used by Pipes when connecting to other endpoints, and * probably shouldn't be called by anybody else. */ void learned_addr(const entity_addr_t& peer_addr_for_me); /** * This function is used by the reaper thread. As long as nobody * has set reaper_stop, it calls the reaper function, then * waits to be signaled when it needs to reap again (or when it needs * to stop). */ void reaper_entry(); /** * Add a pipe to the pipe_reap_queue, to be torn down on * the next call to reaper(). * It should really only be the Pipe calling this, in our current * implementation. * * @param pipe A Pipe which has stopped its threads and is * ready to be torn down. */ void queue_reap(Pipe *pipe); /** * Used to get whether this connection ready to send */ bool is_connected(Connection *con); /** * @} // SimpleMessenger Internals */ } ; #endif /* CEPH_SIMPLEMESSENGER_H */