1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2016 XSKY <haomai@xsky.com>
8 * Author: Haomai Wang <haomaiwang@gmail.com>
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
17 #include "RDMAStack.h"
19 #define dout_subsys ceph_subsys_ms
21 #define dout_prefix *_dout << " RDMAConnectedSocketImpl "
// Builds a connected RDMA socket. Visible steps: create a reliable-connected
// (IBV_QPT_RC) queue pair on the dispatcher's TX/RX completion queues, copy the
// local handshake parameters (qpn, psn, lid, gid) into my_msg, register the
// queue pair with the dispatcher, and bump the created/active queue-pair
// counters.
// NOTE(review): the end of the parameter list (where `w` is declared) is on a
// line that is not visible in this extract.
23 RDMAConnectedSocketImpl::RDMAConnectedSocketImpl(CephContext *cct, Infiniband* ib, RDMADispatcher* s,
25 : cct(cct), connected(0), error(0), infiniband(ib),
26 dispatcher(s), worker(w), lock("RDMAConnectedSocketImpl::lock"),
27 is_server(false), con_handler(new C_handle_connection(this)),
28 active(false), pending(false)
30 qp = infiniband->create_queue_pair(
31 cct, s->get_tx_cq(), s->get_rx_cq(), IBV_QPT_RC);
// my_msg is what send_msg() transmits to the peer during the TCP handshake.
32 my_msg.qpn = qp->get_local_qp_number();
33 my_msg.psn = qp->get_initial_psn();
34 my_msg.lid = infiniband->get_lid();
36 my_msg.gid = infiniband->get_gid();
// register_qp() returns the descriptor kept in notify_fd; read() reads from it
// and notify() writes to it.
37 notify_fd = dispatcher->register_qp(qp, this);
38 dispatcher->perf_logger->inc(l_msgr_rdma_created_queue_pair);
39 dispatcher->perf_logger->inc(l_msgr_rdma_active_queue_pair);
// Destructor. Detaches this socket from the worker's pending-connection list
// and from the dispatcher's qpn table, then, holding `lock`, passes every chunk
// still referenced by a queued work completion (wc) or by a partially consumed
// entry in `buffers` to infiniband->post_chunk(), decrementing the
// in-queue RX chunk counter for each one.
// NOTE(review): several lines between the lock acquisition and the loops (and
// the handling of `ret`) are not visible in this extract.
42 RDMAConnectedSocketImpl::~RDMAConnectedSocketImpl()
44 ldout(cct, 20) << __func__ << " destruct." << dendl;
46 worker->remove_pending_conn(this);
47 dispatcher->erase_qpn(my_msg.qpn);
48 Mutex::Locker l(lock);
// wr_id of each completion carries the Chunk pointer it belongs to.
55 for (unsigned i=0; i < wc.size(); ++i) {
56 ret = infiniband->post_chunk(reinterpret_cast<Chunk*>(wc[i].wr_id));
58 dispatcher->perf_logger->dec(l_msgr_rdma_inqueue_rx_chunks);
60 for (unsigned i=0; i < buffers.size(); ++i) {
61 ret = infiniband->post_chunk(buffers[i]);
63 dispatcher->perf_logger->dec(l_msgr_rdma_inqueue_rx_chunks);
// Queues work completions for this socket: under `lock`, the entries of `v`
// are appended (copied) to the member vector `wc`.
// NOTE(review): lines between the lock and the insert are not visible here, so
// there may be an additional fast path (e.g. for an empty `wc`) — confirm.
67 void RDMAConnectedSocketImpl::pass_wc(std::vector<ibv_wc> &&v)
69 Mutex::Locker l(lock);
73 wc.insert(wc.end(), v.begin(), v.end());
// Counterpart of pass_wc(): takes `lock` before touching shared state.
// NOTE(review): the statements that fill `w` are not visible in this extract;
// presumably the queued completions in `wc` are handed over to `w` — confirm.
77 void RDMAConnectedSocketImpl::get_wc(std::vector<ibv_wc> &w)
79 Mutex::Locker l(lock);
// Brings the queue pair up for traffic using the peer parameters received in
// peer_msg: first ibv_modify_qp() to IBV_QPS_RTR, then a second call to
// IBV_QPS_RTS. Each failed transition is logged with lderr; on success
// `connected` is set to 1.
// NOTE(review): the `if (r)` checks, the return statements and part of each
// attribute mask are on lines not visible in this extract.
// NOTE(review): the libibverbs man page says ibv_modify_qp() returns the errno
// value on failure; the log lines below print `errno` rather than `r`, which
// may not be the same value — confirm.
85 int RDMAConnectedSocketImpl::activate()
90 // now connect up the qps and switch to RTR
91 memset(&qpa, 0, sizeof(qpa));
92 qpa.qp_state = IBV_QPS_RTR;
93 qpa.path_mtu = IBV_MTU_1024;
94 qpa.dest_qp_num = peer_msg.qpn;
95 qpa.rq_psn = peer_msg.psn;
96 qpa.max_dest_rd_atomic = 1;
97 qpa.min_rnr_timer = 12;
98 //qpa.ah_attr.is_global = 0;
// A global route header is always used: destination gid from the peer, source
// gid index from the local device.
99 qpa.ah_attr.is_global = 1;
100 qpa.ah_attr.grh.hop_limit = 6;
101 qpa.ah_attr.grh.dgid = peer_msg.gid;
103 qpa.ah_attr.grh.sgid_index = infiniband->get_device()->get_gid_idx();
105 qpa.ah_attr.dlid = peer_msg.lid;
// Service level and traffic class come from the ms_async_rdma_sl and
// ms_async_rdma_dscp configuration values.
106 qpa.ah_attr.sl = cct->_conf->ms_async_rdma_sl;
107 qpa.ah_attr.grh.traffic_class = cct->_conf->ms_async_rdma_dscp;
108 qpa.ah_attr.src_path_bits = 0;
109 qpa.ah_attr.port_num = (uint8_t)(infiniband->get_ib_physical_port());
111 ldout(cct, 20) << __func__ << " Choosing gid_index " << (int)qpa.ah_attr.grh.sgid_index << ", sl " << (int)qpa.ah_attr.sl << dendl;
113 r = ibv_modify_qp(qp->get_qp(), &qpa, IBV_QP_STATE |
118 IBV_QP_MIN_RNR_TIMER |
119 IBV_QP_MAX_DEST_RD_ATOMIC);
121 lderr(cct) << __func__ << " failed to transition to RTR state: "
122 << cpp_strerror(errno) << dendl;
126 ldout(cct, 20) << __func__ << " transition to RTR state successfully." << dendl;
// Second phase: the same qpa structure is reused for the RTS transition.
129 qpa.qp_state = IBV_QPS_RTS;
131 // How long to wait before retrying if packet lost or server dead.
132 // Supposedly the timeout is 4.096us*2^timeout. However, the actual
133 // timeout appears to be 4.096us*2^(timeout+1), so the setting
134 // below creates a 135ms timeout.
137 // How many times to retry after timeouts before giving up.
140 // How many times to retry after RNR (receiver not ready) condition
141 // before giving up. Occurs when the remote side has not yet posted
142 // a receive request.
143 qpa.rnr_retry = 7; // 7 is infinite retry.
144 qpa.sq_psn = my_msg.psn;
145 qpa.max_rd_atomic = 1;
147 r = ibv_modify_qp(qp->get_qp(), &qpa, IBV_QP_STATE |
152 IBV_QP_MAX_QP_RD_ATOMIC);
154 lderr(cct) << __func__ << " failed to transition to RTS state: "
155 << cpp_strerror(errno) << dendl;
159 // the queue pair should be ready to use once the client has finished
160 // setting up their end.
161 ldout(cct, 20) << __func__ << " transition to RTS state successfully." << dendl;
162 ldout(cct, 20) << __func__ << " QueuePair: " << qp << " with qp:" << qp->get_qp() << dendl;
165 connected = 1; //indicate successfully
166 ldout(cct, 20) << __func__ << " handle fake send, wake it up. QP: " << my_msg.qpn << dendl;
// Client side of the handshake. Opens a TCP connection to `peer_addr` (bound to
// opts.connect_bind_addr), marks it close-on-exec, applies the nodelay /
// receive-buffer / priority socket options, sends the local handshake message
// (my_msg) over TCP, and registers con_handler for EVENT_READABLE on tcp_fd so
// that the peer's reply is handled on the worker's event center.
// NOTE(review): the error checks after connect / set_socket_options / send_msg
// and the return statements are on lines not visible in this extract.
174 int RDMAConnectedSocketImpl::try_connect(const entity_addr_t& peer_addr, const SocketOptions &opts) {
175 ldout(cct, 20) << __func__ << " nonblock:" << opts.nonblock << ", nodelay:"
176 << opts.nodelay << ", rbuf_size: " << opts.rcbuf_size << dendl;
178 tcp_fd = net.connect(peer_addr, opts.connect_bind_addr);
183 net.set_close_on_exec(tcp_fd);
185 int r = net.set_socket_options(tcp_fd, opts.nodelay, opts.rcbuf_size);
192 ldout(cct, 20) << __func__ << " tcp_fd: " << tcp_fd << dendl;
193 net.set_priority(tcp_fd, opts.priority, peer_addr.get_family());
// The RDMA parameters travel over the TCP socket, not over the queue pair.
195 r = infiniband->send_msg(cct, tcp_fd, my_msg);
199 worker->center.create_file_event(tcp_fd, EVENT_READABLE, con_handler);
// Handles one readable event on the handshake TCP socket. Receives the peer's
// message into peer_msg and then acts according to role:
//  - client (!is_server): the message is the server's syn+ack; record the
//    peer's qpn in my_msg.peer_qpn and send my_msg back as the client ack.
//  - server, peer_msg.peer_qpn == 0: the message is the client's syn; reply
//    with my_msg.
//  - server, otherwise: the message is the client's ack.
// Every failed recv/send increments l_msgr_rdma_handshake_errors.
// NOTE(review): the activate() calls, fault handling and early returns are on
// lines not visible in this extract.
203 void RDMAConnectedSocketImpl::handle_connection() {
204 ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << " tcp_fd: " << tcp_fd << " notify_fd: " << notify_fd << dendl;
205 int r = infiniband->recv_msg(cct, tcp_fd, peer_msg);
208 dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
209 ldout(cct, 1) << __func__ << " recv handshake msg failed." << dendl;
215 if (!is_server) {// syn + ack from server
216 my_msg.peer_qpn = peer_msg.qpn;
217 ldout(cct, 20) << __func__ << " peer msg : < " << peer_msg.qpn << ", " << peer_msg.psn
218 << ", " << peer_msg.lid << ", " << peer_msg.peer_qpn << "> " << dendl;
224 r = infiniband->send_msg(cct, tcp_fd, my_msg);
226 ldout(cct, 1) << __func__ << " send client ack failed." << dendl;
227 dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
231 if (peer_msg.peer_qpn == 0) {// syn from client
233 ldout(cct, 10) << __func__ << " server is already active." << dendl;
236 r = infiniband->send_msg(cct, tcp_fd, my_msg);
238 ldout(cct, 1) << __func__ << " server ack failed." << dendl;
239 dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
245 } else { // ack from client
// Copies up to `len` received bytes into `buf`.
//
// Visible flow: drain the notification descriptor, serve data left over in
// `buffers` through read_buffers(), then walk the newly fetched completions
// (`cqe`). For each completion the Chunk is recovered from wr_id and:
//   - a zero-length completion is counted as a remote close (rx_fin) and the
//     chunk is handed back via post_chunk();
//   - if the caller's buffer is already full, the chunk is parked in `buffers`;
//   - if the chunk only partly fits, the fitting part is copied and the chunk
//     is parked in `buffers`;
//   - otherwise the whole chunk is copied and handed back via post_chunk().
// Returns the number of bytes copied, or -EAGAIN when nothing was copied.
// NOTE(review): the declarations of `i` and `read`, the fetch of `cqe`, and the
// statement guarded by `read == 0 && error` are on lines not visible here.
254 ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len)
257 int r = ::read(notify_fd, &i, sizeof(i));
258 ldout(cct, 20) << __func__ << " notify_fd : " << i << " in " << my_msg.qpn << " r = " << r << dendl;
260 if (!buffers.empty())
261 read = read_buffers(buf,len);
263 std::vector<ibv_wc> cqe;
266 if (!buffers.empty()) {
279 ldout(cct, 20) << __func__ << " poll queue got " << cqe.size() << " responses. QP: " << my_msg.qpn << dendl;
280 for (size_t i = 0; i < cqe.size(); ++i) {
281 ibv_wc* response = &cqe[i];
282 assert(response->status == IBV_WC_SUCCESS);
283 Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
284 ldout(cct, 25) << __func__ << " chunk length: " << response->byte_len << " bytes." << chunk << dendl;
285 chunk->prepare_read(response->byte_len);
286 worker->perf_logger->inc(l_msgr_rdma_rx_bytes, response->byte_len);
287 if (response->byte_len == 0) {
288 dispatcher->perf_logger->inc(l_msgr_rdma_rx_fin);
291 ldout(cct, 20) << __func__ << " got remote close msg..." << dendl;
293 {
// The post_chunk() call is kept outside assert() so the chunk is still handed
// back when assert() is compiled out.
const auto post_ret = infiniband->post_chunk(chunk);
assert(post_ret == 0);
(void)post_ret;
}
294 dispatcher->perf_logger->dec(l_msgr_rdma_inqueue_rx_chunks);
296 if (read == (ssize_t)len) {
297 buffers.push_back(chunk);
298 ldout(cct, 25) << __func__ << " buffers add a chunk: " << response->byte_len << dendl;
299 } else if (read + response->byte_len > (ssize_t)len) {
300 read += chunk->read(buf+read, (ssize_t)len-read);
301 buffers.push_back(chunk);
302 ldout(cct, 25) << __func__ << " buffers add a chunk: " << chunk->get_offset() << ":" << chunk->get_bound() << dendl;
304 read += chunk->read(buf+read, response->byte_len);
305 {
// Same reasoning as above: perform the call unconditionally, assert on the
// stored result.
const auto post_ret = infiniband->post_chunk(chunk);
assert(post_ret == 0);
(void)post_ret;
}
306 dispatcher->perf_logger->dec(l_msgr_rdma_inqueue_rx_chunks);
311 worker->perf_logger->inc(l_msgr_rdma_rx_chunks, cqe.size());
// Receiving data on the server side implies the client finished its part of
// the handshake, so the connection is marked connected here.
312 if (is_server && connected == 0) {
313 ldout(cct, 20) << __func__ << " we do not need last handshake, QP: " << my_msg.qpn << " peer QP: " << peer_msg.qpn << dendl;
314 connected = 1; //if so, we don't need the last handshake
319 if (!buffers.empty()) {
323 if (read == 0 && error)
325 return read == 0 ? -EAGAIN : read;
// Serves a read from chunks parked in `buffers` by earlier calls.
//
// Walks `buffers` from the front, copying from each chunk into `buf`. The
// visible lines hand a chunk back via post_chunk() and decrement the in-queue
// RX counter right before logging " one chunk over."; the consumed prefix
// [begin, c) is then erased from `buffers`.
// NOTE(review): the accumulation of `tmp` into `read`, the loop exit condition,
// the statement guarded by the `over()` check after the loop, and the return
// are on lines not visible in this extract.
328 ssize_t RDMAConnectedSocketImpl::read_buffers(char* buf, size_t len)
330 size_t read = 0, tmp = 0;
331 auto c = buffers.begin();
332 for (; c != buffers.end() ; ++c) {
333 tmp = (*c)->read(buf+read, len-read);
335 ldout(cct, 25) << __func__ << " this iter read: " << tmp << " bytes." << " offset: " << (*c)->get_offset() << " ,bound: " << (*c)->get_bound() << ". Chunk:" << *c << dendl;
337 {
// The post_chunk() call is kept outside assert() so the chunk is still handed
// back when assert() is compiled out.
const auto post_ret = infiniband->post_chunk(*c);
assert(post_ret == 0);
(void)post_ret;
}
338 dispatcher->perf_logger->dec(l_msgr_rdma_inqueue_rx_chunks);
339 ldout(cct, 25) << __func__ << " one chunk over." << dendl;
346 if (c != buffers.end() && (*c)->over())
348 buffers.erase(buffers.begin(), c);
349 ldout(cct, 25) << __func__ << " got " << read << " bytes, buffers size: " << buffers.size() << dendl;
// Variant of read() intended to hand a received chunk to the caller through
// `data` instead of copying it. The visible lines first look at the front of
// `buffers`, then walk freshly fetched completions (`cqe`), recover each Chunk
// from wr_id, prepare it for reading and push it onto `buffers`. Returns
// -EAGAIN when `size` is 0 at the visible return.
// NOTE(review): the two FIXME comments show that releasing the chunk back to
// the receive queue is not implemented for the zero-copy path.
// NOTE(review): the local array `wc` below has the same name as the member
// vector used by pass_wc(); no use of it is visible in this extract. The
// declarations of `size`, `loaded`, `chunk` and `response` are also not
// visible.
353 ssize_t RDMAConnectedSocketImpl::zero_copy_read(bufferptr &data)
357 static const int MAX_COMPLETIONS = 16;
358 ibv_wc wc[MAX_COMPLETIONS];
364 auto iter = buffers.begin();
365 if (iter != buffers.end()) {
367 // FIXME need to handle release
368 // auto del = std::bind(&Chunk::post_srq, std::move(chunk), infiniband);
374 std::vector<ibv_wc> cqe;
377 return size == 0 ? -EAGAIN : size;
379 ldout(cct, 20) << __func__ << " pool completion queue got " << cqe.size() << " responses."<< dendl;
381 for (size_t i = 0; i < cqe.size(); ++i) {
383 chunk = reinterpret_cast<Chunk*>(response->wr_id);
384 chunk->prepare_read(response->byte_len);
// Only the first completion is considered for direct hand-off, and only when
// nothing was loaded earlier.
385 if (!loaded && i == 0) {
386 // FIXME need to handle release
387 // auto del = std::bind(&Chunk::post_srq, std::move(chunk), infiniband);
391 buffers.push_back(chunk);
// Queues `bl` for transmission and tries to push it out.
// Under `lock`, the contents of `bl` are moved onto pending_bl
// (claim_append). The " fake send to upper" log line belongs to a path that
// reports success without transmitting; otherwise submit(more) is called and
// any error other than -EAGAIN is treated as a failure.
// NOTE(review): the conditions selecting the "fake send" path, the scope of the
// locker, and the return statements are on lines not visible here. submit()
// acquires `lock` itself, so the locker taken below must already be released
// by the time submit() is called — confirm against the full source.
400 ssize_t RDMAConnectedSocketImpl::send(bufferlist &bl, bool more)
407 size_t bytes = bl.length();
411 Mutex::Locker l(lock);
412 pending_bl.claim_append(bl);
414 ldout(cct, 20) << __func__ << " fake send to upper, QP: " << my_msg.qpn << dendl;
418 ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << dendl;
419 ssize_t r = submit(more);
420 if (r < 0 && r != -EAGAIN)
// Turns as much of pending_bl as possible into TX chunks and posts them.
//
// Under `lock`, pending_bl's buffers are walked in order. A buffer that already
// lives in registered TX memory (is_tx_buffer) is attached directly through
// get_tx_chunk_by_buffer(); runs of other buffers are accumulated in
// need_reserve_bytes and copied into chunks obtained from the worker by the
// fill_tx_via_copy lambda. `total` counts the bytes covered by chunks; if it is
// short of pending_bl.length() the unsent tail is kept in pending_bl. The
// collected chunks are passed to post_work_request().
// Returns -EAGAIN while data remains in pending_bl, 0 otherwise (at the visible
// return).
// NOTE(review): several declarations (`total`, `copied`, `swapped`), early
// exits and error returns are on lines not visible in this extract.
425 ssize_t RDMAConnectedSocketImpl::submit(bool more)
429 Mutex::Locker l(lock);
430 size_t bytes = pending_bl.length();
431 ldout(cct, 20) << __func__ << " we need " << bytes << " bytes. iov size: "
432 << pending_bl.buffers().size() << dendl;
// fill_tx_via_copy: requests registered memory for `bytes` from the worker
// (appended to tx_buffers) and copies the buffers in [start, end) into those
// chunks, advancing to the next chunk whenever the current one is full.
436 auto fill_tx_via_copy = [this](std::vector<Chunk*> &tx_buffers, unsigned bytes,
437 std::list<bufferptr>::const_iterator &start,
438 std::list<bufferptr>::const_iterator &end) -> unsigned {
439 assert(start != end);
// Index of the first chunk this call adds; chunks before it belong to earlier
// buffers.
440 auto chunk_idx = tx_buffers.size();
441 int ret = worker->get_reged_mem(this, tx_buffers, bytes);
443 ldout(cct, 1) << __func__ << " no enough buffers in worker " << worker << dendl;
444 worker->perf_logger->inc(l_msgr_rdma_tx_no_mem);
448 unsigned total_copied = 0;
449 Chunk *current_chunk = tx_buffers[chunk_idx];
450 while (start != end) {
451 const uintptr_t addr = reinterpret_cast<const uintptr_t>(start->c_str());
453 while (copied < start->length()) {
454 uint32_t r = current_chunk->write((char*)addr+copied, start->length() - copied);
458 if (current_chunk->full()){
// Fixed: the original indexed tx_buffers[++chunk_idx] before testing whether
// chunk_idx had reached tx_buffers.size(), reading one element past the end
// when the last chunk filled up. The index is now range-checked before the
// element access; the exhaustion test on the next line is unchanged.
459 current_chunk = (++chunk_idx < tx_buffers.size()) ? tx_buffers[chunk_idx] : nullptr;
460 if (chunk_idx == tx_buffers.size())
470 std::vector<Chunk*> tx_buffers;
471 std::list<bufferptr>::const_iterator it = pending_bl.buffers().begin();
472 std::list<bufferptr>::const_iterator copy_it = it;
474 unsigned need_reserve_bytes = 0;
475 while (it != pending_bl.buffers().end()) {
476 if (infiniband->is_tx_buffer(it->raw_c_str())) {
// Flush the pending run of non-registered buffers before attaching this
// already-registered one, so chunk order matches buffer order.
477 if (need_reserve_bytes) {
478 unsigned copied = fill_tx_via_copy(tx_buffers, need_reserve_bytes, copy_it, it);
480 if (copied < need_reserve_bytes)
482 need_reserve_bytes = 0;
484 assert(copy_it == it);
485 tx_buffers.push_back(infiniband->get_tx_chunk_by_buffer(it->raw_c_str()));
486 total += it->length();
489 need_reserve_bytes += it->length();
// Trailing run of non-registered buffers.
493 if (need_reserve_bytes)
494 total += fill_tx_via_copy(tx_buffers, need_reserve_bytes, copy_it, it);
499 assert(total <= pending_bl.length());
// Partial coverage: keep only the bytes that did not get a chunk.
501 if (total < pending_bl.length()) {
502 worker->perf_logger->inc(l_msgr_rdma_tx_parital_mem);
503 pending_bl.splice(total, pending_bl.length()-total, &swapped);
504 pending_bl.swap(swapped);
509 ldout(cct, 20) << __func__ << " left bytes: " << pending_bl.length() << " in buffers "
510 << pending_bl.buffers().size() << " tx chunks " << tx_buffers.size() << dendl;
512 int r = post_work_request(tx_buffers);
516 ldout(cct, 20) << __func__ << " finished sending " << bytes << " bytes." << dendl;
517 return pending_bl.length() ? -EAGAIN : 0;
// Posts one signaled IBV_WR_SEND work request per chunk in `tx_buffers`.
// Each request gets a single scatter/gather entry that covers the chunk's
// buffer up to get_offset() bytes, and carries the Chunk pointer in wr_id. The
// requests are linked through `next` and submitted with one ibv_post_send()
// call; failures are logged and counted in l_msgr_rdma_tx_failed.
// NOTE(review): the first log line reads tx_buffers[0], so the caller is
// expected to pass a non-empty vector — confirm.
// NOTE(review): isge/iswr are variable-length arrays sized by
// tx_buffers.size(), which is a compiler extension in C++.
// NOTE(review): pre_wr starts as NULL and is dereferenced below; any guard
// around that dereference, the index/iterator increments and the return
// statements are on lines not visible in this extract.
520 int RDMAConnectedSocketImpl::post_work_request(std::vector<Chunk*> &tx_buffers)
522 ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << " " << tx_buffers[0] << dendl;
523 vector<Chunk*>::iterator current_buffer = tx_buffers.begin();
524 ibv_sge isge[tx_buffers.size()];
525 uint32_t current_sge = 0;
526 ibv_send_wr iswr[tx_buffers.size()];
527 uint32_t current_swr = 0;
528 ibv_send_wr* pre_wr = NULL;
530 memset(iswr, 0, sizeof(iswr));
531 memset(isge, 0, sizeof(isge));
532 current_buffer = tx_buffers.begin();
533 while (current_buffer != tx_buffers.end()) {
// get_offset() is used as the number of bytes to send from this chunk.
534 isge[current_sge].addr = reinterpret_cast<uint64_t>((*current_buffer)->buffer);
535 isge[current_sge].length = (*current_buffer)->get_offset();
536 isge[current_sge].lkey = (*current_buffer)->mr->lkey;
537 ldout(cct, 25) << __func__ << " sending buffer: " << *current_buffer << " length: " << isge[current_sge].length << dendl;
539 iswr[current_swr].wr_id = reinterpret_cast<uint64_t>(*current_buffer);
540 iswr[current_swr].next = NULL;
541 iswr[current_swr].sg_list = &isge[current_sge];
542 iswr[current_swr].num_sge = 1;
543 iswr[current_swr].opcode = IBV_WR_SEND;
544 iswr[current_swr].send_flags = IBV_SEND_SIGNALED;
545 /*if (isge[current_sge].length < infiniband->max_inline_data) {
546 iswr[current_swr].send_flags = IBV_SEND_INLINE;
547 ldout(cct, 20) << __func__ << " send_inline." << dendl;
550 worker->perf_logger->inc(l_msgr_rdma_tx_bytes, isge[current_sge].length);
552 pre_wr->next = &iswr[current_swr];
553 pre_wr = &iswr[current_swr];
559 ibv_send_wr *bad_tx_work_request;
560 if (ibv_post_send(qp->get_qp(), iswr, &bad_tx_work_request)) {
561 ldout(cct, 1) << __func__ << " failed to send data"
562 << " (most probably should be peer not ready): "
563 << cpp_strerror(errno) << dendl;
564 worker->perf_logger->inc(l_msgr_rdma_tx_failed);
567 worker->perf_logger->inc(l_msgr_rdma_tx_chunks, tx_buffers.size());
568 ldout(cct, 20) << __func__ << " qp state is " << Infiniband::qp_state_string(qp->get_state()) << dendl;
// Posts a single signaled IBV_WR_SEND whose wr_id is the QueuePair pointer
// (not a Chunk, unlike post_work_request()). No scatter/gather entry is
// attached in the visible lines, so this is presumably the zero-length message
// that the peer's read() treats as a remote close — confirm.
// A failed post is logged and counted in l_msgr_rdma_tx_failed.
// NOTE(review): the declaration of `wr` and one assignment between wr_id and
// opcode are on lines not visible in this extract.
572 void RDMAConnectedSocketImpl::fin() {
574 memset(&wr, 0, sizeof(wr));
575 wr.wr_id = reinterpret_cast<uint64_t>(qp);
577 wr.opcode = IBV_WR_SEND;
578 wr.send_flags = IBV_SEND_SIGNALED;
579 ibv_send_wr* bad_tx_work_request;
580 if (ibv_post_send(qp->get_qp(), &wr, &bad_tx_work_request)) {
581 ldout(cct, 1) << __func__ << " failed to send message="
582 << " ibv_post_send failed(most probably should be peer not ready): "
583 << cpp_strerror(errno) << dendl;
584 worker->perf_logger->inc(l_msgr_rdma_tx_failed);
// Tears down the handshake event handler. Only acts when a handler exists and
// tcp_fd is valid: the handler is closed first, then the EVENT_READABLE
// registration for tcp_fd is removed through the worker's event center
// (submit_to with the center's own id), and finally con_handler is reset to
// nullptr.
// NOTE(review): the remaining arguments of submit_to() and any statement
// between it and the reset of con_handler are on lines not visible here, so
// whether the handler object is freed cannot be told from this extract.
589 void RDMAConnectedSocketImpl::cleanup() {
590 if (con_handler && tcp_fd >= 0) {
591 (static_cast<C_handle_connection*>(con_handler))->close();
592 worker->center.submit_to(worker->center.get_id(), [this]() {
593 worker->center.delete_file_event(tcp_fd, EVENT_READABLE);
596 con_handler = nullptr;
// Signals this socket's notification descriptor by writing the value `i` to
// notify_fd; read() reads from the same descriptor before looking for data.
// NOTE(review): the declarations of `i` and `ret` are on lines not visible in
// this extract.
600 void RDMAConnectedSocketImpl::notify()
605 ret = write(notify_fd, &i, sizeof(i));
// Fixed: the original asserted `ret = sizeof(i)`, an assignment that is always
// true and overwrites ret, so a failed or short write was never detected.
606 assert(ret == static_cast<ssize_t>(sizeof(i)));
609 void RDMAConnectedSocketImpl::shutdown()
617 void RDMAConnectedSocketImpl::close()
// Error path for the socket. The only visible statement logs the handshake TCP
// descriptor at level 1.
// NOTE(review): the statements that follow the log line are not visible in
// this extract.
625 void RDMAConnectedSocketImpl::fault()
627 ldout(cct, 1) << __func__ << " tcp fd " << tcp_fd << dendl;
// Server-side entry point for an accepted TCP connection `sd`. The visible
// lines register con_handler for EVENT_READABLE on tcp_fd through the worker's
// event center (submit_to with the center's own id), so the client's handshake
// message is processed by handle_connection().
// NOTE(review): the statements that store `sd` and mark this socket as the
// server side, and the remaining submit_to() arguments, are on lines not
// visible in this extract.
637 void RDMAConnectedSocketImpl::set_accept_fd(int sd)
641 worker->center.submit_to(worker->center.get_id(), [this]() {
642 worker->center.create_file_event(tcp_fd, EVENT_READABLE, con_handler);