diff --git a/src/ceph/src/msg/xio/XioMessenger.cc b/src/ceph/src/msg/xio/XioMessenger.cc
new file mode 100644 (file)
index 0000000..6bf4d52
--- /dev/null
@@ -0,0 +1,1137 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ * Portions Copyright (C) 2013 CohortFS, LLC
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#include <arpa/inet.h>
+#include <boost/lexical_cast.hpp>
+#include <set>
+#include <stdlib.h>
+#include <memory>
+
+#include "XioMsg.h"
+#include "XioMessenger.h"
+#include "common/address_helper.h"
+#include "common/code_environment.h"
+#include "messages/MNop.h"
+
+#define dout_subsys ceph_subsys_xio
+#undef dout_prefix
+#define dout_prefix *_dout << "xio."
+
+Mutex mtx("XioMessenger Package Lock");
+std::atomic<bool> initialized = { false };
+
+std::atomic<unsigned> XioMessenger::nInstances = { 0 };
+
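+/* pool for transient XioMsg/XioCommand frames; "noreg" because it is backed
+ * by regular pages rather than RDMA-registered memory (see the
+ * XIO_MEMPOOL_FLAG_REGULAR_PAGES_ALLOC flag at creation below) */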
+struct xio_mempool *xio_msgr_noreg_mpool;
+
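+/* Accelio session-ops vtable; populated once in XioInit::package_init() and
+ * shared by all XioMessenger instances */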
+static struct xio_session_ops xio_msgr_ops;
+
+/* Accelio API callouts */
+
+namespace xio_log
+{
+typedef pair<const char*, int> level_pair;
+static const level_pair LEVELS[] = {
+  make_pair("fatal", 0),
+  make_pair("error", 0),
+  make_pair("warn", 1),
+  make_pair("info", 1),
+  make_pair("debug", 2),
+  make_pair("trace", 20)
+};
+
+static CephContext *context;
+
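+/* map the active Ceph debug threshold for this subsystem onto an Accelio
+ * log level: walk LEVELS in order of increasing verbosity and count how
+ * many consecutive entries are currently enabled */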
+int get_level()
+{
+  int level = 0;
+  for (size_t i = 0; i < sizeof(LEVELS)/sizeof(LEVELS[0]); i++) {
+    if (!ldlog_p1(context, dout_subsys, LEVELS[i].second))
+      break;
+    level++;
+  }
+  return level;
+}
+
+void log_dout(const char *file, unsigned line,
+             const char *function, unsigned level,
+             const char *fmt, ...)
+{
+  char buffer[2048];
+  va_list args;
+  va_start(args, fmt);
+  int n = vsnprintf(buffer, sizeof(buffer), fmt, args);
+  va_end(args);
+
+  if (n > 0) {
+    const char *short_file = strrchr(file, '/');
+    short_file = (short_file == NULL) ? file : short_file + 1;
+
+    const level_pair &lvl = LEVELS[level];
+    ldout(context, lvl.second) << '[' << lvl.first << "] "
+      << short_file << ':' << line << ' '
+      << function << " - " << buffer << dendl;
+  }
+}
+}
+
+static int on_session_event(struct xio_session *session,
+                           struct xio_session_event_data *event_data,
+                           void *cb_user_context)
+{
+  XioMessenger *msgr = static_cast<XioMessenger*>(cb_user_context);
+  CephContext *cct = msgr->cct;
+
+  ldout(cct,4) << "session event: " << xio_session_event_str(event_data->event)
+    << ". reason: " << xio_strerror(event_data->reason) << dendl;
+
+  return msgr->session_event(session, event_data, cb_user_context);
+}
+
+static int on_new_session(struct xio_session *session,
+                         struct xio_new_session_req *req,
+                         void *cb_user_context)
+{
+  XioMessenger *msgr = static_cast<XioMessenger*>(cb_user_context);
+  CephContext *cct = msgr->cct;
+
+  ldout(cct,4) << "new session " << session
+    << " user_context " << cb_user_context << dendl;
+
+  return (msgr->new_session(session, req, cb_user_context));
+}
+
+static int on_msg(struct xio_session *session,
+                 struct xio_msg *req,
+                 int more_in_batch,
+                 void *cb_user_context)
+{
+  XioConnection* xcon = static_cast<XioConnection*>(cb_user_context);
+  CephContext *cct = xcon->get_messenger()->cct;
+
+  ldout(cct,25) << "on_msg session " << session << " xcon " << xcon << dendl;
+
+  if (unlikely(XioPool::trace_mempool)) {
+    static uint32_t nreqs;
+    if (unlikely((++nreqs % 65536) == 0)) {
+      xp_stats.dump(__func__, nreqs);
+    }
+  }
+
+  return xcon->on_msg(session, req, more_in_batch,
+                         cb_user_context);
+}
+
+static int on_ow_msg_send_complete(struct xio_session *session,
+                                  struct xio_msg *msg,
+                                  void *conn_user_context)
+{
+  XioConnection *xcon =
+    static_cast<XioConnection*>(conn_user_context);
+  CephContext *cct = xcon->get_messenger()->cct;
+
+  ldout(cct,25) << "msg delivered session: " << session
+               << " msg: " << msg << " conn_user_context "
+               << conn_user_context << dendl;
+
+  return xcon->on_ow_msg_send_complete(session, msg, conn_user_context);
+}
+
+static int on_msg_error(struct xio_session *session,
+                       enum xio_status error,
+                       enum xio_msg_direction dir,
+                       struct xio_msg  *msg,
+                       void *conn_user_context)
+{
+  /* XIO promises to flush back undelivered messages */
+  XioConnection *xcon =
+    static_cast<XioConnection*>(conn_user_context);
+  CephContext *cct = xcon->get_messenger()->cct;
+
+  ldout(cct,4) << "msg error session: " << session
+    << " error: " << xio_strerror(error) << " msg: " << msg
+    << " conn_user_context " << conn_user_context << dendl;
+
+  return xcon->on_msg_error(session, error, msg, conn_user_context);
+}
+
+static int on_cancel(struct xio_session *session,
+                    struct xio_msg  *msg,
+                    enum xio_status result,
+                    void *conn_user_context)
+{
+  XioConnection* xcon = static_cast<XioConnection*>(conn_user_context);
+  CephContext *cct = xcon->get_messenger()->cct;
+
+  ldout(cct,25) << "on cancel: session: " << session << " msg: " << msg
+    << " conn_user_context " << conn_user_context << dendl;
+
+  return 0;
+}
+
+static int on_cancel_request(struct xio_session *session,
+                            struct xio_msg  *msg,
+                            void *conn_user_context)
+{
+  XioConnection* xcon = static_cast<XioConnection*>(conn_user_context);
+  CephContext *cct = xcon->get_messenger()->cct;
+
+  ldout(cct,25) << "on cancel request: session: " << session << " msg: " << msg
+    << " conn_user_context " << conn_user_context << dendl;
+
+  return 0;
+}
+
+/* free functions */
+static string xio_uri_from_entity(const string &type,
+                                 const entity_addr_t& addr, bool want_port)
+{
+  const char *host = NULL;
+  char addr_buf[129];
+  string xio_uri;
+
+  switch(addr.get_family()) {
+  case AF_INET:
+    host = inet_ntop(AF_INET, &addr.in4_addr().sin_addr, addr_buf,
+                    INET_ADDRSTRLEN);
+    break;
+  case AF_INET6:
+    host = inet_ntop(AF_INET6, &addr.in6_addr().sin6_addr, addr_buf,
+                    INET6_ADDRSTRLEN);
+    break;
+  default:
+    abort();
+    break;
+  };
+
+  if (type == "rdma" || type == "tcp")
+      xio_uri = type + "://";
+  else
+      xio_uri = "rdma://";
+
+  /* The following can only succeed if the host is rdma-capable */
+  xio_uri += host;
+  if (want_port) {
+    xio_uri += ":";
+    xio_uri += boost::lexical_cast<std::string>(addr.get_port());
+  }
+
+  return xio_uri;
+} /* xio_uri_from_entity */
+
+void XioInit::package_init(CephContext *cct) {
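+   /* double-checked locking: the atomic 'initialized' flag lets later
+    * callers skip the package mutex once one-time Accelio setup is done */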
+   if (! initialized) {
+
+     mtx.Lock();
+     if (! initialized) {
+
+       xio_init();
+
+       // claim a reference to the first context we see
+       xio_log::context = cct->get();
+
+       int xopt;
+       xopt = xio_log::get_level();
+       xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_LOG_LEVEL,
+                 &xopt, sizeof(xopt));
+       xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_LOG_FN,
+                 (const void*)xio_log::log_dout, sizeof(xio_log_fn));
+
+       xopt = 1;
+       xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_DISABLE_HUGETBL,
+                 &xopt, sizeof(xopt));
+
+       if (g_code_env == CODE_ENVIRONMENT_DAEMON) {
+         xopt = 1;
+         xio_set_opt(NULL, XIO_OPTLEVEL_RDMA, XIO_OPTNAME_ENABLE_FORK_INIT,
+                   &xopt, sizeof(xopt));
+       }
+
+       xopt = XIO_MSGR_IOVLEN;
+       xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_MAX_IN_IOVLEN,
+                 &xopt, sizeof(xopt));
+       xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_MAX_OUT_IOVLEN,
+                 &xopt, sizeof(xopt));
+
+       /* enable flow-control */
+       xopt = 1;
+       xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_ENABLE_FLOW_CONTROL,
+                  &xopt, sizeof(xopt));
+
+       /* and set threshold for buffer callouts */
+       xopt = max(cct->_conf->xio_max_send_inline, 512);
+       xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_MAX_INLINE_XIO_DATA,
+                  &xopt, sizeof(xopt));
+
+       xopt = XioMsgHdr::get_max_encoded_length();
+       ldout(cct,2) << "setting accelio max header size " << xopt << dendl;
+       xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_MAX_INLINE_XIO_HEADER,
+                  &xopt, sizeof(xopt));
+
+       size_t queue_depth = cct->_conf->xio_queue_depth;
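+       /* registered mempool: six slab configurations covering block sizes
+        * from 1 KiB up to 1 MiB */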
+       struct xio_mempool_config mempool_config = {
+         6,
+         {
+           {1024,  0,  queue_depth,  262144},
+           {4096,  0,  queue_depth,  262144},
+           {16384, 0,  queue_depth,  262144},
+           {65536, 0,  128,  65536},
+           {262144, 0,  32,  16384},
+           {1048576, 0, 8,  8192}
+         }
+       };
+       xio_set_opt(NULL,
+                   XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_CONFIG_MEMPOOL,
+                   &mempool_config, sizeof(mempool_config));
+
+       /* and an unregistered one */
+#define XMSG_MEMPOOL_QUANTUM 4096
+
+       xio_msgr_noreg_mpool =
+       xio_mempool_create(-1 /* nodeid */,
+                          XIO_MEMPOOL_FLAG_REGULAR_PAGES_ALLOC);
+
+       (void) xio_mempool_add_slab(xio_msgr_noreg_mpool, 64,
+                                      cct->_conf->xio_mp_min,
+                                      cct->_conf->xio_mp_max_64,
+                                      XMSG_MEMPOOL_QUANTUM, 0);
+       (void) xio_mempool_add_slab(xio_msgr_noreg_mpool, 256,
+                                      cct->_conf->xio_mp_min,
+                                      cct->_conf->xio_mp_max_256,
+                                      XMSG_MEMPOOL_QUANTUM, 0);
+       (void) xio_mempool_add_slab(xio_msgr_noreg_mpool, 1024,
+                                      cct->_conf->xio_mp_min,
+                                      cct->_conf->xio_mp_max_1k,
+                                      XMSG_MEMPOOL_QUANTUM, 0);
+       (void) xio_mempool_add_slab(xio_msgr_noreg_mpool, getpagesize(),
+                                      cct->_conf->xio_mp_min,
+                                      cct->_conf->xio_mp_max_page,
+                                      XMSG_MEMPOOL_QUANTUM, 0);
+
+       /* initialize ops singleton */
+       xio_msgr_ops.on_session_event = on_session_event;
+       xio_msgr_ops.on_new_session = on_new_session;
+       xio_msgr_ops.on_session_established = NULL;
+       xio_msgr_ops.on_msg = on_msg;
+       xio_msgr_ops.on_ow_msg_send_complete = on_ow_msg_send_complete;
+       xio_msgr_ops.on_msg_error = on_msg_error;
+       xio_msgr_ops.on_cancel = on_cancel;
+       xio_msgr_ops.on_cancel_request = on_cancel_request;
+
+       /* mark initialized */
+       initialized = true;
+     }
+     mtx.Unlock();
+   }
+}
+
+/* XioMessenger */
+#undef dout_prefix
+#define dout_prefix _prefix(_dout, this)
+static ostream& _prefix(std::ostream *_dout, XioMessenger *msgr) {
+  return *_dout << "-- " << msgr->get_myaddr() << " ";
+}
+
+XioMessenger::XioMessenger(CephContext *cct, entity_name_t name,
+                          string mname, uint64_t _nonce,
+                          uint64_t cflags, DispatchStrategy *ds)
+  : SimplePolicyMessenger(cct, name, mname, _nonce),
+    XioInit(cct),
+    portals(this, get_nportals(cflags), get_nconns_per_portal(cflags)),
+    dispatch_strategy(ds),
+    loop_con(new XioLoopbackConnection(this)),
+    special_handling(0),
+    sh_mtx("XioMessenger session mutex"),
+    sh_cond(),
+    need_addr(true),
+    did_bind(false),
+    nonce(_nonce)
+{
+
+  if (cct->_conf->xio_trace_xcon)
+    magic |= MSG_MAGIC_TRACE_XCON;
+
+  XioPool::trace_mempool = (cct->_conf->xio_trace_mempool);
+  XioPool::trace_msgcnt = (cct->_conf->xio_trace_msgcnt);
+
+  dispatch_strategy->set_messenger(this);
+
+  /* update class instance count */
+  nInstances++;
+
+  loop_con->set_features(CEPH_FEATURES_ALL);
+
+  ldout(cct,2) << "Create msgr: " << this << " instance: "
+    << nInstances << " type: " << name.type_str()
+    << " subtype: " << mname << " nportals: " << get_nportals(cflags)
+    << " nconns_per_portal: " << get_nconns_per_portal(cflags)
+    << dendl;
+
+} /* ctor */
+
+int XioMessenger::pool_hint(uint32_t dsize) {
+  if (dsize > 1024*1024)
+    return 0;
+
+  /* if dsize is already present, returns -EEXIST */
+  return xio_mempool_add_slab(xio_msgr_noreg_mpool, dsize, 0,
+                                  cct->_conf->xio_mp_max_hint,
+                                  XMSG_MEMPOOL_QUANTUM, 0);
+}
+
+int XioMessenger::get_nconns_per_portal(uint64_t cflags)
+{
+  const int XIO_DEFAULT_NUM_CONNS_PER_PORTAL = 8;
+  int nconns = XIO_DEFAULT_NUM_CONNS_PER_PORTAL;
+
+  if (cflags & Messenger::HAS_MANY_CONNECTIONS)
+    nconns = max(cct->_conf->xio_max_conns_per_portal, XIO_DEFAULT_NUM_CONNS_PER_PORTAL);
+  else if (cflags & Messenger::HEARTBEAT)
+    nconns = max(cct->_conf->osd_heartbeat_min_peers * 4, XIO_DEFAULT_NUM_CONNS_PER_PORTAL);
+
+  return nconns;
+}
+
+int XioMessenger::get_nportals(uint64_t cflags)
+{
+  int nportals = 1;
+
+  if (cflags & Messenger::HAS_HEAVY_TRAFFIC)
+    nportals = max(cct->_conf->xio_portal_threads, 1);
+
+  return nportals;
+}
+
+void XioMessenger::learned_addr(const entity_addr_t &peer_addr_for_me)
+{
+  // be careful here: multiple threads may block here, and readers of
+  // my_inst.addr do NOT hold any lock.
+
+  // this always goes from true -> false under the protection of the
+  // mutex.  if it is already false, we need not retake the mutex at
+  // all.
+  if (!need_addr)
+    return;
+
+  sh_mtx.Lock();
+  if (need_addr) {
+    entity_addr_t t = peer_addr_for_me;
+    t.set_port(my_inst.addr.get_port());
+    my_inst.addr.set_sockaddr(t.get_sockaddr());
+    ldout(cct,2) << "learned my addr " << my_inst.addr << dendl;
+    need_addr = false;
+    // init_local_connection();
+  }
+  sh_mtx.Unlock();
+
+}
+
+int XioMessenger::new_session(struct xio_session *session,
+                             struct xio_new_session_req *req,
+                             void *cb_user_context)
+{
+  if (shutdown_called) {
+    return xio_reject(
+      session, XIO_E_SESSION_REFUSED, NULL /* udata */, 0 /* udata len */);
+  }
+  int code = portals.accept(session, req, cb_user_context);
+  if (! code)
+    nsessions++;
+  return code;
+} /* new_session */
+
+int XioMessenger::session_event(struct xio_session *session,
+                               struct xio_session_event_data *event_data,
+                               void *cb_user_context)
+{
+  XioConnection *xcon;
+
+  switch (event_data->event) {
+  case XIO_SESSION_CONNECTION_ESTABLISHED_EVENT:
+  {
+    struct xio_connection *conn = event_data->conn;
+    struct xio_connection_attr xcona;
+    entity_addr_t peer_addr_for_me, paddr;
+
+    xcon = static_cast<XioConnection*>(event_data->conn_user_context);
+
+    ldout(cct,2) << "connection established " << event_data->conn
+      << " session " << session << " xcon " << xcon << dendl;
+
+    (void) xio_query_connection(conn, &xcona,
+                               XIO_CONNECTION_ATTR_LOCAL_ADDR|
+                               XIO_CONNECTION_ATTR_PEER_ADDR);
+    peer_addr_for_me.set_sockaddr((struct sockaddr *)&xcona.local_addr);
+    paddr.set_sockaddr((struct sockaddr *)&xcona.peer_addr);
+    //set_myaddr(peer_addr_for_me);
+    learned_addr(peer_addr_for_me);
+    ldout(cct,2) << "client: connected from " << peer_addr_for_me << " to " << paddr << dendl;
+
+    /* notify hook */
+    this->ms_deliver_handle_connect(xcon);
+    this->ms_deliver_handle_fast_connect(xcon);
+  }
+  break;
+
+  case XIO_SESSION_NEW_CONNECTION_EVENT:
+  {
+    struct xio_connection *conn = event_data->conn;
+    struct xio_connection_attr xcona;
+    entity_inst_t s_inst;
+    entity_addr_t peer_addr_for_me;
+
+    (void) xio_query_connection(conn, &xcona,
+                               XIO_CONNECTION_ATTR_CTX|
+                               XIO_CONNECTION_ATTR_PEER_ADDR|
+                               XIO_CONNECTION_ATTR_LOCAL_ADDR);
+    /* XXX assumes RDMA */
+    s_inst.addr.set_sockaddr((struct sockaddr *)&xcona.peer_addr);
+    peer_addr_for_me.set_sockaddr((struct sockaddr *)&xcona.local_addr);
+
+    xcon = new XioConnection(this, XioConnection::PASSIVE, s_inst);
+    xcon->session = session;
+
+    struct xio_context_attr xctxa;
+    (void) xio_query_context(xcona.ctx, &xctxa, XIO_CONTEXT_ATTR_USER_CTX);
+
+    xcon->conn = conn;
+    xcon->portal = static_cast<XioPortal*>(xctxa.user_context);
+    assert(xcon->portal);
+
+    xcona.user_context = xcon;
+    (void) xio_modify_connection(conn, &xcona, XIO_CONNECTION_ATTR_USER_CTX);
+
+    xcon->connected = true;
+
+    /* sentinel ref */
+    xcon->get(); /* xcon->nref == 1 */
+    conns_sp.lock();
+    conns_list.push_back(*xcon);
+    /* XXX we can't put xcon in conns_entity_map because we don't yet know
+     * its peer address */
+    conns_sp.unlock();
+
+    /* XXXX pre-merge of session startup negotiation ONLY! */
+    xcon->cstate.state_up_ready(XioConnection::CState::OP_FLAG_NONE);
+
+    ldout(cct,2) << "New connection session " << session
+      << " xcon " << xcon << " on msgr: " << this << " portal: " << xcon->portal << dendl;
+    ldout(cct,2) << "Server: connected from " << s_inst.addr << " to " << peer_addr_for_me << dendl;
+  }
+  break;
+  case XIO_SESSION_CONNECTION_ERROR_EVENT:
+  case XIO_SESSION_CONNECTION_CLOSED_EVENT: /* orderly discon */
+  case XIO_SESSION_CONNECTION_DISCONNECTED_EVENT: /* unexpected discon */
+  case XIO_SESSION_CONNECTION_REFUSED_EVENT:
+    xcon = static_cast<XioConnection*>(event_data->conn_user_context);
+    ldout(cct,2) << xio_session_event_str(event_data->event)
+      << " xcon " << xcon << " session " << session  << dendl;
+    if (likely(!!xcon)) {
+      unregister_xcon(xcon);
+      xcon->on_disconnect_event();
+    }
+    break;
+  case XIO_SESSION_CONNECTION_TEARDOWN_EVENT:
+    xcon = static_cast<XioConnection*>(event_data->conn_user_context);
+    ldout(cct,2) << xio_session_event_str(event_data->event)
+      << " xcon " << xcon << " session " << session << dendl;
+    /*
+     * There are flows where Accelio sends a teardown event without going
+     * through a disconnect event, so make sure the connection has been
+     * cleaned up.
+     */
+    unregister_xcon(xcon);
+    xcon->on_teardown_event();
+    break;
+  case XIO_SESSION_TEARDOWN_EVENT:
+    ldout(cct,2) << xio_session_event_str(event_data->event)
+      << " session " << session << dendl;
+    if (unlikely(XioPool::trace_mempool)) {
+      xp_stats.dump("xio session dtor", reinterpret_cast<uint64_t>(session));
+    }
+    xio_session_destroy(session);
+    if (--nsessions == 0) {
+      Mutex::Locker lck(sh_mtx);
+      if (nsessions == 0)
+       sh_cond.Signal();
+    }
+    break;
+  default:
+    break;
+  };
+
+  return 0;
+}
+
+enum bl_type
+{
+  BUFFER_PAYLOAD,
+  BUFFER_MIDDLE,
+  BUFFER_DATA
+};
+
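+/* per-request data cap; presumably chosen to sit just below Accelio's 1 MiB
+ * transfer limit (1044480 = 1020 * 1024) */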
+#define MAX_XIO_BUF_SIZE 1044480
+
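+/* first pass over a bufferlist: count the iovec entries a send will need,
+ * advancing req_size/msg_off/req_off exactly as xio_place_buffers() will,
+ * so the caller can size the XioMsg request array up front */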
+static inline int
+xio_count_buffers(const buffer::list& bl, int& req_size, int& msg_off, int& req_off)
+{
+
+  const std::list<buffer::ptr>& buffers = bl.buffers();
+  list<bufferptr>::const_iterator pb;
+  size_t size, off;
+  int result;
+  int first = 1;
+
+  off = size = 0;
+  result = 0;
+  for (;;) {
+    if (off >= size) {
+      if (first) pb = buffers.begin(); else ++pb;
+      if (pb == buffers.end()) {
+       break;
+      }
+      off = 0;
+      size = pb->length();
+      first = 0;
+    }
+    size_t count = size - off;
+    if (!count) continue;
+    if (req_size + count > MAX_XIO_BUF_SIZE) {
+       count = MAX_XIO_BUF_SIZE - req_size;
+    }
+
+    ++result;
+
+    /* advance iov and perhaps request */
+
+    off += count;
+    req_size += count;
+    ++msg_off;
+    if (unlikely(msg_off >= XIO_MSGR_IOVLEN || req_size >= MAX_XIO_BUF_SIZE)) {
+      ++req_off;
+      msg_off = 0;
+      req_size = 0;
+    }
+  }
+
+  return result;
+}
+
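+/* second pass: point each xio_msg sglist entry at a bufferlist segment,
+ * spilling into the next request in xmsg->req_arr whenever an iovec fills
+ * (XIO_MSGR_IOVLEN entries) or a request reaches MAX_XIO_BUF_SIZE bytes */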
+static inline void
+xio_place_buffers(const buffer::list& bl, XioMsg *xmsg, struct xio_msg*& req,
+                 struct xio_iovec_ex*& msg_iov, int& req_size,
+                 int ex_cnt, int& msg_off, int& req_off, bl_type type)
+{
+
+  const std::list<buffer::ptr>& buffers = bl.buffers();
+  list<bufferptr>::const_iterator pb;
+  struct xio_iovec_ex* iov;
+  size_t size, off;
+  const char *data = NULL;
+  int first = 1;
+
+  off = size = 0;
+  for (;;) {
+    if (off >= size) {
+      if (first) pb = buffers.begin(); else ++pb;
+      if (pb == buffers.end()) {
+       break;
+      }
+      off = 0;
+      size = pb->length();
+      data = pb->c_str();       // is c_str() efficient?
+      first = 0;
+    }
+    size_t count = size - off;
+    if (!count) continue;
+    if (req_size + count > MAX_XIO_BUF_SIZE) {
+       count = MAX_XIO_BUF_SIZE - req_size;
+    }
+
+    /* assign buffer */
+    iov = &msg_iov[msg_off];
+    iov->iov_base = (void *) (&data[off]);
+    iov->iov_len = count;
+
+    switch (type) {
+    case BUFFER_DATA:
+      //break;
+    default:
+    {
+      struct xio_reg_mem *mp = get_xio_mp(*pb);
+      iov->mr = (mp) ? mp->mr : NULL;
+    }
+      break;
+    }
+
+    /* advance iov(s) */
+
+    off += count;
+    req_size += count;
+    ++msg_off;
+
+    /* next request if necessary */
+
+    if (unlikely(msg_off >= XIO_MSGR_IOVLEN || req_size >= MAX_XIO_BUF_SIZE)) {
+      /* finish this request */
+      req->out.pdata_iov.nents = msg_off;
+      /* advance to next, and write in it if it's not the last one. */
+      if (++req_off >= ex_cnt) {
+       req = 0;        /* poison.  trap if we try to use it. */
+       msg_iov = NULL;
+      } else {
+       req = &xmsg->req_arr[req_off].msg;
+       msg_iov = req->out.pdata_iov.sglist;
+      }
+      msg_off = 0;
+      req_size = 0;
+    }
+  }
+}
+
+int XioMessenger::bind(const entity_addr_t& addr)
+{
+  if (addr.is_blank_ip()) {
+    lderr(cct) << "ERROR: need rdma ip for remote use! " << dendl;
+    cout << "Error: xio bind failed. public/cluster ip not specified" << std::endl;
+    return -1;
+  }
+
+  entity_addr_t shift_addr = addr;
+  string base_uri = xio_uri_from_entity(cct->_conf->xio_transport_type,
+                                       shift_addr, false /* want_port */);
+  ldout(cct,4) << "XioMessenger " << this << " bind: xio_uri "
+    << base_uri << ':' << shift_addr.get_port() << dendl;
+
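+  /* portals.bind() attempts the requested port and reports the port
+   * actually bound via port0 */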
+  uint16_t port0;
+  int r = portals.bind(&xio_msgr_ops, base_uri, shift_addr.get_port(), &port0);
+  if (r == 0) {
+    shift_addr.set_port(port0);
+    shift_addr.nonce = nonce;
+    set_myaddr(shift_addr);
+    need_addr = false;
+    did_bind = true;
+  }
+  return r;
+} /* bind */
+
+int XioMessenger::rebind(const set<int>& avoid_ports)
+{
+  ldout(cct,4) << "XioMessenger " << this << " rebind attempt" << dendl;
+  return 0;
+} /* rebind */
+
+int XioMessenger::start()
+{
+  portals.start();
+  dispatch_strategy->start();
+  if (!did_bind) {
+    my_inst.addr.nonce = nonce;
+  }
+  started = true;
+  return 0;
+}
+
+void XioMessenger::wait()
+{
+  portals.join();
+  dispatch_strategy->wait();
+} /* wait */
+
+int XioMessenger::_send_message(Message *m, const entity_inst_t& dest)
+{
+  ConnectionRef conn = get_connection(dest);
+  if (conn)
+    return _send_message(m, &(*conn));
+  else
+    return EINVAL;
+} /* send_message(Message *, const entity_inst_t&) */
+
+static inline XioMsg* pool_alloc_xio_msg(Message *m, XioConnection *xcon,
+  int ex_cnt)
+{
+  struct xio_reg_mem mp_mem;
+  int e = xpool_alloc(xio_msgr_noreg_mpool, sizeof(XioMsg), &mp_mem);
+  if (!!e)
+    return NULL;
+  XioMsg *xmsg = reinterpret_cast<XioMsg*>(mp_mem.addr);
+  assert(!!xmsg);
+  new (xmsg) XioMsg(m, xcon, mp_mem, ex_cnt, CEPH_FEATURES_ALL);
+  return xmsg;
+}
+
+XioCommand* pool_alloc_xio_command(XioConnection *xcon)
+{
+  struct xio_reg_mem mp_mem;
+  int e = xpool_alloc(xio_msgr_noreg_mpool, sizeof(XioCommand), &mp_mem);
+  if (!!e)
+    return NULL;
+  XioCommand *xcmd = reinterpret_cast<XioCommand*>(mp_mem.addr);
+  assert(!!xcmd);
+  new (xcmd) XioCommand(xcon, mp_mem);
+  return xcmd;
+}
+
+int XioMessenger::_send_message(Message *m, Connection *con)
+{
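+  /* loopback delivery: hand the message straight to the local dispatch
+   * strategy, bypassing Accelio entirely */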
+  if (con == loop_con.get() /* intrusive_ptr get() */) {
+    m->set_connection(con);
+    m->set_src(get_myinst().name);
+    m->set_seq(loop_con->next_seq());
+    ds_dispatch(m);
+    return 0;
+  }
+
+  XioConnection *xcon = static_cast<XioConnection*>(con);
+
+  /* If con is not in READY state, we have to enforce policy */
+  if (xcon->cstate.session_state.read() != XioConnection::UP) {
+    pthread_spin_lock(&xcon->sp);
+    if (xcon->cstate.session_state.read() != XioConnection::UP) {
+      xcon->outgoing.mqueue.push_back(*m);
+      pthread_spin_unlock(&xcon->sp);
+      return 0;
+    }
+    pthread_spin_unlock(&xcon->sp);
+  }
+
+  return _send_message_impl(m, xcon);
+} /* send_message(Message* m, Connection *con) */
+
+int XioMessenger::_send_message_impl(Message* m, XioConnection* xcon)
+{
+  int code = 0;
+
+  Mutex::Locker l(xcon->lock);
+  if (unlikely(XioPool::trace_mempool)) {
+    static uint32_t nreqs;
+    if (unlikely((++nreqs % 65536) == 0)) {
+      xp_stats.dump(__func__, nreqs);
+    }
+  }
+
+  m->set_seq(xcon->state.next_out_seq());
+  m->set_magic(magic); // trace flags and special handling
+
+  m->encode(xcon->get_features(), this->crcflags);
+
+  buffer::list &payload = m->get_payload();
+  buffer::list &middle = m->get_middle();
+  buffer::list &data = m->get_data();
+
+  int msg_off = 0;
+  int req_off = 0;
+  int req_size = 0;
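+  /* count iovecs across payload, middle, and data in one continuous pass;
+   * req_off accumulates the number of extra xio_msg frames needed beyond
+   * the first */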
+  int nbuffers =
+    xio_count_buffers(payload, req_size, msg_off, req_off) +
+    xio_count_buffers(middle, req_size, msg_off, req_off) +
+    xio_count_buffers(data, req_size, msg_off, req_off);
+
+  int ex_cnt = req_off;
+  if (msg_off == 0 && ex_cnt > 0) {
+    // no buffers for last msg
+    ldout(cct,10) << "msg_off 0, ex_cnt " << ex_cnt << " -> " << ex_cnt-1 << dendl;
+    ex_cnt--;
+  }
+
+  /* get an XioMsg frame */
+  XioMsg *xmsg = pool_alloc_xio_msg(m, xcon, ex_cnt);
+  if (! xmsg) {
+    /* could happen if Accelio has been shutdown */
+    return ENOMEM;
+  }
+
+  ldout(cct,4) << __func__ << " " << m << " new XioMsg " << xmsg
+       << " tag " << (int)xmsg->hdr.tag
+       << " req_0 " << xmsg->get_xio_msg() << " msg type " << m->get_type()
+       << " features: " << xcon->get_features()
+       << " conn " << xcon->conn << " sess " << xcon->session << dendl;
+
+  if (magic & (MSG_MAGIC_XIO)) {
+
+    /* XXXX verify */
+    switch (m->get_type()) {
+    case 43:
+    // case 15:
+      ldout(cct,4) << __func__ << "stop 43 " << m->get_type() << " " << *m << dendl;
+      buffer::list &payload = m->get_payload();
+      ldout(cct,4) << __func__ << "payload dump:" << dendl;
+      payload.hexdump(cout);
+    }
+  }
+
+  struct xio_msg *req = xmsg->get_xio_msg();
+  struct xio_iovec_ex *msg_iov = req->out.pdata_iov.sglist;
+
+  if (magic & (MSG_MAGIC_XIO)) {
+    ldout(cct,4) << "payload: " << payload.buffers().size() <<
+      " middle: " << middle.buffers().size() <<
+      " data: " << data.buffers().size() <<
+      dendl;
+  }
+
+  if (unlikely(ex_cnt > 0)) {
+    ldout(cct,4) << __func__ << " buffer cnt > XIO_MSGR_IOVLEN (" <<
+      ((XIO_MSGR_IOVLEN-1) + nbuffers) << ")" << dendl;
+  }
+
+  /* do the invariant part */
+  msg_off = 0;
+  req_off = -1; /* most often, not used */
+  req_size = 0;
+
+  xio_place_buffers(payload, xmsg, req, msg_iov, req_size, ex_cnt, msg_off,
+                   req_off, BUFFER_PAYLOAD);
+
+  xio_place_buffers(middle, xmsg, req, msg_iov, req_size, ex_cnt, msg_off,
+                   req_off, BUFFER_MIDDLE);
+
+  xio_place_buffers(data, xmsg, req, msg_iov, req_size, ex_cnt, msg_off,
+                   req_off, BUFFER_DATA);
+  ldout(cct,10) << "ex_cnt " << ex_cnt << ", req_off " << req_off
+    << ", msg_cnt " << xmsg->get_msg_count() << dendl;
+
+  /* finalize request */
+  if (msg_off)
+    req->out.pdata_iov.nents = msg_off;
+
+  /* fixup first msg */
+  req = xmsg->get_xio_msg();
+
+  const std::list<buffer::ptr>& header = xmsg->hdr.get_bl().buffers();
+  assert(header.size() == 1); /* XXX */
+  list<bufferptr>::const_iterator pb = header.begin();
+  req->out.header.iov_base = (char*) pb->c_str();
+  req->out.header.iov_len = pb->length();
+
+  /* deliver via xio, preserve ordering */
+  if (xmsg->get_msg_count() > 1) {
+    struct xio_msg *head = xmsg->get_xio_msg();
+    struct xio_msg *tail = head;
+    for (req_off = 0; ((unsigned) req_off) < xmsg->get_msg_count()-1; ++req_off) {
+      req = &xmsg->req_arr[req_off].msg;
+      assert(!req->in.pdata_iov.nents);
+      assert(req->out.pdata_iov.nents || !nbuffers);
+      tail->next = req;
+      tail = req;
+    }
+    tail->next = NULL;
+  }
+  xmsg->trace = m->trace;
+  m->trace.event("xio portal enqueue for send");
+  m->trace.keyval("xio message segments", xmsg->hdr.msg_cnt);
+  xcon->portal->enqueue_for_send(xcon, xmsg);
+
+  return code;
+} /* send_message(Message *, Connection *) */
+
+int XioMessenger::shutdown()
+{
+  shutdown_called = true;
+  conns_sp.lock();
+  XioConnection::ConnList::iterator iter;
+  for (iter = conns_list.begin(); iter != conns_list.end(); ++iter) {
+    (void) iter->disconnect(); // XXX mark down?
+  }
+  conns_sp.unlock();
+  while (nsessions > 0) {
+    Mutex::Locker lck(sh_mtx);
+    if (nsessions > 0)
+      sh_cond.Wait(sh_mtx);
+  }
+  portals.shutdown();
+  dispatch_strategy->shutdown();
+  did_bind = false;
+  started = false;
+  return 0;
+} /* shutdown */
+
+ConnectionRef XioMessenger::get_connection(const entity_inst_t& dest)
+{
+  if (shutdown_called)
+    return NULL;
+
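+  /* a message addressed to ourselves short-circuits to the loopback
+   * connection; the pointer comparison is a cheap identity check ahead of
+   * the full equality test */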
+  const entity_inst_t& self_inst = get_myinst();
+  if ((&dest == &self_inst) ||
+      (dest == self_inst)) {
+    return get_loopback_connection();
+  }
+
+  conns_sp.lock();
+  XioConnection::EntitySet::iterator conn_iter =
+    conns_entity_map.find(dest, XioConnection::EntityComp());
+  if (conn_iter != conns_entity_map.end()) {
+    ConnectionRef cref = &(*conn_iter);
+    conns_sp.unlock();
+    return cref;
+  }
+  else {
+    conns_sp.unlock();
+    string xio_uri = xio_uri_from_entity(cct->_conf->xio_transport_type,
+                                        dest.addr, true /* want_port */);
+
+    ldout(cct,4) << "XioMessenger " << this << " get_connection: xio_uri "
+      << xio_uri << dendl;
+
+    /* XXX client session creation parameters */
+    struct xio_session_params params = {};
+    params.type         = XIO_SESSION_CLIENT;
+    params.ses_ops      = &xio_msgr_ops;
+    params.user_context = this;
+    params.uri          = xio_uri.c_str();
+
+    XioConnection *xcon = new XioConnection(this, XioConnection::ACTIVE,
+                                           dest);
+
+    xcon->session = xio_session_create(&params);
+    if (! xcon->session) {
+      delete xcon;
+      return NULL;
+    }
+
+    /* this should cause callbacks with user context of conn, but
+     * we can always set it explicitly */
+    struct xio_connection_params xcp = {};
+    xcp.session           = xcon->session;
+    xcp.ctx               = xcon->portal->ctx;
+    xcp.conn_user_context = xcon;
+
+    xcon->conn = xio_connect(&xcp);
+    if (!xcon->conn) {
+      xio_session_destroy(xcon->session);
+      delete xcon;
+      return NULL;
+    }
+
+    nsessions++;
+    xcon->connected = true;
+
+    /* sentinel ref */
+    xcon->get(); /* xcon->nref == 1 */
+    conns_sp.lock();
+    conns_list.push_back(*xcon);
+    conns_entity_map.insert(*xcon);
+    conns_sp.unlock();
+
+    /* XXXX pre-merge of session startup negotiation ONLY! */
+    xcon->cstate.state_up_ready(XioConnection::CState::OP_FLAG_NONE);
+
+    ldout(cct,2) << "New connection xcon: " << xcon <<
+      " up_ready on session " << xcon->session <<
+      " on msgr: " << this << " portal: " << xcon->portal << dendl;
+
+    return xcon->get(); /* nref +1 */
+  }
+} /* get_connection */
+
+ConnectionRef XioMessenger::get_loopback_connection()
+{
+  return (loop_con.get());
+} /* get_loopback_connection */
+
+void XioMessenger::unregister_xcon(XioConnection *xcon)
+{
+  Spinlock::Locker lckr(conns_sp);
+
+  XioConnection::EntitySet::iterator conn_iter =
+       conns_entity_map.find(xcon->peer, XioConnection::EntityComp());
+  if (conn_iter != conns_entity_map.end()) {
+       XioConnection *xcon2 = &(*conn_iter);
+       if (xcon == xcon2) {
+         conns_entity_map.erase(conn_iter);
+       }
+  }
+
+  /* check whether xcon is linked on conns_list */
+  if (xcon->conns_hook.is_linked()) {
+    /* now find xcon on conns_list and erase */
+    XioConnection::ConnList::iterator citer =
+        XioConnection::ConnList::s_iterator_to(*xcon);
+    conns_list.erase(citer);
+  }
+}
+
+void XioMessenger::mark_down(const entity_addr_t& addr)
+{
+  entity_inst_t inst(entity_name_t(), addr);
+  Spinlock::Locker lckr(conns_sp);
+  XioConnection::EntitySet::iterator conn_iter =
+    conns_entity_map.find(inst, XioConnection::EntityComp());
+  if (conn_iter != conns_entity_map.end()) {
+    (*conn_iter)._mark_down(XioConnection::CState::OP_FLAG_NONE);
+  }
+} /* mark_down(const entity_addr_t& */
+
+void XioMessenger::mark_down(Connection* con)
+{
+  XioConnection *xcon = static_cast<XioConnection*>(con);
+  xcon->_mark_down(XioConnection::CState::OP_FLAG_NONE);
+} /* mark_down(Connection*) */
+
+void XioMessenger::mark_down_all()
+{
+  Spinlock::Locker lckr(conns_sp);
+  XioConnection::EntitySet::iterator conn_iter;
+  for (conn_iter = conns_entity_map.begin(); conn_iter !=
+        conns_entity_map.end(); ++conn_iter) {
+    (*conn_iter)._mark_down(XioConnection::CState::OP_FLAG_NONE);
+  }
+} /* mark_down_all */
+
+static inline XioMarkDownHook* pool_alloc_markdown_hook(
+  XioConnection *xcon, Message *m)
+{
+  struct xio_reg_mem mp_mem;
+  int e = xio_mempool_alloc(xio_msgr_noreg_mpool,
+                           sizeof(XioMarkDownHook), &mp_mem);
+  if (!!e)
+    return NULL;
+  XioMarkDownHook *hook = static_cast<XioMarkDownHook*>(mp_mem.addr);
+  new (hook) XioMarkDownHook(xcon, m, mp_mem);
+  return hook;
+}
+
+void XioMessenger::mark_down_on_empty(Connection* con)
+{
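+  /* queue a NOP tagged for markdown behind any traffic already queued; the
+   * attached completion hook performs the markdown once the NOP is sent,
+   * while the BARRIER state below stalls newly submitted messages */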
+  XioConnection *xcon = static_cast<XioConnection*>(con);
+  MNop* m = new MNop();
+  m->tag = XIO_NOP_TAG_MARKDOWN;
+  m->set_completion_hook(pool_alloc_markdown_hook(xcon, m));
+  // stall new messages
+  xcon->cstate.session_state = XioConnection::session_states::BARRIER;
+  (void) _send_message_impl(m, xcon);
+}
+
+void XioMessenger::mark_disposable(Connection *con)
+{
+  XioConnection *xcon = static_cast<XioConnection*>(con);
+  xcon->_mark_disposable(XioConnection::CState::OP_FLAG_NONE);
+}
+
+void XioMessenger::try_insert(XioConnection *xcon)
+{
+  Spinlock::Locker lckr(conns_sp);
+  /* already resident in conns_list */
+  conns_entity_map.insert(*xcon);
+}
+
+XioMessenger::~XioMessenger()
+{
+  delete dispatch_strategy;
+  nInstances--;
+} /* dtor */