X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fmsg%2Fasync%2Frdma%2FRDMAConnectedSocketImpl.cc;fp=src%2Fceph%2Fsrc%2Fmsg%2Fasync%2Frdma%2FRDMAConnectedSocketImpl.cc;h=0000000000000000000000000000000000000000;hb=7da45d65be36d36b880cc55c5036e96c24b53f00;hp=66dc488cccf3fc7a6b27935a2be6412886304995;hpb=691462d09d0987b47e112d6ee8740375df3c51b2;p=stor4nfv.git diff --git a/src/ceph/src/msg/async/rdma/RDMAConnectedSocketImpl.cc b/src/ceph/src/msg/async/rdma/RDMAConnectedSocketImpl.cc deleted file mode 100644 index 66dc488..0000000 --- a/src/ceph/src/msg/async/rdma/RDMAConnectedSocketImpl.cc +++ /dev/null @@ -1,644 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2016 XSKY - * - * Author: Haomai Wang - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - -#include "RDMAStack.h" - -#define dout_subsys ceph_subsys_ms -#undef dout_prefix -#define dout_prefix *_dout << " RDMAConnectedSocketImpl " - -RDMAConnectedSocketImpl::RDMAConnectedSocketImpl(CephContext *cct, Infiniband* ib, RDMADispatcher* s, - RDMAWorker *w) - : cct(cct), connected(0), error(0), infiniband(ib), - dispatcher(s), worker(w), lock("RDMAConnectedSocketImpl::lock"), - is_server(false), con_handler(new C_handle_connection(this)), - active(false), pending(false) -{ - qp = infiniband->create_queue_pair( - cct, s->get_tx_cq(), s->get_rx_cq(), IBV_QPT_RC); - my_msg.qpn = qp->get_local_qp_number(); - my_msg.psn = qp->get_initial_psn(); - my_msg.lid = infiniband->get_lid(); - my_msg.peer_qpn = 0; - my_msg.gid = infiniband->get_gid(); - notify_fd = dispatcher->register_qp(qp, this); - dispatcher->perf_logger->inc(l_msgr_rdma_created_queue_pair); - dispatcher->perf_logger->inc(l_msgr_rdma_active_queue_pair); -} - -RDMAConnectedSocketImpl::~RDMAConnectedSocketImpl() -{ - ldout(cct, 20) << __func__ << " destruct." << dendl; - cleanup(); - worker->remove_pending_conn(this); - dispatcher->erase_qpn(my_msg.qpn); - Mutex::Locker l(lock); - if (notify_fd >= 0) - ::close(notify_fd); - if (tcp_fd >= 0) - ::close(tcp_fd); - error = ECONNRESET; - int ret = 0; - for (unsigned i=0; i < wc.size(); ++i) { - ret = infiniband->post_chunk(reinterpret_cast(wc[i].wr_id)); - assert(ret == 0); - dispatcher->perf_logger->dec(l_msgr_rdma_inqueue_rx_chunks); - } - for (unsigned i=0; i < buffers.size(); ++i) { - ret = infiniband->post_chunk(buffers[i]); - assert(ret == 0); - dispatcher->perf_logger->dec(l_msgr_rdma_inqueue_rx_chunks); - } -} - -void RDMAConnectedSocketImpl::pass_wc(std::vector &&v) -{ - Mutex::Locker l(lock); - if (wc.empty()) - wc = std::move(v); - else - wc.insert(wc.end(), v.begin(), v.end()); - notify(); -} - -void RDMAConnectedSocketImpl::get_wc(std::vector &w) -{ - Mutex::Locker l(lock); - if (wc.empty()) - return ; - w.swap(wc); -} - -int RDMAConnectedSocketImpl::activate() -{ - ibv_qp_attr qpa; - int r; - - // now connect up the qps and switch to RTR - memset(&qpa, 0, sizeof(qpa)); - qpa.qp_state = IBV_QPS_RTR; - qpa.path_mtu = IBV_MTU_1024; - qpa.dest_qp_num = peer_msg.qpn; - qpa.rq_psn = peer_msg.psn; - qpa.max_dest_rd_atomic = 1; - qpa.min_rnr_timer = 12; - //qpa.ah_attr.is_global = 0; - qpa.ah_attr.is_global = 1; - qpa.ah_attr.grh.hop_limit = 6; - qpa.ah_attr.grh.dgid = peer_msg.gid; - - qpa.ah_attr.grh.sgid_index = infiniband->get_device()->get_gid_idx(); - - qpa.ah_attr.dlid = peer_msg.lid; - qpa.ah_attr.sl = cct->_conf->ms_async_rdma_sl; - qpa.ah_attr.grh.traffic_class = cct->_conf->ms_async_rdma_dscp; - qpa.ah_attr.src_path_bits = 0; - qpa.ah_attr.port_num = (uint8_t)(infiniband->get_ib_physical_port()); - - ldout(cct, 20) << __func__ << " Choosing gid_index " << (int)qpa.ah_attr.grh.sgid_index << ", sl " << (int)qpa.ah_attr.sl << dendl; - - r = ibv_modify_qp(qp->get_qp(), &qpa, IBV_QP_STATE | - IBV_QP_AV | - IBV_QP_PATH_MTU | - IBV_QP_DEST_QPN | - IBV_QP_RQ_PSN | - IBV_QP_MIN_RNR_TIMER | - IBV_QP_MAX_DEST_RD_ATOMIC); - if (r) { - lderr(cct) << __func__ << " failed to transition to RTR state: " - << cpp_strerror(errno) << dendl; - return -1; - } - - ldout(cct, 20) << __func__ << " transition to RTR state successfully." << dendl; - - // now move to RTS - qpa.qp_state = IBV_QPS_RTS; - - // How long to wait before retrying if packet lost or server dead. - // Supposedly the timeout is 4.096us*2^timeout. However, the actual - // timeout appears to be 4.096us*2^(timeout+1), so the setting - // below creates a 135ms timeout. - qpa.timeout = 14; - - // How many times to retry after timeouts before giving up. - qpa.retry_cnt = 7; - - // How many times to retry after RNR (receiver not ready) condition - // before giving up. Occurs when the remote side has not yet posted - // a receive request. - qpa.rnr_retry = 7; // 7 is infinite retry. - qpa.sq_psn = my_msg.psn; - qpa.max_rd_atomic = 1; - - r = ibv_modify_qp(qp->get_qp(), &qpa, IBV_QP_STATE | - IBV_QP_TIMEOUT | - IBV_QP_RETRY_CNT | - IBV_QP_RNR_RETRY | - IBV_QP_SQ_PSN | - IBV_QP_MAX_QP_RD_ATOMIC); - if (r) { - lderr(cct) << __func__ << " failed to transition to RTS state: " - << cpp_strerror(errno) << dendl; - return -1; - } - - // the queue pair should be ready to use once the client has finished - // setting up their end. - ldout(cct, 20) << __func__ << " transition to RTS state successfully." << dendl; - ldout(cct, 20) << __func__ << " QueuePair: " << qp << " with qp:" << qp->get_qp() << dendl; - - if (!is_server) { - connected = 1; //indicate successfully - ldout(cct, 20) << __func__ << " handle fake send, wake it up. QP: " << my_msg.qpn << dendl; - submit(false); - } - active = true; - - return 0; -} - -int RDMAConnectedSocketImpl::try_connect(const entity_addr_t& peer_addr, const SocketOptions &opts) { - ldout(cct, 20) << __func__ << " nonblock:" << opts.nonblock << ", nodelay:" - << opts.nodelay << ", rbuf_size: " << opts.rcbuf_size << dendl; - NetHandler net(cct); - tcp_fd = net.connect(peer_addr, opts.connect_bind_addr); - - if (tcp_fd < 0) { - return -errno; - } - net.set_close_on_exec(tcp_fd); - - int r = net.set_socket_options(tcp_fd, opts.nodelay, opts.rcbuf_size); - if (r < 0) { - ::close(tcp_fd); - tcp_fd = -1; - return -errno; - } - - ldout(cct, 20) << __func__ << " tcp_fd: " << tcp_fd << dendl; - net.set_priority(tcp_fd, opts.priority, peer_addr.get_family()); - my_msg.peer_qpn = 0; - r = infiniband->send_msg(cct, tcp_fd, my_msg); - if (r < 0) - return r; - - worker->center.create_file_event(tcp_fd, EVENT_READABLE, con_handler); - return 0; -} - -void RDMAConnectedSocketImpl::handle_connection() { - ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << " tcp_fd: " << tcp_fd << " notify_fd: " << notify_fd << dendl; - int r = infiniband->recv_msg(cct, tcp_fd, peer_msg); - if (r < 0) { - if (r != -EAGAIN) { - dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors); - ldout(cct, 1) << __func__ << " recv handshake msg failed." << dendl; - fault(); - } - return; - } - - if (!is_server) {// syn + ack from server - my_msg.peer_qpn = peer_msg.qpn; - ldout(cct, 20) << __func__ << " peer msg : < " << peer_msg.qpn << ", " << peer_msg.psn - << ", " << peer_msg.lid << ", " << peer_msg.peer_qpn << "> " << dendl; - if (!connected) { - r = activate(); - assert(!r); - } - notify(); - r = infiniband->send_msg(cct, tcp_fd, my_msg); - if (r < 0) { - ldout(cct, 1) << __func__ << " send client ack failed." << dendl; - dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors); - fault(); - } - } else { - if (peer_msg.peer_qpn == 0) {// syn from client - if (active) { - ldout(cct, 10) << __func__ << " server is already active." << dendl; - return ; - } - r = infiniband->send_msg(cct, tcp_fd, my_msg); - if (r < 0) { - ldout(cct, 1) << __func__ << " server ack failed." << dendl; - dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors); - fault(); - return ; - } - r = activate(); - assert(!r); - } else { // ack from client - connected = 1; - cleanup(); - submit(false); - notify(); - } - } -} - -ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len) -{ - uint64_t i = 0; - int r = ::read(notify_fd, &i, sizeof(i)); - ldout(cct, 20) << __func__ << " notify_fd : " << i << " in " << my_msg.qpn << " r = " << r << dendl; - ssize_t read = 0; - if (!buffers.empty()) - read = read_buffers(buf,len); - - std::vector cqe; - get_wc(cqe); - if (cqe.empty()) { - if (!buffers.empty()) { - notify(); - } - if (read > 0) { - return read; - } - if (error) { - return -error; - } else { - return -EAGAIN; - } - } - - ldout(cct, 20) << __func__ << " poll queue got " << cqe.size() << " responses. QP: " << my_msg.qpn << dendl; - for (size_t i = 0; i < cqe.size(); ++i) { - ibv_wc* response = &cqe[i]; - assert(response->status == IBV_WC_SUCCESS); - Chunk* chunk = reinterpret_cast(response->wr_id); - ldout(cct, 25) << __func__ << " chunk length: " << response->byte_len << " bytes." << chunk << dendl; - chunk->prepare_read(response->byte_len); - worker->perf_logger->inc(l_msgr_rdma_rx_bytes, response->byte_len); - if (response->byte_len == 0) { - dispatcher->perf_logger->inc(l_msgr_rdma_rx_fin); - if (connected) { - error = ECONNRESET; - ldout(cct, 20) << __func__ << " got remote close msg..." << dendl; - } - assert(infiniband->post_chunk(chunk) == 0); - dispatcher->perf_logger->dec(l_msgr_rdma_inqueue_rx_chunks); - } else { - if (read == (ssize_t)len) { - buffers.push_back(chunk); - ldout(cct, 25) << __func__ << " buffers add a chunk: " << response->byte_len << dendl; - } else if (read + response->byte_len > (ssize_t)len) { - read += chunk->read(buf+read, (ssize_t)len-read); - buffers.push_back(chunk); - ldout(cct, 25) << __func__ << " buffers add a chunk: " << chunk->get_offset() << ":" << chunk->get_bound() << dendl; - } else { - read += chunk->read(buf+read, response->byte_len); - assert(infiniband->post_chunk(chunk) == 0); - dispatcher->perf_logger->dec(l_msgr_rdma_inqueue_rx_chunks); - } - } - } - - worker->perf_logger->inc(l_msgr_rdma_rx_chunks, cqe.size()); - if (is_server && connected == 0) { - ldout(cct, 20) << __func__ << " we do not need last handshake, QP: " << my_msg.qpn << " peer QP: " << peer_msg.qpn << dendl; - connected = 1; //if so, we don't need the last handshake - cleanup(); - submit(false); - } - - if (!buffers.empty()) { - notify(); - } - - if (read == 0 && error) - return -error; - return read == 0 ? -EAGAIN : read; -} - -ssize_t RDMAConnectedSocketImpl::read_buffers(char* buf, size_t len) -{ - size_t read = 0, tmp = 0; - auto c = buffers.begin(); - for (; c != buffers.end() ; ++c) { - tmp = (*c)->read(buf+read, len-read); - read += tmp; - ldout(cct, 25) << __func__ << " this iter read: " << tmp << " bytes." << " offset: " << (*c)->get_offset() << " ,bound: " << (*c)->get_bound() << ". Chunk:" << *c << dendl; - if ((*c)->over()) { - assert(infiniband->post_chunk(*c) == 0); - dispatcher->perf_logger->dec(l_msgr_rdma_inqueue_rx_chunks); - ldout(cct, 25) << __func__ << " one chunk over." << dendl; - } - if (read == len) { - break; - } - } - - if (c != buffers.end() && (*c)->over()) - ++c; - buffers.erase(buffers.begin(), c); - ldout(cct, 25) << __func__ << " got " << read << " bytes, buffers size: " << buffers.size() << dendl; - return read; -} - -ssize_t RDMAConnectedSocketImpl::zero_copy_read(bufferptr &data) -{ - if (error) - return -error; - static const int MAX_COMPLETIONS = 16; - ibv_wc wc[MAX_COMPLETIONS]; - ssize_t size = 0; - - ibv_wc* response; - Chunk* chunk; - bool loaded = false; - auto iter = buffers.begin(); - if (iter != buffers.end()) { - chunk = *iter; - // FIXME need to handle release - // auto del = std::bind(&Chunk::post_srq, std::move(chunk), infiniband); - buffers.erase(iter); - loaded = true; - size = chunk->bound; - } - - std::vector cqe; - get_wc(cqe); - if (cqe.empty()) - return size == 0 ? -EAGAIN : size; - - ldout(cct, 20) << __func__ << " pool completion queue got " << cqe.size() << " responses."<< dendl; - - for (size_t i = 0; i < cqe.size(); ++i) { - response = &wc[i]; - chunk = reinterpret_cast(response->wr_id); - chunk->prepare_read(response->byte_len); - if (!loaded && i == 0) { - // FIXME need to handle release - // auto del = std::bind(&Chunk::post_srq, std::move(chunk), infiniband); - size = chunk->bound; - continue; - } - buffers.push_back(chunk); - iter++; - } - - if (size == 0) - return -EAGAIN; - return size; -} - -ssize_t RDMAConnectedSocketImpl::send(bufferlist &bl, bool more) -{ - if (error) { - if (!active) - return -EPIPE; - return -error; - } - size_t bytes = bl.length(); - if (!bytes) - return 0; - { - Mutex::Locker l(lock); - pending_bl.claim_append(bl); - if (!connected) { - ldout(cct, 20) << __func__ << " fake send to upper, QP: " << my_msg.qpn << dendl; - return bytes; - } - } - ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << dendl; - ssize_t r = submit(more); - if (r < 0 && r != -EAGAIN) - return r; - return bytes; -} - -ssize_t RDMAConnectedSocketImpl::submit(bool more) -{ - if (error) - return -error; - Mutex::Locker l(lock); - size_t bytes = pending_bl.length(); - ldout(cct, 20) << __func__ << " we need " << bytes << " bytes. iov size: " - << pending_bl.buffers().size() << dendl; - if (!bytes) - return 0; - - auto fill_tx_via_copy = [this](std::vector &tx_buffers, unsigned bytes, - std::list::const_iterator &start, - std::list::const_iterator &end) -> unsigned { - assert(start != end); - auto chunk_idx = tx_buffers.size(); - int ret = worker->get_reged_mem(this, tx_buffers, bytes); - if (ret == 0) { - ldout(cct, 1) << __func__ << " no enough buffers in worker " << worker << dendl; - worker->perf_logger->inc(l_msgr_rdma_tx_no_mem); - return 0; - } - - unsigned total_copied = 0; - Chunk *current_chunk = tx_buffers[chunk_idx]; - while (start != end) { - const uintptr_t addr = reinterpret_cast(start->c_str()); - unsigned copied = 0; - while (copied < start->length()) { - uint32_t r = current_chunk->write((char*)addr+copied, start->length() - copied); - copied += r; - total_copied += r; - bytes -= r; - if (current_chunk->full()){ - current_chunk = tx_buffers[++chunk_idx]; - if (chunk_idx == tx_buffers.size()) - return total_copied; - } - } - ++start; - } - assert(bytes == 0); - return total_copied; - }; - - std::vector tx_buffers; - std::list::const_iterator it = pending_bl.buffers().begin(); - std::list::const_iterator copy_it = it; - unsigned total = 0; - unsigned need_reserve_bytes = 0; - while (it != pending_bl.buffers().end()) { - if (infiniband->is_tx_buffer(it->raw_c_str())) { - if (need_reserve_bytes) { - unsigned copied = fill_tx_via_copy(tx_buffers, need_reserve_bytes, copy_it, it); - total += copied; - if (copied < need_reserve_bytes) - goto sending; - need_reserve_bytes = 0; - } - assert(copy_it == it); - tx_buffers.push_back(infiniband->get_tx_chunk_by_buffer(it->raw_c_str())); - total += it->length(); - ++copy_it; - } else { - need_reserve_bytes += it->length(); - } - ++it; - } - if (need_reserve_bytes) - total += fill_tx_via_copy(tx_buffers, need_reserve_bytes, copy_it, it); - - sending: - if (total == 0) - return -EAGAIN; - assert(total <= pending_bl.length()); - bufferlist swapped; - if (total < pending_bl.length()) { - worker->perf_logger->inc(l_msgr_rdma_tx_parital_mem); - pending_bl.splice(total, pending_bl.length()-total, &swapped); - pending_bl.swap(swapped); - } else { - pending_bl.clear(); - } - - ldout(cct, 20) << __func__ << " left bytes: " << pending_bl.length() << " in buffers " - << pending_bl.buffers().size() << " tx chunks " << tx_buffers.size() << dendl; - - int r = post_work_request(tx_buffers); - if (r < 0) - return r; - - ldout(cct, 20) << __func__ << " finished sending " << bytes << " bytes." << dendl; - return pending_bl.length() ? -EAGAIN : 0; -} - -int RDMAConnectedSocketImpl::post_work_request(std::vector &tx_buffers) -{ - ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << " " << tx_buffers[0] << dendl; - vector::iterator current_buffer = tx_buffers.begin(); - ibv_sge isge[tx_buffers.size()]; - uint32_t current_sge = 0; - ibv_send_wr iswr[tx_buffers.size()]; - uint32_t current_swr = 0; - ibv_send_wr* pre_wr = NULL; - - memset(iswr, 0, sizeof(iswr)); - memset(isge, 0, sizeof(isge)); - current_buffer = tx_buffers.begin(); - while (current_buffer != tx_buffers.end()) { - isge[current_sge].addr = reinterpret_cast((*current_buffer)->buffer); - isge[current_sge].length = (*current_buffer)->get_offset(); - isge[current_sge].lkey = (*current_buffer)->mr->lkey; - ldout(cct, 25) << __func__ << " sending buffer: " << *current_buffer << " length: " << isge[current_sge].length << dendl; - - iswr[current_swr].wr_id = reinterpret_cast(*current_buffer); - iswr[current_swr].next = NULL; - iswr[current_swr].sg_list = &isge[current_sge]; - iswr[current_swr].num_sge = 1; - iswr[current_swr].opcode = IBV_WR_SEND; - iswr[current_swr].send_flags = IBV_SEND_SIGNALED; - /*if (isge[current_sge].length < infiniband->max_inline_data) { - iswr[current_swr].send_flags = IBV_SEND_INLINE; - ldout(cct, 20) << __func__ << " send_inline." << dendl; - }*/ - - worker->perf_logger->inc(l_msgr_rdma_tx_bytes, isge[current_sge].length); - if (pre_wr) - pre_wr->next = &iswr[current_swr]; - pre_wr = &iswr[current_swr]; - ++current_sge; - ++current_swr; - ++current_buffer; - } - - ibv_send_wr *bad_tx_work_request; - if (ibv_post_send(qp->get_qp(), iswr, &bad_tx_work_request)) { - ldout(cct, 1) << __func__ << " failed to send data" - << " (most probably should be peer not ready): " - << cpp_strerror(errno) << dendl; - worker->perf_logger->inc(l_msgr_rdma_tx_failed); - return -errno; - } - worker->perf_logger->inc(l_msgr_rdma_tx_chunks, tx_buffers.size()); - ldout(cct, 20) << __func__ << " qp state is " << Infiniband::qp_state_string(qp->get_state()) << dendl; - return 0; -} - -void RDMAConnectedSocketImpl::fin() { - ibv_send_wr wr; - memset(&wr, 0, sizeof(wr)); - wr.wr_id = reinterpret_cast(qp); - wr.num_sge = 0; - wr.opcode = IBV_WR_SEND; - wr.send_flags = IBV_SEND_SIGNALED; - ibv_send_wr* bad_tx_work_request; - if (ibv_post_send(qp->get_qp(), &wr, &bad_tx_work_request)) { - ldout(cct, 1) << __func__ << " failed to send message=" - << " ibv_post_send failed(most probably should be peer not ready): " - << cpp_strerror(errno) << dendl; - worker->perf_logger->inc(l_msgr_rdma_tx_failed); - return ; - } -} - -void RDMAConnectedSocketImpl::cleanup() { - if (con_handler && tcp_fd >= 0) { - (static_cast(con_handler))->close(); - worker->center.submit_to(worker->center.get_id(), [this]() { - worker->center.delete_file_event(tcp_fd, EVENT_READABLE); - }, false); - delete con_handler; - con_handler = nullptr; - } -} - -void RDMAConnectedSocketImpl::notify() -{ - uint64_t i = 1; - int ret; - - ret = write(notify_fd, &i, sizeof(i)); - assert(ret = sizeof(i)); -} - -void RDMAConnectedSocketImpl::shutdown() -{ - if (!error) - fin(); - error = ECONNRESET; - active = false; -} - -void RDMAConnectedSocketImpl::close() -{ - if (!error) - fin(); - error = ECONNRESET; - active = false; -} - -void RDMAConnectedSocketImpl::fault() -{ - ldout(cct, 1) << __func__ << " tcp fd " << tcp_fd << dendl; - /*if (qp) { - qp->to_dead(); - qp = NULL; - }*/ - error = ECONNRESET; - connected = 1; - notify(); -} - -void RDMAConnectedSocketImpl::set_accept_fd(int sd) -{ - tcp_fd = sd; - is_server = true; - worker->center.submit_to(worker->center.get_id(), [this]() { - worker->center.create_file_event(tcp_fd, EVENT_READABLE, con_handler); - }, true); -}