X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fmsg%2Fasync%2Frdma%2FRDMAStack.h;fp=src%2Fceph%2Fsrc%2Fmsg%2Fasync%2Frdma%2FRDMAStack.h;h=0000000000000000000000000000000000000000;hb=7da45d65be36d36b880cc55c5036e96c24b53f00;hp=3c2d90094d140ad374d8926896ceabb57df36592;hpb=691462d09d0987b47e112d6ee8740375df3c51b2;p=stor4nfv.git diff --git a/src/ceph/src/msg/async/rdma/RDMAStack.h b/src/ceph/src/msg/async/rdma/RDMAStack.h deleted file mode 100644 index 3c2d900..0000000 --- a/src/ceph/src/msg/async/rdma/RDMAStack.h +++ /dev/null @@ -1,320 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2016 XSKY - * - * Author: Haomai Wang - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - -#ifndef CEPH_MSG_RDMASTACK_H -#define CEPH_MSG_RDMASTACK_H - -#include - -#include -#include -#include - -#include "common/ceph_context.h" -#include "common/debug.h" -#include "common/errno.h" -#include "msg/async/Stack.h" -#include "Infiniband.h" - -class RDMAConnectedSocketImpl; -class RDMAServerSocketImpl; -class RDMAStack; -class RDMAWorker; - -enum { - l_msgr_rdma_dispatcher_first = 94000, - - l_msgr_rdma_polling, - l_msgr_rdma_inflight_tx_chunks, - l_msgr_rdma_inqueue_rx_chunks, - - l_msgr_rdma_tx_total_wc, - l_msgr_rdma_tx_total_wc_errors, - l_msgr_rdma_tx_wc_retry_errors, - l_msgr_rdma_tx_wc_wr_flush_errors, - - l_msgr_rdma_rx_total_wc, - l_msgr_rdma_rx_total_wc_errors, - l_msgr_rdma_rx_fin, - - l_msgr_rdma_handshake_errors, - - l_msgr_rdma_total_async_events, - l_msgr_rdma_async_last_wqe_events, - - l_msgr_rdma_created_queue_pair, - l_msgr_rdma_active_queue_pair, - - l_msgr_rdma_dispatcher_last, -}; - - -class RDMADispatcher { - typedef Infiniband::MemoryManager::Chunk Chunk; - typedef Infiniband::QueuePair QueuePair; - - std::thread t; - CephContext *cct; - Infiniband::CompletionQueue* tx_cq; - Infiniband::CompletionQueue* rx_cq; - Infiniband::CompletionChannel *tx_cc, *rx_cc; - EventCallbackRef async_handler; - bool done = false; - std::atomic num_dead_queue_pair = {0}; - std::atomic num_qp_conn = {0}; - Mutex lock; // protect `qp_conns`, `dead_queue_pairs` - // qp_num -> InfRcConnection - // The main usage of `qp_conns` is looking up connection by qp_num, - // so the lifecycle of element in `qp_conns` is the lifecycle of qp. - //// make qp queue into dead state - /** - * 1. Connection call mark_down - * 2. Move the Queue Pair into the Error state(QueuePair::to_dead) - * 3. Wait for the affiliated event IBV_EVENT_QP_LAST_WQE_REACHED(handle_async_event) - * 4. Wait for CQ to be empty(handle_tx_event) - * 5. Destroy the QP by calling ibv_destroy_qp()(handle_tx_event) - * - * @param qp The qp needed to dead - */ - ceph::unordered_map > qp_conns; - - /// if a queue pair is closed when transmit buffers are active - /// on it, the transmit buffers never get returned via tx_cq. To - /// work around this problem, don't delete queue pairs immediately. Instead, - /// save them in this vector and delete them at a safe time, when there are - /// no outstanding transmit buffers to be lost. - std::vector dead_queue_pairs; - - std::atomic num_pending_workers = {0}; - Mutex w_lock; // protect pending workers - // fixme: lockfree - std::list pending_workers; - RDMAStack* stack; - - class C_handle_cq_async : public EventCallback { - RDMADispatcher *dispatcher; - public: - C_handle_cq_async(RDMADispatcher *w): dispatcher(w) {} - void do_request(int fd) { - // worker->handle_tx_event(); - dispatcher->handle_async_event(); - } - }; - - public: - PerfCounters *perf_logger; - - explicit RDMADispatcher(CephContext* c, RDMAStack* s); - virtual ~RDMADispatcher(); - void handle_async_event(); - - void polling_start(); - void polling_stop(); - void polling(); - int register_qp(QueuePair *qp, RDMAConnectedSocketImpl* csi); - void make_pending_worker(RDMAWorker* w) { - Mutex::Locker l(w_lock); - auto it = std::find(pending_workers.begin(), pending_workers.end(), w); - if (it != pending_workers.end()) - return; - pending_workers.push_back(w); - ++num_pending_workers; - } - RDMAStack* get_stack() { return stack; } - RDMAConnectedSocketImpl* get_conn_lockless(uint32_t qp); - void erase_qpn_lockless(uint32_t qpn); - void erase_qpn(uint32_t qpn); - Infiniband::CompletionQueue* get_tx_cq() const { return tx_cq; } - Infiniband::CompletionQueue* get_rx_cq() const { return rx_cq; } - void notify_pending_workers(); - void handle_tx_event(ibv_wc *cqe, int n); - void post_tx_buffer(std::vector &chunks); - - std::atomic inflight = {0}; -}; - - -enum { - l_msgr_rdma_first = 95000, - - l_msgr_rdma_tx_no_mem, - l_msgr_rdma_tx_parital_mem, - l_msgr_rdma_tx_failed, - l_msgr_rdma_rx_no_registered_mem, - - l_msgr_rdma_tx_chunks, - l_msgr_rdma_tx_bytes, - l_msgr_rdma_rx_chunks, - l_msgr_rdma_rx_bytes, - l_msgr_rdma_pending_sent_conns, - - l_msgr_rdma_last, -}; - -class RDMAWorker : public Worker { - typedef Infiniband::CompletionQueue CompletionQueue; - typedef Infiniband::CompletionChannel CompletionChannel; - typedef Infiniband::MemoryManager::Chunk Chunk; - typedef Infiniband::MemoryManager MemoryManager; - typedef std::vector::iterator ChunkIter; - RDMAStack *stack; - EventCallbackRef tx_handler; - std::list pending_sent_conns; - RDMADispatcher* dispatcher = nullptr; - Mutex lock; - - class C_handle_cq_tx : public EventCallback { - RDMAWorker *worker; - public: - C_handle_cq_tx(RDMAWorker *w): worker(w) {} - void do_request(int fd) { - worker->handle_pending_message(); - } - }; - - public: - PerfCounters *perf_logger; - explicit RDMAWorker(CephContext *c, unsigned i); - virtual ~RDMAWorker(); - virtual int listen(entity_addr_t &addr, const SocketOptions &opts, ServerSocket *) override; - virtual int connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket) override; - virtual void initialize() override; - RDMAStack *get_stack() { return stack; } - int get_reged_mem(RDMAConnectedSocketImpl *o, std::vector &c, size_t bytes); - void remove_pending_conn(RDMAConnectedSocketImpl *o) { - assert(center.in_thread()); - pending_sent_conns.remove(o); - } - void handle_pending_message(); - void set_stack(RDMAStack *s) { stack = s; } - void notify_worker() { - center.dispatch_event_external(tx_handler); - } -}; - -class RDMAConnectedSocketImpl : public ConnectedSocketImpl { - public: - typedef Infiniband::MemoryManager::Chunk Chunk; - typedef Infiniband::CompletionChannel CompletionChannel; - typedef Infiniband::CompletionQueue CompletionQueue; - - private: - CephContext *cct; - Infiniband::QueuePair *qp; - IBSYNMsg peer_msg; - IBSYNMsg my_msg; - int connected; - int error; - Infiniband* infiniband; - RDMADispatcher* dispatcher; - RDMAWorker* worker; - std::vector buffers; - int notify_fd = -1; - bufferlist pending_bl; - - Mutex lock; - std::vector wc; - bool is_server; - EventCallbackRef con_handler; - int tcp_fd = -1; - bool active;// qp is active ? - bool pending; - - void notify(); - ssize_t read_buffers(char* buf, size_t len); - int post_work_request(std::vector&); - - public: - RDMAConnectedSocketImpl(CephContext *cct, Infiniband* ib, RDMADispatcher* s, - RDMAWorker *w); - virtual ~RDMAConnectedSocketImpl(); - - void pass_wc(std::vector &&v); - void get_wc(std::vector &w); - virtual int is_connected() override { return connected; } - - virtual ssize_t read(char* buf, size_t len) override; - virtual ssize_t zero_copy_read(bufferptr &data) override; - virtual ssize_t send(bufferlist &bl, bool more) override; - virtual void shutdown() override; - virtual void close() override; - virtual int fd() const override { return notify_fd; } - void fault(); - const char* get_qp_state() { return Infiniband::qp_state_string(qp->get_state()); } - ssize_t submit(bool more); - int activate(); - void fin(); - void handle_connection(); - void cleanup(); - void set_accept_fd(int sd); - int try_connect(const entity_addr_t&, const SocketOptions &opt); - bool is_pending() {return pending;} - void set_pending(bool val) {pending = val;} - class C_handle_connection : public EventCallback { - RDMAConnectedSocketImpl *csi; - bool active; - public: - C_handle_connection(RDMAConnectedSocketImpl *w): csi(w), active(true) {} - void do_request(int fd) { - if (active) - csi->handle_connection(); - } - void close() { - active = false; - } - }; -}; - -class RDMAServerSocketImpl : public ServerSocketImpl { - CephContext *cct; - NetHandler net; - int server_setup_socket; - Infiniband* infiniband; - RDMADispatcher *dispatcher; - RDMAWorker *worker; - entity_addr_t sa; - - public: - RDMAServerSocketImpl(CephContext *cct, Infiniband* i, RDMADispatcher *s, RDMAWorker *w, entity_addr_t& a); - - int listen(entity_addr_t &sa, const SocketOptions &opt); - virtual int accept(ConnectedSocket *s, const SocketOptions &opts, entity_addr_t *out, Worker *w) override; - virtual void abort_accept() override; - virtual int fd() const override { return server_setup_socket; } - int get_fd() { return server_setup_socket; } -}; - -class RDMAStack : public NetworkStack { - vector threads; - RDMADispatcher *dispatcher; - - std::atomic fork_finished = {false}; - - public: - explicit RDMAStack(CephContext *cct, const string &t); - virtual ~RDMAStack(); - virtual bool support_zero_copy_read() const override { return false; } - virtual bool nonblock_connect_need_writable_event() const { return false; } - - virtual void spawn_worker(unsigned i, std::function &&func) override; - virtual void join_worker(unsigned i) override; - RDMADispatcher *get_dispatcher() { return dispatcher; } - - virtual bool is_ready() override { return fork_finished.load(); }; - virtual void ready() override { fork_finished = true; }; -}; - -#endif