initial code repo
[stor4nfv.git] / src / ceph / src / msg / async / Stack.cc
diff --git a/src/ceph/src/msg/async/Stack.cc b/src/ceph/src/msg/async/Stack.cc
new file mode 100644 (file)
index 0000000..807db11
--- /dev/null
@@ -0,0 +1,216 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 XSky <haomai@xsky.com>
+ *
+ * Author: Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#include "include/compat.h"
+#include "common/Cond.h"
+#include "common/errno.h"
+#include "PosixStack.h"
+#ifdef HAVE_RDMA
+#include "rdma/RDMAStack.h"
+#endif
+#ifdef HAVE_DPDK
+#include "dpdk/DPDKStack.h"
+#endif
+
+#include "common/dout.h"
+#include "include/assert.h"
+
+#define dout_subsys ceph_subsys_ms
+#undef dout_prefix
+#define dout_prefix *_dout << "stack "
+
+std::function<void ()> NetworkStack::add_thread(unsigned i)
+{
+  Worker *w = workers[i];
+  return [this, w]() {
+      char tp_name[16];
+      sprintf(tp_name, "msgr-worker-%d", w->id);
+      ceph_pthread_setname(pthread_self(), tp_name);
+      const uint64_t EventMaxWaitUs = 30000000;
+      w->center.set_owner();
+      ldout(cct, 10) << __func__ << " starting" << dendl;
+      w->initialize();
+      w->init_done();
+      while (!w->done) {
+        ldout(cct, 30) << __func__ << " calling event process" << dendl;
+
+        ceph::timespan dur;
+        int r = w->center.process_events(EventMaxWaitUs, &dur);
+        if (r < 0) {
+          ldout(cct, 20) << __func__ << " process events failed: "
+                         << cpp_strerror(errno) << dendl;
+          // TODO do something?
+        }
+        w->perf_logger->tinc(l_msgr_running_total_time, dur);
+      }
+      w->reset();
+      w->destroy();
+  };
+}
+
+std::shared_ptr<NetworkStack> NetworkStack::create(CephContext *c, const string &t)
+{
+  if (t == "posix")
+    return std::make_shared<PosixNetworkStack>(c, t);
+#ifdef HAVE_RDMA
+  else if (t == "rdma")
+    return std::make_shared<RDMAStack>(c, t);
+#endif
+#ifdef HAVE_DPDK
+  else if (t == "dpdk")
+    return std::make_shared<DPDKStack>(c, t);
+#endif
+
+  lderr(c) << __func__ << " ms_async_transport_type " << t <<
+    " is not supported! " << dendl;
+  ceph_abort();
+  return nullptr;
+}
+
+Worker* NetworkStack::create_worker(CephContext *c, const string &type, unsigned i)
+{
+  if (type == "posix")
+    return new PosixWorker(c, i);
+#ifdef HAVE_RDMA
+  else if (type == "rdma")
+    return new RDMAWorker(c, i);
+#endif
+#ifdef HAVE_DPDK
+  else if (type == "dpdk")
+    return new DPDKWorker(c, i);
+#endif
+
+  lderr(c) << __func__ << " ms_async_transport_type " << type <<
+    " is not supported! " << dendl;
+  ceph_abort();
+  return nullptr;
+}
+
+NetworkStack::NetworkStack(CephContext *c, const string &t): type(t), started(false), cct(c)
+{
+  assert(cct->_conf->ms_async_op_threads > 0);
+
+  const uint64_t InitEventNumber = 5000;
+  num_workers = cct->_conf->ms_async_op_threads;
+  if (num_workers >= EventCenter::MAX_EVENTCENTER) {
+    ldout(cct, 0) << __func__ << " max thread limit is "
+                  << EventCenter::MAX_EVENTCENTER << ", switching to this now. "
+                  << "Higher thread values are unnecessary and currently unsupported."
+                  << dendl;
+    num_workers = EventCenter::MAX_EVENTCENTER;
+  }
+
+  for (unsigned i = 0; i < num_workers; ++i) {
+    Worker *w = create_worker(cct, type, i);
+    w->center.init(InitEventNumber, i, type);
+    workers.push_back(w);
+  }
+  cct->register_fork_watcher(this);
+}
+
+void NetworkStack::start()
+{
+  pool_spin.lock();
+  if (started) {
+    pool_spin.unlock();
+    return ;
+  }
+
+  for (unsigned i = 0; i < num_workers; ++i) {
+    if (workers[i]->is_init())
+      continue;
+    std::function<void ()> thread = add_thread(i);
+    spawn_worker(i, std::move(thread));
+  }
+  started = true;
+  pool_spin.unlock();
+
+  for (unsigned i = 0; i < num_workers; ++i)
+    workers[i]->wait_for_init();
+}
+
+Worker* NetworkStack::get_worker()
+{
+  ldout(cct, 30) << __func__ << dendl;
+
+   // start with some reasonably large number
+  unsigned min_load = std::numeric_limits<int>::max();
+  Worker* current_best = nullptr;
+
+  pool_spin.lock();
+  // find worker with least references
+  // tempting case is returning on references == 0, but in reality
+  // this will happen so rarely that there's no need for special case.
+  for (unsigned i = 0; i < num_workers; ++i) {
+    unsigned worker_load = workers[i]->references.load();
+    if (worker_load < min_load) {
+      current_best = workers[i];
+      min_load = worker_load;
+    }
+  }
+
+  pool_spin.unlock();
+  assert(current_best);
+  ++current_best->references;
+  return current_best;
+}
+
+void NetworkStack::stop()
+{
+  Spinlock::Locker l(pool_spin);
+  for (unsigned i = 0; i < num_workers; ++i) {
+    workers[i]->done = true;
+    workers[i]->center.wakeup();
+    join_worker(i);
+  }
+  started = false;
+}
+
+class C_drain : public EventCallback {
+  Mutex drain_lock;
+  Cond drain_cond;
+  unsigned drain_count;
+
+ public:
+  explicit C_drain(size_t c)
+      : drain_lock("C_drain::drain_lock"),
+        drain_count(c) {}
+  void do_request(int id) override {
+    Mutex::Locker l(drain_lock);
+    drain_count--;
+    if (drain_count == 0) drain_cond.Signal();
+  }
+  void wait() {
+    Mutex::Locker l(drain_lock);
+    while (drain_count)
+      drain_cond.Wait(drain_lock);
+  }
+};
+
+void NetworkStack::drain()
+{
+  ldout(cct, 30) << __func__ << " started." << dendl;
+  pthread_t cur = pthread_self();
+  pool_spin.lock();
+  C_drain drain(num_workers);
+  for (unsigned i = 0; i < num_workers; ++i) {
+    assert(cur != workers[i]->center.get_owner());
+    workers[i]->center.dispatch_event_external(EventCallbackRef(&drain));
+  }
+  pool_spin.unlock();
+  drain.wait();
+  ldout(cct, 30) << __func__ << " end." << dendl;
+}