+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-#ifndef CEPH_SIMPLEMESSENGER_H
-#define CEPH_SIMPLEMESSENGER_H
-
-#include "include/types.h"
-#include "include/xlist.h"
-
-#include <list>
-#include <map>
-using namespace std;
-#include "include/unordered_map.h"
-#include "include/unordered_set.h"
-
-#include "common/Mutex.h"
-#include "include/Spinlock.h"
-#include "common/Cond.h"
-#include "common/Thread.h"
-#include "common/Throttle.h"
-
-#include "msg/SimplePolicyMessenger.h"
-#include "msg/Message.h"
-#include "include/assert.h"
-
-#include "msg/DispatchQueue.h"
-#include "Pipe.h"
-#include "Accepter.h"
-
-/*
- * This class handles transmission and reception of messages. Generally
- * speaking, there are several major components:
- *
- * - Connection
- * Each logical session is associated with a Connection.
- * - Pipe
- * Each network connection is handled through a pipe, which handles
- * the input and output of each message. There is normally a 1:1
- * relationship between Pipe and Connection, but logical sessions may
- * get handed off between Pipes when sockets reconnect or during
- * connection races.
- * - IncomingQueue
- * Incoming messages are associated with an IncomingQueue, and there
- * is one such queue associated with each Pipe.
- * - DispatchQueue
- * IncomingQueues get queued in the DIspatchQueue, which is responsible
- * for doing a round-robin sweep and processing them via a worker thread.
- * - SimpleMessenger
- * It's the exterior class passed to the external message handler and
- * most of the API details.
- *
- * Lock ordering:
- *
- * SimpleMessenger::lock
- * Pipe::pipe_lock
- * DispatchQueue::lock
- * IncomingQueue::lock
- */
-
-class SimpleMessenger : public SimplePolicyMessenger {
- // First we have the public Messenger interface implementation...
-public:
- /**
- * Initialize the SimpleMessenger!
- *
- * @param cct The CephContext to use
- * @param name The name to assign ourselves
- * _nonce A unique ID to use for this SimpleMessenger. It should not
- * be a value that will be repeated if the daemon restarts.
- * features The local features bits for the local_connection
- */
- SimpleMessenger(CephContext *cct, entity_name_t name,
- string mname, uint64_t _nonce);
-
- /**
- * Destroy the SimpleMessenger. Pretty simple since all the work is done
- * elsewhere.
- */
- ~SimpleMessenger() override;
-
- /** @defgroup Accessors
- * @{
- */
- void set_addr_unknowns(const entity_addr_t& addr) override;
- void set_addr(const entity_addr_t &addr) override;
-
- int get_dispatch_queue_len() override {
- return dispatch_queue.get_queue_len();
- }
-
- double get_dispatch_queue_max_age(utime_t now) override {
- return dispatch_queue.get_max_age(now);
- }
- /** @} Accessors */
-
- /**
- * @defgroup Configuration functions
- * @{
- */
- void set_cluster_protocol(int p) override {
- assert(!started && !did_bind);
- cluster_protocol = p;
- }
-
- int bind(const entity_addr_t& bind_addr) override;
- int rebind(const set<int>& avoid_ports) override;
- int client_bind(const entity_addr_t& bind_addr) override;
-
- /** @} Configuration functions */
-
- /**
- * @defgroup Startup/Shutdown
- * @{
- */
- int start() override;
- void wait() override;
- int shutdown() override;
-
- /** @} // Startup/Shutdown */
-
- /**
- * @defgroup Messaging
- * @{
- */
- int send_message(Message *m, const entity_inst_t& dest) override {
- return _send_message(m, dest);
- }
-
- int send_message(Message *m, Connection *con) {
- return _send_message(m, con);
- }
-
- /** @} // Messaging */
-
- /**
- * @defgroup Connection Management
- * @{
- */
- ConnectionRef get_connection(const entity_inst_t& dest) override;
- ConnectionRef get_loopback_connection() override;
- int send_keepalive(Connection *con);
- void mark_down(const entity_addr_t& addr) override;
- void mark_down(Connection *con);
- void mark_disposable(Connection *con);
- void mark_down_all() override;
- /** @} // Connection Management */
-protected:
- /**
- * @defgroup Messenger Interfaces
- * @{
- */
- /**
- * Start up the DispatchQueue thread once we have somebody to dispatch to.
- */
- void ready() override;
- /** @} // Messenger Interfaces */
-private:
- /**
- * @defgroup Inner classes
- * @{
- */
-
-public:
- Accepter accepter;
- DispatchQueue dispatch_queue;
-
- friend class Accepter;
-
- /**
- * Register a new pipe for accept
- *
- * @param sd socket
- */
- Pipe *add_accept_pipe(int sd);
-
-private:
-
- /**
- * A thread used to tear down Pipes when they're complete.
- */
- class ReaperThread : public Thread {
- SimpleMessenger *msgr;
- public:
- explicit ReaperThread(SimpleMessenger *m) : msgr(m) {}
- void *entry() override {
- msgr->reaper_entry();
- return 0;
- }
- } reaper_thread;
-
- /**
- * @} // Inner classes
- */
-
- /**
- * @defgroup Utility functions
- * @{
- */
-
- /**
- * Create a Pipe associated with the given entity (of the given type).
- * Initiate the connection. (This function returning does not guarantee
- * connection success.)
- *
- * @param addr The address of the entity to connect to.
- * @param type The peer type of the entity at the address.
- * @param con An existing Connection to associate with the new Pipe. If
- * NULL, it creates a new Connection.
- * @param first an initial message to queue on the new pipe
- *
- * @return a pointer to the newly-created Pipe. Caller does not own a
- * reference; take one if you need it.
- */
- Pipe *connect_rank(const entity_addr_t& addr, int type, PipeConnection *con,
- Message *first);
- /**
- * Send a message, lazily or not.
- * This just glues send_message together and passes
- * the input on to submit_message.
- */
- int _send_message(Message *m, const entity_inst_t& dest);
- /**
- * Same as above, but for the Connection-based variants.
- */
- int _send_message(Message *m, Connection *con);
- /**
- * Queue up a Message for delivery to the entity specified
- * by addr and dest_type.
- * submit_message() is responsible for creating
- * new Pipes (and closing old ones) as necessary.
- *
- * @param m The Message to queue up. This function eats a reference.
- * @param con The existing Connection to use, or NULL if you don't know of one.
- * @param addr The address to send the Message to.
- * @param dest_type The peer type of the address we're sending to
- * just drop silently under failure.
- * @param already_locked If false, submit_message() will acquire the
- * SimpleMessenger lock before accessing shared data structures; otherwise
- * it will assume the lock is held. NOTE: if you are making a request
- * without locking, you MUST have filled in the con with a valid pointer.
- */
- void submit_message(Message *m, PipeConnection *con,
- const entity_addr_t& addr, int dest_type,
- bool already_locked);
- /**
- * Look through the pipes in the pipe_reap_queue and tear them down.
- */
- void reaper();
- /**
- * @} // Utility functions
- */
-
- // SimpleMessenger stuff
- /// approximately unique ID set by the Constructor for use in entity_addr_t
- uint64_t nonce;
- /// overall lock used for SimpleMessenger data structures
- Mutex lock;
- /// true, specifying we haven't learned our addr; set false when we find it.
- // maybe this should be protected by the lock?
- bool need_addr;
-
-public:
- bool get_need_addr() const { return need_addr; }
-
-private:
- /**
- * false; set to true if the SimpleMessenger bound to a specific address;
- * and set false again by Accepter::stop(). This isn't lock-protected
- * since you shouldn't be able to race the only writers.
- */
- bool did_bind;
- /// counter for the global seq our connection protocol uses
- __u32 global_seq;
- /// lock to protect the global_seq
- ceph_spinlock_t global_seq_lock;
-
- /**
- * hash map of addresses to Pipes
- *
- * NOTE: a Pipe* with state CLOSED may still be in the map but is considered
- * invalid and can be replaced by anyone holding the msgr lock
- */
- ceph::unordered_map<entity_addr_t, Pipe*> rank_pipe;
- /**
- * list of pipes are in teh process of accepting
- *
- * These are not yet in the rank_pipe map.
- */
- set<Pipe*> accepting_pipes;
- /// a set of all the Pipes we have which are somehow active
- set<Pipe*> pipes;
- /// a list of Pipes we want to tear down
- list<Pipe*> pipe_reap_queue;
-
- /// internal cluster protocol version, if any, for talking to entities of the same type.
- int cluster_protocol;
-
- Cond stop_cond;
- bool stopped = true;
-
- bool reaper_started, reaper_stop;
- Cond reaper_cond;
-
- /// This Cond is slept on by wait() and signaled by dispatch_entry()
- Cond wait_cond;
-
- friend class Pipe;
-
- Pipe *_lookup_pipe(const entity_addr_t& k) {
- ceph::unordered_map<entity_addr_t, Pipe*>::iterator p = rank_pipe.find(k);
- if (p == rank_pipe.end())
- return NULL;
- // see lock cribbing in Pipe::fault()
- if (p->second->state_closed)
- return NULL;
- return p->second;
- }
-
-public:
-
- int timeout;
-
- /// con used for sending messages to ourselves
- ConnectionRef local_connection;
-
- /**
- * @defgroup SimpleMessenger internals
- * @{
- */
-
- /**
- * This wraps ms_deliver_get_authorizer. We use it for Pipe.
- */
- AuthAuthorizer *get_authorizer(int peer_type, bool force_new);
- /**
- * This wraps ms_deliver_verify_authorizer; we use it for Pipe.
- */
- bool verify_authorizer(Connection *con, int peer_type, int protocol, bufferlist& auth, bufferlist& auth_reply,
- bool& isvalid,CryptoKey& session_key);
- /**
- * Increment the global sequence for this SimpleMessenger and return it.
- * This is for the connect protocol, although it doesn't hurt if somebody
- * else calls it.
- *
- * @return a global sequence ID that nobody else has seen.
- */
- __u32 get_global_seq(__u32 old=0) {
- ceph_spin_lock(&global_seq_lock);
- if (old > global_seq)
- global_seq = old;
- __u32 ret = ++global_seq;
- ceph_spin_unlock(&global_seq_lock);
- return ret;
- }
- /**
- * Get the protocol version we support for the given peer type: either
- * a peer protocol (if it matches our own), the protocol version for the
- * peer (if we're connecting), or our protocol version (if we're accepting).
- */
- int get_proto_version(int peer_type, bool connect);
-
- /**
- * Fill in the features, address and peer type for the local connection, which
- * is used for delivering messages back to ourself.
- */
- void init_local_connection();
- /**
- * Tell the SimpleMessenger its full IP address.
- *
- * This is used by Pipes when connecting to other endpoints, and
- * probably shouldn't be called by anybody else.
- */
- void learned_addr(const entity_addr_t& peer_addr_for_me);
-
- /**
- * This function is used by the reaper thread. As long as nobody
- * has set reaper_stop, it calls the reaper function, then
- * waits to be signaled when it needs to reap again (or when it needs
- * to stop).
- */
- void reaper_entry();
- /**
- * Add a pipe to the pipe_reap_queue, to be torn down on
- * the next call to reaper().
- * It should really only be the Pipe calling this, in our current
- * implementation.
- *
- * @param pipe A Pipe which has stopped its threads and is
- * ready to be torn down.
- */
- void queue_reap(Pipe *pipe);
-
- /**
- * Used to get whether this connection ready to send
- */
- bool is_connected(Connection *con);
- /**
- * @} // SimpleMessenger Internals
- */
-} ;
-
-#endif /* CEPH_SIMPLEMESSENGER_H */