// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 * Portions Copyright (C) 2013 CohortFS, LLC
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */
16 #include <arpa/inet.h>
17 #include <boost/lexical_cast.hpp>
23 #include "XioMessenger.h"
24 #include "common/address_helper.h"
25 #include "common/code_environment.h"
26 #include "messages/MNop.h"
28 #define dout_subsys ceph_subsys_xio
30 #define dout_prefix *_dout << "xio."
/* Serializes one-time Accelio package initialization (XioInit::package_init). */
Mutex mtx("XioMessenger Package Lock");
/* Flips to true once package-level initialization has completed. */
std::atomic<bool> initialized = { false };

/* Count of live XioMessenger instances (reported in ctor logging). */
std::atomic<unsigned> XioMessenger::nInstances = { 0 };

/* Shared pool of unregistered (regular-page) memory used for XioMsg /
 * XioCommand frames and hooks. */
struct xio_mempool *xio_msgr_noreg_mpool;

/* Session-ops callback table handed to Accelio; populated in package_init(). */
static struct xio_session_ops xio_msgr_ops;
41 /* Accelio API callouts */
/* Accelio log level -> (label, ceph debug threshold) mapping.  The array
 * index corresponds to the Accelio log level; .second is the ceph subsys
 * debug level that must be enabled for that Accelio level to be emitted. */
typedef pair<const char*, int> level_pair;
static const level_pair LEVELS[] = {
  make_pair("fatal", 0),
  make_pair("error", 0),
  // NOTE(review): this listing is truncated here — the intermediate
  // entries (presumably "warn"/"info") and the closing "};" fall in
  // elided lines.
  make_pair("debug", 2),
  make_pair("trace", 20)

/* First CephContext observed; a reference is claimed in package_init()
 * and it is used by get_level()/log_dout() below. */
static CephContext *context;
60 for (size_t i = 0; i < sizeof(LEVELS); i++) {
61 if (!ldlog_p1(context, dout_subsys, LEVELS[i].second))
/* Accelio log callback: format the varargs message and re-emit it through
 * ceph's dout framework at the mapped debug level, prefixed with the
 * Accelio level label and the call site's basename:line.
 * NOTE(review): this listing elides the signature tail (format string and
 * varargs), the local buffer declaration, and the va_start/va_end calls. */
void log_dout(const char *file, unsigned line,
	      const char *function, unsigned level,
  int n = vsnprintf(buffer, sizeof(buffer), fmt, args);

  /* strip directories from the reported file name */
  const char *short_file = strrchr(file, '/');
  short_file = (short_file == NULL) ? file : short_file + 1;

  /* 'level' indexes LEVELS; assumes Accelio passes a level within range */
  const level_pair &lvl = LEVELS[level];
  ldout(context, lvl.second) << '[' << lvl.first << "] "
			     << short_file << ':' << line << ' '
			     << function << " - " << buffer << dendl;
90 static int on_session_event(struct xio_session *session,
91 struct xio_session_event_data *event_data,
92 void *cb_user_context)
94 XioMessenger *msgr = static_cast<XioMessenger*>(cb_user_context);
95 CephContext *cct = msgr->cct;
97 ldout(cct,4) << "session event: " << xio_session_event_str(event_data->event)
98 << ". reason: " << xio_strerror(event_data->reason) << dendl;
100 return msgr->session_event(session, event_data, cb_user_context);
103 static int on_new_session(struct xio_session *session,
104 struct xio_new_session_req *req,
105 void *cb_user_context)
107 XioMessenger *msgr = static_cast<XioMessenger*>(cb_user_context);
108 CephContext *cct = msgr->cct;
110 ldout(cct,4) << "new session " << session
111 << " user_context " << cb_user_context << dendl;
113 return (msgr->new_session(session, req, cb_user_context));
116 static int on_msg(struct xio_session *session,
119 void *cb_user_context)
121 XioConnection* xcon __attribute__((unused)) =
122 static_cast<XioConnection*>(cb_user_context);
123 CephContext *cct = xcon->get_messenger()->cct;
125 ldout(cct,25) << "on_msg session " << session << " xcon " << xcon << dendl;
127 if (unlikely(XioPool::trace_mempool)) {
128 static uint32_t nreqs;
129 if (unlikely((++nreqs % 65536) == 0)) {
130 xp_stats.dump(__func__, nreqs);
134 return xcon->on_msg(session, req, more_in_batch,
138 static int on_ow_msg_send_complete(struct xio_session *session,
140 void *conn_user_context)
142 XioConnection *xcon =
143 static_cast<XioConnection*>(conn_user_context);
144 CephContext *cct = xcon->get_messenger()->cct;
146 ldout(cct,25) << "msg delivered session: " << session
147 << " msg: " << msg << " conn_user_context "
148 << conn_user_context << dendl;
150 return xcon->on_ow_msg_send_complete(session, msg, conn_user_context);
153 static int on_msg_error(struct xio_session *session,
154 enum xio_status error,
155 enum xio_msg_direction dir,
157 void *conn_user_context)
159 /* XIO promises to flush back undelivered messages */
160 XioConnection *xcon =
161 static_cast<XioConnection*>(conn_user_context);
162 CephContext *cct = xcon->get_messenger()->cct;
164 ldout(cct,4) << "msg error session: " << session
165 << " error: " << xio_strerror(error) << " msg: " << msg
166 << " conn_user_context " << conn_user_context << dendl;
168 return xcon->on_msg_error(session, error, msg, conn_user_context);
171 static int on_cancel(struct xio_session *session,
173 enum xio_status result,
174 void *conn_user_context)
176 XioConnection* xcon __attribute__((unused)) =
177 static_cast<XioConnection*>(conn_user_context);
178 CephContext *cct = xcon->get_messenger()->cct;
180 ldout(cct,25) << "on cancel: session: " << session << " msg: " << msg
181 << " conn_user_context " << conn_user_context << dendl;
186 static int on_cancel_request(struct xio_session *session,
188 void *conn_user_context)
190 XioConnection* xcon __attribute__((unused)) =
191 static_cast<XioConnection*>(conn_user_context);
192 CephContext *cct = xcon->get_messenger()->cct;
194 ldout(cct,25) << "on cancel request: session: " << session << " msg: " << msg
195 << " conn_user_context " << conn_user_context << dendl;
/* Build an Accelio URI ("<transport>://<host>[:<port>]") for an entity
 * address; want_port controls whether the port suffix is appended.
 * NOTE(review): this listing elides the opening brace, the addr_buf
 * declaration, the switch case labels/braces, the non-rdma/tcp transport
 * fallback branch, and the return statement. */
static string xio_uri_from_entity(const string &type,
				  const entity_addr_t& addr, bool want_port)
  const char *host = NULL;
  /* render the numeric host for the address family */
  switch(addr.get_family()) {
    host = inet_ntop(AF_INET, &addr.in4_addr().sin_addr, addr_buf,
    host = inet_ntop(AF_INET6, &addr.in6_addr().sin6_addr, addr_buf,
  /* explicit transports pass through as the URI scheme */
  if (type == "rdma" || type == "tcp")
    xio_uri = type + "://";
  /* The following can only succeed if the host is rdma-capable */
  xio_uri += boost::lexical_cast<std::string>(addr.get_port());
} /* xio_uri_from_entity */
/* One-time, process-wide Accelio configuration: hooks Accelio logging
 * into ceph's dout framework, tunes library options (iov lengths, flow
 * control, inline thresholds, header size), configures the registered
 * mempool slab ladder, creates the shared unregistered mempool, and fills
 * in the xio_msgr_ops callback table.
 * NOTE(review): this listing elides the 'initialized' guard around the
 * body, the xopt declarations/assignments before some xio_set_opt calls,
 * and several closing braces; code lines below are kept as-is. */
void XioInit::package_init(CephContext *cct) {
  // claim a reference to the first context we see
  xio_log::context = cct->get();
  // route Accelio's logger through log_dout at the mapped level
  xopt = xio_log::get_level();
  xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_LOG_LEVEL,
	      &xopt, sizeof(xopt));
  xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_LOG_FN,
	      (const void*)xio_log::log_dout, sizeof(xio_log_fn));

  // xopt value for this call is set in an elided line
  xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_DISABLE_HUGETBL,
	      &xopt, sizeof(xopt));

  // daemons fork; ask the RDMA layer to re-initialize after fork
  if (g_code_env == CODE_ENVIRONMENT_DAEMON) {
    xio_set_opt(NULL, XIO_OPTLEVEL_RDMA, XIO_OPTNAME_ENABLE_FORK_INIT,
		&xopt, sizeof(xopt));

  // bound scatter/gather vector lengths to the messenger's iov limit
  xopt = XIO_MSGR_IOVLEN;
  xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_MAX_IN_IOVLEN,
	      &xopt, sizeof(xopt));
  xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_MAX_OUT_IOVLEN,
	      &xopt, sizeof(xopt));

  /* enable flow-control */
  xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_ENABLE_FLOW_CONTROL,
	      &xopt, sizeof(xopt));

  /* and set threshold for buffer callouts */
  xopt = max(cct->_conf->xio_max_send_inline, 512);
  xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_MAX_INLINE_XIO_DATA,
	      &xopt, sizeof(xopt));

  // inline header area must fit the largest encoded XioMsgHdr
  xopt = XioMsgHdr::get_max_encoded_length();
  ldout(cct,2) << "setting accelio max header size " << xopt << dendl;
  xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_MAX_INLINE_XIO_HEADER,
	      &xopt, sizeof(xopt));

  /* registered-memory slab ladder; small slabs scale with queue depth.
   * NOTE(review): the struct's leading members and the xio_set_opt call
   * opening line are elided in this listing. */
  size_t queue_depth = cct->_conf->xio_queue_depth;
  struct xio_mempool_config mempool_config = {
    {1024, 0, queue_depth, 262144},
    {4096, 0, queue_depth, 262144},
    {16384, 0, queue_depth, 262144},
    {65536, 0, 128, 65536},
    {262144, 0, 32, 16384},
    {1048576, 0, 8, 8192}
    XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_CONFIG_MEMPOOL,
    &mempool_config, sizeof(mempool_config));

  /* and unregisterd one */
#define XMSG_MEMPOOL_QUANTUM 4096

  xio_msgr_noreg_mpool =
    xio_mempool_create(-1 /* nodeid */,
		       XIO_MEMPOOL_FLAG_REGULAR_PAGES_ALLOC);

  /* slabs sized for the common small-object allocations (XioMsg frames,
   * headers, page-sized buffers); failures intentionally ignored */
  (void) xio_mempool_add_slab(xio_msgr_noreg_mpool, 64,
			      cct->_conf->xio_mp_min,
			      cct->_conf->xio_mp_max_64,
			      XMSG_MEMPOOL_QUANTUM, 0);
  (void) xio_mempool_add_slab(xio_msgr_noreg_mpool, 256,
			      cct->_conf->xio_mp_min,
			      cct->_conf->xio_mp_max_256,
			      XMSG_MEMPOOL_QUANTUM, 0);
  (void) xio_mempool_add_slab(xio_msgr_noreg_mpool, 1024,
			      cct->_conf->xio_mp_min,
			      cct->_conf->xio_mp_max_1k,
			      XMSG_MEMPOOL_QUANTUM, 0);
  (void) xio_mempool_add_slab(xio_msgr_noreg_mpool, getpagesize(),
			      cct->_conf->xio_mp_min,
			      cct->_conf->xio_mp_max_page,
			      XMSG_MEMPOOL_QUANTUM, 0);

  /* initialize ops singleton */
  xio_msgr_ops.on_session_event = on_session_event;
  xio_msgr_ops.on_new_session = on_new_session;
  xio_msgr_ops.on_session_established = NULL;
  xio_msgr_ops.on_msg = on_msg;
  xio_msgr_ops.on_ow_msg_send_complete = on_ow_msg_send_complete;
  xio_msgr_ops.on_msg_error = on_msg_error;
  xio_msgr_ops.on_cancel = on_cancel;
  xio_msgr_ops.on_cancel_request = on_cancel_request;

  /* mark initialized */
345 #define dout_prefix _prefix(_dout, this)
346 static ostream& _prefix(std::ostream *_dout, XioMessenger *msgr) {
347 return *_dout << "-- " << msgr->get_myaddr() << " ";
/* Construct an XioMessenger: portal count and per-portal connection
 * budget are derived from cflags, a loopback connection is created for
 * self-sends, and the dispatch strategy is wired back to this messenger.
 * NOTE(review): this listing elides parts of the member-initializer list
 * (e.g. session counters, shutdown flag), the opening brace, the
 * nInstances update, and the tail of the creation log statement. */
XioMessenger::XioMessenger(CephContext *cct, entity_name_t name,
			   string mname, uint64_t _nonce,
			   uint64_t cflags, DispatchStrategy *ds)
  : SimplePolicyMessenger(cct, name, mname, _nonce),
    portals(this, get_nportals(cflags), get_nconns_per_portal(cflags)),
    dispatch_strategy(ds),
    loop_con(new XioLoopbackConnection(this)),
    sh_mtx("XioMessenger session mutex"),
  // optional per-connection tracing, driven by conf
  if (cct->_conf->xio_trace_xcon)
    magic |= MSG_MAGIC_TRACE_XCON;

  XioPool::trace_mempool = (cct->_conf->xio_trace_mempool);
  XioPool::trace_msgcnt = (cct->_conf->xio_trace_msgcnt);

  dispatch_strategy->set_messenger(this);

  /* update class instance count */
  loop_con->set_features(CEPH_FEATURES_ALL);

  ldout(cct,2) << "Create msgr: " << this << " instance: "
	       << nInstances << " type: " << name.type_str()
	       << " subtype: " << mname << " nportals: " << get_nportals(cflags)
	       << " nconns_per_portal: " << get_nconns_per_portal(cflags)
/* Hint that payloads of size 'dsize' will be common: pre-register a
 * matching slab in the shared unregistered mempool.  Sizes above 1 MiB
 * are not slabbed.
 * NOTE(review): the statement executed on the >1MiB branch is elided in
 * this listing. */
int XioMessenger::pool_hint(uint32_t dsize) {
  if (dsize > 1024*1024)

  /* if dsize is already present, returns -EEXIST */
  return xio_mempool_add_slab(xio_msgr_noreg_mpool, dsize, 0,
			      cct->_conf->xio_mp_max_hint,
			      XMSG_MEMPOOL_QUANTUM, 0);
397 int XioMessenger::get_nconns_per_portal(uint64_t cflags)
399 const int XIO_DEFAULT_NUM_CONNS_PER_PORTAL = 8;
400 int nconns = XIO_DEFAULT_NUM_CONNS_PER_PORTAL;
402 if (cflags & Messenger::HAS_MANY_CONNECTIONS)
403 nconns = max(cct->_conf->xio_max_conns_per_portal, XIO_DEFAULT_NUM_CONNS_PER_PORTAL);
404 else if (cflags & Messenger::HEARTBEAT)
405 nconns = max(cct->_conf->osd_heartbeat_min_peers * 4, XIO_DEFAULT_NUM_CONNS_PER_PORTAL);
410 int XioMessenger::get_nportals(uint64_t cflags)
414 if (cflags & Messenger::HAS_HEAVY_TRAFFIC)
415 nportals = max(cct->_conf->xio_portal_threads, 1);
/* Adopt the locally-visible IP learned from a peer while keeping our
 * already-bound port, then publish it as my_inst.addr.
 * NOTE(review): this listing elides the opening brace and the
 * double-checked guard/lock acquisition the comments below refer to. */
void XioMessenger::learned_addr(const entity_addr_t &peer_addr_for_me)
  // be careful here: multiple threads may block here, and readers of
  // my_inst.addr do NOT hold any lock.

  // this always goes from true -> false under the protection of the
  // mutex. if it is already false, we need not retake the mutex at
  entity_addr_t t = peer_addr_for_me;
  t.set_port(my_inst.addr.get_port());  // keep our own bound port
  my_inst.addr.set_sockaddr(t.get_sockaddr());
  ldout(cct,2) << "learned my addr " << my_inst.addr << dendl;
  // init_local_connection();
/* Handle an incoming session request: reject with XIO_E_SESSION_REFUSED
 * while shutting down, otherwise let the portals decide which portal
 * accepts it.
 * NOTE(review): this listing elides the opening brace, the first line of
 * the reject call, and the session accounting / return after accept. */
int XioMessenger::new_session(struct xio_session *session,
			      struct xio_new_session_req *req,
			      void *cb_user_context)
  if (shutdown_called) {
      session, XIO_E_SESSION_REFUSED, NULL /* udata */, 0 /* udata len */);
  int code = portals.accept(session, req, cb_user_context);
/* Dispatch Accelio session events: connection establishment (active
 * side), new passive connections, the various disconnect flavors, and
 * connection/session teardown.
 * NOTE(review): this listing elides the function's opening brace, the
 * local xcon declaration, per-case scope braces and break statements,
 * and the final return. */
int XioMessenger::session_event(struct xio_session *session,
				struct xio_session_event_data *event_data,
				void *cb_user_context)
  switch (event_data->event) {
  /* active (client) side: the connection is up — learn our local address
   * from it and run the connect dispatch hooks */
  case XIO_SESSION_CONNECTION_ESTABLISHED_EVENT:
    struct xio_connection *conn = event_data->conn;
    struct xio_connection_attr xcona;
    entity_addr_t peer_addr_for_me, paddr;
    xcon = static_cast<XioConnection*>(event_data->conn_user_context);
    ldout(cct,2) << "connection established " << event_data->conn
		 << " session " << session << " xcon " << xcon << dendl;
    (void) xio_query_connection(conn, &xcona,
				XIO_CONNECTION_ATTR_LOCAL_ADDR|
				XIO_CONNECTION_ATTR_PEER_ADDR);
    peer_addr_for_me.set_sockaddr((struct sockaddr *)&xcona.local_addr);
    paddr.set_sockaddr((struct sockaddr *)&xcona.peer_addr);
    //set_myaddr(peer_addr_for_me);
    learned_addr(peer_addr_for_me);
    ldout(cct,2) << "client: connected from " << peer_addr_for_me << " to " << paddr << dendl;
    this->ms_deliver_handle_connect(xcon);
    this->ms_deliver_handle_fast_connect(xcon);
  /* passive (server) side: wrap the new Accelio connection in an
   * XioConnection, attach it to its owning portal, and track it */
  case XIO_SESSION_NEW_CONNECTION_EVENT:
    struct xio_connection *conn = event_data->conn;
    struct xio_connection_attr xcona;
    entity_inst_t s_inst;
    entity_addr_t peer_addr_for_me;
    (void) xio_query_connection(conn, &xcona,
				XIO_CONNECTION_ATTR_CTX|
				XIO_CONNECTION_ATTR_PEER_ADDR|
				XIO_CONNECTION_ATTR_LOCAL_ADDR);
    /* XXX assumes RDMA */
    s_inst.addr.set_sockaddr((struct sockaddr *)&xcona.peer_addr);
    peer_addr_for_me.set_sockaddr((struct sockaddr *)&xcona.local_addr);
    xcon = new XioConnection(this, XioConnection::PASSIVE, s_inst);
    xcon->session = session;
    /* the portal that owns this connection is stashed in the xio
     * context's user data */
    struct xio_context_attr xctxa;
    (void) xio_query_context(xcona.ctx, &xctxa, XIO_CONTEXT_ATTR_USER_CTX);
    xcon->portal = static_cast<XioPortal*>(xctxa.user_context);
    assert(xcon->portal);
    /* make per-connection callbacks deliver the XioConnection */
    xcona.user_context = xcon;
    (void) xio_modify_connection(conn, &xcona, XIO_CONNECTION_ATTR_USER_CTX);
    xcon->connected = true;
    /* sentinel ref held by the messenger's tracking structures */
    xcon->get(); /* xcon->nref == 1 */
    conns_list.push_back(*xcon);
    /* XXX we can't put xcon in conns_entity_map becase we don't yet know
     * it's peer address */
    /* XXXX pre-merge of session startup negotiation ONLY! */
    xcon->cstate.state_up_ready(XioConnection::CState::OP_FLAG_NONE);
    ldout(cct,2) << "New connection session " << session
		 << " xcon " << xcon << " on msgr: " << this << " portal: " << xcon->portal << dendl;
    ldout(cct,2) << "Server: connected from " << s_inst.addr << " to " << peer_addr_for_me << dendl;
  /* any flavor of disconnect: untrack and notify the connection */
  case XIO_SESSION_CONNECTION_ERROR_EVENT:
  case XIO_SESSION_CONNECTION_CLOSED_EVENT: /* orderly discon */
  case XIO_SESSION_CONNECTION_DISCONNECTED_EVENT: /* unexpected discon */
  case XIO_SESSION_CONNECTION_REFUSED_EVENT:
    xcon = static_cast<XioConnection*>(event_data->conn_user_context);
    ldout(cct,2) << xio_session_event_str(event_data->event)
		 << " xcon " << xcon << " session " << session << dendl;
    if (likely(!!xcon)) {
      unregister_xcon(xcon);
      xcon->on_disconnect_event();
  case XIO_SESSION_CONNECTION_TEARDOWN_EVENT:
    xcon = static_cast<XioConnection*>(event_data->conn_user_context);
    ldout(cct,2) << xio_session_event_str(event_data->event)
		 << " xcon " << xcon << " session " << session << dendl;
    /* There are flows where Accelio sends teardown event without going
     * through disconnect event. so we make sure we cleaned the connection. */
    unregister_xcon(xcon);
    xcon->on_teardown_event();
  /* whole session torn down: destroy it and wake shutdown() waiters once
   * the last session drains */
  case XIO_SESSION_TEARDOWN_EVENT:
    ldout(cct,2) << xio_session_event_str(event_data->event)
		 << " session " << session << dendl;
    if (unlikely(XioPool::trace_mempool)) {
      xp_stats.dump("xio session dtor", reinterpret_cast<uint64_t>(session));
    xio_session_destroy(session);
    if (--nsessions == 0) {
      Mutex::Locker lck(sh_mtx);
587 #define MAX_XIO_BUF_SIZE 1044480
/* Walk a bufferlist and accumulate how many iov slots (msg_off), extra
 * xio requests (req_off) and bytes (req_size) it will consume, splitting
 * any chunk so no single request exceeds MAX_XIO_BUF_SIZE.
 * NOTE(review): this listing elides the return type line, the opening
 * brace, the enclosing loop header and the per-chunk off/size bookkeeping
 * the lines below rely on. */
xio_count_buffers(const buffer::list& bl, int& req_size, int& msg_off, int& req_off)
  const std::list<buffer::ptr>& buffers = bl.buffers();
  list<bufferptr>::const_iterator pb;
  /* lazily advance the buffer iterator: begin() on the first pass */
  if (first) pb = buffers.begin(); else ++pb;
  if (pb == buffers.end()) {
  size_t count = size - off;
  if (!count) continue;
  /* clamp this chunk to the remaining room in the current request */
  if (req_size + count > MAX_XIO_BUF_SIZE) {
    count = MAX_XIO_BUF_SIZE - req_size;
  /* advance iov and perhaps request */
  if (unlikely(msg_off >= XIO_MSGR_IOVLEN || req_size >= MAX_XIO_BUF_SIZE)) {
/* Second pass over a bufferlist: fill the xio request(s)' scatter/gather
 * iovs with pointers into the buffer chunks counted by
 * xio_count_buffers(), advancing to the next request in xmsg->req_arr
 * whenever the iov table or the per-request byte budget fills up.
 * NOTE(review): this listing elides the return type, opening brace, loop
 * header, off/size bookkeeping and several closing braces. */
xio_place_buffers(const buffer::list& bl, XioMsg *xmsg, struct xio_msg*& req,
		  struct xio_iovec_ex*& msg_iov, int& req_size,
		  int ex_cnt, int& msg_off, int& req_off, bl_type type)
  const std::list<buffer::ptr>& buffers = bl.buffers();
  list<bufferptr>::const_iterator pb;
  struct xio_iovec_ex* iov;
  const char *data = NULL;
  /* lazily advance the buffer iterator: begin() on the first pass */
  if (first) pb = buffers.begin(); else ++pb;
  if (pb == buffers.end()) {
  data = pb->c_str(); // is c_str() efficient?
  size_t count = size - off;
  if (!count) continue;
  /* clamp this chunk to the remaining room in the current request */
  if (req_size + count > MAX_XIO_BUF_SIZE) {
    count = MAX_XIO_BUF_SIZE - req_size;
  /* point the next iov slot at this chunk */
  iov = &msg_iov[msg_off];
  iov->iov_base = (void *) (&data[off]);
  iov->iov_len = count;
  /* attach the registered-memory region, if the buffer has one */
  struct xio_reg_mem *mp = get_xio_mp(*pb);
  iov->mr = (mp) ? mp->mr : NULL;
  /* next request if necessary */
  if (unlikely(msg_off >= XIO_MSGR_IOVLEN || req_size >= MAX_XIO_BUF_SIZE)) {
    /* finish this request */
    req->out.pdata_iov.nents = msg_off;
    /* advance to next, and write in it if it's not the last one. */
    if (++req_off >= ex_cnt) {
      req = 0; /* poison. trap if we try to use it. */
    req = &xmsg->req_arr[req_off].msg;
    msg_iov = req->out.pdata_iov.sglist;
/* Bind the portals to the given address (xio needs a concrete RDMA-capable
 * IP; a blank IP is an error), then publish the resulting address —
 * including the actually-bound port — as our own.
 * NOTE(review): this listing elides the opening brace, the error return
 * for the blank-ip case, the port0 declaration, the check of portals.bind's
 * return value, and the final return. */
int XioMessenger::bind(const entity_addr_t& addr)
  if (addr.is_blank_ip()) {
    lderr(cct) << "ERROR: need rdma ip for remote use! " << dendl;
    cout << "Error: xio bind failed. public/cluster ip not specified" << std::endl;
  entity_addr_t shift_addr = addr;
  string base_uri = xio_uri_from_entity(cct->_conf->xio_transport_type,
					shift_addr, false /* want_port */);
  ldout(cct,4) << "XioMessenger " << this << " bind: xio_uri "
	       << base_uri << ':' << shift_addr.get_port() << dendl;
  /* port0 receives the port actually bound by the portal set */
  int r = portals.bind(&xio_msgr_ops, base_uri, shift_addr.get_port(), &port0);
  shift_addr.set_port(port0);
  shift_addr.nonce = nonce;
  set_myaddr(shift_addr);
732 int XioMessenger::rebind(const set<int>& avoid_ports)
734 ldout(cct,4) << "XioMessenger " << this << " rebind attempt" << dendl;
/* Start the messenger's dispatch machinery and stamp our nonce into the
 * published address.
 * NOTE(review): this listing elides the opening brace, the lines around
 * the nonce assignment (presumably a did_bind guard), the started flag
 * and the return value. */
int XioMessenger::start()
  dispatch_strategy->start();
  my_inst.addr.nonce = nonce;
/* Block until message dispatch completes.
 * NOTE(review): the opening brace and any portal join preceding the
 * dispatch wait are elided in this listing. */
void XioMessenger::wait()
  dispatch_strategy->wait();
/* Send by destination: resolve (or create) the connection for 'dest' and
 * forward to the Connection* overload.
 * NOTE(review): the opening brace and the null-connection handling around
 * the forwarding call are elided in this listing. */
int XioMessenger::_send_message(Message *m, const entity_inst_t& dest)
  ConnectionRef conn = get_connection(dest);
  return _send_message(m, &(*conn));
} /* send_message(Message *, const entity_inst_t&) */
/* Allocate an XioMsg frame (sized for ex_cnt extra requests) from the
 * unregistered mempool and placement-construct it.
 * NOTE(review): this listing elides the signature tail (the ex_cnt
 * parameter), the opening brace, the allocation-failure check and the
 * return statement. */
static inline XioMsg* pool_alloc_xio_msg(Message *m, XioConnection *xcon,
  struct xio_reg_mem mp_mem;
  int e = xpool_alloc(xio_msgr_noreg_mpool, sizeof(XioMsg), &mp_mem);
  XioMsg *xmsg = reinterpret_cast<XioMsg*>(mp_mem.addr);
  new (xmsg) XioMsg(m, xcon, mp_mem, ex_cnt, CEPH_FEATURES_ALL);
/* Allocate an XioCommand from the unregistered mempool and
 * placement-construct it for the given connection.
 * NOTE(review): this listing elides the opening brace, the
 * allocation-failure check and the return statement. */
XioCommand* pool_alloc_xio_command(XioConnection *xcon)
  struct xio_reg_mem mp_mem;
  int e = xpool_alloc(xio_msgr_noreg_mpool, sizeof(XioCommand), &mp_mem);
  XioCommand *xcmd = reinterpret_cast<XioCommand*>(mp_mem.addr);
  new (xcmd) XioCommand(xcon, mp_mem);
/* Send over a specific connection.  Loopback sends are stamped and
 * (per elided code) dispatched locally; for xio connections that are not
 * yet UP the message is queued on the connection under its spinlock,
 * otherwise it goes straight to _send_message_impl.
 * NOTE(review): this listing elides the opening brace, the loopback
 * dispatch/return, and the early returns on both session-state paths. */
int XioMessenger::_send_message(Message *m, Connection *con)
  if (con == loop_con.get() /* intrusive_ptr get() */) {
    m->set_connection(con);
    m->set_src(get_myinst().name);
    m->set_seq(loop_con->next_seq());
  XioConnection *xcon = static_cast<XioConnection*>(con);
  /* If con is not in READY state, we have to enforce policy */
  if (xcon->cstate.session_state.read() != XioConnection::UP) {
    pthread_spin_lock(&xcon->sp);
    /* double-checked under the spinlock before queueing */
    if (xcon->cstate.session_state.read() != XioConnection::UP) {
      xcon->outgoing.mqueue.push_back(*m);
      pthread_spin_unlock(&xcon->sp);
    pthread_spin_unlock(&xcon->sp);
  return _send_message_impl(m, xcon);
} /* send_message(Message* m, Connection *con) */
/* Core send path: encode the Message, count its payload/middle/data
 * buffers to size an XioMsg (possibly spanning multiple xio requests),
 * place the buffers into scatter/gather iovs, attach the encoded header,
 * chain multi-request messages, and enqueue on the connection's portal.
 * NOTE(review): this listing elides the opening brace, the declarations
 * of req_size/msg_off/req_off/nbuffers, several closing braces, the
 * error/return paths (e.g. failed XioMsg allocation), and the request
 * chaining statements inside the multi-request loop. */
int XioMessenger::_send_message_impl(Message* m, XioConnection* xcon)
  Mutex::Locker l(xcon->lock);
  if (unlikely(XioPool::trace_mempool)) {
    /* sample pool stats once per 65536 sends */
    static uint32_t nreqs;
    if (unlikely((++nreqs % 65536) == 0)) {
      xp_stats.dump(__func__, nreqs);
  m->set_seq(xcon->state.next_out_seq());
  m->set_magic(magic); // trace flags and special handling
  m->encode(xcon->get_features(), this->crcflags);
  buffer::list &payload = m->get_payload();
  buffer::list &middle = m->get_middle();
  buffer::list &data = m->get_data();
  /* first pass: count iovs/requests/bytes needed for all three lists */
  xio_count_buffers(payload, req_size, msg_off, req_off) +
  xio_count_buffers(middle, req_size, msg_off, req_off) +
  xio_count_buffers(data, req_size, msg_off, req_off);
  int ex_cnt = req_off;
  if (msg_off == 0 && ex_cnt > 0) {
    // no buffers for last msg
    ldout(cct,10) << "msg_off 0, ex_cnt " << ex_cnt << " -> " << ex_cnt-1 << dendl;
  /* get an XioMsg frame */
  XioMsg *xmsg = pool_alloc_xio_msg(m, xcon, ex_cnt);
  /* could happen if Accelio has been shutdown */
  ldout(cct,4) << __func__ << " " << m << " new XioMsg " << xmsg
	       << " tag " << (int)xmsg->hdr.tag
	       << " req_0 " << xmsg->get_xio_msg() << " msg type " << m->get_type()
	       << " features: " << xcon->get_features()
	       << " conn " << xcon->conn << " sess " << xcon->session << dendl;
  /* optional debug dump, gated by the XIO trace magic */
  if (magic & (MSG_MAGIC_XIO)) {
    switch (m->get_type()) {
      ldout(cct,4) << __func__ << "stop 43 " << m->get_type() << " " << *m << dendl;
      buffer::list &payload = m->get_payload();
      ldout(cct,4) << __func__ << "payload dump:" << dendl;
      payload.hexdump(cout);
  struct xio_msg *req = xmsg->get_xio_msg();
  struct xio_iovec_ex *msg_iov = req->out.pdata_iov.sglist;
  if (magic & (MSG_MAGIC_XIO)) {
    ldout(cct,4) << "payload: " << payload.buffers().size() <<
      " middle: " << middle.buffers().size() <<
      " data: " << data.buffers().size() <<
  if (unlikely(ex_cnt > 0)) {
    ldout(cct,4) << __func__ << " buffer cnt > XIO_MSGR_IOVLEN (" <<
      ((XIO_MSGR_IOVLEN-1) + nbuffers) << ")" << dendl;
  /* do the invariant part */
  req_off = -1; /* most often, not used */
  /* second pass: fill the iovs for each buffer list in wire order */
  xio_place_buffers(payload, xmsg, req, msg_iov, req_size, ex_cnt, msg_off,
		    req_off, BUFFER_PAYLOAD);
  xio_place_buffers(middle, xmsg, req, msg_iov, req_size, ex_cnt, msg_off,
		    req_off, BUFFER_MIDDLE);
  xio_place_buffers(data, xmsg, req, msg_iov, req_size, ex_cnt, msg_off,
		    req_off, BUFFER_DATA);
  ldout(cct,10) << "ex_cnt " << ex_cnt << ", req_off " << req_off
		<< ", msg_cnt " << xmsg->get_msg_count() << dendl;
  /* finalize request */
  req->out.pdata_iov.nents = msg_off;
  /* fixup first msg */
  req = xmsg->get_xio_msg();
  /* the encoded XioMsgHdr rides in the xio header iov; it must be a
   * single contiguous buffer */
  const std::list<buffer::ptr>& header = xmsg->hdr.get_bl().buffers();
  assert(header.size() == 1); /* XXX */
  list<bufferptr>::const_iterator pb = header.begin();
  req->out.header.iov_base = (char*) pb->c_str();
  req->out.header.iov_len = pb->length();
  /* deliver via xio, preserve ordering */
  if (xmsg->get_msg_count() > 1) {
    struct xio_msg *head = xmsg->get_xio_msg();
    struct xio_msg *tail = head;
    for (req_off = 0; ((unsigned) req_off) < xmsg->get_msg_count()-1; ++req_off) {
      req = &xmsg->req_arr[req_off].msg;
      assert(!req->in.pdata_iov.nents);
      assert(req->out.pdata_iov.nents || !nbuffers);
  /* hand the trace context over and enqueue on the owning portal */
  xmsg->trace = m->trace;
  m->trace.event("xio portal enqueue for send");
  m->trace.keyval("xio message segments", xmsg->hdr.msg_cnt);
  xcon->portal->enqueue_for_send(xcon, xmsg);
} /* send_message(Message *, Connection *) */
942 int XioMessenger::shutdown()
944 shutdown_called = true;
946 XioConnection::ConnList::iterator iter;
947 iter = conns_list.begin();
948 for (iter = conns_list.begin(); iter != conns_list.end(); ++iter) {
949 (void) iter->disconnect(); // XXX mark down?
952 while(nsessions > 0) {
953 Mutex::Locker lck(sh_mtx);
955 sh_cond.Wait(sh_mtx);
958 dispatch_strategy->shutdown();
/* Return a connection to 'dest': the loopback connection for ourselves,
 * an existing entry from conns_entity_map, or a newly created ACTIVE
 * XioConnection with a fresh xio session + connection.
 * NOTE(review): this listing elides the opening brace, the conns_sp
 * locking around the map lookup, the return of the cached cref, error
 * returns for failed session/connect creation, and several closing
 * braces. */
ConnectionRef XioMessenger::get_connection(const entity_inst_t& dest)
  /* self-addressed sends use the loopback connection */
  const entity_inst_t& self_inst = get_myinst();
  if ((&dest == &self_inst) ||
      (dest == self_inst)) {
    return get_loopback_connection();

  /* fast path: an existing connection for this entity */
  XioConnection::EntitySet::iterator conn_iter =
    conns_entity_map.find(dest, XioConnection::EntityComp());
  if (conn_iter != conns_entity_map.end()) {
    ConnectionRef cref = &(*conn_iter);

  /* slow path: create a new active session + connection */
  string xio_uri = xio_uri_from_entity(cct->_conf->xio_transport_type,
				       dest.addr, true /* want_port */);

  ldout(cct,4) << "XioMessenger " << this << " get_connection: xio_uri "

  /* XXX client session creation parameters */
  struct xio_session_params params = {};
  params.type = XIO_SESSION_CLIENT;
  params.ses_ops = &xio_msgr_ops;
  params.user_context = this;
  params.uri = xio_uri.c_str();

  XioConnection *xcon = new XioConnection(this, XioConnection::ACTIVE,

  xcon->session = xio_session_create(&params);
  if (! xcon->session) {

  /* this should cause callbacks with user context of conn, but
   * we can always set it explicitly */
  struct xio_connection_params xcp = {};
  xcp.session = xcon->session;
  xcp.ctx = xcon->portal->ctx;
  xcp.conn_user_context = xcon;

  xcon->conn = xio_connect(&xcp);
  /* on connect failure the session is torn down again */
  xio_session_destroy(xcon->session);

  xcon->connected = true;

  /* sentinel ref held by the tracking structures */
  xcon->get(); /* xcon->nref == 1 */
  conns_list.push_back(*xcon);
  conns_entity_map.insert(*xcon);

  /* XXXX pre-merge of session startup negotiation ONLY! */
  xcon->cstate.state_up_ready(XioConnection::CState::OP_FLAG_NONE);

  ldout(cct,2) << "New connection xcon: " << xcon <<
    " up_ready on session " << xcon->session <<
    " on msgr: " << this << " portal: " << xcon->portal << dendl;

  return xcon->get(); /* nref +1 */
} /* get_connection */
1042 ConnectionRef XioMessenger::get_loopback_connection()
1044 return (loop_con.get());
1045 } /* get_loopback_connection */
1047 void XioMessenger::unregister_xcon(XioConnection *xcon)
1049 Spinlock::Locker lckr(conns_sp);
1051 XioConnection::EntitySet::iterator conn_iter =
1052 conns_entity_map.find(xcon->peer, XioConnection::EntityComp());
1053 if (conn_iter != conns_entity_map.end()) {
1054 XioConnection *xcon2 = &(*conn_iter);
1055 if (xcon == xcon2) {
1056 conns_entity_map.erase(conn_iter);
1060 /* check if citer on conn_list */
1061 if (xcon->conns_hook.is_linked()) {
1062 /* now find xcon on conns_list and erase */
1063 XioConnection::ConnList::iterator citer =
1064 XioConnection::ConnList::s_iterator_to(*xcon);
1065 conns_list.erase(citer);
1069 void XioMessenger::mark_down(const entity_addr_t& addr)
1071 entity_inst_t inst(entity_name_t(), addr);
1072 Spinlock::Locker lckr(conns_sp);
1073 XioConnection::EntitySet::iterator conn_iter =
1074 conns_entity_map.find(inst, XioConnection::EntityComp());
1075 if (conn_iter != conns_entity_map.end()) {
1076 (*conn_iter)._mark_down(XioConnection::CState::OP_FLAG_NONE);
1078 } /* mark_down(const entity_addr_t& */
1080 void XioMessenger::mark_down(Connection* con)
1082 XioConnection *xcon = static_cast<XioConnection*>(con);
1083 xcon->_mark_down(XioConnection::CState::OP_FLAG_NONE);
1084 } /* mark_down(Connection*) */
1086 void XioMessenger::mark_down_all()
1088 Spinlock::Locker lckr(conns_sp);
1089 XioConnection::EntitySet::iterator conn_iter;
1090 for (conn_iter = conns_entity_map.begin(); conn_iter !=
1091 conns_entity_map.begin(); ++conn_iter) {
1092 (*conn_iter)._mark_down(XioConnection::CState::OP_FLAG_NONE);
1094 } /* mark_down_all */
/* Allocate an XioMarkDownHook from the unregistered mempool and
 * placement-construct it over the pool memory.
 * NOTE(review): this listing elides the opening brace, the check of the
 * allocation result 'e', and the return statement. */
static inline XioMarkDownHook* pool_alloc_markdown_hook(
  XioConnection *xcon, Message *m)
  struct xio_reg_mem mp_mem;
  int e = xio_mempool_alloc(xio_msgr_noreg_mpool,
			    sizeof(XioMarkDownHook), &mp_mem);
  XioMarkDownHook *hook = static_cast<XioMarkDownHook*>(mp_mem.addr);
  new (hook) XioMarkDownHook(xcon, m, mp_mem);
1109 void XioMessenger::mark_down_on_empty(Connection* con)
1111 XioConnection *xcon = static_cast<XioConnection*>(con);
1112 MNop* m = new MNop();
1113 m->tag = XIO_NOP_TAG_MARKDOWN;
1114 m->set_completion_hook(pool_alloc_markdown_hook(xcon, m));
1115 // stall new messages
1116 xcon->cstate.session_state = XioConnection::session_states::BARRIER;
1117 (void) _send_message_impl(m, xcon);
1120 void XioMessenger::mark_disposable(Connection *con)
1122 XioConnection *xcon = static_cast<XioConnection*>(con);
1123 xcon->_mark_disposable(XioConnection::CState::OP_FLAG_NONE);
1126 void XioMessenger::try_insert(XioConnection *xcon)
1128 Spinlock::Locker lckr(conns_sp);
1129 /* already resident in conns_list */
1130 conns_entity_map.insert(*xcon);
1133 XioMessenger::~XioMessenger()
1135 delete dispatch_strategy;