diff --git a/src/ceph/src/msg/xio/XioPortal.h b/src/ceph/src/msg/xio/XioPortal.h
new file mode 100644 (file)
index 0000000..b3f2101
--- /dev/null
@@ -0,0 +1,465 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ * Portions Copyright (C) 2013 CohortFS, LLC
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#ifndef XIO_PORTAL_H
+#define XIO_PORTAL_H
+
+#include <string>
+
+extern "C" {
+#include "libxio.h"
+}
+#include "XioInSeq.h"
+#include <boost/lexical_cast.hpp>
+#include "msg/SimplePolicyMessenger.h"
+#include "XioConnection.h"
+#include "XioMsg.h"
+
+#include "include/assert.h"
+#include "common/dout.h"
+
+#ifndef CACHE_LINE_SIZE
+#define CACHE_LINE_SIZE 64 /* XXX arch-specific define */
+#endif
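+/* insert a cache line of padding to keep adjacent structures from sharing a cache line */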
+#define CACHE_PAD(_n) char __pad ## _n [CACHE_LINE_SIZE]
+
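+/*
+ * An XioPortal wraps one Accelio xio_context and its event loop, and runs
+ * as its own thread (see entry()).  Producers hand it work through the
+ * lock-striped SubmitQueue below.
+ */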
+class XioPortal : public Thread
+{
+private:
+
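+  /*
+   * Pending submissions are striped across nlanes independently locked
+   * lanes to reduce producer contention; the portal thread drains the
+   * lanes round-robin in deq().
+   */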
+  struct SubmitQueue
+  {
+    const static int nlanes = 7;
+
+    struct Lane
+    {
+      uint32_t size;
+      XioSubmit::Queue q;
+      pthread_spinlock_t sp;
+      CACHE_PAD(0);
+    };
+
+    Lane qlane[nlanes];
+
+    int ix; /* advanced only by the portal thread, so no locking needed */
+
+    SubmitQueue() : ix(0)
+      {
+       int ix;
+       Lane* lane;
+
+       for (ix = 0; ix < nlanes; ++ix) {
+         lane = &qlane[ix];
+         pthread_spin_init(&lane->sp, PTHREAD_PROCESS_PRIVATE);
+         lane->size = 0;
+       }
+      }
+
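+    /* hash a connection's address to a lane so that all submissions for
+     * one connection land in the same lane */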
+    inline Lane* get_lane(XioConnection *xcon)
+      {
+       return &qlane[(((uint64_t) xcon) / 16) % nlanes];
+      }
+
+    void enq(XioConnection *xcon, XioSubmit* xs)
+      {
+       Lane* lane = get_lane(xcon);
+       pthread_spin_lock(&lane->sp);
+       lane->q.push_back(*xs);
+       ++(lane->size);
+       pthread_spin_unlock(&lane->sp);
+      }
+
+    void enq(XioConnection *xcon, XioSubmit::Queue& requeue_q)
+      {
+       int size = requeue_q.size();
+       Lane* lane = get_lane(xcon);
+       pthread_spin_lock(&lane->sp);
+       XioSubmit::Queue::const_iterator i1 = lane->q.end();
+       lane->q.splice(i1, requeue_q);
+       lane->size += size;
+       pthread_spin_unlock(&lane->sp);
+      }
+
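+    /* splice the first non-empty lane (searched round-robin starting at
+     * ix) into send_q; at most one lane is drained per call */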
+    void deq(XioSubmit::Queue& send_q)
+      {
+       Lane* lane;
+       int cnt;
+       for (cnt = 0; cnt < nlanes; ++cnt, ++ix, ix = ix % nlanes) {
+         lane = &qlane[ix];
+         pthread_spin_lock(&lane->sp);
+         if (lane->size > 0) {
+           XioSubmit::Queue::const_iterator i1 = send_q.end();
+           send_q.splice(i1, lane->q);
+           lane->size = 0;
+           ++ix, ix = ix % nlanes;
+           pthread_spin_unlock(&lane->sp);
+           break;
+         }
+         pthread_spin_unlock(&lane->sp);
+       }
+      }
+
+  }; /* SubmitQueue */
+
+  Messenger *msgr;
+  struct xio_context *ctx;
+  struct xio_server *server;
+  SubmitQueue submit_q;
+  pthread_spinlock_t sp;
+  void *ev_loop;
+  string xio_uri;
+  char *portal_id;
+  bool _shutdown;
+  bool drained;
+  uint32_t magic;
+  uint32_t special_handling;
+
+  friend class XioPortals;
+  friend class XioMessenger;
+
+public:
+  explicit XioPortal(Messenger *_msgr, int max_conns) :
+    msgr(_msgr), ctx(NULL), server(NULL), submit_q(), xio_uri(""),
+    portal_id(NULL), _shutdown(false), drained(false),
+    magic(0),
+    special_handling(0)
+  {
+    pthread_spin_init(&sp, PTHREAD_PROCESS_PRIVATE);
+
+    struct xio_context_params ctx_params;
+    memset(&ctx_params, 0, sizeof(ctx_params));
+    ctx_params.user_context = this;
+    /*
+     * hint to Accelio the total number of connections that will share
+     * this context's resources: internal primary task pool...
+     */
+    ctx_params.max_conns_per_ctx = max_conns;
+
+    /* a portal is an xio_context and event loop */
+    ctx = xio_context_create(&ctx_params, 0 /* poll timeout */, -1 /* cpu hint */);
+    assert(ctx && "Whoops, failed to create portal/ctx");
+  }
+
+  int bind(struct xio_session_ops *ops, const string &base_uri,
+          uint16_t port, uint16_t *assigned_port);
+
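+  /* release a chain of received Accelio messages back to xio; chained
+   * msgs are linked through msg->user_context */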
+  inline void release_xio_msg(XioCompletion* xcmp) {
+    struct xio_msg *msg = xcmp->dequeue();
+    struct xio_msg *next_msg = NULL;
+    int code;
+    if (unlikely(!xcmp->xcon->conn)) {
+      // NOTE: msg is not safe to dereference if the connection was torn down
+      xcmp->xcon->msg_release_fail(msg, ENOTCONN);
+    }
+    else while (msg) {
+      next_msg = static_cast<struct xio_msg *>(msg->user_context);
+      code = xio_release_msg(msg);
+      if (unlikely(code)) /* very unlikely, so log it */
+       xcmp->xcon->msg_release_fail(msg, code);
+      msg = next_msg;
+    }
+    xcmp->trace.event("xio_release_msg");
+    xcmp->finalize(); /* unconditional finalize */
+  }
+
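+  /* producer-side entry point: queue a submission for the portal thread
+   * and wake its event loop; if the portal is shutting down, dispose of
+   * the submission immediately instead */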
+  void enqueue(XioConnection *xcon, XioSubmit *xs)
+    {
+      if (! _shutdown) {
+       submit_q.enq(xcon, xs);
+       xio_context_stop_loop(ctx);
+       return;
+      }
+
+      /* dispose xs */
+      switch(xs->type) {
+      case XioSubmit::OUTGOING_MSG: /* it was an outgoing 1-way */
+      {
+       XioSend* xsend = static_cast<XioSend*>(xs);
+       xs->xcon->msg_send_fail(xsend, -EINVAL);
+      }
+       break;
+      default:
+       /* INCOMING_MSG_RELEASE */
+       release_xio_msg(static_cast<XioCompletion*>(xs));
+      break;
+      };
+    }
+
+  void requeue(XioConnection* xcon, XioSubmit::Queue& send_q) {
+    submit_q.enq(xcon, send_q);
+  }
+
+  void requeue_all_xcon(XioConnection* xcon,
+                       XioSubmit::Queue::iterator& q_iter,
+                       XioSubmit::Queue& send_q) {
+    // XXX gather all already-dequeued outgoing messages for xcon
+    // and push them in FIFO order to front of the input queue,
+    // and mark the connection as flow-controlled
+    XioSubmit::Queue requeue_q;
+
+    while (q_iter != send_q.end()) {
+      XioSubmit *xs = &(*q_iter);
+      // skip submissions that belong to other connections
+      if (xs->xcon != xcon) {
+       q_iter++;
+       continue;
+      }
+      q_iter = send_q.erase(q_iter);
+      requeue_q.push_back(*xs);
+    }
+    pthread_spin_lock(&xcon->sp);
+    XioSubmit::Queue::const_iterator i1 = xcon->outgoing.requeue.begin();
+    xcon->outgoing.requeue.splice(i1, requeue_q);
+    xcon->cstate.state_flow_controlled(XioConnection::CState::OP_FLAG_LOCKED);
+    pthread_spin_unlock(&xcon->sp);
+  }
+
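+  /* portal thread main loop: drain the submit queue, hand outgoing
+   * messages to Accelio (requeueing a connection's messages when its
+   * send-queue depth reaches the high-water mark), release completed
+   * incoming messages, then run the xio event loop; exits only after
+   * shutdown() has been called and the queue has drained */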
+  void *entry()
+    {
+      int size, code = 0;
+      uint32_t xio_qdepth_high;
+      XioSubmit::Queue send_q;
+      XioSubmit::Queue::iterator q_iter;
+      struct xio_msg *msg = NULL;
+      XioConnection *xcon;
+      XioSubmit *xs;
+      XioSend *xsend;
+
+      do {
+       submit_q.deq(send_q);
+
+       /* shutdown() barrier */
+       pthread_spin_lock(&sp);
+
+      restart:
+       size = send_q.size();
+
+       if (_shutdown) {
+         // XXX XioSend queues for flow-controlled connections may require
+         // cleanup
+         drained = true;
+       }
+
+       if (size > 0) {
+         q_iter = send_q.begin();
+         while (q_iter != send_q.end()) {
+           xs = &(*q_iter);
+           xcon = xs->xcon;
+
+           switch (xs->type) {
+           case XioSubmit::OUTGOING_MSG: /* it was an outgoing 1-way */
+             xsend = static_cast<XioSend*>(xs);
+             if (unlikely(!xcon->conn || !xcon->is_connected()))
+               code = ENOTCONN;
+             else {
+               /* XXX guard the Accelio send queue (it should be safe to
+                * rely on Accelio's own check below, but this ensures that
+                * all chained xio_msgs are accounted for) */
+               xio_qdepth_high = xcon->xio_qdepth_high_mark();
+               if (unlikely((xcon->send_ctr + xsend->get_msg_count()) >
+                            xio_qdepth_high)) {
+                 requeue_all_xcon(xcon, q_iter, send_q);
+                 goto restart;
+               }
+
+               xs->trace.event("xio_send_msg");
+               msg = xsend->get_xio_msg();
+               code = xio_send_msg(xcon->conn, msg);
+               /* header trace moved here to capture xio serial# */
+               if (ldlog_p1(msgr->cct, ceph_subsys_xio, 11)) {
+                 xsend->print_debug(msgr->cct, "xio_send_msg");
+               }
+               /* get the right Accelio's errno code */
+               if (unlikely(code)) {
+                 if ((code == -1) && (xio_errno() == -1)) {
+                   /* In case XIO does not have any credits to send,
+                    * it would still queue up the message(s) for transmission,
+                    * but would return -1 and errno would also be set to -1.
+                    * This needs to be treated as a success.
+                    */
+                   code = 0;
+                 }
+                 else {
+                   code = xio_errno();
+                 }
+               }
+             } /* !ENOTCONN */
+             if (unlikely(code)) {
+               switch (code) {
+               case XIO_E_TX_QUEUE_OVERFLOW:
+               {
+                 requeue_all_xcon(xcon, q_iter, send_q);
+                 goto restart;
+               }
+                 break;
+               default:
+                 q_iter = send_q.erase(q_iter);
+                 xcon->msg_send_fail(xsend, code);
+                 continue;
+                 break;
+               };
+             } else {
+               xcon->send.set(msg->timestamp); // need atomic?
+               xcon->send_ctr += xsend->get_msg_count(); // only inc if cb promised
+             }
+             break;
+           default:
+             /* INCOMING_MSG_RELEASE */
+             q_iter = send_q.erase(q_iter);
+             release_xio_msg(static_cast<XioCompletion*>(xs));
+             continue;
+           } /* switch (xs->type) */
+           q_iter = send_q.erase(q_iter);
+         } /* while */
+       } /* size > 0 */
+
+       pthread_spin_unlock(&sp);
+       xio_context_run_loop(ctx, 300);
+
+      } while ((!_shutdown) || (!drained));
+
+      /* shutting down */
+      if (server) {
+       xio_unbind(server);
+      }
+      xio_context_destroy(ctx);
+      return NULL;
+    }
+
+  void shutdown()
+    {
+       pthread_spin_lock(&sp);
+       _shutdown = true;
+       pthread_spin_unlock(&sp);
+    }
+};
+
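+/*
+ * XioPortals owns the set of portals belonging to a messenger and
+ * distributes new sessions across them round-robin.
+ */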
+class XioPortals
+{
+private:
+  vector<XioPortal*> portals;
+  char **p_vec;
+  int n;
+  int last_unused;
+
+public:
+  XioPortals(Messenger *msgr, int _n, int nconns) : p_vec(NULL), last_unused(0)
+  {
+    n = max(_n, 1);
+
+    portals.resize(n);
+    for (int i = 0; i < n; i++) {
+      if (!portals[i]) {
+        portals[i] = new XioPortal(msgr, nconns);
+        assert(portals[i] != nullptr);
+      }
+    }
+  }
+
+  vector<XioPortal*>& get() { return portals; }
+
+  const char **get_vec()
+  {
+    return (const char **) p_vec;
+  }
+
+  int get_portals_len()
+  {
+    return n;
+  }
+
+  int get_last_unused()
+  {
+    int pix = last_unused;
+    if (++last_unused >= get_portals_len())
+      last_unused = 0;
+    return pix;
+  }
+
+  XioPortal* get_next_portal()
+  {
+    int pix = get_last_unused();
+    return portals[pix];
+  }
+
+  int bind(struct xio_session_ops *ops, const string& base_uri,
+          uint16_t port, uint16_t *port0);
+
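+  /* accept a new session, redirecting it to the next portal's URI unless
+   * the round-robin index selects the primary portal (index 0) */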
+  int accept(struct xio_session *session,
+            struct xio_new_session_req *req,
+            void *cb_user_context)
+  {
+    const char **portals_vec = get_vec();
+    int pix = get_last_unused();
+
+    if (pix == 0) {
+      return xio_accept(session, NULL, 0, NULL, 0);
+    } else {
+      return xio_accept(session,
+                       (const char **)&(portals_vec[pix]),
+                       1, NULL, 0);
+    }
+  }
+
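+  /* collect each portal's id into p_vec, then spawn one thread per portal */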
+  void start()
+  {
+    XioPortal *portal;
+    int p_ix, nportals = portals.size();
+
+    p_vec = new char*[nportals];
+    for (p_ix = 0; p_ix < nportals; ++p_ix) {
+      portal = portals[p_ix];
+      p_vec[p_ix] = (char*) /* portal->xio_uri.c_str() */
+                       portal->portal_id;
+    }
+
+    for (p_ix = 0; p_ix < nportals; ++p_ix) {
+      string thread_name = "ms_xio_";
+      thread_name.append(std::to_string(p_ix));
+      portal = portals[p_ix];
+      portal->create(thread_name.c_str());
+    }
+  }
+
+  void shutdown()
+  {
+    int nportals = portals.size();
+    for (int p_ix = 0; p_ix < nportals; ++p_ix) {
+      XioPortal *portal = portals[p_ix];
+      portal->shutdown();
+    }
+  }
+
+  void join()
+  {
+    int nportals = portals.size();
+    for (int p_ix = 0; p_ix < nportals; ++p_ix) {
+      XioPortal *portal = portals[p_ix];
+      portal->join();
+    }
+  }
+
+  ~XioPortals()
+  {
+    int nportals = portals.size();
+    for (int ix = 0; ix < nportals; ++ix)
+      delete(portals[ix]);
+    portals.clear();
+    if (p_vec)
+      delete[] p_vec;
+  }
+};
+
+#endif /* XIO_PORTAL_H */