X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fmsg%2Fasync%2FStack.h;fp=src%2Fceph%2Fsrc%2Fmsg%2Fasync%2FStack.h;h=0000000000000000000000000000000000000000;hb=7da45d65be36d36b880cc55c5036e96c24b53f00;hp=a201406016ef575ed050b230171bb59d3a25f3e1;hpb=691462d09d0987b47e112d6ee8740375df3c51b2;p=stor4nfv.git diff --git a/src/ceph/src/msg/async/Stack.h b/src/ceph/src/msg/async/Stack.h deleted file mode 100644 index a201406..0000000 --- a/src/ceph/src/msg/async/Stack.h +++ /dev/null @@ -1,352 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2016 XSKY - * - * Author: Haomai Wang - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - -#ifndef CEPH_MSG_ASYNC_STACK_H -#define CEPH_MSG_ASYNC_STACK_H - -#include "include/Spinlock.h" -#include "common/perf_counters.h" -#include "common/simple_spin.h" -#include "msg/msg_types.h" -#include "msg/async/Event.h" - -class Worker; -class ConnectedSocketImpl { - public: - virtual ~ConnectedSocketImpl() {} - virtual int is_connected() = 0; - virtual ssize_t read(char*, size_t) = 0; - virtual ssize_t zero_copy_read(bufferptr&) = 0; - virtual ssize_t send(bufferlist &bl, bool more) = 0; - virtual void shutdown() = 0; - virtual void close() = 0; - virtual int fd() const = 0; -}; - -class ConnectedSocket; -struct SocketOptions { - bool nonblock = true; - bool nodelay = true; - int rcbuf_size = 0; - int priority = -1; - entity_addr_t connect_bind_addr; -}; - -/// \cond internal -class ServerSocketImpl { - public: - virtual ~ServerSocketImpl() {} - virtual int accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w) = 0; - virtual void abort_accept() = 0; - /// Get file descriptor - virtual int fd() const = 0; -}; -/// \endcond - -/// \addtogroup networking-module -/// @{ - -/// A TCP (or other stream-based protocol) connection. -/// -/// A \c ConnectedSocket represents a full-duplex stream between -/// two endpoints, a local endpoint and a remote endpoint. -class ConnectedSocket { - std::unique_ptr _csi; - - public: - /// Constructs a \c ConnectedSocket not corresponding to a connection - ConnectedSocket() {}; - /// \cond internal - explicit ConnectedSocket(std::unique_ptr csi) - : _csi(std::move(csi)) {} - /// \endcond - ~ConnectedSocket() { - if (_csi) - _csi->close(); - } - /// Moves a \c ConnectedSocket object. - ConnectedSocket(ConnectedSocket&& cs) = default; - /// Move-assigns a \c ConnectedSocket object. - ConnectedSocket& operator=(ConnectedSocket&& cs) = default; - - int is_connected() { - return _csi->is_connected(); - } - /// Read the input stream with copy. - /// - /// Copy an object returning data sent from the remote endpoint. - ssize_t read(char* buf, size_t len) { - return _csi->read(buf, len); - } - /// Gets the input stream. - /// - /// Gets an object returning data sent from the remote endpoint. - ssize_t zero_copy_read(bufferptr &data) { - return _csi->zero_copy_read(data); - } - /// Gets the output stream. - /// - /// Gets an object that sends data to the remote endpoint. - ssize_t send(bufferlist &bl, bool more) { - return _csi->send(bl, more); - } - /// Disables output to the socket. - /// - /// Current or future writes that have not been successfully flushed - /// will immediately fail with an error. This is useful to abort - /// operations on a socket that is not making progress due to a - /// peer failure. - void shutdown() { - return _csi->shutdown(); - } - /// Disables input from the socket. - /// - /// Current or future reads will immediately fail with an error. - /// This is useful to abort operations on a socket that is not making - /// progress due to a peer failure. - void close() { - _csi->close(); - _csi.reset(); - } - - /// Get file descriptor - int fd() const { - return _csi->fd(); - } - - explicit operator bool() const { - return _csi.get(); - } -}; -/// @} - -/// \addtogroup networking-module -/// @{ - -/// A listening socket, waiting to accept incoming network connections. -class ServerSocket { - std::unique_ptr _ssi; - public: - /// Constructs a \c ServerSocket not corresponding to a connection - ServerSocket() {} - /// \cond internal - explicit ServerSocket(std::unique_ptr ssi) - : _ssi(std::move(ssi)) {} - ~ServerSocket() { - if (_ssi) - _ssi->abort_accept(); - } - /// \endcond - /// Moves a \c ServerSocket object. - ServerSocket(ServerSocket&& ss) = default; - /// Move-assigns a \c ServerSocket object. - ServerSocket& operator=(ServerSocket&& cs) = default; - - /// Accepts the next connection to successfully connect to this socket. - /// - /// \Accepts a \ref ConnectedSocket representing the connection, and - /// a \ref entity_addr_t describing the remote endpoint. - int accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w) { - return _ssi->accept(sock, opt, out, w); - } - - /// Stops any \ref accept() in progress. - /// - /// Current and future \ref accept() calls will terminate immediately - /// with an error. - void abort_accept() { - _ssi->abort_accept(); - _ssi.reset(); - } - - /// Get file descriptor - int fd() const { - return _ssi->fd(); - } - - explicit operator bool() const { - return _ssi.get(); - } -}; -/// @} - -class NetworkStack; - -enum { - l_msgr_first = 94000, - l_msgr_recv_messages, - l_msgr_send_messages, - l_msgr_recv_bytes, - l_msgr_send_bytes, - l_msgr_created_connections, - l_msgr_active_connections, - - l_msgr_running_total_time, - l_msgr_running_send_time, - l_msgr_running_recv_time, - l_msgr_running_fast_dispatch_time, - - l_msgr_last, -}; - -class Worker { - std::mutex init_lock; - std::condition_variable init_cond; - bool init = false; - - public: - bool done = false; - - CephContext *cct; - PerfCounters *perf_logger; - unsigned id; - - std::atomic_uint references; - EventCenter center; - - Worker(const Worker&) = delete; - Worker& operator=(const Worker&) = delete; - - Worker(CephContext *c, unsigned i) - : cct(c), perf_logger(NULL), id(i), references(0), center(c) { - char name[128]; - sprintf(name, "AsyncMessenger::Worker-%u", id); - // initialize perf_logger - PerfCountersBuilder plb(cct, name, l_msgr_first, l_msgr_last); - - plb.add_u64_counter(l_msgr_recv_messages, "msgr_recv_messages", "Network received messages"); - plb.add_u64_counter(l_msgr_send_messages, "msgr_send_messages", "Network sent messages"); - plb.add_u64_counter(l_msgr_recv_bytes, "msgr_recv_bytes", "Network received bytes"); - plb.add_u64_counter(l_msgr_send_bytes, "msgr_send_bytes", "Network sent bytes"); - plb.add_u64_counter(l_msgr_active_connections, "msgr_active_connections", "Active connection number"); - plb.add_u64_counter(l_msgr_created_connections, "msgr_created_connections", "Created connection number"); - - plb.add_time(l_msgr_running_total_time, "msgr_running_total_time", "The total time of thread running"); - plb.add_time(l_msgr_running_send_time, "msgr_running_send_time", "The total time of message sending"); - plb.add_time(l_msgr_running_recv_time, "msgr_running_recv_time", "The total time of message receiving"); - plb.add_time(l_msgr_running_fast_dispatch_time, "msgr_running_fast_dispatch_time", "The total time of fast dispatch"); - - perf_logger = plb.create_perf_counters(); - cct->get_perfcounters_collection()->add(perf_logger); - } - virtual ~Worker() { - if (perf_logger) { - cct->get_perfcounters_collection()->remove(perf_logger); - delete perf_logger; - } - } - - virtual int listen(entity_addr_t &addr, - const SocketOptions &opts, ServerSocket *) = 0; - virtual int connect(const entity_addr_t &addr, - const SocketOptions &opts, ConnectedSocket *socket) = 0; - virtual void destroy() {} - - virtual void initialize() {} - PerfCounters *get_perf_counter() { return perf_logger; } - void release_worker() { - int oldref = references.fetch_sub(1); - assert(oldref > 0); - } - void init_done() { - init_lock.lock(); - init = true; - init_cond.notify_all(); - init_lock.unlock(); - } - bool is_init() { - std::lock_guard l(init_lock); - return init; - } - void wait_for_init() { - std::unique_lock l(init_lock); - while (!init) - init_cond.wait(l); - } - void reset() { - init_lock.lock(); - init = false; - init_cond.notify_all(); - init_lock.unlock(); - done = false; - } -}; - -class NetworkStack : public CephContext::ForkWatcher { - std::string type; - unsigned num_workers = 0; - Spinlock pool_spin; - bool started = false; - - std::function add_thread(unsigned i); - - protected: - CephContext *cct; - vector workers; - - explicit NetworkStack(CephContext *c, const string &t); - public: - NetworkStack(const NetworkStack &) = delete; - NetworkStack& operator=(const NetworkStack &) = delete; - ~NetworkStack() override { - for (auto &&w : workers) - delete w; - } - - static std::shared_ptr create( - CephContext *c, const string &type); - - static Worker* create_worker( - CephContext *c, const string &t, unsigned i); - // backend need to override this method if supports zero copy read - virtual bool support_zero_copy_read() const { return false; } - // backend need to override this method if backend doesn't support shared - // listen table. - // For example, posix backend has in kernel global listen table. If one - // thread bind a port, other threads also aware this. - // But for dpdk backend, we maintain listen table in each thread. So we - // need to let each thread do binding port. - virtual bool support_local_listen_table() const { return false; } - virtual bool nonblock_connect_need_writable_event() const { return true; } - - void start(); - void stop(); - virtual Worker *get_worker(); - Worker *get_worker(unsigned i) { - return workers[i]; - } - void drain(); - unsigned get_num_worker() const { - return num_workers; - } - - // direct is used in tests only - virtual void spawn_worker(unsigned i, std::function &&) = 0; - virtual void join_worker(unsigned i) = 0; - - void handle_pre_fork() override { - stop(); - } - - void handle_post_fork() override { - start(); - } - - virtual bool is_ready() { return true; }; - virtual void ready() { }; -}; - -#endif //CEPH_MSG_ASYNC_STACK_H