Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / msg / async / rdma / RDMAStack.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2016 XSKY <haomai@xsky.com>
7  *
8  * Author: Haomai Wang <haomaiwang@gmail.com>
9  *
10  * This is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU Lesser General Public
12  * License version 2.1, as published by the Free Software
13  * Foundation.  See file COPYING.
14  *
15  */
16
17 #include <poll.h>
18 #include <sys/time.h>
19 #include <sys/resource.h>
20
21 #include "include/str_list.h"
22 #include "common/deleter.h"
23 #include "common/Tub.h"
24 #include "RDMAStack.h"
25
26 #define dout_subsys ceph_subsys_ms
27 #undef dout_prefix
28 #define dout_prefix *_dout << "RDMAStack "
29
30 static Tub<Infiniband> global_infiniband;
31
32 RDMADispatcher::~RDMADispatcher()
33 {
34   done = true;
35   polling_stop();
36   ldout(cct, 20) << __func__ << " destructing rdma dispatcher" << dendl;
37
38   assert(qp_conns.empty());
39   assert(num_qp_conn == 0);
40   assert(dead_queue_pairs.empty());
41   assert(num_dead_queue_pair == 0);
42
43   tx_cc->ack_events();
44   rx_cc->ack_events();
45   delete tx_cq;
46   delete rx_cq;
47   delete tx_cc;
48   delete rx_cc;
49   delete async_handler;
50
51   global_infiniband->set_dispatcher(nullptr);
52 }
53
54 RDMADispatcher::RDMADispatcher(CephContext* c, RDMAStack* s)
55   : cct(c), async_handler(new C_handle_cq_async(this)), lock("RDMADispatcher::lock"),
56   w_lock("RDMADispatcher::for worker pending list"), stack(s)
57 {
58   PerfCountersBuilder plb(cct, "AsyncMessenger::RDMADispatcher", l_msgr_rdma_dispatcher_first, l_msgr_rdma_dispatcher_last);
59
60   plb.add_u64_counter(l_msgr_rdma_polling, "polling", "Whether dispatcher thread is polling");
61   plb.add_u64_counter(l_msgr_rdma_inflight_tx_chunks, "inflight_tx_chunks", "The number of inflight tx chunks");
62   plb.add_u64_counter(l_msgr_rdma_inqueue_rx_chunks, "inqueue_rx_chunks", "The number of inqueue rx chunks");
63
64   plb.add_u64_counter(l_msgr_rdma_tx_total_wc, "tx_total_wc", "The number of tx work comletions");
65   plb.add_u64_counter(l_msgr_rdma_tx_total_wc_errors, "tx_total_wc_errors", "The number of tx errors");
66   plb.add_u64_counter(l_msgr_rdma_tx_wc_retry_errors, "tx_retry_errors", "The number of tx retry errors");
67   plb.add_u64_counter(l_msgr_rdma_tx_wc_wr_flush_errors, "tx_wr_flush_errors", "The number of tx work request flush errors");
68
69   plb.add_u64_counter(l_msgr_rdma_rx_total_wc, "rx_total_wc", "The number of total rx work completion");
70   plb.add_u64_counter(l_msgr_rdma_rx_total_wc_errors, "rx_total_wc_errors", "The number of total rx error work completion");
71   plb.add_u64_counter(l_msgr_rdma_rx_fin, "rx_fin", "The number of rx finish work request");
72
73   plb.add_u64_counter(l_msgr_rdma_total_async_events, "total_async_events", "The number of async events");
74   plb.add_u64_counter(l_msgr_rdma_async_last_wqe_events, "async_last_wqe_events", "The number of last wqe events");
75
76   plb.add_u64_counter(l_msgr_rdma_handshake_errors, "handshake_errors", "The number of handshake errors");
77
78
79   plb.add_u64_counter(l_msgr_rdma_created_queue_pair, "created_queue_pair", "Active queue pair number");
80   plb.add_u64_counter(l_msgr_rdma_active_queue_pair, "active_queue_pair", "Created queue pair number");
81
82   perf_logger = plb.create_perf_counters();
83   cct->get_perfcounters_collection()->add(perf_logger);
84 }
85
86 void RDMADispatcher::polling_start()
87 {
88   tx_cc = global_infiniband->create_comp_channel(cct);
89   assert(tx_cc);
90   rx_cc = global_infiniband->create_comp_channel(cct);
91   assert(rx_cc);
92   tx_cq = global_infiniband->create_comp_queue(cct, tx_cc);
93   assert(tx_cq);
94   rx_cq = global_infiniband->create_comp_queue(cct, rx_cc);
95   assert(rx_cq);
96
97   t = std::thread(&RDMADispatcher::polling, this);
98 }
99
100 void RDMADispatcher::polling_stop()
101 {
102   if (t.joinable())
103     t.join();
104 }
105
106 void RDMADispatcher::handle_async_event()
107 {
108   ldout(cct, 30) << __func__ << dendl;
109   while (1) {
110     ibv_async_event async_event;
111     if (ibv_get_async_event(global_infiniband->get_device()->ctxt, &async_event)) {
112       if (errno != EAGAIN)
113        lderr(cct) << __func__ << " ibv_get_async_event failed. (errno=" << errno
114                   << " " << cpp_strerror(errno) << ")" << dendl;
115       return;
116     }
117     perf_logger->inc(l_msgr_rdma_total_async_events);
118     // FIXME: Currently we must ensure no other factor make QP in ERROR state,
119     // otherwise this qp can't be deleted in current cleanup flow.
120     if (async_event.event_type == IBV_EVENT_QP_LAST_WQE_REACHED) {
121       perf_logger->inc(l_msgr_rdma_async_last_wqe_events);
122       uint64_t qpn = async_event.element.qp->qp_num;
123       ldout(cct, 10) << __func__ << " event associated qp=" << async_event.element.qp
124                      << " evt: " << ibv_event_type_str(async_event.event_type) << dendl;
125       Mutex::Locker l(lock);
126       RDMAConnectedSocketImpl *conn = get_conn_lockless(qpn);
127       if (!conn) {
128         ldout(cct, 1) << __func__ << " missing qp_num=" << qpn << " discard event" << dendl;
129       } else {
130         ldout(cct, 1) << __func__ << " it's not forwardly stopped by us, reenable=" << conn << dendl;
131         conn->fault();
132         erase_qpn_lockless(qpn);
133       }
134     } else {
135       ldout(cct, 1) << __func__ << " ibv_get_async_event: dev=" << global_infiniband->get_device()->ctxt
136                     << " evt: " << ibv_event_type_str(async_event.event_type)
137                     << dendl;
138     }
139     ibv_ack_async_event(&async_event);
140   }
141 }
142
143 void RDMADispatcher::polling()
144 {
145   static int MAX_COMPLETIONS = 32;
146   ibv_wc wc[MAX_COMPLETIONS];
147
148   std::map<RDMAConnectedSocketImpl*, std::vector<ibv_wc> > polled;
149   std::vector<ibv_wc> tx_cqe;
150   ldout(cct, 20) << __func__ << " going to poll tx cq: " << tx_cq << " rx cq: " << rx_cq << dendl;
151   RDMAConnectedSocketImpl *conn = nullptr;
152   utime_t last_inactive = ceph_clock_now();
153   bool rearmed = false;
154   int r = 0;
155
156   while (true) {
157     int tx_ret = tx_cq->poll_cq(MAX_COMPLETIONS, wc);
158     if (tx_ret > 0) {
159       ldout(cct, 20) << __func__ << " tx completion queue got " << tx_ret
160                      << " responses."<< dendl;
161       handle_tx_event(wc, tx_ret);
162     }
163
164     int rx_ret = rx_cq->poll_cq(MAX_COMPLETIONS, wc);
165     if (rx_ret > 0) {
166       ldout(cct, 20) << __func__ << " rt completion queue got " << rx_ret
167                      << " responses."<< dendl;
168       perf_logger->inc(l_msgr_rdma_rx_total_wc, rx_ret);
169
170       Mutex::Locker l(lock);//make sure connected socket alive when pass wc
171       for (int i = 0; i < rx_ret; ++i) {
172         ibv_wc* response = &wc[i];
173         Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
174         ldout(cct, 25) << __func__ << " got chunk=" << chunk << " bytes:" << response->byte_len << " opcode:" << response->opcode << dendl;
175
176         assert(wc[i].opcode == IBV_WC_RECV);
177
178         if (response->status == IBV_WC_SUCCESS) {
179           conn = get_conn_lockless(response->qp_num);
180           if (!conn) {
181             assert(global_infiniband->is_rx_buffer(chunk->buffer));
182             r = global_infiniband->post_chunk(chunk);
183             ldout(cct, 1) << __func__ << " csi with qpn " << response->qp_num << " may be dead. chunk " << chunk << " will be back ? " << r << dendl;
184             assert(r == 0);
185           } else {
186             polled[conn].push_back(*response);
187           }
188         } else {
189           perf_logger->inc(l_msgr_rdma_rx_total_wc_errors);
190           ldout(cct, 1) << __func__ << " work request returned error for buffer(" << chunk
191               << ") status(" << response->status << ":"
192               << global_infiniband->wc_status_to_string(response->status) << ")" << dendl;
193           assert(global_infiniband->is_rx_buffer(chunk->buffer));
194           r = global_infiniband->post_chunk(chunk);
195           if (r) {
196             ldout(cct, 0) << __func__ << " post chunk failed, error: " << cpp_strerror(r) << dendl;
197             assert(r == 0);
198           }
199
200           conn = get_conn_lockless(response->qp_num);
201           if (conn && conn->is_connected())
202             conn->fault();
203         }
204       }
205
206       for (auto &&i : polled) {
207         perf_logger->inc(l_msgr_rdma_inqueue_rx_chunks, i.second.size());
208         i.first->pass_wc(std::move(i.second));
209       }
210       polled.clear();
211     }
212
213     if (!tx_ret && !rx_ret) {
214       // NOTE: Has TX just transitioned to idle? We should do it when idle!
215       // It's now safe to delete queue pairs (see comment by declaration
216       // for dead_queue_pairs).
217       // Additionally, don't delete qp while outstanding_buffers isn't empty,
218       // because we need to check qp's state before sending
219       perf_logger->set(l_msgr_rdma_inflight_tx_chunks, inflight);
220       if (num_dead_queue_pair) {
221         Mutex::Locker l(lock); // FIXME reuse dead qp because creating one qp costs 1 ms
222         while (!dead_queue_pairs.empty()) {
223           ldout(cct, 10) << __func__ << " finally delete qp=" << dead_queue_pairs.back() << dendl;
224           delete dead_queue_pairs.back();
225           perf_logger->dec(l_msgr_rdma_active_queue_pair);
226           dead_queue_pairs.pop_back();
227           --num_dead_queue_pair;
228         }
229       }
230       if (!num_qp_conn && done)
231         break;
232
233       if ((ceph_clock_now() - last_inactive).to_nsec() / 1000 > cct->_conf->ms_async_rdma_polling_us) {
234         handle_async_event();
235         if (!rearmed) {
236           // Clean up cq events after rearm notify ensure no new incoming event
237           // arrived between polling and rearm
238           tx_cq->rearm_notify();
239           rx_cq->rearm_notify();
240           rearmed = true;
241           continue;
242         }
243
244         struct pollfd channel_poll[2];
245         channel_poll[0].fd = tx_cc->get_fd();
246         channel_poll[0].events = POLLIN | POLLERR | POLLNVAL | POLLHUP;
247         channel_poll[0].revents = 0;
248         channel_poll[1].fd = rx_cc->get_fd();
249         channel_poll[1].events = POLLIN | POLLERR | POLLNVAL | POLLHUP;
250         channel_poll[1].revents = 0;
251         r = 0;
252         perf_logger->set(l_msgr_rdma_polling, 0);
253         while (!done && r == 0) {
254           r = poll(channel_poll, 2, 100);
255           if (r < 0) {
256             r = -errno;
257             lderr(cct) << __func__ << " poll failed " << r << dendl;
258             ceph_abort();
259           }
260         }
261         if (r > 0 && tx_cc->get_cq_event())
262           ldout(cct, 20) << __func__ << " got tx cq event." << dendl;
263         if (r > 0 && rx_cc->get_cq_event())
264           ldout(cct, 20) << __func__ << " got rx cq event." << dendl;
265         last_inactive = ceph_clock_now();
266         perf_logger->set(l_msgr_rdma_polling, 1);
267         rearmed = false;
268       }
269     }
270   }
271 }
272
273 void RDMADispatcher::notify_pending_workers() {
274   if (num_pending_workers) {
275     RDMAWorker *w = nullptr;
276     {
277       Mutex::Locker l(w_lock);
278       if (!pending_workers.empty()) {
279         w = pending_workers.front();
280         pending_workers.pop_front();
281         --num_pending_workers;
282       }
283     }
284     if (w)
285       w->notify_worker();
286   }
287 }
288
289 int RDMADispatcher::register_qp(QueuePair *qp, RDMAConnectedSocketImpl* csi)
290 {
291   int fd = eventfd(0, EFD_CLOEXEC|EFD_NONBLOCK);
292   assert(fd >= 0);
293   Mutex::Locker l(lock);
294   assert(!qp_conns.count(qp->get_local_qp_number()));
295   qp_conns[qp->get_local_qp_number()] = std::make_pair(qp, csi);
296   ++num_qp_conn;
297   return fd;
298 }
299
300 RDMAConnectedSocketImpl* RDMADispatcher::get_conn_lockless(uint32_t qp)
301 {
302   auto it = qp_conns.find(qp);
303   if (it == qp_conns.end())
304     return nullptr;
305   if (it->second.first->is_dead())
306     return nullptr;
307   return it->second.second;
308 }
309
310 void RDMADispatcher::erase_qpn_lockless(uint32_t qpn)
311 {
312   auto it = qp_conns.find(qpn);
313   if (it == qp_conns.end())
314     return ;
315   ++num_dead_queue_pair;
316   dead_queue_pairs.push_back(it->second.first);
317   qp_conns.erase(it);
318   --num_qp_conn;
319 }
320
321 void RDMADispatcher::erase_qpn(uint32_t qpn)
322 {
323   Mutex::Locker l(lock);
324   erase_qpn_lockless(qpn);
325 }
326
327 void RDMADispatcher::handle_tx_event(ibv_wc *cqe, int n)
328 {
329   std::vector<Chunk*> tx_chunks;
330
331   for (int i = 0; i < n; ++i) {
332     ibv_wc* response = &cqe[i];
333     Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
334     ldout(cct, 25) << __func__ << " QP: " << response->qp_num
335                    << " len: " << response->byte_len << " , addr:" << chunk
336                    << " " << global_infiniband->wc_status_to_string(response->status) << dendl;
337
338     if (response->status != IBV_WC_SUCCESS) {
339       perf_logger->inc(l_msgr_rdma_tx_total_wc_errors);
340       if (response->status == IBV_WC_RETRY_EXC_ERR) {
341         ldout(cct, 1) << __func__ << " connection between server and client not working. Disconnect this now" << dendl;
342         perf_logger->inc(l_msgr_rdma_tx_wc_retry_errors);
343       } else if (response->status == IBV_WC_WR_FLUSH_ERR) {
344         ldout(cct, 1) << __func__ << " Work Request Flushed Error: this connection's qp="
345                       << response->qp_num << " should be down while this WR=" << response->wr_id
346                       << " still in flight." << dendl;
347         perf_logger->inc(l_msgr_rdma_tx_wc_wr_flush_errors);
348       } else {
349         ldout(cct, 1) << __func__ << " send work request returned error for buffer("
350                       << response->wr_id << ") status(" << response->status << "): "
351                       << global_infiniband->wc_status_to_string(response->status) << dendl;
352       }
353
354       Mutex::Locker l(lock);//make sure connected socket alive when pass wc
355       RDMAConnectedSocketImpl *conn = get_conn_lockless(response->qp_num);
356
357       if (conn && conn->is_connected()) {
358         ldout(cct, 25) << __func__ << " qp state is : " << conn->get_qp_state() << dendl;//wangzhi
359         conn->fault();
360       } else {
361         ldout(cct, 1) << __func__ << " missing qp_num=" << response->qp_num << " discard event" << dendl;
362       }
363     }
364
365     //TX completion may come either from regular send message or from 'fin' message.
366     //In the case of 'fin' wr_id points to the QueuePair.
367     if (global_infiniband->get_memory_manager()->is_tx_buffer(chunk->buffer)) {
368       tx_chunks.push_back(chunk);
369     } else if (reinterpret_cast<QueuePair*>(response->wr_id)->get_local_qp_number() == response->qp_num ) {
370       ldout(cct, 1) << __func__ << " sending of the disconnect msg completed" << dendl;
371     } else {
372       ldout(cct, 1) << __func__ << " not tx buffer, chunk " << chunk << dendl;
373       ceph_abort();
374     }
375   }
376
377   perf_logger->inc(l_msgr_rdma_tx_total_wc, n);
378   post_tx_buffer(tx_chunks);
379 }
380
381 /**
382  * Add the given Chunks to the given free queue.
383  *
384  * \param[in] chunks
385  *      The Chunks to enqueue.
386  * \return
387  *      0 if success or -1 for failure
388  */
389 void RDMADispatcher::post_tx_buffer(std::vector<Chunk*> &chunks)
390 {
391   if (chunks.empty())
392     return ;
393
394   inflight -= chunks.size();
395   global_infiniband->get_memory_manager()->return_tx(chunks);
396   ldout(cct, 30) << __func__ << " release " << chunks.size()
397                  << " chunks, inflight " << inflight << dendl;
398   notify_pending_workers();
399 }
400
401
402 RDMAWorker::RDMAWorker(CephContext *c, unsigned i)
403   : Worker(c, i), stack(nullptr),
404     tx_handler(new C_handle_cq_tx(this)), lock("RDMAWorker::lock")
405 {
406   // initialize perf_logger
407   char name[128];
408   sprintf(name, "AsyncMessenger::RDMAWorker-%u", id);
409   PerfCountersBuilder plb(cct, name, l_msgr_rdma_first, l_msgr_rdma_last);
410
411   plb.add_u64_counter(l_msgr_rdma_tx_no_mem, "tx_no_mem", "The count of no tx buffer");
412   plb.add_u64_counter(l_msgr_rdma_tx_parital_mem, "tx_parital_mem", "The count of parital tx buffer");
413   plb.add_u64_counter(l_msgr_rdma_tx_failed, "tx_failed_post", "The number of tx failed posted");
414   plb.add_u64_counter(l_msgr_rdma_rx_no_registered_mem, "rx_no_registered_mem", "The count of no registered buffer when receiving");
415
416   plb.add_u64_counter(l_msgr_rdma_tx_chunks, "tx_chunks", "The number of tx chunks transmitted");
417   plb.add_u64_counter(l_msgr_rdma_tx_bytes, "tx_bytes", "The bytes of tx chunks transmitted");
418   plb.add_u64_counter(l_msgr_rdma_rx_chunks, "rx_chunks", "The number of rx chunks transmitted");
419   plb.add_u64_counter(l_msgr_rdma_rx_bytes, "rx_bytes", "The bytes of rx chunks transmitted");
420   plb.add_u64_counter(l_msgr_rdma_pending_sent_conns, "pending_sent_conns", "The count of pending sent conns");
421
422   perf_logger = plb.create_perf_counters();
423   cct->get_perfcounters_collection()->add(perf_logger);
424 }
425
426 RDMAWorker::~RDMAWorker()
427 {
428   delete tx_handler;
429 }
430
431 void RDMAWorker::initialize()
432 {
433   if (!dispatcher) {
434     dispatcher = stack->get_dispatcher();
435   }
436 }
437
438 int RDMAWorker::listen(entity_addr_t &sa, const SocketOptions &opt,ServerSocket *sock)
439 {
440   global_infiniband->init();
441
442   auto p = new RDMAServerSocketImpl(cct, global_infiniband.get(), get_stack()->get_dispatcher(), this, sa);
443   int r = p->listen(sa, opt);
444   if (r < 0) {
445     delete p;
446     return r;
447   }
448
449   *sock = ServerSocket(std::unique_ptr<ServerSocketImpl>(p));
450   return 0;
451 }
452
453 int RDMAWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket)
454 {
455   global_infiniband->init();
456
457   RDMAConnectedSocketImpl* p = new RDMAConnectedSocketImpl(cct, global_infiniband.get(), get_stack()->get_dispatcher(), this);
458   int r = p->try_connect(addr, opts);
459
460   if (r < 0) {
461     ldout(cct, 1) << __func__ << " try connecting failed." << dendl;
462     delete p;
463     return r;
464   }
465   std::unique_ptr<RDMAConnectedSocketImpl> csi(p);
466   *socket = ConnectedSocket(std::move(csi));
467   return 0;
468 }
469
470 int RDMAWorker::get_reged_mem(RDMAConnectedSocketImpl *o, std::vector<Chunk*> &c, size_t bytes)
471 {
472   assert(center.in_thread());
473   int r = global_infiniband->get_tx_buffers(c, bytes);
474   assert(r >= 0);
475   size_t got = global_infiniband->get_memory_manager()->get_tx_buffer_size() * r;
476   ldout(cct, 30) << __func__ << " need " << bytes << " bytes, reserve " << got << " registered  bytes, inflight " << dispatcher->inflight << dendl;
477   stack->get_dispatcher()->inflight += r;
478   if (got >= bytes)
479     return r;
480
481   if (o) {
482     if (!o->is_pending()) {
483       pending_sent_conns.push_back(o);
484       perf_logger->inc(l_msgr_rdma_pending_sent_conns, 1);
485       o->set_pending(1);
486     }
487     dispatcher->make_pending_worker(this);
488   }
489   return r;
490 }
491
492
493 void RDMAWorker::handle_pending_message()
494 {
495   ldout(cct, 20) << __func__ << " pending conns " << pending_sent_conns.size() << dendl;
496   while (!pending_sent_conns.empty()) {
497     RDMAConnectedSocketImpl *o = pending_sent_conns.front();
498     pending_sent_conns.pop_front();
499     ssize_t r = o->submit(false);
500     ldout(cct, 20) << __func__ << " sent pending bl socket=" << o << " r=" << r << dendl;
501     if (r < 0) {
502       if (r == -EAGAIN) {
503         pending_sent_conns.push_back(o);
504         dispatcher->make_pending_worker(this);
505         return ;
506       }
507       o->fault();
508     }
509     o->set_pending(0);
510     perf_logger->dec(l_msgr_rdma_pending_sent_conns, 1);
511   }
512   dispatcher->notify_pending_workers();
513 }
514
515 RDMAStack::RDMAStack(CephContext *cct, const string &t): NetworkStack(cct, t)
516 {
517   //
518   //On RDMA MUST be called before fork
519   //
520
521   int rc = ibv_fork_init();
522   if (rc) {
523      lderr(cct) << __func__ << " failed to call ibv_for_init(). On RDMA must be called before fork. Application aborts." << dendl;
524      ceph_abort();
525   }
526
527   ldout(cct, 1) << __func__ << " ms_async_rdma_enable_hugepage value is: " << cct->_conf->ms_async_rdma_enable_hugepage <<  dendl;
528   if (cct->_conf->ms_async_rdma_enable_hugepage) {
529     rc =  setenv("RDMAV_HUGEPAGES_SAFE","1",1);
530     ldout(cct, 1) << __func__ << " RDMAV_HUGEPAGES_SAFE is set as: " << getenv("RDMAV_HUGEPAGES_SAFE") <<  dendl;
531     if (rc) {
532       lderr(cct) << __func__ << " failed to export RDMA_HUGEPAGES_SAFE. On RDMA must be exported before using huge pages. Application aborts." << dendl;
533       ceph_abort();
534     }
535   }
536
537   //Check ulimit
538   struct rlimit limit;
539   getrlimit(RLIMIT_MEMLOCK, &limit);
540   if (limit.rlim_cur != RLIM_INFINITY || limit.rlim_max != RLIM_INFINITY) {
541      lderr(cct) << __func__ << "!!! WARNING !!! For RDMA to work properly user memlock (ulimit -l) must be big enough to allow large amount of registered memory."
542                                   " We recommend setting this parameter to infinity" << dendl;
543   }
544
545   if (!global_infiniband)
546     global_infiniband.construct(
547       cct, cct->_conf->ms_async_rdma_device_name, cct->_conf->ms_async_rdma_port_num);
548   ldout(cct, 20) << __func__ << " constructing RDMAStack..." << dendl;
549   dispatcher = new RDMADispatcher(cct, this);
550   global_infiniband->set_dispatcher(dispatcher);
551
552   unsigned num = get_num_worker();
553   for (unsigned i = 0; i < num; ++i) {
554     RDMAWorker* w = dynamic_cast<RDMAWorker*>(get_worker(i));
555     w->set_stack(this);
556   }
557
558   ldout(cct, 20) << " creating RDMAStack:" << this << " with dispatcher:" << dispatcher << dendl;
559 }
560
561 RDMAStack::~RDMAStack()
562 {
563   if (cct->_conf->ms_async_rdma_enable_hugepage) {
564     unsetenv("RDMAV_HUGEPAGES_SAFE");   //remove env variable on destruction
565   }
566
567   delete dispatcher;
568 }
569
570 void RDMAStack::spawn_worker(unsigned i, std::function<void ()> &&func)
571 {
572   threads.resize(i+1);
573   threads[i] = std::thread(func);
574 }
575
576 void RDMAStack::join_worker(unsigned i)
577 {
578   assert(threads.size() > i && threads[i].joinable());
579   threads[i].join();
580 }