Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / msg / simple / SimpleMessenger.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software 
11  * Foundation.  See file COPYING.
12  * 
13  */
14
15 #include <errno.h>
16 #include <iostream>
17 #include <fstream>
18
19
20 #include "SimpleMessenger.h"
21
22 #include "common/config.h"
23 #include "common/Timer.h"
24 #include "common/errno.h"
25 #include "common/valgrind.h"
26 #include "auth/Crypto.h"
27 #include "include/Spinlock.h"
28
29 #define dout_subsys ceph_subsys_ms
30 #undef dout_prefix
31 #define dout_prefix _prefix(_dout, this)
32 static ostream& _prefix(std::ostream *_dout, SimpleMessenger *msgr) {
33   return *_dout << "-- " << msgr->get_myaddr() << " ";
34 }
35
36
37 /*******************
38  * SimpleMessenger
39  */
40
41 SimpleMessenger::SimpleMessenger(CephContext *cct, entity_name_t name,
42                                  string mname, uint64_t _nonce)
43   : SimplePolicyMessenger(cct, name,mname, _nonce),
44     accepter(this, _nonce),
45     dispatch_queue(cct, this, mname),
46     reaper_thread(this),
47     nonce(_nonce),
48     lock("SimpleMessenger::lock"), need_addr(true), did_bind(false),
49     global_seq(0),
50     cluster_protocol(0),
51     reaper_started(false), reaper_stop(false),
52     timeout(0),
53     local_connection(new PipeConnection(cct, this))
54 {
55   ANNOTATE_BENIGN_RACE_SIZED(&timeout, sizeof(timeout),
56                              "SimpleMessenger read timeout");
57   ceph_spin_init(&global_seq_lock);
58   init_local_connection();
59 }
60
61 /**
62  * Destroy the SimpleMessenger. Pretty simple since all the work is done
63  * elsewhere.
64  */
65 SimpleMessenger::~SimpleMessenger()
66 {
67   assert(!did_bind); // either we didn't bind or we shut down the Accepter
68   assert(rank_pipe.empty()); // we don't have any running Pipes.
69   assert(!reaper_started); // the reaper thread is stopped
70   ceph_spin_destroy(&global_seq_lock);
71 }
72
73 void SimpleMessenger::ready()
74 {
75   ldout(cct,10) << "ready " << get_myaddr() << dendl;
76   dispatch_queue.start();
77
78   lock.Lock();
79   if (did_bind)
80     accepter.start();
81   lock.Unlock();
82 }
83
84
85 int SimpleMessenger::shutdown()
86 {
87   ldout(cct,10) << "shutdown " << get_myaddr() << dendl;
88   mark_down_all();
89
90   // break ref cycles on the loopback connection
91   local_connection->set_priv(NULL);
92
93   lock.Lock();
94   stop_cond.Signal();
95   stopped = true;
96   lock.Unlock();
97
98   return 0;
99 }
100
101 int SimpleMessenger::_send_message(Message *m, const entity_inst_t& dest)
102 {
103   // set envelope
104   m->get_header().src = get_myname();
105   m->set_cct(cct);
106
107   if (!m->get_priority()) m->set_priority(get_default_send_priority());
108  
109   ldout(cct,1) <<"--> " << dest.name << " "
110           << dest.addr << " -- " << *m
111           << " -- ?+" << m->get_data().length()
112           << " " << m 
113           << dendl;
114
115   if (dest.addr == entity_addr_t()) {
116     ldout(cct,0) << "send_message message " << *m
117                  << " with empty dest " << dest.addr << dendl;
118     m->put();
119     return -EINVAL;
120   }
121
122   lock.Lock();
123   Pipe *pipe = _lookup_pipe(dest.addr);
124   submit_message(m, (pipe ? pipe->connection_state.get() : NULL),
125                  dest.addr, dest.name.type(), true);
126   lock.Unlock();
127   return 0;
128 }
129
130 int SimpleMessenger::_send_message(Message *m, Connection *con)
131 {
132   //set envelope
133   m->get_header().src = get_myname();
134
135   if (!m->get_priority()) m->set_priority(get_default_send_priority());
136
137   ldout(cct,1) << "--> " << con->get_peer_addr()
138       << " -- " << *m
139       << " -- ?+" << m->get_data().length()
140       << " " << m << " con " << con
141       << dendl;
142
143   submit_message(m, static_cast<PipeConnection*>(con),
144                  con->get_peer_addr(), con->get_peer_type(), false);
145   return 0;
146 }
147
148 /**
149  * If my_inst.addr doesn't have an IP set, this function
150  * will fill it in from the passed addr. Otherwise it does nothing and returns.
151  */
152 void SimpleMessenger::set_addr_unknowns(const entity_addr_t &addr)
153 {
154   if (my_inst.addr.is_blank_ip()) {
155     int port = my_inst.addr.get_port();
156     my_inst.addr.u = addr.u;
157     my_inst.addr.set_port(port);
158     init_local_connection();
159   }
160 }
161
162 void SimpleMessenger::set_addr(const entity_addr_t &addr)
163 {
164   entity_addr_t t = addr;
165   t.set_nonce(nonce);
166   set_myaddr(t);
167   init_local_connection();
168 }
169
170 int SimpleMessenger::get_proto_version(int peer_type, bool connect)
171 {
172   int my_type = my_inst.name.type();
173
174   // set reply protocol version
175   if (peer_type == my_type) {
176     // internal
177     return cluster_protocol;
178   } else {
179     // public
180     if (connect) {
181       switch (peer_type) {
182       case CEPH_ENTITY_TYPE_OSD: return CEPH_OSDC_PROTOCOL;
183       case CEPH_ENTITY_TYPE_MDS: return CEPH_MDSC_PROTOCOL;
184       case CEPH_ENTITY_TYPE_MON: return CEPH_MONC_PROTOCOL;
185       }
186     } else {
187       switch (my_type) {
188       case CEPH_ENTITY_TYPE_OSD: return CEPH_OSDC_PROTOCOL;
189       case CEPH_ENTITY_TYPE_MDS: return CEPH_MDSC_PROTOCOL;
190       case CEPH_ENTITY_TYPE_MON: return CEPH_MONC_PROTOCOL;
191       }
192     }
193   }
194   return 0;
195 }
196
197
198
199
200
201
202
203 /********************************************
204  * SimpleMessenger
205  */
206 #undef dout_prefix
207 #define dout_prefix _prefix(_dout, this)
208
209 void SimpleMessenger::reaper_entry()
210 {
211   ldout(cct,10) << "reaper_entry start" << dendl;
212   lock.Lock();
213   while (!reaper_stop) {
214     reaper();  // may drop and retake the lock
215     if (reaper_stop)
216       break;
217     reaper_cond.Wait(lock);
218   }
219   lock.Unlock();
220   ldout(cct,10) << "reaper_entry done" << dendl;
221 }
222
223 /*
224  * note: assumes lock is held
225  */
226 void SimpleMessenger::reaper()
227 {
228   ldout(cct,10) << "reaper" << dendl;
229   assert(lock.is_locked());
230
231   while (!pipe_reap_queue.empty()) {
232     Pipe *p = pipe_reap_queue.front();
233     pipe_reap_queue.pop_front();
234     ldout(cct,10) << "reaper reaping pipe " << p << " " <<
235       p->get_peer_addr() << dendl;
236     p->pipe_lock.Lock();
237     p->discard_out_queue();
238     if (p->connection_state) {
239       // mark_down, mark_down_all, or fault() should have done this,
240       // or accept() may have switch the Connection to a different
241       // Pipe... but make sure!
242       bool cleared = p->connection_state->clear_pipe(p);
243       assert(!cleared);
244     }
245     p->pipe_lock.Unlock();
246     p->unregister_pipe();
247     assert(pipes.count(p));
248     pipes.erase(p);
249
250     // drop msgr lock while joining thread; the delay through could be
251     // trying to fast dispatch, preventing it from joining without
252     // blocking and deadlocking.
253     lock.Unlock();
254     p->join();
255     lock.Lock();
256
257     if (p->sd >= 0)
258       ::close(p->sd);
259     ldout(cct,10) << "reaper reaped pipe " << p << " " << p->get_peer_addr() << dendl;
260     p->put();
261     ldout(cct,10) << "reaper deleted pipe " << p << dendl;
262   }
263   ldout(cct,10) << "reaper done" << dendl;
264 }
265
266 void SimpleMessenger::queue_reap(Pipe *pipe)
267 {
268   ldout(cct,10) << "queue_reap " << pipe << dendl;
269   lock.Lock();
270   pipe_reap_queue.push_back(pipe);
271   reaper_cond.Signal();
272   lock.Unlock();
273 }
274
275 bool SimpleMessenger::is_connected(Connection *con)
276 {
277   bool r = false;
278   if (con) {
279     Pipe *p = static_cast<Pipe *>(static_cast<PipeConnection*>(con)->get_pipe());
280     if (p) {
281       assert(p->msgr == this);
282       r = p->is_connected();
283       p->put();
284     }
285   }
286   return r;
287 }
288
289 int SimpleMessenger::bind(const entity_addr_t &bind_addr)
290 {
291   lock.Lock();
292   if (started) {
293     ldout(cct,10) << "rank.bind already started" << dendl;
294     lock.Unlock();
295     return -1;
296   }
297   ldout(cct,10) << "rank.bind " << bind_addr << dendl;
298   lock.Unlock();
299
300   // bind to a socket
301   set<int> avoid_ports;
302   int r = accepter.bind(bind_addr, avoid_ports);
303   if (r >= 0)
304     did_bind = true;
305   return r;
306 }
307
308 int SimpleMessenger::rebind(const set<int>& avoid_ports)
309 {
310   ldout(cct,1) << "rebind avoid " << avoid_ports << dendl;
311   assert(did_bind);
312   accepter.stop();
313   mark_down_all();
314   return accepter.rebind(avoid_ports);
315 }
316
317
318 int SimpleMessenger::client_bind(const entity_addr_t &bind_addr)
319 {
320   if (!cct->_conf->ms_bind_before_connect)
321     return 0;
322   Mutex::Locker l(lock);
323   if (did_bind) {
324     assert(my_inst.addr == bind_addr);
325     return 0;
326   }
327   if (started) {
328     ldout(cct,10) << "rank.bind already started" << dendl;
329     return -1;
330   }
331   ldout(cct,10) << "rank.bind " << bind_addr << dendl;
332
333   set_myaddr(bind_addr);
334   return 0;
335 }
336
337
338 int SimpleMessenger::start()
339 {
340   lock.Lock();
341   ldout(cct,1) << "messenger.start" << dendl;
342
343   // register at least one entity, first!
344   assert(my_inst.name.type() >= 0);
345
346   assert(!started);
347   started = true;
348   stopped = false;
349
350   if (!did_bind) {
351     my_inst.addr.nonce = nonce;
352     init_local_connection();
353   }
354
355   lock.Unlock();
356
357   reaper_started = true;
358   reaper_thread.create("ms_reaper");
359   return 0;
360 }
361
362 Pipe *SimpleMessenger::add_accept_pipe(int sd)
363 {
364   lock.Lock();
365   Pipe *p = new Pipe(this, Pipe::STATE_ACCEPTING, NULL);
366   p->sd = sd;
367   p->pipe_lock.Lock();
368   p->start_reader();
369   p->pipe_lock.Unlock();
370   pipes.insert(p);
371   accepting_pipes.insert(p);
372   lock.Unlock();
373   return p;
374 }
375
376 /* connect_rank
377  * NOTE: assumes messenger.lock held.
378  */
379 Pipe *SimpleMessenger::connect_rank(const entity_addr_t& addr,
380                                     int type,
381                                     PipeConnection *con,
382                                     Message *first)
383 {
384   assert(lock.is_locked());
385   assert(addr != my_inst.addr);
386   
387   ldout(cct,10) << "connect_rank to " << addr << ", creating pipe and registering" << dendl;
388   
389   // create pipe
390   Pipe *pipe = new Pipe(this, Pipe::STATE_CONNECTING,
391                         static_cast<PipeConnection*>(con));
392   pipe->pipe_lock.Lock();
393   pipe->set_peer_type(type);
394   pipe->set_peer_addr(addr);
395   pipe->policy = get_policy(type);
396   pipe->start_writer();
397   if (first)
398     pipe->_send(first);
399   pipe->pipe_lock.Unlock();
400   pipe->register_pipe();
401   pipes.insert(pipe);
402
403   return pipe;
404 }
405
406
407
408
409
410
411 AuthAuthorizer *SimpleMessenger::get_authorizer(int peer_type, bool force_new)
412 {
413   return ms_deliver_get_authorizer(peer_type, force_new);
414 }
415
416 bool SimpleMessenger::verify_authorizer(Connection *con, int peer_type,
417                                         int protocol, bufferlist& authorizer, bufferlist& authorizer_reply,
418                                         bool& isvalid,CryptoKey& session_key)
419 {
420   return ms_deliver_verify_authorizer(con, peer_type, protocol, authorizer, authorizer_reply, isvalid,session_key);
421 }
422
423 ConnectionRef SimpleMessenger::get_connection(const entity_inst_t& dest)
424 {
425   Mutex::Locker l(lock);
426   if (my_inst.addr == dest.addr) {
427     // local
428     return local_connection;
429   }
430
431   // remote
432   while (true) {
433     Pipe *pipe = _lookup_pipe(dest.addr);
434     if (pipe) {
435       ldout(cct, 10) << "get_connection " << dest << " existing " << pipe << dendl;
436     } else {
437       pipe = connect_rank(dest.addr, dest.name.type(), NULL, NULL);
438       ldout(cct, 10) << "get_connection " << dest << " new " << pipe << dendl;
439     }
440     Mutex::Locker l(pipe->pipe_lock);
441     if (pipe->connection_state)
442       return pipe->connection_state;
443     // we failed too quickly!  retry.  FIXME.
444   }
445 }
446
447 ConnectionRef SimpleMessenger::get_loopback_connection()
448 {
449   return local_connection;
450 }
451
452 void SimpleMessenger::submit_message(Message *m, PipeConnection *con,
453                                      const entity_addr_t& dest_addr, int dest_type,
454                                      bool already_locked)
455 {
456   m->trace.event("simple submitting message");
457   if (cct->_conf->ms_dump_on_send) {
458     m->encode(-1, true);
459     ldout(cct, 0) << "submit_message " << *m << "\n";
460     m->get_payload().hexdump(*_dout);
461     if (m->get_data().length() > 0) {
462       *_dout << " data:\n";
463       m->get_data().hexdump(*_dout);
464     }
465     *_dout << dendl;
466     m->clear_payload();
467   }
468
469   // existing connection?
470   if (con) {
471     Pipe *pipe = NULL;
472     bool ok = static_cast<PipeConnection*>(con)->try_get_pipe(&pipe);
473     if (!ok) {
474       ldout(cct,0) << "submit_message " << *m << " remote, " << dest_addr
475                    << ", failed lossy con, dropping message " << m << dendl;
476       m->put();
477       return;
478     }
479     while (pipe && ok) {
480       // we loop in case of a racing reconnect, either from us or them
481       pipe->pipe_lock.Lock(); // can't use a Locker because of the Pipe ref
482       if (pipe->state != Pipe::STATE_CLOSED) {
483         ldout(cct,20) << "submit_message " << *m << " remote, " << dest_addr << ", have pipe." << dendl;
484         pipe->_send(m);
485         pipe->pipe_lock.Unlock();
486         pipe->put();
487         return;
488       }
489       Pipe *current_pipe;
490       ok = con->try_get_pipe(&current_pipe);
491       pipe->pipe_lock.Unlock();
492       if (current_pipe == pipe) {
493         ldout(cct,20) << "submit_message " << *m << " remote, " << dest_addr
494                       << ", had pipe " << pipe << ", but it closed." << dendl;
495         pipe->put();
496         current_pipe->put();
497         m->put();
498         return;
499       } else {
500         pipe->put();
501         pipe = current_pipe;
502       }
503     }
504   }
505
506   // local?
507   if (my_inst.addr == dest_addr) {
508     // local
509     ldout(cct,20) << "submit_message " << *m << " local" << dendl;
510     m->set_connection(local_connection.get());
511     dispatch_queue.local_delivery(m, m->get_priority());
512     return;
513   }
514
515   // remote, no existing pipe.
516   const Policy& policy = get_policy(dest_type);
517   if (policy.server) {
518     ldout(cct,20) << "submit_message " << *m << " remote, " << dest_addr << ", lossy server for target type "
519                   << ceph_entity_type_name(dest_type) << ", no session, dropping." << dendl;
520     m->put();
521   } else {
522     ldout(cct,20) << "submit_message " << *m << " remote, " << dest_addr << ", new pipe." << dendl;
523     if (!already_locked) {
524       /** We couldn't handle the Message without reference to global data, so
525        *  grab the lock and do it again. If we got here, we know it's a non-lossy
526        *  Connection, so we can use our existing pointer without doing another lookup. */
527       Mutex::Locker l(lock);
528       submit_message(m, con, dest_addr, dest_type, true);
529     } else {
530       connect_rank(dest_addr, dest_type, static_cast<PipeConnection*>(con), m);
531     }
532   }
533 }
534
535 int SimpleMessenger::send_keepalive(Connection *con)
536 {
537   int ret = 0;
538   Pipe *pipe = static_cast<Pipe *>(
539     static_cast<PipeConnection*>(con)->get_pipe());
540   if (pipe) {
541     ldout(cct,20) << "send_keepalive con " << con << ", have pipe." << dendl;
542     assert(pipe->msgr == this);
543     pipe->pipe_lock.Lock();
544     pipe->_send_keepalive();
545     pipe->pipe_lock.Unlock();
546     pipe->put();
547   } else {
548     ldout(cct,0) << "send_keepalive con " << con << ", no pipe." << dendl;
549     ret = -EPIPE;
550   }
551   return ret;
552 }
553
554
555
556 void SimpleMessenger::wait()
557 {
558   lock.Lock();
559   if (!started) {
560     lock.Unlock();
561     return;
562   }
563   if (!stopped)
564     stop_cond.Wait(lock);
565
566   lock.Unlock();
567
568   // done!  clean up.
569   if (did_bind) {
570     ldout(cct,20) << "wait: stopping accepter thread" << dendl;
571     accepter.stop();
572     did_bind = false;
573     ldout(cct,20) << "wait: stopped accepter thread" << dendl;
574   }
575
576   dispatch_queue.shutdown();
577   if (dispatch_queue.is_started()) {
578     ldout(cct,10) << "wait: waiting for dispatch queue" << dendl;
579     dispatch_queue.wait();
580     dispatch_queue.discard_local();
581     ldout(cct,10) << "wait: dispatch queue is stopped" << dendl;
582   }
583
584   if (reaper_started) {
585     ldout(cct,20) << "wait: stopping reaper thread" << dendl;
586     lock.Lock();
587     reaper_cond.Signal();
588     reaper_stop = true;
589     lock.Unlock();
590     reaper_thread.join();
591     reaper_started = false;
592     ldout(cct,20) << "wait: stopped reaper thread" << dendl;
593   }
594
595   // close+reap all pipes
596   lock.Lock();
597   {
598     ldout(cct,10) << "wait: closing pipes" << dendl;
599
600     while (!rank_pipe.empty()) {
601       Pipe *p = rank_pipe.begin()->second;
602       p->unregister_pipe();
603       p->pipe_lock.Lock();
604       p->stop_and_wait();
605       // don't generate an event here; we're shutting down anyway.
606       PipeConnectionRef con = p->connection_state;
607       if (con)
608         con->clear_pipe(p);
609       p->pipe_lock.Unlock();
610     }
611
612     reaper();
613     ldout(cct,10) << "wait: waiting for pipes " << pipes << " to close" << dendl;
614     while (!pipes.empty()) {
615       reaper_cond.Wait(lock);
616       reaper();
617     }
618   }
619   lock.Unlock();
620
621   ldout(cct,10) << "wait: done." << dendl;
622   ldout(cct,1) << "shutdown complete." << dendl;
623   started = false;
624 }
625
626
627 void SimpleMessenger::mark_down_all()
628 {
629   ldout(cct,1) << "mark_down_all" << dendl;
630   lock.Lock();
631   for (set<Pipe*>::iterator q = accepting_pipes.begin(); q != accepting_pipes.end(); ++q) {
632     Pipe *p = *q;
633     ldout(cct,5) << "mark_down_all accepting_pipe " << p << dendl;
634     p->pipe_lock.Lock();
635     p->stop();
636     PipeConnectionRef con = p->connection_state;
637     if (con && con->clear_pipe(p))
638       dispatch_queue.queue_reset(con.get());
639     p->pipe_lock.Unlock();
640   }
641   accepting_pipes.clear();
642
643   while (!rank_pipe.empty()) {
644     ceph::unordered_map<entity_addr_t,Pipe*>::iterator it = rank_pipe.begin();
645     Pipe *p = it->second;
646     ldout(cct,5) << "mark_down_all " << it->first << " " << p << dendl;
647     rank_pipe.erase(it);
648     p->unregister_pipe();
649     p->pipe_lock.Lock();
650     p->stop();
651     PipeConnectionRef con = p->connection_state;
652     if (con && con->clear_pipe(p))
653       dispatch_queue.queue_reset(con.get());
654     p->pipe_lock.Unlock();
655   }
656   lock.Unlock();
657 }
658
659 void SimpleMessenger::mark_down(const entity_addr_t& addr)
660 {
661   lock.Lock();
662   Pipe *p = _lookup_pipe(addr);
663   if (p) {
664     ldout(cct,1) << "mark_down " << addr << " -- " << p << dendl;
665     p->unregister_pipe();
666     p->pipe_lock.Lock();
667     p->stop();
668     if (p->connection_state) {
669       // generate a reset event for the caller in this case, even
670       // though they asked for it, since this is the addr-based (and
671       // not Connection* based) interface
672       PipeConnectionRef con = p->connection_state;
673       if (con && con->clear_pipe(p))
674         dispatch_queue.queue_reset(con.get());
675     }
676     p->pipe_lock.Unlock();
677   } else {
678     ldout(cct,1) << "mark_down " << addr << " -- pipe dne" << dendl;
679   }
680   lock.Unlock();
681 }
682
683 void SimpleMessenger::mark_down(Connection *con)
684 {
685   if (con == NULL)
686     return;
687   lock.Lock();
688   Pipe *p = static_cast<Pipe *>(static_cast<PipeConnection*>(con)->get_pipe());
689   if (p) {
690     ldout(cct,1) << "mark_down " << con << " -- " << p << dendl;
691     assert(p->msgr == this);
692     p->unregister_pipe();
693     p->pipe_lock.Lock();
694     p->stop();
695     if (p->connection_state) {
696       // do not generate a reset event for the caller in this case,
697       // since they asked for it.
698       p->connection_state->clear_pipe(p);
699     }
700     p->pipe_lock.Unlock();
701     p->put();
702   } else {
703     ldout(cct,1) << "mark_down " << con << " -- pipe dne" << dendl;
704   }
705   lock.Unlock();
706 }
707
708 void SimpleMessenger::mark_disposable(Connection *con)
709 {
710   lock.Lock();
711   Pipe *p = static_cast<Pipe *>(static_cast<PipeConnection*>(con)->get_pipe());
712   if (p) {
713     ldout(cct,1) << "mark_disposable " << con << " -- " << p << dendl;
714     assert(p->msgr == this);
715     p->pipe_lock.Lock();
716     p->policy.lossy = true;
717     p->pipe_lock.Unlock();
718     p->put();
719   } else {
720     ldout(cct,1) << "mark_disposable " << con << " -- pipe dne" << dendl;
721   }
722   lock.Unlock();
723 }
724
725 void SimpleMessenger::learned_addr(const entity_addr_t &peer_addr_for_me)
726 {
727   // be careful here: multiple threads may block here, and readers of
728   // my_inst.addr do NOT hold any lock.
729
730   // this always goes from true -> false under the protection of the
731   // mutex.  if it is already false, we need not retake the mutex at
732   // all.
733   if (!need_addr)
734     return;
735
736   lock.Lock();
737   if (need_addr) {
738     entity_addr_t t = peer_addr_for_me;
739     t.set_port(my_inst.addr.get_port());
740     t.set_nonce(my_inst.addr.get_nonce());
741     ANNOTATE_BENIGN_RACE_SIZED(&my_inst.addr, sizeof(my_inst.addr),
742                                "SimpleMessenger learned addr");
743     my_inst.addr = t;
744     ldout(cct,1) << "learned my addr " << my_inst.addr << dendl;
745     need_addr = false;
746     init_local_connection();
747   }
748   lock.Unlock();
749 }
750
751 void SimpleMessenger::init_local_connection()
752 {
753   local_connection->peer_addr = my_inst.addr;
754   local_connection->peer_type = my_inst.name.type();
755   local_connection->set_features(CEPH_FEATURES_ALL);
756   ms_deliver_handle_fast_connect(local_connection.get());
757 }