1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2016 XSky <haomai@xsky.com>
 *
 * Author: Haomai Wang <haomaiwang@gmail.com>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 *
 */
#include "include/compat.h"
#include "common/Cond.h"
#include "common/errno.h"
#include "PosixStack.h"
#ifdef HAVE_RDMA
#include "rdma/RDMAStack.h"
#endif
#ifdef HAVE_DPDK
#include "dpdk/DPDKStack.h"
#endif
#include "common/dout.h"
#include "include/assert.h"
31 #define dout_subsys ceph_subsys_ms
33 #define dout_prefix *_dout << "stack "
35 std::function<void ()> NetworkStack::add_thread(unsigned i)
37 Worker *w = workers[i];
40 sprintf(tp_name, "msgr-worker-%d", w->id);
41 ceph_pthread_setname(pthread_self(), tp_name);
42 const uint64_t EventMaxWaitUs = 30000000;
43 w->center.set_owner();
44 ldout(cct, 10) << __func__ << " starting" << dendl;
48 ldout(cct, 30) << __func__ << " calling event process" << dendl;
51 int r = w->center.process_events(EventMaxWaitUs, &dur);
53 ldout(cct, 20) << __func__ << " process events failed: "
54 << cpp_strerror(errno) << dendl;
57 w->perf_logger->tinc(l_msgr_running_total_time, dur);
64 std::shared_ptr<NetworkStack> NetworkStack::create(CephContext *c, const string &t)
67 return std::make_shared<PosixNetworkStack>(c, t);
70 return std::make_shared<RDMAStack>(c, t);
74 return std::make_shared<DPDKStack>(c, t);
77 lderr(c) << __func__ << " ms_async_transport_type " << t <<
78 " is not supported! " << dendl;
83 Worker* NetworkStack::create_worker(CephContext *c, const string &type, unsigned i)
86 return new PosixWorker(c, i);
88 else if (type == "rdma")
89 return new RDMAWorker(c, i);
92 else if (type == "dpdk")
93 return new DPDKWorker(c, i);
96 lderr(c) << __func__ << " ms_async_transport_type " << type <<
97 " is not supported! " << dendl;
102 NetworkStack::NetworkStack(CephContext *c, const string &t): type(t), started(false), cct(c)
104 assert(cct->_conf->ms_async_op_threads > 0);
106 const uint64_t InitEventNumber = 5000;
107 num_workers = cct->_conf->ms_async_op_threads;
108 if (num_workers >= EventCenter::MAX_EVENTCENTER) {
109 ldout(cct, 0) << __func__ << " max thread limit is "
110 << EventCenter::MAX_EVENTCENTER << ", switching to this now. "
111 << "Higher thread values are unnecessary and currently unsupported."
113 num_workers = EventCenter::MAX_EVENTCENTER;
116 for (unsigned i = 0; i < num_workers; ++i) {
117 Worker *w = create_worker(cct, type, i);
118 w->center.init(InitEventNumber, i, type);
119 workers.push_back(w);
121 cct->register_fork_watcher(this);
124 void NetworkStack::start()
132 for (unsigned i = 0; i < num_workers; ++i) {
133 if (workers[i]->is_init())
135 std::function<void ()> thread = add_thread(i);
136 spawn_worker(i, std::move(thread));
141 for (unsigned i = 0; i < num_workers; ++i)
142 workers[i]->wait_for_init();
145 Worker* NetworkStack::get_worker()
147 ldout(cct, 30) << __func__ << dendl;
149 // start with some reasonably large number
150 unsigned min_load = std::numeric_limits<int>::max();
151 Worker* current_best = nullptr;
154 // find worker with least references
155 // tempting case is returning on references == 0, but in reality
156 // this will happen so rarely that there's no need for special case.
157 for (unsigned i = 0; i < num_workers; ++i) {
158 unsigned worker_load = workers[i]->references.load();
159 if (worker_load < min_load) {
160 current_best = workers[i];
161 min_load = worker_load;
166 assert(current_best);
167 ++current_best->references;
171 void NetworkStack::stop()
173 Spinlock::Locker l(pool_spin);
174 for (unsigned i = 0; i < num_workers; ++i) {
175 workers[i]->done = true;
176 workers[i]->center.wakeup();
182 class C_drain : public EventCallback {
185 unsigned drain_count;
188 explicit C_drain(size_t c)
189 : drain_lock("C_drain::drain_lock"),
191 void do_request(int id) override {
192 Mutex::Locker l(drain_lock);
194 if (drain_count == 0) drain_cond.Signal();
197 Mutex::Locker l(drain_lock);
199 drain_cond.Wait(drain_lock);
203 void NetworkStack::drain()
205 ldout(cct, 30) << __func__ << " started." << dendl;
206 pthread_t cur = pthread_self();
208 C_drain drain(num_workers);
209 for (unsigned i = 0; i < num_workers; ++i) {
210 assert(cur != workers[i]->center.get_owner());
211 workers[i]->center.dispatch_event_external(EventCallbackRef(&drain));
215 ldout(cct, 30) << __func__ << " end." << dendl;