Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / msg / async / rdma / RDMAConnectedSocketImpl.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2016 XSKY <haomai@xsky.com>
7  *
8  * Author: Haomai Wang <haomaiwang@gmail.com>
9  *
10  * This is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU Lesser General Public
12  * License version 2.1, as published by the Free Software
13  * Foundation.  See file COPYING.
14  *
15  */
16
17 #include "RDMAStack.h"
18
19 #define dout_subsys ceph_subsys_ms
20 #undef dout_prefix
21 #define dout_prefix *_dout << " RDMAConnectedSocketImpl "
22
23 RDMAConnectedSocketImpl::RDMAConnectedSocketImpl(CephContext *cct, Infiniband* ib, RDMADispatcher* s,
24                                                  RDMAWorker *w)
25   : cct(cct), connected(0), error(0), infiniband(ib),
26     dispatcher(s), worker(w), lock("RDMAConnectedSocketImpl::lock"),
27     is_server(false), con_handler(new C_handle_connection(this)),
28     active(false), pending(false)
29 {
30   qp = infiniband->create_queue_pair(
31                                      cct, s->get_tx_cq(), s->get_rx_cq(), IBV_QPT_RC);
32   my_msg.qpn = qp->get_local_qp_number();
33   my_msg.psn = qp->get_initial_psn();
34   my_msg.lid = infiniband->get_lid();
35   my_msg.peer_qpn = 0;
36   my_msg.gid = infiniband->get_gid();
37   notify_fd = dispatcher->register_qp(qp, this);
38   dispatcher->perf_logger->inc(l_msgr_rdma_created_queue_pair);
39   dispatcher->perf_logger->inc(l_msgr_rdma_active_queue_pair);
40 }
41
42 RDMAConnectedSocketImpl::~RDMAConnectedSocketImpl()
43 {
44   ldout(cct, 20) << __func__ << " destruct." << dendl;
45   cleanup();
46   worker->remove_pending_conn(this);
47   dispatcher->erase_qpn(my_msg.qpn);
48   Mutex::Locker l(lock);
49   if (notify_fd >= 0)
50     ::close(notify_fd);
51   if (tcp_fd >= 0)
52     ::close(tcp_fd);
53   error = ECONNRESET;
54   int ret = 0;
55   for (unsigned i=0; i < wc.size(); ++i) {
56     ret = infiniband->post_chunk(reinterpret_cast<Chunk*>(wc[i].wr_id));
57     assert(ret == 0);
58     dispatcher->perf_logger->dec(l_msgr_rdma_inqueue_rx_chunks);
59   }
60   for (unsigned i=0; i < buffers.size(); ++i) {
61     ret = infiniband->post_chunk(buffers[i]);
62     assert(ret == 0);
63     dispatcher->perf_logger->dec(l_msgr_rdma_inqueue_rx_chunks);
64   }
65 }
66
67 void RDMAConnectedSocketImpl::pass_wc(std::vector<ibv_wc> &&v)
68 {
69   Mutex::Locker l(lock);
70   if (wc.empty())
71     wc = std::move(v);
72   else
73     wc.insert(wc.end(), v.begin(), v.end());
74   notify();
75 }
76
77 void RDMAConnectedSocketImpl::get_wc(std::vector<ibv_wc> &w)
78 {
79   Mutex::Locker l(lock);
80   if (wc.empty())
81     return ;
82   w.swap(wc);
83 }
84
85 int RDMAConnectedSocketImpl::activate()
86 {
87   ibv_qp_attr qpa;
88   int r;
89
90   // now connect up the qps and switch to RTR
91   memset(&qpa, 0, sizeof(qpa));
92   qpa.qp_state = IBV_QPS_RTR;
93   qpa.path_mtu = IBV_MTU_1024;
94   qpa.dest_qp_num = peer_msg.qpn;
95   qpa.rq_psn = peer_msg.psn;
96   qpa.max_dest_rd_atomic = 1;
97   qpa.min_rnr_timer = 12;
98   //qpa.ah_attr.is_global = 0;
99   qpa.ah_attr.is_global = 1;
100   qpa.ah_attr.grh.hop_limit = 6;
101   qpa.ah_attr.grh.dgid = peer_msg.gid;
102
103   qpa.ah_attr.grh.sgid_index = infiniband->get_device()->get_gid_idx();
104
105   qpa.ah_attr.dlid = peer_msg.lid;
106   qpa.ah_attr.sl = cct->_conf->ms_async_rdma_sl;
107   qpa.ah_attr.grh.traffic_class = cct->_conf->ms_async_rdma_dscp;
108   qpa.ah_attr.src_path_bits = 0;
109   qpa.ah_attr.port_num = (uint8_t)(infiniband->get_ib_physical_port());
110
111   ldout(cct, 20) << __func__ << " Choosing gid_index " << (int)qpa.ah_attr.grh.sgid_index << ", sl " << (int)qpa.ah_attr.sl << dendl;
112
113   r = ibv_modify_qp(qp->get_qp(), &qpa, IBV_QP_STATE |
114       IBV_QP_AV |
115       IBV_QP_PATH_MTU |
116       IBV_QP_DEST_QPN |
117       IBV_QP_RQ_PSN |
118       IBV_QP_MIN_RNR_TIMER |
119       IBV_QP_MAX_DEST_RD_ATOMIC);
120   if (r) {
121     lderr(cct) << __func__ << " failed to transition to RTR state: "
122                << cpp_strerror(errno) << dendl;
123     return -1;
124   }
125
126   ldout(cct, 20) << __func__ << " transition to RTR state successfully." << dendl;
127
128   // now move to RTS
129   qpa.qp_state = IBV_QPS_RTS;
130
131   // How long to wait before retrying if packet lost or server dead.
132   // Supposedly the timeout is 4.096us*2^timeout.  However, the actual
133   // timeout appears to be 4.096us*2^(timeout+1), so the setting
134   // below creates a 135ms timeout.
135   qpa.timeout = 14;
136
137   // How many times to retry after timeouts before giving up.
138   qpa.retry_cnt = 7;
139
140   // How many times to retry after RNR (receiver not ready) condition
141   // before giving up. Occurs when the remote side has not yet posted
142   // a receive request.
143   qpa.rnr_retry = 7; // 7 is infinite retry.
144   qpa.sq_psn = my_msg.psn;
145   qpa.max_rd_atomic = 1;
146
147   r = ibv_modify_qp(qp->get_qp(), &qpa, IBV_QP_STATE |
148       IBV_QP_TIMEOUT |
149       IBV_QP_RETRY_CNT |
150       IBV_QP_RNR_RETRY |
151       IBV_QP_SQ_PSN |
152       IBV_QP_MAX_QP_RD_ATOMIC);
153   if (r) {
154     lderr(cct) << __func__ << " failed to transition to RTS state: "
155                << cpp_strerror(errno) << dendl;
156     return -1;
157   }
158
159   // the queue pair should be ready to use once the client has finished
160   // setting up their end.
161   ldout(cct, 20) << __func__ << " transition to RTS state successfully." << dendl;
162   ldout(cct, 20) << __func__ << " QueuePair: " << qp << " with qp:" << qp->get_qp() << dendl;
163
164   if (!is_server) {
165     connected = 1; //indicate successfully
166     ldout(cct, 20) << __func__ << " handle fake send, wake it up. QP: " << my_msg.qpn << dendl;
167     submit(false);
168   }
169   active = true;
170
171   return 0;
172 }
173
174 int RDMAConnectedSocketImpl::try_connect(const entity_addr_t& peer_addr, const SocketOptions &opts) {
175   ldout(cct, 20) << __func__ << " nonblock:" << opts.nonblock << ", nodelay:"
176                  << opts.nodelay << ", rbuf_size: " << opts.rcbuf_size << dendl;
177   NetHandler net(cct);
178   tcp_fd = net.connect(peer_addr, opts.connect_bind_addr);
179
180   if (tcp_fd < 0) {
181     return -errno;
182   }
183   net.set_close_on_exec(tcp_fd);
184
185   int r = net.set_socket_options(tcp_fd, opts.nodelay, opts.rcbuf_size);
186   if (r < 0) {
187     ::close(tcp_fd);
188     tcp_fd = -1;
189     return -errno;
190   }
191
192   ldout(cct, 20) << __func__ << " tcp_fd: " << tcp_fd << dendl;
193   net.set_priority(tcp_fd, opts.priority, peer_addr.get_family());
194   my_msg.peer_qpn = 0;
195   r = infiniband->send_msg(cct, tcp_fd, my_msg);
196   if (r < 0)
197     return r;
198
199   worker->center.create_file_event(tcp_fd, EVENT_READABLE, con_handler);
200   return 0;
201 }
202
203 void RDMAConnectedSocketImpl::handle_connection() {
204   ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << " tcp_fd: " << tcp_fd << " notify_fd: " << notify_fd << dendl;
205   int r = infiniband->recv_msg(cct, tcp_fd, peer_msg);
206   if (r < 0) {
207     if (r != -EAGAIN) {
208       dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
209       ldout(cct, 1) << __func__ << " recv handshake msg failed." << dendl;
210       fault();
211     }
212     return;
213   }
214
215   if (!is_server) {// syn + ack from server
216     my_msg.peer_qpn = peer_msg.qpn;
217     ldout(cct, 20) << __func__ << " peer msg :  < " << peer_msg.qpn << ", " << peer_msg.psn
218                    <<  ", " << peer_msg.lid << ", " << peer_msg.peer_qpn << "> " << dendl;
219     if (!connected) {
220       r = activate();
221       assert(!r);
222     }
223     notify();
224     r = infiniband->send_msg(cct, tcp_fd, my_msg);
225     if (r < 0) {
226       ldout(cct, 1) << __func__ << " send client ack failed." << dendl;
227       dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
228       fault();
229     }
230   } else {
231     if (peer_msg.peer_qpn == 0) {// syn from client
232       if (active) {
233         ldout(cct, 10) << __func__ << " server is already active." << dendl;
234         return ;
235       }
236       r = infiniband->send_msg(cct, tcp_fd, my_msg);
237       if (r < 0) {
238         ldout(cct, 1) << __func__ << " server ack failed." << dendl;
239         dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
240         fault();
241         return ;
242       }
243       r = activate();
244       assert(!r);
245     } else { // ack from client
246       connected = 1;
247       cleanup();
248       submit(false);
249       notify();
250     }
251   }
252 }
253
254 ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len)
255 {
256   uint64_t i = 0;
257   int r = ::read(notify_fd, &i, sizeof(i));
258   ldout(cct, 20) << __func__ << " notify_fd : " << i << " in " << my_msg.qpn << " r = " << r << dendl;
259   ssize_t read = 0;
260   if (!buffers.empty())
261     read = read_buffers(buf,len);
262
263   std::vector<ibv_wc> cqe;
264   get_wc(cqe);
265   if (cqe.empty()) {
266     if (!buffers.empty()) {
267       notify();
268     }
269     if (read > 0) {
270       return read;
271     }
272     if (error) {
273       return -error;
274     } else {
275       return -EAGAIN;
276     }
277   }
278
279   ldout(cct, 20) << __func__ << " poll queue got " << cqe.size() << " responses. QP: " << my_msg.qpn << dendl;
280   for (size_t i = 0; i < cqe.size(); ++i) {
281     ibv_wc* response = &cqe[i];
282     assert(response->status == IBV_WC_SUCCESS);
283     Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
284     ldout(cct, 25) << __func__ << " chunk length: " << response->byte_len << " bytes." << chunk << dendl;
285     chunk->prepare_read(response->byte_len);
286     worker->perf_logger->inc(l_msgr_rdma_rx_bytes, response->byte_len);
287     if (response->byte_len == 0) {
288       dispatcher->perf_logger->inc(l_msgr_rdma_rx_fin);
289       if (connected) {
290         error = ECONNRESET;
291         ldout(cct, 20) << __func__ << " got remote close msg..." << dendl;
292       }
293       assert(infiniband->post_chunk(chunk) == 0);
294       dispatcher->perf_logger->dec(l_msgr_rdma_inqueue_rx_chunks);
295     } else {
296       if (read == (ssize_t)len) {
297         buffers.push_back(chunk);
298         ldout(cct, 25) << __func__ << " buffers add a chunk: " << response->byte_len << dendl;
299       } else if (read + response->byte_len > (ssize_t)len) {
300         read += chunk->read(buf+read, (ssize_t)len-read);
301         buffers.push_back(chunk);
302         ldout(cct, 25) << __func__ << " buffers add a chunk: " << chunk->get_offset() << ":" << chunk->get_bound() << dendl;
303       } else {
304         read += chunk->read(buf+read, response->byte_len);
305         assert(infiniband->post_chunk(chunk) == 0);
306         dispatcher->perf_logger->dec(l_msgr_rdma_inqueue_rx_chunks);
307       }
308     }
309   }
310
311   worker->perf_logger->inc(l_msgr_rdma_rx_chunks, cqe.size());
312   if (is_server && connected == 0) {
313     ldout(cct, 20) << __func__ << " we do not need last handshake, QP: " << my_msg.qpn << " peer QP: " << peer_msg.qpn << dendl;
314     connected = 1; //if so, we don't need the last handshake
315     cleanup();
316     submit(false);
317   }
318
319   if (!buffers.empty()) {
320     notify();
321   }
322
323   if (read == 0 && error)
324     return -error;
325   return read == 0 ? -EAGAIN : read;
326 }
327
328 ssize_t RDMAConnectedSocketImpl::read_buffers(char* buf, size_t len)
329 {
330   size_t read = 0, tmp = 0;
331   auto c = buffers.begin();
332   for (; c != buffers.end() ; ++c) {
333     tmp = (*c)->read(buf+read, len-read);
334     read += tmp;
335     ldout(cct, 25) << __func__ << " this iter read: " << tmp << " bytes." << " offset: " << (*c)->get_offset() << " ,bound: " << (*c)->get_bound()  << ". Chunk:" << *c  << dendl;
336     if ((*c)->over()) {
337       assert(infiniband->post_chunk(*c) == 0);
338       dispatcher->perf_logger->dec(l_msgr_rdma_inqueue_rx_chunks);
339       ldout(cct, 25) << __func__ << " one chunk over." << dendl;
340     }
341     if (read == len) {
342       break;
343     }
344   }
345
346   if (c != buffers.end() && (*c)->over())
347     ++c;
348   buffers.erase(buffers.begin(), c);
349   ldout(cct, 25) << __func__ << " got " << read  << " bytes, buffers size: " << buffers.size() << dendl;
350   return read;
351 }
352
353 ssize_t RDMAConnectedSocketImpl::zero_copy_read(bufferptr &data)
354 {
355   if (error)
356     return -error;
357   static const int MAX_COMPLETIONS = 16;
358   ibv_wc wc[MAX_COMPLETIONS];
359   ssize_t size = 0;
360
361   ibv_wc*  response;
362   Chunk* chunk;
363   bool loaded = false;
364   auto iter = buffers.begin();
365   if (iter != buffers.end()) {
366     chunk = *iter;
367     // FIXME need to handle release
368     // auto del = std::bind(&Chunk::post_srq, std::move(chunk), infiniband);
369     buffers.erase(iter);
370     loaded = true;
371     size = chunk->bound;
372   }
373
374   std::vector<ibv_wc> cqe;
375   get_wc(cqe);
376   if (cqe.empty())
377     return size == 0 ? -EAGAIN : size;
378
379   ldout(cct, 20) << __func__ << " pool completion queue got " << cqe.size() << " responses."<< dendl;
380
381   for (size_t i = 0; i < cqe.size(); ++i) {
382     response = &wc[i];
383     chunk = reinterpret_cast<Chunk*>(response->wr_id);
384     chunk->prepare_read(response->byte_len);
385     if (!loaded && i == 0) {
386       // FIXME need to handle release
387       // auto del = std::bind(&Chunk::post_srq, std::move(chunk), infiniband);
388       size = chunk->bound;
389       continue;
390     }
391     buffers.push_back(chunk);
392     iter++;
393   }
394
395   if (size == 0)
396     return -EAGAIN;
397   return size;
398 }
399
400 ssize_t RDMAConnectedSocketImpl::send(bufferlist &bl, bool more)
401 {
402   if (error) {
403     if (!active)
404       return -EPIPE;
405     return -error;
406   }
407   size_t bytes = bl.length();
408   if (!bytes)
409     return 0;
410   {
411     Mutex::Locker l(lock);
412     pending_bl.claim_append(bl);
413     if (!connected) {
414       ldout(cct, 20) << __func__ << " fake send to upper, QP: " << my_msg.qpn << dendl;
415       return bytes;
416     }
417   }
418   ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << dendl;
419   ssize_t r = submit(more);
420   if (r < 0 && r != -EAGAIN)
421     return r;
422   return bytes;
423 }
424
425 ssize_t RDMAConnectedSocketImpl::submit(bool more)
426 {
427   if (error)
428     return -error;
429   Mutex::Locker l(lock);
430   size_t bytes = pending_bl.length();
431   ldout(cct, 20) << __func__ << " we need " << bytes << " bytes. iov size: "
432                  << pending_bl.buffers().size() << dendl;
433   if (!bytes)
434     return 0;
435
436   auto fill_tx_via_copy = [this](std::vector<Chunk*> &tx_buffers, unsigned bytes,
437                                  std::list<bufferptr>::const_iterator &start,
438                                  std::list<bufferptr>::const_iterator &end) -> unsigned {
439     assert(start != end);
440     auto chunk_idx = tx_buffers.size();
441     int ret = worker->get_reged_mem(this, tx_buffers, bytes);
442     if (ret == 0) {
443       ldout(cct, 1) << __func__ << " no enough buffers in worker " << worker << dendl;
444       worker->perf_logger->inc(l_msgr_rdma_tx_no_mem);
445       return 0;
446     }
447
448     unsigned total_copied = 0;
449     Chunk *current_chunk = tx_buffers[chunk_idx];
450     while (start != end) {
451       const uintptr_t addr = reinterpret_cast<const uintptr_t>(start->c_str());
452       unsigned copied = 0;
453       while (copied < start->length()) {
454         uint32_t r = current_chunk->write((char*)addr+copied, start->length() - copied);
455         copied += r;
456         total_copied += r;
457         bytes -= r;
458         if (current_chunk->full()){
459           current_chunk = tx_buffers[++chunk_idx];
460           if (chunk_idx == tx_buffers.size())
461             return total_copied;
462         }
463       }
464       ++start;
465     }
466     assert(bytes == 0);
467     return total_copied;
468   };
469
470   std::vector<Chunk*> tx_buffers;
471   std::list<bufferptr>::const_iterator it = pending_bl.buffers().begin();
472   std::list<bufferptr>::const_iterator copy_it = it;
473   unsigned total = 0;
474   unsigned need_reserve_bytes = 0;
475   while (it != pending_bl.buffers().end()) {
476     if (infiniband->is_tx_buffer(it->raw_c_str())) {
477       if (need_reserve_bytes) {
478         unsigned copied = fill_tx_via_copy(tx_buffers, need_reserve_bytes, copy_it, it);
479         total += copied;
480         if (copied < need_reserve_bytes)
481           goto sending;
482         need_reserve_bytes = 0;
483       }
484       assert(copy_it == it);
485       tx_buffers.push_back(infiniband->get_tx_chunk_by_buffer(it->raw_c_str()));
486       total += it->length();
487       ++copy_it;
488     } else {
489       need_reserve_bytes += it->length();
490     }
491     ++it;
492   }
493   if (need_reserve_bytes)
494     total += fill_tx_via_copy(tx_buffers, need_reserve_bytes, copy_it, it);
495
496  sending:
497   if (total == 0)
498     return -EAGAIN;
499   assert(total <= pending_bl.length());
500   bufferlist swapped;
501   if (total < pending_bl.length()) {
502     worker->perf_logger->inc(l_msgr_rdma_tx_parital_mem);
503     pending_bl.splice(total, pending_bl.length()-total, &swapped);
504     pending_bl.swap(swapped);
505   } else {
506     pending_bl.clear();
507   }
508
509   ldout(cct, 20) << __func__ << " left bytes: " << pending_bl.length() << " in buffers "
510                  << pending_bl.buffers().size() << " tx chunks " << tx_buffers.size() << dendl;
511
512   int r = post_work_request(tx_buffers);
513   if (r < 0)
514     return r;
515
516   ldout(cct, 20) << __func__ << " finished sending " << bytes << " bytes." << dendl;
517   return pending_bl.length() ? -EAGAIN : 0;
518 }
519
520 int RDMAConnectedSocketImpl::post_work_request(std::vector<Chunk*> &tx_buffers)
521 {
522   ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << " " << tx_buffers[0] << dendl;
523   vector<Chunk*>::iterator current_buffer = tx_buffers.begin();
524   ibv_sge isge[tx_buffers.size()];
525   uint32_t current_sge = 0;
526   ibv_send_wr iswr[tx_buffers.size()];
527   uint32_t current_swr = 0;
528   ibv_send_wr* pre_wr = NULL;
529
530   memset(iswr, 0, sizeof(iswr));
531   memset(isge, 0, sizeof(isge));
532   current_buffer = tx_buffers.begin();
533   while (current_buffer != tx_buffers.end()) {
534     isge[current_sge].addr = reinterpret_cast<uint64_t>((*current_buffer)->buffer);
535     isge[current_sge].length = (*current_buffer)->get_offset();
536     isge[current_sge].lkey = (*current_buffer)->mr->lkey;
537     ldout(cct, 25) << __func__ << " sending buffer: " << *current_buffer << " length: " << isge[current_sge].length  << dendl;
538
539     iswr[current_swr].wr_id = reinterpret_cast<uint64_t>(*current_buffer);
540     iswr[current_swr].next = NULL;
541     iswr[current_swr].sg_list = &isge[current_sge];
542     iswr[current_swr].num_sge = 1;
543     iswr[current_swr].opcode = IBV_WR_SEND;
544     iswr[current_swr].send_flags = IBV_SEND_SIGNALED;
545     /*if (isge[current_sge].length < infiniband->max_inline_data) {
546       iswr[current_swr].send_flags = IBV_SEND_INLINE;
547       ldout(cct, 20) << __func__ << " send_inline." << dendl;
548       }*/
549
550     worker->perf_logger->inc(l_msgr_rdma_tx_bytes, isge[current_sge].length);
551     if (pre_wr)
552       pre_wr->next = &iswr[current_swr];
553     pre_wr = &iswr[current_swr];
554     ++current_sge;
555     ++current_swr;
556     ++current_buffer;
557   }
558
559   ibv_send_wr *bad_tx_work_request;
560   if (ibv_post_send(qp->get_qp(), iswr, &bad_tx_work_request)) {
561     ldout(cct, 1) << __func__ << " failed to send data"
562                   << " (most probably should be peer not ready): "
563                   << cpp_strerror(errno) << dendl;
564     worker->perf_logger->inc(l_msgr_rdma_tx_failed);
565     return -errno;
566   }
567   worker->perf_logger->inc(l_msgr_rdma_tx_chunks, tx_buffers.size());
568   ldout(cct, 20) << __func__ << " qp state is " << Infiniband::qp_state_string(qp->get_state()) << dendl;
569   return 0;
570 }
571
572 void RDMAConnectedSocketImpl::fin() {
573   ibv_send_wr wr;
574   memset(&wr, 0, sizeof(wr));
575   wr.wr_id = reinterpret_cast<uint64_t>(qp);
576   wr.num_sge = 0;
577   wr.opcode = IBV_WR_SEND;
578   wr.send_flags = IBV_SEND_SIGNALED;
579   ibv_send_wr* bad_tx_work_request;
580   if (ibv_post_send(qp->get_qp(), &wr, &bad_tx_work_request)) {
581     ldout(cct, 1) << __func__ << " failed to send message="
582                   << " ibv_post_send failed(most probably should be peer not ready): "
583                   << cpp_strerror(errno) << dendl;
584     worker->perf_logger->inc(l_msgr_rdma_tx_failed);
585     return ;
586   }
587 }
588
589 void RDMAConnectedSocketImpl::cleanup() {
590   if (con_handler && tcp_fd >= 0) {
591     (static_cast<C_handle_connection*>(con_handler))->close();
592     worker->center.submit_to(worker->center.get_id(), [this]() {
593       worker->center.delete_file_event(tcp_fd, EVENT_READABLE);
594     }, false);
595     delete con_handler;
596     con_handler = nullptr;
597   }
598 }
599
600 void RDMAConnectedSocketImpl::notify()
601 {
602   uint64_t i = 1;
603   int ret;
604
605   ret = write(notify_fd, &i, sizeof(i));
606   assert(ret = sizeof(i));
607 }
608
609 void RDMAConnectedSocketImpl::shutdown()
610 {
611   if (!error)
612     fin();
613   error = ECONNRESET;
614   active = false;
615 }
616
617 void RDMAConnectedSocketImpl::close()
618 {
619   if (!error)
620     fin();
621   error = ECONNRESET;
622   active = false;
623 }
624
625 void RDMAConnectedSocketImpl::fault()
626 {
627   ldout(cct, 1) << __func__ << " tcp fd " << tcp_fd << dendl;
628   /*if (qp) {
629     qp->to_dead();
630     qp = NULL;
631     }*/
632   error = ECONNRESET;
633   connected = 1;
634   notify();
635 }
636
637 void RDMAConnectedSocketImpl::set_accept_fd(int sd)
638 {
639   tcp_fd = sd;
640   is_server = true;
641   worker->center.submit_to(worker->center.get_id(), [this]() {
642                            worker->center.create_file_event(tcp_fd, EVENT_READABLE, con_handler);
643                            }, true);
644 }