// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab /* * Ceph - scalable distributed file system * * Copyright (C) 2016 XSKY * * Author: Haomai Wang * * This is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License version 2.1, as published by the Free Software * Foundation. See file COPYING. * */ #include "RDMAStack.h" #define dout_subsys ceph_subsys_ms #undef dout_prefix #define dout_prefix *_dout << " RDMAConnectedSocketImpl " RDMAConnectedSocketImpl::RDMAConnectedSocketImpl(CephContext *cct, Infiniband* ib, RDMADispatcher* s, RDMAWorker *w) : cct(cct), connected(0), error(0), infiniband(ib), dispatcher(s), worker(w), lock("RDMAConnectedSocketImpl::lock"), is_server(false), con_handler(new C_handle_connection(this)), active(false), pending(false) { qp = infiniband->create_queue_pair( cct, s->get_tx_cq(), s->get_rx_cq(), IBV_QPT_RC); my_msg.qpn = qp->get_local_qp_number(); my_msg.psn = qp->get_initial_psn(); my_msg.lid = infiniband->get_lid(); my_msg.peer_qpn = 0; my_msg.gid = infiniband->get_gid(); notify_fd = dispatcher->register_qp(qp, this); dispatcher->perf_logger->inc(l_msgr_rdma_created_queue_pair); dispatcher->perf_logger->inc(l_msgr_rdma_active_queue_pair); } RDMAConnectedSocketImpl::~RDMAConnectedSocketImpl() { ldout(cct, 20) << __func__ << " destruct." << dendl; cleanup(); worker->remove_pending_conn(this); dispatcher->erase_qpn(my_msg.qpn); Mutex::Locker l(lock); if (notify_fd >= 0) ::close(notify_fd); if (tcp_fd >= 0) ::close(tcp_fd); error = ECONNRESET; int ret = 0; for (unsigned i=0; i < wc.size(); ++i) { ret = infiniband->post_chunk(reinterpret_cast(wc[i].wr_id)); assert(ret == 0); dispatcher->perf_logger->dec(l_msgr_rdma_inqueue_rx_chunks); } for (unsigned i=0; i < buffers.size(); ++i) { ret = infiniband->post_chunk(buffers[i]); assert(ret == 0); dispatcher->perf_logger->dec(l_msgr_rdma_inqueue_rx_chunks); } } void RDMAConnectedSocketImpl::pass_wc(std::vector &&v) { Mutex::Locker l(lock); if (wc.empty()) wc = std::move(v); else wc.insert(wc.end(), v.begin(), v.end()); notify(); } void RDMAConnectedSocketImpl::get_wc(std::vector &w) { Mutex::Locker l(lock); if (wc.empty()) return ; w.swap(wc); } int RDMAConnectedSocketImpl::activate() { ibv_qp_attr qpa; int r; // now connect up the qps and switch to RTR memset(&qpa, 0, sizeof(qpa)); qpa.qp_state = IBV_QPS_RTR; qpa.path_mtu = IBV_MTU_1024; qpa.dest_qp_num = peer_msg.qpn; qpa.rq_psn = peer_msg.psn; qpa.max_dest_rd_atomic = 1; qpa.min_rnr_timer = 12; //qpa.ah_attr.is_global = 0; qpa.ah_attr.is_global = 1; qpa.ah_attr.grh.hop_limit = 6; qpa.ah_attr.grh.dgid = peer_msg.gid; qpa.ah_attr.grh.sgid_index = infiniband->get_device()->get_gid_idx(); qpa.ah_attr.dlid = peer_msg.lid; qpa.ah_attr.sl = cct->_conf->ms_async_rdma_sl; qpa.ah_attr.grh.traffic_class = cct->_conf->ms_async_rdma_dscp; qpa.ah_attr.src_path_bits = 0; qpa.ah_attr.port_num = (uint8_t)(infiniband->get_ib_physical_port()); ldout(cct, 20) << __func__ << " Choosing gid_index " << (int)qpa.ah_attr.grh.sgid_index << ", sl " << (int)qpa.ah_attr.sl << dendl; r = ibv_modify_qp(qp->get_qp(), &qpa, IBV_QP_STATE | IBV_QP_AV | IBV_QP_PATH_MTU | IBV_QP_DEST_QPN | IBV_QP_RQ_PSN | IBV_QP_MIN_RNR_TIMER | IBV_QP_MAX_DEST_RD_ATOMIC); if (r) { lderr(cct) << __func__ << " failed to transition to RTR state: " << cpp_strerror(errno) << dendl; return -1; } ldout(cct, 20) << __func__ << " transition to RTR state successfully." << dendl; // now move to RTS qpa.qp_state = IBV_QPS_RTS; // How long to wait before retrying if packet lost or server dead. // Supposedly the timeout is 4.096us*2^timeout. However, the actual // timeout appears to be 4.096us*2^(timeout+1), so the setting // below creates a 135ms timeout. qpa.timeout = 14; // How many times to retry after timeouts before giving up. qpa.retry_cnt = 7; // How many times to retry after RNR (receiver not ready) condition // before giving up. Occurs when the remote side has not yet posted // a receive request. qpa.rnr_retry = 7; // 7 is infinite retry. qpa.sq_psn = my_msg.psn; qpa.max_rd_atomic = 1; r = ibv_modify_qp(qp->get_qp(), &qpa, IBV_QP_STATE | IBV_QP_TIMEOUT | IBV_QP_RETRY_CNT | IBV_QP_RNR_RETRY | IBV_QP_SQ_PSN | IBV_QP_MAX_QP_RD_ATOMIC); if (r) { lderr(cct) << __func__ << " failed to transition to RTS state: " << cpp_strerror(errno) << dendl; return -1; } // the queue pair should be ready to use once the client has finished // setting up their end. ldout(cct, 20) << __func__ << " transition to RTS state successfully." << dendl; ldout(cct, 20) << __func__ << " QueuePair: " << qp << " with qp:" << qp->get_qp() << dendl; if (!is_server) { connected = 1; //indicate successfully ldout(cct, 20) << __func__ << " handle fake send, wake it up. QP: " << my_msg.qpn << dendl; submit(false); } active = true; return 0; } int RDMAConnectedSocketImpl::try_connect(const entity_addr_t& peer_addr, const SocketOptions &opts) { ldout(cct, 20) << __func__ << " nonblock:" << opts.nonblock << ", nodelay:" << opts.nodelay << ", rbuf_size: " << opts.rcbuf_size << dendl; NetHandler net(cct); tcp_fd = net.connect(peer_addr, opts.connect_bind_addr); if (tcp_fd < 0) { return -errno; } net.set_close_on_exec(tcp_fd); int r = net.set_socket_options(tcp_fd, opts.nodelay, opts.rcbuf_size); if (r < 0) { ::close(tcp_fd); tcp_fd = -1; return -errno; } ldout(cct, 20) << __func__ << " tcp_fd: " << tcp_fd << dendl; net.set_priority(tcp_fd, opts.priority, peer_addr.get_family()); my_msg.peer_qpn = 0; r = infiniband->send_msg(cct, tcp_fd, my_msg); if (r < 0) return r; worker->center.create_file_event(tcp_fd, EVENT_READABLE, con_handler); return 0; } void RDMAConnectedSocketImpl::handle_connection() { ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << " tcp_fd: " << tcp_fd << " notify_fd: " << notify_fd << dendl; int r = infiniband->recv_msg(cct, tcp_fd, peer_msg); if (r < 0) { if (r != -EAGAIN) { dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors); ldout(cct, 1) << __func__ << " recv handshake msg failed." << dendl; fault(); } return; } if (!is_server) {// syn + ack from server my_msg.peer_qpn = peer_msg.qpn; ldout(cct, 20) << __func__ << " peer msg : < " << peer_msg.qpn << ", " << peer_msg.psn << ", " << peer_msg.lid << ", " << peer_msg.peer_qpn << "> " << dendl; if (!connected) { r = activate(); assert(!r); } notify(); r = infiniband->send_msg(cct, tcp_fd, my_msg); if (r < 0) { ldout(cct, 1) << __func__ << " send client ack failed." << dendl; dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors); fault(); } } else { if (peer_msg.peer_qpn == 0) {// syn from client if (active) { ldout(cct, 10) << __func__ << " server is already active." << dendl; return ; } r = infiniband->send_msg(cct, tcp_fd, my_msg); if (r < 0) { ldout(cct, 1) << __func__ << " server ack failed." << dendl; dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors); fault(); return ; } r = activate(); assert(!r); } else { // ack from client connected = 1; cleanup(); submit(false); notify(); } } } ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len) { uint64_t i = 0; int r = ::read(notify_fd, &i, sizeof(i)); ldout(cct, 20) << __func__ << " notify_fd : " << i << " in " << my_msg.qpn << " r = " << r << dendl; ssize_t read = 0; if (!buffers.empty()) read = read_buffers(buf,len); std::vector cqe; get_wc(cqe); if (cqe.empty()) { if (!buffers.empty()) { notify(); } if (read > 0) { return read; } if (error) { return -error; } else { return -EAGAIN; } } ldout(cct, 20) << __func__ << " poll queue got " << cqe.size() << " responses. QP: " << my_msg.qpn << dendl; for (size_t i = 0; i < cqe.size(); ++i) { ibv_wc* response = &cqe[i]; assert(response->status == IBV_WC_SUCCESS); Chunk* chunk = reinterpret_cast(response->wr_id); ldout(cct, 25) << __func__ << " chunk length: " << response->byte_len << " bytes." << chunk << dendl; chunk->prepare_read(response->byte_len); worker->perf_logger->inc(l_msgr_rdma_rx_bytes, response->byte_len); if (response->byte_len == 0) { dispatcher->perf_logger->inc(l_msgr_rdma_rx_fin); if (connected) { error = ECONNRESET; ldout(cct, 20) << __func__ << " got remote close msg..." << dendl; } assert(infiniband->post_chunk(chunk) == 0); dispatcher->perf_logger->dec(l_msgr_rdma_inqueue_rx_chunks); } else { if (read == (ssize_t)len) { buffers.push_back(chunk); ldout(cct, 25) << __func__ << " buffers add a chunk: " << response->byte_len << dendl; } else if (read + response->byte_len > (ssize_t)len) { read += chunk->read(buf+read, (ssize_t)len-read); buffers.push_back(chunk); ldout(cct, 25) << __func__ << " buffers add a chunk: " << chunk->get_offset() << ":" << chunk->get_bound() << dendl; } else { read += chunk->read(buf+read, response->byte_len); assert(infiniband->post_chunk(chunk) == 0); dispatcher->perf_logger->dec(l_msgr_rdma_inqueue_rx_chunks); } } } worker->perf_logger->inc(l_msgr_rdma_rx_chunks, cqe.size()); if (is_server && connected == 0) { ldout(cct, 20) << __func__ << " we do not need last handshake, QP: " << my_msg.qpn << " peer QP: " << peer_msg.qpn << dendl; connected = 1; //if so, we don't need the last handshake cleanup(); submit(false); } if (!buffers.empty()) { notify(); } if (read == 0 && error) return -error; return read == 0 ? -EAGAIN : read; } ssize_t RDMAConnectedSocketImpl::read_buffers(char* buf, size_t len) { size_t read = 0, tmp = 0; auto c = buffers.begin(); for (; c != buffers.end() ; ++c) { tmp = (*c)->read(buf+read, len-read); read += tmp; ldout(cct, 25) << __func__ << " this iter read: " << tmp << " bytes." << " offset: " << (*c)->get_offset() << " ,bound: " << (*c)->get_bound() << ". Chunk:" << *c << dendl; if ((*c)->over()) { assert(infiniband->post_chunk(*c) == 0); dispatcher->perf_logger->dec(l_msgr_rdma_inqueue_rx_chunks); ldout(cct, 25) << __func__ << " one chunk over." << dendl; } if (read == len) { break; } } if (c != buffers.end() && (*c)->over()) ++c; buffers.erase(buffers.begin(), c); ldout(cct, 25) << __func__ << " got " << read << " bytes, buffers size: " << buffers.size() << dendl; return read; } ssize_t RDMAConnectedSocketImpl::zero_copy_read(bufferptr &data) { if (error) return -error; static const int MAX_COMPLETIONS = 16; ibv_wc wc[MAX_COMPLETIONS]; ssize_t size = 0; ibv_wc* response; Chunk* chunk; bool loaded = false; auto iter = buffers.begin(); if (iter != buffers.end()) { chunk = *iter; // FIXME need to handle release // auto del = std::bind(&Chunk::post_srq, std::move(chunk), infiniband); buffers.erase(iter); loaded = true; size = chunk->bound; } std::vector cqe; get_wc(cqe); if (cqe.empty()) return size == 0 ? -EAGAIN : size; ldout(cct, 20) << __func__ << " pool completion queue got " << cqe.size() << " responses."<< dendl; for (size_t i = 0; i < cqe.size(); ++i) { response = &wc[i]; chunk = reinterpret_cast(response->wr_id); chunk->prepare_read(response->byte_len); if (!loaded && i == 0) { // FIXME need to handle release // auto del = std::bind(&Chunk::post_srq, std::move(chunk), infiniband); size = chunk->bound; continue; } buffers.push_back(chunk); iter++; } if (size == 0) return -EAGAIN; return size; } ssize_t RDMAConnectedSocketImpl::send(bufferlist &bl, bool more) { if (error) { if (!active) return -EPIPE; return -error; } size_t bytes = bl.length(); if (!bytes) return 0; { Mutex::Locker l(lock); pending_bl.claim_append(bl); if (!connected) { ldout(cct, 20) << __func__ << " fake send to upper, QP: " << my_msg.qpn << dendl; return bytes; } } ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << dendl; ssize_t r = submit(more); if (r < 0 && r != -EAGAIN) return r; return bytes; } ssize_t RDMAConnectedSocketImpl::submit(bool more) { if (error) return -error; Mutex::Locker l(lock); size_t bytes = pending_bl.length(); ldout(cct, 20) << __func__ << " we need " << bytes << " bytes. iov size: " << pending_bl.buffers().size() << dendl; if (!bytes) return 0; auto fill_tx_via_copy = [this](std::vector &tx_buffers, unsigned bytes, std::list::const_iterator &start, std::list::const_iterator &end) -> unsigned { assert(start != end); auto chunk_idx = tx_buffers.size(); int ret = worker->get_reged_mem(this, tx_buffers, bytes); if (ret == 0) { ldout(cct, 1) << __func__ << " no enough buffers in worker " << worker << dendl; worker->perf_logger->inc(l_msgr_rdma_tx_no_mem); return 0; } unsigned total_copied = 0; Chunk *current_chunk = tx_buffers[chunk_idx]; while (start != end) { const uintptr_t addr = reinterpret_cast(start->c_str()); unsigned copied = 0; while (copied < start->length()) { uint32_t r = current_chunk->write((char*)addr+copied, start->length() - copied); copied += r; total_copied += r; bytes -= r; if (current_chunk->full()){ current_chunk = tx_buffers[++chunk_idx]; if (chunk_idx == tx_buffers.size()) return total_copied; } } ++start; } assert(bytes == 0); return total_copied; }; std::vector tx_buffers; std::list::const_iterator it = pending_bl.buffers().begin(); std::list::const_iterator copy_it = it; unsigned total = 0; unsigned need_reserve_bytes = 0; while (it != pending_bl.buffers().end()) { if (infiniband->is_tx_buffer(it->raw_c_str())) { if (need_reserve_bytes) { unsigned copied = fill_tx_via_copy(tx_buffers, need_reserve_bytes, copy_it, it); total += copied; if (copied < need_reserve_bytes) goto sending; need_reserve_bytes = 0; } assert(copy_it == it); tx_buffers.push_back(infiniband->get_tx_chunk_by_buffer(it->raw_c_str())); total += it->length(); ++copy_it; } else { need_reserve_bytes += it->length(); } ++it; } if (need_reserve_bytes) total += fill_tx_via_copy(tx_buffers, need_reserve_bytes, copy_it, it); sending: if (total == 0) return -EAGAIN; assert(total <= pending_bl.length()); bufferlist swapped; if (total < pending_bl.length()) { worker->perf_logger->inc(l_msgr_rdma_tx_parital_mem); pending_bl.splice(total, pending_bl.length()-total, &swapped); pending_bl.swap(swapped); } else { pending_bl.clear(); } ldout(cct, 20) << __func__ << " left bytes: " << pending_bl.length() << " in buffers " << pending_bl.buffers().size() << " tx chunks " << tx_buffers.size() << dendl; int r = post_work_request(tx_buffers); if (r < 0) return r; ldout(cct, 20) << __func__ << " finished sending " << bytes << " bytes." << dendl; return pending_bl.length() ? -EAGAIN : 0; } int RDMAConnectedSocketImpl::post_work_request(std::vector &tx_buffers) { ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << " " << tx_buffers[0] << dendl; vector::iterator current_buffer = tx_buffers.begin(); ibv_sge isge[tx_buffers.size()]; uint32_t current_sge = 0; ibv_send_wr iswr[tx_buffers.size()]; uint32_t current_swr = 0; ibv_send_wr* pre_wr = NULL; memset(iswr, 0, sizeof(iswr)); memset(isge, 0, sizeof(isge)); current_buffer = tx_buffers.begin(); while (current_buffer != tx_buffers.end()) { isge[current_sge].addr = reinterpret_cast((*current_buffer)->buffer); isge[current_sge].length = (*current_buffer)->get_offset(); isge[current_sge].lkey = (*current_buffer)->mr->lkey; ldout(cct, 25) << __func__ << " sending buffer: " << *current_buffer << " length: " << isge[current_sge].length << dendl; iswr[current_swr].wr_id = reinterpret_cast(*current_buffer); iswr[current_swr].next = NULL; iswr[current_swr].sg_list = &isge[current_sge]; iswr[current_swr].num_sge = 1; iswr[current_swr].opcode = IBV_WR_SEND; iswr[current_swr].send_flags = IBV_SEND_SIGNALED; /*if (isge[current_sge].length < infiniband->max_inline_data) { iswr[current_swr].send_flags = IBV_SEND_INLINE; ldout(cct, 20) << __func__ << " send_inline." << dendl; }*/ worker->perf_logger->inc(l_msgr_rdma_tx_bytes, isge[current_sge].length); if (pre_wr) pre_wr->next = &iswr[current_swr]; pre_wr = &iswr[current_swr]; ++current_sge; ++current_swr; ++current_buffer; } ibv_send_wr *bad_tx_work_request; if (ibv_post_send(qp->get_qp(), iswr, &bad_tx_work_request)) { ldout(cct, 1) << __func__ << " failed to send data" << " (most probably should be peer not ready): " << cpp_strerror(errno) << dendl; worker->perf_logger->inc(l_msgr_rdma_tx_failed); return -errno; } worker->perf_logger->inc(l_msgr_rdma_tx_chunks, tx_buffers.size()); ldout(cct, 20) << __func__ << " qp state is " << Infiniband::qp_state_string(qp->get_state()) << dendl; return 0; } void RDMAConnectedSocketImpl::fin() { ibv_send_wr wr; memset(&wr, 0, sizeof(wr)); wr.wr_id = reinterpret_cast(qp); wr.num_sge = 0; wr.opcode = IBV_WR_SEND; wr.send_flags = IBV_SEND_SIGNALED; ibv_send_wr* bad_tx_work_request; if (ibv_post_send(qp->get_qp(), &wr, &bad_tx_work_request)) { ldout(cct, 1) << __func__ << " failed to send message=" << " ibv_post_send failed(most probably should be peer not ready): " << cpp_strerror(errno) << dendl; worker->perf_logger->inc(l_msgr_rdma_tx_failed); return ; } } void RDMAConnectedSocketImpl::cleanup() { if (con_handler && tcp_fd >= 0) { (static_cast(con_handler))->close(); worker->center.submit_to(worker->center.get_id(), [this]() { worker->center.delete_file_event(tcp_fd, EVENT_READABLE); }, false); delete con_handler; con_handler = nullptr; } } void RDMAConnectedSocketImpl::notify() { uint64_t i = 1; int ret; ret = write(notify_fd, &i, sizeof(i)); assert(ret = sizeof(i)); } void RDMAConnectedSocketImpl::shutdown() { if (!error) fin(); error = ECONNRESET; active = false; } void RDMAConnectedSocketImpl::close() { if (!error) fin(); error = ECONNRESET; active = false; } void RDMAConnectedSocketImpl::fault() { ldout(cct, 1) << __func__ << " tcp fd " << tcp_fd << dendl; /*if (qp) { qp->to_dead(); qp = NULL; }*/ error = ECONNRESET; connected = 1; notify(); } void RDMAConnectedSocketImpl::set_accept_fd(int sd) { tcp_fd = sd; is_server = true; worker->center.submit_to(worker->center.get_id(), [this]() { worker->center.create_file_event(tcp_fd, EVENT_READABLE, con_handler); }, true); }