Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / msg / async / Stack.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2016 XSKY <haomai@xsky.com>
7  *
8  * Author: Haomai Wang <haomaiwang@gmail.com>
9  *
10  * This is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU Lesser General Public
12  * License version 2.1, as published by the Free Software
13  * Foundation.  See file COPYING.
14  *
15  */
16
17 #ifndef CEPH_MSG_ASYNC_STACK_H
18 #define CEPH_MSG_ASYNC_STACK_H
19
20 #include "include/Spinlock.h"
21 #include "common/perf_counters.h"
22 #include "common/simple_spin.h"
23 #include "msg/msg_types.h"
24 #include "msg/async/Event.h"
25
26 class Worker;
27 class ConnectedSocketImpl {
28  public:
29   virtual ~ConnectedSocketImpl() {}
30   virtual int is_connected() = 0;
31   virtual ssize_t read(char*, size_t) = 0;
32   virtual ssize_t zero_copy_read(bufferptr&) = 0;
33   virtual ssize_t send(bufferlist &bl, bool more) = 0;
34   virtual void shutdown() = 0;
35   virtual void close() = 0;
36   virtual int fd() const = 0;
37 };
38
39 class ConnectedSocket;
40 struct SocketOptions {
41   bool nonblock = true;
42   bool nodelay = true;
43   int rcbuf_size = 0;
44   int priority = -1;
45   entity_addr_t connect_bind_addr;
46 };
47
48 /// \cond internal
49 class ServerSocketImpl {
50  public:
51   virtual ~ServerSocketImpl() {}
52   virtual int accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w) = 0;
53   virtual void abort_accept() = 0;
54   /// Get file descriptor
55   virtual int fd() const = 0;
56 };
57 /// \endcond
58
59 /// \addtogroup networking-module
60 /// @{
61
62 /// A TCP (or other stream-based protocol) connection.
63 ///
64 /// A \c ConnectedSocket represents a full-duplex stream between
65 /// two endpoints, a local endpoint and a remote endpoint.
66 class ConnectedSocket {
67   std::unique_ptr<ConnectedSocketImpl> _csi;
68
69  public:
70   /// Constructs a \c ConnectedSocket not corresponding to a connection
71   ConnectedSocket() {};
72   /// \cond internal
73   explicit ConnectedSocket(std::unique_ptr<ConnectedSocketImpl> csi)
74       : _csi(std::move(csi)) {}
75   /// \endcond
76    ~ConnectedSocket() {
77     if (_csi)
78       _csi->close();
79   }
80   /// Moves a \c ConnectedSocket object.
81   ConnectedSocket(ConnectedSocket&& cs) = default;
82   /// Move-assigns a \c ConnectedSocket object.
83   ConnectedSocket& operator=(ConnectedSocket&& cs) = default;
84
85   int is_connected() {
86     return _csi->is_connected();
87   }
88   /// Read the input stream with copy.
89   ///
90   /// Copy an object returning data sent from the remote endpoint.
91   ssize_t read(char* buf, size_t len) {
92     return _csi->read(buf, len);
93   }
94   /// Gets the input stream.
95   ///
96   /// Gets an object returning data sent from the remote endpoint.
97   ssize_t zero_copy_read(bufferptr &data) {
98     return _csi->zero_copy_read(data);
99   }
100   /// Gets the output stream.
101   ///
102   /// Gets an object that sends data to the remote endpoint.
103   ssize_t send(bufferlist &bl, bool more) {
104     return _csi->send(bl, more);
105   }
106   /// Disables output to the socket.
107   ///
108   /// Current or future writes that have not been successfully flushed
109   /// will immediately fail with an error.  This is useful to abort
110   /// operations on a socket that is not making progress due to a
111   /// peer failure.
112   void shutdown() {
113     return _csi->shutdown();
114   }
115   /// Disables input from the socket.
116   ///
117   /// Current or future reads will immediately fail with an error.
118   /// This is useful to abort operations on a socket that is not making
119   /// progress due to a peer failure.
120   void close() {
121     _csi->close();
122     _csi.reset();
123   }
124
125   /// Get file descriptor
126   int fd() const {
127     return _csi->fd();
128   }
129
130   explicit operator bool() const {
131     return _csi.get();
132   }
133 };
134 /// @}
135
136 /// \addtogroup networking-module
137 /// @{
138
139 /// A listening socket, waiting to accept incoming network connections.
140 class ServerSocket {
141   std::unique_ptr<ServerSocketImpl> _ssi;
142  public:
143   /// Constructs a \c ServerSocket not corresponding to a connection
144   ServerSocket() {}
145   /// \cond internal
146   explicit ServerSocket(std::unique_ptr<ServerSocketImpl> ssi)
147       : _ssi(std::move(ssi)) {}
148   ~ServerSocket() {
149     if (_ssi)
150       _ssi->abort_accept();
151   }
152   /// \endcond
153   /// Moves a \c ServerSocket object.
154   ServerSocket(ServerSocket&& ss) = default;
155   /// Move-assigns a \c ServerSocket object.
156   ServerSocket& operator=(ServerSocket&& cs) = default;
157
158   /// Accepts the next connection to successfully connect to this socket.
159   ///
160   /// \Accepts a \ref ConnectedSocket representing the connection, and
161   ///          a \ref entity_addr_t describing the remote endpoint.
162   int accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w) {
163     return _ssi->accept(sock, opt, out, w);
164   }
165
166   /// Stops any \ref accept() in progress.
167   ///
168   /// Current and future \ref accept() calls will terminate immediately
169   /// with an error.
170   void abort_accept() {
171     _ssi->abort_accept();
172     _ssi.reset();
173   }
174
175   /// Get file descriptor
176   int fd() const {
177     return _ssi->fd();
178   }
179
180   explicit operator bool() const {
181     return _ssi.get();
182   }
183 };
184 /// @}
185
186 class NetworkStack;
187
188 enum {
189   l_msgr_first = 94000,
190   l_msgr_recv_messages,
191   l_msgr_send_messages,
192   l_msgr_recv_bytes,
193   l_msgr_send_bytes,
194   l_msgr_created_connections,
195   l_msgr_active_connections,
196
197   l_msgr_running_total_time,
198   l_msgr_running_send_time,
199   l_msgr_running_recv_time,
200   l_msgr_running_fast_dispatch_time,
201
202   l_msgr_last,
203 };
204
205 class Worker {
206   std::mutex init_lock;
207   std::condition_variable init_cond;
208   bool init = false;
209
210  public:
211   bool done = false;
212
213   CephContext *cct;
214   PerfCounters *perf_logger;
215   unsigned id;
216
217   std::atomic_uint references;
218   EventCenter center;
219
220   Worker(const Worker&) = delete;
221   Worker& operator=(const Worker&) = delete;
222
223   Worker(CephContext *c, unsigned i)
224     : cct(c), perf_logger(NULL), id(i), references(0), center(c) {
225     char name[128];
226     sprintf(name, "AsyncMessenger::Worker-%u", id);
227     // initialize perf_logger
228     PerfCountersBuilder plb(cct, name, l_msgr_first, l_msgr_last);
229
230     plb.add_u64_counter(l_msgr_recv_messages, "msgr_recv_messages", "Network received messages");
231     plb.add_u64_counter(l_msgr_send_messages, "msgr_send_messages", "Network sent messages");
232     plb.add_u64_counter(l_msgr_recv_bytes, "msgr_recv_bytes", "Network received bytes");
233     plb.add_u64_counter(l_msgr_send_bytes, "msgr_send_bytes", "Network sent bytes");
234     plb.add_u64_counter(l_msgr_active_connections, "msgr_active_connections", "Active connection number");
235     plb.add_u64_counter(l_msgr_created_connections, "msgr_created_connections", "Created connection number");
236
237     plb.add_time(l_msgr_running_total_time, "msgr_running_total_time", "The total time of thread running");
238     plb.add_time(l_msgr_running_send_time, "msgr_running_send_time", "The total time of message sending");
239     plb.add_time(l_msgr_running_recv_time, "msgr_running_recv_time", "The total time of message receiving");
240     plb.add_time(l_msgr_running_fast_dispatch_time, "msgr_running_fast_dispatch_time", "The total time of fast dispatch");
241
242     perf_logger = plb.create_perf_counters();
243     cct->get_perfcounters_collection()->add(perf_logger);
244   }
245   virtual ~Worker() {
246     if (perf_logger) {
247       cct->get_perfcounters_collection()->remove(perf_logger);
248       delete perf_logger;
249     }
250   }
251
252   virtual int listen(entity_addr_t &addr,
253                      const SocketOptions &opts, ServerSocket *) = 0;
254   virtual int connect(const entity_addr_t &addr,
255                       const SocketOptions &opts, ConnectedSocket *socket) = 0;
256   virtual void destroy() {}
257
258   virtual void initialize() {}
259   PerfCounters *get_perf_counter() { return perf_logger; }
260   void release_worker() {
261     int oldref = references.fetch_sub(1);
262     assert(oldref > 0);
263   }
264   void init_done() {
265     init_lock.lock();
266     init = true;
267     init_cond.notify_all();
268     init_lock.unlock();
269   }
270   bool is_init() {
271     std::lock_guard<std::mutex> l(init_lock);
272     return init;
273   }
274   void wait_for_init() {
275     std::unique_lock<std::mutex> l(init_lock);
276     while (!init)
277       init_cond.wait(l);
278   }
279   void reset() {
280     init_lock.lock();
281     init = false;
282     init_cond.notify_all();
283     init_lock.unlock();
284     done = false;
285   }
286 };
287
288 class NetworkStack : public CephContext::ForkWatcher {
289   std::string type;
290   unsigned num_workers = 0;
291   Spinlock pool_spin;
292   bool started = false;
293
294   std::function<void ()> add_thread(unsigned i);
295
296  protected:
297   CephContext *cct;
298   vector<Worker*> workers;
299
300   explicit NetworkStack(CephContext *c, const string &t);
301  public:
302   NetworkStack(const NetworkStack &) = delete;
303   NetworkStack& operator=(const NetworkStack &) = delete;
304   ~NetworkStack() override {
305     for (auto &&w : workers)
306       delete w;
307   }
308
309   static std::shared_ptr<NetworkStack> create(
310           CephContext *c, const string &type);
311
312   static Worker* create_worker(
313           CephContext *c, const string &t, unsigned i);
314   // backend need to override this method if supports zero copy read
315   virtual bool support_zero_copy_read() const { return false; }
316   // backend need to override this method if backend doesn't support shared
317   // listen table.
318   // For example, posix backend has in kernel global listen table. If one
319   // thread bind a port, other threads also aware this.
320   // But for dpdk backend, we maintain listen table in each thread. So we
321   // need to let each thread do binding port.
322   virtual bool support_local_listen_table() const { return false; }
323   virtual bool nonblock_connect_need_writable_event() const { return true; }
324
325   void start();
326   void stop();
327   virtual Worker *get_worker();
328   Worker *get_worker(unsigned i) {
329     return workers[i];
330   }
331   void drain();
332   unsigned get_num_worker() const {
333     return num_workers;
334   }
335
336   // direct is used in tests only
337   virtual void spawn_worker(unsigned i, std::function<void ()> &&) = 0;
338   virtual void join_worker(unsigned i) = 0;
339
340   void handle_pre_fork() override {
341     stop();
342   }
343
344   void handle_post_fork() override {
345     start();
346   }
347
348   virtual bool is_ready() { return true; };
349   virtual void ready() { };
350 };
351
352 #endif //CEPH_MSG_ASYNC_STACK_H