remove ceph code
[stor4nfv.git] / src / ceph / src / msg / simple / SimpleMessenger.cc
diff --git a/src/ceph/src/msg/simple/SimpleMessenger.cc b/src/ceph/src/msg/simple/SimpleMessenger.cc
deleted file mode 100644 (file)
index 78e190d..0000000
+++ /dev/null
@@ -1,757 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software 
- * Foundation.  See file COPYING.
- * 
- */
-
-#include <errno.h>
-#include <iostream>
-#include <fstream>
-
-
-#include "SimpleMessenger.h"
-
-#include "common/config.h"
-#include "common/Timer.h"
-#include "common/errno.h"
-#include "common/valgrind.h"
-#include "auth/Crypto.h"
-#include "include/Spinlock.h"
-
-#define dout_subsys ceph_subsys_ms
-#undef dout_prefix
-#define dout_prefix _prefix(_dout, this)
-static ostream& _prefix(std::ostream *_dout, SimpleMessenger *msgr) {
-  return *_dout << "-- " << msgr->get_myaddr() << " ";
-}
-
-
-/*******************
- * SimpleMessenger
- */
-
-SimpleMessenger::SimpleMessenger(CephContext *cct, entity_name_t name,
-                                string mname, uint64_t _nonce)
-  : SimplePolicyMessenger(cct, name,mname, _nonce),
-    accepter(this, _nonce),
-    dispatch_queue(cct, this, mname),
-    reaper_thread(this),
-    nonce(_nonce),
-    lock("SimpleMessenger::lock"), need_addr(true), did_bind(false),
-    global_seq(0),
-    cluster_protocol(0),
-    reaper_started(false), reaper_stop(false),
-    timeout(0),
-    local_connection(new PipeConnection(cct, this))
-{
-  ANNOTATE_BENIGN_RACE_SIZED(&timeout, sizeof(timeout),
-                             "SimpleMessenger read timeout");
-  ceph_spin_init(&global_seq_lock);
-  init_local_connection();
-}
-
-/**
- * Destroy the SimpleMessenger. Pretty simple since all the work is done
- * elsewhere.
- */
-SimpleMessenger::~SimpleMessenger()
-{
-  assert(!did_bind); // either we didn't bind or we shut down the Accepter
-  assert(rank_pipe.empty()); // we don't have any running Pipes.
-  assert(!reaper_started); // the reaper thread is stopped
-  ceph_spin_destroy(&global_seq_lock);
-}
-
-void SimpleMessenger::ready()
-{
-  ldout(cct,10) << "ready " << get_myaddr() << dendl;
-  dispatch_queue.start();
-
-  lock.Lock();
-  if (did_bind)
-    accepter.start();
-  lock.Unlock();
-}
-
-
-int SimpleMessenger::shutdown()
-{
-  ldout(cct,10) << "shutdown " << get_myaddr() << dendl;
-  mark_down_all();
-
-  // break ref cycles on the loopback connection
-  local_connection->set_priv(NULL);
-
-  lock.Lock();
-  stop_cond.Signal();
-  stopped = true;
-  lock.Unlock();
-
-  return 0;
-}
-
-int SimpleMessenger::_send_message(Message *m, const entity_inst_t& dest)
-{
-  // set envelope
-  m->get_header().src = get_myname();
-  m->set_cct(cct);
-
-  if (!m->get_priority()) m->set_priority(get_default_send_priority());
-  ldout(cct,1) <<"--> " << dest.name << " "
-          << dest.addr << " -- " << *m
-         << " -- ?+" << m->get_data().length()
-         << " " << m 
-         << dendl;
-
-  if (dest.addr == entity_addr_t()) {
-    ldout(cct,0) << "send_message message " << *m
-                 << " with empty dest " << dest.addr << dendl;
-    m->put();
-    return -EINVAL;
-  }
-
-  lock.Lock();
-  Pipe *pipe = _lookup_pipe(dest.addr);
-  submit_message(m, (pipe ? pipe->connection_state.get() : NULL),
-                 dest.addr, dest.name.type(), true);
-  lock.Unlock();
-  return 0;
-}
-
-int SimpleMessenger::_send_message(Message *m, Connection *con)
-{
-  //set envelope
-  m->get_header().src = get_myname();
-
-  if (!m->get_priority()) m->set_priority(get_default_send_priority());
-
-  ldout(cct,1) << "--> " << con->get_peer_addr()
-      << " -- " << *m
-      << " -- ?+" << m->get_data().length()
-      << " " << m << " con " << con
-      << dendl;
-
-  submit_message(m, static_cast<PipeConnection*>(con),
-                con->get_peer_addr(), con->get_peer_type(), false);
-  return 0;
-}
-
-/**
- * If my_inst.addr doesn't have an IP set, this function
- * will fill it in from the passed addr. Otherwise it does nothing and returns.
- */
-void SimpleMessenger::set_addr_unknowns(const entity_addr_t &addr)
-{
-  if (my_inst.addr.is_blank_ip()) {
-    int port = my_inst.addr.get_port();
-    my_inst.addr.u = addr.u;
-    my_inst.addr.set_port(port);
-    init_local_connection();
-  }
-}
-
-void SimpleMessenger::set_addr(const entity_addr_t &addr)
-{
-  entity_addr_t t = addr;
-  t.set_nonce(nonce);
-  set_myaddr(t);
-  init_local_connection();
-}
-
-int SimpleMessenger::get_proto_version(int peer_type, bool connect)
-{
-  int my_type = my_inst.name.type();
-
-  // set reply protocol version
-  if (peer_type == my_type) {
-    // internal
-    return cluster_protocol;
-  } else {
-    // public
-    if (connect) {
-      switch (peer_type) {
-      case CEPH_ENTITY_TYPE_OSD: return CEPH_OSDC_PROTOCOL;
-      case CEPH_ENTITY_TYPE_MDS: return CEPH_MDSC_PROTOCOL;
-      case CEPH_ENTITY_TYPE_MON: return CEPH_MONC_PROTOCOL;
-      }
-    } else {
-      switch (my_type) {
-      case CEPH_ENTITY_TYPE_OSD: return CEPH_OSDC_PROTOCOL;
-      case CEPH_ENTITY_TYPE_MDS: return CEPH_MDSC_PROTOCOL;
-      case CEPH_ENTITY_TYPE_MON: return CEPH_MONC_PROTOCOL;
-      }
-    }
-  }
-  return 0;
-}
-
-
-
-
-
-
-
-/********************************************
- * SimpleMessenger
- */
-#undef dout_prefix
-#define dout_prefix _prefix(_dout, this)
-
-void SimpleMessenger::reaper_entry()
-{
-  ldout(cct,10) << "reaper_entry start" << dendl;
-  lock.Lock();
-  while (!reaper_stop) {
-    reaper();  // may drop and retake the lock
-    if (reaper_stop)
-      break;
-    reaper_cond.Wait(lock);
-  }
-  lock.Unlock();
-  ldout(cct,10) << "reaper_entry done" << dendl;
-}
-
-/*
- * note: assumes lock is held
- */
-void SimpleMessenger::reaper()
-{
-  ldout(cct,10) << "reaper" << dendl;
-  assert(lock.is_locked());
-
-  while (!pipe_reap_queue.empty()) {
-    Pipe *p = pipe_reap_queue.front();
-    pipe_reap_queue.pop_front();
-    ldout(cct,10) << "reaper reaping pipe " << p << " " <<
-      p->get_peer_addr() << dendl;
-    p->pipe_lock.Lock();
-    p->discard_out_queue();
-    if (p->connection_state) {
-      // mark_down, mark_down_all, or fault() should have done this,
-      // or accept() may have switch the Connection to a different
-      // Pipe... but make sure!
-      bool cleared = p->connection_state->clear_pipe(p);
-      assert(!cleared);
-    }
-    p->pipe_lock.Unlock();
-    p->unregister_pipe();
-    assert(pipes.count(p));
-    pipes.erase(p);
-
-    // drop msgr lock while joining thread; the delay through could be
-    // trying to fast dispatch, preventing it from joining without
-    // blocking and deadlocking.
-    lock.Unlock();
-    p->join();
-    lock.Lock();
-
-    if (p->sd >= 0)
-      ::close(p->sd);
-    ldout(cct,10) << "reaper reaped pipe " << p << " " << p->get_peer_addr() << dendl;
-    p->put();
-    ldout(cct,10) << "reaper deleted pipe " << p << dendl;
-  }
-  ldout(cct,10) << "reaper done" << dendl;
-}
-
-void SimpleMessenger::queue_reap(Pipe *pipe)
-{
-  ldout(cct,10) << "queue_reap " << pipe << dendl;
-  lock.Lock();
-  pipe_reap_queue.push_back(pipe);
-  reaper_cond.Signal();
-  lock.Unlock();
-}
-
-bool SimpleMessenger::is_connected(Connection *con)
-{
-  bool r = false;
-  if (con) {
-    Pipe *p = static_cast<Pipe *>(static_cast<PipeConnection*>(con)->get_pipe());
-    if (p) {
-      assert(p->msgr == this);
-      r = p->is_connected();
-      p->put();
-    }
-  }
-  return r;
-}
-
-int SimpleMessenger::bind(const entity_addr_t &bind_addr)
-{
-  lock.Lock();
-  if (started) {
-    ldout(cct,10) << "rank.bind already started" << dendl;
-    lock.Unlock();
-    return -1;
-  }
-  ldout(cct,10) << "rank.bind " << bind_addr << dendl;
-  lock.Unlock();
-
-  // bind to a socket
-  set<int> avoid_ports;
-  int r = accepter.bind(bind_addr, avoid_ports);
-  if (r >= 0)
-    did_bind = true;
-  return r;
-}
-
-int SimpleMessenger::rebind(const set<int>& avoid_ports)
-{
-  ldout(cct,1) << "rebind avoid " << avoid_ports << dendl;
-  assert(did_bind);
-  accepter.stop();
-  mark_down_all();
-  return accepter.rebind(avoid_ports);
-}
-
-
-int SimpleMessenger::client_bind(const entity_addr_t &bind_addr)
-{
-  if (!cct->_conf->ms_bind_before_connect)
-    return 0;
-  Mutex::Locker l(lock);
-  if (did_bind) {
-    assert(my_inst.addr == bind_addr);
-    return 0;
-  }
-  if (started) {
-    ldout(cct,10) << "rank.bind already started" << dendl;
-    return -1;
-  }
-  ldout(cct,10) << "rank.bind " << bind_addr << dendl;
-
-  set_myaddr(bind_addr);
-  return 0;
-}
-
-
-int SimpleMessenger::start()
-{
-  lock.Lock();
-  ldout(cct,1) << "messenger.start" << dendl;
-
-  // register at least one entity, first!
-  assert(my_inst.name.type() >= 0);
-
-  assert(!started);
-  started = true;
-  stopped = false;
-
-  if (!did_bind) {
-    my_inst.addr.nonce = nonce;
-    init_local_connection();
-  }
-
-  lock.Unlock();
-
-  reaper_started = true;
-  reaper_thread.create("ms_reaper");
-  return 0;
-}
-
-Pipe *SimpleMessenger::add_accept_pipe(int sd)
-{
-  lock.Lock();
-  Pipe *p = new Pipe(this, Pipe::STATE_ACCEPTING, NULL);
-  p->sd = sd;
-  p->pipe_lock.Lock();
-  p->start_reader();
-  p->pipe_lock.Unlock();
-  pipes.insert(p);
-  accepting_pipes.insert(p);
-  lock.Unlock();
-  return p;
-}
-
-/* connect_rank
- * NOTE: assumes messenger.lock held.
- */
-Pipe *SimpleMessenger::connect_rank(const entity_addr_t& addr,
-                                   int type,
-                                   PipeConnection *con,
-                                   Message *first)
-{
-  assert(lock.is_locked());
-  assert(addr != my_inst.addr);
-  
-  ldout(cct,10) << "connect_rank to " << addr << ", creating pipe and registering" << dendl;
-  
-  // create pipe
-  Pipe *pipe = new Pipe(this, Pipe::STATE_CONNECTING,
-                       static_cast<PipeConnection*>(con));
-  pipe->pipe_lock.Lock();
-  pipe->set_peer_type(type);
-  pipe->set_peer_addr(addr);
-  pipe->policy = get_policy(type);
-  pipe->start_writer();
-  if (first)
-    pipe->_send(first);
-  pipe->pipe_lock.Unlock();
-  pipe->register_pipe();
-  pipes.insert(pipe);
-
-  return pipe;
-}
-
-
-
-
-
-
-AuthAuthorizer *SimpleMessenger::get_authorizer(int peer_type, bool force_new)
-{
-  return ms_deliver_get_authorizer(peer_type, force_new);
-}
-
-bool SimpleMessenger::verify_authorizer(Connection *con, int peer_type,
-                                       int protocol, bufferlist& authorizer, bufferlist& authorizer_reply,
-                                       bool& isvalid,CryptoKey& session_key)
-{
-  return ms_deliver_verify_authorizer(con, peer_type, protocol, authorizer, authorizer_reply, isvalid,session_key);
-}
-
-ConnectionRef SimpleMessenger::get_connection(const entity_inst_t& dest)
-{
-  Mutex::Locker l(lock);
-  if (my_inst.addr == dest.addr) {
-    // local
-    return local_connection;
-  }
-
-  // remote
-  while (true) {
-    Pipe *pipe = _lookup_pipe(dest.addr);
-    if (pipe) {
-      ldout(cct, 10) << "get_connection " << dest << " existing " << pipe << dendl;
-    } else {
-      pipe = connect_rank(dest.addr, dest.name.type(), NULL, NULL);
-      ldout(cct, 10) << "get_connection " << dest << " new " << pipe << dendl;
-    }
-    Mutex::Locker l(pipe->pipe_lock);
-    if (pipe->connection_state)
-      return pipe->connection_state;
-    // we failed too quickly!  retry.  FIXME.
-  }
-}
-
-ConnectionRef SimpleMessenger::get_loopback_connection()
-{
-  return local_connection;
-}
-
-void SimpleMessenger::submit_message(Message *m, PipeConnection *con,
-                                    const entity_addr_t& dest_addr, int dest_type,
-                                    bool already_locked)
-{
-  m->trace.event("simple submitting message");
-  if (cct->_conf->ms_dump_on_send) {
-    m->encode(-1, true);
-    ldout(cct, 0) << "submit_message " << *m << "\n";
-    m->get_payload().hexdump(*_dout);
-    if (m->get_data().length() > 0) {
-      *_dout << " data:\n";
-      m->get_data().hexdump(*_dout);
-    }
-    *_dout << dendl;
-    m->clear_payload();
-  }
-
-  // existing connection?
-  if (con) {
-    Pipe *pipe = NULL;
-    bool ok = static_cast<PipeConnection*>(con)->try_get_pipe(&pipe);
-    if (!ok) {
-      ldout(cct,0) << "submit_message " << *m << " remote, " << dest_addr
-                  << ", failed lossy con, dropping message " << m << dendl;
-      m->put();
-      return;
-    }
-    while (pipe && ok) {
-      // we loop in case of a racing reconnect, either from us or them
-      pipe->pipe_lock.Lock(); // can't use a Locker because of the Pipe ref
-      if (pipe->state != Pipe::STATE_CLOSED) {
-       ldout(cct,20) << "submit_message " << *m << " remote, " << dest_addr << ", have pipe." << dendl;
-       pipe->_send(m);
-       pipe->pipe_lock.Unlock();
-       pipe->put();
-       return;
-      }
-      Pipe *current_pipe;
-      ok = con->try_get_pipe(&current_pipe);
-      pipe->pipe_lock.Unlock();
-      if (current_pipe == pipe) {
-       ldout(cct,20) << "submit_message " << *m << " remote, " << dest_addr
-                     << ", had pipe " << pipe << ", but it closed." << dendl;
-       pipe->put();
-       current_pipe->put();
-       m->put();
-       return;
-      } else {
-       pipe->put();
-       pipe = current_pipe;
-      }
-    }
-  }
-
-  // local?
-  if (my_inst.addr == dest_addr) {
-    // local
-    ldout(cct,20) << "submit_message " << *m << " local" << dendl;
-    m->set_connection(local_connection.get());
-    dispatch_queue.local_delivery(m, m->get_priority());
-    return;
-  }
-
-  // remote, no existing pipe.
-  const Policy& policy = get_policy(dest_type);
-  if (policy.server) {
-    ldout(cct,20) << "submit_message " << *m << " remote, " << dest_addr << ", lossy server for target type "
-                 << ceph_entity_type_name(dest_type) << ", no session, dropping." << dendl;
-    m->put();
-  } else {
-    ldout(cct,20) << "submit_message " << *m << " remote, " << dest_addr << ", new pipe." << dendl;
-    if (!already_locked) {
-      /** We couldn't handle the Message without reference to global data, so
-       *  grab the lock and do it again. If we got here, we know it's a non-lossy
-       *  Connection, so we can use our existing pointer without doing another lookup. */
-      Mutex::Locker l(lock);
-      submit_message(m, con, dest_addr, dest_type, true);
-    } else {
-      connect_rank(dest_addr, dest_type, static_cast<PipeConnection*>(con), m);
-    }
-  }
-}
-
-int SimpleMessenger::send_keepalive(Connection *con)
-{
-  int ret = 0;
-  Pipe *pipe = static_cast<Pipe *>(
-    static_cast<PipeConnection*>(con)->get_pipe());
-  if (pipe) {
-    ldout(cct,20) << "send_keepalive con " << con << ", have pipe." << dendl;
-    assert(pipe->msgr == this);
-    pipe->pipe_lock.Lock();
-    pipe->_send_keepalive();
-    pipe->pipe_lock.Unlock();
-    pipe->put();
-  } else {
-    ldout(cct,0) << "send_keepalive con " << con << ", no pipe." << dendl;
-    ret = -EPIPE;
-  }
-  return ret;
-}
-
-
-
-void SimpleMessenger::wait()
-{
-  lock.Lock();
-  if (!started) {
-    lock.Unlock();
-    return;
-  }
-  if (!stopped)
-    stop_cond.Wait(lock);
-
-  lock.Unlock();
-
-  // done!  clean up.
-  if (did_bind) {
-    ldout(cct,20) << "wait: stopping accepter thread" << dendl;
-    accepter.stop();
-    did_bind = false;
-    ldout(cct,20) << "wait: stopped accepter thread" << dendl;
-  }
-
-  dispatch_queue.shutdown();
-  if (dispatch_queue.is_started()) {
-    ldout(cct,10) << "wait: waiting for dispatch queue" << dendl;
-    dispatch_queue.wait();
-    dispatch_queue.discard_local();
-    ldout(cct,10) << "wait: dispatch queue is stopped" << dendl;
-  }
-
-  if (reaper_started) {
-    ldout(cct,20) << "wait: stopping reaper thread" << dendl;
-    lock.Lock();
-    reaper_cond.Signal();
-    reaper_stop = true;
-    lock.Unlock();
-    reaper_thread.join();
-    reaper_started = false;
-    ldout(cct,20) << "wait: stopped reaper thread" << dendl;
-  }
-
-  // close+reap all pipes
-  lock.Lock();
-  {
-    ldout(cct,10) << "wait: closing pipes" << dendl;
-
-    while (!rank_pipe.empty()) {
-      Pipe *p = rank_pipe.begin()->second;
-      p->unregister_pipe();
-      p->pipe_lock.Lock();
-      p->stop_and_wait();
-      // don't generate an event here; we're shutting down anyway.
-      PipeConnectionRef con = p->connection_state;
-      if (con)
-       con->clear_pipe(p);
-      p->pipe_lock.Unlock();
-    }
-
-    reaper();
-    ldout(cct,10) << "wait: waiting for pipes " << pipes << " to close" << dendl;
-    while (!pipes.empty()) {
-      reaper_cond.Wait(lock);
-      reaper();
-    }
-  }
-  lock.Unlock();
-
-  ldout(cct,10) << "wait: done." << dendl;
-  ldout(cct,1) << "shutdown complete." << dendl;
-  started = false;
-}
-
-
-void SimpleMessenger::mark_down_all()
-{
-  ldout(cct,1) << "mark_down_all" << dendl;
-  lock.Lock();
-  for (set<Pipe*>::iterator q = accepting_pipes.begin(); q != accepting_pipes.end(); ++q) {
-    Pipe *p = *q;
-    ldout(cct,5) << "mark_down_all accepting_pipe " << p << dendl;
-    p->pipe_lock.Lock();
-    p->stop();
-    PipeConnectionRef con = p->connection_state;
-    if (con && con->clear_pipe(p))
-      dispatch_queue.queue_reset(con.get());
-    p->pipe_lock.Unlock();
-  }
-  accepting_pipes.clear();
-
-  while (!rank_pipe.empty()) {
-    ceph::unordered_map<entity_addr_t,Pipe*>::iterator it = rank_pipe.begin();
-    Pipe *p = it->second;
-    ldout(cct,5) << "mark_down_all " << it->first << " " << p << dendl;
-    rank_pipe.erase(it);
-    p->unregister_pipe();
-    p->pipe_lock.Lock();
-    p->stop();
-    PipeConnectionRef con = p->connection_state;
-    if (con && con->clear_pipe(p))
-      dispatch_queue.queue_reset(con.get());
-    p->pipe_lock.Unlock();
-  }
-  lock.Unlock();
-}
-
-void SimpleMessenger::mark_down(const entity_addr_t& addr)
-{
-  lock.Lock();
-  Pipe *p = _lookup_pipe(addr);
-  if (p) {
-    ldout(cct,1) << "mark_down " << addr << " -- " << p << dendl;
-    p->unregister_pipe();
-    p->pipe_lock.Lock();
-    p->stop();
-    if (p->connection_state) {
-      // generate a reset event for the caller in this case, even
-      // though they asked for it, since this is the addr-based (and
-      // not Connection* based) interface
-      PipeConnectionRef con = p->connection_state;
-      if (con && con->clear_pipe(p))
-       dispatch_queue.queue_reset(con.get());
-    }
-    p->pipe_lock.Unlock();
-  } else {
-    ldout(cct,1) << "mark_down " << addr << " -- pipe dne" << dendl;
-  }
-  lock.Unlock();
-}
-
-void SimpleMessenger::mark_down(Connection *con)
-{
-  if (con == NULL)
-    return;
-  lock.Lock();
-  Pipe *p = static_cast<Pipe *>(static_cast<PipeConnection*>(con)->get_pipe());
-  if (p) {
-    ldout(cct,1) << "mark_down " << con << " -- " << p << dendl;
-    assert(p->msgr == this);
-    p->unregister_pipe();
-    p->pipe_lock.Lock();
-    p->stop();
-    if (p->connection_state) {
-      // do not generate a reset event for the caller in this case,
-      // since they asked for it.
-      p->connection_state->clear_pipe(p);
-    }
-    p->pipe_lock.Unlock();
-    p->put();
-  } else {
-    ldout(cct,1) << "mark_down " << con << " -- pipe dne" << dendl;
-  }
-  lock.Unlock();
-}
-
-void SimpleMessenger::mark_disposable(Connection *con)
-{
-  lock.Lock();
-  Pipe *p = static_cast<Pipe *>(static_cast<PipeConnection*>(con)->get_pipe());
-  if (p) {
-    ldout(cct,1) << "mark_disposable " << con << " -- " << p << dendl;
-    assert(p->msgr == this);
-    p->pipe_lock.Lock();
-    p->policy.lossy = true;
-    p->pipe_lock.Unlock();
-    p->put();
-  } else {
-    ldout(cct,1) << "mark_disposable " << con << " -- pipe dne" << dendl;
-  }
-  lock.Unlock();
-}
-
-void SimpleMessenger::learned_addr(const entity_addr_t &peer_addr_for_me)
-{
-  // be careful here: multiple threads may block here, and readers of
-  // my_inst.addr do NOT hold any lock.
-
-  // this always goes from true -> false under the protection of the
-  // mutex.  if it is already false, we need not retake the mutex at
-  // all.
-  if (!need_addr)
-    return;
-
-  lock.Lock();
-  if (need_addr) {
-    entity_addr_t t = peer_addr_for_me;
-    t.set_port(my_inst.addr.get_port());
-    t.set_nonce(my_inst.addr.get_nonce());
-    ANNOTATE_BENIGN_RACE_SIZED(&my_inst.addr, sizeof(my_inst.addr),
-                               "SimpleMessenger learned addr");
-    my_inst.addr = t;
-    ldout(cct,1) << "learned my addr " << my_inst.addr << dendl;
-    need_addr = false;
-    init_local_connection();
-  }
-  lock.Unlock();
-}
-
-void SimpleMessenger::init_local_connection()
-{
-  local_connection->peer_addr = my_inst.addr;
-  local_connection->peer_type = my_inst.name.type();
-  local_connection->set_features(CEPH_FEATURES_ALL);
-  ms_deliver_handle_fast_connect(local_connection.get());
-}