remove ceph code
[stor4nfv.git] / src / ceph / src / msg / xio / XioMessenger.cc
diff --git a/src/ceph/src/msg/xio/XioMessenger.cc b/src/ceph/src/msg/xio/XioMessenger.cc
deleted file mode 100644 (file)
index 6bf4d52..0000000
+++ /dev/null
@@ -1,1137 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- * Portions Copyright (C) 2013 CohortFS, LLC
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation.  See file COPYING.
- *
- */
-
-#include <arpa/inet.h>
-#include <boost/lexical_cast.hpp>
-#include <set>
-#include <stdlib.h>
-#include <memory>
-
-#include "XioMsg.h"
-#include "XioMessenger.h"
-#include "common/address_helper.h"
-#include "common/code_environment.h"
-#include "messages/MNop.h"
-
-#define dout_subsys ceph_subsys_xio
-#undef dout_prefix
-#define dout_prefix *_dout << "xio."
-
-Mutex mtx("XioMessenger Package Lock");
-std::atomic<bool> initialized = { false };
-
-std::atomic<unsigned> XioMessenger::nInstances = { 0 };
-
-struct xio_mempool *xio_msgr_noreg_mpool;
-
-static struct xio_session_ops xio_msgr_ops;
-
-/* Accelio API callouts */
-
-namespace xio_log
-{
-typedef pair<const char*, int> level_pair;
-static const level_pair LEVELS[] = {
-  make_pair("fatal", 0),
-  make_pair("error", 0),
-  make_pair("warn", 1),
-  make_pair("info", 1),
-  make_pair("debug", 2),
-  make_pair("trace", 20)
-};
-
-static CephContext *context;
-
-int get_level()
-{
-  int level = 0;
-  for (size_t i = 0; i < sizeof(LEVELS); i++) {
-    if (!ldlog_p1(context, dout_subsys, LEVELS[i].second))
-      break;
-    level++;
-  }
-  return level;
-}
-
-void log_dout(const char *file, unsigned line,
-             const char *function, unsigned level,
-             const char *fmt, ...)
-{
-  char buffer[2048];
-  va_list args;
-  va_start(args, fmt);
-  int n = vsnprintf(buffer, sizeof(buffer), fmt, args);
-  va_end(args);
-
-  if (n > 0) {
-    const char *short_file = strrchr(file, '/');
-    short_file = (short_file == NULL) ? file : short_file + 1;
-
-    const level_pair &lvl = LEVELS[level];
-    ldout(context, lvl.second) << '[' << lvl.first << "] "
-      << short_file << ':' << line << ' '
-      << function << " - " << buffer << dendl;
-  }
-}
-}
-
-static int on_session_event(struct xio_session *session,
-                           struct xio_session_event_data *event_data,
-                           void *cb_user_context)
-{
-  XioMessenger *msgr = static_cast<XioMessenger*>(cb_user_context);
-  CephContext *cct = msgr->cct;
-
-  ldout(cct,4) << "session event: " << xio_session_event_str(event_data->event)
-    << ". reason: " << xio_strerror(event_data->reason) << dendl;
-
-  return msgr->session_event(session, event_data, cb_user_context);
-}
-
-static int on_new_session(struct xio_session *session,
-                         struct xio_new_session_req *req,
-                         void *cb_user_context)
-{
-  XioMessenger *msgr = static_cast<XioMessenger*>(cb_user_context);
-  CephContext *cct = msgr->cct;
-
-  ldout(cct,4) << "new session " << session
-    << " user_context " << cb_user_context << dendl;
-
-  return (msgr->new_session(session, req, cb_user_context));
-}
-
-static int on_msg(struct xio_session *session,
-                 struct xio_msg *req,
-                 int more_in_batch,
-                 void *cb_user_context)
-{
-  XioConnection* xcon __attribute__((unused)) =
-    static_cast<XioConnection*>(cb_user_context);
-  CephContext *cct = xcon->get_messenger()->cct;
-
-  ldout(cct,25) << "on_msg session " << session << " xcon " << xcon << dendl;
-
-  if (unlikely(XioPool::trace_mempool)) {
-    static uint32_t nreqs;
-    if (unlikely((++nreqs % 65536) == 0)) {
-      xp_stats.dump(__func__, nreqs);
-    }
-  }
-
-  return xcon->on_msg(session, req, more_in_batch,
-                         cb_user_context);
-}
-
-static int on_ow_msg_send_complete(struct xio_session *session,
-                                  struct xio_msg *msg,
-                                  void *conn_user_context)
-{
-  XioConnection *xcon =
-    static_cast<XioConnection*>(conn_user_context);
-  CephContext *cct = xcon->get_messenger()->cct;
-
-  ldout(cct,25) << "msg delivered session: " << session
-               << " msg: " << msg << " conn_user_context "
-               << conn_user_context << dendl;
-
-  return xcon->on_ow_msg_send_complete(session, msg, conn_user_context);
-}
-
-static int on_msg_error(struct xio_session *session,
-                       enum xio_status error,
-                       enum xio_msg_direction dir,
-                       struct xio_msg  *msg,
-                       void *conn_user_context)
-{
-  /* XIO promises to flush back undelivered messages */
-  XioConnection *xcon =
-    static_cast<XioConnection*>(conn_user_context);
-  CephContext *cct = xcon->get_messenger()->cct;
-
-  ldout(cct,4) << "msg error session: " << session
-    << " error: " << xio_strerror(error) << " msg: " << msg
-    << " conn_user_context " << conn_user_context << dendl;
-
-  return xcon->on_msg_error(session, error, msg, conn_user_context);
-}
-
-static int on_cancel(struct xio_session *session,
-                    struct xio_msg  *msg,
-                    enum xio_status result,
-                    void *conn_user_context)
-{
-  XioConnection* xcon __attribute__((unused)) =
-    static_cast<XioConnection*>(conn_user_context);
-  CephContext *cct = xcon->get_messenger()->cct;
-
-  ldout(cct,25) << "on cancel: session: " << session << " msg: " << msg
-    << " conn_user_context " << conn_user_context << dendl;
-
-  return 0;
-}
-
-static int on_cancel_request(struct xio_session *session,
-                            struct xio_msg  *msg,
-                            void *conn_user_context)
-{
-  XioConnection* xcon __attribute__((unused)) =
-    static_cast<XioConnection*>(conn_user_context);
-  CephContext *cct = xcon->get_messenger()->cct;
-
-  ldout(cct,25) << "on cancel request: session: " << session << " msg: " << msg
-    << " conn_user_context " << conn_user_context << dendl;
-
-  return 0;
-}
-
-/* free functions */
-static string xio_uri_from_entity(const string &type,
-                                 const entity_addr_t& addr, bool want_port)
-{
-  const char *host = NULL;
-  char addr_buf[129];
-  string xio_uri;
-
-  switch(addr.get_family()) {
-  case AF_INET:
-    host = inet_ntop(AF_INET, &addr.in4_addr().sin_addr, addr_buf,
-                    INET_ADDRSTRLEN);
-    break;
-  case AF_INET6:
-    host = inet_ntop(AF_INET6, &addr.in6_addr().sin6_addr, addr_buf,
-                    INET6_ADDRSTRLEN);
-    break;
-  default:
-    abort();
-    break;
-  };
-
-  if (type == "rdma" || type == "tcp")
-      xio_uri = type + "://";
-  else
-      xio_uri = "rdma://";
-
-  /* The following can only succeed if the host is rdma-capable */
-  xio_uri += host;
-  if (want_port) {
-    xio_uri += ":";
-    xio_uri += boost::lexical_cast<std::string>(addr.get_port());
-  }
-
-  return xio_uri;
-} /* xio_uri_from_entity */
-
-void XioInit::package_init(CephContext *cct) {
-   if (! initialized) {
-
-     mtx.Lock();
-     if (! initialized) {
-
-       xio_init();
-
-       // claim a reference to the first context we see
-       xio_log::context = cct->get();
-
-       int xopt;
-       xopt = xio_log::get_level();
-       xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_LOG_LEVEL,
-                 &xopt, sizeof(xopt));
-       xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_LOG_FN,
-                 (const void*)xio_log::log_dout, sizeof(xio_log_fn));
-
-       xopt = 1;
-       xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_DISABLE_HUGETBL,
-                 &xopt, sizeof(xopt));
-
-       if (g_code_env == CODE_ENVIRONMENT_DAEMON) {
-         xopt = 1;
-         xio_set_opt(NULL, XIO_OPTLEVEL_RDMA, XIO_OPTNAME_ENABLE_FORK_INIT,
-                   &xopt, sizeof(xopt));
-       }
-
-       xopt = XIO_MSGR_IOVLEN;
-       xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_MAX_IN_IOVLEN,
-                 &xopt, sizeof(xopt));
-       xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_MAX_OUT_IOVLEN,
-                 &xopt, sizeof(xopt));
-
-       /* enable flow-control */
-       xopt = 1;
-       xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_ENABLE_FLOW_CONTROL,
-                  &xopt, sizeof(xopt));
-
-       /* and set threshold for buffer callouts */
-       xopt = max(cct->_conf->xio_max_send_inline, 512);
-       xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_MAX_INLINE_XIO_DATA,
-                  &xopt, sizeof(xopt));
-
-       xopt = XioMsgHdr::get_max_encoded_length();
-       ldout(cct,2) << "setting accelio max header size " << xopt << dendl;
-       xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_MAX_INLINE_XIO_HEADER,
-                  &xopt, sizeof(xopt));
-
-       size_t queue_depth = cct->_conf->xio_queue_depth;
-       struct xio_mempool_config mempool_config = {
-         6,
-         {
-           {1024,  0,  queue_depth,  262144},
-           {4096,  0,  queue_depth,  262144},
-           {16384, 0,  queue_depth,  262144},
-           {65536, 0,  128,  65536},
-           {262144, 0,  32,  16384},
-           {1048576, 0, 8,  8192}
-         }
-       };
-       xio_set_opt(NULL,
-                   XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_CONFIG_MEMPOOL,
-                   &mempool_config, sizeof(mempool_config));
-
-       /* and unregisterd one */
- #define XMSG_MEMPOOL_QUANTUM 4096
-
-       xio_msgr_noreg_mpool =
-       xio_mempool_create(-1 /* nodeid */,
-                          XIO_MEMPOOL_FLAG_REGULAR_PAGES_ALLOC);
-
-       (void) xio_mempool_add_slab(xio_msgr_noreg_mpool, 64,
-                                      cct->_conf->xio_mp_min,
-                                      cct->_conf->xio_mp_max_64,
-                                      XMSG_MEMPOOL_QUANTUM, 0);
-       (void) xio_mempool_add_slab(xio_msgr_noreg_mpool, 256,
-                                      cct->_conf->xio_mp_min,
-                                      cct->_conf->xio_mp_max_256,
-                                      XMSG_MEMPOOL_QUANTUM, 0);
-       (void) xio_mempool_add_slab(xio_msgr_noreg_mpool, 1024,
-                                      cct->_conf->xio_mp_min,
-                                      cct->_conf->xio_mp_max_1k,
-                                      XMSG_MEMPOOL_QUANTUM, 0);
-       (void) xio_mempool_add_slab(xio_msgr_noreg_mpool, getpagesize(),
-                                      cct->_conf->xio_mp_min,
-                                      cct->_conf->xio_mp_max_page,
-                                      XMSG_MEMPOOL_QUANTUM, 0);
-
-       /* initialize ops singleton */
-       xio_msgr_ops.on_session_event = on_session_event;
-       xio_msgr_ops.on_new_session = on_new_session;
-       xio_msgr_ops.on_session_established = NULL;
-       xio_msgr_ops.on_msg = on_msg;
-       xio_msgr_ops.on_ow_msg_send_complete = on_ow_msg_send_complete;
-       xio_msgr_ops.on_msg_error = on_msg_error;
-       xio_msgr_ops.on_cancel = on_cancel;
-       xio_msgr_ops.on_cancel_request = on_cancel_request;
-
-       /* mark initialized */
-       initialized = true;
-     }
-     mtx.Unlock();
-   }
- }
-
-/* XioMessenger */
-#undef dout_prefix
-#define dout_prefix _prefix(_dout, this)
-static ostream& _prefix(std::ostream *_dout, XioMessenger *msgr) {
-  return *_dout << "-- " << msgr->get_myaddr() << " ";
-}
-
-XioMessenger::XioMessenger(CephContext *cct, entity_name_t name,
-                          string mname, uint64_t _nonce,
-                          uint64_t cflags, DispatchStrategy *ds)
-  : SimplePolicyMessenger(cct, name, mname, _nonce),
-    XioInit(cct),
-    portals(this, get_nportals(cflags), get_nconns_per_portal(cflags)),
-    dispatch_strategy(ds),
-    loop_con(new XioLoopbackConnection(this)),
-    special_handling(0),
-    sh_mtx("XioMessenger session mutex"),
-    sh_cond(),
-    need_addr(true),
-    did_bind(false),
-    nonce(_nonce)
-{
-
-  if (cct->_conf->xio_trace_xcon)
-    magic |= MSG_MAGIC_TRACE_XCON;
-
-  XioPool::trace_mempool = (cct->_conf->xio_trace_mempool);
-  XioPool::trace_msgcnt = (cct->_conf->xio_trace_msgcnt);
-
-  dispatch_strategy->set_messenger(this);
-
-  /* update class instance count */
-  nInstances++;
-
-  loop_con->set_features(CEPH_FEATURES_ALL);
-
-  ldout(cct,2) << "Create msgr: " << this << " instance: "
-    << nInstances << " type: " << name.type_str()
-    << " subtype: " << mname << " nportals: " << get_nportals(cflags)
-    << " nconns_per_portal: " << get_nconns_per_portal(cflags)
-    << dendl;
-
-} /* ctor */
-
-int XioMessenger::pool_hint(uint32_t dsize) {
-  if (dsize > 1024*1024)
-    return 0;
-
-  /* if dsize is already present, returns -EEXIST */
-  return xio_mempool_add_slab(xio_msgr_noreg_mpool, dsize, 0,
-                                  cct->_conf->xio_mp_max_hint,
-                                  XMSG_MEMPOOL_QUANTUM, 0);
-}
-
-int XioMessenger::get_nconns_per_portal(uint64_t cflags)
-{
-  const int XIO_DEFAULT_NUM_CONNS_PER_PORTAL = 8;
-  int nconns = XIO_DEFAULT_NUM_CONNS_PER_PORTAL;
-
-  if (cflags & Messenger::HAS_MANY_CONNECTIONS)
-    nconns = max(cct->_conf->xio_max_conns_per_portal, XIO_DEFAULT_NUM_CONNS_PER_PORTAL);
-  else if (cflags & Messenger::HEARTBEAT)
-    nconns = max(cct->_conf->osd_heartbeat_min_peers * 4, XIO_DEFAULT_NUM_CONNS_PER_PORTAL);
-
-  return nconns;
-}
-
-int XioMessenger::get_nportals(uint64_t cflags)
-{
-  int nportals = 1;
-
-  if (cflags & Messenger::HAS_HEAVY_TRAFFIC)
-    nportals = max(cct->_conf->xio_portal_threads, 1);
-
-  return nportals;
-}
-
-void XioMessenger::learned_addr(const entity_addr_t &peer_addr_for_me)
-{
-  // be careful here: multiple threads may block here, and readers of
-  // my_inst.addr do NOT hold any lock.
-
-  // this always goes from true -> false under the protection of the
-  // mutex.  if it is already false, we need not retake the mutex at
-  // all.
-  if (!need_addr)
-    return;
-
-  sh_mtx.Lock();
-  if (need_addr) {
-    entity_addr_t t = peer_addr_for_me;
-    t.set_port(my_inst.addr.get_port());
-    my_inst.addr.set_sockaddr(t.get_sockaddr());
-    ldout(cct,2) << "learned my addr " << my_inst.addr << dendl;
-    need_addr = false;
-    // init_local_connection();
-  }
-  sh_mtx.Unlock();
-
-}
-
-int XioMessenger::new_session(struct xio_session *session,
-                             struct xio_new_session_req *req,
-                             void *cb_user_context)
-{
-  if (shutdown_called) {
-    return xio_reject(
-      session, XIO_E_SESSION_REFUSED, NULL /* udata */, 0 /* udata len */);
-  }
-  int code = portals.accept(session, req, cb_user_context);
-  if (! code)
-    nsessions++;
-  return code;
-} /* new_session */
-
-int XioMessenger::session_event(struct xio_session *session,
-                               struct xio_session_event_data *event_data,
-                               void *cb_user_context)
-{
-  XioConnection *xcon;
-
-  switch (event_data->event) {
-  case XIO_SESSION_CONNECTION_ESTABLISHED_EVENT:
-  {
-    struct xio_connection *conn = event_data->conn;
-    struct xio_connection_attr xcona;
-    entity_addr_t peer_addr_for_me, paddr;
-
-    xcon = static_cast<XioConnection*>(event_data->conn_user_context);
-
-    ldout(cct,2) << "connection established " << event_data->conn
-      << " session " << session << " xcon " << xcon << dendl;
-
-    (void) xio_query_connection(conn, &xcona,
-                               XIO_CONNECTION_ATTR_LOCAL_ADDR|
-                               XIO_CONNECTION_ATTR_PEER_ADDR);
-    peer_addr_for_me.set_sockaddr((struct sockaddr *)&xcona.local_addr);
-    paddr.set_sockaddr((struct sockaddr *)&xcona.peer_addr);
-    //set_myaddr(peer_addr_for_me);
-    learned_addr(peer_addr_for_me);
-    ldout(cct,2) << "client: connected from " << peer_addr_for_me << " to " << paddr << dendl;
-
-    /* notify hook */
-    this->ms_deliver_handle_connect(xcon);
-    this->ms_deliver_handle_fast_connect(xcon);
-  }
-  break;
-
-  case XIO_SESSION_NEW_CONNECTION_EVENT:
-  {
-    struct xio_connection *conn = event_data->conn;
-    struct xio_connection_attr xcona;
-    entity_inst_t s_inst;
-    entity_addr_t peer_addr_for_me;
-
-    (void) xio_query_connection(conn, &xcona,
-                               XIO_CONNECTION_ATTR_CTX|
-                               XIO_CONNECTION_ATTR_PEER_ADDR|
-                               XIO_CONNECTION_ATTR_LOCAL_ADDR);
-    /* XXX assumes RDMA */
-    s_inst.addr.set_sockaddr((struct sockaddr *)&xcona.peer_addr);
-    peer_addr_for_me.set_sockaddr((struct sockaddr *)&xcona.local_addr);
-
-    xcon = new XioConnection(this, XioConnection::PASSIVE, s_inst);
-    xcon->session = session;
-
-    struct xio_context_attr xctxa;
-    (void) xio_query_context(xcona.ctx, &xctxa, XIO_CONTEXT_ATTR_USER_CTX);
-
-    xcon->conn = conn;
-    xcon->portal = static_cast<XioPortal*>(xctxa.user_context);
-    assert(xcon->portal);
-
-    xcona.user_context = xcon;
-    (void) xio_modify_connection(conn, &xcona, XIO_CONNECTION_ATTR_USER_CTX);
-
-    xcon->connected = true;
-
-    /* sentinel ref */
-    xcon->get(); /* xcon->nref == 1 */
-    conns_sp.lock();
-    conns_list.push_back(*xcon);
-    /* XXX we can't put xcon in conns_entity_map becase we don't yet know
-     * it's peer address */
-    conns_sp.unlock();
-
-    /* XXXX pre-merge of session startup negotiation ONLY! */
-    xcon->cstate.state_up_ready(XioConnection::CState::OP_FLAG_NONE);
-
-    ldout(cct,2) << "New connection session " << session
-      << " xcon " << xcon << " on msgr: " << this << " portal: " << xcon->portal << dendl;
-    ldout(cct,2) << "Server: connected from " << s_inst.addr << " to " << peer_addr_for_me << dendl;
-  }
-  break;
-  case XIO_SESSION_CONNECTION_ERROR_EVENT:
-  case XIO_SESSION_CONNECTION_CLOSED_EVENT: /* orderly discon */
-  case XIO_SESSION_CONNECTION_DISCONNECTED_EVENT: /* unexpected discon */
-  case XIO_SESSION_CONNECTION_REFUSED_EVENT:
-    xcon = static_cast<XioConnection*>(event_data->conn_user_context);
-    ldout(cct,2) << xio_session_event_str(event_data->event)
-      << " xcon " << xcon << " session " << session  << dendl;
-    if (likely(!!xcon)) {
-      unregister_xcon(xcon);
-      xcon->on_disconnect_event();
-    }
-    break;
-  case XIO_SESSION_CONNECTION_TEARDOWN_EVENT:
-    xcon = static_cast<XioConnection*>(event_data->conn_user_context);
-    ldout(cct,2) << xio_session_event_str(event_data->event)
-      << " xcon " << xcon << " session " << session << dendl;
-    /*
-     * There are flows where Accelio sends teardown event without going
-     * through disconnect event. so we make sure we cleaned the connection.
-     */
-    unregister_xcon(xcon);
-    xcon->on_teardown_event();
-    break;
-  case XIO_SESSION_TEARDOWN_EVENT:
-    ldout(cct,2) << xio_session_event_str(event_data->event)
-      << " session " << session << dendl;
-    if (unlikely(XioPool::trace_mempool)) {
-      xp_stats.dump("xio session dtor", reinterpret_cast<uint64_t>(session));
-    }
-    xio_session_destroy(session);
-    if (--nsessions == 0) {
-      Mutex::Locker lck(sh_mtx);
-      if (nsessions == 0)
-       sh_cond.Signal();
-    }
-    break;
-  default:
-    break;
-  };
-
-  return 0;
-}
-
-enum bl_type
-{
-  BUFFER_PAYLOAD,
-  BUFFER_MIDDLE,
-  BUFFER_DATA
-};
-
-#define MAX_XIO_BUF_SIZE 1044480
-
-static inline int
-xio_count_buffers(const buffer::list& bl, int& req_size, int& msg_off, int& req_off)
-{
-
-  const std::list<buffer::ptr>& buffers = bl.buffers();
-  list<bufferptr>::const_iterator pb;
-  size_t size, off;
-  int result;
-  int first = 1;
-
-  off = size = 0;
-  result = 0;
-  for (;;) {
-    if (off >= size) {
-      if (first) pb = buffers.begin(); else ++pb;
-      if (pb == buffers.end()) {
-       break;
-      }
-      off = 0;
-      size = pb->length();
-      first = 0;
-    }
-    size_t count = size - off;
-    if (!count) continue;
-    if (req_size + count > MAX_XIO_BUF_SIZE) {
-       count = MAX_XIO_BUF_SIZE - req_size;
-    }
-
-    ++result;
-
-    /* advance iov and perhaps request */
-
-    off += count;
-    req_size += count;
-    ++msg_off;
-    if (unlikely(msg_off >= XIO_MSGR_IOVLEN || req_size >= MAX_XIO_BUF_SIZE)) {
-      ++req_off;
-      msg_off = 0;
-      req_size = 0;
-    }
-  }
-
-  return result;
-}
-
-static inline void
-xio_place_buffers(const buffer::list& bl, XioMsg *xmsg, struct xio_msg*& req,
-                 struct xio_iovec_ex*& msg_iov, int& req_size,
-                 int ex_cnt, int& msg_off, int& req_off, bl_type type)
-{
-
-  const std::list<buffer::ptr>& buffers = bl.buffers();
-  list<bufferptr>::const_iterator pb;
-  struct xio_iovec_ex* iov;
-  size_t size, off;
-  const char *data = NULL;
-  int first = 1;
-
-  off = size = 0;
-  for (;;) {
-    if (off >= size) {
-      if (first) pb = buffers.begin(); else ++pb;
-      if (pb == buffers.end()) {
-       break;
-      }
-      off = 0;
-      size = pb->length();
-      data = pb->c_str();       // is c_str() efficient?
-      first = 0;
-    }
-    size_t count = size - off;
-    if (!count) continue;
-    if (req_size + count > MAX_XIO_BUF_SIZE) {
-       count = MAX_XIO_BUF_SIZE - req_size;
-    }
-
-    /* assign buffer */
-    iov = &msg_iov[msg_off];
-    iov->iov_base = (void *) (&data[off]);
-    iov->iov_len = count;
-
-    switch (type) {
-    case BUFFER_DATA:
-      //break;
-    default:
-    {
-      struct xio_reg_mem *mp = get_xio_mp(*pb);
-      iov->mr = (mp) ? mp->mr : NULL;
-    }
-      break;
-    }
-
-    /* advance iov(s) */
-
-    off += count;
-    req_size += count;
-    ++msg_off;
-
-    /* next request if necessary */
-
-    if (unlikely(msg_off >= XIO_MSGR_IOVLEN || req_size >= MAX_XIO_BUF_SIZE)) {
-      /* finish this request */
-      req->out.pdata_iov.nents = msg_off;
-      /* advance to next, and write in it if it's not the last one. */
-      if (++req_off >= ex_cnt) {
-       req = 0;        /* poison.  trap if we try to use it. */
-       msg_iov = NULL;
-      } else {
-       req = &xmsg->req_arr[req_off].msg;
-       msg_iov = req->out.pdata_iov.sglist;
-      }
-      msg_off = 0;
-      req_size = 0;
-    }
-  }
-}
-
-int XioMessenger::bind(const entity_addr_t& addr)
-{
-  if (addr.is_blank_ip()) {
-      lderr(cct) << "ERROR: need rdma ip for remote use! " << dendl;
-      cout << "Error: xio bind failed. public/cluster ip not specified" << std::endl;
-      return -1;
-  }
-
-  entity_addr_t shift_addr = addr;
-  string base_uri = xio_uri_from_entity(cct->_conf->xio_transport_type,
-                                       shift_addr, false /* want_port */);
-  ldout(cct,4) << "XioMessenger " << this << " bind: xio_uri "
-    << base_uri << ':' << shift_addr.get_port() << dendl;
-
-  uint16_t port0;
-  int r = portals.bind(&xio_msgr_ops, base_uri, shift_addr.get_port(), &port0);
-  if (r == 0) {
-    shift_addr.set_port(port0);
-    shift_addr.nonce = nonce;
-    set_myaddr(shift_addr);
-    need_addr = false;
-    did_bind = true;
-  }
-  return r;
-} /* bind */
-
-int XioMessenger::rebind(const set<int>& avoid_ports)
-{
-  ldout(cct,4) << "XioMessenger " << this << " rebind attempt" << dendl;
-  return 0;
-} /* rebind */
-
-int XioMessenger::start()
-{
-  portals.start();
-  dispatch_strategy->start();
-  if (!did_bind) {
-         my_inst.addr.nonce = nonce;
-  }
-  started = true;
-  return 0;
-}
-
-void XioMessenger::wait()
-{
-  portals.join();
-  dispatch_strategy->wait();
-} /* wait */
-
-int XioMessenger::_send_message(Message *m, const entity_inst_t& dest)
-{
-  ConnectionRef conn = get_connection(dest);
-  if (conn)
-    return _send_message(m, &(*conn));
-  else
-    return EINVAL;
-} /* send_message(Message *, const entity_inst_t&) */
-
-static inline XioMsg* pool_alloc_xio_msg(Message *m, XioConnection *xcon,
-  int ex_cnt)
-{
-  struct xio_reg_mem mp_mem;
-  int e = xpool_alloc(xio_msgr_noreg_mpool, sizeof(XioMsg), &mp_mem);
-  if (!!e)
-    return NULL;
-  XioMsg *xmsg = reinterpret_cast<XioMsg*>(mp_mem.addr);
-  assert(!!xmsg);
-  new (xmsg) XioMsg(m, xcon, mp_mem, ex_cnt, CEPH_FEATURES_ALL);
-  return xmsg;
-}
-
-XioCommand* pool_alloc_xio_command(XioConnection *xcon)
-{
-  struct xio_reg_mem mp_mem;
-  int e = xpool_alloc(xio_msgr_noreg_mpool, sizeof(XioCommand), &mp_mem);
-  if (!!e)
-    return NULL;
-  XioCommand *xcmd = reinterpret_cast<XioCommand*>(mp_mem.addr);
-  assert(!!xcmd);
-  new (xcmd) XioCommand(xcon, mp_mem);
-  return xcmd;
-}
-
-int XioMessenger::_send_message(Message *m, Connection *con)
-{
-  if (con == loop_con.get() /* intrusive_ptr get() */) {
-    m->set_connection(con);
-    m->set_src(get_myinst().name);
-    m->set_seq(loop_con->next_seq());
-    ds_dispatch(m);
-    return 0;
-  }
-
-  XioConnection *xcon = static_cast<XioConnection*>(con);
-
-  /* If con is not in READY state, we have to enforce policy */
-  if (xcon->cstate.session_state.read() != XioConnection::UP) {
-    pthread_spin_lock(&xcon->sp);
-    if (xcon->cstate.session_state.read() != XioConnection::UP) {
-      xcon->outgoing.mqueue.push_back(*m);
-      pthread_spin_unlock(&xcon->sp);
-      return 0;
-    }
-    pthread_spin_unlock(&xcon->sp);
-  }
-
-  return _send_message_impl(m, xcon);
-} /* send_message(Message* m, Connection *con) */
-
-int XioMessenger::_send_message_impl(Message* m, XioConnection* xcon)
-{
-  int code = 0;
-
-  Mutex::Locker l(xcon->lock);
-  if (unlikely(XioPool::trace_mempool)) {
-    static uint32_t nreqs;
-    if (unlikely((++nreqs % 65536) == 0)) {
-      xp_stats.dump(__func__, nreqs);
-    }
-  }
-
-  m->set_seq(xcon->state.next_out_seq());
-  m->set_magic(magic); // trace flags and special handling
-
-  m->encode(xcon->get_features(), this->crcflags);
-
-  buffer::list &payload = m->get_payload();
-  buffer::list &middle = m->get_middle();
-  buffer::list &data = m->get_data();
-
-  int msg_off = 0;
-  int req_off = 0;
-  int req_size = 0;
-  int nbuffers =
-    xio_count_buffers(payload, req_size, msg_off, req_off) +
-    xio_count_buffers(middle, req_size, msg_off, req_off) +
-    xio_count_buffers(data, req_size, msg_off, req_off);
-
-  int ex_cnt = req_off;
-  if (msg_off == 0 && ex_cnt > 0) {
-    // no buffers for last msg
-    ldout(cct,10) << "msg_off 0, ex_cnt " << ex_cnt << " -> " << ex_cnt-1 << dendl;
-    ex_cnt--;
-  }
-
-  /* get an XioMsg frame */
-  XioMsg *xmsg = pool_alloc_xio_msg(m, xcon, ex_cnt);
-  if (! xmsg) {
-    /* could happen if Accelio has been shutdown */
-    return ENOMEM;
-  }
-
-  ldout(cct,4) << __func__ << " " << m << " new XioMsg " << xmsg
-       << " tag " << (int)xmsg->hdr.tag
-       << " req_0 " << xmsg->get_xio_msg() << " msg type " << m->get_type()
-       << " features: " << xcon->get_features()
-       << " conn " << xcon->conn << " sess " << xcon->session << dendl;
-
-  if (magic & (MSG_MAGIC_XIO)) {
-
-    /* XXXX verify */
-    switch (m->get_type()) {
-    case 43:
-    // case 15:
-      ldout(cct,4) << __func__ << "stop 43 " << m->get_type() << " " << *m << dendl;
-      buffer::list &payload = m->get_payload();
-      ldout(cct,4) << __func__ << "payload dump:" << dendl;
-      payload.hexdump(cout);
-    }
-  }
-
-  struct xio_msg *req = xmsg->get_xio_msg();
-  struct xio_iovec_ex *msg_iov = req->out.pdata_iov.sglist;
-
-  if (magic & (MSG_MAGIC_XIO)) {
-    ldout(cct,4) << "payload: " << payload.buffers().size() <<
-      " middle: " << middle.buffers().size() <<
-      " data: " << data.buffers().size() <<
-      dendl;
-  }
-
-  if (unlikely(ex_cnt > 0)) {
-    ldout(cct,4) << __func__ << " buffer cnt > XIO_MSGR_IOVLEN (" <<
-      ((XIO_MSGR_IOVLEN-1) + nbuffers) << ")" << dendl;
-  }
-
-  /* do the invariant part */
-  msg_off = 0;
-  req_off = -1; /* most often, not used */
-  req_size = 0;
-
-  xio_place_buffers(payload, xmsg, req, msg_iov, req_size, ex_cnt, msg_off,
-                   req_off, BUFFER_PAYLOAD);
-
-  xio_place_buffers(middle, xmsg, req, msg_iov, req_size, ex_cnt, msg_off,
-                   req_off, BUFFER_MIDDLE);
-
-  xio_place_buffers(data, xmsg, req, msg_iov, req_size, ex_cnt, msg_off,
-                   req_off, BUFFER_DATA);
-  ldout(cct,10) << "ex_cnt " << ex_cnt << ", req_off " << req_off
-    << ", msg_cnt " << xmsg->get_msg_count() << dendl;
-
-  /* finalize request */
-  if (msg_off)
-    req->out.pdata_iov.nents = msg_off;
-
-  /* fixup first msg */
-  req = xmsg->get_xio_msg();
-
-  const std::list<buffer::ptr>& header = xmsg->hdr.get_bl().buffers();
-  assert(header.size() == 1); /* XXX */
-  list<bufferptr>::const_iterator pb = header.begin();
-  req->out.header.iov_base = (char*) pb->c_str();
-  req->out.header.iov_len = pb->length();
-
-  /* deliver via xio, preserve ordering */
-  if (xmsg->get_msg_count() > 1) {
-    struct xio_msg *head = xmsg->get_xio_msg();
-    struct xio_msg *tail = head;
-    for (req_off = 0; ((unsigned) req_off) < xmsg->get_msg_count()-1; ++req_off) {
-      req = &xmsg->req_arr[req_off].msg;
-assert(!req->in.pdata_iov.nents);
-assert(req->out.pdata_iov.nents || !nbuffers);
-      tail->next = req;
-      tail = req;
-     }
-    tail->next = NULL;
-  }
-  xmsg->trace = m->trace;
-  m->trace.event("xio portal enqueue for send");
-  m->trace.keyval("xio message segments", xmsg->hdr.msg_cnt);
-  xcon->portal->enqueue_for_send(xcon, xmsg);
-
-  return code;
-} /* send_message(Message *, Connection *) */
-
-int XioMessenger::shutdown()
-{
-  shutdown_called = true;
-  conns_sp.lock();
-  XioConnection::ConnList::iterator iter;
-  iter = conns_list.begin();
-  for (iter = conns_list.begin(); iter != conns_list.end(); ++iter) {
-    (void) iter->disconnect(); // XXX mark down?
-  }
-  conns_sp.unlock();
-  while(nsessions > 0) {
-    Mutex::Locker lck(sh_mtx);
-    if (nsessions > 0)
-      sh_cond.Wait(sh_mtx);
-  }
-  portals.shutdown();
-  dispatch_strategy->shutdown();
-  did_bind = false;
-  started = false;
-  return 0;
-} /* shutdown */
-
-ConnectionRef XioMessenger::get_connection(const entity_inst_t& dest)
-{
-  if (shutdown_called)
-    return NULL;
-
-  const entity_inst_t& self_inst = get_myinst();
-  if ((&dest == &self_inst) ||
-      (dest == self_inst)) {
-    return get_loopback_connection();
-  }
-
-  conns_sp.lock();
-  XioConnection::EntitySet::iterator conn_iter =
-    conns_entity_map.find(dest, XioConnection::EntityComp());
-  if (conn_iter != conns_entity_map.end()) {
-    ConnectionRef cref = &(*conn_iter);
-    conns_sp.unlock();
-    return cref;
-  }
-  else {
-    conns_sp.unlock();
-    string xio_uri = xio_uri_from_entity(cct->_conf->xio_transport_type,
-                                        dest.addr, true /* want_port */);
-
-    ldout(cct,4) << "XioMessenger " << this << " get_connection: xio_uri "
-      << xio_uri << dendl;
-
-    /* XXX client session creation parameters */
-    struct xio_session_params params = {};
-    params.type         = XIO_SESSION_CLIENT;
-    params.ses_ops      = &xio_msgr_ops;
-    params.user_context = this;
-    params.uri          = xio_uri.c_str();
-
-    XioConnection *xcon = new XioConnection(this, XioConnection::ACTIVE,
-                                           dest);
-
-    xcon->session = xio_session_create(&params);
-    if (! xcon->session) {
-      delete xcon;
-      return NULL;
-    }
-
-    /* this should cause callbacks with user context of conn, but
-     * we can always set it explicitly */
-    struct xio_connection_params xcp = {};
-    xcp.session           = xcon->session;
-    xcp.ctx               = xcon->portal->ctx;
-    xcp.conn_user_context = xcon;
-
-    xcon->conn = xio_connect(&xcp);
-    if (!xcon->conn) {
-      xio_session_destroy(xcon->session);
-      delete xcon;
-      return NULL;
-    }
-
-    nsessions++;
-    xcon->connected = true;
-
-    /* sentinel ref */
-    xcon->get(); /* xcon->nref == 1 */
-    conns_sp.lock();
-    conns_list.push_back(*xcon);
-    conns_entity_map.insert(*xcon);
-    conns_sp.unlock();
-
-    /* XXXX pre-merge of session startup negotiation ONLY! */
-    xcon->cstate.state_up_ready(XioConnection::CState::OP_FLAG_NONE);
-
-    ldout(cct,2) << "New connection xcon: " << xcon <<
-      " up_ready on session " << xcon->session <<
-      " on msgr: " << this << " portal: " << xcon->portal << dendl;
-
-    return xcon->get(); /* nref +1 */
-  }
-} /* get_connection */
-
-ConnectionRef XioMessenger::get_loopback_connection()
-{
-  return (loop_con.get());
-} /* get_loopback_connection */
-
-void XioMessenger::unregister_xcon(XioConnection *xcon)
-{
-  Spinlock::Locker lckr(conns_sp);
-
-  XioConnection::EntitySet::iterator conn_iter =
-       conns_entity_map.find(xcon->peer, XioConnection::EntityComp());
-  if (conn_iter != conns_entity_map.end()) {
-       XioConnection *xcon2 = &(*conn_iter);
-       if (xcon == xcon2) {
-         conns_entity_map.erase(conn_iter);
-       }
-  }
-
-  /* check if citer on conn_list */
-  if (xcon->conns_hook.is_linked()) {
-    /* now find xcon on conns_list and erase */
-    XioConnection::ConnList::iterator citer =
-        XioConnection::ConnList::s_iterator_to(*xcon);
-    conns_list.erase(citer);
-  }
-}
-
-void XioMessenger::mark_down(const entity_addr_t& addr)
-{
-  entity_inst_t inst(entity_name_t(), addr);
-  Spinlock::Locker lckr(conns_sp);
-  XioConnection::EntitySet::iterator conn_iter =
-    conns_entity_map.find(inst, XioConnection::EntityComp());
-  if (conn_iter != conns_entity_map.end()) {
-      (*conn_iter)._mark_down(XioConnection::CState::OP_FLAG_NONE);
-    }
-} /* mark_down(const entity_addr_t& */
-
-void XioMessenger::mark_down(Connection* con)
-{
-  XioConnection *xcon = static_cast<XioConnection*>(con);
-  xcon->_mark_down(XioConnection::CState::OP_FLAG_NONE);
-} /* mark_down(Connection*) */
-
-void XioMessenger::mark_down_all()
-{
-  Spinlock::Locker lckr(conns_sp);
-  XioConnection::EntitySet::iterator conn_iter;
-  for (conn_iter = conns_entity_map.begin(); conn_iter !=
-        conns_entity_map.begin(); ++conn_iter) {
-    (*conn_iter)._mark_down(XioConnection::CState::OP_FLAG_NONE);
-  }
-} /* mark_down_all */
-
-static inline XioMarkDownHook* pool_alloc_markdown_hook(
-  XioConnection *xcon, Message *m)
-{
-  struct xio_reg_mem mp_mem;
-  int e = xio_mempool_alloc(xio_msgr_noreg_mpool,
-                           sizeof(XioMarkDownHook), &mp_mem);
-  if (!!e)
-    return NULL;
-  XioMarkDownHook *hook = static_cast<XioMarkDownHook*>(mp_mem.addr);
-  new (hook) XioMarkDownHook(xcon, m, mp_mem);
-  return hook;
-}
-
-void XioMessenger::mark_down_on_empty(Connection* con)
-{
-  XioConnection *xcon = static_cast<XioConnection*>(con);
-  MNop* m = new MNop();
-  m->tag = XIO_NOP_TAG_MARKDOWN;
-  m->set_completion_hook(pool_alloc_markdown_hook(xcon, m));
-  // stall new messages
-  xcon->cstate.session_state = XioConnection::session_states::BARRIER;
-  (void) _send_message_impl(m, xcon);
-}
-
-void XioMessenger::mark_disposable(Connection *con)
-{
-  XioConnection *xcon = static_cast<XioConnection*>(con);
-  xcon->_mark_disposable(XioConnection::CState::OP_FLAG_NONE);
-}
-
-void XioMessenger::try_insert(XioConnection *xcon)
-{
-  Spinlock::Locker lckr(conns_sp);
-  /* already resident in conns_list */
-  conns_entity_map.insert(*xcon);
-}
-
-XioMessenger::~XioMessenger()
-{
-  delete dispatch_strategy;
-  nInstances--;
-} /* dtor */