1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2016 XSKY <haomai@xsky.com>
8 * Author: Haomai Wang <haomaiwang@gmail.com>
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
17 #ifndef CEPH_MSG_ASYNC_STACK_H
18 #define CEPH_MSG_ASYNC_STACK_H
20 #include "include/Spinlock.h"
21 #include "common/perf_counters.h"
22 #include "common/simple_spin.h"
23 #include "msg/msg_types.h"
24 #include "msg/async/Event.h"
/// Abstract backend interface for a connected socket. Every operation
/// below is pure virtual; \c ConnectedSocket owns one of these through a
/// std::unique_ptr and forwards its calls to it.
27 class ConnectedSocketImpl {
/// Virtual destructor so implementations can be destroyed through a
/// pointer to this interface.
29 virtual ~ConnectedSocketImpl() {}
30 virtual int is_connected() = 0;
/// Read variant taking a raw buffer pointer and a size.
31 virtual ssize_t read(char*, size_t) = 0;
/// Read variant taking a bufferptr by reference.
32 virtual ssize_t zero_copy_read(bufferptr&) = 0;
33 virtual ssize_t send(bufferlist &bl, bool more) = 0;
34 virtual void shutdown() = 0;
35 virtual void close() = 0;
/// Get file descriptor
36 virtual int fd() const = 0;
39 class ConnectedSocket;
/// Option bundle passed by const reference to ServerSocketImpl::accept()
/// and to Worker::listen() / Worker::connect().
40 struct SocketOptions {
/// NOTE(review): presumably the local address to bind to before an
/// outgoing connect — confirm against the backend implementations.
45 entity_addr_t connect_bind_addr;
/// Abstract backend interface for a listening socket. Every operation
/// below is pure virtual; \c ServerSocket owns one of these through a
/// std::unique_ptr and forwards its calls to it.
49 class ServerSocketImpl {
/// Virtual destructor so implementations can be destroyed through a
/// pointer to this interface.
51 virtual ~ServerSocketImpl() {}
/// Accept entry point; takes the same arguments as ServerSocket::accept(),
/// which forwards to it unchanged.
52 virtual int accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w) = 0;
53 virtual void abort_accept() = 0;
54 /// Get file descriptor
55 virtual int fd() const = 0;
59 /// \addtogroup networking-module
62 /// A TCP (or other stream-based protocol) connection.
64 /// A \c ConnectedSocket represents a full-duplex stream between
65 /// two endpoints, a local endpoint and a remote endpoint.
66 class ConnectedSocket {
/// Backend implementation that every call below is forwarded to.
67 std::unique_ptr<ConnectedSocketImpl> _csi;
70 /// Constructs a \c ConnectedSocket not corresponding to a connection
/// Constructs a \c ConnectedSocket that takes ownership of \p csi as its
/// backend implementation.
73 explicit ConnectedSocket(std::unique_ptr<ConnectedSocketImpl> csi)
74 : _csi(std::move(csi)) {}
80 /// Moves a \c ConnectedSocket object.
81 ConnectedSocket(ConnectedSocket&& cs) = default;
82 /// Move-assigns a \c ConnectedSocket object.
83 ConnectedSocket& operator=(ConnectedSocket&& cs) = default;
/// Forwards to the implementation's is_connected().
86 return _csi->is_connected();
88 /// Read the input stream with copy.
90 /// Forwards \p buf and \p len to the implementation's read().
91 ssize_t read(char* buf, size_t len) {
92 return _csi->read(buf, len);
94 /// Read the input stream through a bufferptr.
96 /// Forwards \p data to the implementation's zero_copy_read().
97 ssize_t zero_copy_read(bufferptr &data) {
98 return _csi->zero_copy_read(data);
100 /// Write to the output stream.
102 /// Forwards \p bl and \p more to the implementation's send().
103 ssize_t send(bufferlist &bl, bool more) {
104 return _csi->send(bl, more);
106 /// Disables output to the socket.
108 /// Current or future writes that have not been successfully flushed
109 /// will immediately fail with an error. This is useful to abort
110 /// operations on a socket that is not making progress due to a
/// peer failure.
113 return _csi->shutdown();
115 /// Disables input from the socket.
117 /// Current or future reads will immediately fail with an error.
118 /// This is useful to abort operations on a socket that is not making
119 /// progress due to a peer failure.
125 /// Get file descriptor
/// Explicit conversion to bool; explicit so it only applies in boolean
/// contexts or via a cast.
130 explicit operator bool() const {
136 /// \addtogroup networking-module
139 /// A listening socket, waiting to accept incoming network connections.
/// Backend implementation that every call below is forwarded to.
141 std::unique_ptr<ServerSocketImpl> _ssi;
143 /// Constructs a \c ServerSocket not corresponding to a connection
/// Constructs a \c ServerSocket that takes ownership of \p ssi as its
/// backend implementation.
146 explicit ServerSocket(std::unique_ptr<ServerSocketImpl> ssi)
147 : _ssi(std::move(ssi)) {}
150 _ssi->abort_accept();
153 /// Moves a \c ServerSocket object.
154 ServerSocket(ServerSocket&& ss) = default;
155 /// Move-assigns a \c ServerSocket object.
156 ServerSocket& operator=(ServerSocket&& cs) = default;
158 /// Accepts the next connection to successfully connect to this socket.
160 /// Accepts a \ref ConnectedSocket representing the connection, and
161 /// a \ref entity_addr_t describing the remote endpoint.
/// All four arguments are forwarded unchanged to the implementation's
/// accept(), and its int result is returned as-is.
162 int accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w) {
163 return _ssi->accept(sock, opt, out, w);
166 /// Stops any \ref accept() in progress.
168 /// Current and future \ref accept() calls will terminate immediately.
170 void abort_accept() {
171 _ssi->abort_accept();
175 /// Get file descriptor
/// Explicit conversion to bool; explicit so it only applies in boolean
/// contexts or via a cast.
180 explicit operator bool() const {
// Perf counter indices for the async messenger. l_msgr_first is the lower
// bound handed to PerfCountersBuilder in the Worker constructor; the
// remaining enumerators are the keys used when registering each counter.
189 l_msgr_first = 94000,
190 l_msgr_recv_messages,
191 l_msgr_send_messages,
194 l_msgr_created_connections,
195 l_msgr_active_connections,
// Time counters (registered with add_time()).
197 l_msgr_running_total_time,
198 l_msgr_running_send_time,
199 l_msgr_running_recv_time,
200 l_msgr_running_fast_dispatch_time,
// Mutex / condition variable pair used by wait_for_init() and the
// notify_all() calls below.
206 std::mutex init_lock;
207 std::condition_variable init_cond;
// Per-worker perf counters; created and registered in the constructor.
214 PerfCounters *perf_logger;
// Reference count; decremented by release_worker().
217 std::atomic_uint references;
// Workers are non-copyable.
220 Worker(const Worker&) = delete;
221 Worker& operator=(const Worker&) = delete;
// Builds a worker with index `i` for context `c`, then creates this
// worker's perf counters and registers them with the context's
// perf counters collection.
223 Worker(CephContext *c, unsigned i)
224 : cct(c), perf_logger(NULL), id(i), references(0), center(c) {
// NOTE(review): sprintf() does not bound its write and the declaration of
// `name` is not visible here — confirm the buffer is large enough for the
// formatted string, or consider snprintf().
226 sprintf(name, "AsyncMessenger::Worker-%u", id);
227 // initialize perf_logger
228 PerfCountersBuilder plb(cct, name, l_msgr_first, l_msgr_last);
// Monotonic u64 counters.
230 plb.add_u64_counter(l_msgr_recv_messages, "msgr_recv_messages", "Network received messages");
231 plb.add_u64_counter(l_msgr_send_messages, "msgr_send_messages", "Network sent messages");
232 plb.add_u64_counter(l_msgr_recv_bytes, "msgr_recv_bytes", "Network received bytes");
233 plb.add_u64_counter(l_msgr_send_bytes, "msgr_send_bytes", "Network sent bytes");
234 plb.add_u64_counter(l_msgr_active_connections, "msgr_active_connections", "Active connection number");
235 plb.add_u64_counter(l_msgr_created_connections, "msgr_created_connections", "Created connection number");
// Time counters.
237 plb.add_time(l_msgr_running_total_time, "msgr_running_total_time", "The total time of thread running");
238 plb.add_time(l_msgr_running_send_time, "msgr_running_send_time", "The total time of message sending");
239 plb.add_time(l_msgr_running_recv_time, "msgr_running_recv_time", "The total time of message receiving");
240 plb.add_time(l_msgr_running_fast_dispatch_time, "msgr_running_fast_dispatch_time", "The total time of fast dispatch");
// Materialize the counters and publish them through the context.
242 perf_logger = plb.create_perf_counters();
243 cct->get_perfcounters_collection()->add(perf_logger);
// Unregister this worker's counters from the context's collection.
247 cct->get_perfcounters_collection()->remove(perf_logger);
// Backend hooks: each concrete worker must provide listen() and connect().
// Both take the target address and the socket options, plus a pointer to
// the socket wrapper.
// NOTE(review): the socket pointer looks like an output parameter — confirm
// in the backend implementations.
252 virtual int listen(entity_addr_t &addr,
253 const SocketOptions &opts, ServerSocket *) = 0;
254 virtual int connect(const entity_addr_t &addr,
255 const SocketOptions &opts, ConnectedSocket *socket) = 0;
// Optional hooks; the defaults do nothing.
256 virtual void destroy() {}
258 virtual void initialize() {}
// Returns the perf counters created in the constructor.
259 PerfCounters *get_perf_counter() { return perf_logger; }
// Drops one reference; fetch_sub() yields the count held before the
// decrement.
260 void release_worker() {
261 int oldref = references.fetch_sub(1);
// Wake every thread blocked on init_cond.
267 init_cond.notify_all();
271 std::lock_guard<std::mutex> l(init_lock);
// Acquires init_lock with a unique_lock.
// NOTE(review): presumably waits on init_cond until initialization is
// signalled — the wait itself is not visible here; confirm.
274 void wait_for_init() {
275 std::unique_lock<std::mutex> l(init_lock);
// Wake every thread blocked on init_cond.
282 init_cond.notify_all();
// Base class for a network backend: holds the Worker pointers and
// declares the hooks a backend implements (spawn_worker / join_worker).
// Derives from CephContext::ForkWatcher and overrides its pre/post-fork
// handlers.
288 class NetworkStack : public CephContext::ForkWatcher {
290 unsigned num_workers = 0;
292 bool started = false;
// Returns the callable associated with worker index `i`.
// NOTE(review): presumably the thread body later handed to spawn_worker()
// — confirm in the implementation file.
294 std::function<void ()> add_thread(unsigned i);
// Raw Worker pointers; iterated by the destructor.
298 vector<Worker*> workers;
300 explicit NetworkStack(CephContext *c, const string &t);
// Stacks are non-copyable.
302 NetworkStack(const NetworkStack &) = delete;
303 NetworkStack& operator=(const NetworkStack &) = delete;
// Walks every entry in `workers` (the loop body is not visible here).
304 ~NetworkStack() override {
305 for (auto &&w : workers)
// Factory: returns a shared NetworkStack for the given backend type string.
309 static std::shared_ptr<NetworkStack> create(
310 CephContext *c, const string &type);
// Factory: returns a Worker with index `i` for the given backend type string.
312 static Worker* create_worker(
313 CephContext *c, const string &t, unsigned i);
314 // A backend needs to override this method if it supports zero-copy read.
315 virtual bool support_zero_copy_read() const { return false; }
316 // A backend needs to override this method if it doesn't support a shared listen table.
318 // For example, the posix backend has an in-kernel global listen table: if one
319 // thread binds a port, the other threads are aware of it too.
320 // But for the dpdk backend, we maintain a listen table in each thread, so we
321 // need to let each thread bind the port itself.
322 virtual bool support_local_listen_table() const { return false; }
// Defaults to true; a backend may override it.
323 virtual bool nonblock_connect_need_writable_event() const { return true; }
// Worker accessors: one overload takes no argument, the other an index.
327 virtual Worker *get_worker();
328 Worker *get_worker(unsigned i) {
332 unsigned get_num_worker() const {
336 // direct is used in tests only
// Backend hooks for running a worker's callable and joining it again.
337 virtual void spawn_worker(unsigned i, std::function<void ()> &&) = 0;
338 virtual void join_worker(unsigned i) = 0;
// CephContext::ForkWatcher overrides.
340 void handle_pre_fork() override {
344 void handle_post_fork() override {
// Readiness hooks; by default the stack reports ready and ready() does
// nothing.
348 virtual bool is_ready() { return true; };
349 virtual void ready() { };
352 #endif //CEPH_MSG_ASYNC_STACK_H