// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2016 XSKY
 *
 * Author: Haomai Wang
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */

#ifndef CEPH_MSG_ASYNC_STACK_H
#define CEPH_MSG_ASYNC_STACK_H

#include "include/Spinlock.h"
#include "common/perf_counters.h"
#include "common/simple_spin.h"
#include "msg/msg_types.h"
#include "msg/async/Event.h"

class Worker;
class ConnectedSocketImpl {
 public:
  virtual ~ConnectedSocketImpl() {}
  virtual int is_connected() = 0;
  virtual ssize_t read(char*, size_t) = 0;
  virtual ssize_t zero_copy_read(bufferptr&) = 0;
  virtual ssize_t send(bufferlist &bl, bool more) = 0;
  virtual void shutdown() = 0;
  virtual void close() = 0;
  virtual int fd() const = 0;
};

class ConnectedSocket;
struct SocketOptions {
  bool nonblock = true;
  bool nodelay = true;
  int rcbuf_size = 0;
  int priority = -1;
  entity_addr_t connect_bind_addr;
};

/// \cond internal
class ServerSocketImpl {
 public:
  virtual ~ServerSocketImpl() {}
  virtual int accept(ConnectedSocket *sock, const SocketOptions &opt,
                     entity_addr_t *out, Worker *w) = 0;
  virtual void abort_accept() = 0;
  /// Get file descriptor
  virtual int fd() const = 0;
};
/// \endcond

/// \addtogroup networking-module
/// @{

/// A TCP (or other stream-based protocol) connection.
///
/// A \c ConnectedSocket represents a full-duplex stream between
/// two endpoints: a local endpoint and a remote endpoint.
class ConnectedSocket {
  std::unique_ptr<ConnectedSocketImpl> _csi;

 public:
  /// Constructs a \c ConnectedSocket not corresponding to a connection
  ConnectedSocket() {}
  /// \cond internal
  explicit ConnectedSocket(std::unique_ptr<ConnectedSocketImpl> csi)
      : _csi(std::move(csi)) {}
  /// \endcond
  ~ConnectedSocket() {
    if (_csi)
      _csi->close();
  }
  /// Moves a \c ConnectedSocket object.
  ConnectedSocket(ConnectedSocket&& cs) = default;
  /// Move-assigns a \c ConnectedSocket object.
  ConnectedSocket& operator=(ConnectedSocket&& cs) = default;

  int is_connected() {
    return _csi->is_connected();
  }
  /// Reads from the input stream, copying the data.
  ///
  /// Copies up to \c len bytes sent by the remote endpoint into \c buf.
  /// Returns the number of bytes read, or a negative error code.
  ssize_t read(char* buf, size_t len) {
    return _csi->read(buf, len);
  }
  /// Reads from the input stream without copying.
  ///
  /// Fills \c data with a buffer referencing data sent by the remote
  /// endpoint. Returns the number of bytes read, or a negative error code.
  ssize_t zero_copy_read(bufferptr &data) {
    return _csi->zero_copy_read(data);
  }
  /// Writes to the output stream.
  ///
  /// Sends \c bl to the remote endpoint; \c more hints that further data
  /// will follow shortly. Returns the number of bytes sent, or a negative
  /// error code.
  ssize_t send(bufferlist &bl, bool more) {
    return _csi->send(bl, more);
  }
  /// Disables output to the socket.
  ///
  /// Current or future writes that have not been successfully flushed
  /// will immediately fail with an error.  This is useful to abort
  /// operations on a socket that is not making progress due to a
  /// peer failure.
  void shutdown() {
    return _csi->shutdown();
  }
  /// Closes the socket.
  ///
  /// Current or future reads will immediately fail with an error.
  /// This is useful to abort operations on a socket that is not making
  /// progress due to a peer failure.
  void close() {
    _csi->close();
    _csi.reset();
  }

  /// Get file descriptor
  int fd() const {
    return _csi->fd();
  }
  explicit operator bool() const {
    return _csi.get();
  }
};
/// @}
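// Illustrative sketch only (not part of the API): one plausible way to drive
// a ConnectedSocket obtained from a Worker (declared below). `worker` and
// `peer_addr` are hypothetical placeholders; error handling is elided.
//
//   SocketOptions opts;
//   opts.nodelay = true;              // disable Nagle for low latency
//   ConnectedSocket sock;
//   if (worker->connect(peer_addr, opts, &sock) == 0 && sock) {
//     bufferlist bl;
//     bl.append("hello");
//     sock.send(bl, false);           // more=false: no further data queued
//     char buf[4096];
//     ssize_t r = sock.read(buf, sizeof(buf));  // nonblocking: may be -EAGAIN
//     sock.close();                   // releases the underlying fd
//   }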
/// \addtogroup networking-module
/// @{

/// A listening socket, waiting to accept incoming network connections.
class ServerSocket {
  std::unique_ptr<ServerSocketImpl> _ssi;

 public:
  /// Constructs a \c ServerSocket not corresponding to a connection
  ServerSocket() {}
  /// \cond internal
  explicit ServerSocket(std::unique_ptr<ServerSocketImpl> ssi)
      : _ssi(std::move(ssi)) {}
  ~ServerSocket() {
    if (_ssi)
      _ssi->abort_accept();
  }
  /// \endcond
  /// Moves a \c ServerSocket object.
  ServerSocket(ServerSocket&& ss) = default;
  /// Move-assigns a \c ServerSocket object.
  ServerSocket& operator=(ServerSocket&& ss) = default;

  /// Accepts the next connection to successfully connect to this socket.
  ///
  /// On success, returns a \ref ConnectedSocket representing the connection
  /// via \c sock, and an \ref entity_addr_t describing the remote endpoint
  /// via \c out.
  int accept(ConnectedSocket *sock, const SocketOptions &opt,
             entity_addr_t *out, Worker *w) {
    return _ssi->accept(sock, opt, out, w);
  }

  /// Stops any \ref accept() in progress.
  ///
  /// Current and future \ref accept() calls will terminate immediately
  /// with an error.
  void abort_accept() {
    _ssi->abort_accept();
    _ssi.reset();
  }

  /// Get file descriptor
  int fd() const {
    return _ssi->fd();
  }

  explicit operator bool() const {
    return _ssi.get();
  }
};
/// @}
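// Illustrative sketch only: a plausible listen/accept sequence. `worker` and
// `bind_addr` are hypothetical; in practice the caller reacts to readability
// events from the Worker's EventCenter rather than calling accept() inline.
//
//   SocketOptions opts;
//   ServerSocket listener;
//   if (worker->listen(bind_addr, opts, &listener) == 0) {
//     ConnectedSocket cs;
//     entity_addr_t peer;
//     if (listener.accept(&cs, opts, &peer, worker) == 0) {
//       // cs is now a full-duplex stream to `peer`
//     }
//     listener.abort_accept();  // cancel pending accepts, drop the socket
//   }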
class NetworkStack;

enum {
  l_msgr_first = 94000,
  l_msgr_recv_messages,
  l_msgr_send_messages,
  l_msgr_recv_bytes,
  l_msgr_send_bytes,
  l_msgr_created_connections,
  l_msgr_active_connections,

  l_msgr_running_total_time,
  l_msgr_running_send_time,
  l_msgr_running_recv_time,
  l_msgr_running_fast_dispatch_time,

  l_msgr_last,
};

class Worker {
  std::mutex init_lock;
  std::condition_variable init_cond;
  bool init = false;

 public:
  bool done = false;

  CephContext *cct;
  PerfCounters *perf_logger;
  unsigned id;

  std::atomic_uint references;
  EventCenter center;

  Worker(const Worker&) = delete;
  Worker& operator=(const Worker&) = delete;

  Worker(CephContext *c, unsigned i)
    : cct(c), perf_logger(NULL), id(i), references(0), center(c) {
    char name[128];
    sprintf(name, "AsyncMessenger::Worker-%u", id);
    // initialize perf_logger
    PerfCountersBuilder plb(cct, name, l_msgr_first, l_msgr_last);

    plb.add_u64_counter(l_msgr_recv_messages, "msgr_recv_messages", "Network received messages");
    plb.add_u64_counter(l_msgr_send_messages, "msgr_send_messages", "Network sent messages");
    plb.add_u64_counter(l_msgr_recv_bytes, "msgr_recv_bytes", "Network received bytes");
    plb.add_u64_counter(l_msgr_send_bytes, "msgr_send_bytes", "Network sent bytes");
    plb.add_u64_counter(l_msgr_active_connections, "msgr_active_connections", "Active connection number");
    plb.add_u64_counter(l_msgr_created_connections, "msgr_created_connections", "Created connection number");

    plb.add_time(l_msgr_running_total_time, "msgr_running_total_time", "The total time of thread running");
    plb.add_time(l_msgr_running_send_time, "msgr_running_send_time", "The total time of message sending");
    plb.add_time(l_msgr_running_recv_time, "msgr_running_recv_time", "The total time of message receiving");
    plb.add_time(l_msgr_running_fast_dispatch_time, "msgr_running_fast_dispatch_time", "The total time of fast dispatch");

    perf_logger = plb.create_perf_counters();
    cct->get_perfcounters_collection()->add(perf_logger);
  }
  virtual ~Worker() {
    if (perf_logger) {
      cct->get_perfcounters_collection()->remove(perf_logger);
      delete perf_logger;
    }
  }

  virtual int listen(entity_addr_t &addr,
                     const SocketOptions &opts, ServerSocket *) = 0;
  virtual int connect(const entity_addr_t &addr,
                      const SocketOptions &opts, ConnectedSocket *socket) = 0;
  virtual void destroy() {}

  virtual void initialize() {}
  PerfCounters *get_perf_counter() { return perf_logger; }
  void release_worker() {
    int oldref = references.fetch_sub(1);
    assert(oldref > 0);
  }
  void init_done() {
    init_lock.lock();
    init = true;
    init_cond.notify_all();
    init_lock.unlock();
  }
  bool is_init() {
    std::lock_guard<std::mutex> l(init_lock);
    return init;
  }
  void wait_for_init() {
    std::unique_lock<std::mutex> l(init_lock);
    while (!init)
      init_cond.wait(l);
  }
  void reset() {
    init_lock.lock();
    init = false;
    init_cond.notify_all();
    init_lock.unlock();
    done = false;
  }
};

class NetworkStack : public CephContext::ForkWatcher {
  std::string type;
  unsigned num_workers = 0;
  Spinlock pool_spin;
  bool started = false;

  std::function<void ()> add_thread(unsigned i);

 protected:
  CephContext *cct;
  vector<Worker*> workers;

  explicit NetworkStack(CephContext *c, const string &t);

 public:
  NetworkStack(const NetworkStack &) = delete;
  NetworkStack& operator=(const NetworkStack &) = delete;
  ~NetworkStack() override {
    for (auto &&w : workers)
      delete w;
  }

  static std::shared_ptr<NetworkStack> create(
      CephContext *c, const string &type);

  static Worker* create_worker(
      CephContext *c, const string &t, unsigned i);

  // Backends need to override this method if they support zero-copy read.
  virtual bool support_zero_copy_read() const { return false; }
  // Backends need to override this method if they don't support a shared
  // listen table.
  // For example, the posix backend has an in-kernel, global listen table:
  // if one thread binds a port, the other threads are aware of it too.
  // But the dpdk backend maintains a listen table in each thread, so each
  // thread needs to bind the port itself.
  virtual bool support_local_listen_table() const { return false; }
  virtual bool nonblock_connect_need_writable_event() const { return true; }

  void start();
  void stop();
  virtual Worker *get_worker();
  Worker *get_worker(unsigned i) {
    return workers[i];
  }
  void drain();
  unsigned get_num_worker() const {
    return num_workers;
  }

  // direct is used in tests only
  virtual void spawn_worker(unsigned i, std::function<void ()> &&) = 0;
  virtual void join_worker(unsigned i) = 0;

  void handle_pre_fork() override {
    stop();
  }

  void handle_post_fork() override {
    start();
  }

  virtual bool is_ready() { return true; }
  virtual void ready() { }
};

#endif //CEPH_MSG_ASYNC_STACK_H
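// Illustrative sketch only: how a messenger might bring a stack up and tear
// it down. NetworkStack::create() selects the backend by name ("posix",
// "dpdk", ...); the `cct` variable and call sequence here are hypothetical.
//
//   std::shared_ptr<NetworkStack> stack = NetworkStack::create(cct, "posix");
//   stack->start();                   // spawns one thread per worker
//   Worker *w = stack->get_worker();  // pick a worker for a new connection
//   ...
//   stack->drain();                   // wait for in-flight events to finish
//   stack->stop();                    // joins the worker threads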