Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / msg / async / rdma / RDMAStack.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2016 XSKY <haomai@xsky.com>
7  *
8  * Author: Haomai Wang <haomaiwang@gmail.com>
9  *
10  * This is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU Lesser General Public
12  * License version 2.1, as published by the Free Software
13  * Foundation.  See file COPYING.
14  *
15  */
16
17 #ifndef CEPH_MSG_RDMASTACK_H
18 #define CEPH_MSG_RDMASTACK_H
19
20 #include <sys/eventfd.h>
21
22 #include <list>
23 #include <vector>
24 #include <thread>
25
26 #include "common/ceph_context.h"
27 #include "common/debug.h"
28 #include "common/errno.h"
29 #include "msg/async/Stack.h"
30 #include "Infiniband.h"
31
32 class RDMAConnectedSocketImpl;
33 class RDMAServerSocketImpl;
34 class RDMAStack;
35 class RDMAWorker;
36
37 enum {
38   l_msgr_rdma_dispatcher_first = 94000,
39
40   l_msgr_rdma_polling,
41   l_msgr_rdma_inflight_tx_chunks,
42   l_msgr_rdma_inqueue_rx_chunks,
43
44   l_msgr_rdma_tx_total_wc,
45   l_msgr_rdma_tx_total_wc_errors,
46   l_msgr_rdma_tx_wc_retry_errors,
47   l_msgr_rdma_tx_wc_wr_flush_errors,
48
49   l_msgr_rdma_rx_total_wc,
50   l_msgr_rdma_rx_total_wc_errors,
51   l_msgr_rdma_rx_fin,
52
53   l_msgr_rdma_handshake_errors,
54
55   l_msgr_rdma_total_async_events,
56   l_msgr_rdma_async_last_wqe_events,
57
58   l_msgr_rdma_created_queue_pair,
59   l_msgr_rdma_active_queue_pair,
60
61   l_msgr_rdma_dispatcher_last,
62 };
63
64
65 class RDMADispatcher {
66   typedef Infiniband::MemoryManager::Chunk Chunk;
67   typedef Infiniband::QueuePair QueuePair;
68
69   std::thread t;
70   CephContext *cct;
71   Infiniband::CompletionQueue* tx_cq;
72   Infiniband::CompletionQueue* rx_cq;
73   Infiniband::CompletionChannel *tx_cc, *rx_cc;
74   EventCallbackRef async_handler;
75   bool done = false;
76   std::atomic<uint64_t> num_dead_queue_pair = {0};
77   std::atomic<uint64_t> num_qp_conn = {0};
78   Mutex lock; // protect `qp_conns`, `dead_queue_pairs`
79   // qp_num -> InfRcConnection
80   // The main usage of `qp_conns` is looking up connection by qp_num,
81   // so the lifecycle of element in `qp_conns` is the lifecycle of qp.
82   //// make qp queue into dead state
83   /**
84    * 1. Connection call mark_down
85    * 2. Move the Queue Pair into the Error state(QueuePair::to_dead)
86    * 3. Wait for the affiliated event IBV_EVENT_QP_LAST_WQE_REACHED(handle_async_event)
87    * 4. Wait for CQ to be empty(handle_tx_event)
88    * 5. Destroy the QP by calling ibv_destroy_qp()(handle_tx_event)
89    *
90    * @param qp The qp needed to dead
91    */
92   ceph::unordered_map<uint32_t, std::pair<QueuePair*, RDMAConnectedSocketImpl*> > qp_conns;
93
94   /// if a queue pair is closed when transmit buffers are active
95   /// on it, the transmit buffers never get returned via tx_cq.  To
96   /// work around this problem, don't delete queue pairs immediately. Instead,
97   /// save them in this vector and delete them at a safe time, when there are
98   /// no outstanding transmit buffers to be lost.
99   std::vector<QueuePair*> dead_queue_pairs;
100
101   std::atomic<uint64_t> num_pending_workers = {0};
102   Mutex w_lock; // protect pending workers
103   // fixme: lockfree
104   std::list<RDMAWorker*> pending_workers;
105   RDMAStack* stack;
106
107   class C_handle_cq_async : public EventCallback {
108     RDMADispatcher *dispatcher;
109    public:
110     C_handle_cq_async(RDMADispatcher *w): dispatcher(w) {}
111     void do_request(int fd) {
112       // worker->handle_tx_event();
113       dispatcher->handle_async_event();
114     }
115   };
116
117  public:
118   PerfCounters *perf_logger;
119
120   explicit RDMADispatcher(CephContext* c, RDMAStack* s);
121   virtual ~RDMADispatcher();
122   void handle_async_event();
123
124   void polling_start();
125   void polling_stop();
126   void polling();
127   int register_qp(QueuePair *qp, RDMAConnectedSocketImpl* csi);
128   void make_pending_worker(RDMAWorker* w) {
129     Mutex::Locker l(w_lock);
130     auto it = std::find(pending_workers.begin(), pending_workers.end(), w);
131     if (it != pending_workers.end())
132       return;
133     pending_workers.push_back(w);
134     ++num_pending_workers;
135   }
136   RDMAStack* get_stack() { return stack; }
137   RDMAConnectedSocketImpl* get_conn_lockless(uint32_t qp);
138   void erase_qpn_lockless(uint32_t qpn);
139   void erase_qpn(uint32_t qpn);
140   Infiniband::CompletionQueue* get_tx_cq() const { return tx_cq; }
141   Infiniband::CompletionQueue* get_rx_cq() const { return rx_cq; }
142   void notify_pending_workers();
143   void handle_tx_event(ibv_wc *cqe, int n);
144   void post_tx_buffer(std::vector<Chunk*> &chunks);
145
146   std::atomic<uint64_t> inflight = {0};
147 };
148
149
150 enum {
151   l_msgr_rdma_first = 95000,
152
153   l_msgr_rdma_tx_no_mem,
154   l_msgr_rdma_tx_parital_mem,
155   l_msgr_rdma_tx_failed,
156   l_msgr_rdma_rx_no_registered_mem,
157
158   l_msgr_rdma_tx_chunks,
159   l_msgr_rdma_tx_bytes,
160   l_msgr_rdma_rx_chunks,
161   l_msgr_rdma_rx_bytes,
162   l_msgr_rdma_pending_sent_conns,
163
164   l_msgr_rdma_last,
165 };
166
167 class RDMAWorker : public Worker {
168   typedef Infiniband::CompletionQueue CompletionQueue;
169   typedef Infiniband::CompletionChannel CompletionChannel;
170   typedef Infiniband::MemoryManager::Chunk Chunk;
171   typedef Infiniband::MemoryManager MemoryManager;
172   typedef std::vector<Chunk*>::iterator ChunkIter;
173   RDMAStack *stack;
174   EventCallbackRef tx_handler;
175   std::list<RDMAConnectedSocketImpl*> pending_sent_conns;
176   RDMADispatcher* dispatcher = nullptr;
177   Mutex lock;
178
179   class C_handle_cq_tx : public EventCallback {
180     RDMAWorker *worker;
181     public:
182     C_handle_cq_tx(RDMAWorker *w): worker(w) {}
183     void do_request(int fd) {
184       worker->handle_pending_message();
185     }
186   };
187
188  public:
189   PerfCounters *perf_logger;
190   explicit RDMAWorker(CephContext *c, unsigned i);
191   virtual ~RDMAWorker();
192   virtual int listen(entity_addr_t &addr, const SocketOptions &opts, ServerSocket *) override;
193   virtual int connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket) override;
194   virtual void initialize() override;
195   RDMAStack *get_stack() { return stack; }
196   int get_reged_mem(RDMAConnectedSocketImpl *o, std::vector<Chunk*> &c, size_t bytes);
197   void remove_pending_conn(RDMAConnectedSocketImpl *o) {
198     assert(center.in_thread());
199     pending_sent_conns.remove(o);
200   }
201   void handle_pending_message();
202   void set_stack(RDMAStack *s) { stack = s; }
203   void notify_worker() {
204     center.dispatch_event_external(tx_handler);
205   }
206 };
207
208 class RDMAConnectedSocketImpl : public ConnectedSocketImpl {
209  public:
210   typedef Infiniband::MemoryManager::Chunk Chunk;
211   typedef Infiniband::CompletionChannel CompletionChannel;
212   typedef Infiniband::CompletionQueue CompletionQueue;
213
214  private:
215   CephContext *cct;
216   Infiniband::QueuePair *qp;
217   IBSYNMsg peer_msg;
218   IBSYNMsg my_msg;
219   int connected;
220   int error;
221   Infiniband* infiniband;
222   RDMADispatcher* dispatcher;
223   RDMAWorker* worker;
224   std::vector<Chunk*> buffers;
225   int notify_fd = -1;
226   bufferlist pending_bl;
227
228   Mutex lock;
229   std::vector<ibv_wc> wc;
230   bool is_server;
231   EventCallbackRef con_handler;
232   int tcp_fd = -1;
233   bool active;// qp is active ?
234   bool pending;
235
236   void notify();
237   ssize_t read_buffers(char* buf, size_t len);
238   int post_work_request(std::vector<Chunk*>&);
239
240  public:
241   RDMAConnectedSocketImpl(CephContext *cct, Infiniband* ib, RDMADispatcher* s,
242                           RDMAWorker *w);
243   virtual ~RDMAConnectedSocketImpl();
244
245   void pass_wc(std::vector<ibv_wc> &&v);
246   void get_wc(std::vector<ibv_wc> &w);
247   virtual int is_connected() override { return connected; }
248
249   virtual ssize_t read(char* buf, size_t len) override;
250   virtual ssize_t zero_copy_read(bufferptr &data) override;
251   virtual ssize_t send(bufferlist &bl, bool more) override;
252   virtual void shutdown() override;
253   virtual void close() override;
254   virtual int fd() const override { return notify_fd; }
255   void fault();
256   const char* get_qp_state() { return Infiniband::qp_state_string(qp->get_state()); }
257   ssize_t submit(bool more);
258   int activate();
259   void fin();
260   void handle_connection();
261   void cleanup();
262   void set_accept_fd(int sd);
263   int try_connect(const entity_addr_t&, const SocketOptions &opt);
264   bool is_pending() {return pending;}
265   void set_pending(bool val) {pending = val;}
266   class C_handle_connection : public EventCallback {
267     RDMAConnectedSocketImpl *csi;
268     bool active;
269    public:
270     C_handle_connection(RDMAConnectedSocketImpl *w): csi(w), active(true) {}
271     void do_request(int fd) {
272       if (active)
273         csi->handle_connection();
274     }
275     void close() {
276       active = false;
277     }
278   };
279 };
280
281 class RDMAServerSocketImpl : public ServerSocketImpl {
282   CephContext *cct;
283   NetHandler net;
284   int server_setup_socket;
285   Infiniband* infiniband;
286   RDMADispatcher *dispatcher;
287   RDMAWorker *worker;
288   entity_addr_t sa;
289
290  public:
291   RDMAServerSocketImpl(CephContext *cct, Infiniband* i, RDMADispatcher *s, RDMAWorker *w, entity_addr_t& a);
292
293   int listen(entity_addr_t &sa, const SocketOptions &opt);
294   virtual int accept(ConnectedSocket *s, const SocketOptions &opts, entity_addr_t *out, Worker *w) override;
295   virtual void abort_accept() override;
296   virtual int fd() const override { return server_setup_socket; }
297   int get_fd() { return server_setup_socket; }
298 };
299
300 class RDMAStack : public NetworkStack {
301   vector<std::thread> threads;
302   RDMADispatcher *dispatcher;
303
304   std::atomic<bool> fork_finished = {false};
305
306  public:
307   explicit RDMAStack(CephContext *cct, const string &t);
308   virtual ~RDMAStack();
309   virtual bool support_zero_copy_read() const override { return false; }
310   virtual bool nonblock_connect_need_writable_event() const { return false; }
311
312   virtual void spawn_worker(unsigned i, std::function<void ()> &&func) override;
313   virtual void join_worker(unsigned i) override;
314   RDMADispatcher *get_dispatcher() { return dispatcher; }
315
316   virtual bool is_ready() override { return fork_finished.load(); };
317   virtual void ready() override { fork_finished = true; };
318 };
319
320 #endif