X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fmsg%2Fxio%2FXioMessenger.cc;fp=src%2Fceph%2Fsrc%2Fmsg%2Fxio%2FXioMessenger.cc;h=0000000000000000000000000000000000000000;hb=7da45d65be36d36b880cc55c5036e96c24b53f00;hp=6bf4d52c726def8df952e7570f8c81d24736a87d;hpb=691462d09d0987b47e112d6ee8740375df3c51b2;p=stor4nfv.git

diff --git a/src/ceph/src/msg/xio/XioMessenger.cc b/src/ceph/src/msg/xio/XioMessenger.cc
deleted file mode 100644
index 6bf4d52..0000000
--- a/src/ceph/src/msg/xio/XioMessenger.cc
+++ /dev/null
@@ -1,1137 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- * Portions Copyright (C) 2013 CohortFS, LLC
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation.  See file COPYING.
- *
- */
-
-#include <arpa/inet.h>
-#include <boost/lexical_cast.hpp>
-#include <set>
-#include <stdlib.h>
-#include <memory>
-
-#include "XioMsg.h"
-#include "XioMessenger.h"
-#include "common/address_helper.h"
-#include "common/code_environment.h"
-#include "messages/MNop.h"
-
-#define dout_subsys ceph_subsys_xio
-#undef dout_prefix
-#define dout_prefix *_dout << "xio."
-
-Mutex mtx("XioMessenger Package Lock");
-std::atomic<bool> initialized = { false };
-
-std::atomic<uint64_t> XioMessenger::nInstances = { 0 };
-
-struct xio_mempool *xio_msgr_noreg_mpool;
-
-static struct xio_session_ops xio_msgr_ops;
-
-/* Accelio API callouts */
-
-namespace xio_log
-{
-typedef pair<const char*, int> level_pair;
-static const level_pair LEVELS[] = {
-  make_pair("fatal", 0),
-  make_pair("error", 0),
-  make_pair("warn", 1),
-  make_pair("info", 1),
-  make_pair("debug", 2),
-  make_pair("trace", 20)
-};
-
-static CephContext *context;
-
-int get_level()
-{
-  int level = 0;
-  for (size_t i = 0; i < sizeof(LEVELS)/sizeof(LEVELS[0]); i++) {
-    if (!ldlog_p1(context, dout_subsys, LEVELS[i].second))
-      break;
-    level++;
-  }
-  return level;
-}
-
-void log_dout(const char *file, unsigned line,
-              const char *function, unsigned level,
-              const char *fmt, ...)
-{
-  char buffer[2048];
-  va_list args;
-  va_start(args, fmt);
-  int n = vsnprintf(buffer, sizeof(buffer), fmt, args);
-  va_end(args);
-
-  if (n > 0) {
-    const char *short_file = strrchr(file, '/');
-    short_file = (short_file == NULL) ? file : short_file + 1;
-
-    const level_pair &lvl = LEVELS[level];
-    ldout(context, lvl.second) << '[' << lvl.first << "] "
-                               << short_file << ':' << line << ' '
-                               << function << " - " << buffer << dendl;
-  }
-}
-}
-
-static int on_session_event(struct xio_session *session,
-                            struct xio_session_event_data *event_data,
-                            void *cb_user_context)
-{
-  XioMessenger *msgr = static_cast<XioMessenger*>(cb_user_context);
-  CephContext *cct = msgr->cct;
-
-  ldout(cct,4) << "session event: " << xio_session_event_str(event_data->event)
-               << ". reason: " << xio_strerror(event_data->reason) << dendl;
-
-  return msgr->session_event(session, event_data, cb_user_context);
-}
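
The xio_log shim above bridges two logging worlds: get_level() scans the LEVELS table and counts how many of its entries are enabled under Ceph's dout configuration, and that count becomes Accelio's numeric log level. A minimal standalone sketch of that mapping, with a stubbed should_gather() in place of Ceph's ldlog_p1() (all names here are illustrative, not Ceph API):

```cpp
// Standalone sketch of the level-mapping idea in xio_log::get_level().
#include <cstdio>
#include <utility>

static const std::pair<const char*, int> LEVELS[] = {
  {"fatal", 0}, {"error", 0}, {"warn", 1},
  {"info", 1}, {"debug", 2}, {"trace", 20}
};

// Pretend the daemon is configured to gather dout levels <= 2.
static bool should_gather(int dout_level) { return dout_level <= 2; }

// Accelio's log level is the count of leading table entries whose Ceph
// debug level is currently enabled; the first disabled entry stops the scan.
int get_level() {
  int level = 0;
  for (size_t i = 0; i < sizeof(LEVELS)/sizeof(LEVELS[0]); i++) {
    if (!should_gather(LEVELS[i].second))
      break;
    level++;
  }
  return level;
}

int main() {
  std::printf("accelio log level: %d\n", get_level()); // prints 5 here
  return 0;
}
```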
reason: " << xio_strerror(event_data->reason) << dendl; - - return msgr->session_event(session, event_data, cb_user_context); -} - -static int on_new_session(struct xio_session *session, - struct xio_new_session_req *req, - void *cb_user_context) -{ - XioMessenger *msgr = static_cast(cb_user_context); - CephContext *cct = msgr->cct; - - ldout(cct,4) << "new session " << session - << " user_context " << cb_user_context << dendl; - - return (msgr->new_session(session, req, cb_user_context)); -} - -static int on_msg(struct xio_session *session, - struct xio_msg *req, - int more_in_batch, - void *cb_user_context) -{ - XioConnection* xcon __attribute__((unused)) = - static_cast(cb_user_context); - CephContext *cct = xcon->get_messenger()->cct; - - ldout(cct,25) << "on_msg session " << session << " xcon " << xcon << dendl; - - if (unlikely(XioPool::trace_mempool)) { - static uint32_t nreqs; - if (unlikely((++nreqs % 65536) == 0)) { - xp_stats.dump(__func__, nreqs); - } - } - - return xcon->on_msg(session, req, more_in_batch, - cb_user_context); -} - -static int on_ow_msg_send_complete(struct xio_session *session, - struct xio_msg *msg, - void *conn_user_context) -{ - XioConnection *xcon = - static_cast(conn_user_context); - CephContext *cct = xcon->get_messenger()->cct; - - ldout(cct,25) << "msg delivered session: " << session - << " msg: " << msg << " conn_user_context " - << conn_user_context << dendl; - - return xcon->on_ow_msg_send_complete(session, msg, conn_user_context); -} - -static int on_msg_error(struct xio_session *session, - enum xio_status error, - enum xio_msg_direction dir, - struct xio_msg *msg, - void *conn_user_context) -{ - /* XIO promises to flush back undelivered messages */ - XioConnection *xcon = - static_cast(conn_user_context); - CephContext *cct = xcon->get_messenger()->cct; - - ldout(cct,4) << "msg error session: " << session - << " error: " << xio_strerror(error) << " msg: " << msg - << " conn_user_context " << conn_user_context << dendl; - - return xcon->on_msg_error(session, error, msg, conn_user_context); -} - -static int on_cancel(struct xio_session *session, - struct xio_msg *msg, - enum xio_status result, - void *conn_user_context) -{ - XioConnection* xcon __attribute__((unused)) = - static_cast(conn_user_context); - CephContext *cct = xcon->get_messenger()->cct; - - ldout(cct,25) << "on cancel: session: " << session << " msg: " << msg - << " conn_user_context " << conn_user_context << dendl; - - return 0; -} - -static int on_cancel_request(struct xio_session *session, - struct xio_msg *msg, - void *conn_user_context) -{ - XioConnection* xcon __attribute__((unused)) = - static_cast(conn_user_context); - CephContext *cct = xcon->get_messenger()->cct; - - ldout(cct,25) << "on cancel request: session: " << session << " msg: " << msg - << " conn_user_context " << conn_user_context << dendl; - - return 0; -} - -/* free functions */ -static string xio_uri_from_entity(const string &type, - const entity_addr_t& addr, bool want_port) -{ - const char *host = NULL; - char addr_buf[129]; - string xio_uri; - - switch(addr.get_family()) { - case AF_INET: - host = inet_ntop(AF_INET, &addr.in4_addr().sin_addr, addr_buf, - INET_ADDRSTRLEN); - break; - case AF_INET6: - host = inet_ntop(AF_INET6, &addr.in6_addr().sin6_addr, addr_buf, - INET6_ADDRSTRLEN); - break; - default: - abort(); - break; - }; - - if (type == "rdma" || type == "tcp") - xio_uri = type + "://"; - else - xio_uri = "rdma://"; - - /* The following can only succeed if the host is rdma-capable */ - xio_uri += host; 
-void XioInit::package_init(CephContext *cct) {
-  if (! initialized) {
-
-    mtx.Lock();
-    if (! initialized) {
-
-      xio_init();
-
-      // claim a reference to the first context we see
-      xio_log::context = cct->get();
-
-      int xopt;
-      xopt = xio_log::get_level();
-      xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_LOG_LEVEL,
-                  &xopt, sizeof(xopt));
-      xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_LOG_FN,
-                  (const void*)xio_log::log_dout, sizeof(xio_log_fn));
-
-      xopt = 1;
-      xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_DISABLE_HUGETBL,
-                  &xopt, sizeof(xopt));
-
-      if (g_code_env == CODE_ENVIRONMENT_DAEMON) {
-        xopt = 1;
-        xio_set_opt(NULL, XIO_OPTLEVEL_RDMA, XIO_OPTNAME_ENABLE_FORK_INIT,
-                    &xopt, sizeof(xopt));
-      }
-
-      xopt = XIO_MSGR_IOVLEN;
-      xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_MAX_IN_IOVLEN,
-                  &xopt, sizeof(xopt));
-      xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_MAX_OUT_IOVLEN,
-                  &xopt, sizeof(xopt));
-
-      /* enable flow-control */
-      xopt = 1;
-      xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_ENABLE_FLOW_CONTROL,
-                  &xopt, sizeof(xopt));
-
-      /* and set threshold for buffer callouts */
-      xopt = max(cct->_conf->xio_max_send_inline, 512);
-      xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_MAX_INLINE_XIO_DATA,
-                  &xopt, sizeof(xopt));
-
-      xopt = XioMsgHdr::get_max_encoded_length();
-      ldout(cct,2) << "setting accelio max header size " << xopt << dendl;
-      xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_MAX_INLINE_XIO_HEADER,
-                  &xopt, sizeof(xopt));
-
-      size_t queue_depth = cct->_conf->xio_queue_depth;
-      struct xio_mempool_config mempool_config = {
-        6,
-        {
-          {1024,    0, queue_depth, 262144},
-          {4096,    0, queue_depth, 262144},
-          {16384,   0, queue_depth, 262144},
-          {65536,   0, 128,         65536},
-          {262144,  0, 32,          16384},
-          {1048576, 0, 8,           8192}
-        }
-      };
-      xio_set_opt(NULL,
-                  XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_CONFIG_MEMPOOL,
-                  &mempool_config, sizeof(mempool_config));
-
-      /* and an unregistered one */
-      #define XMSG_MEMPOOL_QUANTUM 4096
-
-      xio_msgr_noreg_mpool =
-        xio_mempool_create(-1 /* nodeid */,
-                           XIO_MEMPOOL_FLAG_REGULAR_PAGES_ALLOC);
-
-      (void) xio_mempool_add_slab(xio_msgr_noreg_mpool, 64,
-                                  cct->_conf->xio_mp_min,
-                                  cct->_conf->xio_mp_max_64,
-                                  XMSG_MEMPOOL_QUANTUM, 0);
-      (void) xio_mempool_add_slab(xio_msgr_noreg_mpool, 256,
-                                  cct->_conf->xio_mp_min,
-                                  cct->_conf->xio_mp_max_256,
-                                  XMSG_MEMPOOL_QUANTUM, 0);
-      (void) xio_mempool_add_slab(xio_msgr_noreg_mpool, 1024,
-                                  cct->_conf->xio_mp_min,
-                                  cct->_conf->xio_mp_max_1k,
-                                  XMSG_MEMPOOL_QUANTUM, 0);
-      (void) xio_mempool_add_slab(xio_msgr_noreg_mpool, getpagesize(),
-                                  cct->_conf->xio_mp_min,
-                                  cct->_conf->xio_mp_max_page,
-                                  XMSG_MEMPOOL_QUANTUM, 0);
-
-      /* initialize ops singleton */
-      xio_msgr_ops.on_session_event = on_session_event;
-      xio_msgr_ops.on_new_session = on_new_session;
-      xio_msgr_ops.on_session_established = NULL;
-      xio_msgr_ops.on_msg = on_msg;
-      xio_msgr_ops.on_ow_msg_send_complete = on_ow_msg_send_complete;
-      xio_msgr_ops.on_msg_error = on_msg_error;
-      xio_msgr_ops.on_cancel = on_cancel;
-      xio_msgr_ops.on_cancel_request = on_cancel_request;
-
-      /* mark initialized */
-      initialized = true;
-    }
-    mtx.Unlock();
-  }
-}
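
package_init() is a double-checked one-time initializer: a lock-free test of the atomic flag on the fast path, then a re-test under the mutex so exactly one caller performs the Accelio setup. A generic sketch of the pattern using standard C++ primitives instead of Ceph's Mutex:

```cpp
// Generic double-checked one-time init, as package_init() does above.
#include <atomic>
#include <cstdio>
#include <mutex>

static std::atomic<bool> initialized{false};
static std::mutex init_mtx;

void package_init() {
  if (!initialized) {                      // cheap fast path, no lock
    std::lock_guard<std::mutex> lock(init_mtx);
    if (!initialized) {                    // re-check: another thread may
                                           // have won the race meanwhile
      std::puts("one-time setup runs exactly once");
      initialized = true;                  // publish only after setup is done
    }
  }
}

int main() {
  package_init();
  package_init();  // second call is a no-op
  return 0;
}
```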
-/* XioMessenger */
-#undef dout_prefix
-#define dout_prefix _prefix(_dout, this)
-static ostream& _prefix(std::ostream *_dout, XioMessenger *msgr) {
-  return *_dout << "-- " << msgr->get_myaddr() << " ";
-}
-
-XioMessenger::XioMessenger(CephContext *cct, entity_name_t name,
-                           string mname, uint64_t _nonce,
-                           uint64_t cflags, DispatchStrategy *ds)
-  : SimplePolicyMessenger(cct, name, mname, _nonce),
-    XioInit(cct),
-    portals(this, get_nportals(cflags), get_nconns_per_portal(cflags)),
-    dispatch_strategy(ds),
-    loop_con(new XioLoopbackConnection(this)),
-    special_handling(0),
-    sh_mtx("XioMessenger session mutex"),
-    sh_cond(),
-    need_addr(true),
-    did_bind(false),
-    nonce(_nonce)
-{
-
-  if (cct->_conf->xio_trace_xcon)
-    magic |= MSG_MAGIC_TRACE_XCON;
-
-  XioPool::trace_mempool = (cct->_conf->xio_trace_mempool);
-  XioPool::trace_msgcnt = (cct->_conf->xio_trace_msgcnt);
-
-  dispatch_strategy->set_messenger(this);
-
-  /* update class instance count */
-  nInstances++;
-
-  loop_con->set_features(CEPH_FEATURES_ALL);
-
-  ldout(cct,2) << "Create msgr: " << this << " instance: "
-               << nInstances << " type: " << name.type_str()
-               << " subtype: " << mname << " nportals: " << get_nportals(cflags)
-               << " nconns_per_portal: " << get_nconns_per_portal(cflags)
-               << dendl;
-
-} /* ctor */
-
-int XioMessenger::pool_hint(uint32_t dsize) {
-  if (dsize > 1024*1024)
-    return 0;
-
-  /* if dsize is already present, returns -EEXIST */
-  return xio_mempool_add_slab(xio_msgr_noreg_mpool, dsize, 0,
-                              cct->_conf->xio_mp_max_hint,
-                              XMSG_MEMPOOL_QUANTUM, 0);
-}
-
-int XioMessenger::get_nconns_per_portal(uint64_t cflags)
-{
-  const int XIO_DEFAULT_NUM_CONNS_PER_PORTAL = 8;
-  int nconns = XIO_DEFAULT_NUM_CONNS_PER_PORTAL;
-
-  if (cflags & Messenger::HAS_MANY_CONNECTIONS)
-    nconns = max(cct->_conf->xio_max_conns_per_portal, XIO_DEFAULT_NUM_CONNS_PER_PORTAL);
-  else if (cflags & Messenger::HEARTBEAT)
-    nconns = max(cct->_conf->osd_heartbeat_min_peers * 4, XIO_DEFAULT_NUM_CONNS_PER_PORTAL);
-
-  return nconns;
-}
-
-int XioMessenger::get_nportals(uint64_t cflags)
-{
-  int nportals = 1;
-
-  if (cflags & Messenger::HAS_HEAVY_TRAFFIC)
-    nportals = max(cct->_conf->xio_portal_threads, 1);
-
-  return nportals;
-}
-
-void XioMessenger::learned_addr(const entity_addr_t &peer_addr_for_me)
-{
-  // be careful here: multiple threads may block here, and readers of
-  // my_inst.addr do NOT hold any lock.
-
-  // this always goes from true -> false under the protection of the
-  // mutex.  if it is already false, we need not retake the mutex at
-  // all.
-  if (!need_addr)
-    return;
-
-  sh_mtx.Lock();
-  if (need_addr) {
-    entity_addr_t t = peer_addr_for_me;
-    t.set_port(my_inst.addr.get_port());
-    my_inst.addr.set_sockaddr(t.get_sockaddr());
-    ldout(cct,2) << "learned my addr " << my_inst.addr << dendl;
-    need_addr = false;
-    // init_local_connection();
-  }
-  sh_mtx.Unlock();
-
-}
-
-int XioMessenger::new_session(struct xio_session *session,
-                              struct xio_new_session_req *req,
-                              void *cb_user_context)
-{
-  if (shutdown_called) {
-    return xio_reject(
-      session, XIO_E_SESSION_REFUSED, NULL /* udata */, 0 /* udata len */);
-  }
-  int code = portals.accept(session, req, cb_user_context);
-  if (! code)
-    nsessions++;
-  return code;
-} /* new_session */
-int XioMessenger::session_event(struct xio_session *session,
-                                struct xio_session_event_data *event_data,
-                                void *cb_user_context)
-{
-  XioConnection *xcon;
-
-  switch (event_data->event) {
-  case XIO_SESSION_CONNECTION_ESTABLISHED_EVENT:
-  {
-    struct xio_connection *conn = event_data->conn;
-    struct xio_connection_attr xcona;
-    entity_addr_t peer_addr_for_me, paddr;
-
-    xcon = static_cast<XioConnection*>(event_data->conn_user_context);
-
-    ldout(cct,2) << "connection established " << event_data->conn
-                 << " session " << session << " xcon " << xcon << dendl;
-
-    (void) xio_query_connection(conn, &xcona,
-                                XIO_CONNECTION_ATTR_LOCAL_ADDR|
-                                XIO_CONNECTION_ATTR_PEER_ADDR);
-    peer_addr_for_me.set_sockaddr((struct sockaddr *)&xcona.local_addr);
-    paddr.set_sockaddr((struct sockaddr *)&xcona.peer_addr);
-    //set_myaddr(peer_addr_for_me);
-    learned_addr(peer_addr_for_me);
-    ldout(cct,2) << "client: connected from " << peer_addr_for_me << " to " << paddr << dendl;
-
-    /* notify hook */
-    this->ms_deliver_handle_connect(xcon);
-    this->ms_deliver_handle_fast_connect(xcon);
-  }
-  break;
-
-  case XIO_SESSION_NEW_CONNECTION_EVENT:
-  {
-    struct xio_connection *conn = event_data->conn;
-    struct xio_connection_attr xcona;
-    entity_inst_t s_inst;
-    entity_addr_t peer_addr_for_me;
-
-    (void) xio_query_connection(conn, &xcona,
-                                XIO_CONNECTION_ATTR_CTX|
-                                XIO_CONNECTION_ATTR_PEER_ADDR|
-                                XIO_CONNECTION_ATTR_LOCAL_ADDR);
-    /* XXX assumes RDMA */
-    s_inst.addr.set_sockaddr((struct sockaddr *)&xcona.peer_addr);
-    peer_addr_for_me.set_sockaddr((struct sockaddr *)&xcona.local_addr);
-
-    xcon = new XioConnection(this, XioConnection::PASSIVE, s_inst);
-    xcon->session = session;
-
-    struct xio_context_attr xctxa;
-    (void) xio_query_context(xcona.ctx, &xctxa, XIO_CONTEXT_ATTR_USER_CTX);
-
-    xcon->conn = conn;
-    xcon->portal = static_cast<XioPortal*>(xctxa.user_context);
-    assert(xcon->portal);
-
-    xcona.user_context = xcon;
-    (void) xio_modify_connection(conn, &xcona, XIO_CONNECTION_ATTR_USER_CTX);
-
-    xcon->connected = true;
-
-    /* sentinel ref */
-    xcon->get(); /* xcon->nref == 1 */
-    conns_sp.lock();
-    conns_list.push_back(*xcon);
-    /* XXX we can't put xcon in conns_entity_map because we don't yet know
-     * its peer address */
-    conns_sp.unlock();
-
-    /* XXXX pre-merge of session startup negotiation ONLY! */
-    xcon->cstate.state_up_ready(XioConnection::CState::OP_FLAG_NONE);
-
-    ldout(cct,2) << "New connection session " << session
-                 << " xcon " << xcon << " on msgr: " << this << " portal: " << xcon->portal << dendl;
-    ldout(cct,2) << "Server: connected from " << s_inst.addr << " to " << peer_addr_for_me << dendl;
-  }
-  break;
-  case XIO_SESSION_CONNECTION_ERROR_EVENT:
-  case XIO_SESSION_CONNECTION_CLOSED_EVENT: /* orderly discon */
-  case XIO_SESSION_CONNECTION_DISCONNECTED_EVENT: /* unexpected discon */
-  case XIO_SESSION_CONNECTION_REFUSED_EVENT:
-    xcon = static_cast<XioConnection*>(event_data->conn_user_context);
-    ldout(cct,2) << xio_session_event_str(event_data->event)
-                 << " xcon " << xcon << " session " << session << dendl;
-    if (likely(!!xcon)) {
-      unregister_xcon(xcon);
-      xcon->on_disconnect_event();
-    }
-    break;
-  case XIO_SESSION_CONNECTION_TEARDOWN_EVENT:
-    xcon = static_cast<XioConnection*>(event_data->conn_user_context);
-    ldout(cct,2) << xio_session_event_str(event_data->event)
-                 << " xcon " << xcon << " session " << session << dendl;
-    /*
-     * There are flows where Accelio sends a teardown event without going
-     * through the disconnect event, so make sure the connection is cleaned up.
-     */
-    unregister_xcon(xcon);
-    xcon->on_teardown_event();
-    break;
-  case XIO_SESSION_TEARDOWN_EVENT:
-    ldout(cct,2) << xio_session_event_str(event_data->event)
-                 << " session " << session << dendl;
-    if (unlikely(XioPool::trace_mempool)) {
-      xp_stats.dump("xio session dtor", reinterpret_cast<uint64_t>(session));
-    }
-    xio_session_destroy(session);
-    if (--nsessions == 0) {
-      Mutex::Locker lck(sh_mtx);
-      if (nsessions == 0)
-        sh_cond.Signal();
-    }
-    break;
-  default:
-    break;
-  };
-
-  return 0;
-}
-
-enum bl_type
-{
-  BUFFER_PAYLOAD,
-  BUFFER_MIDDLE,
-  BUFFER_DATA
-};
-
-#define MAX_XIO_BUF_SIZE 1044480
-
-static inline int
-xio_count_buffers(const buffer::list& bl, int& req_size, int& msg_off, int& req_off)
-{
-  const std::list<buffer::ptr>& buffers = bl.buffers();
-  list<buffer::ptr>::const_iterator pb;
-  size_t size, off;
-  int result;
-  int first = 1;
-
-  off = size = 0;
-  result = 0;
-  for (;;) {
-    if (off >= size) {
-      if (first) pb = buffers.begin(); else ++pb;
-      if (pb == buffers.end()) {
-        break;
-      }
-      off = 0;
-      size = pb->length();
-      first = 0;
-    }
-    size_t count = size - off;
-    if (!count) continue;
-    if (req_size + count > MAX_XIO_BUF_SIZE) {
-      count = MAX_XIO_BUF_SIZE - req_size;
-    }
-
-    ++result;
-
-    /* advance iov and perhaps request */
-
-    off += count;
-    req_size += count;
-    ++msg_off;
-    if (unlikely(msg_off >= XIO_MSGR_IOVLEN || req_size >= MAX_XIO_BUF_SIZE)) {
-      ++req_off;
-      msg_off = 0;
-      req_size = 0;
-    }
-  }
-
-  return result;
-}
-
-static inline void
-xio_place_buffers(const buffer::list& bl, XioMsg *xmsg, struct xio_msg*& req,
-                  struct xio_iovec_ex*& msg_iov, int& req_size,
-                  int ex_cnt, int& msg_off, int& req_off, bl_type type)
-{
-  const std::list<buffer::ptr>& buffers = bl.buffers();
-  list<buffer::ptr>::const_iterator pb;
-  struct xio_iovec_ex* iov;
-  size_t size, off;
-  const char *data = NULL;
-  int first = 1;
-
-  off = size = 0;
-  for (;;) {
-    if (off >= size) {
-      if (first) pb = buffers.begin(); else ++pb;
-      if (pb == buffers.end()) {
-        break;
-      }
-      off = 0;
-      size = pb->length();
-      data = pb->c_str(); // is c_str() efficient?
-      first = 0;
-    }
-    size_t count = size - off;
-    if (!count) continue;
-    if (req_size + count > MAX_XIO_BUF_SIZE) {
-      count = MAX_XIO_BUF_SIZE - req_size;
-    }
-
-    /* assign buffer */
-    iov = &msg_iov[msg_off];
-    iov->iov_base = (void *) (&data[off]);
-    iov->iov_len = count;
-
-    switch (type) {
-    case BUFFER_DATA:
-      //break;
-    default:
-    {
-      struct xio_reg_mem *mp = get_xio_mp(*pb);
-      iov->mr = (mp) ? mp->mr : NULL;
-    }
-      break;
-    }
-
-    /* advance iov(s) */
-
-    off += count;
-    req_size += count;
-    ++msg_off;
-
-    /* next request if necessary */
-
-    if (unlikely(msg_off >= XIO_MSGR_IOVLEN || req_size >= MAX_XIO_BUF_SIZE)) {
-      /* finish this request */
-      req->out.pdata_iov.nents = msg_off;
-      /* advance to next, and write in it if it's not the last one. */
-      if (++req_off >= ex_cnt) {
-        req = 0; /* poison.  trap if we try to use it. */
-        msg_iov = NULL;
-      } else {
-        req = &xmsg->req_arr[req_off].msg;
-        msg_iov = req->out.pdata_iov.sglist;
-      }
-      msg_off = 0;
-      req_size = 0;
-    }
-  }
-}
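
The two helpers above make identical passes over a bufferlist: xio_count_buffers() predicts how many iovec slots (and therefore how many chained xio_msg requests) are needed, and xio_place_buffers() fills the sglists using the same cursor arithmetic, starting a new request whenever XIO_MSGR_IOVLEN slots or MAX_XIO_BUF_SIZE bytes are exhausted. A standalone sketch of that fan-out arithmetic with deliberately tiny limits so the spill-over is visible (IOVLEN and MAX_BUF here are illustrative stand-ins, not Accelio's values):

```cpp
// Standalone sketch of the cursor logic shared by xio_count_buffers()/
// xio_place_buffers(): segments are packed into iovec slots, and a new
// request is started when either the slot count or the per-request byte
// budget is exhausted.
#include <cstdio>
#include <vector>

static const int IOVLEN = 4;      // stands in for XIO_MSGR_IOVLEN
static const int MAX_BUF = 1000;  // stands in for MAX_XIO_BUF_SIZE

int count_iovecs(const std::vector<int>& seg_lens, int& nreqs) {
  int req_size = 0, msg_off = 0, req_off = 0, iovs = 0;
  for (size_t i = 0; i < seg_lens.size(); ++i) {
    int remaining = seg_lens[i];
    while (remaining > 0) {
      int count = remaining;
      if (req_size + count > MAX_BUF)  // clip to this request's byte budget
        count = MAX_BUF - req_size;
      ++iovs;
      remaining -= count;
      req_size += count;
      if (++msg_off >= IOVLEN || req_size >= MAX_BUF) {
        ++req_off;                     // roll over to the next xio_msg
        msg_off = 0;
        req_size = 0;
      }
    }
  }
  nreqs = req_off + (msg_off ? 1 : 0); // a partially-filled tail counts too
  return iovs;
}

int main() {
  int nreqs = 0;
  // Five segments; the third exceeds the per-request budget and is split.
  int iovs = count_iovecs({100, 200, 1500, 50, 50}, nreqs);
  std::printf("iovecs=%d requests=%d\n", iovs, nreqs); // iovecs=6 requests=2
  return 0;
}
```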
-int XioMessenger::bind(const entity_addr_t& addr)
-{
-  if (addr.is_blank_ip()) {
-    lderr(cct) << "ERROR: need rdma ip for remote use! " << dendl;
-    cout << "Error: xio bind failed. public/cluster ip not specified" << std::endl;
-    return -1;
-  }
-
-  entity_addr_t shift_addr = addr;
-  string base_uri = xio_uri_from_entity(cct->_conf->xio_transport_type,
-                                        shift_addr, false /* want_port */);
-  ldout(cct,4) << "XioMessenger " << this << " bind: xio_uri "
-               << base_uri << ':' << shift_addr.get_port() << dendl;
-
-  uint16_t port0;
-  int r = portals.bind(&xio_msgr_ops, base_uri, shift_addr.get_port(), &port0);
-  if (r == 0) {
-    shift_addr.set_port(port0);
-    shift_addr.nonce = nonce;
-    set_myaddr(shift_addr);
-    need_addr = false;
-    did_bind = true;
-  }
-  return r;
-} /* bind */
-
-int XioMessenger::rebind(const set<int>& avoid_ports)
-{
-  ldout(cct,4) << "XioMessenger " << this << " rebind attempt" << dendl;
-  return 0;
-} /* rebind */
-
-int XioMessenger::start()
-{
-  portals.start();
-  dispatch_strategy->start();
-  if (!did_bind) {
-    my_inst.addr.nonce = nonce;
-  }
-  started = true;
-  return 0;
-}
-
-void XioMessenger::wait()
-{
-  portals.join();
-  dispatch_strategy->wait();
-} /* wait */
-
-int XioMessenger::_send_message(Message *m, const entity_inst_t& dest)
-{
-  ConnectionRef conn = get_connection(dest);
-  if (conn)
-    return _send_message(m, &(*conn));
-  else
-    return EINVAL;
-} /* send_message(Message *, const entity_inst_t&) */
-
-static inline XioMsg* pool_alloc_xio_msg(Message *m, XioConnection *xcon,
-                                         int ex_cnt)
-{
-  struct xio_reg_mem mp_mem;
-  int e = xpool_alloc(xio_msgr_noreg_mpool, sizeof(XioMsg), &mp_mem);
-  if (!!e)
-    return NULL;
-  XioMsg *xmsg = reinterpret_cast<XioMsg*>(mp_mem.addr);
-  assert(!!xmsg);
-  new (xmsg) XioMsg(m, xcon, mp_mem, ex_cnt, CEPH_FEATURES_ALL);
-  return xmsg;
-}
-
-XioCommand* pool_alloc_xio_command(XioConnection *xcon)
-{
-  struct xio_reg_mem mp_mem;
-  int e = xpool_alloc(xio_msgr_noreg_mpool, sizeof(XioCommand), &mp_mem);
-  if (!!e)
-    return NULL;
-  XioCommand *xcmd = reinterpret_cast<XioCommand*>(mp_mem.addr);
-  assert(!!xcmd);
-  new (xcmd) XioCommand(xcon, mp_mem);
-  return xcmd;
-}
-
-int XioMessenger::_send_message(Message *m, Connection *con)
-{
-  if (con == loop_con.get() /* intrusive_ptr get() */) {
-    m->set_connection(con);
-    m->set_src(get_myinst().name);
-    m->set_seq(loop_con->next_seq());
-    ds_dispatch(m);
-    return 0;
-  }
-
-  XioConnection *xcon = static_cast<XioConnection*>(con);
-
-  /* If con is not in READY state, we have to enforce policy */
-  if (xcon->cstate.session_state.read() != XioConnection::UP) {
-    pthread_spin_lock(&xcon->sp);
-    if (xcon->cstate.session_state.read() != XioConnection::UP) {
-      xcon->outgoing.mqueue.push_back(*m);
-      pthread_spin_unlock(&xcon->sp);
-      return 0;
-    }
-    pthread_spin_unlock(&xcon->sp);
-  }
-
-  return _send_message_impl(m, xcon);
-} /* send_message(Message* m, Connection *con) */
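
pool_alloc_xio_msg() and pool_alloc_xio_command() above share one pattern: take raw bytes from the Accelio mempool, then construct the object in place with placement new; teardown later runs the destructor explicitly and returns the bytes to the pool. A generic standalone sketch, with malloc/free standing in for the mempool calls:

```cpp
// Generic sketch of the allocate-then-placement-new pattern used by
// pool_alloc_xio_msg(); malloc/free stand in for the Accelio mempool.
#include <cstdio>
#include <cstdlib>
#include <new>

struct Frame {
  int seq;
  explicit Frame(int s) : seq(s) { std::printf("ctor %d\n", seq); }
  ~Frame() { std::printf("dtor %d\n", seq); }
};

int main() {
  void *raw = std::malloc(sizeof(Frame)); // pool hands back raw memory
  if (!raw)
    return 1;                             // pool exhausted / shut down
  Frame *f = new (raw) Frame(42);         // construct in place, no heap alloc
  // ... use f ...
  f->~Frame();                            // explicit destructor call
  std::free(raw);                         // return the bytes to the pool
  return 0;
}
```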
-int XioMessenger::_send_message_impl(Message* m, XioConnection* xcon)
-{
-  int code = 0;
-
-  Mutex::Locker l(xcon->lock);
-  if (unlikely(XioPool::trace_mempool)) {
-    static uint32_t nreqs;
-    if (unlikely((++nreqs % 65536) == 0)) {
-      xp_stats.dump(__func__, nreqs);
-    }
-  }
-
-  m->set_seq(xcon->state.next_out_seq());
-  m->set_magic(magic); // trace flags and special handling
-
-  m->encode(xcon->get_features(), this->crcflags);
-
-  buffer::list &payload = m->get_payload();
-  buffer::list &middle = m->get_middle();
-  buffer::list &data = m->get_data();
-
-  int msg_off = 0;
-  int req_off = 0;
-  int req_size = 0;
-  int nbuffers =
-    xio_count_buffers(payload, req_size, msg_off, req_off) +
-    xio_count_buffers(middle, req_size, msg_off, req_off) +
-    xio_count_buffers(data, req_size, msg_off, req_off);
-
-  int ex_cnt = req_off;
-  if (msg_off == 0 && ex_cnt > 0) {
-    // no buffers for last msg
-    ldout(cct,10) << "msg_off 0, ex_cnt " << ex_cnt << " -> " << ex_cnt-1 << dendl;
-    ex_cnt--;
-  }
-
-  /* get an XioMsg frame */
-  XioMsg *xmsg = pool_alloc_xio_msg(m, xcon, ex_cnt);
-  if (! xmsg) {
-    /* could happen if Accelio has been shutdown */
-    return ENOMEM;
-  }
-
-  ldout(cct,4) << __func__ << " " << m << " new XioMsg " << xmsg
-               << " tag " << (int)xmsg->hdr.tag
-               << " req_0 " << xmsg->get_xio_msg() << " msg type " << m->get_type()
-               << " features: " << xcon->get_features()
-               << " conn " << xcon->conn << " sess " << xcon->session << dendl;
-
-  if (magic & (MSG_MAGIC_XIO)) {
-
-    /* XXXX verify */
-    switch (m->get_type()) {
-    case 43:
-    // case 15:
-      ldout(cct,4) << __func__ << " stop 43 " << m->get_type() << " " << *m << dendl;
-      buffer::list &payload = m->get_payload();
-      ldout(cct,4) << __func__ << " payload dump:" << dendl;
-      payload.hexdump(cout);
-    }
-  }
-
-  struct xio_msg *req = xmsg->get_xio_msg();
-  struct xio_iovec_ex *msg_iov = req->out.pdata_iov.sglist;
-
-  if (magic & (MSG_MAGIC_XIO)) {
-    ldout(cct,4) << "payload: " << payload.buffers().size() <<
-      " middle: " << middle.buffers().size() <<
-      " data: " << data.buffers().size() <<
-      dendl;
-  }
-
-  if (unlikely(ex_cnt > 0)) {
-    ldout(cct,4) << __func__ << " buffer cnt > XIO_MSGR_IOVLEN (" <<
-      ((XIO_MSGR_IOVLEN-1) + nbuffers) << ")" << dendl;
-  }
-
-  /* do the invariant part */
-  msg_off = 0;
-  req_off = -1; /* most often, not used */
-  req_size = 0;
-
-  xio_place_buffers(payload, xmsg, req, msg_iov, req_size, ex_cnt, msg_off,
-                    req_off, BUFFER_PAYLOAD);
-
-  xio_place_buffers(middle, xmsg, req, msg_iov, req_size, ex_cnt, msg_off,
-                    req_off, BUFFER_MIDDLE);
-
-  xio_place_buffers(data, xmsg, req, msg_iov, req_size, ex_cnt, msg_off,
-                    req_off, BUFFER_DATA);
-  ldout(cct,10) << "ex_cnt " << ex_cnt << ", req_off " << req_off
-                << ", msg_cnt " << xmsg->get_msg_count() << dendl;
-
-  /* finalize request */
-  if (msg_off)
-    req->out.pdata_iov.nents = msg_off;
-
-  /* fixup first msg */
-  req = xmsg->get_xio_msg();
-
-  const std::list<buffer::ptr>& header = xmsg->hdr.get_bl().buffers();
-  assert(header.size() == 1); /* XXX */
-  list<buffer::ptr>::const_iterator pb = header.begin();
-  req->out.header.iov_base = (char*) pb->c_str();
-  req->out.header.iov_len = pb->length();
-
-  /* deliver via xio, preserve ordering */
-  if (xmsg->get_msg_count() > 1) {
-    struct xio_msg *head = xmsg->get_xio_msg();
-    struct xio_msg *tail = head;
-    for (req_off = 0; ((unsigned) req_off) < xmsg->get_msg_count()-1; ++req_off) {
-      req = &xmsg->req_arr[req_off].msg;
-      assert(!req->in.pdata_iov.nents);
-      assert(req->out.pdata_iov.nents || !nbuffers);
-      tail->next = req;
-      tail = req;
-    }
-    tail->next = NULL;
-  }
-  xmsg->trace = m->trace;
-  m->trace.event("xio portal enqueue for send");
-  m->trace.keyval("xio message segments", xmsg->hdr.msg_cnt);
-  xcon->portal->enqueue_for_send(xcon, xmsg);
-
-  return code;
-} /* _send_message_impl */
-int XioMessenger::shutdown()
-{
-  shutdown_called = true;
-  conns_sp.lock();
-  XioConnection::ConnList::iterator iter;
-  for (iter = conns_list.begin(); iter != conns_list.end(); ++iter) {
-    (void) iter->disconnect(); // XXX mark down?
-  }
-  conns_sp.unlock();
-  while(nsessions > 0) {
-    Mutex::Locker lck(sh_mtx);
-    if (nsessions > 0)
-      sh_cond.Wait(sh_mtx);
-  }
-  portals.shutdown();
-  dispatch_strategy->shutdown();
-  did_bind = false;
-  started = false;
-  return 0;
-} /* shutdown */
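
shutdown() disconnects every connection and then parks on sh_cond until the session-teardown path drives nsessions to zero. A standalone sketch of that drain-and-wait handshake using std::condition_variable (the real code pairs Ceph's Mutex/Cond with an atomic counter; this is simplified):

```cpp
// Sketch of the shutdown synchronization above: block until the session
// count drains to zero; the teardown side signals when the last one goes.
#include <condition_variable>
#include <cstdio>
#include <mutex>
#include <thread>

static std::mutex sh_mtx;
static std::condition_variable sh_cond;
static int nsessions = 2;

void session_teardown() {                 // runs on the event-loop side
  std::lock_guard<std::mutex> lk(sh_mtx);
  if (--nsessions == 0)
    sh_cond.notify_one();                 // wake the waiter in shutdown()
}

void shutdown() {
  std::unique_lock<std::mutex> lk(sh_mtx);
  sh_cond.wait(lk, [] { return nsessions == 0; }); // handles spurious wakeups
  std::puts("all sessions gone; portals can stop");
}

int main() {
  std::thread t([] { session_teardown(); session_teardown(); });
  shutdown();
  t.join();
  return 0;
}
```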
-ConnectionRef XioMessenger::get_connection(const entity_inst_t& dest)
-{
-  if (shutdown_called)
-    return NULL;
-
-  const entity_inst_t& self_inst = get_myinst();
-  if ((&dest == &self_inst) ||
-      (dest == self_inst)) {
-    return get_loopback_connection();
-  }
-
-  conns_sp.lock();
-  XioConnection::EntitySet::iterator conn_iter =
-    conns_entity_map.find(dest, XioConnection::EntityComp());
-  if (conn_iter != conns_entity_map.end()) {
-    ConnectionRef cref = &(*conn_iter);
-    conns_sp.unlock();
-    return cref;
-  }
-  else {
-    conns_sp.unlock();
-    string xio_uri = xio_uri_from_entity(cct->_conf->xio_transport_type,
-                                         dest.addr, true /* want_port */);
-
-    ldout(cct,4) << "XioMessenger " << this << " get_connection: xio_uri "
-                 << xio_uri << dendl;
-
-    /* XXX client session creation parameters */
-    struct xio_session_params params = {};
-    params.type = XIO_SESSION_CLIENT;
-    params.ses_ops = &xio_msgr_ops;
-    params.user_context = this;
-    params.uri = xio_uri.c_str();
-
-    XioConnection *xcon = new XioConnection(this, XioConnection::ACTIVE,
-                                            dest);
-
-    xcon->session = xio_session_create(&params);
-    if (! xcon->session) {
-      delete xcon;
-      return NULL;
-    }
-
-    /* this should cause callbacks with user context of conn, but
-     * we can always set it explicitly */
-    struct xio_connection_params xcp = {};
-    xcp.session = xcon->session;
-    xcp.ctx = xcon->portal->ctx;
-    xcp.conn_user_context = xcon;
-
-    xcon->conn = xio_connect(&xcp);
-    if (!xcon->conn) {
-      xio_session_destroy(xcon->session);
-      delete xcon;
-      return NULL;
-    }
-
-    nsessions++;
-    xcon->connected = true;
-
-    /* sentinel ref */
-    xcon->get(); /* xcon->nref == 1 */
-    conns_sp.lock();
-    conns_list.push_back(*xcon);
-    conns_entity_map.insert(*xcon);
-    conns_sp.unlock();
-
-    /* XXXX pre-merge of session startup negotiation ONLY! */
-    xcon->cstate.state_up_ready(XioConnection::CState::OP_FLAG_NONE);
-
-    ldout(cct,2) << "New connection xcon: " << xcon <<
-      " up_ready on session " << xcon->session <<
-      " on msgr: " << this << " portal: " << xcon->portal << dendl;
-
-    return xcon->get(); /* nref +1 */
-  }
-} /* get_connection */
-
-ConnectionRef XioMessenger::get_loopback_connection()
-{
-  return (loop_con.get());
-} /* get_loopback_connection */
-
-void XioMessenger::unregister_xcon(XioConnection *xcon)
-{
-  Spinlock::Locker lckr(conns_sp);
-
-  XioConnection::EntitySet::iterator conn_iter =
-    conns_entity_map.find(xcon->peer, XioConnection::EntityComp());
-  if (conn_iter != conns_entity_map.end()) {
-    XioConnection *xcon2 = &(*conn_iter);
-    if (xcon == xcon2) {
-      conns_entity_map.erase(conn_iter);
-    }
-  }
-
-  /* check if citer on conn_list */
-  if (xcon->conns_hook.is_linked()) {
-    /* now find xcon on conns_list and erase */
-    XioConnection::ConnList::iterator citer =
-      XioConnection::ConnList::s_iterator_to(*xcon);
-    conns_list.erase(citer);
-  }
-}
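
get_connection() is a lookup-or-create cache: probe conns_entity_map under the spinlock, and only on a miss pay for session and connection setup before inserting the new entry. A simplified standalone sketch of that flow, with a std::map keyed by a string standing in for the entity map (the real code above can briefly race two creators; the sketch keeps whichever insert wins):

```cpp
// Sketch of the lookup-or-create flow in get_connection(): check the map
// under the lock, drop the lock for the (slow) connect, then insert.
#include <cstdio>
#include <map>
#include <memory>
#include <mutex>
#include <string>

struct Conn { std::string peer; };

static std::mutex conns_mtx;
static std::map<std::string, std::shared_ptr<Conn>> conns;

std::shared_ptr<Conn> get_connection(const std::string& peer) {
  {
    std::lock_guard<std::mutex> lk(conns_mtx);
    auto it = conns.find(peer);
    if (it != conns.end())
      return it->second;                // fast path: already connected
  }
  // Slow path outside the lock: session + connection setup can block.
  auto c = std::make_shared<Conn>(Conn{peer});
  std::lock_guard<std::mutex> lk(conns_mtx);
  return conns.emplace(peer, c).first->second; // keep any racing winner
}

int main() {
  auto a = get_connection("osd.1");
  auto b = get_connection("osd.1");
  std::printf("same conn: %s\n", a == b ? "yes" : "no"); // yes
  return 0;
}
```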
-void XioMessenger::mark_down(const entity_addr_t& addr)
-{
-  entity_inst_t inst(entity_name_t(), addr);
-  Spinlock::Locker lckr(conns_sp);
-  XioConnection::EntitySet::iterator conn_iter =
-    conns_entity_map.find(inst, XioConnection::EntityComp());
-  if (conn_iter != conns_entity_map.end()) {
-    (*conn_iter)._mark_down(XioConnection::CState::OP_FLAG_NONE);
-  }
-} /* mark_down(const entity_addr_t&) */
-
-void XioMessenger::mark_down(Connection* con)
-{
-  XioConnection *xcon = static_cast<XioConnection*>(con);
-  xcon->_mark_down(XioConnection::CState::OP_FLAG_NONE);
-} /* mark_down(Connection*) */
-
-void XioMessenger::mark_down_all()
-{
-  Spinlock::Locker lckr(conns_sp);
-  XioConnection::EntitySet::iterator conn_iter;
-  for (conn_iter = conns_entity_map.begin(); conn_iter !=
-         conns_entity_map.end(); ++conn_iter) {
-    (*conn_iter)._mark_down(XioConnection::CState::OP_FLAG_NONE);
-  }
-} /* mark_down_all */
-
-static inline XioMarkDownHook* pool_alloc_markdown_hook(
-  XioConnection *xcon, Message *m)
-{
-  struct xio_reg_mem mp_mem;
-  int e = xio_mempool_alloc(xio_msgr_noreg_mpool,
-                            sizeof(XioMarkDownHook), &mp_mem);
-  if (!!e)
-    return NULL;
-  XioMarkDownHook *hook = static_cast<XioMarkDownHook*>(mp_mem.addr);
-  new (hook) XioMarkDownHook(xcon, m, mp_mem);
-  return hook;
-}
-
-void XioMessenger::mark_down_on_empty(Connection* con)
-{
-  XioConnection *xcon = static_cast<XioConnection*>(con);
-  MNop* m = new MNop();
-  m->tag = XIO_NOP_TAG_MARKDOWN;
-  m->set_completion_hook(pool_alloc_markdown_hook(xcon, m));
-  // stall new messages
-  xcon->cstate.session_state = XioConnection::session_states::BARRIER;
-  (void) _send_message_impl(m, xcon);
-}
-
-void XioMessenger::mark_disposable(Connection *con)
-{
-  XioConnection *xcon = static_cast<XioConnection*>(con);
-  xcon->_mark_disposable(XioConnection::CState::OP_FLAG_NONE);
-}
-
-void XioMessenger::try_insert(XioConnection *xcon)
-{
-  Spinlock::Locker lckr(conns_sp);
-  /* already resident in conns_list */
-  conns_entity_map.insert(*xcon);
-}
-
-XioMessenger::~XioMessenger()
-{
-  delete dispatch_strategy;
-  nInstances--;
-} /* dtor */
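
mark_down_on_empty() closes a connection only after everything already queued has gone out: the connection is moved to a BARRIER state so nothing new gets behind the sentinel, then a NOP message carrying XIO_NOP_TAG_MARKDOWN is queued, and its completion hook performs the teardown. A standalone sketch of that drain-then-close idea (the queue and lambdas are illustrative stand-ins, not the Ceph types):

```cpp
// Sketch of the "drain then close" idea behind mark_down_on_empty():
// bar new traffic, queue a sentinel behind everything in flight, and
// run the teardown from the sentinel's completion.
#include <cstdio>
#include <functional>
#include <queue>

struct Conn {
  std::queue<std::function<void()>> sendq;
  bool barrier = false;               // stalls producers, like BARRIER state

  void send(std::function<void()> work) {
    if (!barrier) sendq.push(std::move(work));
  }
  void mark_down_on_empty() {
    barrier = true;                   // no new messages behind the sentinel
    sendq.push([] { std::puts("sentinel delivered: closing connection"); });
  }
  void drain() {                      // stands in for the portal send loop
    while (!sendq.empty()) { sendq.front()(); sendq.pop(); }
  }
};

int main() {
  Conn c;
  c.send([] { std::puts("msg 1"); });
  c.send([] { std::puts("msg 2"); });
  c.mark_down_on_empty();             // queued after msg 1 and msg 2
  c.send([] { std::puts("late msg"); }); // barrier rejects this one
  c.drain();
  return 0;
}
```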