initial code repo
[stor4nfv.git] / src / ceph / src / msg / simple / Pipe.cc
diff --git a/src/ceph/src/msg/simple/Pipe.cc b/src/ceph/src/msg/simple/Pipe.cc
new file mode 100644 (file)
index 0000000..848efd4
--- /dev/null
@@ -0,0 +1,2678 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <netinet/ip.h>
+#include <netinet/tcp.h>
+#include <sys/uio.h>
+#include <limits.h>
+#include <poll.h>
+
+#include "msg/Message.h"
+#include "Pipe.h"
+#include "SimpleMessenger.h"
+
+#include "common/debug.h"
+#include "common/errno.h"
+#include "common/valgrind.h"
+
+// Below included to get encode_encrypt(); That probably should be in Crypto.h, instead
+
+#include "auth/Crypto.h"
+#include "auth/cephx/CephxProtocol.h"
+#include "auth/AuthSessionHandler.h"
+
+#include "include/sock_compat.h"
+
+// Constant to limit starting sequence number to 2^31.  Nothing special about it, just a big number.  PLR
+#define SEQ_MASK  0x7fffffff 
+#define dout_subsys ceph_subsys_ms
+
+#undef dout_prefix
+#define dout_prefix *_dout << *this
+ostream& Pipe::_pipe_prefix(std::ostream &out) const {
+  return out << "-- " << msgr->get_myinst().addr << " >> " << peer_addr << " pipe(" << this
+            << " sd=" << sd << " :" << port
+             << " s=" << state
+             << " pgs=" << peer_global_seq
+             << " cs=" << connect_seq
+             << " l=" << policy.lossy
+             << " c=" << connection_state
+             << ").";
+}
+
+ostream& operator<<(ostream &out, const Pipe &pipe) {
+  return pipe._pipe_prefix(out);
+}
+
+/**
+ * The DelayedDelivery is for injecting delays into Message delivery off
+ * the socket. It is only enabled if delays are requested, and if they
+ * are then it pulls Messages off the DelayQueue and puts them into the
+ * in_q (SimpleMessenger::dispatch_queue).
+ * Please note that this probably has issues with Pipe shutdown and
+ * replacement semantics. I've tried, but no guarantees.
+ */
+class Pipe::DelayedDelivery: public Thread {
+  Pipe *pipe;
+  std::deque< pair<utime_t,Message*> > delay_queue;
+  Mutex delay_lock;
+  Cond delay_cond;
+  int flush_count;
+  bool active_flush;
+  bool stop_delayed_delivery;
+  bool delay_dispatching; // we are in fast dispatch now
+  bool stop_fast_dispatching_flag; // we need to stop fast dispatching
+
+public:
+  explicit DelayedDelivery(Pipe *p)
+    : pipe(p),
+      delay_lock("Pipe::DelayedDelivery::delay_lock"), flush_count(0),
+      active_flush(false),
+      stop_delayed_delivery(false),
+      delay_dispatching(false),
+      stop_fast_dispatching_flag(false) { }
+  ~DelayedDelivery() override {
+    discard();
+  }
+  void *entry() override;
+  void queue(utime_t release, Message *m) {
+    Mutex::Locker l(delay_lock);
+    delay_queue.push_back(make_pair(release, m));
+    delay_cond.Signal();
+  }
+  void discard();
+  void flush();
+  bool is_flushing() {
+    Mutex::Locker l(delay_lock);
+    return flush_count > 0 || active_flush;
+  }
+  void wait_for_flush() {
+    Mutex::Locker l(delay_lock);
+    while (flush_count > 0 || active_flush)
+      delay_cond.Wait(delay_lock);
+  }
+  void stop() {
+    delay_lock.Lock();
+    stop_delayed_delivery = true;
+    delay_cond.Signal();
+    delay_lock.Unlock();
+  }
+  void steal_for_pipe(Pipe *new_owner) {
+    Mutex::Locker l(delay_lock);
+    pipe = new_owner;
+  }
+  /**
+   * We need to stop fast dispatching before we need to stop putting
+   * normal messages into the DispatchQueue.
+   */
+  void stop_fast_dispatching();
+};
+
+/**************************************
+ * Pipe
+ */
+
+Pipe::Pipe(SimpleMessenger *r, int st, PipeConnection *con)
+  : RefCountedObject(r->cct),
+    reader_thread(this),
+    writer_thread(this),
+    delay_thread(NULL),
+    msgr(r),
+    conn_id(r->dispatch_queue.get_id()),
+    recv_ofs(0),
+    recv_len(0),
+    sd(-1), port(0),
+    peer_type(-1),
+    pipe_lock("SimpleMessenger::Pipe::pipe_lock"),
+    state(st),
+    connection_state(NULL),
+    reader_running(false), reader_needs_join(false),
+    reader_dispatching(false), notify_on_dispatch_done(false),
+    writer_running(false),
+    in_q(&(r->dispatch_queue)),
+    send_keepalive(false),
+    send_keepalive_ack(false),
+    connect_seq(0), peer_global_seq(0),
+    out_seq(0), in_seq(0), in_seq_acked(0) {
+  ANNOTATE_BENIGN_RACE_SIZED(&sd, sizeof(sd), "Pipe socket");
+  ANNOTATE_BENIGN_RACE_SIZED(&state, sizeof(state), "Pipe state");
+  ANNOTATE_BENIGN_RACE_SIZED(&recv_len, sizeof(recv_len), "Pipe recv_len");
+  ANNOTATE_BENIGN_RACE_SIZED(&recv_ofs, sizeof(recv_ofs), "Pipe recv_ofs");
+  if (con) {
+    connection_state = con;
+    connection_state->reset_pipe(this);
+  } else {
+    connection_state = new PipeConnection(msgr->cct, msgr);
+    connection_state->pipe = get();
+  }
+
+  if (randomize_out_seq()) {
+    lsubdout(msgr->cct,ms,15) << "Pipe(): Could not get random bytes to set seq number for session reset; set seq number to " << out_seq << dendl;
+  }
+    
+
+  msgr->timeout = msgr->cct->_conf->ms_tcp_read_timeout * 1000; //convert to ms
+  if (msgr->timeout == 0)
+    msgr->timeout = -1;
+
+  recv_max_prefetch = msgr->cct->_conf->ms_tcp_prefetch_max_size;
+  recv_buf = new char[recv_max_prefetch];
+}
+
+Pipe::~Pipe()
+{
+  assert(out_q.empty());
+  assert(sent.empty());
+  delete delay_thread;
+  delete[] recv_buf;
+}
+
+void Pipe::handle_ack(uint64_t seq)
+{
+  lsubdout(msgr->cct, ms, 15) << "reader got ack seq " << seq << dendl;
+  // trim sent list
+  while (!sent.empty() &&
+        sent.front()->get_seq() <= seq) {
+    Message *m = sent.front();
+    sent.pop_front();
+    lsubdout(msgr->cct, ms, 10) << "reader got ack seq "
+                               << seq << " >= " << m->get_seq() << " on " << m << " " << *m << dendl;
+    m->put();
+  }
+}
+
+void Pipe::start_reader()
+{
+  assert(pipe_lock.is_locked());
+  assert(!reader_running);
+  if (reader_needs_join) {
+    reader_thread.join();
+    reader_needs_join = false;
+  }
+  reader_running = true;
+  reader_thread.create("ms_pipe_read", msgr->cct->_conf->ms_rwthread_stack_bytes);
+}
+
+void Pipe::maybe_start_delay_thread()
+{
+  if (!delay_thread) {
+    auto pos = msgr->cct->_conf->get_val<std::string>("ms_inject_delay_type").find(ceph_entity_type_name(connection_state->peer_type));
+    if (pos != string::npos) {
+      lsubdout(msgr->cct, ms, 1) << "setting up a delay queue on Pipe " << this << dendl;
+      delay_thread = new DelayedDelivery(this);
+      delay_thread->create("ms_pipe_delay");
+    }
+  }
+}
+
+void Pipe::start_writer()
+{
+  assert(pipe_lock.is_locked());
+  assert(!writer_running);
+  writer_running = true;
+  writer_thread.create("ms_pipe_write", msgr->cct->_conf->ms_rwthread_stack_bytes);
+}
+
+void Pipe::join_reader()
+{
+  if (!reader_running)
+    return;
+  cond.Signal();
+  pipe_lock.Unlock();
+  reader_thread.join();
+  pipe_lock.Lock();
+  reader_needs_join = false;
+}
+
+void Pipe::DelayedDelivery::discard()
+{
+  lgeneric_subdout(pipe->msgr->cct, ms, 20) << *pipe << "DelayedDelivery::discard" << dendl;
+  Mutex::Locker l(delay_lock);
+  while (!delay_queue.empty()) {
+    Message *m = delay_queue.front().second;
+    pipe->in_q->dispatch_throttle_release(m->get_dispatch_throttle_size());
+    m->put();
+    delay_queue.pop_front();
+  }
+}
+
+void Pipe::DelayedDelivery::flush()
+{
+  lgeneric_subdout(pipe->msgr->cct, ms, 20) << *pipe << "DelayedDelivery::flush" << dendl;
+  Mutex::Locker l(delay_lock);
+  flush_count = delay_queue.size();
+  delay_cond.Signal();
+}
+
+void *Pipe::DelayedDelivery::entry()
+{
+  Mutex::Locker locker(delay_lock);
+  lgeneric_subdout(pipe->msgr->cct, ms, 20) << *pipe << "DelayedDelivery::entry start" << dendl;
+
+  while (!stop_delayed_delivery) {
+    if (delay_queue.empty()) {
+      lgeneric_subdout(pipe->msgr->cct, ms, 30) << *pipe << "DelayedDelivery::entry sleeping on delay_cond because delay queue is empty" << dendl;
+      delay_cond.Wait(delay_lock);
+      continue;
+    }
+    utime_t release = delay_queue.front().first;
+    Message *m = delay_queue.front().second;
+    string delay_msg_type = pipe->msgr->cct->_conf->ms_inject_delay_msg_type;
+    if (!flush_count &&
+        (release > ceph_clock_now() &&
+         (delay_msg_type.empty() || m->get_type_name() == delay_msg_type))) {
+      lgeneric_subdout(pipe->msgr->cct, ms, 10) << *pipe << "DelayedDelivery::entry sleeping on delay_cond until " << release << dendl;
+      delay_cond.WaitUntil(delay_lock, release);
+      continue;
+    }
+    lgeneric_subdout(pipe->msgr->cct, ms, 10) << *pipe << "DelayedDelivery::entry dequeuing message " << m << " for delivery, past " << release << dendl;
+    delay_queue.pop_front();
+    if (flush_count > 0) {
+      --flush_count;
+      active_flush = true;
+    }
+    if (pipe->in_q->can_fast_dispatch(m)) {
+      if (!stop_fast_dispatching_flag) {
+        delay_dispatching = true;
+        delay_lock.Unlock();
+        pipe->in_q->fast_dispatch(m);
+        delay_lock.Lock();
+        delay_dispatching = false;
+        if (stop_fast_dispatching_flag) {
+          // we need to let the stopping thread proceed
+          delay_cond.Signal();
+          delay_lock.Unlock();
+          delay_lock.Lock();
+        }
+      }
+    } else {
+      pipe->in_q->enqueue(m, m->get_priority(), pipe->conn_id);
+    }
+    active_flush = false;
+  }
+  lgeneric_subdout(pipe->msgr->cct, ms, 20) << *pipe << "DelayedDelivery::entry stop" << dendl;
+  return NULL;
+}
+
+void Pipe::DelayedDelivery::stop_fast_dispatching() {
+  Mutex::Locker l(delay_lock);
+  stop_fast_dispatching_flag = true;
+  while (delay_dispatching)
+    delay_cond.Wait(delay_lock);
+}
+
+
+int Pipe::accept()
+{
+  ldout(msgr->cct,10) << "accept" << dendl;
+  assert(pipe_lock.is_locked());
+  assert(state == STATE_ACCEPTING);
+
+  pipe_lock.Unlock();
+
+  // vars
+  bufferlist addrs;
+  entity_addr_t socket_addr;
+  socklen_t len;
+  int r;
+  char banner[strlen(CEPH_BANNER)+1];
+  bufferlist addrbl;
+  ceph_msg_connect connect;
+  ceph_msg_connect_reply reply;
+  Pipe *existing = 0;
+  bufferptr bp;
+  bufferlist authorizer, authorizer_reply;
+  bool authorizer_valid;
+  uint64_t feat_missing;
+  bool replaced = false;
+  // this variable denotes if the connection attempt from peer is a hard 
+  // reset or not, it is true if there is an existing connection and the
+  // connection sequence from peer is equal to zero
+  bool is_reset_from_peer = false;
+  CryptoKey session_key;
+  int removed; // single-use down below
+
+  // this should roughly mirror pseudocode at
+  //  http://ceph.com/wiki/Messaging_protocol
+  int reply_tag = 0;
+  uint64_t existing_seq = -1;
+
+  // used for reading in the remote acked seq on connect
+  uint64_t newly_acked_seq = 0;
+
+  recv_reset();
+
+  set_socket_options();
+
+  // announce myself.
+  r = tcp_write(CEPH_BANNER, strlen(CEPH_BANNER));
+  if (r < 0) {
+    ldout(msgr->cct,10) << "accept couldn't write banner" << dendl;
+    goto fail_unlocked;
+  }
+
+  // and my addr
+  ::encode(msgr->my_inst.addr, addrs, 0);  // legacy
+
+  port = msgr->my_inst.addr.get_port();
+
+  // and peer's socket addr (they might not know their ip)
+  sockaddr_storage ss;
+  len = sizeof(ss);
+  r = ::getpeername(sd, (sockaddr*)&ss, &len);
+  if (r < 0) {
+    ldout(msgr->cct,0) << "accept failed to getpeername " << cpp_strerror(errno) << dendl;
+    goto fail_unlocked;
+  }
+  socket_addr.set_sockaddr((sockaddr*)&ss);
+  ::encode(socket_addr, addrs, 0);  // legacy
+
+  r = tcp_write(addrs.c_str(), addrs.length());
+  if (r < 0) {
+    ldout(msgr->cct,10) << "accept couldn't write my+peer addr" << dendl;
+    goto fail_unlocked;
+  }
+
+  ldout(msgr->cct,1) << "accept sd=" << sd << " " << socket_addr << dendl;
+  
+  // identify peer
+  if (tcp_read(banner, strlen(CEPH_BANNER)) < 0) {
+    ldout(msgr->cct,10) << "accept couldn't read banner" << dendl;
+    goto fail_unlocked;
+  }
+  if (memcmp(banner, CEPH_BANNER, strlen(CEPH_BANNER))) {
+    banner[strlen(CEPH_BANNER)] = 0;
+    ldout(msgr->cct,1) << "accept peer sent bad banner '" << banner << "' (should be '" << CEPH_BANNER << "')" << dendl;
+    goto fail_unlocked;
+  }
+  {
+    bufferptr tp(sizeof(ceph_entity_addr));
+    addrbl.push_back(std::move(tp));
+  }
+  if (tcp_read(addrbl.c_str(), addrbl.length()) < 0) {
+    ldout(msgr->cct,10) << "accept couldn't read peer_addr" << dendl;
+    goto fail_unlocked;
+  }
+  {
+    bufferlist::iterator ti = addrbl.begin();
+    ::decode(peer_addr, ti);
+  }
+
+  ldout(msgr->cct,10) << "accept peer addr is " << peer_addr << dendl;
+  if (peer_addr.is_blank_ip()) {
+    // peer apparently doesn't know what ip they have; figure it out for them.
+    int port = peer_addr.get_port();
+    peer_addr.u = socket_addr.u;
+    peer_addr.set_port(port);
+    ldout(msgr->cct,0) << "accept peer addr is really " << peer_addr
+           << " (socket is " << socket_addr << ")" << dendl;
+  }
+  set_peer_addr(peer_addr);  // so that connection_state gets set up
+  
+  while (1) {
+    if (tcp_read((char*)&connect, sizeof(connect)) < 0) {
+      ldout(msgr->cct,10) << "accept couldn't read connect" << dendl;
+      goto fail_unlocked;
+    }
+
+    authorizer.clear();
+    if (connect.authorizer_len) {
+      bp = buffer::create(connect.authorizer_len);
+      if (tcp_read(bp.c_str(), connect.authorizer_len) < 0) {
+        ldout(msgr->cct,10) << "accept couldn't read connect authorizer" << dendl;
+        goto fail_unlocked;
+      }
+      authorizer.push_back(std::move(bp));
+      authorizer_reply.clear();
+    }
+
+    ldout(msgr->cct,20) << "accept got peer connect_seq " << connect.connect_seq
+            << " global_seq " << connect.global_seq
+            << dendl;
+    
+    msgr->lock.Lock();   // FIXME
+    pipe_lock.Lock();
+    if (msgr->dispatch_queue.stop)
+      goto shutting_down;
+    if (state != STATE_ACCEPTING) {
+      goto shutting_down;
+    }
+
+    // note peer's type, flags
+    set_peer_type(connect.host_type);
+    policy = msgr->get_policy(connect.host_type);
+    ldout(msgr->cct,10) << "accept of host_type " << connect.host_type
+                       << ", policy.lossy=" << policy.lossy
+                       << " policy.server=" << policy.server
+                       << " policy.standby=" << policy.standby
+                       << " policy.resetcheck=" << policy.resetcheck
+                       << dendl;
+
+    memset(&reply, 0, sizeof(reply));
+    reply.protocol_version = msgr->get_proto_version(peer_type, false);
+    msgr->lock.Unlock();
+
+    // mismatch?
+    ldout(msgr->cct,10) << "accept my proto " << reply.protocol_version
+            << ", their proto " << connect.protocol_version << dendl;
+    if (connect.protocol_version != reply.protocol_version) {
+      reply.tag = CEPH_MSGR_TAG_BADPROTOVER;
+      goto reply;
+    }
+
+    // require signatures for cephx?
+    if (connect.authorizer_protocol == CEPH_AUTH_CEPHX) {
+      if (peer_type == CEPH_ENTITY_TYPE_OSD ||
+         peer_type == CEPH_ENTITY_TYPE_MDS) {
+       if (msgr->cct->_conf->cephx_require_signatures ||
+           msgr->cct->_conf->cephx_cluster_require_signatures) {
+         ldout(msgr->cct,10) << "using cephx, requiring MSG_AUTH feature bit for cluster" << dendl;
+         policy.features_required |= CEPH_FEATURE_MSG_AUTH;
+       }
+      } else {
+       if (msgr->cct->_conf->cephx_require_signatures ||
+           msgr->cct->_conf->cephx_service_require_signatures) {
+         ldout(msgr->cct,10) << "using cephx, requiring MSG_AUTH feature bit for service" << dendl;
+         policy.features_required |= CEPH_FEATURE_MSG_AUTH;
+       }
+      }
+    }
+
+    feat_missing = policy.features_required & ~(uint64_t)connect.features;
+    if (feat_missing) {
+      ldout(msgr->cct,1) << "peer missing required features " << std::hex << feat_missing << std::dec << dendl;
+      reply.tag = CEPH_MSGR_TAG_FEATURES;
+      goto reply;
+    }
+    
+    // Check the authorizer.  If not good, bail out.
+
+    pipe_lock.Unlock();
+
+    if (!msgr->verify_authorizer(connection_state.get(), peer_type, connect.authorizer_protocol, authorizer,
+                                authorizer_reply, authorizer_valid, session_key) ||
+       !authorizer_valid) {
+      ldout(msgr->cct,0) << "accept: got bad authorizer" << dendl;
+      pipe_lock.Lock();
+      if (state != STATE_ACCEPTING)
+       goto shutting_down_msgr_unlocked;
+      reply.tag = CEPH_MSGR_TAG_BADAUTHORIZER;
+      session_security.reset();
+      goto reply;
+    } 
+
+    // We've verified the authorizer for this pipe, so set up the session security structure.  PLR
+
+    ldout(msgr->cct,10) << "accept:  setting up session_security." << dendl;
+
+  retry_existing_lookup:
+    msgr->lock.Lock();
+    pipe_lock.Lock();
+    if (msgr->dispatch_queue.stop)
+      goto shutting_down;
+    if (state != STATE_ACCEPTING)
+      goto shutting_down;
+    
+    // existing?
+    existing = msgr->_lookup_pipe(peer_addr);
+    if (existing) {
+      existing->pipe_lock.Lock(true);  // skip lockdep check (we are locking a second Pipe here)
+      if (existing->reader_dispatching) {
+       /** we need to wait, or we can deadlock if downstream
+        *  fast_dispatchers are (naughtily!) waiting on resources
+        *  held by somebody trying to make use of the SimpleMessenger lock.
+        *  So drop locks, wait, and retry. It just looks like a slow network
+        *  to everybody else.
+        *
+        *  We take a ref to existing here since it might get reaped before we
+        *  wake up (see bug #15870).  We can be confident that it lived until
+        *  locked it since we held the msgr lock from _lookup_pipe through to
+        *  locking existing->lock and checking reader_dispatching.
+        */
+       existing->get();
+       pipe_lock.Unlock();
+       msgr->lock.Unlock();
+       existing->notify_on_dispatch_done = true;
+       while (existing->reader_dispatching)
+         existing->cond.Wait(existing->pipe_lock);
+       existing->pipe_lock.Unlock();
+       existing->put();
+       existing = nullptr;
+       goto retry_existing_lookup;
+      }
+
+      if (connect.global_seq < existing->peer_global_seq) {
+       ldout(msgr->cct,10) << "accept existing " << existing << ".gseq " << existing->peer_global_seq
+                << " > " << connect.global_seq << ", RETRY_GLOBAL" << dendl;
+       reply.tag = CEPH_MSGR_TAG_RETRY_GLOBAL;
+       reply.global_seq = existing->peer_global_seq;  // so we can send it below..
+       existing->pipe_lock.Unlock();
+       msgr->lock.Unlock();
+       goto reply;
+      } else {
+       ldout(msgr->cct,10) << "accept existing " << existing << ".gseq " << existing->peer_global_seq
+                << " <= " << connect.global_seq << ", looks ok" << dendl;
+      }
+      
+      if (existing->policy.lossy) {
+       ldout(msgr->cct,0) << "accept replacing existing (lossy) channel (new one lossy="
+               << policy.lossy << ")" << dendl;
+       existing->was_session_reset();
+       goto replace;
+      }
+
+      ldout(msgr->cct,0) << "accept connect_seq " << connect.connect_seq
+                        << " vs existing " << existing->connect_seq
+                        << " state " << existing->get_state_name() << dendl;
+
+      if (connect.connect_seq == 0 && existing->connect_seq > 0) {
+       ldout(msgr->cct,0) << "accept peer reset, then tried to connect to us, replacing" << dendl;
+        // this is a hard reset from peer
+        is_reset_from_peer = true;
+       if (policy.resetcheck)
+         existing->was_session_reset(); // this resets out_queue, msg_ and connect_seq #'s
+       goto replace;
+      }
+
+      if (connect.connect_seq < existing->connect_seq) {
+       // old attempt, or we sent READY but they didn't get it.
+       ldout(msgr->cct,10) << "accept existing " << existing << ".cseq " << existing->connect_seq
+                           << " > " << connect.connect_seq << ", RETRY_SESSION" << dendl;
+       goto retry_session;
+      }
+
+      if (connect.connect_seq == existing->connect_seq) {
+       // if the existing connection successfully opened, and/or
+       // subsequently went to standby, then the peer should bump
+       // their connect_seq and retry: this is not a connection race
+       // we need to resolve here.
+       if (existing->state == STATE_OPEN ||
+           existing->state == STATE_STANDBY) {
+         ldout(msgr->cct,10) << "accept connection race, existing " << existing
+                             << ".cseq " << existing->connect_seq
+                             << " == " << connect.connect_seq
+                             << ", OPEN|STANDBY, RETRY_SESSION" << dendl;
+         goto retry_session;
+       }
+
+       // connection race?
+       if (peer_addr < msgr->my_inst.addr ||
+           existing->policy.server) {
+         // incoming wins
+         ldout(msgr->cct,10) << "accept connection race, existing " << existing << ".cseq " << existing->connect_seq
+                  << " == " << connect.connect_seq << ", or we are server, replacing my attempt" << dendl;
+         if (!(existing->state == STATE_CONNECTING ||
+               existing->state == STATE_WAIT))
+           lderr(msgr->cct) << "accept race bad state, would replace, existing="
+                            << existing->get_state_name()
+                            << " " << existing << ".cseq=" << existing->connect_seq
+                            << " == " << connect.connect_seq
+                            << dendl;
+         assert(existing->state == STATE_CONNECTING ||
+                existing->state == STATE_WAIT);
+         goto replace;
+       } else {
+         // our existing outgoing wins
+         ldout(msgr->cct,10) << "accept connection race, existing " << existing << ".cseq " << existing->connect_seq
+                  << " == " << connect.connect_seq << ", sending WAIT" << dendl;
+         assert(peer_addr > msgr->my_inst.addr);
+         if (!(existing->state == STATE_CONNECTING))
+           lderr(msgr->cct) << "accept race bad state, would send wait, existing="
+                            << existing->get_state_name()
+                            << " " << existing << ".cseq=" << existing->connect_seq
+                            << " == " << connect.connect_seq
+                            << dendl;
+         assert(existing->state == STATE_CONNECTING);
+         // make sure our outgoing connection will follow through
+         existing->_send_keepalive();
+         reply.tag = CEPH_MSGR_TAG_WAIT;
+         existing->pipe_lock.Unlock();
+         msgr->lock.Unlock();
+         goto reply;
+       }
+      }
+
+      assert(connect.connect_seq > existing->connect_seq);
+      assert(connect.global_seq >= existing->peer_global_seq);
+      if (policy.resetcheck &&   // RESETSESSION only used by servers; peers do not reset each other
+         existing->connect_seq == 0) {
+       ldout(msgr->cct,0) << "accept we reset (peer sent cseq " << connect.connect_seq 
+                << ", " << existing << ".cseq = " << existing->connect_seq
+                << "), sending RESETSESSION" << dendl;
+       reply.tag = CEPH_MSGR_TAG_RESETSESSION;
+       msgr->lock.Unlock();
+       existing->pipe_lock.Unlock();
+       goto reply;
+      }
+
+      // reconnect
+      ldout(msgr->cct,10) << "accept peer sent cseq " << connect.connect_seq
+              << " > " << existing->connect_seq << dendl;
+      goto replace;
+    } // existing
+    else if (connect.connect_seq > 0) {
+      // we reset, and they are opening a new session
+      ldout(msgr->cct,0) << "accept we reset (peer sent cseq " << connect.connect_seq << "), sending RESETSESSION" << dendl;
+      msgr->lock.Unlock();
+      reply.tag = CEPH_MSGR_TAG_RESETSESSION;
+      goto reply;
+    } else {
+      // new session
+      ldout(msgr->cct,10) << "accept new session" << dendl;
+      existing = NULL;
+      goto open;
+    }
+    ceph_abort();
+
+  retry_session:
+    assert(existing->pipe_lock.is_locked());
+    assert(pipe_lock.is_locked());
+    reply.tag = CEPH_MSGR_TAG_RETRY_SESSION;
+    reply.connect_seq = existing->connect_seq + 1;
+    existing->pipe_lock.Unlock();
+    msgr->lock.Unlock();
+    goto reply;    
+
+  reply:
+    assert(pipe_lock.is_locked());
+    reply.features = ((uint64_t)connect.features & policy.features_supported) | policy.features_required;
+    reply.authorizer_len = authorizer_reply.length();
+    pipe_lock.Unlock();
+    r = tcp_write((char*)&reply, sizeof(reply));
+    if (r < 0)
+      goto fail_unlocked;
+    if (reply.authorizer_len) {
+      r = tcp_write(authorizer_reply.c_str(), authorizer_reply.length());
+      if (r < 0)
+       goto fail_unlocked;
+    }
+  }
+  
+ replace:
+  assert(existing->pipe_lock.is_locked());
+  assert(pipe_lock.is_locked());
+  // if it is a hard reset from peer, we don't need a round-trip to negotiate in/out sequence
+  if ((connect.features & CEPH_FEATURE_RECONNECT_SEQ) && !is_reset_from_peer) {
+    reply_tag = CEPH_MSGR_TAG_SEQ;
+    existing_seq = existing->in_seq;
+  }
+  ldout(msgr->cct,10) << "accept replacing " << existing << dendl;
+  existing->stop();
+  existing->unregister_pipe();
+  replaced = true;
+
+  if (existing->policy.lossy) {
+    // disconnect from the Connection
+    assert(existing->connection_state);
+    if (existing->connection_state->clear_pipe(existing))
+      msgr->dispatch_queue.queue_reset(existing->connection_state.get());
+  } else {
+    // queue a reset on the new connection, which we're dumping for the old
+    msgr->dispatch_queue.queue_reset(connection_state.get());
+
+    // drop my Connection, and take a ref to the existing one. do not
+    // clear existing->connection_state, since read_message and
+    // write_message both dereference it without pipe_lock.
+    connection_state = existing->connection_state;
+
+    // make existing Connection reference us
+    connection_state->reset_pipe(this);
+
+    if (existing->delay_thread) {
+      existing->delay_thread->steal_for_pipe(this);
+      delay_thread = existing->delay_thread;
+      existing->delay_thread = NULL;
+      delay_thread->flush();
+    }
+
+    // steal incoming queue
+    uint64_t replaced_conn_id = conn_id;
+    conn_id = existing->conn_id;
+    existing->conn_id = replaced_conn_id;
+
+    // reset the in_seq if this is a hard reset from peer,
+    // otherwise we respect our original connection's value
+    in_seq = is_reset_from_peer ? 0 : existing->in_seq;
+    in_seq_acked = in_seq;
+
+    // steal outgoing queue and out_seq
+    existing->requeue_sent();
+    out_seq = existing->out_seq;
+    ldout(msgr->cct,10) << "accept re-queuing on out_seq " << out_seq << " in_seq " << in_seq << dendl;
+    for (map<int, list<Message*> >::iterator p = existing->out_q.begin();
+         p != existing->out_q.end();
+         ++p)
+      out_q[p->first].splice(out_q[p->first].begin(), p->second);
+  }
+  existing->stop_and_wait();
+  existing->pipe_lock.Unlock();
+
+ open:
+  // open
+  assert(pipe_lock.is_locked());
+  connect_seq = connect.connect_seq + 1;
+  peer_global_seq = connect.global_seq;
+  assert(state == STATE_ACCEPTING);
+  state = STATE_OPEN;
+  ldout(msgr->cct,10) << "accept success, connect_seq = " << connect_seq << ", sending READY" << dendl;
+
+  // send READY reply
+  reply.tag = (reply_tag ? reply_tag : CEPH_MSGR_TAG_READY);
+  reply.features = policy.features_supported;
+  reply.global_seq = msgr->get_global_seq();
+  reply.connect_seq = connect_seq;
+  reply.flags = 0;
+  reply.authorizer_len = authorizer_reply.length();
+  if (policy.lossy)
+    reply.flags = reply.flags | CEPH_MSG_CONNECT_LOSSY;
+
+  connection_state->set_features((uint64_t)reply.features & (uint64_t)connect.features);
+  ldout(msgr->cct,10) << "accept features " << connection_state->get_features() << dendl;
+
+  session_security.reset(
+      get_auth_session_handler(msgr->cct,
+                              connect.authorizer_protocol,
+                              session_key,
+                              connection_state->get_features()));
+
+  // notify
+  msgr->dispatch_queue.queue_accept(connection_state.get());
+  msgr->ms_deliver_handle_fast_accept(connection_state.get());
+
+  // ok!
+  if (msgr->dispatch_queue.stop)
+    goto shutting_down;
+  removed = msgr->accepting_pipes.erase(this);
+  assert(removed == 1);
+  register_pipe();
+  msgr->lock.Unlock();
+  pipe_lock.Unlock();
+
+  r = tcp_write((char*)&reply, sizeof(reply));
+  if (r < 0) {
+    goto fail_registered;
+  }
+
+  if (reply.authorizer_len) {
+    r = tcp_write(authorizer_reply.c_str(), authorizer_reply.length());
+    if (r < 0) {
+      goto fail_registered;
+    }
+  }
+
+  if (reply_tag == CEPH_MSGR_TAG_SEQ) {
+    if (tcp_write((char*)&existing_seq, sizeof(existing_seq)) < 0) {
+      ldout(msgr->cct,2) << "accept write error on in_seq" << dendl;
+      goto fail_registered;
+    }
+    if (tcp_read((char*)&newly_acked_seq, sizeof(newly_acked_seq)) < 0) {
+      ldout(msgr->cct,2) << "accept read error on newly_acked_seq" << dendl;
+      goto fail_registered;
+    }
+  }
+
+  pipe_lock.Lock();
+  discard_requeued_up_to(newly_acked_seq);
+  if (state != STATE_CLOSED) {
+    ldout(msgr->cct,10) << "accept starting writer, state " << get_state_name() << dendl;
+    start_writer();
+  }
+  ldout(msgr->cct,20) << "accept done" << dendl;
+
+  maybe_start_delay_thread();
+
+  return 0;   // success.
+
+ fail_registered:
+  ldout(msgr->cct, 10) << "accept fault after register" << dendl;
+
+  if (msgr->cct->_conf->ms_inject_internal_delays) {
+    ldout(msgr->cct, 10) << " sleep for " << msgr->cct->_conf->ms_inject_internal_delays << dendl;
+    utime_t t;
+    t.set_from_double(msgr->cct->_conf->ms_inject_internal_delays);
+    t.sleep();
+  }
+
+ fail_unlocked:
+  pipe_lock.Lock();
+  if (state != STATE_CLOSED) {
+    bool queued = is_queued();
+    ldout(msgr->cct, 10) << "  queued = " << (int)queued << dendl;
+    if (queued) {
+      state = policy.server ? STATE_STANDBY : STATE_CONNECTING;
+    } else if (replaced) {
+      state = STATE_STANDBY;
+    } else {
+      state = STATE_CLOSED;
+      state_closed = true;
+    }
+    fault();
+    if (queued || replaced)
+      start_writer();
+  }
+  return -1;
+
+ shutting_down:
+  msgr->lock.Unlock();
+ shutting_down_msgr_unlocked:
+  assert(pipe_lock.is_locked());
+
+  if (msgr->cct->_conf->ms_inject_internal_delays) {
+    ldout(msgr->cct, 10) << " sleep for " << msgr->cct->_conf->ms_inject_internal_delays << dendl;
+    utime_t t;
+    t.set_from_double(msgr->cct->_conf->ms_inject_internal_delays);
+    t.sleep();
+  }
+
+  state = STATE_CLOSED;
+  state_closed = true;
+  fault();
+  return -1;
+}
+
+void Pipe::set_socket_options()
+{
+  // disable Nagle algorithm?
+  if (msgr->cct->_conf->ms_tcp_nodelay) {
+    int flag = 1;
+    int r = ::setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, (char*)&flag, sizeof(flag));
+    if (r < 0) {
+      r = -errno;
+      ldout(msgr->cct,0) << "couldn't set TCP_NODELAY: "
+                         << cpp_strerror(r) << dendl;
+    }
+  }
+  if (msgr->cct->_conf->ms_tcp_rcvbuf) {
+    int size = msgr->cct->_conf->ms_tcp_rcvbuf;
+    int r = ::setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (void*)&size, sizeof(size));
+    if (r < 0)  {
+      r = -errno;
+      ldout(msgr->cct,0) << "couldn't set SO_RCVBUF to " << size
+                         << ": " << cpp_strerror(r) << dendl;
+    }
+  }
+
+  // block ESIGPIPE
+#ifdef CEPH_USE_SO_NOSIGPIPE
+  int val = 1;
+  int r = ::setsockopt(sd, SOL_SOCKET, SO_NOSIGPIPE, (void*)&val, sizeof(val));
+  if (r) {
+    r = -errno;
+    ldout(msgr->cct,0) << "couldn't set SO_NOSIGPIPE: "
+                       << cpp_strerror(r) << dendl;
+  }
+#endif
+
+#ifdef SO_PRIORITY
+  int prio = msgr->get_socket_priority();
+  if (prio >= 0) {
+    int r = -1;
+#ifdef IPTOS_CLASS_CS6
+    int iptos = IPTOS_CLASS_CS6;
+    int addr_family = 0;
+    if (!peer_addr.is_blank_ip()) {
+      addr_family = peer_addr.get_family();
+    } else {
+      addr_family = msgr->get_myaddr().get_family();
+    }
+    switch (addr_family) {
+    case AF_INET:
+      r = ::setsockopt(sd, IPPROTO_IP, IP_TOS, &iptos, sizeof(iptos));
+      break;
+    case AF_INET6:
+      r = ::setsockopt(sd, IPPROTO_IPV6, IPV6_TCLASS, &iptos, sizeof(iptos));
+      break;
+    default:
+      lderr(msgr->cct) << "couldn't set ToS of unknown family ("
+                      << addr_family << ")"
+                      << " to " << iptos << dendl;
+      return;
+    }
+    if (r < 0) {
+      r = -errno;
+      ldout(msgr->cct,0) << "couldn't set TOS to " << iptos
+                        << ": " << cpp_strerror(r) << dendl;
+    }
+#endif
+    // setsockopt(IPTOS_CLASS_CS6) sets the priority of the socket as 0.
+    // See http://goo.gl/QWhvsD and http://goo.gl/laTbjT
+    // We need to call setsockopt(SO_PRIORITY) after it.
+    r = ::setsockopt(sd, SOL_SOCKET, SO_PRIORITY, &prio, sizeof(prio));
+    if (r < 0) {
+      r = -errno;
+      ldout(msgr->cct,0) << "couldn't set SO_PRIORITY to " << prio
+                         << ": " << cpp_strerror(r) << dendl;
+    }
+  }
+#endif
+}
+
+int Pipe::connect()
+{
+  bool got_bad_auth = false;
+
+  ldout(msgr->cct,10) << "connect " << connect_seq << dendl;
+  assert(pipe_lock.is_locked());
+
+  __u32 cseq = connect_seq;
+  __u32 gseq = msgr->get_global_seq();
+
+  // stop reader thread
+  join_reader();
+
+  pipe_lock.Unlock();
+  
+  char tag = -1;
+  int rc = -1;
+  struct msghdr msg;
+  struct iovec msgvec[2];
+  int msglen;
+  char banner[strlen(CEPH_BANNER) + 1];  // extra byte makes coverity happy
+  entity_addr_t paddr;
+  entity_addr_t peer_addr_for_me, socket_addr;
+  AuthAuthorizer *authorizer = NULL;
+  bufferlist addrbl, myaddrbl;
+  const md_config_t *conf = msgr->cct->_conf;
+
+  // close old socket.  this is safe because we stopped the reader thread above.
+  if (sd >= 0)
+    ::close(sd);
+
+  // create socket?
+  sd = ::socket(peer_addr.get_family(), SOCK_STREAM, 0);
+  if (sd < 0) {
+    rc = -errno;
+    lderr(msgr->cct) << "connect couldn't create socket " << cpp_strerror(rc) << dendl;
+    goto fail;
+  }
+
+  recv_reset();
+
+  set_socket_options();
+
+  {
+    entity_addr_t addr2bind = msgr->get_myaddr();
+    if (msgr->cct->_conf->ms_bind_before_connect && (!addr2bind.is_blank_ip())) {
+      addr2bind.set_port(0);
+      int r = ::bind(sd , addr2bind.get_sockaddr(), addr2bind.get_sockaddr_len());
+      if (r < 0) {
+        ldout(msgr->cct,2) << "client bind error " << ", " << cpp_strerror(errno) << dendl;
+        goto fail;
+      }
+    }
+  }
+
+  // connect!
+  ldout(msgr->cct,10) << "connecting to " << peer_addr << dendl;
+  rc = ::connect(sd, peer_addr.get_sockaddr(), peer_addr.get_sockaddr_len());
+  if (rc < 0) {
+    int stored_errno = errno;
+    ldout(msgr->cct,2) << "connect error " << peer_addr
+            << ", " << cpp_strerror(stored_errno) << dendl;
+    if (stored_errno == ECONNREFUSED) {
+      ldout(msgr->cct, 2) << "connection refused!" << dendl;
+      msgr->dispatch_queue.queue_refused(connection_state.get());
+    }
+    goto fail;
+  }
+
+  // verify banner
+  // FIXME: this should be non-blocking, or in some other way verify the banner as we get it.
+  rc = tcp_read((char*)&banner, strlen(CEPH_BANNER));
+  if (rc < 0) {
+    ldout(msgr->cct,2) << "connect couldn't read banner, " << cpp_strerror(rc) << dendl;
+    goto fail;
+  }
+  if (memcmp(banner, CEPH_BANNER, strlen(CEPH_BANNER))) {
+    ldout(msgr->cct,0) << "connect protocol error (bad banner) on peer " << peer_addr << dendl;
+    goto fail;
+  }
+
+  memset(&msg, 0, sizeof(msg));
+  msgvec[0].iov_base = banner;
+  msgvec[0].iov_len = strlen(CEPH_BANNER);
+  msg.msg_iov = msgvec;
+  msg.msg_iovlen = 1;
+  msglen = msgvec[0].iov_len;
+  rc = do_sendmsg(&msg, msglen);
+  if (rc < 0) {
+    ldout(msgr->cct,2) << "connect couldn't write my banner, " << cpp_strerror(rc) << dendl;
+    goto fail;
+  }
+
+  // identify peer
+  {
+#if defined(__linux__) || defined(DARWIN) || defined(__FreeBSD__)
+    bufferptr p(sizeof(ceph_entity_addr) * 2);
+#else
+    int wirelen = sizeof(__u32) * 2 + sizeof(ceph_sockaddr_storage);
+    bufferptr p(wirelen * 2);
+#endif
+    addrbl.push_back(std::move(p));
+  }
+  rc = tcp_read(addrbl.c_str(), addrbl.length());
+  if (rc < 0) {
+    ldout(msgr->cct,2) << "connect couldn't read peer addrs, " << cpp_strerror(rc) << dendl;
+    goto fail;
+  }
+  try {
+    bufferlist::iterator p = addrbl.begin();
+    ::decode(paddr, p);
+    ::decode(peer_addr_for_me, p);
+  }
+  catch (buffer::error& e) {
+    ldout(msgr->cct,2) << "connect couldn't decode peer addrs: " << e.what()
+                      << dendl;
+    goto fail;
+  }
+  port = peer_addr_for_me.get_port();
+
+  ldout(msgr->cct,20) << "connect read peer addr " << paddr << " on socket " << sd << dendl;
+  if (peer_addr != paddr) {
+    if (paddr.is_blank_ip() &&
+       peer_addr.get_port() == paddr.get_port() &&
+       peer_addr.get_nonce() == paddr.get_nonce()) {
+      ldout(msgr->cct,0) << "connect claims to be " 
+             << paddr << " not " << peer_addr << " - presumably this is the same node!" << dendl;
+    } else {
+      ldout(msgr->cct,10) << "connect claims to be "
+                         << paddr << " not " << peer_addr << dendl;
+      goto fail;
+    }
+  }
+
+  ldout(msgr->cct,20) << "connect peer addr for me is " << peer_addr_for_me << dendl;
+
+  msgr->learned_addr(peer_addr_for_me);
+
+  ::encode(msgr->my_inst.addr, myaddrbl, 0);  // legacy
+
+  memset(&msg, 0, sizeof(msg));
+  msgvec[0].iov_base = myaddrbl.c_str();
+  msgvec[0].iov_len = myaddrbl.length();
+  msg.msg_iov = msgvec;
+  msg.msg_iovlen = 1;
+  msglen = msgvec[0].iov_len;
+  rc = do_sendmsg(&msg, msglen);
+  if (rc < 0) {
+    ldout(msgr->cct,2) << "connect couldn't write my addr, " << cpp_strerror(rc) << dendl;
+    goto fail;
+  }
+  ldout(msgr->cct,10) << "connect sent my addr " << msgr->my_inst.addr << dendl;
+
+
+  while (1) {
+    delete authorizer;
+    authorizer = msgr->get_authorizer(peer_type, false);
+    bufferlist authorizer_reply;
+
+    ceph_msg_connect connect;
+    connect.features = policy.features_supported;
+    connect.host_type = msgr->get_myinst().name.type();
+    connect.global_seq = gseq;
+    connect.connect_seq = cseq;
+    connect.protocol_version = msgr->get_proto_version(peer_type, true);
+    connect.authorizer_protocol = authorizer ? authorizer->protocol : 0;
+    connect.authorizer_len = authorizer ? authorizer->bl.length() : 0;
+    if (authorizer) 
+      ldout(msgr->cct,10) << "connect.authorizer_len=" << connect.authorizer_len
+              << " protocol=" << connect.authorizer_protocol << dendl;
+    connect.flags = 0;
+    if (policy.lossy)
+      connect.flags |= CEPH_MSG_CONNECT_LOSSY;  // this is fyi, actually, server decides!
+    memset(&msg, 0, sizeof(msg));
+    msgvec[0].iov_base = (char*)&connect;
+    msgvec[0].iov_len = sizeof(connect);
+    msg.msg_iov = msgvec;
+    msg.msg_iovlen = 1;
+    msglen = msgvec[0].iov_len;
+    if (authorizer) {
+      msgvec[1].iov_base = authorizer->bl.c_str();
+      msgvec[1].iov_len = authorizer->bl.length();
+      msg.msg_iovlen++;
+      msglen += msgvec[1].iov_len;
+    }
+
+    ldout(msgr->cct,10) << "connect sending gseq=" << gseq << " cseq=" << cseq
+            << " proto=" << connect.protocol_version << dendl;
+    rc = do_sendmsg(&msg, msglen);
+    if (rc < 0) {
+      ldout(msgr->cct,2) << "connect couldn't write gseq, cseq, " << cpp_strerror(rc) << dendl;
+      goto fail;
+    }
+
+    ldout(msgr->cct,20) << "connect wrote (self +) cseq, waiting for reply" << dendl;
+    ceph_msg_connect_reply reply;
+    rc = tcp_read((char*)&reply, sizeof(reply));
+    if (rc < 0) {
+      ldout(msgr->cct,2) << "connect read reply " << cpp_strerror(rc) << dendl;
+      goto fail;
+    }
+
+    ldout(msgr->cct,20) << "connect got reply tag " << (int)reply.tag
+                       << " connect_seq " << reply.connect_seq
+                       << " global_seq " << reply.global_seq
+                       << " proto " << reply.protocol_version
+                       << " flags " << (int)reply.flags
+                       << " features " << reply.features
+                       << dendl;
+
+    authorizer_reply.clear();
+
+    if (reply.authorizer_len) {
+      ldout(msgr->cct,10) << "reply.authorizer_len=" << reply.authorizer_len << dendl;
+      bufferptr bp = buffer::create(reply.authorizer_len);
+      rc = tcp_read(bp.c_str(), reply.authorizer_len);
+      if (rc < 0) {
+        ldout(msgr->cct,10) << "connect couldn't read connect authorizer_reply" << cpp_strerror(rc) << dendl;
+       goto fail;
+      }
+      authorizer_reply.push_back(bp);
+    }
+
+    if (authorizer) {
+      bufferlist::iterator iter = authorizer_reply.begin();
+      if (!authorizer->verify_reply(iter)) {
+        ldout(msgr->cct,0) << "failed verifying authorize reply" << dendl;
+       goto fail;
+      }
+    }
+
+    if (conf->ms_inject_internal_delays) {
+      ldout(msgr->cct, 10) << " sleep for " << msgr->cct->_conf->ms_inject_internal_delays << dendl;
+      utime_t t;
+      t.set_from_double(msgr->cct->_conf->ms_inject_internal_delays);
+      t.sleep();
+    }
+
+    pipe_lock.Lock();
+    if (state != STATE_CONNECTING) {
+      ldout(msgr->cct,0) << "connect got RESETSESSION but no longer connecting" << dendl;
+      goto stop_locked;
+    }
+
+    if (reply.tag == CEPH_MSGR_TAG_FEATURES) {
+      ldout(msgr->cct,0) << "connect protocol feature mismatch, my " << std::hex
+             << connect.features << " < peer " << reply.features
+             << " missing " << (reply.features & ~policy.features_supported)
+             << std::dec << dendl;
+      goto fail_locked;
+    }
+
+    if (reply.tag == CEPH_MSGR_TAG_BADPROTOVER) {
+      ldout(msgr->cct,0) << "connect protocol version mismatch, my " << connect.protocol_version
+             << " != " << reply.protocol_version << dendl;
+      goto fail_locked;
+    }
+
+    if (reply.tag == CEPH_MSGR_TAG_BADAUTHORIZER) {
+      ldout(msgr->cct,0) << "connect got BADAUTHORIZER" << dendl;
+      if (got_bad_auth)
+        goto stop_locked;
+      got_bad_auth = true;
+      pipe_lock.Unlock();
+      delete authorizer;
+      authorizer = msgr->get_authorizer(peer_type, true);  // try harder
+      continue;
+    }
+    if (reply.tag == CEPH_MSGR_TAG_RESETSESSION) {
+      ldout(msgr->cct,0) << "connect got RESETSESSION" << dendl;
+      was_session_reset();
+      cseq = 0;
+      pipe_lock.Unlock();
+      continue;
+    }
+    if (reply.tag == CEPH_MSGR_TAG_RETRY_GLOBAL) {
+      gseq = msgr->get_global_seq(reply.global_seq);
+      ldout(msgr->cct,10) << "connect got RETRY_GLOBAL " << reply.global_seq
+              << " chose new " << gseq << dendl;
+      pipe_lock.Unlock();
+      continue;
+    }
+    if (reply.tag == CEPH_MSGR_TAG_RETRY_SESSION) {
+      assert(reply.connect_seq > connect_seq);
+      ldout(msgr->cct,10) << "connect got RETRY_SESSION " << connect_seq
+              << " -> " << reply.connect_seq << dendl;
+      cseq = connect_seq = reply.connect_seq;
+      pipe_lock.Unlock();
+      continue;
+    }
+
+    if (reply.tag == CEPH_MSGR_TAG_WAIT) {
+      ldout(msgr->cct,3) << "connect got WAIT (connection race)" << dendl;
+      state = STATE_WAIT;
+      goto stop_locked;
+    }
+
+    if (reply.tag == CEPH_MSGR_TAG_READY ||
+        reply.tag == CEPH_MSGR_TAG_SEQ) {
+      uint64_t feat_missing = policy.features_required & ~(uint64_t)reply.features;
+      if (feat_missing) {
+       ldout(msgr->cct,1) << "missing required features " << std::hex << feat_missing << std::dec << dendl;
+       goto fail_locked;
+      }
+
+      if (reply.tag == CEPH_MSGR_TAG_SEQ) {
+        ldout(msgr->cct,10) << "got CEPH_MSGR_TAG_SEQ, reading acked_seq and writing in_seq" << dendl;
+        uint64_t newly_acked_seq = 0;
+        rc = tcp_read((char*)&newly_acked_seq, sizeof(newly_acked_seq));
+        if (rc < 0) {
+          ldout(msgr->cct,2) << "connect read error on newly_acked_seq" << cpp_strerror(rc) << dendl;
+          goto fail_locked;
+        }
+       ldout(msgr->cct,2) << " got newly_acked_seq " << newly_acked_seq
+                          << " vs out_seq " << out_seq << dendl;
+       while (newly_acked_seq > out_seq) {
+         Message *m = _get_next_outgoing();
+         assert(m);
+         ldout(msgr->cct,2) << " discarding previously sent " << m->get_seq()
+                            << " " << *m << dendl;
+         assert(m->get_seq() <= newly_acked_seq);
+         m->put();
+         ++out_seq;
+       }
+        if (tcp_write((char*)&in_seq, sizeof(in_seq)) < 0) {
+          ldout(msgr->cct,2) << "connect write error on in_seq" << dendl;
+          goto fail_locked;
+        }
+      }
+
+      // hooray!
+      peer_global_seq = reply.global_seq;
+      policy.lossy = reply.flags & CEPH_MSG_CONNECT_LOSSY;
+      state = STATE_OPEN;
+      connect_seq = cseq + 1;
+      assert(connect_seq == reply.connect_seq);
+      backoff = utime_t();
+      connection_state->set_features((uint64_t)reply.features & (uint64_t)connect.features);
+      ldout(msgr->cct,10) << "connect success " << connect_seq << ", lossy = " << policy.lossy
+              << ", features " << connection_state->get_features() << dendl;
+      
+
+      // If we have an authorizer, get a new AuthSessionHandler to deal with ongoing security of the
+      // connection.  PLR
+
+      if (authorizer != NULL) {
+       session_security.reset(
+            get_auth_session_handler(msgr->cct,
+                                    authorizer->protocol,
+                                    authorizer->session_key,
+                                    connection_state->get_features()));
+      }  else {
+        // We have no authorizer, so we shouldn't be applying security to messages in this pipe.  PLR
+       session_security.reset();
+      }
+
+      msgr->dispatch_queue.queue_connect(connection_state.get());
+      msgr->ms_deliver_handle_fast_connect(connection_state.get());
+      
+      if (!reader_running) {
+       ldout(msgr->cct,20) << "connect starting reader" << dendl;
+       start_reader();
+      }
+      maybe_start_delay_thread();
+      delete authorizer;
+      return 0;
+    }
+    
+    // protocol error
+    ldout(msgr->cct,0) << "connect got bad tag " << (int)tag << dendl;
+    goto fail_locked;
+  }
+
+ fail:
+  if (conf->ms_inject_internal_delays) {
+    ldout(msgr->cct, 10) << " sleep for " << msgr->cct->_conf->ms_inject_internal_delays << dendl;
+    utime_t t;
+    t.set_from_double(msgr->cct->_conf->ms_inject_internal_delays);
+    t.sleep();
+  }
+
+  pipe_lock.Lock();
+ fail_locked:
+  if (state == STATE_CONNECTING)
+    fault();
+  else
+    ldout(msgr->cct,3) << "connect fault, but state = " << get_state_name()
+                      << " != connecting, stopping" << dendl;
+
+ stop_locked:
+  delete authorizer;
+  return rc;
+}
+
+void Pipe::register_pipe()
+{
+  ldout(msgr->cct,10) << "register_pipe" << dendl;
+  assert(msgr->lock.is_locked());
+  Pipe *existing = msgr->_lookup_pipe(peer_addr);
+  assert(existing == NULL);
+  msgr->rank_pipe[peer_addr] = this;
+}
+
+void Pipe::unregister_pipe()
+{
+  assert(msgr->lock.is_locked());
+  ceph::unordered_map<entity_addr_t,Pipe*>::iterator p = msgr->rank_pipe.find(peer_addr);
+  if (p != msgr->rank_pipe.end() && p->second == this) {
+    ldout(msgr->cct,10) << "unregister_pipe" << dendl;
+    msgr->rank_pipe.erase(p);
+  } else {
+    ldout(msgr->cct,10) << "unregister_pipe - not registered" << dendl;
+    msgr->accepting_pipes.erase(this);  // somewhat overkill, but safe.
+  }
+}
+
+void Pipe::join()
+{
+  ldout(msgr->cct, 20) << "join" << dendl;
+  if (writer_thread.is_started())
+    writer_thread.join();
+  if (reader_thread.is_started())
+    reader_thread.join();
+  if (delay_thread) {
+    ldout(msgr->cct, 20) << "joining delay_thread" << dendl;
+    delay_thread->stop();
+    delay_thread->join();
+  }
+}
+
+void Pipe::requeue_sent()
+{
+  if (sent.empty())
+    return;
+
+  list<Message*>& rq = out_q[CEPH_MSG_PRIO_HIGHEST];
+  while (!sent.empty()) {
+    Message *m = sent.back();
+    sent.pop_back();
+    ldout(msgr->cct,10) << "requeue_sent " << *m << " for resend seq " << out_seq
+                       << " (" << m->get_seq() << ")" << dendl;
+    rq.push_front(m);
+    out_seq--;
+  }
+}
+
+void Pipe::discard_requeued_up_to(uint64_t seq)
+{
+  ldout(msgr->cct, 10) << "discard_requeued_up_to " << seq << dendl;
+  if (out_q.count(CEPH_MSG_PRIO_HIGHEST) == 0)
+    return;
+  list<Message*>& rq = out_q[CEPH_MSG_PRIO_HIGHEST];
+  while (!rq.empty()) {
+    Message *m = rq.front();
+    if (m->get_seq() == 0 || m->get_seq() > seq)
+      break;
+    ldout(msgr->cct,10) << "discard_requeued_up_to " << *m << " for resend seq " << out_seq
+                       << " <= " << seq << ", discarding" << dendl;
+    m->put();
+    rq.pop_front();
+    out_seq++;
+  }
+  if (rq.empty())
+    out_q.erase(CEPH_MSG_PRIO_HIGHEST);
+}
+
+/*
+ * Tears down the Pipe's message queues, and removes them from the DispatchQueue
+ * Must hold pipe_lock prior to calling.
+ */
+void Pipe::discard_out_queue()
+{
+  ldout(msgr->cct,10) << "discard_queue" << dendl;
+
+  for (list<Message*>::iterator p = sent.begin(); p != sent.end(); ++p) {
+    ldout(msgr->cct,20) << "  discard " << *p << dendl;
+    (*p)->put();
+  }
+  sent.clear();
+  for (map<int,list<Message*> >::iterator p = out_q.begin(); p != out_q.end(); ++p)
+    for (list<Message*>::iterator r = p->second.begin(); r != p->second.end(); ++r) {
+      ldout(msgr->cct,20) << "  discard " << *r << dendl;
+      (*r)->put();
+    }
+  out_q.clear();
+}
+
+void Pipe::fault(bool onread)
+{
+  const md_config_t *conf = msgr->cct->_conf;
+  assert(pipe_lock.is_locked());
+  cond.Signal();
+
+  if (onread && state == STATE_CONNECTING) {
+    ldout(msgr->cct,10) << "fault already connecting, reader shutting down" << dendl;
+    return;
+  }
+  
+  ldout(msgr->cct,2) << "fault " << cpp_strerror(errno) << dendl;
+
+  if (state == STATE_CLOSED ||
+      state == STATE_CLOSING) {
+    ldout(msgr->cct,10) << "fault already closed|closing" << dendl;
+    if (connection_state->clear_pipe(this))
+      msgr->dispatch_queue.queue_reset(connection_state.get());
+    return;
+  }
+
+  shutdown_socket();
+
+  // lossy channel?
+  if (policy.lossy && state != STATE_CONNECTING) {
+    ldout(msgr->cct,10) << "fault on lossy channel, failing" << dendl;
+
+    // disconnect from Connection, and mark it failed.  future messages
+    // will be dropped.
+    assert(connection_state);
+    stop();
+    bool cleared = connection_state->clear_pipe(this);
+
+    // crib locks, blech.  note that Pipe is now STATE_CLOSED and the
+    // rank_pipe entry is ignored by others.
+    pipe_lock.Unlock();
+
+    if (conf->ms_inject_internal_delays) {
+      ldout(msgr->cct, 10) << " sleep for " << msgr->cct->_conf->ms_inject_internal_delays << dendl;
+      utime_t t;
+      t.set_from_double(msgr->cct->_conf->ms_inject_internal_delays);
+      t.sleep();
+    }
+
+    msgr->lock.Lock();
+    pipe_lock.Lock();
+    unregister_pipe();
+    msgr->lock.Unlock();
+
+    if (delay_thread)
+      delay_thread->discard();
+    in_q->discard_queue(conn_id);
+    discard_out_queue();
+    if (cleared)
+      msgr->dispatch_queue.queue_reset(connection_state.get());
+    return;
+  }
+
+  // queue delayed items immediately
+  if (delay_thread)
+    delay_thread->flush();
+
+  // requeue sent items
+  requeue_sent();
+
+  if (policy.standby && !is_queued()) {
+    ldout(msgr->cct,0) << "fault with nothing to send, going to standby" << dendl;
+    state = STATE_STANDBY;
+    return;
+  }
+
+  if (state != STATE_CONNECTING) {
+    if (policy.server) {
+      ldout(msgr->cct,0) << "fault, server, going to standby" << dendl;
+      state = STATE_STANDBY;
+    } else {
+      ldout(msgr->cct,0) << "fault, initiating reconnect" << dendl;
+      connect_seq++;
+      state = STATE_CONNECTING;
+    }
+    backoff = utime_t();
+  } else if (backoff == utime_t()) {
+    ldout(msgr->cct,0) << "fault" << dendl;
+    backoff.set_from_double(conf->ms_initial_backoff);
+  } else {
+    ldout(msgr->cct,10) << "fault waiting " << backoff << dendl;
+    cond.WaitInterval(pipe_lock, backoff);
+    backoff += backoff;
+    if (backoff > conf->ms_max_backoff)
+      backoff.set_from_double(conf->ms_max_backoff);
+    ldout(msgr->cct,10) << "fault done waiting or woke up" << dendl;
+  }
+}
+
+int Pipe::randomize_out_seq()
+{
+  if (connection_state->get_features() & CEPH_FEATURE_MSG_AUTH) {
+    // Set out_seq to a random value, so CRC won't be predictable.   Don't bother checking seq_error
+    // here.  We'll check it on the call.  PLR
+    int seq_error = get_random_bytes((char *)&out_seq, sizeof(out_seq));
+    out_seq &= SEQ_MASK;
+    lsubdout(msgr->cct, ms, 10) << "randomize_out_seq " << out_seq << dendl;
+    return seq_error;
+  } else {
+    // previously, seq #'s always started at 0.
+    out_seq = 0;
+    return 0;
+  }
+}
+
+void Pipe::was_session_reset()
+{
+  assert(pipe_lock.is_locked());
+
+  ldout(msgr->cct,10) << "was_session_reset" << dendl;
+  in_q->discard_queue(conn_id);
+  if (delay_thread)
+    delay_thread->discard();
+  discard_out_queue();
+
+  msgr->dispatch_queue.queue_remote_reset(connection_state.get());
+
+  if (randomize_out_seq()) {
+    lsubdout(msgr->cct,ms,15) << "was_session_reset(): Could not get random bytes to set seq number for session reset; set seq number to " << out_seq << dendl;
+  }
+
+  in_seq = 0;
+  connect_seq = 0;
+}
+
+void Pipe::stop()
+{
+  ldout(msgr->cct,10) << "stop" << dendl;
+  assert(pipe_lock.is_locked());
+  state = STATE_CLOSED;
+  state_closed = true;
+  cond.Signal();
+  shutdown_socket();
+}
+
+void Pipe::stop_and_wait()
+{
+  assert(pipe_lock.is_locked_by_me());
+  if (state != STATE_CLOSED)
+    stop();
+
+  if (msgr->cct->_conf->ms_inject_internal_delays) {
+    ldout(msgr->cct, 10) << __func__ << " sleep for "
+                        << msgr->cct->_conf->ms_inject_internal_delays
+                        << dendl;
+    utime_t t;
+    t.set_from_double(msgr->cct->_conf->ms_inject_internal_delays);
+    t.sleep();
+  }
+  
+  if (delay_thread) {
+    pipe_lock.Unlock();
+    delay_thread->stop_fast_dispatching();
+    pipe_lock.Lock();
+  }
+  while (reader_running &&
+        reader_dispatching)
+    cond.Wait(pipe_lock);
+}
+
+/* read msgs from socket.
+ * also, server.
+ */
+void Pipe::reader()
+{
+  pipe_lock.Lock();
+
+  if (state == STATE_ACCEPTING) {
+    accept();
+    assert(pipe_lock.is_locked());
+  }
+
+  // loop.
+  while (state != STATE_CLOSED &&
+        state != STATE_CONNECTING) {
+    assert(pipe_lock.is_locked());
+
+    // sleep if (re)connecting
+    if (state == STATE_STANDBY) {
+      ldout(msgr->cct,20) << "reader sleeping during reconnect|standby" << dendl;
+      cond.Wait(pipe_lock);
+      continue;
+    }
+
+    // get a reference to the AuthSessionHandler while we have the pipe_lock
+    ceph::shared_ptr<AuthSessionHandler> auth_handler = session_security;
+
+    pipe_lock.Unlock();
+
+    char tag = -1;
+    ldout(msgr->cct,20) << "reader reading tag..." << dendl;
+    if (tcp_read((char*)&tag, 1) < 0) {
+      pipe_lock.Lock();
+      ldout(msgr->cct,2) << "reader couldn't read tag, " << cpp_strerror(errno) << dendl;
+      fault(true);
+      continue;
+    }
+
+    if (tag == CEPH_MSGR_TAG_KEEPALIVE) {
+      ldout(msgr->cct,2) << "reader got KEEPALIVE" << dendl;
+      pipe_lock.Lock();
+      connection_state->set_last_keepalive(ceph_clock_now());
+      continue;
+    }
+    if (tag == CEPH_MSGR_TAG_KEEPALIVE2) {
+      ldout(msgr->cct,30) << "reader got KEEPALIVE2 tag ..." << dendl;
+      ceph_timespec t;
+      int rc = tcp_read((char*)&t, sizeof(t));
+      pipe_lock.Lock();
+      if (rc < 0) {
+       ldout(msgr->cct,2) << "reader couldn't read KEEPALIVE2 stamp "
+                          << cpp_strerror(errno) << dendl;
+       fault(true);
+      } else {
+       send_keepalive_ack = true;
+       keepalive_ack_stamp = utime_t(t);
+       ldout(msgr->cct,2) << "reader got KEEPALIVE2 " << keepalive_ack_stamp
+                          << dendl;
+       connection_state->set_last_keepalive(ceph_clock_now());
+       cond.Signal();
+      }
+      continue;
+    }
+    if (tag == CEPH_MSGR_TAG_KEEPALIVE2_ACK) {
+      ldout(msgr->cct,2) << "reader got KEEPALIVE_ACK" << dendl;
+      struct ceph_timespec t;
+      int rc = tcp_read((char*)&t, sizeof(t));
+      pipe_lock.Lock();
+      if (rc < 0) {
+       ldout(msgr->cct,2) << "reader couldn't read KEEPALIVE2 stamp " << cpp_strerror(errno) << dendl;
+       fault(true);
+      } else {
+       connection_state->set_last_keepalive_ack(utime_t(t));
+      }
+      continue;
+    }
+
+    // open ...
+    if (tag == CEPH_MSGR_TAG_ACK) {
+      ldout(msgr->cct,20) << "reader got ACK" << dendl;
+      ceph_le64 seq;
+      int rc = tcp_read((char*)&seq, sizeof(seq));
+      pipe_lock.Lock();
+      if (rc < 0) {
+       ldout(msgr->cct,2) << "reader couldn't read ack seq, " << cpp_strerror(errno) << dendl;
+       fault(true);
+      } else if (state != STATE_CLOSED) {
+        handle_ack(seq);
+      }
+      continue;
+    }
+
+    else if (tag == CEPH_MSGR_TAG_MSG) {
+      ldout(msgr->cct,20) << "reader got MSG" << dendl;
+      Message *m = 0;
+      int r = read_message(&m, auth_handler.get());
+
+      pipe_lock.Lock();
+      
+      if (!m) {
+       if (r < 0)
+         fault(true);
+       continue;
+      }
+
+      m->trace.event("pipe read message");
+
+      if (state == STATE_CLOSED ||
+         state == STATE_CONNECTING) {
+       in_q->dispatch_throttle_release(m->get_dispatch_throttle_size());
+       m->put();
+       continue;
+      }
+
+      // check received seq#.  if it is old, drop the message.  
+      // note that incoming messages may skip ahead.  this is convenient for the client
+      // side queueing because messages can't be renumbered, but the (kernel) client will
+      // occasionally pull a message out of the sent queue to send elsewhere.  in that case
+      // it doesn't matter if we "got" it or not.
+      if (m->get_seq() <= in_seq) {
+       ldout(msgr->cct,0) << "reader got old message "
+               << m->get_seq() << " <= " << in_seq << " " << m << " " << *m
+               << ", discarding" << dendl;
+       in_q->dispatch_throttle_release(m->get_dispatch_throttle_size());
+       m->put();
+       if (connection_state->has_feature(CEPH_FEATURE_RECONNECT_SEQ) &&
+           msgr->cct->_conf->ms_die_on_old_message)
+         assert(0 == "old msgs despite reconnect_seq feature");
+       continue;
+      }
+      if (m->get_seq() > in_seq + 1) {
+       ldout(msgr->cct,0) << "reader missed message?  skipped from seq "
+                          << in_seq << " to " << m->get_seq() << dendl;
+       if (msgr->cct->_conf->ms_die_on_skipped_message)
+         assert(0 == "skipped incoming seq");
+      }
+
+      m->set_connection(connection_state.get());
+
+      // note last received message.
+      in_seq = m->get_seq();
+
+      cond.Signal();  // wake up writer, to ack this
+      
+      ldout(msgr->cct,10) << "reader got message "
+              << m->get_seq() << " " << m << " " << *m
+              << dendl;
+      in_q->fast_preprocess(m);
+
+      if (delay_thread) {
+        utime_t release;
+        if (rand() % 10000 < msgr->cct->_conf->ms_inject_delay_probability * 10000.0) {
+          release = m->get_recv_stamp();
+          release += msgr->cct->_conf->ms_inject_delay_max * (double)(rand() % 10000) / 10000.0;
+          lsubdout(msgr->cct, ms, 1) << "queue_received will delay until " << release << " on " << m << " " << *m << dendl;
+        }
+        delay_thread->queue(release, m);
+      } else {
+        if (in_q->can_fast_dispatch(m)) {
+         reader_dispatching = true;
+          pipe_lock.Unlock();
+          in_q->fast_dispatch(m);
+          pipe_lock.Lock();
+         reader_dispatching = false;
+         if (state == STATE_CLOSED ||
+             notify_on_dispatch_done) { // there might be somebody waiting
+           notify_on_dispatch_done = false;
+           cond.Signal();
+         }
+        } else {
+          in_q->enqueue(m, m->get_priority(), conn_id);
+        }
+      }
+    }
+    
+    else if (tag == CEPH_MSGR_TAG_CLOSE) {
+      ldout(msgr->cct,20) << "reader got CLOSE" << dendl;
+      pipe_lock.Lock();
+      if (state == STATE_CLOSING) {
+       state = STATE_CLOSED;
+       state_closed = true;
+      } else {
+       state = STATE_CLOSING;
+      }
+      cond.Signal();
+      break;
+    }
+    else {
+      ldout(msgr->cct,0) << "reader bad tag " << (int)tag << dendl;
+      pipe_lock.Lock();
+      fault(true);
+    }
+  }
+
+  // reap?
+  reader_running = false;
+  reader_needs_join = true;
+  unlock_maybe_reap();
+  ldout(msgr->cct,10) << "reader done" << dendl;
+}
+
+/* write msgs to socket.
+ * also, client.
+ */
+void Pipe::writer()
+{
+  pipe_lock.Lock();
+  while (state != STATE_CLOSED) {// && state != STATE_WAIT) {
+    ldout(msgr->cct,10) << "writer: state = " << get_state_name()
+                       << " policy.server=" << policy.server << dendl;
+
+    // standby?
+    if (is_queued() && state == STATE_STANDBY && !policy.server)
+      state = STATE_CONNECTING;
+
+    // connect?
+    if (state == STATE_CONNECTING) {
+      assert(!policy.server);
+      connect();
+      continue;
+    }
+    
+    if (state == STATE_CLOSING) {
+      // write close tag
+      ldout(msgr->cct,20) << "writer writing CLOSE tag" << dendl;
+      char tag = CEPH_MSGR_TAG_CLOSE;
+      state = STATE_CLOSED;
+      state_closed = true;
+      pipe_lock.Unlock();
+      if (sd >= 0) {
+       // we can ignore return value, actually; we don't care if this succeeds.
+       int r = ::write(sd, &tag, 1);
+       (void)r;
+      }
+      pipe_lock.Lock();
+      continue;
+    }
+
+    if (state != STATE_CONNECTING && state != STATE_WAIT && state != STATE_STANDBY &&
+       (is_queued() || in_seq > in_seq_acked)) {
+
+      // keepalive?
+      if (send_keepalive) {
+       int rc;
+       if (connection_state->has_feature(CEPH_FEATURE_MSGR_KEEPALIVE2)) {
+         pipe_lock.Unlock();
+         rc = write_keepalive2(CEPH_MSGR_TAG_KEEPALIVE2,
+                               ceph_clock_now());
+       } else {
+         pipe_lock.Unlock();
+         rc = write_keepalive();
+       }
+       pipe_lock.Lock();
+       if (rc < 0) {
+         ldout(msgr->cct,2) << "writer couldn't write keepalive[2], "
+                            << cpp_strerror(errno) << dendl;
+         fault();
+         continue;
+       }
+       send_keepalive = false;
+      }
+      if (send_keepalive_ack) {
+       utime_t t = keepalive_ack_stamp;
+       pipe_lock.Unlock();
+       int rc = write_keepalive2(CEPH_MSGR_TAG_KEEPALIVE2_ACK, t);
+       pipe_lock.Lock();
+       if (rc < 0) {
+         ldout(msgr->cct,2) << "writer couldn't write keepalive_ack, " << cpp_strerror(errno) << dendl;
+         fault();
+         continue;
+       }
+       send_keepalive_ack = false;
+      }
+
+      // send ack?
+      if (in_seq > in_seq_acked) {
+       uint64_t send_seq = in_seq;
+       pipe_lock.Unlock();
+       int rc = write_ack(send_seq);
+       pipe_lock.Lock();
+       if (rc < 0) {
+         ldout(msgr->cct,2) << "writer couldn't write ack, " << cpp_strerror(errno) << dendl;
+         fault();
+         continue;
+       }
+       in_seq_acked = send_seq;
+      }
+
+      // grab outgoing message
+      Message *m = _get_next_outgoing();
+      if (m) {
+       m->set_seq(++out_seq);
+       if (!policy.lossy) {
+         // put on sent list
+         sent.push_back(m); 
+         m->get();
+       }
+
+       // associate message with Connection (for benefit of encode_payload)
+       m->set_connection(connection_state.get());
+
+       uint64_t features = connection_state->get_features();
+
+       if (m->empty_payload())
+         ldout(msgr->cct,20) << "writer encoding " << m->get_seq() << " features " << features
+                             << " " << m << " " << *m << dendl;
+       else
+         ldout(msgr->cct,20) << "writer half-reencoding " << m->get_seq() << " features " << features
+                             << " " << m << " " << *m << dendl;
+
+       // encode and copy out of *m
+       m->encode(features, msgr->crcflags);
+
+       // prepare everything
+       const ceph_msg_header& header = m->get_header();
+       const ceph_msg_footer& footer = m->get_footer();
+
+       // Now that we have all the crcs calculated, handle the
+       // digital signature for the message, if the pipe has session
+       // security set up.  Some session security options do not
+       // actually calculate and check the signature, but they should
+       // handle the calls to sign_message and check_signature.  PLR
+       if (session_security.get() == NULL) {
+         ldout(msgr->cct, 20) << "writer no session security" << dendl;
+       } else {
+         if (session_security->sign_message(m)) {
+           ldout(msgr->cct, 20) << "writer failed to sign seq # " << header.seq
+                                << "): sig = " << footer.sig << dendl;
+         } else {
+           ldout(msgr->cct, 20) << "writer signed seq # " << header.seq
+                                << "): sig = " << footer.sig << dendl;
+         }
+       }
+
+       bufferlist blist = m->get_payload();
+       blist.append(m->get_middle());
+       blist.append(m->get_data());
+
+        pipe_lock.Unlock();
+
+        m->trace.event("pipe writing message");
+
+        ldout(msgr->cct,20) << "writer sending " << m->get_seq() << " " << m << dendl;
+       int rc = write_message(header, footer, blist);
+
+       pipe_lock.Lock();
+       if (rc < 0) {
+          ldout(msgr->cct,1) << "writer error sending " << m << ", "
+                 << cpp_strerror(errno) << dendl;
+         fault();
+        }
+       m->put();
+      }
+      continue;
+    }
+    
+    // wait
+    ldout(msgr->cct,20) << "writer sleeping" << dendl;
+    cond.Wait(pipe_lock);
+  }
+  
+  ldout(msgr->cct,20) << "writer finishing" << dendl;
+
+  // reap?
+  writer_running = false;
+  unlock_maybe_reap();
+  ldout(msgr->cct,10) << "writer done" << dendl;
+}
+
+void Pipe::unlock_maybe_reap()
+{
+  if (!reader_running && !writer_running) {
+    shutdown_socket();
+    pipe_lock.Unlock();
+    if (delay_thread && delay_thread->is_flushing()) {
+      delay_thread->wait_for_flush();
+    }
+    msgr->queue_reap(this);
+  } else {
+    pipe_lock.Unlock();
+  }
+}
+
+static void alloc_aligned_buffer(bufferlist& data, unsigned len, unsigned off)
+{
+  // create a buffer to read into that matches the data alignment
+  unsigned left = len;
+  if (off & ~CEPH_PAGE_MASK) {
+    // head
+    unsigned head = 0;
+    head = MIN(CEPH_PAGE_SIZE - (off & ~CEPH_PAGE_MASK), left);
+    data.push_back(buffer::create(head));
+    left -= head;
+  }
+  unsigned middle = left & CEPH_PAGE_MASK;
+  if (middle > 0) {
+    data.push_back(buffer::create_page_aligned(middle));
+    left -= middle;
+  }
+  if (left) {
+    data.push_back(buffer::create(left));
+  }
+}
+
+int Pipe::read_message(Message **pm, AuthSessionHandler* auth_handler)
+{
+  int ret = -1;
+  // envelope
+  //ldout(msgr->cct,10) << "receiver.read_message from sd " << sd  << dendl;
+  
+  ceph_msg_header header; 
+  ceph_msg_footer footer;
+  __u32 header_crc = 0;
+
+  if (connection_state->has_feature(CEPH_FEATURE_NOSRCADDR)) {
+    if (tcp_read((char*)&header, sizeof(header)) < 0)
+      return -1;
+    if (msgr->crcflags & MSG_CRC_HEADER) {
+      header_crc = ceph_crc32c(0, (unsigned char *)&header, sizeof(header) - sizeof(header.crc));
+    }
+  } else {
+    ceph_msg_header_old oldheader;
+    if (tcp_read((char*)&oldheader, sizeof(oldheader)) < 0)
+      return -1;
+    // this is fugly
+    memcpy(&header, &oldheader, sizeof(header));
+    header.src = oldheader.src.name;
+    header.reserved = oldheader.reserved;
+    if (msgr->crcflags & MSG_CRC_HEADER) {
+      header.crc = oldheader.crc;
+      header_crc = ceph_crc32c(0, (unsigned char *)&oldheader, sizeof(oldheader) - sizeof(oldheader.crc));
+    }
+  }
+
+  ldout(msgr->cct,20) << "reader got envelope type=" << header.type
+           << " src " << entity_name_t(header.src)
+           << " front=" << header.front_len
+          << " data=" << header.data_len
+          << " off " << header.data_off
+           << dendl;
+
+  // verify header crc
+  if ((msgr->crcflags & MSG_CRC_HEADER) && header_crc != header.crc) {
+    ldout(msgr->cct,0) << "reader got bad header crc " << header_crc << " != " << header.crc << dendl;
+    return -1;
+  }
+
+  bufferlist front, middle, data;
+  int front_len, middle_len;
+  unsigned data_len, data_off;
+  int aborted;
+  Message *message;
+  utime_t recv_stamp = ceph_clock_now();
+
+  if (policy.throttler_messages) {
+    ldout(msgr->cct,10) << "reader wants " << 1 << " message from policy throttler "
+                       << policy.throttler_messages->get_current() << "/"
+                       << policy.throttler_messages->get_max() << dendl;
+    policy.throttler_messages->get();
+  }
+
+  uint64_t message_size = header.front_len + header.middle_len + header.data_len;
+  if (message_size) {
+    if (policy.throttler_bytes) {
+      ldout(msgr->cct,10) << "reader wants " << message_size << " bytes from policy throttler "
+              << policy.throttler_bytes->get_current() << "/"
+              << policy.throttler_bytes->get_max() << dendl;
+      policy.throttler_bytes->get(message_size);
+    }
+
+    // throttle total bytes waiting for dispatch.  do this _after_ the
+    // policy throttle, as this one does not deadlock (unless dispatch
+    // blocks indefinitely, which it shouldn't).  in contrast, the
+    // policy throttle carries for the lifetime of the message.
+    ldout(msgr->cct,10) << "reader wants " << message_size << " from dispatch throttler "
+            << in_q->dispatch_throttler.get_current() << "/"
+            << in_q->dispatch_throttler.get_max() << dendl;
+    in_q->dispatch_throttler.get(message_size);
+  }
+
+  utime_t throttle_stamp = ceph_clock_now();
+
+  // read front
+  front_len = header.front_len;
+  if (front_len) {
+    bufferptr bp = buffer::create(front_len);
+    if (tcp_read(bp.c_str(), front_len) < 0)
+      goto out_dethrottle;
+    front.push_back(std::move(bp));
+    ldout(msgr->cct,20) << "reader got front " << front.length() << dendl;
+  }
+
+  // read middle
+  middle_len = header.middle_len;
+  if (middle_len) {
+    bufferptr bp = buffer::create(middle_len);
+    if (tcp_read(bp.c_str(), middle_len) < 0)
+      goto out_dethrottle;
+    middle.push_back(std::move(bp));
+    ldout(msgr->cct,20) << "reader got middle " << middle.length() << dendl;
+  }
+
+
+  // read data
+  data_len = le32_to_cpu(header.data_len);
+  data_off = le32_to_cpu(header.data_off);
+  if (data_len) {
+    unsigned offset = 0;
+    unsigned left = data_len;
+
+    bufferlist newbuf, rxbuf;
+    bufferlist::iterator blp;
+    int rxbuf_version = 0;
+       
+    while (left > 0) {
+      // wait for data
+      if (tcp_read_wait() < 0)
+       goto out_dethrottle;
+
+      // get a buffer
+      connection_state->lock.Lock();
+      map<ceph_tid_t,pair<bufferlist,int> >::iterator p = connection_state->rx_buffers.find(header.tid);
+      if (p != connection_state->rx_buffers.end()) {
+       if (rxbuf.length() == 0 || p->second.second != rxbuf_version) {
+         ldout(msgr->cct,10) << "reader seleting rx buffer v " << p->second.second
+                  << " at offset " << offset
+                  << " len " << p->second.first.length() << dendl;
+         rxbuf = p->second.first;
+         rxbuf_version = p->second.second;
+         // make sure it's big enough
+         if (rxbuf.length() < data_len)
+           rxbuf.push_back(buffer::create(data_len - rxbuf.length()));
+         blp = p->second.first.begin();
+         blp.advance(offset);
+       }
+      } else {
+       if (!newbuf.length()) {
+         ldout(msgr->cct,20) << "reader allocating new rx buffer at offset " << offset << dendl;
+         alloc_aligned_buffer(newbuf, data_len, data_off);
+         blp = newbuf.begin();
+         blp.advance(offset);
+       }
+      }
+      bufferptr bp = blp.get_current_ptr();
+      int read = MIN(bp.length(), left);
+      ldout(msgr->cct,20) << "reader reading nonblocking into " << (void*)bp.c_str() << " len " << bp.length() << dendl;
+      ssize_t got = tcp_read_nonblocking(bp.c_str(), read);
+      ldout(msgr->cct,30) << "reader read " << got << " of " << read << dendl;
+      connection_state->lock.Unlock();
+      if (got < 0)
+       goto out_dethrottle;
+      if (got > 0) {
+       blp.advance(got);
+       data.append(bp, 0, got);
+       offset += got;
+       left -= got;
+      } // else we got a signal or something; just loop.
+    }
+  }
+
+  // footer
+  if (connection_state->has_feature(CEPH_FEATURE_MSG_AUTH)) {
+    if (tcp_read((char*)&footer, sizeof(footer)) < 0)
+      goto out_dethrottle;
+  } else {
+    ceph_msg_footer_old old_footer;
+    if (tcp_read((char*)&old_footer, sizeof(old_footer)) < 0)
+      goto out_dethrottle;
+    footer.front_crc = old_footer.front_crc;
+    footer.middle_crc = old_footer.middle_crc;
+    footer.data_crc = old_footer.data_crc;
+    footer.sig = 0;
+    footer.flags = old_footer.flags;
+  }
+  
+  aborted = (footer.flags & CEPH_MSG_FOOTER_COMPLETE) == 0;
+  ldout(msgr->cct,10) << "aborted = " << aborted << dendl;
+  if (aborted) {
+    ldout(msgr->cct,0) << "reader got " << front.length() << " + " << middle.length() << " + " << data.length()
+           << " byte message.. ABORTED" << dendl;
+    ret = 0;
+    goto out_dethrottle;
+  }
+
+  ldout(msgr->cct,20) << "reader got " << front.length() << " + " << middle.length() << " + " << data.length()
+          << " byte message" << dendl;
+  message = decode_message(msgr->cct, msgr->crcflags, header, footer,
+                           front, middle, data, connection_state.get());
+  if (!message) {
+    ret = -EINVAL;
+    goto out_dethrottle;
+  }
+
+  //
+  //  Check the signature if one should be present.  A zero return indicates success. PLR
+  //
+
+  if (auth_handler == NULL) {
+    ldout(msgr->cct, 10) << "No session security set" << dendl;
+  } else {
+    if (auth_handler->check_message_signature(message)) {
+      ldout(msgr->cct, 0) << "Signature check failed" << dendl;
+      message->put();
+      ret = -EINVAL;
+      goto out_dethrottle;
+    } 
+  }
+
+  message->set_byte_throttler(policy.throttler_bytes);
+  message->set_message_throttler(policy.throttler_messages);
+
+  // store reservation size in message, so we don't get confused
+  // by messages entering the dispatch queue through other paths.
+  message->set_dispatch_throttle_size(message_size);
+
+  message->set_recv_stamp(recv_stamp);
+  message->set_throttle_stamp(throttle_stamp);
+  message->set_recv_complete_stamp(ceph_clock_now());
+
+  *pm = message;
+  return 0;
+
+ out_dethrottle:
+  // release bytes reserved from the throttlers on failure
+  if (policy.throttler_messages) {
+    ldout(msgr->cct,10) << "reader releasing " << 1 << " message to policy throttler "
+                       << policy.throttler_messages->get_current() << "/"
+                       << policy.throttler_messages->get_max() << dendl;
+    policy.throttler_messages->put();
+  }
+  if (message_size) {
+    if (policy.throttler_bytes) {
+      ldout(msgr->cct,10) << "reader releasing " << message_size << " bytes to policy throttler "
+                         << policy.throttler_bytes->get_current() << "/"
+                         << policy.throttler_bytes->get_max() << dendl;
+      policy.throttler_bytes->put(message_size);
+    }
+
+    in_q->dispatch_throttle_release(message_size);
+  }
+  return ret;
+}
+
+int Pipe::do_sendmsg(struct msghdr *msg, unsigned len, bool more)
+{
+  MSGR_SIGPIPE_STOPPER;
+  while (len > 0) {
+    int r;
+    r = ::sendmsg(sd, msg, MSG_NOSIGNAL | (more ? MSG_MORE : 0));
+    if (r == 0) 
+      ldout(msgr->cct,10) << "do_sendmsg hmm do_sendmsg got r==0!" << dendl;
+    if (r < 0) {
+      r = -errno; 
+      ldout(msgr->cct,1) << "do_sendmsg error " << cpp_strerror(r) << dendl;
+      return r;
+    }
+    if (state == STATE_CLOSED) {
+      ldout(msgr->cct,10) << "do_sendmsg oh look, state == CLOSED, giving up" << dendl;
+      return -EINTR; // close enough
+    }
+
+    len -= r;
+    if (len == 0) break;
+    
+    // hrmph.  trim r bytes off the front of our message.
+    ldout(msgr->cct,20) << "do_sendmsg short write did " << r << ", still have " << len << dendl;
+    while (r > 0) {
+      if (msg->msg_iov[0].iov_len <= (size_t)r) {
+       // lose this whole item
+       //ldout(msgr->cct,30) << "skipping " << msg->msg_iov[0].iov_len << ", " << (msg->msg_iovlen-1) << " v, " << r << " left" << dendl;
+       r -= msg->msg_iov[0].iov_len;
+       msg->msg_iov++;
+       msg->msg_iovlen--;
+      } else {
+       // partial!
+       //ldout(msgr->cct,30) << "adjusting " << msg->msg_iov[0].iov_len << ", " << msg->msg_iovlen << " v, " << r << " left" << dendl;
+       msg->msg_iov[0].iov_base = (char *)msg->msg_iov[0].iov_base + r;
+       msg->msg_iov[0].iov_len -= r;
+       break;
+      }
+    }
+  }
+  return 0;
+}
+
+
+int Pipe::write_ack(uint64_t seq)
+{
+  ldout(msgr->cct,10) << "write_ack " << seq << dendl;
+
+  char c = CEPH_MSGR_TAG_ACK;
+  ceph_le64 s;
+  s = seq;
+
+  struct msghdr msg;
+  memset(&msg, 0, sizeof(msg));
+  struct iovec msgvec[2];
+  msgvec[0].iov_base = &c;
+  msgvec[0].iov_len = 1;
+  msgvec[1].iov_base = &s;
+  msgvec[1].iov_len = sizeof(s);
+  msg.msg_iov = msgvec;
+  msg.msg_iovlen = 2;
+  
+  if (do_sendmsg(&msg, 1 + sizeof(s), true) < 0)
+    return -1; 
+  return 0;
+}
+
+int Pipe::write_keepalive()
+{
+  ldout(msgr->cct,10) << "write_keepalive" << dendl;
+
+  char c = CEPH_MSGR_TAG_KEEPALIVE;
+
+  struct msghdr msg;
+  memset(&msg, 0, sizeof(msg));
+  struct iovec msgvec[2];
+  msgvec[0].iov_base = &c;
+  msgvec[0].iov_len = 1;
+  msg.msg_iov = msgvec;
+  msg.msg_iovlen = 1;
+  
+  if (do_sendmsg(&msg, 1) < 0)
+    return -1; 
+  return 0;
+}
+
+int Pipe::write_keepalive2(char tag, const utime_t& t)
+{
+  ldout(msgr->cct,10) << "write_keepalive2 " << (int)tag << " " << t << dendl;
+  struct ceph_timespec ts;
+  t.encode_timeval(&ts);
+  struct msghdr msg;
+  memset(&msg, 0, sizeof(msg));
+  struct iovec msgvec[2];
+  msgvec[0].iov_base = &tag;
+  msgvec[0].iov_len = 1;
+  msgvec[1].iov_base = &ts;
+  msgvec[1].iov_len = sizeof(ts);
+  msg.msg_iov = msgvec;
+  msg.msg_iovlen = 2;
+
+  if (do_sendmsg(&msg, 1 + sizeof(ts)) < 0)
+    return -1;
+  return 0;
+}
+
+
+int Pipe::write_message(const ceph_msg_header& header, const ceph_msg_footer& footer, bufferlist& blist)
+{
+  int ret;
+
+  // set up msghdr and iovecs
+  struct msghdr msg;
+  memset(&msg, 0, sizeof(msg));
+  msg.msg_iov = msgvec;
+  int msglen = 0;
+  
+  // send tag
+  char tag = CEPH_MSGR_TAG_MSG;
+  msgvec[msg.msg_iovlen].iov_base = &tag;
+  msgvec[msg.msg_iovlen].iov_len = 1;
+  msglen++;
+  msg.msg_iovlen++;
+
+  // send envelope
+  ceph_msg_header_old oldheader;
+  if (connection_state->has_feature(CEPH_FEATURE_NOSRCADDR)) {
+    msgvec[msg.msg_iovlen].iov_base = (char*)&header;
+    msgvec[msg.msg_iovlen].iov_len = sizeof(header);
+    msglen += sizeof(header);
+    msg.msg_iovlen++;
+  } else {
+    memcpy(&oldheader, &header, sizeof(header));
+    oldheader.src.name = header.src;
+    oldheader.src.addr = connection_state->get_peer_addr();
+    oldheader.orig_src = oldheader.src;
+    oldheader.reserved = header.reserved;
+    if (msgr->crcflags & MSG_CRC_HEADER) {
+       oldheader.crc = ceph_crc32c(0, (unsigned char*)&oldheader,
+                                   sizeof(oldheader) - sizeof(oldheader.crc));
+    } else {
+       oldheader.crc = 0;
+    }
+    msgvec[msg.msg_iovlen].iov_base = (char*)&oldheader;
+    msgvec[msg.msg_iovlen].iov_len = sizeof(oldheader);
+    msglen += sizeof(oldheader);
+    msg.msg_iovlen++;
+  }
+
+  // payload (front+data)
+  list<bufferptr>::const_iterator pb = blist.buffers().begin();
+  unsigned b_off = 0;  // carry-over buffer offset, if any
+  unsigned bl_pos = 0; // blist pos
+  unsigned left = blist.length();
+
+  while (left > 0) {
+    unsigned donow = MIN(left, pb->length()-b_off);
+    if (donow == 0) {
+      ldout(msgr->cct,0) << "donow = " << donow << " left " << left << " pb->length " << pb->length()
+                         << " b_off " << b_off << dendl;
+    }
+    assert(donow > 0);
+    ldout(msgr->cct,30) << " bl_pos " << bl_pos << " b_off " << b_off
+            << " leftinchunk " << left
+            << " buffer len " << pb->length()
+            << " writing " << donow 
+            << dendl;
+    
+    if (msg.msg_iovlen >= SM_IOV_MAX-2) {
+      if (do_sendmsg(&msg, msglen, true))
+       goto fail;
+      
+      // and restart the iov
+      msg.msg_iov = msgvec;
+      msg.msg_iovlen = 0;
+      msglen = 0;
+    }
+    
+    msgvec[msg.msg_iovlen].iov_base = (void*)(pb->c_str()+b_off);
+    msgvec[msg.msg_iovlen].iov_len = donow;
+    msglen += donow;
+    msg.msg_iovlen++;
+    
+    assert(left >= donow);
+    left -= donow;
+    b_off += donow;
+    bl_pos += donow;
+    if (left == 0)
+      break;
+    while (b_off == pb->length()) {
+      ++pb;
+      b_off = 0;
+    }
+  }
+  assert(left == 0);
+
+  // send footer; if receiver doesn't support signatures, use the old footer format
+
+  ceph_msg_footer_old old_footer;
+  if (connection_state->has_feature(CEPH_FEATURE_MSG_AUTH)) {
+    msgvec[msg.msg_iovlen].iov_base = (void*)&footer;
+    msgvec[msg.msg_iovlen].iov_len = sizeof(footer);
+    msglen += sizeof(footer);
+    msg.msg_iovlen++;
+  } else {
+    if (msgr->crcflags & MSG_CRC_HEADER) {
+      old_footer.front_crc = footer.front_crc;
+      old_footer.middle_crc = footer.middle_crc;
+    } else {
+       old_footer.front_crc = old_footer.middle_crc = 0;
+    }
+    old_footer.data_crc = msgr->crcflags & MSG_CRC_DATA ? footer.data_crc : 0;
+    old_footer.flags = footer.flags;   
+    msgvec[msg.msg_iovlen].iov_base = (char*)&old_footer;
+    msgvec[msg.msg_iovlen].iov_len = sizeof(old_footer);
+    msglen += sizeof(old_footer);
+    msg.msg_iovlen++;
+  }
+
+  // send
+  if (do_sendmsg(&msg, msglen))
+    goto fail;
+
+  ret = 0;
+
+ out:
+  return ret;
+
+ fail:
+  ret = -1;
+  goto out;
+}
+
+
+int Pipe::tcp_read(char *buf, unsigned len)
+{
+  if (sd < 0)
+    return -EINVAL;
+
+  while (len > 0) {
+
+    if (msgr->cct->_conf->ms_inject_socket_failures && sd >= 0) {
+      if (rand() % msgr->cct->_conf->ms_inject_socket_failures == 0) {
+       ldout(msgr->cct, 0) << "injecting socket failure" << dendl;
+       ::shutdown(sd, SHUT_RDWR);
+      }
+    }
+
+    if (tcp_read_wait() < 0)
+      return -1;
+
+    ssize_t got = tcp_read_nonblocking(buf, len);
+
+    if (got < 0)
+      return -1;
+
+    len -= got;
+    buf += got;
+    //lgeneric_dout(cct, DBL) << "tcp_read got " << got << ", " << len << " left" << dendl;
+  }
+  return 0;
+}
+
+int Pipe::tcp_read_wait()
+{
+  if (sd < 0)
+    return -EINVAL;
+  struct pollfd pfd;
+  short evmask;
+  pfd.fd = sd;
+  pfd.events = POLLIN;
+#if defined(__linux__)
+  pfd.events |= POLLRDHUP;
+#endif
+
+  if (has_pending_data())
+    return 0;
+
+  int r = poll(&pfd, 1, msgr->timeout);
+  if (r < 0)
+    return -errno;
+  if (r == 0)
+    return -EAGAIN;
+
+  evmask = POLLERR | POLLHUP | POLLNVAL;
+#if defined(__linux__)
+  evmask |= POLLRDHUP;
+#endif
+  if (pfd.revents & evmask)
+    return -1;
+
+  if (!(pfd.revents & POLLIN))
+    return -1;
+
+  return 0;
+}
+
+ssize_t Pipe::do_recv(char *buf, size_t len, int flags)
+{
+again:
+  ssize_t got = ::recv( sd, buf, len, flags );
+  if (got < 0) {
+    if (errno == EINTR) {
+      goto again;
+    }
+    ldout(msgr->cct, 10) << __func__ << " socket " << sd << " returned "
+                    << got << " " << cpp_strerror(errno) << dendl;
+    return -1;
+  }
+  if (got == 0) {
+    return -1;
+  }
+  return got;
+}
+
+ssize_t Pipe::buffered_recv(char *buf, size_t len, int flags)
+{
+  size_t left = len;
+  ssize_t total_recv = 0;
+  if (recv_len > recv_ofs) {
+    int to_read = MIN(recv_len - recv_ofs, left);
+    memcpy(buf, &recv_buf[recv_ofs], to_read);
+    recv_ofs += to_read;
+    left -= to_read;
+    if (left == 0) {
+      return to_read;
+    }
+    buf += to_read;
+    total_recv += to_read;
+  }
+
+  /* nothing left in the prefetch buffer */
+
+  if (left > recv_max_prefetch) {
+    /* this was a large read, we don't prefetch for these */
+    ssize_t ret = do_recv(buf, left, flags );
+    if (ret < 0) {
+      if (total_recv > 0)
+        return total_recv;
+      return ret;
+    }
+    total_recv += ret;
+    return total_recv;
+  }
+
+
+  ssize_t got = do_recv(recv_buf, recv_max_prefetch, flags);
+  if (got < 0) {
+    if (total_recv > 0)
+      return total_recv;
+
+    return got;
+  }
+
+  recv_len = (size_t)got;
+  got = MIN(left, (size_t)got);
+  memcpy(buf, recv_buf, got);
+  recv_ofs = got;
+  total_recv += got;
+  return total_recv;
+}
+
+ssize_t Pipe::tcp_read_nonblocking(char *buf, unsigned len)
+{
+  ssize_t got = buffered_recv(buf, len, MSG_DONTWAIT );
+  if (got < 0) {
+    ldout(msgr->cct, 10) << __func__ << " socket " << sd << " returned "
+                        << got << " " << cpp_strerror(errno) << dendl;
+    return -1;
+  }
+  if (got == 0) {
+    /* poll() said there was data, but we didn't read any - peer
+     * sent a FIN.  Maybe POLLRDHUP signals this, but this is
+     * standard socket behavior as documented by Stevens.
+     */
+    return -1;
+  }
+  return got;
+}
+
+int Pipe::tcp_write(const char *buf, unsigned len)
+{
+  if (sd < 0)
+    return -1;
+  struct pollfd pfd;
+  pfd.fd = sd;
+  pfd.events = POLLOUT | POLLHUP | POLLNVAL | POLLERR;
+#if defined(__linux__)
+  pfd.events |= POLLRDHUP;
+#endif
+
+  if (msgr->cct->_conf->ms_inject_socket_failures && sd >= 0) {
+    if (rand() % msgr->cct->_conf->ms_inject_socket_failures == 0) {
+      ldout(msgr->cct, 0) << "injecting socket failure" << dendl;
+      ::shutdown(sd, SHUT_RDWR);
+    }
+  }
+
+  if (poll(&pfd, 1, -1) < 0)
+    return -1;
+
+  if (!(pfd.revents & POLLOUT))
+    return -1;
+
+  //lgeneric_dout(cct, DBL) << "tcp_write writing " << len << dendl;
+  assert(len > 0);
+  while (len > 0) {
+    MSGR_SIGPIPE_STOPPER;
+    int did = ::send( sd, buf, len, MSG_NOSIGNAL );
+    if (did < 0) {
+      //lgeneric_dout(cct, 1) << "tcp_write error did = " << did << " " << cpp_strerror(errno) << dendl;
+      //lgeneric_derr(cct, 1) << "tcp_write error did = " << did << " " << cpp_strerror(errno) << dendl;
+      return did;
+    }
+    len -= did;
+    buf += did;
+    //lgeneric_dout(cct, DBL) << "tcp_write did " << did << ", " << len << " left" << dendl;
+  }
+  return 0;
+}