X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fmsg%2Fasync%2FStack.cc;fp=src%2Fceph%2Fsrc%2Fmsg%2Fasync%2FStack.cc;h=0000000000000000000000000000000000000000;hb=7da45d65be36d36b880cc55c5036e96c24b53f00;hp=807db1136e39594c9a0f80c8cb5b6093fffd165d;hpb=691462d09d0987b47e112d6ee8740375df3c51b2;p=stor4nfv.git diff --git a/src/ceph/src/msg/async/Stack.cc b/src/ceph/src/msg/async/Stack.cc deleted file mode 100644 index 807db11..0000000 --- a/src/ceph/src/msg/async/Stack.cc +++ /dev/null @@ -1,216 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2016 XSky - * - * Author: Haomai Wang - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - -#include "include/compat.h" -#include "common/Cond.h" -#include "common/errno.h" -#include "PosixStack.h" -#ifdef HAVE_RDMA -#include "rdma/RDMAStack.h" -#endif -#ifdef HAVE_DPDK -#include "dpdk/DPDKStack.h" -#endif - -#include "common/dout.h" -#include "include/assert.h" - -#define dout_subsys ceph_subsys_ms -#undef dout_prefix -#define dout_prefix *_dout << "stack " - -std::function NetworkStack::add_thread(unsigned i) -{ - Worker *w = workers[i]; - return [this, w]() { - char tp_name[16]; - sprintf(tp_name, "msgr-worker-%d", w->id); - ceph_pthread_setname(pthread_self(), tp_name); - const uint64_t EventMaxWaitUs = 30000000; - w->center.set_owner(); - ldout(cct, 10) << __func__ << " starting" << dendl; - w->initialize(); - w->init_done(); - while (!w->done) { - ldout(cct, 30) << __func__ << " calling event process" << dendl; - - ceph::timespan dur; - int r = w->center.process_events(EventMaxWaitUs, &dur); - if (r < 0) { - ldout(cct, 20) << __func__ << " process events failed: " - << cpp_strerror(errno) << dendl; - // TODO do something? - } - w->perf_logger->tinc(l_msgr_running_total_time, dur); - } - w->reset(); - w->destroy(); - }; -} - -std::shared_ptr NetworkStack::create(CephContext *c, const string &t) -{ - if (t == "posix") - return std::make_shared(c, t); -#ifdef HAVE_RDMA - else if (t == "rdma") - return std::make_shared(c, t); -#endif -#ifdef HAVE_DPDK - else if (t == "dpdk") - return std::make_shared(c, t); -#endif - - lderr(c) << __func__ << " ms_async_transport_type " << t << - " is not supported! " << dendl; - ceph_abort(); - return nullptr; -} - -Worker* NetworkStack::create_worker(CephContext *c, const string &type, unsigned i) -{ - if (type == "posix") - return new PosixWorker(c, i); -#ifdef HAVE_RDMA - else if (type == "rdma") - return new RDMAWorker(c, i); -#endif -#ifdef HAVE_DPDK - else if (type == "dpdk") - return new DPDKWorker(c, i); -#endif - - lderr(c) << __func__ << " ms_async_transport_type " << type << - " is not supported! " << dendl; - ceph_abort(); - return nullptr; -} - -NetworkStack::NetworkStack(CephContext *c, const string &t): type(t), started(false), cct(c) -{ - assert(cct->_conf->ms_async_op_threads > 0); - - const uint64_t InitEventNumber = 5000; - num_workers = cct->_conf->ms_async_op_threads; - if (num_workers >= EventCenter::MAX_EVENTCENTER) { - ldout(cct, 0) << __func__ << " max thread limit is " - << EventCenter::MAX_EVENTCENTER << ", switching to this now. " - << "Higher thread values are unnecessary and currently unsupported." - << dendl; - num_workers = EventCenter::MAX_EVENTCENTER; - } - - for (unsigned i = 0; i < num_workers; ++i) { - Worker *w = create_worker(cct, type, i); - w->center.init(InitEventNumber, i, type); - workers.push_back(w); - } - cct->register_fork_watcher(this); -} - -void NetworkStack::start() -{ - pool_spin.lock(); - if (started) { - pool_spin.unlock(); - return ; - } - - for (unsigned i = 0; i < num_workers; ++i) { - if (workers[i]->is_init()) - continue; - std::function thread = add_thread(i); - spawn_worker(i, std::move(thread)); - } - started = true; - pool_spin.unlock(); - - for (unsigned i = 0; i < num_workers; ++i) - workers[i]->wait_for_init(); -} - -Worker* NetworkStack::get_worker() -{ - ldout(cct, 30) << __func__ << dendl; - - // start with some reasonably large number - unsigned min_load = std::numeric_limits::max(); - Worker* current_best = nullptr; - - pool_spin.lock(); - // find worker with least references - // tempting case is returning on references == 0, but in reality - // this will happen so rarely that there's no need for special case. - for (unsigned i = 0; i < num_workers; ++i) { - unsigned worker_load = workers[i]->references.load(); - if (worker_load < min_load) { - current_best = workers[i]; - min_load = worker_load; - } - } - - pool_spin.unlock(); - assert(current_best); - ++current_best->references; - return current_best; -} - -void NetworkStack::stop() -{ - Spinlock::Locker l(pool_spin); - for (unsigned i = 0; i < num_workers; ++i) { - workers[i]->done = true; - workers[i]->center.wakeup(); - join_worker(i); - } - started = false; -} - -class C_drain : public EventCallback { - Mutex drain_lock; - Cond drain_cond; - unsigned drain_count; - - public: - explicit C_drain(size_t c) - : drain_lock("C_drain::drain_lock"), - drain_count(c) {} - void do_request(int id) override { - Mutex::Locker l(drain_lock); - drain_count--; - if (drain_count == 0) drain_cond.Signal(); - } - void wait() { - Mutex::Locker l(drain_lock); - while (drain_count) - drain_cond.Wait(drain_lock); - } -}; - -void NetworkStack::drain() -{ - ldout(cct, 30) << __func__ << " started." << dendl; - pthread_t cur = pthread_self(); - pool_spin.lock(); - C_drain drain(num_workers); - for (unsigned i = 0; i < num_workers; ++i) { - assert(cur != workers[i]->center.get_owner()); - workers[i]->center.dispatch_event_external(EventCallbackRef(&drain)); - } - pool_spin.unlock(); - drain.wait(); - ldout(cct, 30) << __func__ << " end." << dendl; -}