Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / msg / async / Stack.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2016 XSky <haomai@xsky.com>
7  *
8  * Author: Haomai Wang <haomaiwang@gmail.com>
9  *
10  * This is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU Lesser General Public
12  * License version 2.1, as published by the Free Software
13  * Foundation.  See file COPYING.
14  *
15  */
16
17 #include "include/compat.h"
18 #include "common/Cond.h"
19 #include "common/errno.h"
20 #include "PosixStack.h"
21 #ifdef HAVE_RDMA
22 #include "rdma/RDMAStack.h"
23 #endif
24 #ifdef HAVE_DPDK
25 #include "dpdk/DPDKStack.h"
26 #endif
27
28 #include "common/dout.h"
29 #include "include/assert.h"
30
31 #define dout_subsys ceph_subsys_ms
32 #undef dout_prefix
33 #define dout_prefix *_dout << "stack "
34
35 std::function<void ()> NetworkStack::add_thread(unsigned i)
36 {
37   Worker *w = workers[i];
38   return [this, w]() {
39       char tp_name[16];
40       sprintf(tp_name, "msgr-worker-%d", w->id);
41       ceph_pthread_setname(pthread_self(), tp_name);
42       const uint64_t EventMaxWaitUs = 30000000;
43       w->center.set_owner();
44       ldout(cct, 10) << __func__ << " starting" << dendl;
45       w->initialize();
46       w->init_done();
47       while (!w->done) {
48         ldout(cct, 30) << __func__ << " calling event process" << dendl;
49
50         ceph::timespan dur;
51         int r = w->center.process_events(EventMaxWaitUs, &dur);
52         if (r < 0) {
53           ldout(cct, 20) << __func__ << " process events failed: "
54                          << cpp_strerror(errno) << dendl;
55           // TODO do something?
56         }
57         w->perf_logger->tinc(l_msgr_running_total_time, dur);
58       }
59       w->reset();
60       w->destroy();
61   };
62 }
63
64 std::shared_ptr<NetworkStack> NetworkStack::create(CephContext *c, const string &t)
65 {
66   if (t == "posix")
67     return std::make_shared<PosixNetworkStack>(c, t);
68 #ifdef HAVE_RDMA
69   else if (t == "rdma")
70     return std::make_shared<RDMAStack>(c, t);
71 #endif
72 #ifdef HAVE_DPDK
73   else if (t == "dpdk")
74     return std::make_shared<DPDKStack>(c, t);
75 #endif
76
77   lderr(c) << __func__ << " ms_async_transport_type " << t <<
78     " is not supported! " << dendl;
79   ceph_abort();
80   return nullptr;
81 }
82
83 Worker* NetworkStack::create_worker(CephContext *c, const string &type, unsigned i)
84 {
85   if (type == "posix")
86     return new PosixWorker(c, i);
87 #ifdef HAVE_RDMA
88   else if (type == "rdma")
89     return new RDMAWorker(c, i);
90 #endif
91 #ifdef HAVE_DPDK
92   else if (type == "dpdk")
93     return new DPDKWorker(c, i);
94 #endif
95
96   lderr(c) << __func__ << " ms_async_transport_type " << type <<
97     " is not supported! " << dendl;
98   ceph_abort();
99   return nullptr;
100 }
101
102 NetworkStack::NetworkStack(CephContext *c, const string &t): type(t), started(false), cct(c)
103 {
104   assert(cct->_conf->ms_async_op_threads > 0);
105
106   const uint64_t InitEventNumber = 5000;
107   num_workers = cct->_conf->ms_async_op_threads;
108   if (num_workers >= EventCenter::MAX_EVENTCENTER) {
109     ldout(cct, 0) << __func__ << " max thread limit is "
110                   << EventCenter::MAX_EVENTCENTER << ", switching to this now. "
111                   << "Higher thread values are unnecessary and currently unsupported."
112                   << dendl;
113     num_workers = EventCenter::MAX_EVENTCENTER;
114   }
115
116   for (unsigned i = 0; i < num_workers; ++i) {
117     Worker *w = create_worker(cct, type, i);
118     w->center.init(InitEventNumber, i, type);
119     workers.push_back(w);
120   }
121   cct->register_fork_watcher(this);
122 }
123
124 void NetworkStack::start()
125 {
126   pool_spin.lock();
127   if (started) {
128     pool_spin.unlock();
129     return ;
130   }
131
132   for (unsigned i = 0; i < num_workers; ++i) {
133     if (workers[i]->is_init())
134       continue;
135     std::function<void ()> thread = add_thread(i);
136     spawn_worker(i, std::move(thread));
137   }
138   started = true;
139   pool_spin.unlock();
140
141   for (unsigned i = 0; i < num_workers; ++i)
142     workers[i]->wait_for_init();
143 }
144
145 Worker* NetworkStack::get_worker()
146 {
147   ldout(cct, 30) << __func__ << dendl;
148
149    // start with some reasonably large number
150   unsigned min_load = std::numeric_limits<int>::max();
151   Worker* current_best = nullptr;
152
153   pool_spin.lock();
154   // find worker with least references
155   // tempting case is returning on references == 0, but in reality
156   // this will happen so rarely that there's no need for special case.
157   for (unsigned i = 0; i < num_workers; ++i) {
158     unsigned worker_load = workers[i]->references.load();
159     if (worker_load < min_load) {
160       current_best = workers[i];
161       min_load = worker_load;
162     }
163   }
164
165   pool_spin.unlock();
166   assert(current_best);
167   ++current_best->references;
168   return current_best;
169 }
170
171 void NetworkStack::stop()
172 {
173   Spinlock::Locker l(pool_spin);
174   for (unsigned i = 0; i < num_workers; ++i) {
175     workers[i]->done = true;
176     workers[i]->center.wakeup();
177     join_worker(i);
178   }
179   started = false;
180 }
181
182 class C_drain : public EventCallback {
183   Mutex drain_lock;
184   Cond drain_cond;
185   unsigned drain_count;
186
187  public:
188   explicit C_drain(size_t c)
189       : drain_lock("C_drain::drain_lock"),
190         drain_count(c) {}
191   void do_request(int id) override {
192     Mutex::Locker l(drain_lock);
193     drain_count--;
194     if (drain_count == 0) drain_cond.Signal();
195   }
196   void wait() {
197     Mutex::Locker l(drain_lock);
198     while (drain_count)
199       drain_cond.Wait(drain_lock);
200   }
201 };
202
203 void NetworkStack::drain()
204 {
205   ldout(cct, 30) << __func__ << " started." << dendl;
206   pthread_t cur = pthread_self();
207   pool_spin.lock();
208   C_drain drain(num_workers);
209   for (unsigned i = 0; i < num_workers; ++i) {
210     assert(cur != workers[i]->center.get_owner());
211     workers[i]->center.dispatch_event_external(EventCallbackRef(&drain));
212   }
213   pool_spin.unlock();
214   drain.wait();
215   ldout(cct, 30) << __func__ << " end." << dendl;
216 }