Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / msg / async / AsyncMessenger.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
7  *
8  * Author: Haomai Wang <haomaiwang@gmail.com>
9  *
10  * This is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU Lesser General Public
12  * License version 2.1, as published by the Free Software
13  * Foundation.  See file COPYING.
14  *
15  */
16
17 #ifndef CEPH_ASYNCMESSENGER_H
18 #define CEPH_ASYNCMESSENGER_H
19
20 #include "include/types.h"
21 #include "include/xlist.h"
22
23 #include <map>
24 using namespace std;
25 #include "include/unordered_map.h"
26 #include "include/unordered_set.h"
27
28 #include "common/Mutex.h"
29 #include "common/Cond.h"
30 #include "common/Thread.h"
31
32 #include "include/Spinlock.h"
33
34 #include "msg/SimplePolicyMessenger.h"
35 #include "msg/DispatchQueue.h"
36 #include "include/assert.h"
37 #include "AsyncConnection.h"
38 #include "Event.h"
39
40
41 class AsyncMessenger;
42
43 /**
44  * If the Messenger binds to a specific address, the Processor runs
45  * and listens for incoming connections.
46  */
47 class Processor {
48   AsyncMessenger *msgr;
49   NetHandler net;
50   Worker *worker;
51   ServerSocket listen_socket;
52   EventCallbackRef listen_handler;
53
54   class C_processor_accept;
55
56  public:
57   Processor(AsyncMessenger *r, Worker *w, CephContext *c);
58   ~Processor() { delete listen_handler; };
59
60   void stop();
61   int bind(const entity_addr_t &bind_addr,
62            const set<int>& avoid_ports,
63            entity_addr_t* bound_addr);
64   void start();
65   void accept();
66 };
67
68 /*
69  * AsyncMessenger is represented for maintaining a set of asynchronous connections,
70  * it may own a bind address and the accepted connections will be managed by
71  * AsyncMessenger.
72  *
73  */
74
75 class AsyncMessenger : public SimplePolicyMessenger {
76   // First we have the public Messenger interface implementation...
77 public:
78   /**
79    * Initialize the AsyncMessenger!
80    *
81    * @param cct The CephContext to use
82    * @param name The name to assign ourselves
83    * _nonce A unique ID to use for this AsyncMessenger. It should not
84    * be a value that will be repeated if the daemon restarts.
85    */
86   AsyncMessenger(CephContext *cct, entity_name_t name, const std::string &type,
87                  string mname, uint64_t _nonce);
88
89   /**
90    * Destroy the AsyncMessenger. Pretty simple since all the work is done
91    * elsewhere.
92    */
93   ~AsyncMessenger() override;
94
95   /** @defgroup Accessors
96    * @{
97    */
98   void set_addr_unknowns(const entity_addr_t &addr) override;
99   void set_addr(const entity_addr_t &addr) override;
100
101   int get_dispatch_queue_len() override {
102     return dispatch_queue.get_queue_len();
103   }
104
105   double get_dispatch_queue_max_age(utime_t now) override {
106     return dispatch_queue.get_max_age(now);
107   }
108   /** @} Accessors */
109
110   /**
111    * @defgroup Configuration functions
112    * @{
113    */
114   void set_cluster_protocol(int p) override {
115     assert(!started && !did_bind);
116     cluster_protocol = p;
117   }
118
119   int bind(const entity_addr_t& bind_addr) override;
120   int rebind(const set<int>& avoid_ports) override;
121   int client_bind(const entity_addr_t& bind_addr) override;
122
123   /** @} Configuration functions */
124
125   /**
126    * @defgroup Startup/Shutdown
127    * @{
128    */
129   int start() override;
130   void wait() override;
131   int shutdown() override;
132
133   /** @} // Startup/Shutdown */
134
135   /**
136    * @defgroup Messaging
137    * @{
138    */
139   int send_message(Message *m, const entity_inst_t& dest) override {
140     Mutex::Locker l(lock);
141
142     return _send_message(m, dest);
143   }
144
145   /** @} // Messaging */
146
147   /**
148    * @defgroup Connection Management
149    * @{
150    */
151   ConnectionRef get_connection(const entity_inst_t& dest) override;
152   ConnectionRef get_loopback_connection() override;
153   void mark_down(const entity_addr_t& addr) override;
154   void mark_down_all() override {
155     shutdown_connections(true);
156   }
157   /** @} // Connection Management */
158
159   /**
160    * @defgroup Inner classes
161    * @{
162    */
163
164   /**
165    * @} // Inner classes
166    */
167
168 protected:
169   /**
170    * @defgroup Messenger Interfaces
171    * @{
172    */
173   /**
174    * Start up the DispatchQueue thread once we have somebody to dispatch to.
175    */
176   void ready() override;
177   /** @} // Messenger Interfaces */
178
179 private:
180
181   /**
182    * @defgroup Utility functions
183    * @{
184    */
185
186   /**
187    * Create a connection associated with the given entity (of the given type).
188    * Initiate the connection. (This function returning does not guarantee
189    * connection success.)
190    *
191    * @param addr The address of the entity to connect to.
192    * @param type The peer type of the entity at the address.
193    *
194    * @return a pointer to the newly-created connection. Caller does not own a
195    * reference; take one if you need it.
196    */
197   AsyncConnectionRef create_connect(const entity_addr_t& addr, int type);
198
199   /**
200    * Queue up a Message for delivery to the entity specified
201    * by addr and dest_type.
202    * submit_message() is responsible for creating
203    * new AsyncConnection (and closing old ones) as necessary.
204    *
205    * @param m The Message to queue up. This function eats a reference.
206    * @param con The existing Connection to use, or NULL if you don't know of one.
207    * @param dest_addr The address to send the Message to.
208    * @param dest_type The peer type of the address we're sending to
209    * just drop silently under failure.
210    */
211   void submit_message(Message *m, AsyncConnectionRef con,
212                       const entity_addr_t& dest_addr, int dest_type);
213
214   int _send_message(Message *m, const entity_inst_t& dest);
215   void _finish_bind(const entity_addr_t& bind_addr,
216                     const entity_addr_t& listen_addr);
217
218  private:
219   static const uint64_t ReapDeadConnectionThreshold = 5;
220
221   NetworkStack *stack;
222   std::vector<Processor*> processors;
223   friend class Processor;
224   DispatchQueue dispatch_queue;
225
226   // the worker run messenger's cron jobs
227   Worker *local_worker;
228
229   std::string ms_type;
230
231   /// overall lock used for AsyncMessenger data structures
232   Mutex lock;
233   // AsyncMessenger stuff
234   /// approximately unique ID set by the Constructor for use in entity_addr_t
235   uint64_t nonce;
236
237   /// true, specifying we haven't learned our addr; set false when we find it.
238   // maybe this should be protected by the lock?
239   bool need_addr;
240
241   /**
242    * set to bind address if bind was called before NetworkStack was ready to
243    * bind
244    */
245   entity_addr_t pending_bind_addr;
246
247   /**
248    * false; set to true if a pending bind exists
249    */
250   bool pending_bind = false;
251
252   /**
253    *  The following aren't lock-protected since you shouldn't be able to race
254    *  the only writers.
255    */
256
257   /**
258    *  false; set to true if the AsyncMessenger bound to a specific address;
259    *  and set false again by Accepter::stop().
260    */
261   bool did_bind;
262   /// counter for the global seq our connection protocol uses
263   __u32 global_seq;
264   /// lock to protect the global_seq
265   ceph_spinlock_t global_seq_lock;
266
267   /**
268    * hash map of addresses to Asyncconnection
269    *
270    * NOTE: a Asyncconnection* with state CLOSED may still be in the map but is considered
271    * invalid and can be replaced by anyone holding the msgr lock
272    */
273   ceph::unordered_map<entity_addr_t, AsyncConnectionRef> conns;
274
275   /**
276    * list of connection are in teh process of accepting
277    *
278    * These are not yet in the conns map.
279    */
280   set<AsyncConnectionRef> accepting_conns;
281
282   /**
283    * list of connection are closed which need to be clean up
284    *
285    * Because AsyncMessenger and AsyncConnection follow a lock rule that
286    * we can lock AsyncMesenger::lock firstly then lock AsyncConnection::lock
287    * but can't reversed. This rule is aimed to avoid dead lock.
288    * So if AsyncConnection want to unregister itself from AsyncMessenger,
289    * we pick up this idea that just queue itself to this set and do lazy
290    * deleted for AsyncConnection. "_lookup_conn" must ensure not return a
291    * AsyncConnection in this set.
292    */
293   Mutex deleted_lock;
294   set<AsyncConnectionRef> deleted_conns;
295
296   EventCallbackRef reap_handler;
297
298   /// internal cluster protocol version, if any, for talking to entities of the same type.
299   int cluster_protocol;
300
301   Cond  stop_cond;
302   bool stopped;
303
304   AsyncConnectionRef _lookup_conn(const entity_addr_t& k) {
305     assert(lock.is_locked());
306     ceph::unordered_map<entity_addr_t, AsyncConnectionRef>::iterator p = conns.find(k);
307     if (p == conns.end())
308       return NULL;
309
310     // lazy delete, see "deleted_conns"
311     Mutex::Locker l(deleted_lock);
312     if (deleted_conns.erase(p->second)) {
313       p->second->get_perf_counter()->dec(l_msgr_active_connections);
314       conns.erase(p);
315       return NULL;
316     }
317
318     return p->second;
319   }
320
321   void _init_local_connection() {
322     assert(lock.is_locked());
323     local_connection->peer_addr = my_inst.addr;
324     local_connection->peer_type = my_inst.name.type();
325     local_connection->set_features(CEPH_FEATURES_ALL);
326     ms_deliver_handle_fast_connect(local_connection.get());
327   }
328
329   void shutdown_connections(bool queue_reset);
330
331 public:
332
333   /// con used for sending messages to ourselves
334   ConnectionRef local_connection;
335
336   /**
337    * @defgroup AsyncMessenger internals
338    * @{
339    */
340   /**
341    * This wraps _lookup_conn.
342    */
343   AsyncConnectionRef lookup_conn(const entity_addr_t& k) {
344     Mutex::Locker l(lock);
345     return _lookup_conn(k);
346   }
347
348   int accept_conn(AsyncConnectionRef conn) {
349     Mutex::Locker l(lock);
350     auto it = conns.find(conn->peer_addr);
351     if (it != conns.end()) {
352       AsyncConnectionRef existing = it->second;
353
354       // lazy delete, see "deleted_conns"
355       // If conn already in, we will return 0
356       Mutex::Locker l(deleted_lock);
357       if (deleted_conns.erase(existing)) {
358         existing->get_perf_counter()->dec(l_msgr_active_connections);
359         conns.erase(it);
360       } else if (conn != existing) {
361         return -1;
362       }
363     }
364     conns[conn->peer_addr] = conn;
365     conn->get_perf_counter()->inc(l_msgr_active_connections);
366     accepting_conns.erase(conn);
367     return 0;
368   }
369
370   void learned_addr(const entity_addr_t &peer_addr_for_me);
371   void add_accept(Worker *w, ConnectedSocket cli_socket, entity_addr_t &addr);
372   NetworkStack *get_stack() {
373     return stack;
374   }
375
376   /**
377    * This wraps ms_deliver_get_authorizer. We use it for AsyncConnection.
378    */
379   AuthAuthorizer *get_authorizer(int peer_type, bool force_new) {
380     return ms_deliver_get_authorizer(peer_type, force_new);
381   }
382
383   /**
384    * This wraps ms_deliver_verify_authorizer; we use it for AsyncConnection.
385    */
386   bool verify_authorizer(Connection *con, int peer_type, int protocol, bufferlist& auth, bufferlist& auth_reply,
387                          bool& isvalid, CryptoKey& session_key) {
388     return ms_deliver_verify_authorizer(con, peer_type, protocol, auth,
389                                         auth_reply, isvalid, session_key);
390   }
391   /**
392    * Increment the global sequence for this AsyncMessenger and return it.
393    * This is for the connect protocol, although it doesn't hurt if somebody
394    * else calls it.
395    *
396    * @return a global sequence ID that nobody else has seen.
397    */
398   __u32 get_global_seq(__u32 old=0) {
399     ceph_spin_lock(&global_seq_lock);
400     if (old > global_seq)
401       global_seq = old;
402     __u32 ret = ++global_seq;
403     ceph_spin_unlock(&global_seq_lock);
404     return ret;
405   }
406   /**
407    * Get the protocol version we support for the given peer type: either
408    * a peer protocol (if it matches our own), the protocol version for the
409    * peer (if we're connecting), or our protocol version (if we're accepting).
410    */
411   int get_proto_version(int peer_type, bool connect) const;
412
413   /**
414    * Fill in the address and peer type for the local connection, which
415    * is used for delivering messages back to ourself.
416    */
417   void init_local_connection() {
418     Mutex::Locker l(lock);
419     _init_local_connection();
420   }
421
422   /**
423    * Unregister connection from `conns`
424    *
425    * See "deleted_conns"
426    */
427   void unregister_conn(AsyncConnectionRef conn) {
428     Mutex::Locker l(deleted_lock);
429     deleted_conns.insert(conn);
430
431     if (deleted_conns.size() >= ReapDeadConnectionThreshold) {
432       local_worker->center.dispatch_event_external(reap_handler);
433     }
434   }
435
436   /**
437    * Reap dead connection from `deleted_conns`
438    *
439    * @return the number of dead connections
440    *
441    * See "deleted_conns"
442    */
443   int reap_dead();
444
445   /**
446    * @} // AsyncMessenger Internals
447    */
448 } ;
449
450 #endif /* CEPH_ASYNCMESSENGER_H */