remove ceph code
[stor4nfv.git] / src / ceph / src / msg / xio / XioPortal.h
diff --git a/src/ceph/src/msg/xio/XioPortal.h b/src/ceph/src/msg/xio/XioPortal.h
deleted file mode 100644 (file)
index b3f2101..0000000
+++ /dev/null
@@ -1,465 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- * Portions Copyright (C) 2013 CohortFS, LLC
- *s
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation.  See file COPYING.
- *
- */
-
-#ifndef XIO_PORTAL_H
-#define XIO_PORTAL_H
-
-#include <string>
-
-extern "C" {
-#include "libxio.h"
-}
-#include "XioInSeq.h"
-#include <boost/lexical_cast.hpp>
-#include "msg/SimplePolicyMessenger.h"
-#include "XioConnection.h"
-#include "XioMsg.h"
-
-#include "include/assert.h"
-#include "common/dout.h"
-
-#ifndef CACHE_LINE_SIZE
-#define CACHE_LINE_SIZE 64 /* XXX arch-specific define */
-#endif
-#define CACHE_PAD(_n) char __pad ## _n [CACHE_LINE_SIZE]
-
-class XioPortal : public Thread
-{
-private:
-
-  struct SubmitQueue
-  {
-    const static int nlanes = 7;
-
-    struct Lane
-    {
-      uint32_t size;
-      XioSubmit::Queue q;
-      pthread_spinlock_t sp;
-      CACHE_PAD(0);
-    };
-
-    Lane qlane[nlanes];
-
-    int ix; /* atomicity by portal thread */
-
-    SubmitQueue() : ix(0)
-      {
-       int ix;
-       Lane* lane;
-
-       for (ix = 0; ix < nlanes; ++ix) {
-         lane = &qlane[ix];
-         pthread_spin_init(&lane->sp, PTHREAD_PROCESS_PRIVATE);
-         lane->size = 0;
-       }
-      }
-
-    inline Lane* get_lane(XioConnection *xcon)
-      {
-       return &qlane[(((uint64_t) xcon) / 16) % nlanes];
-      }
-
-    void enq(XioConnection *xcon, XioSubmit* xs)
-      {
-       Lane* lane = get_lane(xcon);
-       pthread_spin_lock(&lane->sp);
-       lane->q.push_back(*xs);
-       ++(lane->size);
-       pthread_spin_unlock(&lane->sp);
-      }
-
-    void enq(XioConnection *xcon, XioSubmit::Queue& requeue_q)
-      {
-       int size = requeue_q.size();
-       Lane* lane = get_lane(xcon);
-       pthread_spin_lock(&lane->sp);
-       XioSubmit::Queue::const_iterator i1 = lane->q.end();
-       lane->q.splice(i1, requeue_q);
-       lane->size += size;
-       pthread_spin_unlock(&lane->sp);
-      }
-
-    void deq(XioSubmit::Queue& send_q)
-      {
-       Lane* lane;
-       int cnt;
-       for (cnt = 0; cnt < nlanes; ++cnt, ++ix, ix = ix % nlanes) {
-         lane = &qlane[ix];
-         pthread_spin_lock(&lane->sp);
-         if (lane->size > 0) {
-           XioSubmit::Queue::const_iterator i1 = send_q.end();
-           send_q.splice(i1, lane->q);
-           lane->size = 0;
-           ++ix, ix = ix % nlanes;
-           pthread_spin_unlock(&lane->sp);
-           break;
-         }
-         pthread_spin_unlock(&lane->sp);
-       }
-      }
-
-  }; /* SubmitQueue */
-
-  Messenger *msgr;
-  struct xio_context *ctx;
-  struct xio_server *server;
-  SubmitQueue submit_q;
-  pthread_spinlock_t sp;
-  void *ev_loop;
-  string xio_uri;
-  char *portal_id;
-  bool _shutdown;
-  bool drained;
-  uint32_t magic;
-  uint32_t special_handling;
-
-  friend class XioPortals;
-  friend class XioMessenger;
-
-public:
-  explicit XioPortal(Messenger *_msgr, int max_conns) :
-    msgr(_msgr), ctx(NULL), server(NULL), submit_q(), xio_uri(""),
-    portal_id(NULL), _shutdown(false), drained(false),
-    magic(0),
-    special_handling(0)
-  {
-    pthread_spin_init(&sp, PTHREAD_PROCESS_PRIVATE);
-
-    struct xio_context_params ctx_params;
-    memset(&ctx_params, 0, sizeof(ctx_params));
-    ctx_params.user_context = this;
-    /*
-     * hint to Accelio the total number of connections that will share
-     * this context's resources: internal primary task pool...
-     */
-    ctx_params.max_conns_per_ctx = max_conns;
-
-    /* a portal is an xio_context and event loop */
-    ctx = xio_context_create(&ctx_params, 0 /* poll timeout */, -1 /* cpu hint */);
-    assert(ctx && "Whoops, failed to create portal/ctx");
-  }
-
-  int bind(struct xio_session_ops *ops, const string &base_uri,
-          uint16_t port, uint16_t *assigned_port);
-
-  inline void release_xio_msg(XioCompletion* xcmp) {
-    struct xio_msg *msg = xcmp->dequeue();
-    struct xio_msg *next_msg = NULL;
-    int code;
-    if (unlikely(!xcmp->xcon->conn)) {
-      // NOTE: msg is not safe to dereference if the connection was torn down
-      xcmp->xcon->msg_release_fail(msg, ENOTCONN);
-    }
-    else while (msg) {
-      next_msg = static_cast<struct xio_msg *>(msg->user_context);
-      code = xio_release_msg(msg);
-      if (unlikely(code)) /* very unlikely, so log it */
-       xcmp->xcon->msg_release_fail(msg, code);
-      msg = next_msg;
-    }
-    xcmp->trace.event("xio_release_msg");
-    xcmp->finalize(); /* unconditional finalize */
-  }
-
-  void enqueue(XioConnection *xcon, XioSubmit *xs)
-    {
-      if (! _shutdown) {
-       submit_q.enq(xcon, xs);
-       xio_context_stop_loop(ctx);
-       return;
-      }
-
-      /* dispose xs */
-      switch(xs->type) {
-      case XioSubmit::OUTGOING_MSG: /* it was an outgoing 1-way */
-      {
-       XioSend* xsend = static_cast<XioSend*>(xs);
-       xs->xcon->msg_send_fail(xsend, -EINVAL);
-      }
-       break;
-      default:
-       /* INCOMING_MSG_RELEASE */
-       release_xio_msg(static_cast<XioCompletion*>(xs));
-      break;
-      };
-    }
-
-  void requeue(XioConnection* xcon, XioSubmit::Queue& send_q) {
-    submit_q.enq(xcon, send_q);
-  }
-
-  void requeue_all_xcon(XioConnection* xcon,
-                       XioSubmit::Queue::iterator& q_iter,
-                       XioSubmit::Queue& send_q) {
-    // XXX gather all already-dequeued outgoing messages for xcon
-    // and push them in FIFO order to front of the input queue,
-    // and mark the connection as flow-controlled
-    XioSubmit::Queue requeue_q;
-
-    while (q_iter != send_q.end()) {
-      XioSubmit *xs = &(*q_iter);
-      // skip retires and anything for other connections
-      if (xs->xcon != xcon) {
-       q_iter++;
-       continue;
-      }
-      q_iter = send_q.erase(q_iter);
-      requeue_q.push_back(*xs);
-    }
-    pthread_spin_lock(&xcon->sp);
-    XioSubmit::Queue::const_iterator i1 = xcon->outgoing.requeue.begin();
-    xcon->outgoing.requeue.splice(i1, requeue_q);
-    xcon->cstate.state_flow_controlled(XioConnection::CState::OP_FLAG_LOCKED);
-    pthread_spin_unlock(&xcon->sp);
-  }
-
-  void *entry()
-    {
-      int size, code = 0;
-      uint32_t xio_qdepth_high;
-      XioSubmit::Queue send_q;
-      XioSubmit::Queue::iterator q_iter;
-      struct xio_msg *msg = NULL;
-      XioConnection *xcon;
-      XioSubmit *xs;
-      XioSend *xsend;
-
-      do {
-       submit_q.deq(send_q);
-
-       /* shutdown() barrier */
-       pthread_spin_lock(&sp);
-
-      restart:
-       size = send_q.size();
-
-       if (_shutdown) {
-         // XXX XioSend queues for flow-controlled connections may require
-         // cleanup
-         drained = true;
-       }
-
-       if (size > 0) {
-         q_iter = send_q.begin();
-         while (q_iter != send_q.end()) {
-           xs = &(*q_iter);
-           xcon = xs->xcon;
-
-           switch (xs->type) {
-           case XioSubmit::OUTGOING_MSG: /* it was an outgoing 1-way */
-             xsend = static_cast<XioSend*>(xs);
-             if (unlikely(!xcon->conn || !xcon->is_connected()))
-               code = ENOTCONN;
-             else {
-               /* XXX guard Accelio send queue (should be safe to rely
-                * on Accelio's check on below, but this assures that
-                * all chained xio_msg are accounted) */
-               xio_qdepth_high = xcon->xio_qdepth_high_mark();
-               if (unlikely((xcon->send_ctr + xsend->get_msg_count()) >
-                            xio_qdepth_high)) {
-                 requeue_all_xcon(xcon, q_iter, send_q);
-                 goto restart;
-               }
-
-               xs->trace.event("xio_send_msg");
-               msg = xsend->get_xio_msg();
-               code = xio_send_msg(xcon->conn, msg);
-               /* header trace moved here to capture xio serial# */
-               if (ldlog_p1(msgr->cct, ceph_subsys_xio, 11)) {
-                 xsend->print_debug(msgr->cct, "xio_send_msg");
-               }
-               /* get the right Accelio's errno code */
-               if (unlikely(code)) {
-                 if ((code == -1) && (xio_errno() == -1)) {
-                   /* In case XIO does not have any credits to send,
-                    * it would still queue up the message(s) for transmission,
-                    * but would return -1 and errno would also be set to -1.
-                    * This needs to be treated as a success.
-                    */
-                   code = 0;
-                 }
-                 else {
-                   code = xio_errno();
-                 }
-               }
-             } /* !ENOTCONN */
-             if (unlikely(code)) {
-               switch (code) {
-               case XIO_E_TX_QUEUE_OVERFLOW:
-               {
-                 requeue_all_xcon(xcon, q_iter, send_q);
-                 goto restart;
-               }
-                 break;
-               default:
-                 q_iter = send_q.erase(q_iter);
-                 xcon->msg_send_fail(xsend, code);
-                 continue;
-                 break;
-               };
-             } else {
-               xcon->send.set(msg->timestamp); // need atomic?
-               xcon->send_ctr += xsend->get_msg_count(); // only inc if cb promised
-             }
-             break;
-           default:
-             /* INCOMING_MSG_RELEASE */
-             q_iter = send_q.erase(q_iter);
-             release_xio_msg(static_cast<XioCompletion*>(xs));
-             continue;
-           } /* switch (xs->type) */
-           q_iter = send_q.erase(q_iter);
-         } /* while */
-       } /* size > 0 */
-
-       pthread_spin_unlock(&sp);
-       xio_context_run_loop(ctx, 300);
-
-      } while ((!_shutdown) || (!drained));
-
-      /* shutting down */
-      if (server) {
-       xio_unbind(server);
-      }
-      xio_context_destroy(ctx);
-      return NULL;
-    }
-
-  void shutdown()
-    {
-       pthread_spin_lock(&sp);
-       _shutdown = true;
-       pthread_spin_unlock(&sp);
-    }
-};
-
-class XioPortals
-{
-private:
-  vector<XioPortal*> portals;
-  char **p_vec;
-  int n;
-  int last_unused;
-
-public:
-  XioPortals(Messenger *msgr, int _n, int nconns) : p_vec(NULL), last_unused(0)
-  {
-    n = max(_n, 1);
-
-    portals.resize(n);
-    for (int i = 0; i < n; i++) {
-      if (!portals[i]) {
-        portals[i] = new XioPortal(msgr, nconns);
-        assert(portals[i] != nullptr);
-      }
-    }
-  }
-
-  vector<XioPortal*>& get() { return portals; }
-
-  const char **get_vec()
-  {
-    return (const char **) p_vec;
-  }
-
-  int get_portals_len()
-  {
-    return n;
-  }
-
-  int get_last_unused()
-  {
-    int pix = last_unused;
-    if (++last_unused >= get_portals_len())
-      last_unused = 0;
-    return pix;
-  }
-
-  XioPortal* get_next_portal()
-  {
-    int pix = get_last_unused();
-    return portals[pix];
-  }
-
-  int bind(struct xio_session_ops *ops, const string& base_uri,
-          uint16_t port, uint16_t *port0);
-
-  int accept(struct xio_session *session,
-            struct xio_new_session_req *req,
-            void *cb_user_context)
-  {
-    const char **portals_vec = get_vec();
-    int pix = get_last_unused();
-
-    if (pix == 0) {
-      return xio_accept(session, NULL, 0, NULL, 0);
-    } else {
-      return xio_accept(session,
-                       (const char **)&(portals_vec[pix]),
-                       1, NULL, 0);
-    }
-  }
-
-  void start()
-  {
-    XioPortal *portal;
-    int p_ix, nportals = portals.size();
-
-    p_vec = new char*[nportals];
-    for (p_ix = 0; p_ix < nportals; ++p_ix) {
-      portal = portals[p_ix];
-      p_vec[p_ix] = (char*) /* portal->xio_uri.c_str() */
-                       portal->portal_id;
-    }
-
-    for (p_ix = 0; p_ix < nportals; ++p_ix) {
-      string thread_name = "ms_xio_";
-      thread_name.append(std::to_string(p_ix));
-      portal = portals[p_ix];
-      portal->create(thread_name.c_str());
-    }
-  }
-
-  void shutdown()
-  {
-    int nportals = portals.size();
-    for (int p_ix = 0; p_ix < nportals; ++p_ix) {
-      XioPortal *portal = portals[p_ix];
-      portal->shutdown();
-    }
-  }
-
-  void join()
-  {
-    int nportals = portals.size();
-    for (int p_ix = 0; p_ix < nportals; ++p_ix) {
-      XioPortal *portal = portals[p_ix];
-      portal->join();
-    }
-  }
-
-  ~XioPortals()
-  {
-    int nportals = portals.size();
-    for (int ix = 0; ix < nportals; ++ix)
-      delete(portals[ix]);
-    portals.clear();
-    if (p_vec)
-      delete[] p_vec;
-  }
-};
-
-#endif /* XIO_PORTAL_H */