X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fmsg%2Fasync%2Frdma%2FRDMAStack.h;fp=src%2Fceph%2Fsrc%2Fmsg%2Fasync%2Frdma%2FRDMAStack.h;h=3c2d90094d140ad374d8926896ceabb57df36592;hb=812ff6ca9fcd3e629e49d4328905f33eee8ca3f5;hp=0000000000000000000000000000000000000000;hpb=15280273faafb77777eab341909a3f495cf248d9;p=stor4nfv.git

diff --git a/src/ceph/src/msg/async/rdma/RDMAStack.h b/src/ceph/src/msg/async/rdma/RDMAStack.h
new file mode 100644
index 0000000..3c2d900
--- /dev/null
+++ b/src/ceph/src/msg/async/rdma/RDMAStack.h
@@ -0,0 +1,320 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 XSKY <haomai@xsky.com>
+ *
+ * Author: Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#ifndef CEPH_MSG_RDMASTACK_H
+#define CEPH_MSG_RDMASTACK_H
+
+#include <sys/eventfd.h>
+
+#include <list>
+#include <vector>
+#include <thread>
+
+#include "common/ceph_context.h"
+#include "common/debug.h"
+#include "common/errno.h"
+#include "msg/async/Stack.h"
+#include "Infiniband.h"
+
+class RDMAConnectedSocketImpl;
+class RDMAServerSocketImpl;
+class RDMAStack;
+class RDMAWorker;
+
+enum {
+  l_msgr_rdma_dispatcher_first = 94000,
+
+  l_msgr_rdma_polling,
+  l_msgr_rdma_inflight_tx_chunks,
+  l_msgr_rdma_inqueue_rx_chunks,
+
+  l_msgr_rdma_tx_total_wc,
+  l_msgr_rdma_tx_total_wc_errors,
+  l_msgr_rdma_tx_wc_retry_errors,
+  l_msgr_rdma_tx_wc_wr_flush_errors,
+
+  l_msgr_rdma_rx_total_wc,
+  l_msgr_rdma_rx_total_wc_errors,
+  l_msgr_rdma_rx_fin,
+
+  l_msgr_rdma_handshake_errors,
+
+  l_msgr_rdma_total_async_events,
+  l_msgr_rdma_async_last_wqe_events,
+
+  l_msgr_rdma_created_queue_pair,
+  l_msgr_rdma_active_queue_pair,
+
+  l_msgr_rdma_dispatcher_last,
+};
+
+
+class RDMADispatcher {
+  typedef Infiniband::MemoryManager::Chunk Chunk;
+  typedef Infiniband::QueuePair QueuePair;
+
+  std::thread t;
+  CephContext *cct;
+  Infiniband::CompletionQueue* tx_cq;
+  Infiniband::CompletionQueue* rx_cq;
+  Infiniband::CompletionChannel *tx_cc, *rx_cc;
+  EventCallbackRef async_handler;
+  bool done = false;
+  std::atomic<uint64_t> num_dead_queue_pair = {0};
+  std::atomic<uint64_t> num_qp_conn = {0};
+  Mutex lock; // protect `qp_conns`, `dead_queue_pairs`
+  // qp_num -> InfRcConnection
+  // The main usage of `qp_conns` is looking up the connection by qp_num,
+  // so the lifecycle of an element in `qp_conns` is the lifecycle of its qp.
+  // Moving a queue pair into the dead state works as follows:
+  /**
+   * 1. The Connection calls mark_down()
+   * 2. Move the Queue Pair into the Error state (QueuePair::to_dead)
+   * 3. Wait for the affiliated event IBV_EVENT_QP_LAST_WQE_REACHED (handle_async_event)
+   * 4. Wait for the CQ to be empty (handle_tx_event)
+   * 5. Destroy the QP by calling ibv_destroy_qp() (handle_tx_event)
+   *
+   * @param qp The QP that needs to die
+   */
+  ceph::unordered_map<uint32_t, std::pair<QueuePair*, RDMAConnectedSocketImpl*> > qp_conns;
+
+  /// If a queue pair is closed when transmit buffers are active
+  /// on it, the transmit buffers never get returned via tx_cq.  To
+  /// work around this problem, don't delete queue pairs immediately. Instead,
+  /// save them in this vector and delete them at a safe time, when there are
+  /// no outstanding transmit buffers to be lost.
+  std::vector<QueuePair*> dead_queue_pairs;
+
+  std::atomic<uint64_t> num_pending_workers = {0};
+  Mutex w_lock; // protect pending workers
+  // FIXME: make this lock-free
+  std::list<RDMAWorker*> pending_workers;
+  RDMAStack* stack;
+
+  class C_handle_cq_async : public EventCallback {
+    RDMADispatcher *dispatcher;
+   public:
+    C_handle_cq_async(RDMADispatcher *w): dispatcher(w) {}
+    void do_request(int fd) {
+      // worker->handle_tx_event();
+      dispatcher->handle_async_event();
+    }
+  };
+
+ public:
+  PerfCounters *perf_logger;
+
+  explicit RDMADispatcher(CephContext* c, RDMAStack* s);
+  virtual ~RDMADispatcher();
+  void handle_async_event();
+
+  void polling_start();
+  void polling_stop();
+  void polling();
+  int register_qp(QueuePair *qp, RDMAConnectedSocketImpl* csi);
+  void make_pending_worker(RDMAWorker* w) {
+    Mutex::Locker l(w_lock);
+    auto it = std::find(pending_workers.begin(), pending_workers.end(), w);
+    if (it != pending_workers.end())
+      return;
+    pending_workers.push_back(w);
+    ++num_pending_workers;
+  }
+  RDMAStack* get_stack() { return stack; }
+  RDMAConnectedSocketImpl* get_conn_lockless(uint32_t qp);
+  void erase_qpn_lockless(uint32_t qpn);
+  void erase_qpn(uint32_t qpn);
+  Infiniband::CompletionQueue* get_tx_cq() const { return tx_cq; }
+  Infiniband::CompletionQueue* get_rx_cq() const { return rx_cq; }
+  void notify_pending_workers();
+  void handle_tx_event(ibv_wc *cqe, int n);
+  void post_tx_buffer(std::vector<Chunk*> &chunks);
+
+  std::atomic<uint64_t> inflight = {0};
+};
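+
+// NB: an illustrative mapping of the five teardown steps above onto this
+// class (a sketch assuming only the declarations in this header and the
+// standard verbs API, not a verbatim description of RDMAStack.cc):
+//   (1) the owning Connection calls mark_down()
+//   (2) QueuePair::to_dead() moves the QP into the Error state
+//   (3) handle_async_event() consumes IBV_EVENT_QP_LAST_WQE_REACHED and the
+//       QP is parked in dead_queue_pairs rather than freed immediately
+//   (4) handle_tx_event() drains the completions still pending on tx_cq
+//   (5) once no transmit buffers are outstanding, ibv_destroy_qp() reclaims it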
+
+enum {
+  l_msgr_rdma_first = 95000,
+
+  l_msgr_rdma_tx_no_mem,
+  l_msgr_rdma_tx_parital_mem,
+  l_msgr_rdma_tx_failed,
+  l_msgr_rdma_rx_no_registered_mem,
+
+  l_msgr_rdma_tx_chunks,
+  l_msgr_rdma_tx_bytes,
+  l_msgr_rdma_rx_chunks,
+  l_msgr_rdma_rx_bytes,
+  l_msgr_rdma_pending_sent_conns,
+
+  l_msgr_rdma_last,
+};
+
+class RDMAWorker : public Worker {
+  typedef Infiniband::CompletionQueue CompletionQueue;
+  typedef Infiniband::CompletionChannel CompletionChannel;
+  typedef Infiniband::MemoryManager::Chunk Chunk;
+  typedef Infiniband::MemoryManager MemoryManager;
+  typedef std::vector<Chunk*>::iterator ChunkIter;
+  RDMAStack *stack;
+  EventCallbackRef tx_handler;
+  std::list<RDMAConnectedSocketImpl*> pending_sent_conns;
+  RDMADispatcher* dispatcher = nullptr;
+  Mutex lock;
+
+  class C_handle_cq_tx : public EventCallback {
+    RDMAWorker *worker;
+   public:
+    C_handle_cq_tx(RDMAWorker *w): worker(w) {}
+    void do_request(int fd) {
+      worker->handle_pending_message();
+    }
+  };
+
+ public:
+  PerfCounters *perf_logger;
+  explicit RDMAWorker(CephContext *c, unsigned i);
+  virtual ~RDMAWorker();
+  virtual int listen(entity_addr_t &addr, const SocketOptions &opts, ServerSocket *) override;
+  virtual int connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket) override;
+  virtual void initialize() override;
+  RDMAStack *get_stack() { return stack; }
+  int get_reged_mem(RDMAConnectedSocketImpl *o, std::vector<Chunk*> &c, size_t bytes);
+  void remove_pending_conn(RDMAConnectedSocketImpl *o) {
+    assert(center.in_thread());
+    pending_sent_conns.remove(o);
+  }
+  void handle_pending_message();
+  void set_stack(RDMAStack *s) { stack = s; }
+  void notify_worker() {
+    center.dispatch_event_external(tx_handler);
+  }
+};
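+
+// How the classes above cooperate under tx-buffer exhaustion (a sketch
+// inferred from the declarations in this header, not a verbatim description
+// of RDMAStack.cc): when RDMAWorker::get_reged_mem() cannot supply enough
+// registered memory, the worker queues itself through
+// RDMADispatcher::make_pending_worker() and the stalled socket is tracked in
+// pending_sent_conns. When transmit chunks are returned via
+// RDMADispatcher::post_tx_buffer(), notify_pending_workers() wakes one
+// waiting worker; its tx_handler (C_handle_cq_tx) then fires
+// handle_pending_message() to resume the queued sends.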
+
+class RDMAConnectedSocketImpl : public ConnectedSocketImpl {
+ public:
+  typedef Infiniband::MemoryManager::Chunk Chunk;
+  typedef Infiniband::CompletionChannel CompletionChannel;
+  typedef Infiniband::CompletionQueue CompletionQueue;
+
+ private:
+  CephContext *cct;
+  Infiniband::QueuePair *qp;
+  IBSYNMsg peer_msg;
+  IBSYNMsg my_msg;
+  int connected;
+  int error;
+  Infiniband* infiniband;
+  RDMADispatcher* dispatcher;
+  RDMAWorker* worker;
+  std::vector<Chunk*> buffers;
+  int notify_fd = -1;
+  bufferlist pending_bl;
+
+  Mutex lock;
+  std::vector<ibv_wc> wc;
+  bool is_server;
+  EventCallbackRef con_handler;
+  int tcp_fd = -1;
+  bool active; // is the qp active?
+  bool pending;
+
+  void notify();
+  ssize_t read_buffers(char* buf, size_t len);
+  int post_work_request(std::vector<Chunk*>&);
+
+ public:
+  RDMAConnectedSocketImpl(CephContext *cct, Infiniband* ib, RDMADispatcher* s,
+                          RDMAWorker *w);
+  virtual ~RDMAConnectedSocketImpl();
+
+  void pass_wc(std::vector<ibv_wc> &&v);
+  void get_wc(std::vector<ibv_wc> &w);
+  virtual int is_connected() override { return connected; }
+
+  virtual ssize_t read(char* buf, size_t len) override;
+  virtual ssize_t zero_copy_read(bufferptr &data) override;
+  virtual ssize_t send(bufferlist &bl, bool more) override;
+  virtual void shutdown() override;
+  virtual void close() override;
+  virtual int fd() const override { return notify_fd; }
+  void fault();
+  const char* get_qp_state() { return Infiniband::qp_state_string(qp->get_state()); }
+  ssize_t submit(bool more);
+  int activate();
+  void fin();
+  void handle_connection();
+  void cleanup();
+  void set_accept_fd(int sd);
+  int try_connect(const entity_addr_t&, const SocketOptions &opt);
+  bool is_pending() { return pending; }
+  void set_pending(bool val) { pending = val; }
+
+  class C_handle_connection : public EventCallback {
+    RDMAConnectedSocketImpl *csi;
+    bool active;
+   public:
+    C_handle_connection(RDMAConnectedSocketImpl *w): csi(w), active(true) {}
+    void do_request(int fd) {
+      if (active)
+        csi->handle_connection();
+    }
+    void close() {
+      active = false;
+    }
+  };
+};
+
+class RDMAServerSocketImpl : public ServerSocketImpl {
+  CephContext *cct;
+  NetHandler net;
+  int server_setup_socket;
+  Infiniband* infiniband;
+  RDMADispatcher *dispatcher;
+  RDMAWorker *worker;
+  entity_addr_t sa;
+
+ public:
+  RDMAServerSocketImpl(CephContext *cct, Infiniband* i, RDMADispatcher *s, RDMAWorker *w, entity_addr_t& a);
+
+  int listen(entity_addr_t &sa, const SocketOptions &opt);
+  virtual int accept(ConnectedSocket *s, const SocketOptions &opts, entity_addr_t *out, Worker *w) override;
+  virtual void abort_accept() override;
+  virtual int fd() const override { return server_setup_socket; }
+  int get_fd() { return server_setup_socket; }
+};
+
+class RDMAStack : public NetworkStack {
+  vector<std::thread> threads;
+  RDMADispatcher *dispatcher;
+
+  std::atomic<bool> fork_finished = {false};
+
+ public:
+  explicit RDMAStack(CephContext *cct, const string &t);
+  virtual ~RDMAStack();
+  virtual bool support_zero_copy_read() const override { return false; }
+  virtual bool nonblock_connect_need_writable_event() const { return false; }
+
+  virtual void spawn_worker(unsigned i, std::function<void ()> &&func) override;
+  virtual void join_worker(unsigned i) override;
+  RDMADispatcher *get_dispatcher() { return dispatcher; }
+
+  virtual bool is_ready() override { return fork_finished.load(); }
+  virtual void ready() override { fork_finished = true; }
+};
+
+#endif
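
For context, the l_msgr_rdma_* enumerators in this header follow Ceph's usual
perf-counter convention: the *_first/*_last values bracket a block of counters
registered through PerfCountersBuilder, whose bounds let create_perf_counters()
assert that every slot in the range was filled. Below is a minimal sketch of
that wiring; the logger name, the subset of counters shown, and their
descriptions are illustrative assumptions, not code taken from the
accompanying RDMAStack.cc.

    // Sketch only: counter names and descriptions here are illustrative
    // assumptions; the PerfCountersBuilder pattern itself is standard Ceph.
    #include "common/perf_counters.h"
    #include "msg/async/rdma/RDMAStack.h"  // l_msgr_rdma_dispatcher_* enum

    static PerfCounters *build_dispatcher_counters(CephContext *cct)
    {
      PerfCountersBuilder plb(cct, "AsyncMessenger::RDMADispatcher",
                              l_msgr_rdma_dispatcher_first,
                              l_msgr_rdma_dispatcher_last);
      // one add_* call per enumerator between *_first and *_last
      plb.add_u64_counter(l_msgr_rdma_tx_total_wc, "tx_total_wc",
                          "The number of tx work completions");
      plb.add_u64_counter(l_msgr_rdma_rx_total_wc, "rx_total_wc",
                          "The number of rx work completions");
      plb.add_u64_counter(l_msgr_rdma_handshake_errors, "handshake_errors",
                          "The number of failed RDMA handshakes");
      // ... the remaining dispatcher counters are registered the same way ...
      PerfCounters *logger = plb.create_perf_counters();
      cct->get_perfcounters_collection()->add(logger);
      return logger;
    }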