initial code repo
[stor4nfv.git] / src / ceph / src / msg / async / rdma / RDMAConnectedSocketImpl.cc
diff --git a/src/ceph/src/msg/async/rdma/RDMAConnectedSocketImpl.cc b/src/ceph/src/msg/async/rdma/RDMAConnectedSocketImpl.cc
new file mode 100644 (file)
index 0000000..66dc488
--- /dev/null
@@ -0,0 +1,644 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 XSKY <haomai@xsky.com>
+ *
+ * Author: Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#include "RDMAStack.h"
+
+#define dout_subsys ceph_subsys_ms
+#undef dout_prefix
+#define dout_prefix *_dout << " RDMAConnectedSocketImpl "
+
+RDMAConnectedSocketImpl::RDMAConnectedSocketImpl(CephContext *cct, Infiniband* ib, RDMADispatcher* s,
+                                                RDMAWorker *w)
+  : cct(cct), connected(0), error(0), infiniband(ib),
+    dispatcher(s), worker(w), lock("RDMAConnectedSocketImpl::lock"),
+    is_server(false), con_handler(new C_handle_connection(this)),
+    active(false), pending(false)
+{
+  qp = infiniband->create_queue_pair(
+                                    cct, s->get_tx_cq(), s->get_rx_cq(), IBV_QPT_RC);
+  my_msg.qpn = qp->get_local_qp_number();
+  my_msg.psn = qp->get_initial_psn();
+  my_msg.lid = infiniband->get_lid();
+  my_msg.peer_qpn = 0;
+  my_msg.gid = infiniband->get_gid();
+  notify_fd = dispatcher->register_qp(qp, this);
+  dispatcher->perf_logger->inc(l_msgr_rdma_created_queue_pair);
+  dispatcher->perf_logger->inc(l_msgr_rdma_active_queue_pair);
+}
+
+RDMAConnectedSocketImpl::~RDMAConnectedSocketImpl()
+{
+  ldout(cct, 20) << __func__ << " destruct." << dendl;
+  cleanup();
+  worker->remove_pending_conn(this);
+  dispatcher->erase_qpn(my_msg.qpn);
+  Mutex::Locker l(lock);
+  if (notify_fd >= 0)
+    ::close(notify_fd);
+  if (tcp_fd >= 0)
+    ::close(tcp_fd);
+  error = ECONNRESET;
+  int ret = 0;
+  for (unsigned i=0; i < wc.size(); ++i) {
+    ret = infiniband->post_chunk(reinterpret_cast<Chunk*>(wc[i].wr_id));
+    assert(ret == 0);
+    dispatcher->perf_logger->dec(l_msgr_rdma_inqueue_rx_chunks);
+  }
+  for (unsigned i=0; i < buffers.size(); ++i) {
+    ret = infiniband->post_chunk(buffers[i]);
+    assert(ret == 0);
+    dispatcher->perf_logger->dec(l_msgr_rdma_inqueue_rx_chunks);
+  }
+}
+
+void RDMAConnectedSocketImpl::pass_wc(std::vector<ibv_wc> &&v)
+{
+  Mutex::Locker l(lock);
+  if (wc.empty())
+    wc = std::move(v);
+  else
+    wc.insert(wc.end(), v.begin(), v.end());
+  notify();
+}
+
+void RDMAConnectedSocketImpl::get_wc(std::vector<ibv_wc> &w)
+{
+  Mutex::Locker l(lock);
+  if (wc.empty())
+    return ;
+  w.swap(wc);
+}
+
+int RDMAConnectedSocketImpl::activate()
+{
+  ibv_qp_attr qpa;
+  int r;
+
+  // now connect up the qps and switch to RTR
+  memset(&qpa, 0, sizeof(qpa));
+  qpa.qp_state = IBV_QPS_RTR;
+  qpa.path_mtu = IBV_MTU_1024;
+  qpa.dest_qp_num = peer_msg.qpn;
+  qpa.rq_psn = peer_msg.psn;
+  qpa.max_dest_rd_atomic = 1;
+  qpa.min_rnr_timer = 12;
+  //qpa.ah_attr.is_global = 0;
+  qpa.ah_attr.is_global = 1;
+  qpa.ah_attr.grh.hop_limit = 6;
+  qpa.ah_attr.grh.dgid = peer_msg.gid;
+
+  qpa.ah_attr.grh.sgid_index = infiniband->get_device()->get_gid_idx();
+
+  qpa.ah_attr.dlid = peer_msg.lid;
+  qpa.ah_attr.sl = cct->_conf->ms_async_rdma_sl;
+  qpa.ah_attr.grh.traffic_class = cct->_conf->ms_async_rdma_dscp;
+  qpa.ah_attr.src_path_bits = 0;
+  qpa.ah_attr.port_num = (uint8_t)(infiniband->get_ib_physical_port());
+
+  ldout(cct, 20) << __func__ << " Choosing gid_index " << (int)qpa.ah_attr.grh.sgid_index << ", sl " << (int)qpa.ah_attr.sl << dendl;
+
+  r = ibv_modify_qp(qp->get_qp(), &qpa, IBV_QP_STATE |
+      IBV_QP_AV |
+      IBV_QP_PATH_MTU |
+      IBV_QP_DEST_QPN |
+      IBV_QP_RQ_PSN |
+      IBV_QP_MIN_RNR_TIMER |
+      IBV_QP_MAX_DEST_RD_ATOMIC);
+  if (r) {
+    lderr(cct) << __func__ << " failed to transition to RTR state: "
+               << cpp_strerror(errno) << dendl;
+    return -1;
+  }
+
+  ldout(cct, 20) << __func__ << " transition to RTR state successfully." << dendl;
+
+  // now move to RTS
+  qpa.qp_state = IBV_QPS_RTS;
+
+  // How long to wait before retrying if packet lost or server dead.
+  // Supposedly the timeout is 4.096us*2^timeout.  However, the actual
+  // timeout appears to be 4.096us*2^(timeout+1), so the setting
+  // below creates a 135ms timeout.
+  qpa.timeout = 14;
+
+  // How many times to retry after timeouts before giving up.
+  qpa.retry_cnt = 7;
+
+  // How many times to retry after RNR (receiver not ready) condition
+  // before giving up. Occurs when the remote side has not yet posted
+  // a receive request.
+  qpa.rnr_retry = 7; // 7 is infinite retry.
+  qpa.sq_psn = my_msg.psn;
+  qpa.max_rd_atomic = 1;
+
+  r = ibv_modify_qp(qp->get_qp(), &qpa, IBV_QP_STATE |
+      IBV_QP_TIMEOUT |
+      IBV_QP_RETRY_CNT |
+      IBV_QP_RNR_RETRY |
+      IBV_QP_SQ_PSN |
+      IBV_QP_MAX_QP_RD_ATOMIC);
+  if (r) {
+    lderr(cct) << __func__ << " failed to transition to RTS state: "
+               << cpp_strerror(errno) << dendl;
+    return -1;
+  }
+
+  // the queue pair should be ready to use once the client has finished
+  // setting up their end.
+  ldout(cct, 20) << __func__ << " transition to RTS state successfully." << dendl;
+  ldout(cct, 20) << __func__ << " QueuePair: " << qp << " with qp:" << qp->get_qp() << dendl;
+
+  if (!is_server) {
+    connected = 1; //indicate successfully
+    ldout(cct, 20) << __func__ << " handle fake send, wake it up. QP: " << my_msg.qpn << dendl;
+    submit(false);
+  }
+  active = true;
+
+  return 0;
+}
+
+int RDMAConnectedSocketImpl::try_connect(const entity_addr_t& peer_addr, const SocketOptions &opts) {
+  ldout(cct, 20) << __func__ << " nonblock:" << opts.nonblock << ", nodelay:"
+                 << opts.nodelay << ", rbuf_size: " << opts.rcbuf_size << dendl;
+  NetHandler net(cct);
+  tcp_fd = net.connect(peer_addr, opts.connect_bind_addr);
+
+  if (tcp_fd < 0) {
+    return -errno;
+  }
+  net.set_close_on_exec(tcp_fd);
+
+  int r = net.set_socket_options(tcp_fd, opts.nodelay, opts.rcbuf_size);
+  if (r < 0) {
+    ::close(tcp_fd);
+    tcp_fd = -1;
+    return -errno;
+  }
+
+  ldout(cct, 20) << __func__ << " tcp_fd: " << tcp_fd << dendl;
+  net.set_priority(tcp_fd, opts.priority, peer_addr.get_family());
+  my_msg.peer_qpn = 0;
+  r = infiniband->send_msg(cct, tcp_fd, my_msg);
+  if (r < 0)
+    return r;
+
+  worker->center.create_file_event(tcp_fd, EVENT_READABLE, con_handler);
+  return 0;
+}
+
+void RDMAConnectedSocketImpl::handle_connection() {
+  ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << " tcp_fd: " << tcp_fd << " notify_fd: " << notify_fd << dendl;
+  int r = infiniband->recv_msg(cct, tcp_fd, peer_msg);
+  if (r < 0) {
+    if (r != -EAGAIN) {
+      dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
+      ldout(cct, 1) << __func__ << " recv handshake msg failed." << dendl;
+      fault();
+    }
+    return;
+  }
+
+  if (!is_server) {// syn + ack from server
+    my_msg.peer_qpn = peer_msg.qpn;
+    ldout(cct, 20) << __func__ << " peer msg :  < " << peer_msg.qpn << ", " << peer_msg.psn
+                   <<  ", " << peer_msg.lid << ", " << peer_msg.peer_qpn << "> " << dendl;
+    if (!connected) {
+      r = activate();
+      assert(!r);
+    }
+    notify();
+    r = infiniband->send_msg(cct, tcp_fd, my_msg);
+    if (r < 0) {
+      ldout(cct, 1) << __func__ << " send client ack failed." << dendl;
+      dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
+      fault();
+    }
+  } else {
+    if (peer_msg.peer_qpn == 0) {// syn from client
+      if (active) {
+        ldout(cct, 10) << __func__ << " server is already active." << dendl;
+        return ;
+      }
+      r = infiniband->send_msg(cct, tcp_fd, my_msg);
+      if (r < 0) {
+        ldout(cct, 1) << __func__ << " server ack failed." << dendl;
+        dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
+        fault();
+        return ;
+      }
+      r = activate();
+      assert(!r);
+    } else { // ack from client
+      connected = 1;
+      cleanup();
+      submit(false);
+      notify();
+    }
+  }
+}
+
+ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len)
+{
+  uint64_t i = 0;
+  int r = ::read(notify_fd, &i, sizeof(i));
+  ldout(cct, 20) << __func__ << " notify_fd : " << i << " in " << my_msg.qpn << " r = " << r << dendl;
+  ssize_t read = 0;
+  if (!buffers.empty())
+    read = read_buffers(buf,len);
+
+  std::vector<ibv_wc> cqe;
+  get_wc(cqe);
+  if (cqe.empty()) {
+    if (!buffers.empty()) {
+      notify();
+    }
+    if (read > 0) {
+      return read;
+    }
+    if (error) {
+      return -error;
+    } else {
+      return -EAGAIN;
+    }
+  }
+
+  ldout(cct, 20) << __func__ << " poll queue got " << cqe.size() << " responses. QP: " << my_msg.qpn << dendl;
+  for (size_t i = 0; i < cqe.size(); ++i) {
+    ibv_wc* response = &cqe[i];
+    assert(response->status == IBV_WC_SUCCESS);
+    Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
+    ldout(cct, 25) << __func__ << " chunk length: " << response->byte_len << " bytes." << chunk << dendl;
+    chunk->prepare_read(response->byte_len);
+    worker->perf_logger->inc(l_msgr_rdma_rx_bytes, response->byte_len);
+    if (response->byte_len == 0) {
+      dispatcher->perf_logger->inc(l_msgr_rdma_rx_fin);
+      if (connected) {
+        error = ECONNRESET;
+        ldout(cct, 20) << __func__ << " got remote close msg..." << dendl;
+      }
+      assert(infiniband->post_chunk(chunk) == 0);
+      dispatcher->perf_logger->dec(l_msgr_rdma_inqueue_rx_chunks);
+    } else {
+      if (read == (ssize_t)len) {
+        buffers.push_back(chunk);
+        ldout(cct, 25) << __func__ << " buffers add a chunk: " << response->byte_len << dendl;
+      } else if (read + response->byte_len > (ssize_t)len) {
+        read += chunk->read(buf+read, (ssize_t)len-read);
+        buffers.push_back(chunk);
+        ldout(cct, 25) << __func__ << " buffers add a chunk: " << chunk->get_offset() << ":" << chunk->get_bound() << dendl;
+      } else {
+        read += chunk->read(buf+read, response->byte_len);
+        assert(infiniband->post_chunk(chunk) == 0);
+        dispatcher->perf_logger->dec(l_msgr_rdma_inqueue_rx_chunks);
+      }
+    }
+  }
+
+  worker->perf_logger->inc(l_msgr_rdma_rx_chunks, cqe.size());
+  if (is_server && connected == 0) {
+    ldout(cct, 20) << __func__ << " we do not need last handshake, QP: " << my_msg.qpn << " peer QP: " << peer_msg.qpn << dendl;
+    connected = 1; //if so, we don't need the last handshake
+    cleanup();
+    submit(false);
+  }
+
+  if (!buffers.empty()) {
+    notify();
+  }
+
+  if (read == 0 && error)
+    return -error;
+  return read == 0 ? -EAGAIN : read;
+}
+
+ssize_t RDMAConnectedSocketImpl::read_buffers(char* buf, size_t len)
+{
+  size_t read = 0, tmp = 0;
+  auto c = buffers.begin();
+  for (; c != buffers.end() ; ++c) {
+    tmp = (*c)->read(buf+read, len-read);
+    read += tmp;
+    ldout(cct, 25) << __func__ << " this iter read: " << tmp << " bytes." << " offset: " << (*c)->get_offset() << " ,bound: " << (*c)->get_bound()  << ". Chunk:" << *c  << dendl;
+    if ((*c)->over()) {
+      assert(infiniband->post_chunk(*c) == 0);
+      dispatcher->perf_logger->dec(l_msgr_rdma_inqueue_rx_chunks);
+      ldout(cct, 25) << __func__ << " one chunk over." << dendl;
+    }
+    if (read == len) {
+      break;
+    }
+  }
+
+  if (c != buffers.end() && (*c)->over())
+    ++c;
+  buffers.erase(buffers.begin(), c);
+  ldout(cct, 25) << __func__ << " got " << read  << " bytes, buffers size: " << buffers.size() << dendl;
+  return read;
+}
+
+ssize_t RDMAConnectedSocketImpl::zero_copy_read(bufferptr &data)
+{
+  if (error)
+    return -error;
+  static const int MAX_COMPLETIONS = 16;
+  ibv_wc wc[MAX_COMPLETIONS];
+  ssize_t size = 0;
+
+  ibv_wc*  response;
+  Chunk* chunk;
+  bool loaded = false;
+  auto iter = buffers.begin();
+  if (iter != buffers.end()) {
+    chunk = *iter;
+    // FIXME need to handle release
+    // auto del = std::bind(&Chunk::post_srq, std::move(chunk), infiniband);
+    buffers.erase(iter);
+    loaded = true;
+    size = chunk->bound;
+  }
+
+  std::vector<ibv_wc> cqe;
+  get_wc(cqe);
+  if (cqe.empty())
+    return size == 0 ? -EAGAIN : size;
+
+  ldout(cct, 20) << __func__ << " pool completion queue got " << cqe.size() << " responses."<< dendl;
+
+  for (size_t i = 0; i < cqe.size(); ++i) {
+    response = &wc[i];
+    chunk = reinterpret_cast<Chunk*>(response->wr_id);
+    chunk->prepare_read(response->byte_len);
+    if (!loaded && i == 0) {
+      // FIXME need to handle release
+      // auto del = std::bind(&Chunk::post_srq, std::move(chunk), infiniband);
+      size = chunk->bound;
+      continue;
+    }
+    buffers.push_back(chunk);
+    iter++;
+  }
+
+  if (size == 0)
+    return -EAGAIN;
+  return size;
+}
+
+ssize_t RDMAConnectedSocketImpl::send(bufferlist &bl, bool more)
+{
+  if (error) {
+    if (!active)
+      return -EPIPE;
+    return -error;
+  }
+  size_t bytes = bl.length();
+  if (!bytes)
+    return 0;
+  {
+    Mutex::Locker l(lock);
+    pending_bl.claim_append(bl);
+    if (!connected) {
+      ldout(cct, 20) << __func__ << " fake send to upper, QP: " << my_msg.qpn << dendl;
+      return bytes;
+    }
+  }
+  ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << dendl;
+  ssize_t r = submit(more);
+  if (r < 0 && r != -EAGAIN)
+    return r;
+  return bytes;
+}
+
+ssize_t RDMAConnectedSocketImpl::submit(bool more)
+{
+  if (error)
+    return -error;
+  Mutex::Locker l(lock);
+  size_t bytes = pending_bl.length();
+  ldout(cct, 20) << __func__ << " we need " << bytes << " bytes. iov size: "
+                 << pending_bl.buffers().size() << dendl;
+  if (!bytes)
+    return 0;
+
+  auto fill_tx_via_copy = [this](std::vector<Chunk*> &tx_buffers, unsigned bytes,
+                                 std::list<bufferptr>::const_iterator &start,
+                                 std::list<bufferptr>::const_iterator &end) -> unsigned {
+    assert(start != end);
+    auto chunk_idx = tx_buffers.size();
+    int ret = worker->get_reged_mem(this, tx_buffers, bytes);
+    if (ret == 0) {
+      ldout(cct, 1) << __func__ << " no enough buffers in worker " << worker << dendl;
+      worker->perf_logger->inc(l_msgr_rdma_tx_no_mem);
+      return 0;
+    }
+
+    unsigned total_copied = 0;
+    Chunk *current_chunk = tx_buffers[chunk_idx];
+    while (start != end) {
+      const uintptr_t addr = reinterpret_cast<const uintptr_t>(start->c_str());
+      unsigned copied = 0;
+      while (copied < start->length()) {
+        uint32_t r = current_chunk->write((char*)addr+copied, start->length() - copied);
+        copied += r;
+        total_copied += r;
+        bytes -= r;
+        if (current_chunk->full()){
+          current_chunk = tx_buffers[++chunk_idx];
+          if (chunk_idx == tx_buffers.size())
+            return total_copied;
+        }
+      }
+      ++start;
+    }
+    assert(bytes == 0);
+    return total_copied;
+  };
+
+  std::vector<Chunk*> tx_buffers;
+  std::list<bufferptr>::const_iterator it = pending_bl.buffers().begin();
+  std::list<bufferptr>::const_iterator copy_it = it;
+  unsigned total = 0;
+  unsigned need_reserve_bytes = 0;
+  while (it != pending_bl.buffers().end()) {
+    if (infiniband->is_tx_buffer(it->raw_c_str())) {
+      if (need_reserve_bytes) {
+        unsigned copied = fill_tx_via_copy(tx_buffers, need_reserve_bytes, copy_it, it);
+        total += copied;
+        if (copied < need_reserve_bytes)
+          goto sending;
+        need_reserve_bytes = 0;
+      }
+      assert(copy_it == it);
+      tx_buffers.push_back(infiniband->get_tx_chunk_by_buffer(it->raw_c_str()));
+      total += it->length();
+      ++copy_it;
+    } else {
+      need_reserve_bytes += it->length();
+    }
+    ++it;
+  }
+  if (need_reserve_bytes)
+    total += fill_tx_via_copy(tx_buffers, need_reserve_bytes, copy_it, it);
+
+ sending:
+  if (total == 0)
+    return -EAGAIN;
+  assert(total <= pending_bl.length());
+  bufferlist swapped;
+  if (total < pending_bl.length()) {
+    worker->perf_logger->inc(l_msgr_rdma_tx_parital_mem);
+    pending_bl.splice(total, pending_bl.length()-total, &swapped);
+    pending_bl.swap(swapped);
+  } else {
+    pending_bl.clear();
+  }
+
+  ldout(cct, 20) << __func__ << " left bytes: " << pending_bl.length() << " in buffers "
+                 << pending_bl.buffers().size() << " tx chunks " << tx_buffers.size() << dendl;
+
+  int r = post_work_request(tx_buffers);
+  if (r < 0)
+    return r;
+
+  ldout(cct, 20) << __func__ << " finished sending " << bytes << " bytes." << dendl;
+  return pending_bl.length() ? -EAGAIN : 0;
+}
+
+int RDMAConnectedSocketImpl::post_work_request(std::vector<Chunk*> &tx_buffers)
+{
+  ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << " " << tx_buffers[0] << dendl;
+  vector<Chunk*>::iterator current_buffer = tx_buffers.begin();
+  ibv_sge isge[tx_buffers.size()];
+  uint32_t current_sge = 0;
+  ibv_send_wr iswr[tx_buffers.size()];
+  uint32_t current_swr = 0;
+  ibv_send_wr* pre_wr = NULL;
+
+  memset(iswr, 0, sizeof(iswr));
+  memset(isge, 0, sizeof(isge));
+  current_buffer = tx_buffers.begin();
+  while (current_buffer != tx_buffers.end()) {
+    isge[current_sge].addr = reinterpret_cast<uint64_t>((*current_buffer)->buffer);
+    isge[current_sge].length = (*current_buffer)->get_offset();
+    isge[current_sge].lkey = (*current_buffer)->mr->lkey;
+    ldout(cct, 25) << __func__ << " sending buffer: " << *current_buffer << " length: " << isge[current_sge].length  << dendl;
+
+    iswr[current_swr].wr_id = reinterpret_cast<uint64_t>(*current_buffer);
+    iswr[current_swr].next = NULL;
+    iswr[current_swr].sg_list = &isge[current_sge];
+    iswr[current_swr].num_sge = 1;
+    iswr[current_swr].opcode = IBV_WR_SEND;
+    iswr[current_swr].send_flags = IBV_SEND_SIGNALED;
+    /*if (isge[current_sge].length < infiniband->max_inline_data) {
+      iswr[current_swr].send_flags = IBV_SEND_INLINE;
+      ldout(cct, 20) << __func__ << " send_inline." << dendl;
+      }*/
+
+    worker->perf_logger->inc(l_msgr_rdma_tx_bytes, isge[current_sge].length);
+    if (pre_wr)
+      pre_wr->next = &iswr[current_swr];
+    pre_wr = &iswr[current_swr];
+    ++current_sge;
+    ++current_swr;
+    ++current_buffer;
+  }
+
+  ibv_send_wr *bad_tx_work_request;
+  if (ibv_post_send(qp->get_qp(), iswr, &bad_tx_work_request)) {
+    ldout(cct, 1) << __func__ << " failed to send data"
+                  << " (most probably should be peer not ready): "
+                  << cpp_strerror(errno) << dendl;
+    worker->perf_logger->inc(l_msgr_rdma_tx_failed);
+    return -errno;
+  }
+  worker->perf_logger->inc(l_msgr_rdma_tx_chunks, tx_buffers.size());
+  ldout(cct, 20) << __func__ << " qp state is " << Infiniband::qp_state_string(qp->get_state()) << dendl;
+  return 0;
+}
+
+void RDMAConnectedSocketImpl::fin() {
+  ibv_send_wr wr;
+  memset(&wr, 0, sizeof(wr));
+  wr.wr_id = reinterpret_cast<uint64_t>(qp);
+  wr.num_sge = 0;
+  wr.opcode = IBV_WR_SEND;
+  wr.send_flags = IBV_SEND_SIGNALED;
+  ibv_send_wr* bad_tx_work_request;
+  if (ibv_post_send(qp->get_qp(), &wr, &bad_tx_work_request)) {
+    ldout(cct, 1) << __func__ << " failed to send message="
+                  << " ibv_post_send failed(most probably should be peer not ready): "
+                  << cpp_strerror(errno) << dendl;
+    worker->perf_logger->inc(l_msgr_rdma_tx_failed);
+    return ;
+  }
+}
+
+void RDMAConnectedSocketImpl::cleanup() {
+  if (con_handler && tcp_fd >= 0) {
+    (static_cast<C_handle_connection*>(con_handler))->close();
+    worker->center.submit_to(worker->center.get_id(), [this]() {
+      worker->center.delete_file_event(tcp_fd, EVENT_READABLE);
+    }, false);
+    delete con_handler;
+    con_handler = nullptr;
+  }
+}
+
+void RDMAConnectedSocketImpl::notify()
+{
+  uint64_t i = 1;
+  int ret;
+
+  ret = write(notify_fd, &i, sizeof(i));
+  assert(ret = sizeof(i));
+}
+
+void RDMAConnectedSocketImpl::shutdown()
+{
+  if (!error)
+    fin();
+  error = ECONNRESET;
+  active = false;
+}
+
+void RDMAConnectedSocketImpl::close()
+{
+  if (!error)
+    fin();
+  error = ECONNRESET;
+  active = false;
+}
+
+void RDMAConnectedSocketImpl::fault()
+{
+  ldout(cct, 1) << __func__ << " tcp fd " << tcp_fd << dendl;
+  /*if (qp) {
+    qp->to_dead();
+    qp = NULL;
+    }*/
+  error = ECONNRESET;
+  connected = 1;
+  notify();
+}
+
+void RDMAConnectedSocketImpl::set_accept_fd(int sd)
+{
+  tcp_fd = sd;
+  is_server = true;
+  worker->center.submit_to(worker->center.get_id(), [this]() {
+                          worker->center.create_file_event(tcp_fd, EVENT_READABLE, con_handler);
+                          }, true);
+}