Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / msg / simple / SimpleMessenger.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software 
11  * Foundation.  See file COPYING.
12  * 
13  */
14
15 #ifndef CEPH_SIMPLEMESSENGER_H
16 #define CEPH_SIMPLEMESSENGER_H
17
18 #include "include/types.h"
19 #include "include/xlist.h"
20
21 #include <list>
22 #include <map>
23 using namespace std;
24 #include "include/unordered_map.h"
25 #include "include/unordered_set.h"
26
27 #include "common/Mutex.h"
28 #include "include/Spinlock.h"
29 #include "common/Cond.h"
30 #include "common/Thread.h"
31 #include "common/Throttle.h"
32
33 #include "msg/SimplePolicyMessenger.h"
34 #include "msg/Message.h"
35 #include "include/assert.h"
36
37 #include "msg/DispatchQueue.h"
38 #include "Pipe.h"
39 #include "Accepter.h"
40
41 /*
42  * This class handles transmission and reception of messages. Generally
43  * speaking, there are several major components:
44  *
45  * - Connection
46  *    Each logical session is associated with a Connection.
47  * - Pipe
48  *    Each network connection is handled through a pipe, which handles
49  *    the input and output of each message.  There is normally a 1:1
50  *    relationship between Pipe and Connection, but logical sessions may
51  *    get handed off between Pipes when sockets reconnect or during
52  *    connection races.
53  * - IncomingQueue
54  *    Incoming messages are associated with an IncomingQueue, and there
55  *    is one such queue associated with each Pipe.
56  * - DispatchQueue
57  *    IncomingQueues get queued in the DIspatchQueue, which is responsible
58  *    for doing a round-robin sweep and processing them via a worker thread.
59  * - SimpleMessenger
60  *    It's the exterior class passed to the external message handler and
61  *    most of the API details.
62  *
63  * Lock ordering:
64  *
65  *   SimpleMessenger::lock
66  *       Pipe::pipe_lock
67  *           DispatchQueue::lock
68  *               IncomingQueue::lock
69  */
70
71 class SimpleMessenger : public SimplePolicyMessenger {
72   // First we have the public Messenger interface implementation...
73 public:
74   /**
75    * Initialize the SimpleMessenger!
76    *
77    * @param cct The CephContext to use
78    * @param name The name to assign ourselves
79    * _nonce A unique ID to use for this SimpleMessenger. It should not
80    * be a value that will be repeated if the daemon restarts.
81    * features The local features bits for the local_connection
82    */
83   SimpleMessenger(CephContext *cct, entity_name_t name,
84                   string mname, uint64_t _nonce);
85
86   /**
87    * Destroy the SimpleMessenger. Pretty simple since all the work is done
88    * elsewhere.
89    */
90   ~SimpleMessenger() override;
91
92   /** @defgroup Accessors
93    * @{
94    */
95   void set_addr_unknowns(const entity_addr_t& addr) override;
96   void set_addr(const entity_addr_t &addr) override;
97
98   int get_dispatch_queue_len() override {
99     return dispatch_queue.get_queue_len();
100   }
101
102   double get_dispatch_queue_max_age(utime_t now) override {
103     return dispatch_queue.get_max_age(now);
104   }
105   /** @} Accessors */
106
107   /**
108    * @defgroup Configuration functions
109    * @{
110    */
111   void set_cluster_protocol(int p) override {
112     assert(!started && !did_bind);
113     cluster_protocol = p;
114   }
115
116   int bind(const entity_addr_t& bind_addr) override;
117   int rebind(const set<int>& avoid_ports) override;
118   int client_bind(const entity_addr_t& bind_addr) override;
119
120   /** @} Configuration functions */
121
122   /**
123    * @defgroup Startup/Shutdown
124    * @{
125    */
126   int start() override;
127   void wait() override;
128   int shutdown() override;
129
130   /** @} // Startup/Shutdown */
131
132   /**
133    * @defgroup Messaging
134    * @{
135    */
136   int send_message(Message *m, const entity_inst_t& dest) override {
137     return _send_message(m, dest);
138   }
139
140   int send_message(Message *m, Connection *con) {
141     return _send_message(m, con);
142   }
143
144   /** @} // Messaging */
145
146   /**
147    * @defgroup Connection Management
148    * @{
149    */
150   ConnectionRef get_connection(const entity_inst_t& dest) override;
151   ConnectionRef get_loopback_connection() override;
152   int send_keepalive(Connection *con);
153   void mark_down(const entity_addr_t& addr) override;
154   void mark_down(Connection *con);
155   void mark_disposable(Connection *con);
156   void mark_down_all() override;
157   /** @} // Connection Management */
158 protected:
159   /**
160    * @defgroup Messenger Interfaces
161    * @{
162    */
163   /**
164    * Start up the DispatchQueue thread once we have somebody to dispatch to.
165    */
166   void ready() override;
167   /** @} // Messenger Interfaces */
168 private:
169   /**
170    * @defgroup Inner classes
171    * @{
172    */
173
174 public:
175   Accepter accepter;
176   DispatchQueue dispatch_queue;
177
178   friend class Accepter;
179
180   /**
181    * Register a new pipe for accept
182    *
183    * @param sd socket
184    */
185   Pipe *add_accept_pipe(int sd);
186
187 private:
188
189   /**
190    * A thread used to tear down Pipes when they're complete.
191    */
192   class ReaperThread : public Thread {
193     SimpleMessenger *msgr;
194   public:
195     explicit ReaperThread(SimpleMessenger *m) : msgr(m) {}
196     void *entry() override {
197       msgr->reaper_entry();
198       return 0;
199     }
200   } reaper_thread;
201
202   /**
203    * @} // Inner classes
204    */
205
206   /**
207    * @defgroup Utility functions
208    * @{
209    */
210
211   /**
212    * Create a Pipe associated with the given entity (of the given type).
213    * Initiate the connection. (This function returning does not guarantee
214    * connection success.)
215    *
216    * @param addr The address of the entity to connect to.
217    * @param type The peer type of the entity at the address.
218    * @param con An existing Connection to associate with the new Pipe. If
219    * NULL, it creates a new Connection.
220    * @param first an initial message to queue on the new pipe
221    *
222    * @return a pointer to the newly-created Pipe. Caller does not own a
223    * reference; take one if you need it.
224    */
225   Pipe *connect_rank(const entity_addr_t& addr, int type, PipeConnection *con,
226                      Message *first);
227   /**
228    * Send a message, lazily or not.
229    * This just glues send_message together and passes
230    * the input on to submit_message.
231    */
232   int _send_message(Message *m, const entity_inst_t& dest);
233   /**
234    * Same as above, but for the Connection-based variants.
235    */
236   int _send_message(Message *m, Connection *con);
237   /**
238    * Queue up a Message for delivery to the entity specified
239    * by addr and dest_type.
240    * submit_message() is responsible for creating
241    * new Pipes (and closing old ones) as necessary.
242    *
243    * @param m The Message to queue up. This function eats a reference.
244    * @param con The existing Connection to use, or NULL if you don't know of one.
245    * @param addr The address to send the Message to.
246    * @param dest_type The peer type of the address we're sending to
247    * just drop silently under failure.
248    * @param already_locked If false, submit_message() will acquire the
249    * SimpleMessenger lock before accessing shared data structures; otherwise
250    * it will assume the lock is held. NOTE: if you are making a request
251    * without locking, you MUST have filled in the con with a valid pointer.
252    */
253   void submit_message(Message *m, PipeConnection *con,
254                       const entity_addr_t& addr, int dest_type,
255                       bool already_locked);
256   /**
257    * Look through the pipes in the pipe_reap_queue and tear them down.
258    */
259   void reaper();
260   /**
261    * @} // Utility functions
262    */
263
264   // SimpleMessenger stuff
265   /// approximately unique ID set by the Constructor for use in entity_addr_t
266   uint64_t nonce;
267   /// overall lock used for SimpleMessenger data structures
268   Mutex lock;
269   /// true, specifying we haven't learned our addr; set false when we find it.
270   // maybe this should be protected by the lock?
271   bool need_addr;
272
273 public:
274   bool get_need_addr() const { return need_addr; }
275
276 private:
277   /**
278    *  false; set to true if the SimpleMessenger bound to a specific address;
279    *  and set false again by Accepter::stop(). This isn't lock-protected
280    *  since you shouldn't be able to race the only writers.
281    */
282   bool did_bind;
283   /// counter for the global seq our connection protocol uses
284   __u32 global_seq;
285   /// lock to protect the global_seq
286   ceph_spinlock_t global_seq_lock;
287
288   /**
289    * hash map of addresses to Pipes
290    *
291    * NOTE: a Pipe* with state CLOSED may still be in the map but is considered
292    * invalid and can be replaced by anyone holding the msgr lock
293    */
294   ceph::unordered_map<entity_addr_t, Pipe*> rank_pipe;
295   /**
296    * list of pipes are in teh process of accepting
297    *
298    * These are not yet in the rank_pipe map.
299    */
300   set<Pipe*> accepting_pipes;
301   /// a set of all the Pipes we have which are somehow active
302   set<Pipe*>      pipes;
303   /// a list of Pipes we want to tear down
304   list<Pipe*>     pipe_reap_queue;
305
306   /// internal cluster protocol version, if any, for talking to entities of the same type.
307   int cluster_protocol;
308
309   Cond  stop_cond;
310   bool stopped = true;
311
312   bool reaper_started, reaper_stop;
313   Cond reaper_cond;
314
315   /// This Cond is slept on by wait() and signaled by dispatch_entry()
316   Cond  wait_cond;
317
318   friend class Pipe;
319
320   Pipe *_lookup_pipe(const entity_addr_t& k) {
321     ceph::unordered_map<entity_addr_t, Pipe*>::iterator p = rank_pipe.find(k);
322     if (p == rank_pipe.end())
323       return NULL;
324     // see lock cribbing in Pipe::fault()
325     if (p->second->state_closed)
326       return NULL;
327     return p->second;
328   }
329
330 public:
331
332   int timeout;
333
334   /// con used for sending messages to ourselves
335   ConnectionRef local_connection;
336
337   /**
338    * @defgroup SimpleMessenger internals
339    * @{
340    */
341
342   /**
343    * This wraps ms_deliver_get_authorizer. We use it for Pipe.
344    */
345   AuthAuthorizer *get_authorizer(int peer_type, bool force_new);
346   /**
347    * This wraps ms_deliver_verify_authorizer; we use it for Pipe.
348    */
349   bool verify_authorizer(Connection *con, int peer_type, int protocol, bufferlist& auth, bufferlist& auth_reply,
350                          bool& isvalid,CryptoKey& session_key);
351   /**
352    * Increment the global sequence for this SimpleMessenger and return it.
353    * This is for the connect protocol, although it doesn't hurt if somebody
354    * else calls it.
355    *
356    * @return a global sequence ID that nobody else has seen.
357    */
358   __u32 get_global_seq(__u32 old=0) {
359     ceph_spin_lock(&global_seq_lock);
360     if (old > global_seq)
361       global_seq = old;
362     __u32 ret = ++global_seq;
363     ceph_spin_unlock(&global_seq_lock);
364     return ret;
365   }
366   /**
367    * Get the protocol version we support for the given peer type: either
368    * a peer protocol (if it matches our own), the protocol version for the
369    * peer (if we're connecting), or our protocol version (if we're accepting).
370    */
371   int get_proto_version(int peer_type, bool connect);
372
373   /**
374    * Fill in the features, address and peer type for the local connection, which
375    * is used for delivering messages back to ourself.
376    */
377   void init_local_connection();
378   /**
379    * Tell the SimpleMessenger its full IP address.
380    *
381    * This is used by Pipes when connecting to other endpoints, and
382    * probably shouldn't be called by anybody else.
383    */
384   void learned_addr(const entity_addr_t& peer_addr_for_me);
385
386   /**
387    * This function is used by the reaper thread. As long as nobody
388    * has set reaper_stop, it calls the reaper function, then
389    * waits to be signaled when it needs to reap again (or when it needs
390    * to stop).
391    */
392   void reaper_entry();
393   /**
394    * Add a pipe to the pipe_reap_queue, to be torn down on
395    * the next call to reaper().
396    * It should really only be the Pipe calling this, in our current
397    * implementation.
398    *
399    * @param pipe A Pipe which has stopped its threads and is
400    * ready to be torn down.
401    */
402   void queue_reap(Pipe *pipe);
403
404   /**
405    * Used to get whether this connection ready to send
406    */
407   bool is_connected(Connection *con);
408   /**
409    * @} // SimpleMessenger Internals
410    */
411 } ;
412
413 #endif /* CEPH_SIMPLEMESSENGER_H */