X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fmsg%2Fasync%2Frdma%2FRDMAConnectedSocketImpl.cc;fp=src%2Fceph%2Fsrc%2Fmsg%2Fasync%2Frdma%2FRDMAConnectedSocketImpl.cc;h=66dc488cccf3fc7a6b27935a2be6412886304995;hb=812ff6ca9fcd3e629e49d4328905f33eee8ca3f5;hp=0000000000000000000000000000000000000000;hpb=15280273faafb77777eab341909a3f495cf248d9;p=stor4nfv.git diff --git a/src/ceph/src/msg/async/rdma/RDMAConnectedSocketImpl.cc b/src/ceph/src/msg/async/rdma/RDMAConnectedSocketImpl.cc new file mode 100644 index 0000000..66dc488 --- /dev/null +++ b/src/ceph/src/msg/async/rdma/RDMAConnectedSocketImpl.cc @@ -0,0 +1,644 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2016 XSKY + * + * Author: Haomai Wang + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "RDMAStack.h" + +#define dout_subsys ceph_subsys_ms +#undef dout_prefix +#define dout_prefix *_dout << " RDMAConnectedSocketImpl " + +RDMAConnectedSocketImpl::RDMAConnectedSocketImpl(CephContext *cct, Infiniband* ib, RDMADispatcher* s, + RDMAWorker *w) + : cct(cct), connected(0), error(0), infiniband(ib), + dispatcher(s), worker(w), lock("RDMAConnectedSocketImpl::lock"), + is_server(false), con_handler(new C_handle_connection(this)), + active(false), pending(false) +{ + qp = infiniband->create_queue_pair( + cct, s->get_tx_cq(), s->get_rx_cq(), IBV_QPT_RC); + my_msg.qpn = qp->get_local_qp_number(); + my_msg.psn = qp->get_initial_psn(); + my_msg.lid = infiniband->get_lid(); + my_msg.peer_qpn = 0; + my_msg.gid = infiniband->get_gid(); + notify_fd = dispatcher->register_qp(qp, this); + dispatcher->perf_logger->inc(l_msgr_rdma_created_queue_pair); + dispatcher->perf_logger->inc(l_msgr_rdma_active_queue_pair); +} + +RDMAConnectedSocketImpl::~RDMAConnectedSocketImpl() +{ + ldout(cct, 20) << __func__ << " destruct." << dendl; + cleanup(); + worker->remove_pending_conn(this); + dispatcher->erase_qpn(my_msg.qpn); + Mutex::Locker l(lock); + if (notify_fd >= 0) + ::close(notify_fd); + if (tcp_fd >= 0) + ::close(tcp_fd); + error = ECONNRESET; + int ret = 0; + for (unsigned i=0; i < wc.size(); ++i) { + ret = infiniband->post_chunk(reinterpret_cast(wc[i].wr_id)); + assert(ret == 0); + dispatcher->perf_logger->dec(l_msgr_rdma_inqueue_rx_chunks); + } + for (unsigned i=0; i < buffers.size(); ++i) { + ret = infiniband->post_chunk(buffers[i]); + assert(ret == 0); + dispatcher->perf_logger->dec(l_msgr_rdma_inqueue_rx_chunks); + } +} + +void RDMAConnectedSocketImpl::pass_wc(std::vector &&v) +{ + Mutex::Locker l(lock); + if (wc.empty()) + wc = std::move(v); + else + wc.insert(wc.end(), v.begin(), v.end()); + notify(); +} + +void RDMAConnectedSocketImpl::get_wc(std::vector &w) +{ + Mutex::Locker l(lock); + if (wc.empty()) + return ; + w.swap(wc); +} + +int RDMAConnectedSocketImpl::activate() +{ + ibv_qp_attr qpa; + int r; + + // now connect up the qps and switch to RTR + memset(&qpa, 0, sizeof(qpa)); + qpa.qp_state = IBV_QPS_RTR; + qpa.path_mtu = IBV_MTU_1024; + qpa.dest_qp_num = peer_msg.qpn; + qpa.rq_psn = peer_msg.psn; + qpa.max_dest_rd_atomic = 1; + qpa.min_rnr_timer = 12; + //qpa.ah_attr.is_global = 0; + qpa.ah_attr.is_global = 1; + qpa.ah_attr.grh.hop_limit = 6; + qpa.ah_attr.grh.dgid = peer_msg.gid; + + qpa.ah_attr.grh.sgid_index = infiniband->get_device()->get_gid_idx(); + + qpa.ah_attr.dlid = peer_msg.lid; + qpa.ah_attr.sl = cct->_conf->ms_async_rdma_sl; + qpa.ah_attr.grh.traffic_class = cct->_conf->ms_async_rdma_dscp; + qpa.ah_attr.src_path_bits = 0; + qpa.ah_attr.port_num = (uint8_t)(infiniband->get_ib_physical_port()); + + ldout(cct, 20) << __func__ << " Choosing gid_index " << (int)qpa.ah_attr.grh.sgid_index << ", sl " << (int)qpa.ah_attr.sl << dendl; + + r = ibv_modify_qp(qp->get_qp(), &qpa, IBV_QP_STATE | + IBV_QP_AV | + IBV_QP_PATH_MTU | + IBV_QP_DEST_QPN | + IBV_QP_RQ_PSN | + IBV_QP_MIN_RNR_TIMER | + IBV_QP_MAX_DEST_RD_ATOMIC); + if (r) { + lderr(cct) << __func__ << " failed to transition to RTR state: " + << cpp_strerror(errno) << dendl; + return -1; + } + + ldout(cct, 20) << __func__ << " transition to RTR state successfully." << dendl; + + // now move to RTS + qpa.qp_state = IBV_QPS_RTS; + + // How long to wait before retrying if packet lost or server dead. + // Supposedly the timeout is 4.096us*2^timeout. However, the actual + // timeout appears to be 4.096us*2^(timeout+1), so the setting + // below creates a 135ms timeout. + qpa.timeout = 14; + + // How many times to retry after timeouts before giving up. + qpa.retry_cnt = 7; + + // How many times to retry after RNR (receiver not ready) condition + // before giving up. Occurs when the remote side has not yet posted + // a receive request. + qpa.rnr_retry = 7; // 7 is infinite retry. + qpa.sq_psn = my_msg.psn; + qpa.max_rd_atomic = 1; + + r = ibv_modify_qp(qp->get_qp(), &qpa, IBV_QP_STATE | + IBV_QP_TIMEOUT | + IBV_QP_RETRY_CNT | + IBV_QP_RNR_RETRY | + IBV_QP_SQ_PSN | + IBV_QP_MAX_QP_RD_ATOMIC); + if (r) { + lderr(cct) << __func__ << " failed to transition to RTS state: " + << cpp_strerror(errno) << dendl; + return -1; + } + + // the queue pair should be ready to use once the client has finished + // setting up their end. + ldout(cct, 20) << __func__ << " transition to RTS state successfully." << dendl; + ldout(cct, 20) << __func__ << " QueuePair: " << qp << " with qp:" << qp->get_qp() << dendl; + + if (!is_server) { + connected = 1; //indicate successfully + ldout(cct, 20) << __func__ << " handle fake send, wake it up. QP: " << my_msg.qpn << dendl; + submit(false); + } + active = true; + + return 0; +} + +int RDMAConnectedSocketImpl::try_connect(const entity_addr_t& peer_addr, const SocketOptions &opts) { + ldout(cct, 20) << __func__ << " nonblock:" << opts.nonblock << ", nodelay:" + << opts.nodelay << ", rbuf_size: " << opts.rcbuf_size << dendl; + NetHandler net(cct); + tcp_fd = net.connect(peer_addr, opts.connect_bind_addr); + + if (tcp_fd < 0) { + return -errno; + } + net.set_close_on_exec(tcp_fd); + + int r = net.set_socket_options(tcp_fd, opts.nodelay, opts.rcbuf_size); + if (r < 0) { + ::close(tcp_fd); + tcp_fd = -1; + return -errno; + } + + ldout(cct, 20) << __func__ << " tcp_fd: " << tcp_fd << dendl; + net.set_priority(tcp_fd, opts.priority, peer_addr.get_family()); + my_msg.peer_qpn = 0; + r = infiniband->send_msg(cct, tcp_fd, my_msg); + if (r < 0) + return r; + + worker->center.create_file_event(tcp_fd, EVENT_READABLE, con_handler); + return 0; +} + +void RDMAConnectedSocketImpl::handle_connection() { + ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << " tcp_fd: " << tcp_fd << " notify_fd: " << notify_fd << dendl; + int r = infiniband->recv_msg(cct, tcp_fd, peer_msg); + if (r < 0) { + if (r != -EAGAIN) { + dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors); + ldout(cct, 1) << __func__ << " recv handshake msg failed." << dendl; + fault(); + } + return; + } + + if (!is_server) {// syn + ack from server + my_msg.peer_qpn = peer_msg.qpn; + ldout(cct, 20) << __func__ << " peer msg : < " << peer_msg.qpn << ", " << peer_msg.psn + << ", " << peer_msg.lid << ", " << peer_msg.peer_qpn << "> " << dendl; + if (!connected) { + r = activate(); + assert(!r); + } + notify(); + r = infiniband->send_msg(cct, tcp_fd, my_msg); + if (r < 0) { + ldout(cct, 1) << __func__ << " send client ack failed." << dendl; + dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors); + fault(); + } + } else { + if (peer_msg.peer_qpn == 0) {// syn from client + if (active) { + ldout(cct, 10) << __func__ << " server is already active." << dendl; + return ; + } + r = infiniband->send_msg(cct, tcp_fd, my_msg); + if (r < 0) { + ldout(cct, 1) << __func__ << " server ack failed." << dendl; + dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors); + fault(); + return ; + } + r = activate(); + assert(!r); + } else { // ack from client + connected = 1; + cleanup(); + submit(false); + notify(); + } + } +} + +ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len) +{ + uint64_t i = 0; + int r = ::read(notify_fd, &i, sizeof(i)); + ldout(cct, 20) << __func__ << " notify_fd : " << i << " in " << my_msg.qpn << " r = " << r << dendl; + ssize_t read = 0; + if (!buffers.empty()) + read = read_buffers(buf,len); + + std::vector cqe; + get_wc(cqe); + if (cqe.empty()) { + if (!buffers.empty()) { + notify(); + } + if (read > 0) { + return read; + } + if (error) { + return -error; + } else { + return -EAGAIN; + } + } + + ldout(cct, 20) << __func__ << " poll queue got " << cqe.size() << " responses. QP: " << my_msg.qpn << dendl; + for (size_t i = 0; i < cqe.size(); ++i) { + ibv_wc* response = &cqe[i]; + assert(response->status == IBV_WC_SUCCESS); + Chunk* chunk = reinterpret_cast(response->wr_id); + ldout(cct, 25) << __func__ << " chunk length: " << response->byte_len << " bytes." << chunk << dendl; + chunk->prepare_read(response->byte_len); + worker->perf_logger->inc(l_msgr_rdma_rx_bytes, response->byte_len); + if (response->byte_len == 0) { + dispatcher->perf_logger->inc(l_msgr_rdma_rx_fin); + if (connected) { + error = ECONNRESET; + ldout(cct, 20) << __func__ << " got remote close msg..." << dendl; + } + assert(infiniband->post_chunk(chunk) == 0); + dispatcher->perf_logger->dec(l_msgr_rdma_inqueue_rx_chunks); + } else { + if (read == (ssize_t)len) { + buffers.push_back(chunk); + ldout(cct, 25) << __func__ << " buffers add a chunk: " << response->byte_len << dendl; + } else if (read + response->byte_len > (ssize_t)len) { + read += chunk->read(buf+read, (ssize_t)len-read); + buffers.push_back(chunk); + ldout(cct, 25) << __func__ << " buffers add a chunk: " << chunk->get_offset() << ":" << chunk->get_bound() << dendl; + } else { + read += chunk->read(buf+read, response->byte_len); + assert(infiniband->post_chunk(chunk) == 0); + dispatcher->perf_logger->dec(l_msgr_rdma_inqueue_rx_chunks); + } + } + } + + worker->perf_logger->inc(l_msgr_rdma_rx_chunks, cqe.size()); + if (is_server && connected == 0) { + ldout(cct, 20) << __func__ << " we do not need last handshake, QP: " << my_msg.qpn << " peer QP: " << peer_msg.qpn << dendl; + connected = 1; //if so, we don't need the last handshake + cleanup(); + submit(false); + } + + if (!buffers.empty()) { + notify(); + } + + if (read == 0 && error) + return -error; + return read == 0 ? -EAGAIN : read; +} + +ssize_t RDMAConnectedSocketImpl::read_buffers(char* buf, size_t len) +{ + size_t read = 0, tmp = 0; + auto c = buffers.begin(); + for (; c != buffers.end() ; ++c) { + tmp = (*c)->read(buf+read, len-read); + read += tmp; + ldout(cct, 25) << __func__ << " this iter read: " << tmp << " bytes." << " offset: " << (*c)->get_offset() << " ,bound: " << (*c)->get_bound() << ". Chunk:" << *c << dendl; + if ((*c)->over()) { + assert(infiniband->post_chunk(*c) == 0); + dispatcher->perf_logger->dec(l_msgr_rdma_inqueue_rx_chunks); + ldout(cct, 25) << __func__ << " one chunk over." << dendl; + } + if (read == len) { + break; + } + } + + if (c != buffers.end() && (*c)->over()) + ++c; + buffers.erase(buffers.begin(), c); + ldout(cct, 25) << __func__ << " got " << read << " bytes, buffers size: " << buffers.size() << dendl; + return read; +} + +ssize_t RDMAConnectedSocketImpl::zero_copy_read(bufferptr &data) +{ + if (error) + return -error; + static const int MAX_COMPLETIONS = 16; + ibv_wc wc[MAX_COMPLETIONS]; + ssize_t size = 0; + + ibv_wc* response; + Chunk* chunk; + bool loaded = false; + auto iter = buffers.begin(); + if (iter != buffers.end()) { + chunk = *iter; + // FIXME need to handle release + // auto del = std::bind(&Chunk::post_srq, std::move(chunk), infiniband); + buffers.erase(iter); + loaded = true; + size = chunk->bound; + } + + std::vector cqe; + get_wc(cqe); + if (cqe.empty()) + return size == 0 ? -EAGAIN : size; + + ldout(cct, 20) << __func__ << " pool completion queue got " << cqe.size() << " responses."<< dendl; + + for (size_t i = 0; i < cqe.size(); ++i) { + response = &wc[i]; + chunk = reinterpret_cast(response->wr_id); + chunk->prepare_read(response->byte_len); + if (!loaded && i == 0) { + // FIXME need to handle release + // auto del = std::bind(&Chunk::post_srq, std::move(chunk), infiniband); + size = chunk->bound; + continue; + } + buffers.push_back(chunk); + iter++; + } + + if (size == 0) + return -EAGAIN; + return size; +} + +ssize_t RDMAConnectedSocketImpl::send(bufferlist &bl, bool more) +{ + if (error) { + if (!active) + return -EPIPE; + return -error; + } + size_t bytes = bl.length(); + if (!bytes) + return 0; + { + Mutex::Locker l(lock); + pending_bl.claim_append(bl); + if (!connected) { + ldout(cct, 20) << __func__ << " fake send to upper, QP: " << my_msg.qpn << dendl; + return bytes; + } + } + ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << dendl; + ssize_t r = submit(more); + if (r < 0 && r != -EAGAIN) + return r; + return bytes; +} + +ssize_t RDMAConnectedSocketImpl::submit(bool more) +{ + if (error) + return -error; + Mutex::Locker l(lock); + size_t bytes = pending_bl.length(); + ldout(cct, 20) << __func__ << " we need " << bytes << " bytes. iov size: " + << pending_bl.buffers().size() << dendl; + if (!bytes) + return 0; + + auto fill_tx_via_copy = [this](std::vector &tx_buffers, unsigned bytes, + std::list::const_iterator &start, + std::list::const_iterator &end) -> unsigned { + assert(start != end); + auto chunk_idx = tx_buffers.size(); + int ret = worker->get_reged_mem(this, tx_buffers, bytes); + if (ret == 0) { + ldout(cct, 1) << __func__ << " no enough buffers in worker " << worker << dendl; + worker->perf_logger->inc(l_msgr_rdma_tx_no_mem); + return 0; + } + + unsigned total_copied = 0; + Chunk *current_chunk = tx_buffers[chunk_idx]; + while (start != end) { + const uintptr_t addr = reinterpret_cast(start->c_str()); + unsigned copied = 0; + while (copied < start->length()) { + uint32_t r = current_chunk->write((char*)addr+copied, start->length() - copied); + copied += r; + total_copied += r; + bytes -= r; + if (current_chunk->full()){ + current_chunk = tx_buffers[++chunk_idx]; + if (chunk_idx == tx_buffers.size()) + return total_copied; + } + } + ++start; + } + assert(bytes == 0); + return total_copied; + }; + + std::vector tx_buffers; + std::list::const_iterator it = pending_bl.buffers().begin(); + std::list::const_iterator copy_it = it; + unsigned total = 0; + unsigned need_reserve_bytes = 0; + while (it != pending_bl.buffers().end()) { + if (infiniband->is_tx_buffer(it->raw_c_str())) { + if (need_reserve_bytes) { + unsigned copied = fill_tx_via_copy(tx_buffers, need_reserve_bytes, copy_it, it); + total += copied; + if (copied < need_reserve_bytes) + goto sending; + need_reserve_bytes = 0; + } + assert(copy_it == it); + tx_buffers.push_back(infiniband->get_tx_chunk_by_buffer(it->raw_c_str())); + total += it->length(); + ++copy_it; + } else { + need_reserve_bytes += it->length(); + } + ++it; + } + if (need_reserve_bytes) + total += fill_tx_via_copy(tx_buffers, need_reserve_bytes, copy_it, it); + + sending: + if (total == 0) + return -EAGAIN; + assert(total <= pending_bl.length()); + bufferlist swapped; + if (total < pending_bl.length()) { + worker->perf_logger->inc(l_msgr_rdma_tx_parital_mem); + pending_bl.splice(total, pending_bl.length()-total, &swapped); + pending_bl.swap(swapped); + } else { + pending_bl.clear(); + } + + ldout(cct, 20) << __func__ << " left bytes: " << pending_bl.length() << " in buffers " + << pending_bl.buffers().size() << " tx chunks " << tx_buffers.size() << dendl; + + int r = post_work_request(tx_buffers); + if (r < 0) + return r; + + ldout(cct, 20) << __func__ << " finished sending " << bytes << " bytes." << dendl; + return pending_bl.length() ? -EAGAIN : 0; +} + +int RDMAConnectedSocketImpl::post_work_request(std::vector &tx_buffers) +{ + ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << " " << tx_buffers[0] << dendl; + vector::iterator current_buffer = tx_buffers.begin(); + ibv_sge isge[tx_buffers.size()]; + uint32_t current_sge = 0; + ibv_send_wr iswr[tx_buffers.size()]; + uint32_t current_swr = 0; + ibv_send_wr* pre_wr = NULL; + + memset(iswr, 0, sizeof(iswr)); + memset(isge, 0, sizeof(isge)); + current_buffer = tx_buffers.begin(); + while (current_buffer != tx_buffers.end()) { + isge[current_sge].addr = reinterpret_cast((*current_buffer)->buffer); + isge[current_sge].length = (*current_buffer)->get_offset(); + isge[current_sge].lkey = (*current_buffer)->mr->lkey; + ldout(cct, 25) << __func__ << " sending buffer: " << *current_buffer << " length: " << isge[current_sge].length << dendl; + + iswr[current_swr].wr_id = reinterpret_cast(*current_buffer); + iswr[current_swr].next = NULL; + iswr[current_swr].sg_list = &isge[current_sge]; + iswr[current_swr].num_sge = 1; + iswr[current_swr].opcode = IBV_WR_SEND; + iswr[current_swr].send_flags = IBV_SEND_SIGNALED; + /*if (isge[current_sge].length < infiniband->max_inline_data) { + iswr[current_swr].send_flags = IBV_SEND_INLINE; + ldout(cct, 20) << __func__ << " send_inline." << dendl; + }*/ + + worker->perf_logger->inc(l_msgr_rdma_tx_bytes, isge[current_sge].length); + if (pre_wr) + pre_wr->next = &iswr[current_swr]; + pre_wr = &iswr[current_swr]; + ++current_sge; + ++current_swr; + ++current_buffer; + } + + ibv_send_wr *bad_tx_work_request; + if (ibv_post_send(qp->get_qp(), iswr, &bad_tx_work_request)) { + ldout(cct, 1) << __func__ << " failed to send data" + << " (most probably should be peer not ready): " + << cpp_strerror(errno) << dendl; + worker->perf_logger->inc(l_msgr_rdma_tx_failed); + return -errno; + } + worker->perf_logger->inc(l_msgr_rdma_tx_chunks, tx_buffers.size()); + ldout(cct, 20) << __func__ << " qp state is " << Infiniband::qp_state_string(qp->get_state()) << dendl; + return 0; +} + +void RDMAConnectedSocketImpl::fin() { + ibv_send_wr wr; + memset(&wr, 0, sizeof(wr)); + wr.wr_id = reinterpret_cast(qp); + wr.num_sge = 0; + wr.opcode = IBV_WR_SEND; + wr.send_flags = IBV_SEND_SIGNALED; + ibv_send_wr* bad_tx_work_request; + if (ibv_post_send(qp->get_qp(), &wr, &bad_tx_work_request)) { + ldout(cct, 1) << __func__ << " failed to send message=" + << " ibv_post_send failed(most probably should be peer not ready): " + << cpp_strerror(errno) << dendl; + worker->perf_logger->inc(l_msgr_rdma_tx_failed); + return ; + } +} + +void RDMAConnectedSocketImpl::cleanup() { + if (con_handler && tcp_fd >= 0) { + (static_cast(con_handler))->close(); + worker->center.submit_to(worker->center.get_id(), [this]() { + worker->center.delete_file_event(tcp_fd, EVENT_READABLE); + }, false); + delete con_handler; + con_handler = nullptr; + } +} + +void RDMAConnectedSocketImpl::notify() +{ + uint64_t i = 1; + int ret; + + ret = write(notify_fd, &i, sizeof(i)); + assert(ret = sizeof(i)); +} + +void RDMAConnectedSocketImpl::shutdown() +{ + if (!error) + fin(); + error = ECONNRESET; + active = false; +} + +void RDMAConnectedSocketImpl::close() +{ + if (!error) + fin(); + error = ECONNRESET; + active = false; +} + +void RDMAConnectedSocketImpl::fault() +{ + ldout(cct, 1) << __func__ << " tcp fd " << tcp_fd << dendl; + /*if (qp) { + qp->to_dead(); + qp = NULL; + }*/ + error = ECONNRESET; + connected = 1; + notify(); +} + +void RDMAConnectedSocketImpl::set_accept_fd(int sd) +{ + tcp_fd = sd; + is_server = true; + worker->center.submit_to(worker->center.get_id(), [this]() { + worker->center.create_file_event(tcp_fd, EVENT_READABLE, con_handler); + }, true); +}