Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / msg / Messenger.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software 
11  * Foundation.  See file COPYING.
12  * 
13  */
14
15
16
17 #ifndef CEPH_MESSENGER_H
18 #define CEPH_MESSENGER_H
19
20 #include <map>
21 using namespace std;
22
23 #include "Message.h"
24 #include "Dispatcher.h"
25 #include "common/Mutex.h"
26 #include "common/Cond.h"
27 #include "include/Context.h"
28 #include "include/types.h"
29 #include "include/ceph_features.h"
30 #include "auth/Crypto.h"
31
32 #include <errno.h>
33 #include <sstream>
34 #include <signal.h>
35
36 #define SOCKET_PRIORITY_MIN_DELAY 6
37
38 class Timer;
39
40
41 class Messenger {
42 private:
43   list<Dispatcher*> dispatchers;
44   list <Dispatcher*> fast_dispatchers;
45   ZTracer::Endpoint trace_endpoint;
46
47   void set_endpoint_addr(const entity_addr_t& a,
48                          const entity_name_t &name);
49
50 protected:
51   /// the "name" of the local daemon. eg client.99
52   entity_inst_t my_inst;
53   int default_send_priority;
54   /// set to true once the Messenger has started, and set to false on shutdown
55   bool started;
56   uint32_t magic;
57   int socket_priority;
58
59 public:
60   /**
61    * Various Messenger conditional config/type flags to allow
62    * different "transport" Messengers to tune themselves
63    */
64   static const int HAS_HEAVY_TRAFFIC    = 0x0001;
65   static const int HAS_MANY_CONNECTIONS = 0x0002;
66   static const int HEARTBEAT            = 0x0004;
67
68   /**
69    *  The CephContext this Messenger uses. Many other components initialize themselves
70    *  from this value.
71    */
72   CephContext *cct;
73   int crcflags;
74
75   /**
76    * A Policy describes the rules of a Connection. Is there a limit on how
77    * much data this Connection can have locally? When the underlying connection
78    * experiences an error, does the Connection disappear? Can this Messenger
79    * re-establish the underlying connection?
80    */
81   struct Policy {
82     /// If true, the Connection is tossed out on errors.
83     bool lossy;
84     /// If true, the underlying connection can't be re-established from this end.
85     bool server;
86     /// If true, we will standby when idle
87     bool standby;
88     /// If true, we will try to detect session resets
89     bool resetcheck;
90     /**
91      *  The throttler is used to limit how much data is held by Messages from
92      *  the associated Connection(s). When reading in a new Message, the Messenger
93      *  will call throttler->throttle() for the size of the new Message.
94      */
95     Throttle *throttler_bytes;
96     Throttle *throttler_messages;
97
98     /// Specify features supported locally by the endpoint.
99     uint64_t features_supported;
100     /// Specify features any remotes must have to talk to this endpoint.
101     uint64_t features_required;
102
103     Policy()
104       : lossy(false), server(false), standby(false), resetcheck(true),
105         throttler_bytes(NULL),
106         throttler_messages(NULL),
107         features_supported(CEPH_FEATURES_SUPPORTED_DEFAULT),
108         features_required(0) {}
109   private:
110     Policy(bool l, bool s, bool st, bool r, uint64_t req)
111       : lossy(l), server(s), standby(st), resetcheck(r),
112         throttler_bytes(NULL),
113         throttler_messages(NULL),
114         features_supported(CEPH_FEATURES_SUPPORTED_DEFAULT),
115         features_required(req) {}
116
117   public:
118     static Policy stateful_server(uint64_t req) {
119       return Policy(false, true, true, true, req);
120     }
121     static Policy stateless_server(uint64_t req) {
122       return Policy(true, true, false, false, req);
123     }
124     static Policy lossless_peer(uint64_t req) {
125       return Policy(false, false, true, false, req);
126     }
127     static Policy lossless_peer_reuse(uint64_t req) {
128       return Policy(false, false, true, true, req);
129     }
130     static Policy lossy_client(uint64_t req) {
131       return Policy(true, false, false, false, req);
132     }
133     static Policy lossless_client(uint64_t req) {
134       return Policy(false, false, false, true, req);
135     }
136   };
137
138   /**
139    * Messenger constructor. Call this from your implementation.
140    * Messenger users should construct full implementations directly,
141    * or use the create() function.
142    */
143   Messenger(CephContext *cct_, entity_name_t w)
144     : trace_endpoint("0.0.0.0", 0, "Messenger"),
145       my_inst(),
146       default_send_priority(CEPH_MSG_PRIO_DEFAULT), started(false),
147       magic(0),
148       socket_priority(-1),
149       cct(cct_),
150       crcflags(get_default_crc_flags(cct->_conf))
151   {
152     my_inst.name = w;
153   }
154   virtual ~Messenger() {}
155
156   /**
157    * create a new messenger
158    *
159    * Create a new messenger instance, with whatever implementation is
160    * available or specified via the configuration in cct.
161    *
162    * @param cct context
163    * @param type name of messenger type
164    * @param name entity name to register
165    * @param lname logical name of the messenger in this process (e.g., "client")
166    * @param nonce nonce value to uniquely identify this instance on the current host
167    * @param features bits for the local connection
168    * @param cflags general set of flags to configure transport resources
169    */
170   static Messenger *create(CephContext *cct,
171                            const string &type,
172                            entity_name_t name,
173                            string lname,
174                            uint64_t nonce,
175                            uint64_t cflags);
176
177   /**
178    * create a new messenger
179    *
180    * Create a new messenger instance.
181    * Same as the above, but a slightly simpler interface for clients:
182    * - Generate a random nonce
183    * - use the default feature bits
184    * - get the messenger type from cct
185    * - use the client entity_type
186    *
187    * @param cct context
188    * @param lname logical name of the messenger in this process (e.g., "client")
189    */
190   static Messenger *create_client_messenger(CephContext *cct, string lname);
191
192   /**
193    * @defgroup Accessors
194    * @{
195    */
196   /**
197    * Retrieve the Messenger's instance.
198    *
199    * @return A const reference to the instance this Messenger
200    * currently believes to be its own.
201    */
202   const entity_inst_t& get_myinst() { return my_inst; }
203   /**
204    * set messenger's instance
205    */
206   void set_myinst(entity_inst_t i) { my_inst = i; }
207
208   uint32_t get_magic() { return magic; }
209   void set_magic(int _magic) { magic = _magic; }
210
211   /**
212    * Retrieve the Messenger's address.
213    *
214    * @return A const reference to the address this Messenger
215    * currently believes to be its own.
216    */
217   const entity_addr_t& get_myaddr() { return my_inst.addr; }
218 protected:
219   /**
220    * set messenger's address
221    */
222   virtual void set_myaddr(const entity_addr_t& a) {
223     my_inst.addr = a;
224     set_endpoint_addr(a, my_inst.name);
225   }
226 public:
227   /**
228    * @return the zipkin trace endpoint
229    */
230   const ZTracer::Endpoint* get_trace_endpoint() const {
231     return &trace_endpoint;
232   }
233
234   /**
235    * Retrieve the Messenger's name.
236    *
237    * @return A const reference to the name this Messenger
238    * currently believes to be its own.
239    */
240   const entity_name_t& get_myname() { return my_inst.name; }
241   /**
242    * Set the name of the local entity. The name is reported to others and
243    * can be changed while the system is running, but doing so at incorrect
244    * times may have bad results.
245    *
246    * @param m The name to set.
247    */
248   void set_myname(const entity_name_t& m) { my_inst.name = m; }
249   /**
250    * Set the unknown address components for this Messenger.
251    * This is useful if the Messenger doesn't know its full address just by
252    * binding, but another Messenger on the same interface has already learned
253    * its full address. This function does not fill in known address elements,
254    * cause a rebind, or do anything of that sort.
255    *
256    * @param addr The address to use as a template.
257    */
258   virtual void set_addr_unknowns(const entity_addr_t &addr) = 0;
259   /**
260    * Set the address for this Messenger. This is useful if the Messenger
261    * binds to a specific address but advertises a different address on the
262    * the network.
263    *
264    * @param addr The address to use.
265    */
266   virtual void set_addr(const entity_addr_t &addr) = 0;
267   /// Get the default send priority.
268   int get_default_send_priority() { return default_send_priority; }
269   /**
270    * Get the number of Messages which the Messenger has received
271    * but not yet dispatched.
272    */
273   virtual int get_dispatch_queue_len() = 0;
274
275   /**
276    * Get age of oldest undelivered message
277    * (0 if the queue is empty)
278    */
279   virtual double get_dispatch_queue_max_age(utime_t now) = 0;
280   /**
281    * Get the default crc flags for this messenger.
282    * but not yet dispatched.
283    */
284   static int get_default_crc_flags(md_config_t *);
285
286   /**
287    * @} // Accessors
288    */
289
290   /**
291    * @defgroup Configuration
292    * @{
293    */
294   /**
295    * Set the cluster protocol in use by this daemon.
296    * This is an init-time function and cannot be called after calling
297    * start() or bind().
298    *
299    * @param p The cluster protocol to use. Defined externally.
300    */
301   virtual void set_cluster_protocol(int p) = 0;
302   /**
303    * Set a policy which is applied to all peers who do not have a type-specific
304    * Policy.
305    * This is an init-time function and cannot be called after calling
306    * start() or bind().
307    *
308    * @param p The Policy to apply.
309    */
310   virtual void set_default_policy(Policy p) = 0;
311   /**
312    * Set a policy which is applied to all peers of the given type.
313    * This is an init-time function and cannot be called after calling
314    * start() or bind().
315    *
316    * @param type The peer type this policy applies to.
317    * @param p The policy to apply.
318    */
319   virtual void set_policy(int type, Policy p) = 0;
320   /**
321    * Set the Policy associated with a type of peer.
322    *
323    * This can be called either on initial setup, or after connections
324    * are already established.  However, the policies for existing
325    * connections will not be affected; the new policy will only apply
326    * to future connections.
327    *
328    * @param t The peer type to get the default policy for.
329    * @return A const Policy reference.
330    */
331   virtual Policy get_policy(int t) = 0;
332   /**
333    * Get the default Policy
334    *
335    * @return A const Policy reference.
336    */
337   virtual Policy get_default_policy() = 0;
338   /**
339    * Set Throttlers applied to all Messages from the given type of peer
340    *
341    * This is an init-time function and cannot be called after calling
342    * start() or bind().
343    *
344    * @param type The peer type the Throttlers will apply to.
345    * @param bytes The Throttle for the number of bytes carried by the message
346    * @param msgs The Throttle for the number of messages for this @p type
347    * @note The Messenger does not take ownership of the Throttle pointers, but
348    * you must not destroy them before you destroy the Messenger.
349    */
350   virtual void set_policy_throttlers(int type, Throttle *bytes, Throttle *msgs=NULL) = 0;
351   /**
352    * Set the default send priority
353    *
354    * This is an init-time function and must be called *before* calling
355    * start().
356    *
357    * @param p The cluster protocol to use. Defined externally.
358    */
359   void set_default_send_priority(int p) {
360     assert(!started);
361     default_send_priority = p;
362   }
363   /**
364    * Set the priority(SO_PRIORITY) for all packets to be sent on this socket.
365    *
366    * Linux uses this value to order the networking queues: packets with a higher
367    * priority may be processed first depending on the selected device queueing
368    * discipline.
369    *
370    * @param prio The priority. Setting a priority outside the range 0 to 6
371    * requires the CAP_NET_ADMIN capability.
372    */
373   void set_socket_priority(int prio) {
374     socket_priority = prio;
375   }
376   /**
377    * Get the socket priority
378    *
379    * @return the socket priority
380    */
381   int get_socket_priority() {
382     return socket_priority;
383   }
384   /**
385    * Add a new Dispatcher to the front of the list. If you add
386    * a Dispatcher which is already included, it will get a duplicate
387    * entry. This will reduce efficiency but not break anything.
388    *
389    * @param d The Dispatcher to insert into the list.
390    */
391   void add_dispatcher_head(Dispatcher *d) { 
392     bool first = dispatchers.empty();
393     dispatchers.push_front(d);
394     if (d->ms_can_fast_dispatch_any())
395       fast_dispatchers.push_front(d);
396     if (first)
397       ready();
398   }
399   /**
400    * Add a new Dispatcher to the end of the list. If you add
401    * a Dispatcher which is already included, it will get a duplicate
402    * entry. This will reduce efficiency but not break anything.
403    *
404    * @param d The Dispatcher to insert into the list.
405    */
406   void add_dispatcher_tail(Dispatcher *d) { 
407     bool first = dispatchers.empty();
408     dispatchers.push_back(d);
409     if (d->ms_can_fast_dispatch_any())
410       fast_dispatchers.push_back(d);
411     if (first)
412       ready();
413   }
414   /**
415    * Bind the Messenger to a specific address. If bind_addr
416    * is not completely filled in the system will use the
417    * valid portions and cycle through the unset ones (eg, the port)
418    * in an unspecified order.
419    *
420    * @param bind_addr The address to bind to.
421    * @return 0 on success, or -1 on error, or -errno if
422    * we can be more specific about the failure.
423    */
424   virtual int bind(const entity_addr_t& bind_addr) = 0;
425   /**
426    * This function performs a full restart of the Messenger component,
427    * whatever that means.  Other entities who connect to this
428    * Messenger post-rebind() should perceive it as a new entity which
429    * they have not previously contacted, and it MUST bind to a
430    * different address than it did previously.
431    *
432    * @param avoid_ports Additional port to avoid binding to.
433    */
434   virtual int rebind(const set<int>& avoid_ports) { return -EOPNOTSUPP; }
435   /**
436    * Bind the 'client' Messenger to a specific address.Messenger will bind
437    * the address before connect to others when option ms_bind_before_connect
438    * is true.
439    * @param bind_addr The address to bind to.
440    * @return 0 on success, or -1 on error, or -errno if
441    */
442   virtual int client_bind(const entity_addr_t& bind_addr) = 0;
443   /**
444    * @} // Configuration
445    */
446
447   /**
448    * @defgroup Startup/Shutdown
449    * @{
450    */
451   /**
452    * Perform any resource allocation, thread startup, etc
453    * that is required before attempting to connect to other
454    * Messengers or transmit messages.
455    * Once this function completes, started shall be set to true.
456    *
457    * @return 0 on success; -errno on failure.
458    */
459   virtual int start() { started = true; return 0; }
460
461   // shutdown
462   /**
463    * Block until the Messenger has finished shutting down (according
464    * to the shutdown() function).
465    * It is valid to call this after calling shutdown(), but it must
466    * be called before deleting the Messenger.
467    */
468   virtual void wait() = 0;
469   /**
470    * Initiate a shutdown of the Messenger.
471    *
472    * @return 0 on success, -errno otherwise.
473    */
474   virtual int shutdown() { started = false; return 0; }
475   /**
476    * @} // Startup/Shutdown
477    */
478
479   /**
480    * @defgroup Messaging
481    * @{
482    */
483   /**
484    * Queue the given Message for the given entity.
485    * Success in this function does not guarantee Message delivery, only
486    * success in queueing the Message. Other guarantees may be provided based
487    * on the Connection policy associated with the dest.
488    *
489    * @param m The Message to send. The Messenger consumes a single reference
490    * when you pass it in.
491    * @param dest The entity to send the Message to.
492    *
493    * DEPRECATED: please do not use this interface for any new code;
494    * use the Connection* variant.
495    *
496    * @return 0 on success, or -errno on failure.
497    */
498   virtual int send_message(Message *m, const entity_inst_t& dest) = 0;
499
500   /**
501    * @} // Messaging
502    */
503   /**
504    * @defgroup Connection Management
505    * @{
506    */
507   /**
508    * Get the Connection object associated with a given entity. If a
509    * Connection does not exist, create one and establish a logical connection.
510    * The caller owns a reference when this returns. Call ->put() when you're
511    * done!
512    *
513    * @param dest The entity to get a connection for.
514    */
515   virtual ConnectionRef get_connection(const entity_inst_t& dest) = 0;
516   /**
517    * Get the Connection object associated with ourselves.
518    */
519   virtual ConnectionRef get_loopback_connection() = 0;
520   /**
521    * Mark down a Connection to a remote.
522    *
523    * This will cause us to discard our outgoing queue for them, and if
524    * reset detection is enabled in the policy and the endpoint tries
525    * to reconnect they will discard their queue when we inform them of
526    * the session reset.
527    *
528    * If there is no Connection to the given dest, it is a no-op.
529    *
530    * This generates a RESET notification to the Dispatcher.
531    *
532    * DEPRECATED: please do not use this interface for any new code;
533    * use the Connection* variant.
534    *
535    * @param a The address to mark down.
536    */
537   virtual void mark_down(const entity_addr_t& a) = 0;
538   /**
539    * Mark all the existing Connections down. This is equivalent
540    * to iterating over all Connections and calling mark_down()
541    * on each.
542    *
543    * This will generate a RESET event for each closed connections.
544    */
545   virtual void mark_down_all() = 0;
546   /**
547    * @} // Connection Management
548    */
549 protected:
550   /**
551    * @defgroup Subclass Interfacing
552    * @{
553    */
554   /**
555    * A courtesy function for Messenger implementations which
556    * will be called when we receive our first Dispatcher.
557    */
558   virtual void ready() { }
559   /**
560    * @} // Subclass Interfacing
561    */
562 public:
563 #ifdef CEPH_USE_SIGPIPE_BLOCKER
564   /**
565    * We need to disable SIGPIPE on all platforms, and if they
566    * don't give us a better mechanism (read: are on Solaris) that
567    * means blocking the signal whenever we do a send or sendmsg...
568    * That means any implementations must invoke MSGR_SIGPIPE_STOPPER in-scope
569    * whenever doing so. On most systems that's blank, but on systems where
570    * it's needed we construct an RAII object to plug and un-plug the SIGPIPE.
571    * See http://www.microhowto.info/howto/ignore_sigpipe_without_affecting_other_threads_in_a_process.html
572    */
573   struct sigpipe_stopper {
574     bool blocked;
575     sigset_t existing_mask;
576     sigset_t pipe_mask;
577     sigpipe_stopper() {
578       sigemptyset(&pipe_mask);
579       sigaddset(&pipe_mask, SIGPIPE);
580       sigset_t signals;
581       sigemptyset(&signals);
582       sigpending(&signals);
583       if (sigismember(&signals, SIGPIPE)) {
584         blocked = false;
585       } else {
586         blocked = true;
587         int r = pthread_sigmask(SIG_BLOCK, &pipe_mask, &existing_mask);
588         assert(r == 0);
589       }
590     }
591     ~sigpipe_stopper() {
592       if (blocked) {
593         struct timespec nowait{0};
594         int r = sigtimedwait(&pipe_mask, 0, &nowait);
595         assert(r == EAGAIN || r == 0);
596         r = pthread_sigmask(SIG_SETMASK, &existing_mask, 0);
597         assert(r == 0);
598       }
599     }
600   };
601 #  define MSGR_SIGPIPE_STOPPER Messenger::sigpipe_stopper stopper();
602 #else
603 #  define MSGR_SIGPIPE_STOPPER
604 #endif
605   /**
606    * @defgroup Dispatcher Interfacing
607    * @{
608    */
609   /**
610    * Determine whether a message can be fast-dispatched. We will
611    * query each Dispatcher in sequence to determine if they are
612    * capable of handling a particular message via "fast dispatch".
613    *
614    * @param m The Message we are testing.
615    */
616   bool ms_can_fast_dispatch(const Message *m) {
617     for (list<Dispatcher*>::iterator p = fast_dispatchers.begin();
618          p != fast_dispatchers.end();
619          ++p) {
620       if ((*p)->ms_can_fast_dispatch(m))
621         return true;
622     }
623     return false;
624   }
625
626   /**
627    * Deliver a single Message via "fast dispatch".
628    *
629    * @param m The Message we are fast dispatching. We take ownership
630    * of one reference to it.
631    * If none of our Dispatchers can handle it, ceph_abort().
632    */
633   void ms_fast_dispatch(Message *m) {
634     m->set_dispatch_stamp(ceph_clock_now());
635     for (list<Dispatcher*>::iterator p = fast_dispatchers.begin();
636          p != fast_dispatchers.end();
637          ++p) {
638       if ((*p)->ms_can_fast_dispatch(m)) {
639         (*p)->ms_fast_dispatch(m);
640         return;
641       }
642     }
643     ceph_abort();
644   }
645   /**
646    *
647    */
648   void ms_fast_preprocess(Message *m) {
649     for (list<Dispatcher*>::iterator p = fast_dispatchers.begin();
650          p != fast_dispatchers.end();
651          ++p) {
652       (*p)->ms_fast_preprocess(m);
653     }
654   }
655   /**
656    *  Deliver a single Message. Send it to each Dispatcher
657    *  in sequence until one of them handles it.
658    *  If none of our Dispatchers can handle it, assert(0).
659    *
660    *  @param m The Message to deliver. We take ownership of
661    *  one reference to it.
662    */
663   void ms_deliver_dispatch(Message *m) {
664     m->set_dispatch_stamp(ceph_clock_now());
665     for (list<Dispatcher*>::iterator p = dispatchers.begin();
666          p != dispatchers.end();
667          ++p) {
668       if ((*p)->ms_dispatch(m))
669         return;
670     }
671     lsubdout(cct, ms, 0) << "ms_deliver_dispatch: unhandled message " << m << " " << *m << " from "
672                          << m->get_source_inst() << dendl;
673     assert(!cct->_conf->ms_die_on_unhandled_msg);
674     m->put();
675   }
676   /**
677    * Notify each Dispatcher of a new Connection. Call
678    * this function whenever a new Connection is initiated or
679    * reconnects.
680    *
681    * @param con Pointer to the new Connection.
682    */
683   void ms_deliver_handle_connect(Connection *con) {
684     for (list<Dispatcher*>::iterator p = dispatchers.begin();
685          p != dispatchers.end();
686          ++p)
687       (*p)->ms_handle_connect(con);
688   }
689
690   /**
691    * Notify each fast Dispatcher of a new Connection. Call
692    * this function whenever a new Connection is initiated or
693    * reconnects.
694    *
695    * @param con Pointer to the new Connection.
696    */
697   void ms_deliver_handle_fast_connect(Connection *con) {
698     for (list<Dispatcher*>::iterator p = fast_dispatchers.begin();
699          p != fast_dispatchers.end();
700          ++p)
701       (*p)->ms_handle_fast_connect(con);
702   }
703
704   /**
705    * Notify each Dispatcher of a new incomming Connection. Call
706    * this function whenever a new Connection is accepted.
707    *
708    * @param con Pointer to the new Connection.
709    */
710   void ms_deliver_handle_accept(Connection *con) {
711     for (list<Dispatcher*>::iterator p = dispatchers.begin();
712          p != dispatchers.end();
713          ++p)
714       (*p)->ms_handle_accept(con);
715   }
716
717   /**
718    * Notify each fast Dispatcher of a new incoming Connection. Call
719    * this function whenever a new Connection is accepted.
720    *
721    * @param con Pointer to the new Connection.
722    */
723   void ms_deliver_handle_fast_accept(Connection *con) {
724     for (list<Dispatcher*>::iterator p = fast_dispatchers.begin();
725          p != fast_dispatchers.end();
726          ++p)
727       (*p)->ms_handle_fast_accept(con);
728   }
729
730   /**
731    * Notify each Dispatcher of a Connection which may have lost
732    * Messages. Call this function whenever you detect that a lossy Connection
733    * has been disconnected.
734    *
735    * @param con Pointer to the broken Connection.
736    */
737   void ms_deliver_handle_reset(Connection *con) {
738     for (list<Dispatcher*>::iterator p = dispatchers.begin();
739          p != dispatchers.end();
740          ++p) {
741       if ((*p)->ms_handle_reset(con))
742         return;
743     }
744   }
745   /**
746    * Notify each Dispatcher of a Connection which has been "forgotten" about
747    * by the remote end, implying that messages have probably been lost.
748    * Call this function whenever you detect a reset.
749    *
750    * @param con Pointer to the broken Connection.
751    */
752   void ms_deliver_handle_remote_reset(Connection *con) {
753     for (list<Dispatcher*>::iterator p = dispatchers.begin();
754          p != dispatchers.end();
755          ++p)
756       (*p)->ms_handle_remote_reset(con);
757   }
758
759   /**
760    * Notify each Dispatcher of a Connection for which reconnection
761    * attempts are being refused. Call this function whenever you
762    * detect that a lossy Connection has been disconnected and it's
763    * impossible to reconnect.
764    *
765    * @param con Pointer to the broken Connection.
766    */
767   void ms_deliver_handle_refused(Connection *con) {
768     for (list<Dispatcher*>::iterator p = dispatchers.begin();
769          p != dispatchers.end();
770          ++p) {
771       if ((*p)->ms_handle_refused(con))
772         return;
773     }
774   }
775
776   /**
777    * Get the AuthAuthorizer for a new outgoing Connection.
778    *
779    * @param peer_type The peer type for the new Connection
780    * @param force_new True if we want to wait for new keys, false otherwise.
781    * @return A pointer to the AuthAuthorizer, if we have one; NULL otherwise
782    */
783   AuthAuthorizer *ms_deliver_get_authorizer(int peer_type, bool force_new) {
784     AuthAuthorizer *a = 0;
785     for (list<Dispatcher*>::iterator p = dispatchers.begin();
786          p != dispatchers.end();
787          ++p) {
788       if ((*p)->ms_get_authorizer(peer_type, &a, force_new))
789         return a;
790     }
791     return NULL;
792   }
793   /**
794    * Verify that the authorizer on a new incoming Connection is correct.
795    *
796    * @param con The new incoming Connection
797    * @param peer_type The type of the endpoint on the new Connection
798    * @param protocol The ID of the protocol in use (at time of writing, cephx or none)
799    * @param authorizer The authorization string supplied by the remote
800    * @param authorizer_reply Output param: The string we should send back to
801    * the remote to authorize ourselves. Only filled in if isvalid
802    * @param isvalid Output param: True if authorizer is valid, false otherwise
803    *
804    * @return True if we were able to prove or disprove correctness of
805    * authorizer, false otherwise.
806    */
807   bool ms_deliver_verify_authorizer(Connection *con, int peer_type,
808                                     int protocol, bufferlist& authorizer, bufferlist& authorizer_reply,
809                                     bool& isvalid, CryptoKey& session_key) {
810     for (list<Dispatcher*>::iterator p = dispatchers.begin();
811          p != dispatchers.end();
812          ++p) {
813       if ((*p)->ms_verify_authorizer(con, peer_type, protocol, authorizer, authorizer_reply, isvalid, session_key))
814         return true;
815     }
816     return false;
817   }
818
819   /**
820    * @} // Dispatcher Interfacing
821    */
822 };
823
824
825
826 #endif