// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 * Portions Copyright (C) 2013 CohortFS, LLC
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */
#ifndef XIO_PORTAL_H
#define XIO_PORTAL_H

#include <boost/lexical_cast.hpp>
#include "msg/SimplePolicyMessenger.h"
#include "XioConnection.h"

#include "include/assert.h"
#include "common/dout.h"
#ifndef CACHE_LINE_SIZE
#define CACHE_LINE_SIZE 64 /* XXX arch-specific define */
#endif
#define CACHE_PAD(_n) char __pad ## _n [CACHE_LINE_SIZE]
class XioPortal : public Thread
{
private:
  struct SubmitQueue
  {
    const static int nlanes = 7;

    struct Lane
    {
      uint32_t size;
      XioSubmit::Queue q;
      pthread_spinlock_t sp;
      CACHE_PAD(0);
    };

    Lane qlane[nlanes];
    int ix; /* atomicity by portal thread */

    SubmitQueue() : ix(0)
    {
      int ix;
      Lane* lane;

      for (ix = 0; ix < nlanes; ++ix) {
        lane = &qlane[ix];
        pthread_spin_init(&lane->sp, PTHREAD_PROCESS_PRIVATE);
        lane->size = 0;
      }
    }
    inline Lane* get_lane(XioConnection *xcon)
    {
      return &qlane[(((uint64_t) xcon) / 16) % nlanes];
    }
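    /* Note: the lane is chosen by hashing the connection's address.
     * Dividing the pointer by 16 discards the low-order allocation-
     * alignment bits before taking it modulo nlanes, so distinct
     * connections spread across lanes while a given connection always
     * maps to the same lane.  Each lane carries its own spinlock and is
     * cache-line padded (CACHE_PAD) to limit false sharing between
     * concurrently-enqueuing connections. */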
    void enq(XioConnection *xcon, XioSubmit* xs)
    {
      Lane* lane = get_lane(xcon);
      pthread_spin_lock(&lane->sp);
      lane->q.push_back(*xs);
      ++(lane->size);
      pthread_spin_unlock(&lane->sp);
    }
    void enq(XioConnection *xcon, XioSubmit::Queue& requeue_q)
    {
      int size = requeue_q.size();
      Lane* lane = get_lane(xcon);
      pthread_spin_lock(&lane->sp);
      XioSubmit::Queue::const_iterator i1 = lane->q.end();
      lane->q.splice(i1, requeue_q);
      lane->size += size;
      pthread_spin_unlock(&lane->sp);
    }
    void deq(XioSubmit::Queue& send_q)
    {
      Lane* lane;
      int cnt;

      for (cnt = 0; cnt < nlanes; ++cnt, ++ix, ix = ix % nlanes) {
        lane = &qlane[ix];
        pthread_spin_lock(&lane->sp);
        if (lane->size > 0) {
          XioSubmit::Queue::const_iterator i1 = send_q.end();
          send_q.splice(i1, lane->q);
          lane->size = 0;
          ++ix, ix = ix % nlanes;
          pthread_spin_unlock(&lane->sp);
          break;
        }
        pthread_spin_unlock(&lane->sp);
      }
    }
  }; /* SubmitQueue */
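  /* SubmitQueue is a lock-striped multi-producer queue: connection
   * threads enq() under a per-lane spinlock, while the single portal
   * thread deq()s at most one non-empty lane per call and resumes the
   * scan at the following lane (ix), so no lane is starved. */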
  Messenger *msgr;
  struct xio_context *ctx;
  struct xio_server *server;
  SubmitQueue submit_q;
  pthread_spinlock_t sp;
  string xio_uri;
  char *portal_id;
  bool _shutdown;
  bool drained;
  uint32_t special_handling;

  friend class XioPortals;
  friend class XioMessenger;

public:
  explicit XioPortal(Messenger *_msgr, int max_conns) :
    msgr(_msgr), ctx(NULL), server(NULL), submit_q(), xio_uri(""),
    portal_id(NULL), _shutdown(false), drained(false),
    special_handling(0)
  {
    pthread_spin_init(&sp, PTHREAD_PROCESS_PRIVATE);

    struct xio_context_params ctx_params;
    memset(&ctx_params, 0, sizeof(ctx_params));
    ctx_params.user_context = this;

    /* hint to Accelio the total number of connections that will share
     * this context's resources (e.g., its internal primary task pool) */
    ctx_params.max_conns_per_ctx = max_conns;

    /* a portal is an xio_context and its event loop */
    ctx = xio_context_create(&ctx_params, 0 /* poll timeout */, -1 /* cpu hint */);
    assert(ctx && "Whoops, failed to create portal/ctx");
  }
  int bind(struct xio_session_ops *ops, const string &base_uri,
	   uint16_t port, uint16_t *assigned_port);
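  /*
   * Illustrative sketch (not part of the original interface): a caller
   * owning a single portal would drive it roughly as follows; the
   * variable names and the URI/port values here are hypothetical.
   *
   *   XioPortal* portal = new XioPortal(msgr, expected_conns);
   *   uint16_t bound_port;
   *   portal->bind(&xio_msgr_ops, "rdma://0.0.0.0", 0, &bound_port);
   *   portal->create("ms_xio_0");   // Thread::create() starts entry()
   *   ...
   *   portal->shutdown();           // ask entry() to drain and exit
   *   portal->join();
   *
   * XioPortals::start()/shutdown()/join() below do the equivalent for a
   * whole set of portals.
   */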
  inline void release_xio_msg(XioCompletion* xcmp) {
    struct xio_msg *msg = xcmp->dequeue();
    struct xio_msg *next_msg = NULL;
    int code;

    if (unlikely(!xcmp->xcon->conn)) {
      // NOTE: msg is not safe to dereference if the connection was torn down
      xcmp->xcon->msg_release_fail(msg, ENOTCONN);
    }
    else {
      while (msg) {
	next_msg = static_cast<struct xio_msg *>(msg->user_context);
	code = xio_release_msg(msg);
	if (unlikely(code)) /* very unlikely, so log it */
	  xcmp->xcon->msg_release_fail(msg, code);
	msg = next_msg;
      }
    }
    xcmp->trace.event("xio_release_msg");
    xcmp->finalize(); /* unconditional finalize */
  }
  void enqueue(XioConnection *xcon, XioSubmit *xs)
  {
    if (!_shutdown) {
      submit_q.enq(xcon, xs);
      xio_context_stop_loop(ctx);
      return;
    }

    /* shutting down: dispose the submit instead of queuing it */
    switch (xs->type) {
    case XioSubmit::OUTGOING_MSG: /* it was an outgoing 1-way */
    {
      XioSend* xsend = static_cast<XioSend*>(xs);
      xs->xcon->msg_send_fail(xsend, -EINVAL);
    }
      break;
    default:
      /* INCOMING_MSG_RELEASE */
      release_xio_msg(static_cast<XioCompletion*>(xs));
      break;
    }
  }
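  /* enqueue() is the producer side of the portal: it pushes the submit
   * onto the lock-striped submit_q and then calls xio_context_stop_loop()
   * so that the portal thread, which may be blocked in
   * xio_context_run_loop() inside entry(), wakes up promptly and drains
   * the queue. */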
  void requeue(XioConnection* xcon, XioSubmit::Queue& send_q) {
    submit_q.enq(xcon, send_q);
  }
  void requeue_all_xcon(XioConnection* xcon,
			XioSubmit::Queue::iterator& q_iter,
			XioSubmit::Queue& send_q) {
    // XXX gather all already-dequeued outgoing messages for xcon,
    // push them in FIFO order to the front of the input queue,
    // and mark the connection as flow-controlled
    XioSubmit::Queue requeue_q;

    while (q_iter != send_q.end()) {
      XioSubmit *xs = &(*q_iter);
      // skip retired submits and anything for other connections
      if (xs->xcon != xcon) {
	++q_iter;
	continue;
      }
      q_iter = send_q.erase(q_iter);
      requeue_q.push_back(*xs);
    }
    pthread_spin_lock(&xcon->sp);
    XioSubmit::Queue::const_iterator i1 = xcon->outgoing.requeue.begin();
    xcon->outgoing.requeue.splice(i1, requeue_q);
    xcon->cstate.state_flow_controlled(XioConnection::CState::OP_FLAG_LOCKED);
    pthread_spin_unlock(&xcon->sp);
  }
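  /* Flow control: when a connection exceeds its Accelio send-queue depth
   * (the qdepth guard in entry(), or XIO_E_TX_QUEUE_OVERFLOW from
   * xio_send_msg), requeue_all_xcon() pulls all of that connection's
   * not-yet-sent submits out of the current batch, splices them to the
   * front of xcon->outgoing.requeue so FIFO order is preserved, and
   * marks the connection flow-controlled; other connections in the batch
   * are unaffected. */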
  void *entry()
  {
    int size, code = 0;
    uint32_t xio_qdepth_high;
    XioSubmit::Queue send_q;
    XioSubmit::Queue::iterator q_iter;
    struct xio_msg *msg = NULL;
    XioConnection *xcon;
    XioSubmit *xs;
    XioSend *xsend;
    do {
      submit_q.deq(send_q);

      /* shutdown() barrier */
      pthread_spin_lock(&sp);

    restart:
      size = send_q.size();

      if (_shutdown) {
        // XXX XioSend queues for flow-controlled connections may require
        // cleanup before the portal is truly drained
        drained = true;
      }

      if (size > 0) {
        q_iter = send_q.begin();
        while (q_iter != send_q.end()) {
          xs = &(*q_iter);
          xcon = xs->xcon;

          switch (xs->type) {
          case XioSubmit::OUTGOING_MSG: /* it was an outgoing 1-way */
            xsend = static_cast<XioSend*>(xs);
            if (unlikely(!xcon->conn || !xcon->is_connected()))
              code = ENOTCONN;
            else {
              /* XXX guard the Accelio send queue (it should be safe to
               * rely on Accelio's own check below, but this assures that
               * all chained xio_msg are accounted for) */
              xio_qdepth_high = xcon->xio_qdepth_high_mark();
              if (unlikely((xcon->send_ctr + xsend->get_msg_count()) >
                           xio_qdepth_high)) {
                requeue_all_xcon(xcon, q_iter, send_q);
                goto restart;
              }
              xs->trace.event("xio_send_msg");
              msg = xsend->get_xio_msg();
              code = xio_send_msg(xcon->conn, msg);
              /* header trace moved here to capture the xio serial# */
              if (ldlog_p1(msgr->cct, ceph_subsys_xio, 11)) {
                xsend->print_debug(msgr->cct, "xio_send_msg");
              }
              /* normalize to the right Accelio errno code */
              if (unlikely(code)) {
                if ((code == -1) && (xio_errno() == -1)) {
                  /* when XIO has no credits to send, it still queues the
                   * message(s) for transmission, but returns -1 and also
                   * sets errno to -1; treat this as success */
                  code = 0;
                } else {
                  code = xio_errno();
                }
              }
            } /* !ENOTCONN */
            if (unlikely(code)) {
              switch (code) {
              case XIO_E_TX_QUEUE_OVERFLOW:
                /* the connection is over its send-queue depth; requeue
                 * its submits and mark it flow-controlled */
                requeue_all_xcon(xcon, q_iter, send_q);
                goto restart;
              default:
                q_iter = send_q.erase(q_iter);
                xcon->msg_send_fail(xsend, code);
                continue;
              }
            } else {
              xcon->send.set(msg->timestamp); // need atomic?
              xcon->send_ctr += xsend->get_msg_count(); // only inc if cb promised
            }
            break;
          default:
            /* INCOMING_MSG_RELEASE */
            q_iter = send_q.erase(q_iter);
            release_xio_msg(static_cast<XioCompletion*>(xs));
            continue;
          } /* switch (xs->type) */
          q_iter = send_q.erase(q_iter);
        } /* while (q_iter != send_q.end()) */
      } /* size > 0 */
      pthread_spin_unlock(&sp);
      xio_context_run_loop(ctx, 300);

    } while ((!_shutdown) || (!drained));

    /* the portal thread is exiting: tear down the event loop context */
    xio_context_destroy(ctx);
    return NULL;
  }
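  /* Shutdown sequencing: shutdown() flips _shutdown under the portal
   * spinlock; entry() observes the flag at the top of its loop (the
   * "shutdown() barrier" above), marks the portal drained, finishes the
   * current batch, and only then leaves the loop and destroys the xio
   * context.  The 300 ms bound on xio_context_run_loop() keeps the
   * shutdown check reasonably prompt even when no events arrive. */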
  void shutdown()
  {
    pthread_spin_lock(&sp);
    _shutdown = true;
    pthread_spin_unlock(&sp);
  }
}; /* XioPortal */
class XioPortals
{
private:
  vector<XioPortal*> portals;
  char **p_vec;
  int n;
  int last_unused;

public:
  XioPortals(Messenger *msgr, int _n, int nconns) : p_vec(NULL), last_unused(0)
  {
    n = (_n > 0) ? _n : 1; /* always at least one portal */
    portals.resize(n);
    for (int i = 0; i < n; i++) {
      portals[i] = new XioPortal(msgr, nconns);
      assert(portals[i] != nullptr);
    }
  }
  vector<XioPortal*>& get() { return portals; }
  const char **get_vec()
  {
    return (const char **) p_vec;
  }
  int get_portals_len()
  {
    return n;
  }
  int get_last_unused()
  {
    int pix = last_unused;
    if (++last_unused >= get_portals_len())
      last_unused = 0;
    return pix;
  }
  XioPortal* get_next_portal()
  {
    int pix = get_last_unused();
    return portals[pix];
  }
  int bind(struct xio_session_ops *ops, const string& base_uri,
	   uint16_t port, uint16_t *port0);
  int accept(struct xio_session *session,
	     struct xio_new_session_req *req,
	     void *cb_user_context)
  {
    const char **portals_vec = get_vec();
    int pix = get_last_unused();

    if (pix == 0) {
      /* portal 0 owns the listening endpoint; accept in place */
      return xio_accept(session, NULL, 0, NULL, 0);
    } else {
      /* accept the session on the selected portal's URI */
      return xio_accept(session,
			(const char **)&(portals_vec[pix]),
			1, NULL, 0);
    }
  }
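  /* New sessions are spread across portals round-robin via
   * get_last_unused().  For a non-zero index, the portal's URI (p_vec,
   * filled in by start() from each portal's portal_id) is handed to
   * xio_accept() so Accelio directs the client to that portal rather
   * than to the listening one. */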
  void start()
  {
    XioPortal *portal;
    int p_ix, nportals = portals.size();

    p_vec = new char*[nportals];
    for (p_ix = 0; p_ix < nportals; ++p_ix) {
      portal = portals[p_ix];
      p_vec[p_ix] = (char*) /* portal->xio_uri.c_str() */
	portal->portal_id;
    }

    for (p_ix = 0; p_ix < nportals; ++p_ix) {
      string thread_name = "ms_xio_";
      thread_name.append(std::to_string(p_ix));
      portal = portals[p_ix];
      portal->create(thread_name.c_str());
    }
  }
  void shutdown()
  {
    int nportals = portals.size();
    for (int p_ix = 0; p_ix < nportals; ++p_ix) {
      XioPortal *portal = portals[p_ix];
      portal->shutdown();
    }
  }
  void join()
  {
    int nportals = portals.size();
    for (int p_ix = 0; p_ix < nportals; ++p_ix) {
      XioPortal *portal = portals[p_ix];
      portal->join();
    }
  }
  ~XioPortals()
  {
    int nportals = portals.size();
    for (int ix = 0; ix < nportals; ++ix)
      delete portals[ix];
    portals.clear();
    if (p_vec)
      delete[] p_vec;
  }
}; /* XioPortals */
#endif /* XIO_PORTAL_H */