Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / msg / xio / XioMessenger.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7  * Portions Copyright (C) 2013 CohortFS, LLC
8  *
9  * This is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Lesser General Public
11  * License version 2.1, as published by the Free Software
12  * Foundation.  See file COPYING.
13  *
14  */
15
16 #include <arpa/inet.h>
17 #include <boost/lexical_cast.hpp>
18 #include <set>
19 #include <stdlib.h>
20 #include <memory>
21
22 #include "XioMsg.h"
23 #include "XioMessenger.h"
24 #include "common/address_helper.h"
25 #include "common/code_environment.h"
26 #include "messages/MNop.h"
27
28 #define dout_subsys ceph_subsys_xio
29 #undef dout_prefix
30 #define dout_prefix *_dout << "xio."
31
32 Mutex mtx("XioMessenger Package Lock");
33 std::atomic<bool> initialized = { false };
34
35 std::atomic<unsigned> XioMessenger::nInstances = { 0 };
36
37 struct xio_mempool *xio_msgr_noreg_mpool;
38
39 static struct xio_session_ops xio_msgr_ops;
40
41 /* Accelio API callouts */
42
43 namespace xio_log
44 {
45 typedef pair<const char*, int> level_pair;
46 static const level_pair LEVELS[] = {
47   make_pair("fatal", 0),
48   make_pair("error", 0),
49   make_pair("warn", 1),
50   make_pair("info", 1),
51   make_pair("debug", 2),
52   make_pair("trace", 20)
53 };
54
55 static CephContext *context;
56
57 int get_level()
58 {
59   int level = 0;
60   for (size_t i = 0; i < sizeof(LEVELS); i++) {
61     if (!ldlog_p1(context, dout_subsys, LEVELS[i].second))
62       break;
63     level++;
64   }
65   return level;
66 }
67
68 void log_dout(const char *file, unsigned line,
69               const char *function, unsigned level,
70               const char *fmt, ...)
71 {
72   char buffer[2048];
73   va_list args;
74   va_start(args, fmt);
75   int n = vsnprintf(buffer, sizeof(buffer), fmt, args);
76   va_end(args);
77
78   if (n > 0) {
79     const char *short_file = strrchr(file, '/');
80     short_file = (short_file == NULL) ? file : short_file + 1;
81
82     const level_pair &lvl = LEVELS[level];
83     ldout(context, lvl.second) << '[' << lvl.first << "] "
84       << short_file << ':' << line << ' '
85       << function << " - " << buffer << dendl;
86   }
87 }
88 }
89
90 static int on_session_event(struct xio_session *session,
91                             struct xio_session_event_data *event_data,
92                             void *cb_user_context)
93 {
94   XioMessenger *msgr = static_cast<XioMessenger*>(cb_user_context);
95   CephContext *cct = msgr->cct;
96
97   ldout(cct,4) << "session event: " << xio_session_event_str(event_data->event)
98     << ". reason: " << xio_strerror(event_data->reason) << dendl;
99
100   return msgr->session_event(session, event_data, cb_user_context);
101 }
102
103 static int on_new_session(struct xio_session *session,
104                           struct xio_new_session_req *req,
105                           void *cb_user_context)
106 {
107   XioMessenger *msgr = static_cast<XioMessenger*>(cb_user_context);
108   CephContext *cct = msgr->cct;
109
110   ldout(cct,4) << "new session " << session
111     << " user_context " << cb_user_context << dendl;
112
113   return (msgr->new_session(session, req, cb_user_context));
114 }
115
116 static int on_msg(struct xio_session *session,
117                   struct xio_msg *req,
118                   int more_in_batch,
119                   void *cb_user_context)
120 {
121   XioConnection* xcon __attribute__((unused)) =
122     static_cast<XioConnection*>(cb_user_context);
123   CephContext *cct = xcon->get_messenger()->cct;
124
125   ldout(cct,25) << "on_msg session " << session << " xcon " << xcon << dendl;
126
127   if (unlikely(XioPool::trace_mempool)) {
128     static uint32_t nreqs;
129     if (unlikely((++nreqs % 65536) == 0)) {
130       xp_stats.dump(__func__, nreqs);
131     }
132   }
133
134   return xcon->on_msg(session, req, more_in_batch,
135                           cb_user_context);
136 }
137
138 static int on_ow_msg_send_complete(struct xio_session *session,
139                                    struct xio_msg *msg,
140                                    void *conn_user_context)
141 {
142   XioConnection *xcon =
143     static_cast<XioConnection*>(conn_user_context);
144   CephContext *cct = xcon->get_messenger()->cct;
145
146   ldout(cct,25) << "msg delivered session: " << session
147                 << " msg: " << msg << " conn_user_context "
148                 << conn_user_context << dendl;
149
150   return xcon->on_ow_msg_send_complete(session, msg, conn_user_context);
151 }
152
153 static int on_msg_error(struct xio_session *session,
154                         enum xio_status error,
155                         enum xio_msg_direction dir,
156                         struct xio_msg  *msg,
157                         void *conn_user_context)
158 {
159   /* XIO promises to flush back undelivered messages */
160   XioConnection *xcon =
161     static_cast<XioConnection*>(conn_user_context);
162   CephContext *cct = xcon->get_messenger()->cct;
163
164   ldout(cct,4) << "msg error session: " << session
165     << " error: " << xio_strerror(error) << " msg: " << msg
166     << " conn_user_context " << conn_user_context << dendl;
167
168   return xcon->on_msg_error(session, error, msg, conn_user_context);
169 }
170
171 static int on_cancel(struct xio_session *session,
172                      struct xio_msg  *msg,
173                      enum xio_status result,
174                      void *conn_user_context)
175 {
176   XioConnection* xcon __attribute__((unused)) =
177     static_cast<XioConnection*>(conn_user_context);
178   CephContext *cct = xcon->get_messenger()->cct;
179
180   ldout(cct,25) << "on cancel: session: " << session << " msg: " << msg
181     << " conn_user_context " << conn_user_context << dendl;
182
183   return 0;
184 }
185
186 static int on_cancel_request(struct xio_session *session,
187                              struct xio_msg  *msg,
188                              void *conn_user_context)
189 {
190   XioConnection* xcon __attribute__((unused)) =
191     static_cast<XioConnection*>(conn_user_context);
192   CephContext *cct = xcon->get_messenger()->cct;
193
194   ldout(cct,25) << "on cancel request: session: " << session << " msg: " << msg
195     << " conn_user_context " << conn_user_context << dendl;
196
197   return 0;
198 }
199
200 /* free functions */
201 static string xio_uri_from_entity(const string &type,
202                                   const entity_addr_t& addr, bool want_port)
203 {
204   const char *host = NULL;
205   char addr_buf[129];
206   string xio_uri;
207
208   switch(addr.get_family()) {
209   case AF_INET:
210     host = inet_ntop(AF_INET, &addr.in4_addr().sin_addr, addr_buf,
211                      INET_ADDRSTRLEN);
212     break;
213   case AF_INET6:
214     host = inet_ntop(AF_INET6, &addr.in6_addr().sin6_addr, addr_buf,
215                      INET6_ADDRSTRLEN);
216     break;
217   default:
218     abort();
219     break;
220   };
221
222   if (type == "rdma" || type == "tcp")
223       xio_uri = type + "://";
224   else
225       xio_uri = "rdma://";
226
227   /* The following can only succeed if the host is rdma-capable */
228   xio_uri += host;
229   if (want_port) {
230     xio_uri += ":";
231     xio_uri += boost::lexical_cast<std::string>(addr.get_port());
232   }
233
234   return xio_uri;
235 } /* xio_uri_from_entity */
236
237 void XioInit::package_init(CephContext *cct) {
238    if (! initialized) {
239
240      mtx.Lock();
241      if (! initialized) {
242
243        xio_init();
244
245        // claim a reference to the first context we see
246        xio_log::context = cct->get();
247
248        int xopt;
249        xopt = xio_log::get_level();
250        xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_LOG_LEVEL,
251                   &xopt, sizeof(xopt));
252        xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_LOG_FN,
253                   (const void*)xio_log::log_dout, sizeof(xio_log_fn));
254
255        xopt = 1;
256        xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_DISABLE_HUGETBL,
257                   &xopt, sizeof(xopt));
258
259        if (g_code_env == CODE_ENVIRONMENT_DAEMON) {
260          xopt = 1;
261          xio_set_opt(NULL, XIO_OPTLEVEL_RDMA, XIO_OPTNAME_ENABLE_FORK_INIT,
262                     &xopt, sizeof(xopt));
263        }
264
265        xopt = XIO_MSGR_IOVLEN;
266        xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_MAX_IN_IOVLEN,
267                   &xopt, sizeof(xopt));
268        xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_MAX_OUT_IOVLEN,
269                   &xopt, sizeof(xopt));
270
271        /* enable flow-control */
272        xopt = 1;
273        xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_ENABLE_FLOW_CONTROL,
274                   &xopt, sizeof(xopt));
275
276        /* and set threshold for buffer callouts */
277        xopt = max(cct->_conf->xio_max_send_inline, 512);
278        xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_MAX_INLINE_XIO_DATA,
279                   &xopt, sizeof(xopt));
280
281        xopt = XioMsgHdr::get_max_encoded_length();
282        ldout(cct,2) << "setting accelio max header size " << xopt << dendl;
283        xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_MAX_INLINE_XIO_HEADER,
284                   &xopt, sizeof(xopt));
285
286        size_t queue_depth = cct->_conf->xio_queue_depth;
287        struct xio_mempool_config mempool_config = {
288          6,
289          {
290            {1024,  0,  queue_depth,  262144},
291            {4096,  0,  queue_depth,  262144},
292            {16384, 0,  queue_depth,  262144},
293            {65536, 0,  128,  65536},
294            {262144, 0,  32,  16384},
295            {1048576, 0, 8,  8192}
296          }
297        };
298        xio_set_opt(NULL,
299                    XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_CONFIG_MEMPOOL,
300                    &mempool_config, sizeof(mempool_config));
301
302        /* and unregisterd one */
303  #define XMSG_MEMPOOL_QUANTUM 4096
304
305        xio_msgr_noreg_mpool =
306         xio_mempool_create(-1 /* nodeid */,
307                            XIO_MEMPOOL_FLAG_REGULAR_PAGES_ALLOC);
308
309        (void) xio_mempool_add_slab(xio_msgr_noreg_mpool, 64,
310                                        cct->_conf->xio_mp_min,
311                                        cct->_conf->xio_mp_max_64,
312                                        XMSG_MEMPOOL_QUANTUM, 0);
313        (void) xio_mempool_add_slab(xio_msgr_noreg_mpool, 256,
314                                        cct->_conf->xio_mp_min,
315                                        cct->_conf->xio_mp_max_256,
316                                        XMSG_MEMPOOL_QUANTUM, 0);
317        (void) xio_mempool_add_slab(xio_msgr_noreg_mpool, 1024,
318                                        cct->_conf->xio_mp_min,
319                                        cct->_conf->xio_mp_max_1k,
320                                        XMSG_MEMPOOL_QUANTUM, 0);
321        (void) xio_mempool_add_slab(xio_msgr_noreg_mpool, getpagesize(),
322                                        cct->_conf->xio_mp_min,
323                                        cct->_conf->xio_mp_max_page,
324                                        XMSG_MEMPOOL_QUANTUM, 0);
325
326        /* initialize ops singleton */
327        xio_msgr_ops.on_session_event = on_session_event;
328        xio_msgr_ops.on_new_session = on_new_session;
329        xio_msgr_ops.on_session_established = NULL;
330        xio_msgr_ops.on_msg = on_msg;
331        xio_msgr_ops.on_ow_msg_send_complete = on_ow_msg_send_complete;
332        xio_msgr_ops.on_msg_error = on_msg_error;
333        xio_msgr_ops.on_cancel = on_cancel;
334        xio_msgr_ops.on_cancel_request = on_cancel_request;
335
336        /* mark initialized */
337        initialized = true;
338      }
339      mtx.Unlock();
340    }
341  }
342
343 /* XioMessenger */
344 #undef dout_prefix
345 #define dout_prefix _prefix(_dout, this)
346 static ostream& _prefix(std::ostream *_dout, XioMessenger *msgr) {
347   return *_dout << "-- " << msgr->get_myaddr() << " ";
348 }
349
350 XioMessenger::XioMessenger(CephContext *cct, entity_name_t name,
351                            string mname, uint64_t _nonce,
352                            uint64_t cflags, DispatchStrategy *ds)
353   : SimplePolicyMessenger(cct, name, mname, _nonce),
354     XioInit(cct),
355     portals(this, get_nportals(cflags), get_nconns_per_portal(cflags)),
356     dispatch_strategy(ds),
357     loop_con(new XioLoopbackConnection(this)),
358     special_handling(0),
359     sh_mtx("XioMessenger session mutex"),
360     sh_cond(),
361     need_addr(true),
362     did_bind(false),
363     nonce(_nonce)
364 {
365
366   if (cct->_conf->xio_trace_xcon)
367     magic |= MSG_MAGIC_TRACE_XCON;
368
369   XioPool::trace_mempool = (cct->_conf->xio_trace_mempool);
370   XioPool::trace_msgcnt = (cct->_conf->xio_trace_msgcnt);
371
372   dispatch_strategy->set_messenger(this);
373
374   /* update class instance count */
375   nInstances++;
376
377   loop_con->set_features(CEPH_FEATURES_ALL);
378
379   ldout(cct,2) << "Create msgr: " << this << " instance: "
380     << nInstances << " type: " << name.type_str()
381     << " subtype: " << mname << " nportals: " << get_nportals(cflags)
382     << " nconns_per_portal: " << get_nconns_per_portal(cflags)
383     << dendl;
384
385 } /* ctor */
386
387 int XioMessenger::pool_hint(uint32_t dsize) {
388   if (dsize > 1024*1024)
389     return 0;
390
391   /* if dsize is already present, returns -EEXIST */
392   return xio_mempool_add_slab(xio_msgr_noreg_mpool, dsize, 0,
393                                    cct->_conf->xio_mp_max_hint,
394                                    XMSG_MEMPOOL_QUANTUM, 0);
395 }
396
397 int XioMessenger::get_nconns_per_portal(uint64_t cflags)
398 {
399   const int XIO_DEFAULT_NUM_CONNS_PER_PORTAL = 8;
400   int nconns = XIO_DEFAULT_NUM_CONNS_PER_PORTAL;
401
402   if (cflags & Messenger::HAS_MANY_CONNECTIONS)
403     nconns = max(cct->_conf->xio_max_conns_per_portal, XIO_DEFAULT_NUM_CONNS_PER_PORTAL);
404   else if (cflags & Messenger::HEARTBEAT)
405     nconns = max(cct->_conf->osd_heartbeat_min_peers * 4, XIO_DEFAULT_NUM_CONNS_PER_PORTAL);
406
407   return nconns;
408 }
409
410 int XioMessenger::get_nportals(uint64_t cflags)
411 {
412   int nportals = 1;
413
414   if (cflags & Messenger::HAS_HEAVY_TRAFFIC)
415     nportals = max(cct->_conf->xio_portal_threads, 1);
416
417   return nportals;
418 }
419
420 void XioMessenger::learned_addr(const entity_addr_t &peer_addr_for_me)
421 {
422   // be careful here: multiple threads may block here, and readers of
423   // my_inst.addr do NOT hold any lock.
424
425   // this always goes from true -> false under the protection of the
426   // mutex.  if it is already false, we need not retake the mutex at
427   // all.
428   if (!need_addr)
429     return;
430
431   sh_mtx.Lock();
432   if (need_addr) {
433     entity_addr_t t = peer_addr_for_me;
434     t.set_port(my_inst.addr.get_port());
435     my_inst.addr.set_sockaddr(t.get_sockaddr());
436     ldout(cct,2) << "learned my addr " << my_inst.addr << dendl;
437     need_addr = false;
438     // init_local_connection();
439   }
440   sh_mtx.Unlock();
441
442 }
443
444 int XioMessenger::new_session(struct xio_session *session,
445                               struct xio_new_session_req *req,
446                               void *cb_user_context)
447 {
448   if (shutdown_called) {
449     return xio_reject(
450       session, XIO_E_SESSION_REFUSED, NULL /* udata */, 0 /* udata len */);
451   }
452   int code = portals.accept(session, req, cb_user_context);
453   if (! code)
454     nsessions++;
455   return code;
456 } /* new_session */
457
458 int XioMessenger::session_event(struct xio_session *session,
459                                 struct xio_session_event_data *event_data,
460                                 void *cb_user_context)
461 {
462   XioConnection *xcon;
463
464   switch (event_data->event) {
465   case XIO_SESSION_CONNECTION_ESTABLISHED_EVENT:
466   {
467     struct xio_connection *conn = event_data->conn;
468     struct xio_connection_attr xcona;
469     entity_addr_t peer_addr_for_me, paddr;
470
471     xcon = static_cast<XioConnection*>(event_data->conn_user_context);
472
473     ldout(cct,2) << "connection established " << event_data->conn
474       << " session " << session << " xcon " << xcon << dendl;
475
476     (void) xio_query_connection(conn, &xcona,
477                                 XIO_CONNECTION_ATTR_LOCAL_ADDR|
478                                 XIO_CONNECTION_ATTR_PEER_ADDR);
479     peer_addr_for_me.set_sockaddr((struct sockaddr *)&xcona.local_addr);
480     paddr.set_sockaddr((struct sockaddr *)&xcona.peer_addr);
481     //set_myaddr(peer_addr_for_me);
482     learned_addr(peer_addr_for_me);
483     ldout(cct,2) << "client: connected from " << peer_addr_for_me << " to " << paddr << dendl;
484
485     /* notify hook */
486     this->ms_deliver_handle_connect(xcon);
487     this->ms_deliver_handle_fast_connect(xcon);
488   }
489   break;
490
491   case XIO_SESSION_NEW_CONNECTION_EVENT:
492   {
493     struct xio_connection *conn = event_data->conn;
494     struct xio_connection_attr xcona;
495     entity_inst_t s_inst;
496     entity_addr_t peer_addr_for_me;
497
498     (void) xio_query_connection(conn, &xcona,
499                                 XIO_CONNECTION_ATTR_CTX|
500                                 XIO_CONNECTION_ATTR_PEER_ADDR|
501                                 XIO_CONNECTION_ATTR_LOCAL_ADDR);
502     /* XXX assumes RDMA */
503     s_inst.addr.set_sockaddr((struct sockaddr *)&xcona.peer_addr);
504     peer_addr_for_me.set_sockaddr((struct sockaddr *)&xcona.local_addr);
505
506     xcon = new XioConnection(this, XioConnection::PASSIVE, s_inst);
507     xcon->session = session;
508
509     struct xio_context_attr xctxa;
510     (void) xio_query_context(xcona.ctx, &xctxa, XIO_CONTEXT_ATTR_USER_CTX);
511
512     xcon->conn = conn;
513     xcon->portal = static_cast<XioPortal*>(xctxa.user_context);
514     assert(xcon->portal);
515
516     xcona.user_context = xcon;
517     (void) xio_modify_connection(conn, &xcona, XIO_CONNECTION_ATTR_USER_CTX);
518
519     xcon->connected = true;
520
521     /* sentinel ref */
522     xcon->get(); /* xcon->nref == 1 */
523     conns_sp.lock();
524     conns_list.push_back(*xcon);
525     /* XXX we can't put xcon in conns_entity_map becase we don't yet know
526      * it's peer address */
527     conns_sp.unlock();
528
529     /* XXXX pre-merge of session startup negotiation ONLY! */
530     xcon->cstate.state_up_ready(XioConnection::CState::OP_FLAG_NONE);
531
532     ldout(cct,2) << "New connection session " << session
533       << " xcon " << xcon << " on msgr: " << this << " portal: " << xcon->portal << dendl;
534     ldout(cct,2) << "Server: connected from " << s_inst.addr << " to " << peer_addr_for_me << dendl;
535   }
536   break;
537   case XIO_SESSION_CONNECTION_ERROR_EVENT:
538   case XIO_SESSION_CONNECTION_CLOSED_EVENT: /* orderly discon */
539   case XIO_SESSION_CONNECTION_DISCONNECTED_EVENT: /* unexpected discon */
540   case XIO_SESSION_CONNECTION_REFUSED_EVENT:
541     xcon = static_cast<XioConnection*>(event_data->conn_user_context);
542     ldout(cct,2) << xio_session_event_str(event_data->event)
543       << " xcon " << xcon << " session " << session  << dendl;
544     if (likely(!!xcon)) {
545       unregister_xcon(xcon);
546       xcon->on_disconnect_event();
547     }
548     break;
549   case XIO_SESSION_CONNECTION_TEARDOWN_EVENT:
550     xcon = static_cast<XioConnection*>(event_data->conn_user_context);
551     ldout(cct,2) << xio_session_event_str(event_data->event)
552       << " xcon " << xcon << " session " << session << dendl;
553     /*
554      * There are flows where Accelio sends teardown event without going
555      * through disconnect event. so we make sure we cleaned the connection.
556      */
557     unregister_xcon(xcon);
558     xcon->on_teardown_event();
559     break;
560   case XIO_SESSION_TEARDOWN_EVENT:
561     ldout(cct,2) << xio_session_event_str(event_data->event)
562       << " session " << session << dendl;
563     if (unlikely(XioPool::trace_mempool)) {
564       xp_stats.dump("xio session dtor", reinterpret_cast<uint64_t>(session));
565     }
566     xio_session_destroy(session);
567     if (--nsessions == 0) {
568       Mutex::Locker lck(sh_mtx);
569       if (nsessions == 0)
570         sh_cond.Signal();
571     }
572     break;
573   default:
574     break;
575   };
576
577   return 0;
578 }
579
580 enum bl_type
581 {
582   BUFFER_PAYLOAD,
583   BUFFER_MIDDLE,
584   BUFFER_DATA
585 };
586
587 #define MAX_XIO_BUF_SIZE 1044480
588
589 static inline int
590 xio_count_buffers(const buffer::list& bl, int& req_size, int& msg_off, int& req_off)
591 {
592
593   const std::list<buffer::ptr>& buffers = bl.buffers();
594   list<bufferptr>::const_iterator pb;
595   size_t size, off;
596   int result;
597   int first = 1;
598
599   off = size = 0;
600   result = 0;
601   for (;;) {
602     if (off >= size) {
603       if (first) pb = buffers.begin(); else ++pb;
604       if (pb == buffers.end()) {
605         break;
606       }
607       off = 0;
608       size = pb->length();
609       first = 0;
610     }
611     size_t count = size - off;
612     if (!count) continue;
613     if (req_size + count > MAX_XIO_BUF_SIZE) {
614         count = MAX_XIO_BUF_SIZE - req_size;
615     }
616
617     ++result;
618
619     /* advance iov and perhaps request */
620
621     off += count;
622     req_size += count;
623     ++msg_off;
624     if (unlikely(msg_off >= XIO_MSGR_IOVLEN || req_size >= MAX_XIO_BUF_SIZE)) {
625       ++req_off;
626       msg_off = 0;
627       req_size = 0;
628     }
629   }
630
631   return result;
632 }
633
634 static inline void
635 xio_place_buffers(const buffer::list& bl, XioMsg *xmsg, struct xio_msg*& req,
636                   struct xio_iovec_ex*& msg_iov, int& req_size,
637                   int ex_cnt, int& msg_off, int& req_off, bl_type type)
638 {
639
640   const std::list<buffer::ptr>& buffers = bl.buffers();
641   list<bufferptr>::const_iterator pb;
642   struct xio_iovec_ex* iov;
643   size_t size, off;
644   const char *data = NULL;
645   int first = 1;
646
647   off = size = 0;
648   for (;;) {
649     if (off >= size) {
650       if (first) pb = buffers.begin(); else ++pb;
651       if (pb == buffers.end()) {
652         break;
653       }
654       off = 0;
655       size = pb->length();
656       data = pb->c_str();        // is c_str() efficient?
657       first = 0;
658     }
659     size_t count = size - off;
660     if (!count) continue;
661     if (req_size + count > MAX_XIO_BUF_SIZE) {
662         count = MAX_XIO_BUF_SIZE - req_size;
663     }
664
665     /* assign buffer */
666     iov = &msg_iov[msg_off];
667     iov->iov_base = (void *) (&data[off]);
668     iov->iov_len = count;
669
670     switch (type) {
671     case BUFFER_DATA:
672       //break;
673     default:
674     {
675       struct xio_reg_mem *mp = get_xio_mp(*pb);
676       iov->mr = (mp) ? mp->mr : NULL;
677     }
678       break;
679     }
680
681     /* advance iov(s) */
682
683     off += count;
684     req_size += count;
685     ++msg_off;
686
687     /* next request if necessary */
688
689     if (unlikely(msg_off >= XIO_MSGR_IOVLEN || req_size >= MAX_XIO_BUF_SIZE)) {
690       /* finish this request */
691       req->out.pdata_iov.nents = msg_off;
692       /* advance to next, and write in it if it's not the last one. */
693       if (++req_off >= ex_cnt) {
694         req = 0;        /* poison.  trap if we try to use it. */
695         msg_iov = NULL;
696       } else {
697         req = &xmsg->req_arr[req_off].msg;
698         msg_iov = req->out.pdata_iov.sglist;
699       }
700       msg_off = 0;
701       req_size = 0;
702     }
703   }
704 }
705
706 int XioMessenger::bind(const entity_addr_t& addr)
707 {
708   if (addr.is_blank_ip()) {
709       lderr(cct) << "ERROR: need rdma ip for remote use! " << dendl;
710       cout << "Error: xio bind failed. public/cluster ip not specified" << std::endl;
711       return -1;
712   }
713
714   entity_addr_t shift_addr = addr;
715   string base_uri = xio_uri_from_entity(cct->_conf->xio_transport_type,
716                                         shift_addr, false /* want_port */);
717   ldout(cct,4) << "XioMessenger " << this << " bind: xio_uri "
718     << base_uri << ':' << shift_addr.get_port() << dendl;
719
720   uint16_t port0;
721   int r = portals.bind(&xio_msgr_ops, base_uri, shift_addr.get_port(), &port0);
722   if (r == 0) {
723     shift_addr.set_port(port0);
724     shift_addr.nonce = nonce;
725     set_myaddr(shift_addr);
726     need_addr = false;
727     did_bind = true;
728   }
729   return r;
730 } /* bind */
731
732 int XioMessenger::rebind(const set<int>& avoid_ports)
733 {
734   ldout(cct,4) << "XioMessenger " << this << " rebind attempt" << dendl;
735   return 0;
736 } /* rebind */
737
738 int XioMessenger::start()
739 {
740   portals.start();
741   dispatch_strategy->start();
742   if (!did_bind) {
743           my_inst.addr.nonce = nonce;
744   }
745   started = true;
746   return 0;
747 }
748
749 void XioMessenger::wait()
750 {
751   portals.join();
752   dispatch_strategy->wait();
753 } /* wait */
754
755 int XioMessenger::_send_message(Message *m, const entity_inst_t& dest)
756 {
757   ConnectionRef conn = get_connection(dest);
758   if (conn)
759     return _send_message(m, &(*conn));
760   else
761     return EINVAL;
762 } /* send_message(Message *, const entity_inst_t&) */
763
764 static inline XioMsg* pool_alloc_xio_msg(Message *m, XioConnection *xcon,
765   int ex_cnt)
766 {
767   struct xio_reg_mem mp_mem;
768   int e = xpool_alloc(xio_msgr_noreg_mpool, sizeof(XioMsg), &mp_mem);
769   if (!!e)
770     return NULL;
771   XioMsg *xmsg = reinterpret_cast<XioMsg*>(mp_mem.addr);
772   assert(!!xmsg);
773   new (xmsg) XioMsg(m, xcon, mp_mem, ex_cnt, CEPH_FEATURES_ALL);
774   return xmsg;
775 }
776
777 XioCommand* pool_alloc_xio_command(XioConnection *xcon)
778 {
779   struct xio_reg_mem mp_mem;
780   int e = xpool_alloc(xio_msgr_noreg_mpool, sizeof(XioCommand), &mp_mem);
781   if (!!e)
782     return NULL;
783   XioCommand *xcmd = reinterpret_cast<XioCommand*>(mp_mem.addr);
784   assert(!!xcmd);
785   new (xcmd) XioCommand(xcon, mp_mem);
786   return xcmd;
787 }
788
789 int XioMessenger::_send_message(Message *m, Connection *con)
790 {
791   if (con == loop_con.get() /* intrusive_ptr get() */) {
792     m->set_connection(con);
793     m->set_src(get_myinst().name);
794     m->set_seq(loop_con->next_seq());
795     ds_dispatch(m);
796     return 0;
797   }
798
799   XioConnection *xcon = static_cast<XioConnection*>(con);
800
801   /* If con is not in READY state, we have to enforce policy */
802   if (xcon->cstate.session_state.read() != XioConnection::UP) {
803     pthread_spin_lock(&xcon->sp);
804     if (xcon->cstate.session_state.read() != XioConnection::UP) {
805       xcon->outgoing.mqueue.push_back(*m);
806       pthread_spin_unlock(&xcon->sp);
807       return 0;
808     }
809     pthread_spin_unlock(&xcon->sp);
810   }
811
812   return _send_message_impl(m, xcon);
813 } /* send_message(Message* m, Connection *con) */
814
815 int XioMessenger::_send_message_impl(Message* m, XioConnection* xcon)
816 {
817   int code = 0;
818
819   Mutex::Locker l(xcon->lock);
820   if (unlikely(XioPool::trace_mempool)) {
821     static uint32_t nreqs;
822     if (unlikely((++nreqs % 65536) == 0)) {
823       xp_stats.dump(__func__, nreqs);
824     }
825   }
826
827   m->set_seq(xcon->state.next_out_seq());
828   m->set_magic(magic); // trace flags and special handling
829
830   m->encode(xcon->get_features(), this->crcflags);
831
832   buffer::list &payload = m->get_payload();
833   buffer::list &middle = m->get_middle();
834   buffer::list &data = m->get_data();
835
836   int msg_off = 0;
837   int req_off = 0;
838   int req_size = 0;
839   int nbuffers =
840     xio_count_buffers(payload, req_size, msg_off, req_off) +
841     xio_count_buffers(middle, req_size, msg_off, req_off) +
842     xio_count_buffers(data, req_size, msg_off, req_off);
843
844   int ex_cnt = req_off;
845   if (msg_off == 0 && ex_cnt > 0) {
846     // no buffers for last msg
847     ldout(cct,10) << "msg_off 0, ex_cnt " << ex_cnt << " -> " << ex_cnt-1 << dendl;
848     ex_cnt--;
849   }
850
851   /* get an XioMsg frame */
852   XioMsg *xmsg = pool_alloc_xio_msg(m, xcon, ex_cnt);
853   if (! xmsg) {
854     /* could happen if Accelio has been shutdown */
855     return ENOMEM;
856   }
857
858   ldout(cct,4) << __func__ << " " << m << " new XioMsg " << xmsg
859        << " tag " << (int)xmsg->hdr.tag
860        << " req_0 " << xmsg->get_xio_msg() << " msg type " << m->get_type()
861        << " features: " << xcon->get_features()
862        << " conn " << xcon->conn << " sess " << xcon->session << dendl;
863
864   if (magic & (MSG_MAGIC_XIO)) {
865
866     /* XXXX verify */
867     switch (m->get_type()) {
868     case 43:
869     // case 15:
870       ldout(cct,4) << __func__ << "stop 43 " << m->get_type() << " " << *m << dendl;
871       buffer::list &payload = m->get_payload();
872       ldout(cct,4) << __func__ << "payload dump:" << dendl;
873       payload.hexdump(cout);
874     }
875   }
876
877   struct xio_msg *req = xmsg->get_xio_msg();
878   struct xio_iovec_ex *msg_iov = req->out.pdata_iov.sglist;
879
880   if (magic & (MSG_MAGIC_XIO)) {
881     ldout(cct,4) << "payload: " << payload.buffers().size() <<
882       " middle: " << middle.buffers().size() <<
883       " data: " << data.buffers().size() <<
884       dendl;
885   }
886
887   if (unlikely(ex_cnt > 0)) {
888     ldout(cct,4) << __func__ << " buffer cnt > XIO_MSGR_IOVLEN (" <<
889       ((XIO_MSGR_IOVLEN-1) + nbuffers) << ")" << dendl;
890   }
891
892   /* do the invariant part */
893   msg_off = 0;
894   req_off = -1; /* most often, not used */
895   req_size = 0;
896
897   xio_place_buffers(payload, xmsg, req, msg_iov, req_size, ex_cnt, msg_off,
898                     req_off, BUFFER_PAYLOAD);
899
900   xio_place_buffers(middle, xmsg, req, msg_iov, req_size, ex_cnt, msg_off,
901                     req_off, BUFFER_MIDDLE);
902
903   xio_place_buffers(data, xmsg, req, msg_iov, req_size, ex_cnt, msg_off,
904                     req_off, BUFFER_DATA);
905   ldout(cct,10) << "ex_cnt " << ex_cnt << ", req_off " << req_off
906     << ", msg_cnt " << xmsg->get_msg_count() << dendl;
907
908   /* finalize request */
909   if (msg_off)
910     req->out.pdata_iov.nents = msg_off;
911
912   /* fixup first msg */
913   req = xmsg->get_xio_msg();
914
915   const std::list<buffer::ptr>& header = xmsg->hdr.get_bl().buffers();
916   assert(header.size() == 1); /* XXX */
917   list<bufferptr>::const_iterator pb = header.begin();
918   req->out.header.iov_base = (char*) pb->c_str();
919   req->out.header.iov_len = pb->length();
920
921   /* deliver via xio, preserve ordering */
922   if (xmsg->get_msg_count() > 1) {
923     struct xio_msg *head = xmsg->get_xio_msg();
924     struct xio_msg *tail = head;
925     for (req_off = 0; ((unsigned) req_off) < xmsg->get_msg_count()-1; ++req_off) {
926       req = &xmsg->req_arr[req_off].msg;
927 assert(!req->in.pdata_iov.nents);
928 assert(req->out.pdata_iov.nents || !nbuffers);
929       tail->next = req;
930       tail = req;
931      }
932     tail->next = NULL;
933   }
934   xmsg->trace = m->trace;
935   m->trace.event("xio portal enqueue for send");
936   m->trace.keyval("xio message segments", xmsg->hdr.msg_cnt);
937   xcon->portal->enqueue_for_send(xcon, xmsg);
938
939   return code;
940 } /* send_message(Message *, Connection *) */
941
942 int XioMessenger::shutdown()
943 {
944   shutdown_called = true;
945   conns_sp.lock();
946   XioConnection::ConnList::iterator iter;
947   iter = conns_list.begin();
948   for (iter = conns_list.begin(); iter != conns_list.end(); ++iter) {
949     (void) iter->disconnect(); // XXX mark down?
950   }
951   conns_sp.unlock();
952   while(nsessions > 0) {
953     Mutex::Locker lck(sh_mtx);
954     if (nsessions > 0)
955       sh_cond.Wait(sh_mtx);
956   }
957   portals.shutdown();
958   dispatch_strategy->shutdown();
959   did_bind = false;
960   started = false;
961   return 0;
962 } /* shutdown */
963
964 ConnectionRef XioMessenger::get_connection(const entity_inst_t& dest)
965 {
966   if (shutdown_called)
967     return NULL;
968
969   const entity_inst_t& self_inst = get_myinst();
970   if ((&dest == &self_inst) ||
971       (dest == self_inst)) {
972     return get_loopback_connection();
973   }
974
975   conns_sp.lock();
976   XioConnection::EntitySet::iterator conn_iter =
977     conns_entity_map.find(dest, XioConnection::EntityComp());
978   if (conn_iter != conns_entity_map.end()) {
979     ConnectionRef cref = &(*conn_iter);
980     conns_sp.unlock();
981     return cref;
982   }
983   else {
984     conns_sp.unlock();
985     string xio_uri = xio_uri_from_entity(cct->_conf->xio_transport_type,
986                                          dest.addr, true /* want_port */);
987
988     ldout(cct,4) << "XioMessenger " << this << " get_connection: xio_uri "
989       << xio_uri << dendl;
990
991     /* XXX client session creation parameters */
992     struct xio_session_params params = {};
993     params.type         = XIO_SESSION_CLIENT;
994     params.ses_ops      = &xio_msgr_ops;
995     params.user_context = this;
996     params.uri          = xio_uri.c_str();
997
998     XioConnection *xcon = new XioConnection(this, XioConnection::ACTIVE,
999                                             dest);
1000
1001     xcon->session = xio_session_create(&params);
1002     if (! xcon->session) {
1003       delete xcon;
1004       return NULL;
1005     }
1006
1007     /* this should cause callbacks with user context of conn, but
1008      * we can always set it explicitly */
1009     struct xio_connection_params xcp = {};
1010     xcp.session           = xcon->session;
1011     xcp.ctx               = xcon->portal->ctx;
1012     xcp.conn_user_context = xcon;
1013
1014     xcon->conn = xio_connect(&xcp);
1015     if (!xcon->conn) {
1016       xio_session_destroy(xcon->session);
1017       delete xcon;
1018       return NULL;
1019     }
1020
1021     nsessions++;
1022     xcon->connected = true;
1023
1024     /* sentinel ref */
1025     xcon->get(); /* xcon->nref == 1 */
1026     conns_sp.lock();
1027     conns_list.push_back(*xcon);
1028     conns_entity_map.insert(*xcon);
1029     conns_sp.unlock();
1030
1031     /* XXXX pre-merge of session startup negotiation ONLY! */
1032     xcon->cstate.state_up_ready(XioConnection::CState::OP_FLAG_NONE);
1033
1034     ldout(cct,2) << "New connection xcon: " << xcon <<
1035       " up_ready on session " << xcon->session <<
1036       " on msgr: " << this << " portal: " << xcon->portal << dendl;
1037
1038     return xcon->get(); /* nref +1 */
1039   }
1040 } /* get_connection */
1041
1042 ConnectionRef XioMessenger::get_loopback_connection()
1043 {
1044   return (loop_con.get());
1045 } /* get_loopback_connection */
1046
1047 void XioMessenger::unregister_xcon(XioConnection *xcon)
1048 {
1049   Spinlock::Locker lckr(conns_sp);
1050
1051   XioConnection::EntitySet::iterator conn_iter =
1052         conns_entity_map.find(xcon->peer, XioConnection::EntityComp());
1053   if (conn_iter != conns_entity_map.end()) {
1054         XioConnection *xcon2 = &(*conn_iter);
1055         if (xcon == xcon2) {
1056           conns_entity_map.erase(conn_iter);
1057         }
1058   }
1059
1060   /* check if citer on conn_list */
1061   if (xcon->conns_hook.is_linked()) {
1062     /* now find xcon on conns_list and erase */
1063     XioConnection::ConnList::iterator citer =
1064         XioConnection::ConnList::s_iterator_to(*xcon);
1065     conns_list.erase(citer);
1066   }
1067 }
1068
1069 void XioMessenger::mark_down(const entity_addr_t& addr)
1070 {
1071   entity_inst_t inst(entity_name_t(), addr);
1072   Spinlock::Locker lckr(conns_sp);
1073   XioConnection::EntitySet::iterator conn_iter =
1074     conns_entity_map.find(inst, XioConnection::EntityComp());
1075   if (conn_iter != conns_entity_map.end()) {
1076       (*conn_iter)._mark_down(XioConnection::CState::OP_FLAG_NONE);
1077     }
1078 } /* mark_down(const entity_addr_t& */
1079
1080 void XioMessenger::mark_down(Connection* con)
1081 {
1082   XioConnection *xcon = static_cast<XioConnection*>(con);
1083   xcon->_mark_down(XioConnection::CState::OP_FLAG_NONE);
1084 } /* mark_down(Connection*) */
1085
1086 void XioMessenger::mark_down_all()
1087 {
1088   Spinlock::Locker lckr(conns_sp);
1089   XioConnection::EntitySet::iterator conn_iter;
1090   for (conn_iter = conns_entity_map.begin(); conn_iter !=
1091          conns_entity_map.begin(); ++conn_iter) {
1092     (*conn_iter)._mark_down(XioConnection::CState::OP_FLAG_NONE);
1093   }
1094 } /* mark_down_all */
1095
1096 static inline XioMarkDownHook* pool_alloc_markdown_hook(
1097   XioConnection *xcon, Message *m)
1098 {
1099   struct xio_reg_mem mp_mem;
1100   int e = xio_mempool_alloc(xio_msgr_noreg_mpool,
1101                             sizeof(XioMarkDownHook), &mp_mem);
1102   if (!!e)
1103     return NULL;
1104   XioMarkDownHook *hook = static_cast<XioMarkDownHook*>(mp_mem.addr);
1105   new (hook) XioMarkDownHook(xcon, m, mp_mem);
1106   return hook;
1107 }
1108
1109 void XioMessenger::mark_down_on_empty(Connection* con)
1110 {
1111   XioConnection *xcon = static_cast<XioConnection*>(con);
1112   MNop* m = new MNop();
1113   m->tag = XIO_NOP_TAG_MARKDOWN;
1114   m->set_completion_hook(pool_alloc_markdown_hook(xcon, m));
1115   // stall new messages
1116   xcon->cstate.session_state = XioConnection::session_states::BARRIER;
1117   (void) _send_message_impl(m, xcon);
1118 }
1119
1120 void XioMessenger::mark_disposable(Connection *con)
1121 {
1122   XioConnection *xcon = static_cast<XioConnection*>(con);
1123   xcon->_mark_disposable(XioConnection::CState::OP_FLAG_NONE);
1124 }
1125
1126 void XioMessenger::try_insert(XioConnection *xcon)
1127 {
1128   Spinlock::Locker lckr(conns_sp);
1129   /* already resident in conns_list */
1130   conns_entity_map.insert(*xcon);
1131 }
1132
1133 XioMessenger::~XioMessenger()
1134 {
1135   delete dispatch_strategy;
1136   nInstances--;
1137 } /* dtor */