Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / msg / async / rdma / Infiniband.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2016 XSKY <haomai@xsky.com>
7  *
8  * Author: Haomai Wang <haomaiwang@gmail.com>
9  *
10  * This is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU Lesser General Public
12  * License version 2.1, as published by the Free Software
13  * Foundation.  See file COPYING.
14  *
15  */
16
17 #include "Infiniband.h"
18 #include "common/errno.h"
19 #include "common/debug.h"
20 #include "RDMAStack.h"
21
22 #define dout_subsys ceph_subsys_ms
23 #undef dout_prefix
24 #define dout_prefix *_dout << "Infiniband "
25
26 static const uint32_t MAX_SHARED_RX_SGE_COUNT = 1;
27 static const uint32_t MAX_INLINE_DATA = 0;
28 static const uint32_t TCP_MSG_LEN = sizeof("0000:00000000:00000000:00000000:00000000000000000000000000000000");
29 static const uint32_t CQ_DEPTH = 30000;
30
31 Port::Port(CephContext *cct, struct ibv_context* ictxt, uint8_t ipn): ctxt(ictxt), port_num(ipn), port_attr(new ibv_port_attr)
32 {
33 #ifdef HAVE_IBV_EXP
34   union ibv_gid cgid;
35   struct ibv_exp_gid_attr gid_attr;
36   bool malformed = false;
37
38   ldout(cct,1) << __func__ << " using experimental verbs for gid" << dendl;
39   int r = ibv_query_port(ctxt, port_num, port_attr);
40   if (r == -1) {
41     lderr(cct) << __func__  << " query port failed  " << cpp_strerror(errno) << dendl;
42     ceph_abort();
43   }
44
45   lid = port_attr->lid;
46
47   // search for requested GID in GIDs table
48   ldout(cct, 1) << __func__ << " looking for local GID " << (cct->_conf->ms_async_rdma_local_gid)
49     << " of type " << (cct->_conf->ms_async_rdma_roce_ver) << dendl;
50   r = sscanf(cct->_conf->ms_async_rdma_local_gid.c_str(),
51              "%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx"
52              ":%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx:%02hhx%02hhx",
53              &cgid.raw[ 0], &cgid.raw[ 1],
54              &cgid.raw[ 2], &cgid.raw[ 3],
55              &cgid.raw[ 4], &cgid.raw[ 5],
56              &cgid.raw[ 6], &cgid.raw[ 7],
57              &cgid.raw[ 8], &cgid.raw[ 9],
58              &cgid.raw[10], &cgid.raw[11],
59              &cgid.raw[12], &cgid.raw[13],
60              &cgid.raw[14], &cgid.raw[15]);
61
62   if (r != 16) {
63     ldout(cct, 1) << __func__ << " malformed or no GID supplied, using GID index 0" << dendl;
64     malformed = true;
65   }
66
67   gid_attr.comp_mask = IBV_EXP_QUERY_GID_ATTR_TYPE;
68
69   for (gid_idx = 0; gid_idx < port_attr->gid_tbl_len; gid_idx++) {
70     r = ibv_query_gid(ctxt, port_num, gid_idx, &gid);
71     if (r) {
72       lderr(cct) << __func__  << " query gid of port " << port_num << " index " << gid_idx << " failed  " << cpp_strerror(errno) << dendl;
73       ceph_abort();
74     }
75     r = ibv_exp_query_gid_attr(ctxt, port_num, gid_idx, &gid_attr);
76     if (r) {
77       lderr(cct) << __func__  << " query gid attributes of port " << port_num << " index " << gid_idx << " failed  " << cpp_strerror(errno) << dendl;
78       ceph_abort();
79     }
80
81     if (malformed) break; // stay with gid_idx=0
82     if ( (gid_attr.type == cct->_conf->ms_async_rdma_roce_ver) &&
83          (memcmp(&gid, &cgid, 16) == 0) ) {
84       ldout(cct, 1) << __func__ << " found at index " << gid_idx << dendl;
85       break;
86     }
87   }
88
89   if (gid_idx == port_attr->gid_tbl_len) {
90     lderr(cct) << __func__ << " Requested local GID was not found in GID table" << dendl;
91     ceph_abort();
92   }
93 #else
94   int r = ibv_query_port(ctxt, port_num, port_attr);
95   if (r == -1) {
96     lderr(cct) << __func__  << " query port failed  " << cpp_strerror(errno) << dendl;
97     ceph_abort();
98   }
99
100   lid = port_attr->lid;
101   r = ibv_query_gid(ctxt, port_num, 0, &gid);
102   if (r) {
103     lderr(cct) << __func__  << " query gid failed  " << cpp_strerror(errno) << dendl;
104     ceph_abort();
105   }
106 #endif
107 }
108
109
110 Device::Device(CephContext *cct, ibv_device* d): device(d), device_attr(new ibv_device_attr), active_port(nullptr)
111 {
112   if (device == NULL) {
113     lderr(cct) << __func__ << " device == NULL" << cpp_strerror(errno) << dendl;
114     ceph_abort();
115   }
116   name = ibv_get_device_name(device);
117   ctxt = ibv_open_device(device);
118   if (ctxt == NULL) {
119     lderr(cct) << __func__ << " open rdma device failed. " << cpp_strerror(errno) << dendl;
120     ceph_abort();
121   }
122   int r = ibv_query_device(ctxt, device_attr);
123   if (r == -1) {
124     lderr(cct) << __func__ << " failed to query rdma device. " << cpp_strerror(errno) << dendl;
125     ceph_abort();
126   }
127 }
128
129 void Device::binding_port(CephContext *cct, int port_num) {
130   port_cnt = device_attr->phys_port_cnt;
131   for (uint8_t i = 0; i < port_cnt; ++i) {
132     Port *port = new Port(cct, ctxt, i+1);
133     if (i + 1 == port_num && port->get_port_attr()->state == IBV_PORT_ACTIVE) {
134       active_port = port;
135       ldout(cct, 1) << __func__ << " found active port " << i+1 << dendl;
136       break;
137     } else {
138       ldout(cct, 10) << __func__ << " port " << i+1 << " is not what we want. state: " << port->get_port_attr()->state << ")"<< dendl;
139     }
140     delete port;
141   }
142   if (nullptr == active_port) {
143     lderr(cct) << __func__ << "  port not found" << dendl;
144     assert(active_port);
145   }
146 }
147
148
149 Infiniband::QueuePair::QueuePair(
150     CephContext *c, Infiniband& infiniband, ibv_qp_type type,
151     int port, ibv_srq *srq,
152     Infiniband::CompletionQueue* txcq, Infiniband::CompletionQueue* rxcq,
153     uint32_t max_send_wr, uint32_t max_recv_wr, uint32_t q_key)
154 : cct(c), infiniband(infiniband),
155   type(type),
156   ctxt(infiniband.device->ctxt),
157   ib_physical_port(port),
158   pd(infiniband.pd->pd),
159   srq(srq),
160   qp(NULL),
161   txcq(txcq),
162   rxcq(rxcq),
163   initial_psn(0),
164   max_send_wr(max_send_wr),
165   max_recv_wr(max_recv_wr),
166   q_key(q_key),
167   dead(false)
168 {
169   initial_psn = lrand48() & 0xffffff;
170   if (type != IBV_QPT_RC && type != IBV_QPT_UD && type != IBV_QPT_RAW_PACKET) {
171     lderr(cct) << __func__ << " invalid queue pair type" << cpp_strerror(errno) << dendl;
172     ceph_abort();
173   }
174   pd = infiniband.pd->pd;
175 }
176
177 int Infiniband::QueuePair::init()
178 {
179   ldout(cct, 20) << __func__ << " started." << dendl;
180   ibv_qp_init_attr qpia;
181   memset(&qpia, 0, sizeof(qpia));
182   qpia.send_cq = txcq->get_cq();
183   qpia.recv_cq = rxcq->get_cq();
184   qpia.srq = srq;                      // use the same shared receive queue
185   qpia.cap.max_send_wr  = max_send_wr; // max outstanding send requests
186   qpia.cap.max_send_sge = 1;           // max send scatter-gather elements
187   qpia.cap.max_inline_data = MAX_INLINE_DATA;          // max bytes of immediate data on send q
188   qpia.qp_type = type;                 // RC, UC, UD, or XRC
189   qpia.sq_sig_all = 0;                 // only generate CQEs on requested WQEs
190
191   qp = ibv_create_qp(pd, &qpia);
192   if (qp == NULL) {
193     lderr(cct) << __func__ << " failed to create queue pair" << cpp_strerror(errno) << dendl;
194     if (errno == ENOMEM) {
195       lderr(cct) << __func__ << " try reducing ms_async_rdma_receive_buffers, "
196                                 " ms_async_rdma_send_buffers or"
197                                 " ms_async_rdma_buffer_size" << dendl;
198     }
199     return -1;
200   }
201
202   ldout(cct, 20) << __func__ << " successfully create queue pair: "
203                  << "qp=" << qp << dendl;
204
205   // move from RESET to INIT state
206   ibv_qp_attr qpa;
207   memset(&qpa, 0, sizeof(qpa));
208   qpa.qp_state   = IBV_QPS_INIT;
209   qpa.pkey_index = 0;
210   qpa.port_num   = (uint8_t)(ib_physical_port);
211   qpa.qp_access_flags = IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE;
212   qpa.qkey       = q_key;
213
214   int mask = IBV_QP_STATE | IBV_QP_PORT;
215   switch (type) {
216     case IBV_QPT_RC:
217       mask |= IBV_QP_ACCESS_FLAGS;
218       mask |= IBV_QP_PKEY_INDEX;
219       break;
220     case IBV_QPT_UD:
221       mask |= IBV_QP_QKEY;
222       mask |= IBV_QP_PKEY_INDEX;
223       break;
224     case IBV_QPT_RAW_PACKET:
225       break;
226     default:
227       ceph_abort();
228   }
229
230   int ret = ibv_modify_qp(qp, &qpa, mask);
231   if (ret) {
232     ibv_destroy_qp(qp);
233     lderr(cct) << __func__ << " failed to transition to INIT state: "
234                << cpp_strerror(errno) << dendl;
235     return -1;
236   }
237   ldout(cct, 20) << __func__ << " successfully change queue pair to INIT:"
238                  << " qp=" << qp << dendl;
239   return 0;
240 }
241
242 /**
243  * Change RC QueuePair into the ERROR state. This is necessary modify
244  * the Queue Pair into the Error state and poll all of the relevant
245  * Work Completions prior to destroying a Queue Pair.
246  * Since destroying a Queue Pair does not guarantee that its Work
247  * Completions are removed from the CQ upon destruction. Even if the
248  * Work Completions are already in the CQ, it might not be possible to
249  * retrieve them. If the Queue Pair is associated with an SRQ, it is
250  * recommended wait for the affiliated event IBV_EVENT_QP_LAST_WQE_REACHED
251  *
252  * \return
253  *      -errno if the QueuePair can't switch to ERROR
254  *      0 for success.
255  */
256 int Infiniband::QueuePair::to_dead()
257 {
258   if (dead)
259     return 0;
260   ibv_qp_attr qpa;
261   memset(&qpa, 0, sizeof(qpa));
262   qpa.qp_state = IBV_QPS_ERR;
263
264   int mask = IBV_QP_STATE;
265   int ret = ibv_modify_qp(qp, &qpa, mask);
266   if (ret) {
267     lderr(cct) << __func__ << " failed to transition to ERROR state: "
268                << cpp_strerror(errno) << dendl;
269     return -errno;
270   }
271   dead = true;
272   return ret;
273 }
274
275 int Infiniband::QueuePair::get_remote_qp_number(uint32_t *rqp) const
276 {
277   ibv_qp_attr qpa;
278   ibv_qp_init_attr qpia;
279
280   int r = ibv_query_qp(qp, &qpa, IBV_QP_DEST_QPN, &qpia);
281   if (r) {
282     lderr(cct) << __func__ << " failed to query qp: "
283       << cpp_strerror(errno) << dendl;
284     return -1;
285   }
286
287   if (rqp)
288     *rqp = qpa.dest_qp_num;
289   return 0;
290 }
291
292 /**
293  * Get the remote infiniband address for this QueuePair, as set in #plumb().
294  * LIDs are "local IDs" in infiniband terminology. They are short, locally
295  * routable addresses.
296  */
297 int Infiniband::QueuePair::get_remote_lid(uint16_t *lid) const
298 {
299   ibv_qp_attr qpa;
300   ibv_qp_init_attr qpia;
301
302   int r = ibv_query_qp(qp, &qpa, IBV_QP_AV, &qpia);
303   if (r) {
304     lderr(cct) << __func__ << " failed to query qp: "
305       << cpp_strerror(errno) << dendl;
306     return -1;
307   }
308
309   if (lid)
310     *lid = qpa.ah_attr.dlid;
311   return 0;
312 }
313
314 /**
315  * Get the state of a QueuePair.
316  */
317 int Infiniband::QueuePair::get_state() const
318 {
319   ibv_qp_attr qpa;
320   ibv_qp_init_attr qpia;
321
322   int r = ibv_query_qp(qp, &qpa, IBV_QP_STATE, &qpia);
323   if (r) {
324     lderr(cct) << __func__ << " failed to get state: "
325       << cpp_strerror(errno) << dendl;
326     return -1;
327   }
328   return qpa.qp_state;
329 }
330
331 /**
332  * Return true if the queue pair is in an error state, false otherwise.
333  */
334 bool Infiniband::QueuePair::is_error() const
335 {
336   ibv_qp_attr qpa;
337   ibv_qp_init_attr qpia;
338
339   int r = ibv_query_qp(qp, &qpa, -1, &qpia);
340   if (r) {
341     lderr(cct) << __func__ << " failed to get state: "
342       << cpp_strerror(errno) << dendl;
343     return true;
344   }
345   return qpa.cur_qp_state == IBV_QPS_ERR;
346 }
347
348
349 Infiniband::CompletionChannel::CompletionChannel(CephContext *c, Infiniband &ib)
350   : cct(c), infiniband(ib), channel(NULL), cq(NULL), cq_events_that_need_ack(0)
351 {
352 }
353
354 Infiniband::CompletionChannel::~CompletionChannel()
355 {
356   if (channel) {
357     int r = ibv_destroy_comp_channel(channel);
358     if (r < 0)
359       lderr(cct) << __func__ << " failed to destroy cc: " << cpp_strerror(errno) << dendl;
360     assert(r == 0);
361   }
362 }
363
364 int Infiniband::CompletionChannel::init()
365 {
366   ldout(cct, 20) << __func__ << " started." << dendl;
367   channel = ibv_create_comp_channel(infiniband.device->ctxt);
368   if (!channel) {
369     lderr(cct) << __func__ << " failed to create receive completion channel: "
370                           << cpp_strerror(errno) << dendl;
371     return -1;
372   }
373   int rc = NetHandler(cct).set_nonblock(channel->fd);
374   if (rc < 0) {
375     ibv_destroy_comp_channel(channel);
376     return -1;
377   }
378   return 0;
379 }
380
381 void Infiniband::CompletionChannel::ack_events()
382 {
383   ibv_ack_cq_events(cq, cq_events_that_need_ack);
384   cq_events_that_need_ack = 0;
385 }
386
387 bool Infiniband::CompletionChannel::get_cq_event()
388 {
389   ibv_cq *cq = NULL;
390   void *ev_ctx;
391   if (ibv_get_cq_event(channel, &cq, &ev_ctx)) {
392     if (errno != EAGAIN && errno != EINTR)
393       lderr(cct) << __func__ << " failed to retrieve CQ event: "
394                  << cpp_strerror(errno) << dendl;
395     return false;
396   }
397
398   /* accumulate number of cq events that need to
399    *    * be acked, and periodically ack them
400    *       */
401   if (++cq_events_that_need_ack == MAX_ACK_EVENT) {
402     ldout(cct, 20) << __func__ << " ack aq events." << dendl;
403     ibv_ack_cq_events(cq, MAX_ACK_EVENT);
404     cq_events_that_need_ack = 0;
405   }
406
407   return true;
408 }
409
410
411 Infiniband::CompletionQueue::~CompletionQueue()
412 {
413   if (cq) {
414     int r = ibv_destroy_cq(cq);
415     if (r < 0)
416       lderr(cct) << __func__ << " failed to destroy cq: " << cpp_strerror(errno) << dendl;
417     assert(r == 0);
418   }
419 }
420
421 int Infiniband::CompletionQueue::init()
422 {
423   cq = ibv_create_cq(infiniband.device->ctxt, queue_depth, this, channel->get_channel(), 0);
424   if (!cq) {
425     lderr(cct) << __func__ << " failed to create receive completion queue: "
426       << cpp_strerror(errno) << dendl;
427     return -1;
428   }
429
430   if (ibv_req_notify_cq(cq, 0)) {
431     lderr(cct) << __func__ << " ibv_req_notify_cq failed: " << cpp_strerror(errno) << dendl;
432     ibv_destroy_cq(cq);
433     cq = nullptr;
434     return -1;
435   }
436
437   channel->bind_cq(cq);
438   ldout(cct, 20) << __func__ << " successfully create cq=" << cq << dendl;
439   return 0;
440 }
441
442 int Infiniband::CompletionQueue::rearm_notify(bool solicite_only)
443 {
444   ldout(cct, 20) << __func__ << " started." << dendl;
445   int r = ibv_req_notify_cq(cq, 0);
446   if (r < 0)
447     lderr(cct) << __func__ << " failed to notify cq: " << cpp_strerror(errno) << dendl;
448   return r;
449 }
450
451 int Infiniband::CompletionQueue::poll_cq(int num_entries, ibv_wc *ret_wc_array) {
452   int r = ibv_poll_cq(cq, num_entries, ret_wc_array);
453   if (r < 0) {
454     lderr(cct) << __func__ << " poll_completion_queue occur met error: "
455       << cpp_strerror(errno) << dendl;
456     return -1;
457   }
458   return r;
459 }
460
461
462 Infiniband::ProtectionDomain::ProtectionDomain(CephContext *cct, Device *device)
463   : pd(ibv_alloc_pd(device->ctxt))
464 {
465   if (pd == NULL) {
466     lderr(cct) << __func__ << " failed to allocate infiniband protection domain: " << cpp_strerror(errno) << dendl;
467     ceph_abort();
468   }
469 }
470
471 Infiniband::ProtectionDomain::~ProtectionDomain()
472 {
473   ibv_dealloc_pd(pd);
474 }
475
476
477 Infiniband::MemoryManager::Chunk::Chunk(ibv_mr* m, uint32_t len, char* b)
478   : mr(m), bytes(len), offset(0), buffer(b)
479 {
480 }
481
482 Infiniband::MemoryManager::Chunk::~Chunk()
483 {
484 }
485
486 void Infiniband::MemoryManager::Chunk::set_offset(uint32_t o)
487 {
488   offset = o;
489 }
490
491 uint32_t Infiniband::MemoryManager::Chunk::get_offset()
492 {
493   return offset;
494 }
495
496 void Infiniband::MemoryManager::Chunk::set_bound(uint32_t b)
497 {
498   bound = b;
499 }
500
501 void Infiniband::MemoryManager::Chunk::prepare_read(uint32_t b)
502 {
503   offset = 0;
504   bound = b;
505 }
506
507 uint32_t Infiniband::MemoryManager::Chunk::get_bound()
508 {
509   return bound;
510 }
511
512 uint32_t Infiniband::MemoryManager::Chunk::read(char* buf, uint32_t len)
513 {
514   uint32_t left = bound - offset;
515   if (left >= len) {
516     memcpy(buf, buffer+offset, len);
517     offset += len;
518     return len;
519   } else {
520     memcpy(buf, buffer+offset, left);
521     offset = 0;
522     bound = 0;
523     return left;
524   }
525 }
526
527 uint32_t Infiniband::MemoryManager::Chunk::write(char* buf, uint32_t len)
528 {
529   uint32_t left = bytes - offset;
530   if (left >= len) {
531     memcpy(buffer+offset, buf, len);
532     offset += len;
533     return len;
534   } else {
535     memcpy(buffer+offset, buf, left);
536     offset = bytes;
537     return left;
538   }
539 }
540
541 bool Infiniband::MemoryManager::Chunk::full()
542 {
543   return offset == bytes;
544 }
545
546 bool Infiniband::MemoryManager::Chunk::over()
547 {
548   return Infiniband::MemoryManager::Chunk::offset == bound;
549 }
550
551 void Infiniband::MemoryManager::Chunk::clear()
552 {
553   offset = 0;
554   bound = 0;
555 }
556
557 void Infiniband::MemoryManager::Chunk::post_srq(Infiniband *ib)
558 {
559   ib->post_chunk(this);
560 }
561
562 Infiniband::MemoryManager::Cluster::Cluster(MemoryManager& m, uint32_t s)
563   : manager(m), buffer_size(s), lock("cluster_lock")
564 {
565 }
566
567 Infiniband::MemoryManager::Cluster::~Cluster()
568 {
569   int r = ibv_dereg_mr(chunk_base->mr);
570   assert(r == 0);
571   const auto chunk_end = chunk_base + num_chunk;
572   for (auto chunk = chunk_base; chunk != chunk_end; chunk++) {
573     chunk->~Chunk();
574   }
575
576   ::free(chunk_base);
577   if (manager.enabled_huge_page)
578     manager.free_huge_pages(base);
579   else
580     ::free(base);
581 }
582
583 int Infiniband::MemoryManager::Cluster::fill(uint32_t num)
584 {
585   assert(!base);
586   num_chunk = num;
587   uint32_t bytes = buffer_size * num;
588   if (manager.enabled_huge_page) {
589     base = (char*)manager.malloc_huge_pages(bytes);
590   } else {
591     base = (char*)memalign(CEPH_PAGE_SIZE, bytes);
592   }
593   end = base + bytes;
594   assert(base);
595   chunk_base = static_cast<Chunk*>(::malloc(sizeof(Chunk) * num));
596   memset(chunk_base, 0, sizeof(Chunk) * num);
597   free_chunks.reserve(num);
598   ibv_mr* m = ibv_reg_mr(manager.pd->pd, base, bytes, IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE);
599   assert(m);
600   Chunk* chunk = chunk_base;
601   for (uint32_t offset = 0; offset < bytes; offset += buffer_size){
602     new(chunk) Chunk(m, buffer_size, base+offset);
603     free_chunks.push_back(chunk);
604     chunk++;
605   }
606   return 0;
607 }
608
609 void Infiniband::MemoryManager::Cluster::take_back(std::vector<Chunk*> &ck)
610 {
611   Mutex::Locker l(lock);
612   for (auto c : ck) {
613     c->clear();
614     free_chunks.push_back(c);
615   }
616 }
617
618 int Infiniband::MemoryManager::Cluster::get_buffers(std::vector<Chunk*> &chunks, size_t bytes)
619 {
620   uint32_t num = bytes / buffer_size + 1;
621   if (bytes % buffer_size == 0)
622     --num;
623   int r = num;
624   Mutex::Locker l(lock);
625   if (free_chunks.empty())
626     return 0;
627   if (!bytes) {
628     r = free_chunks.size();
629     for (auto c : free_chunks)
630       chunks.push_back(c);
631     free_chunks.clear();
632     return r;
633   }
634   if (free_chunks.size() < num) {
635     num = free_chunks.size();
636     r = num;
637   }
638   for (uint32_t i = 0; i < num; ++i) {
639     chunks.push_back(free_chunks.back());
640     free_chunks.pop_back();
641   }
642   return r;
643 }
644
645
646 Infiniband::MemoryManager::MemoryManager(Device *d, ProtectionDomain *p, bool hugepage)
647   : device(d), pd(p)
648 {
649   enabled_huge_page = hugepage;
650 }
651
652 Infiniband::MemoryManager::~MemoryManager()
653 {
654   if (channel)
655     delete channel;
656   if (send)
657     delete send;
658 }
659
660 void* Infiniband::MemoryManager::malloc_huge_pages(size_t size)
661 {
662   size_t real_size = ALIGN_TO_PAGE_SIZE(size + HUGE_PAGE_SIZE);
663   char *ptr = (char *)mmap(NULL, real_size, PROT_READ | PROT_WRITE,MAP_PRIVATE | MAP_ANONYMOUS |MAP_POPULATE | MAP_HUGETLB,-1, 0);
664   if (ptr == MAP_FAILED) {
665     ptr = (char *)malloc(real_size);
666     if (ptr == NULL) return NULL;
667     real_size = 0;
668   }
669   *((size_t *)ptr) = real_size;
670   return ptr + HUGE_PAGE_SIZE;
671 }
672
673 void Infiniband::MemoryManager::free_huge_pages(void *ptr)
674 {
675   if (ptr == NULL) return;
676   void *real_ptr = (char *)ptr -HUGE_PAGE_SIZE;
677   size_t real_size = *((size_t *)real_ptr);
678   assert(real_size % HUGE_PAGE_SIZE == 0);
679   if (real_size != 0)
680     munmap(real_ptr, real_size);
681   else
682     free(real_ptr);
683 }
684
685 void Infiniband::MemoryManager::register_rx_tx(uint32_t size, uint32_t rx_num, uint32_t tx_num)
686 {
687   assert(device);
688   assert(pd);
689   channel = new Cluster(*this, size);
690   channel->fill(rx_num);
691
692   send = new Cluster(*this, size);
693   send->fill(tx_num);
694 }
695
696 void Infiniband::MemoryManager::return_tx(std::vector<Chunk*> &chunks)
697 {
698   send->take_back(chunks);
699 }
700
701 int Infiniband::MemoryManager::get_send_buffers(std::vector<Chunk*> &c, size_t bytes)
702 {
703   return send->get_buffers(c, bytes);
704 }
705
706 int Infiniband::MemoryManager::get_channel_buffers(std::vector<Chunk*> &chunks, size_t bytes)
707 {
708   return channel->get_buffers(chunks, bytes);
709 }
710
711
712 Infiniband::Infiniband(CephContext *cct, const std::string &device_name, uint8_t port_num)
713   : cct(cct), lock("IB lock"), device_name(device_name), port_num(port_num)
714 {
715 }
716
717 void Infiniband::init()
718 {
719   Mutex::Locker l(lock);
720
721   if (initialized)
722     return;
723
724   device_list = new DeviceList(cct);
725   initialized = true;
726
727   device = device_list->get_device(device_name.c_str());
728   device->binding_port(cct, port_num);
729   assert(device);
730   ib_physical_port = device->active_port->get_port_num();
731   pd = new ProtectionDomain(cct, device);
732   assert(NetHandler(cct).set_nonblock(device->ctxt->async_fd) == 0);
733
734   max_recv_wr = device->device_attr->max_srq_wr;
735   if (max_recv_wr > cct->_conf->ms_async_rdma_receive_buffers) {
736     max_recv_wr = cct->_conf->ms_async_rdma_receive_buffers;
737     ldout(cct, 1) << __func__ << " assigning: " << max_recv_wr << " receive buffers" << dendl;
738   } else {
739     ldout(cct, 1) << __func__ << " using the max allowed receive buffers: " << max_recv_wr << dendl;
740   }
741
742   max_send_wr = device->device_attr->max_qp_wr;
743   if (max_send_wr > cct->_conf->ms_async_rdma_send_buffers) {
744     max_send_wr = cct->_conf->ms_async_rdma_send_buffers;
745     ldout(cct, 1) << __func__ << " assigning: " << max_send_wr << " send buffers"  << dendl;
746   } else {
747     ldout(cct, 1) << __func__ << " using the max allowed send buffers: " << max_send_wr << dendl;
748   }
749
750   ldout(cct, 1) << __func__ << " device allow " << device->device_attr->max_cqe
751                 << " completion entries" << dendl;
752
753   memory_manager = new MemoryManager(device, pd,
754                                      cct->_conf->ms_async_rdma_enable_hugepage);
755   memory_manager->register_rx_tx(
756       cct->_conf->ms_async_rdma_buffer_size, max_recv_wr, max_send_wr);
757
758   srq = create_shared_receive_queue(max_recv_wr, MAX_SHARED_RX_SGE_COUNT);
759   post_channel_cluster();
760
761   dispatcher->polling_start();
762 }
763
764 Infiniband::~Infiniband()
765 {
766   if (!initialized)
767     return;
768
769   if (dispatcher)
770     dispatcher->polling_stop();
771
772   ibv_destroy_srq(srq);
773   delete memory_manager;
774   delete pd;
775 }
776
777 void Infiniband::set_dispatcher(RDMADispatcher *d)
778 {
779   assert(!d ^ !dispatcher);
780
781   dispatcher = d;
782 }
783
784 /**
785  * Create a shared receive queue. This basically wraps the verbs call. 
786  *
787  * \param[in] max_wr
788  *      The max number of outstanding work requests in the SRQ.
789  * \param[in] max_sge
790  *      The max number of scatter elements per WR.
791  * \return
792  *      A valid ibv_srq pointer, or NULL on error.
793  */
794 ibv_srq* Infiniband::create_shared_receive_queue(uint32_t max_wr, uint32_t max_sge)
795 {
796   ibv_srq_init_attr sia;
797   memset(&sia, 0, sizeof(sia));
798   sia.srq_context = device->ctxt;
799   sia.attr.max_wr = max_wr;
800   sia.attr.max_sge = max_sge;
801   return ibv_create_srq(pd->pd, &sia);
802 }
803
804 int Infiniband::get_tx_buffers(std::vector<Chunk*> &c, size_t bytes)
805 {
806   return memory_manager->get_send_buffers(c, bytes);
807 }
808
809 /**
810  * Create a new QueuePair. This factory should be used in preference to
811  * the QueuePair constructor directly, since this lets derivatives of
812  * Infiniband, e.g. MockInfiniband (if it existed),
813  * return mocked out QueuePair derivatives.
814  *
815  * \return
816  *      QueuePair on success or NULL if init fails
817  * See QueuePair::QueuePair for parameter documentation.
818  */
819 Infiniband::QueuePair* Infiniband::create_queue_pair(CephContext *cct, CompletionQueue *tx, CompletionQueue* rx, ibv_qp_type type)
820 {
821   Infiniband::QueuePair *qp = new QueuePair(
822       cct, *this, type, ib_physical_port, srq, tx, rx, max_send_wr, max_recv_wr);
823   if (qp->init()) {
824     delete qp;
825     return NULL;
826   }
827   return qp;
828 }
829
830 int Infiniband::post_chunk(Chunk* chunk)
831 {
832   ibv_sge isge;
833   isge.addr = reinterpret_cast<uint64_t>(chunk->buffer);
834   isge.length = chunk->bytes;
835   isge.lkey = chunk->mr->lkey;
836   ibv_recv_wr rx_work_request;
837
838   memset(&rx_work_request, 0, sizeof(rx_work_request));
839   rx_work_request.wr_id = reinterpret_cast<uint64_t>(chunk);// stash descriptor ptr
840   rx_work_request.next = NULL;
841   rx_work_request.sg_list = &isge;
842   rx_work_request.num_sge = 1;
843
844   ibv_recv_wr *badWorkRequest;
845   int ret = ibv_post_srq_recv(srq, &rx_work_request, &badWorkRequest);
846   if (ret)
847     return -errno;
848   return 0;
849 }
850
851 int Infiniband::post_channel_cluster()
852 {
853   vector<Chunk*> free_chunks;
854   int r = memory_manager->get_channel_buffers(free_chunks, 0);
855   assert(r > 0);
856   for (vector<Chunk*>::iterator iter = free_chunks.begin(); iter != free_chunks.end(); ++iter) {
857     r = post_chunk(*iter);
858     assert(r == 0);
859   }
860   return 0;
861 }
862
863 Infiniband::CompletionChannel* Infiniband::create_comp_channel(CephContext *c)
864 {
865   Infiniband::CompletionChannel *cc = new Infiniband::CompletionChannel(c, *this);
866   if (cc->init()) {
867     delete cc;
868     return NULL;
869   }
870   return cc;
871 }
872
873 Infiniband::CompletionQueue* Infiniband::create_comp_queue(
874     CephContext *cct, CompletionChannel *cc)
875 {
876   Infiniband::CompletionQueue *cq = new Infiniband::CompletionQueue(
877       cct, *this, CQ_DEPTH, cc);
878   if (cq->init()) {
879     delete cq;
880     return NULL;
881   }
882   return cq;
883 }
884
885 // 1 means no valid buffer read, 0 means got enough buffer
886 // else return < 0 means error
887 int Infiniband::recv_msg(CephContext *cct, int sd, IBSYNMsg& im)
888 {
889   char msg[TCP_MSG_LEN];
890   char gid[33];
891   ssize_t r = ::read(sd, &msg, sizeof(msg));
892   // Drop incoming qpt
893   if (cct->_conf->ms_inject_socket_failures && sd >= 0) {
894     if (rand() % cct->_conf->ms_inject_socket_failures == 0) {
895       ldout(cct, 0) << __func__ << " injecting socket failure" << dendl;
896       return -EINVAL;
897     }
898   }
899   if (r < 0) {
900     r = -errno;
901     lderr(cct) << __func__ << " got error " << r << ": "
902                << cpp_strerror(r) << dendl;
903   } else if (r == 0) { // valid disconnect message of length 0
904     ldout(cct, 10) << __func__ << " got disconnect message " << dendl;
905   } else if ((size_t)r != sizeof(msg)) { // invalid message
906     ldout(cct, 1) << __func__ << " got bad length (" << r << ") " << dendl;
907     r = -EINVAL;
908   } else { // valid message
909     sscanf(msg, "%hu:%x:%x:%x:%s", &(im.lid), &(im.qpn), &(im.psn), &(im.peer_qpn),gid);
910     wire_gid_to_gid(gid, &(im.gid));
911     ldout(cct, 5) << __func__ << " recevd: " << im.lid << ", " << im.qpn << ", " << im.psn << ", " << im.peer_qpn << ", " << gid  << dendl;
912   }
913   return r;
914 }
915
916 int Infiniband::send_msg(CephContext *cct, int sd, IBSYNMsg& im)
917 {
918   int retry = 0;
919   ssize_t r;
920
921   char msg[TCP_MSG_LEN];
922   char gid[33];
923 retry:
924   gid_to_wire_gid(&(im.gid), gid);
925   sprintf(msg, "%04x:%08x:%08x:%08x:%s", im.lid, im.qpn, im.psn, im.peer_qpn, gid);
926   ldout(cct, 10) << __func__ << " sending: " << im.lid << ", " << im.qpn << ", " << im.psn
927                  << ", " << im.peer_qpn << ", "  << gid  << dendl;
928   r = ::write(sd, msg, sizeof(msg));
929   // Drop incoming qpt
930   if (cct->_conf->ms_inject_socket_failures && sd >= 0) {
931     if (rand() % cct->_conf->ms_inject_socket_failures == 0) {
932       ldout(cct, 0) << __func__ << " injecting socket failure" << dendl;
933       return -EINVAL;
934     }
935   }
936
937   if ((size_t)r != sizeof(msg)) {
938     // FIXME need to handle EAGAIN instead of retry
939     if (r < 0 && (errno == EINTR || errno == EAGAIN) && retry < 3) {
940       retry++;
941       goto retry;
942     }
943     if (r < 0)
944       lderr(cct) << __func__ << " send returned error " << errno << ": "
945                  << cpp_strerror(errno) << dendl;
946     else
947       lderr(cct) << __func__ << " send got bad length (" << r << ") " << cpp_strerror(errno) << dendl;
948     return -errno;
949   }
950   return 0;
951 }
952
953 void Infiniband::wire_gid_to_gid(const char *wgid, union ibv_gid *gid)
954 {
955   char tmp[9];
956   uint32_t v32;
957   int i;
958
959   for (tmp[8] = 0, i = 0; i < 4; ++i) {
960     memcpy(tmp, wgid + i * 8, 8);
961     sscanf(tmp, "%x", &v32);
962     *(uint32_t *)(&gid->raw[i * 4]) = ntohl(v32);
963   }
964 }
965
966 void Infiniband::gid_to_wire_gid(const union ibv_gid *gid, char wgid[])
967 {
968   for (int i = 0; i < 4; ++i)
969     sprintf(&wgid[i * 8], "%08x", htonl(*(uint32_t *)(gid->raw + i * 4)));
970 }
971
972 Infiniband::QueuePair::~QueuePair()
973 {
974   if (qp) {
975     ldout(cct, 20) << __func__ << " destroy qp=" << qp << dendl;
976     assert(!ibv_destroy_qp(qp));
977   }
978 }
979
980 /**
981  * Given a string representation of the `status' field from Verbs
982  * struct `ibv_wc'.
983  *
984  * \param[in] status
985  *      The integer status obtained in ibv_wc.status.
986  * \return
987  *      A string corresponding to the given status.
988  */
989 const char* Infiniband::wc_status_to_string(int status)
990 {
991   static const char *lookup[] = {
992       "SUCCESS",
993       "LOC_LEN_ERR",
994       "LOC_QP_OP_ERR",
995       "LOC_EEC_OP_ERR",
996       "LOC_PROT_ERR",
997       "WR_FLUSH_ERR",
998       "MW_BIND_ERR",
999       "BAD_RESP_ERR",
1000       "LOC_ACCESS_ERR",
1001       "REM_INV_REQ_ERR",
1002       "REM_ACCESS_ERR",
1003       "REM_OP_ERR",
1004       "RETRY_EXC_ERR",
1005       "RNR_RETRY_EXC_ERR",
1006       "LOC_RDD_VIOL_ERR",
1007       "REM_INV_RD_REQ_ERR",
1008       "REM_ABORT_ERR",
1009       "INV_EECN_ERR",
1010       "INV_EEC_STATE_ERR",
1011       "FATAL_ERR",
1012       "RESP_TIMEOUT_ERR",
1013       "GENERAL_ERR"
1014   };
1015
1016   if (status < IBV_WC_SUCCESS || status > IBV_WC_GENERAL_ERR)
1017     return "<status out of range!>";
1018   return lookup[status];
1019 }
1020
1021 const char* Infiniband::qp_state_string(int status) {
1022   switch(status) {
1023     case IBV_QPS_RESET : return "IBV_QPS_RESET";
1024     case IBV_QPS_INIT  : return "IBV_QPS_INIT";
1025     case IBV_QPS_RTR   : return "IBV_QPS_RTR";
1026     case IBV_QPS_RTS   : return "IBV_QPS_RTS";
1027     case IBV_QPS_SQD   : return "IBV_QPS_SQD";
1028     case IBV_QPS_SQE   : return "IBV_QPS_SQE";
1029     case IBV_QPS_ERR   : return "IBV_QPS_ERR";
1030     default: return " out of range.";
1031   }
1032 }