--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
+ *
+ * Author: Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_MSG_ASYNCCONNECTION_H
+#define CEPH_MSG_ASYNCCONNECTION_H
+
+#include <atomic>
+#include <pthread.h>
+#include <climits>
+#include <list>
+#include <mutex>
+#include <map>
+using namespace std;
+
+#include "auth/AuthSessionHandler.h"
+#include "common/ceph_time.h"
+#include "common/perf_counters.h"
+#include "include/buffer.h"
+#include "msg/Connection.h"
+#include "msg/Messenger.h"
+
+#include "Event.h"
+#include "Stack.h"
+
+class AsyncMessenger;
+class Worker;
+
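+// Cap on the number of iovec entries handed to a single sendmsg() call.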
+static const int ASYNC_IOV_MAX = (IOV_MAX >= 1024 ? IOV_MAX / 4 : IOV_MAX);
+
+/*
+ * An AsyncConnection maintains a logical session between two endpoints; in
+ * other words, a pair of addresses maps to exactly one AsyncConnection. It
+ * handles read/write transactions as well as network faults: if the file
+ * descriptor breaks, the AsyncConnection keeps the message queue and sequence
+ * numbers and tries to reconnect to the peer endpoint.
+ */
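+/*
+ * Rough usage sketch (assumed, simplified; not verbatim AsyncMessenger code):
+ * the messenger owns one AsyncConnection per peer, connects or accepts it
+ * once, and afterwards only queues messages on it.
+ *
+ *   AsyncConnectionRef conn = new AsyncConnection(cct, msgr, &dispatch_queue, worker);
+ *   conn->connect(peer_addr, CEPH_ENTITY_TYPE_OSD);  // outgoing side only
+ *   conn->send_message(m);   // queued; written asynchronously by the worker
+ *   conn->mark_down();       // tear the session down
+ */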
+class AsyncConnection : public Connection {
+
+ ssize_t read_bulk(char *buf, unsigned len);
+ ssize_t do_sendmsg(struct msghdr &msg, unsigned len, bool more);
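+  // Append bl to the pending output buffer under write_lock and try to flush
+  // it to the socket immediately.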
+ ssize_t try_send(bufferlist &bl, bool more=false) {
+ std::lock_guard<std::mutex> l(write_lock);
+ outcoming_bl.claim_append(bl);
+ return _try_send(more);
+ }
+ ssize_t _try_send(bool more=false);
+ ssize_t _send(Message *m);
+ void prepare_send_message(uint64_t features, Message *m, bufferlist &bl);
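+  // Read `needed` bytes into p, buffering partial reads across calls
+  // (assumed: returns 0 when complete, >0 if more data is still needed,
+  // <0 on error).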
+ ssize_t read_until(unsigned needed, char *p);
+ ssize_t _process_connection();
+ void _connect();
+ void _stop();
+ int handle_connect_reply(ceph_msg_connect &connect, ceph_msg_connect_reply &r);
+ ssize_t handle_connect_msg(ceph_msg_connect &m, bufferlist &aubl, bufferlist &bl);
+ void was_session_reset();
+ void fault();
+ void discard_out_queue();
+ void discard_requeued_up_to(uint64_t seq);
+ void requeue_sent();
+ int randomize_out_seq();
+ void handle_ack(uint64_t seq);
+ void _append_keepalive_or_ack(bool ack=false, utime_t *t=NULL);
+ ssize_t write_message(Message *m, bufferlist& bl, bool more);
+ void inject_delay();
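+  // Build a connect reply carrying the given tag (plus any authorizer reply),
+  // send it, and go back to waiting for the peer's next connect message.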
+ ssize_t _reply_accept(char tag, ceph_msg_connect &connect, ceph_msg_connect_reply &reply,
+ bufferlist &authorizer_reply) {
+ bufferlist reply_bl;
+ reply.tag = tag;
+ reply.features = ((uint64_t)connect.features & policy.features_supported) | policy.features_required;
+ reply.authorizer_len = authorizer_reply.length();
+ reply_bl.append((char*)&reply, sizeof(reply));
+ if (reply.authorizer_len) {
+ reply_bl.append(authorizer_reply.c_str(), authorizer_reply.length());
+ }
+ ssize_t r = try_send(reply_bl);
+ if (r < 0) {
+ inject_delay();
+ return -1;
+ }
+
+ state = STATE_ACCEPTING_WAIT_CONNECT_MSG;
+ return 0;
+ }
+ bool is_queued() const {
+ return !out_q.empty() || outcoming_bl.length();
+ }
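+  // Cancel all pending time events and the tick timer, deregister the
+  // socket's file events, then shut down and close the socket.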
+ void shutdown_socket() {
+ for (auto &&t : register_time_events)
+ center->delete_time_event(t);
+ register_time_events.clear();
+ if (last_tick_id) {
+ center->delete_time_event(last_tick_id);
+ last_tick_id = 0;
+ }
+ if (cs) {
+ center->delete_file_event(cs.fd(), EVENT_READABLE|EVENT_WRITABLE);
+ cs.shutdown();
+ cs.close();
+ }
+ }
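+  // Pop the next message from the highest-priority non-empty queue in out_q,
+  // swapping its pre-encoded bufferlist into *bl if one is provided.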
+ Message *_get_next_outgoing(bufferlist *bl) {
+ Message *m = 0;
+ while (!m && !out_q.empty()) {
+ map<int, list<pair<bufferlist, Message*> > >::reverse_iterator it = out_q.rbegin();
+ if (!it->second.empty()) {
+ list<pair<bufferlist, Message*> >::iterator p = it->second.begin();
+ m = p->second;
+ if (bl)
+ bl->swap(p->first);
+ it->second.erase(p);
+ }
+ if (it->second.empty())
+ out_q.erase(it->first);
+ }
+ return m;
+ }
+ bool _has_next_outgoing() const {
+ return !out_q.empty();
+ }
+ void reset_recv_state();
+
+  /**
+   * DelayedDelivery injects artificial delays into Message delivery off
+   * the socket. It is only enabled if delays are requested; when they are,
+   * it pulls Messages off its delay queue and puts them into the
+   * AsyncMessenger event queue.
+   */
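+  /*
+   * Rough flow (assumed): when delays are configured the read path calls
+   * queue(delay, release, m) instead of dispatching directly; the time event
+   * later fires do_request(), which hands messages whose release time has
+   * passed to the DispatchQueue, while discard() drops everything on teardown.
+   */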
+ class DelayedDelivery : public EventCallback {
+    std::set<uint64_t> register_time_events; // time events that must be deleted on stop
+ std::deque<std::pair<utime_t, Message*> > delay_queue;
+ std::mutex delay_lock;
+ AsyncMessenger *msgr;
+ EventCenter *center;
+ DispatchQueue *dispatch_queue;
+ uint64_t conn_id;
+ std::atomic_bool stop_dispatch;
+
+ public:
+ explicit DelayedDelivery(AsyncMessenger *omsgr, EventCenter *c,
+ DispatchQueue *q, uint64_t cid)
+ : msgr(omsgr), center(c), dispatch_queue(q), conn_id(cid),
+ stop_dispatch(false) { }
+ ~DelayedDelivery() override {
+ assert(register_time_events.empty());
+ assert(delay_queue.empty());
+ }
+ void set_center(EventCenter *c) { center = c; }
+ void do_request(int id) override;
+ void queue(double delay_period, utime_t release, Message *m) {
+ std::lock_guard<std::mutex> l(delay_lock);
+ delay_queue.push_back(std::make_pair(release, m));
+ register_time_events.insert(center->create_time_event(delay_period*1000000, this));
+ }
+ void discard() {
+ stop_dispatch = true;
+ center->submit_to(center->get_id(), [this] () mutable {
+ std::lock_guard<std::mutex> l(delay_lock);
+ while (!delay_queue.empty()) {
+ Message *m = delay_queue.front().second;
+ dispatch_queue->dispatch_throttle_release(m->get_dispatch_throttle_size());
+ m->put();
+ delay_queue.pop_front();
+ }
+ for (auto i : register_time_events)
+ center->delete_time_event(i);
+ register_time_events.clear();
+ stop_dispatch = false;
+ }, true);
+ }
+ bool ready() const { return !stop_dispatch && delay_queue.empty() && register_time_events.empty(); }
+ void flush();
+ } *delay_state;
+
+ public:
+ AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQueue *q, Worker *w);
+ ~AsyncConnection() override;
+ void maybe_start_delay_thread();
+
+ ostream& _conn_prefix(std::ostream *_dout);
+
+ bool is_connected() override {
+ return can_write.load() == WriteStatus::CANWRITE;
+ }
+
+  // Only call this when the AsyncConnection is first constructed
+ void connect(const entity_addr_t& addr, int type) {
+ set_peer_type(type);
+ set_peer_addr(addr);
+ policy = msgr->get_policy(type);
+ _connect();
+ }
+  // Only call this when the AsyncConnection is first constructed
+ void accept(ConnectedSocket socket, entity_addr_t &addr);
+ int send_message(Message *m) override;
+
+ void send_keepalive() override;
+ void mark_down() override;
+ void mark_disposable() override {
+ std::lock_guard<std::mutex> l(lock);
+ policy.lossy = true;
+ }
+
+ private:
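+  // Protocol state machine: STATE_CONNECTING_* drive the outgoing handshake,
+  // STATE_ACCEPTING_* the incoming handshake, and STATE_OPEN_* the read loop
+  // of an established session.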
+ enum {
+ STATE_NONE,
+ STATE_OPEN,
+ STATE_OPEN_KEEPALIVE2,
+ STATE_OPEN_KEEPALIVE2_ACK,
+ STATE_OPEN_TAG_ACK,
+ STATE_OPEN_MESSAGE_HEADER,
+ STATE_OPEN_MESSAGE_THROTTLE_MESSAGE,
+ STATE_OPEN_MESSAGE_THROTTLE_BYTES,
+ STATE_OPEN_MESSAGE_THROTTLE_DISPATCH_QUEUE,
+ STATE_OPEN_MESSAGE_READ_FRONT,
+ STATE_OPEN_MESSAGE_READ_MIDDLE,
+ STATE_OPEN_MESSAGE_READ_DATA_PREPARE,
+ STATE_OPEN_MESSAGE_READ_DATA,
+ STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH,
+ STATE_OPEN_TAG_CLOSE,
+ STATE_WAIT_SEND,
+ STATE_CONNECTING,
+ STATE_CONNECTING_RE,
+ STATE_CONNECTING_WAIT_BANNER_AND_IDENTIFY,
+ STATE_CONNECTING_SEND_CONNECT_MSG,
+ STATE_CONNECTING_WAIT_CONNECT_REPLY,
+ STATE_CONNECTING_WAIT_CONNECT_REPLY_AUTH,
+ STATE_CONNECTING_WAIT_ACK_SEQ,
+ STATE_CONNECTING_READY,
+ STATE_ACCEPTING,
+ STATE_ACCEPTING_WAIT_BANNER_ADDR,
+ STATE_ACCEPTING_WAIT_CONNECT_MSG,
+ STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH,
+ STATE_ACCEPTING_WAIT_SEQ,
+ STATE_ACCEPTING_READY,
+ STATE_STANDBY,
+ STATE_CLOSED,
+ STATE_WAIT, // just wait for racing connection
+ };
+
+ static const int TCP_PREFETCH_MIN_SIZE;
+ static const char *get_state_name(int state) {
+ const char* const statenames[] = {"STATE_NONE",
+ "STATE_OPEN",
+ "STATE_OPEN_KEEPALIVE2",
+ "STATE_OPEN_KEEPALIVE2_ACK",
+ "STATE_OPEN_TAG_ACK",
+ "STATE_OPEN_MESSAGE_HEADER",
+ "STATE_OPEN_MESSAGE_THROTTLE_MESSAGE",
+ "STATE_OPEN_MESSAGE_THROTTLE_BYTES",
+ "STATE_OPEN_MESSAGE_THROTTLE_DISPATCH_QUEUE",
+ "STATE_OPEN_MESSAGE_READ_FRONT",
+ "STATE_OPEN_MESSAGE_READ_MIDDLE",
+ "STATE_OPEN_MESSAGE_READ_DATA_PREPARE",
+ "STATE_OPEN_MESSAGE_READ_DATA",
+ "STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH",
+ "STATE_OPEN_TAG_CLOSE",
+ "STATE_WAIT_SEND",
+ "STATE_CONNECTING",
+ "STATE_CONNECTING_RE",
+ "STATE_CONNECTING_WAIT_BANNER_AND_IDENTIFY",
+ "STATE_CONNECTING_SEND_CONNECT_MSG",
+ "STATE_CONNECTING_WAIT_CONNECT_REPLY",
+ "STATE_CONNECTING_WAIT_CONNECT_REPLY_AUTH",
+ "STATE_CONNECTING_WAIT_ACK_SEQ",
+ "STATE_CONNECTING_READY",
+ "STATE_ACCEPTING",
+ "STATE_ACCEPTING_WAIT_BANNER_ADDR",
+ "STATE_ACCEPTING_WAIT_CONNECT_MSG",
+ "STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH",
+ "STATE_ACCEPTING_WAIT_SEQ",
+ "STATE_ACCEPTING_READY",
+ "STATE_STANDBY",
+ "STATE_CLOSED",
+ "STATE_WAIT"};
+ return statenames[state];
+ }
+
+ AsyncMessenger *async_msgr;
+ uint64_t conn_id;
+ PerfCounters *logger;
+ int global_seq;
+ __u32 connect_seq, peer_global_seq;
+ std::atomic<uint64_t> out_seq{0};
+ std::atomic<uint64_t> ack_left{0}, in_seq{0};
+ int state;
+ int state_after_send;
+ ConnectedSocket cs;
+ int port;
+ Messenger::Policy policy;
+
+ DispatchQueue *dispatch_queue;
+
+  // lock-free; only accessed from the owning thread
+ bufferlist outcoming_bl;
+ bool open_write = false;
+
+ std::mutex write_lock;
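+  // Assumed meanings: NOWRITE before the session is ready, REPLACING while a
+  // racing connection is taking this one over, CANWRITE once established,
+  // CLOSED after shutdown.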
+ enum class WriteStatus {
+ NOWRITE,
+ REPLACING,
+ CANWRITE,
+ CLOSED
+ };
+ std::atomic<WriteStatus> can_write;
+  list<Message*> sent; // sent but not yet acked; the first bufferlist needs its seq injected
+ map<int, list<pair<bufferlist, Message*> > > out_q; // priority queue for outbound msgs
+ bool keepalive;
+
+ std::mutex lock;
+ utime_t backoff; // backoff time
+ EventCallbackRef read_handler;
+ EventCallbackRef write_handler;
+ EventCallbackRef wakeup_handler;
+ EventCallbackRef tick_handler;
+ struct iovec msgvec[ASYNC_IOV_MAX];
+ char *recv_buf;
+ uint32_t recv_max_prefetch;
+ uint32_t recv_start;
+ uint32_t recv_end;
+  set<uint64_t> register_time_events; // time events that must be deleted on stop
+ ceph::coarse_mono_clock::time_point last_active;
+ uint64_t last_tick_id = 0;
+ const uint64_t inactive_timeout_us;
+
+  // The following are temporary variables used during state transitions
+
+ // Open state
+ utime_t recv_stamp;
+ utime_t throttle_stamp;
+ unsigned msg_left;
+ uint64_t cur_msg_size;
+ ceph_msg_header current_header;
+ bufferlist data_buf;
+ bufferlist::iterator data_blp;
+ bufferlist front, middle, data;
+ ceph_msg_connect connect_msg;
+ // Connecting state
+ bool got_bad_auth;
+ AuthAuthorizer *authorizer;
+ bufferlist authorizer_buf;
+ ceph_msg_connect_reply connect_reply;
+ // Accepting state
+ entity_addr_t socket_addr;
+ CryptoKey session_key;
+  bool replacing;    // when a replace happens, we reply to the connect side
+                     // with a RETRY tag and the accept side clears the
+                     // replaced connection. So when the connect side reissues
+                     // its connect_msg there is no longer a conflicting
+                     // connection, and we use "replacing" to skip RESETSESSION
+                     // and avoid wrongly detecting a session reset.
+ bool is_reset_from_peer;
+ bool once_ready;
+
+  // used only for local state; it is overwritten on state transitions
+ char *state_buffer;
+ // used only by "read_until"
+ uint64_t state_offset;
+ Worker *worker;
+ EventCenter *center;
+ ceph::shared_ptr<AuthSessionHandler> session_security;
+
+ public:
+  // used by event callbacks
+ void handle_write();
+ void process();
+ void wakeup_from(uint64_t id);
+ void tick(uint64_t id);
+ void local_deliver();
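+  // Close the connection; if it was not already closed and queue_reset is
+  // requested, queue a reset notification on the DispatchQueue.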
+ void stop(bool queue_reset) {
+ lock.lock();
+ bool need_queue_reset = (state != STATE_CLOSED) && queue_reset;
+ _stop();
+ lock.unlock();
+ if (need_queue_reset)
+ dispatch_queue->queue_reset(this);
+ }
+ void cleanup() {
+ shutdown_socket();
+ delete read_handler;
+ delete write_handler;
+ delete wakeup_handler;
+ delete tick_handler;
+ if (delay_state) {
+ delete delay_state;
+ delay_state = NULL;
+ }
+ }
+ PerfCounters *get_perf_counter() {
+ return logger;
+ }
+}; /* AsyncConnection */
+
+typedef boost::intrusive_ptr<AsyncConnection> AsyncConnectionRef;
+
+#endif