X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fmsg%2Fasync%2FAsyncConnection.h;fp=src%2Fceph%2Fsrc%2Fmsg%2Fasync%2FAsyncConnection.h;h=ab2ff2c4ab05054a8a2b579d0eed26c33a97f9ad;hb=812ff6ca9fcd3e629e49d4328905f33eee8ca3f5;hp=0000000000000000000000000000000000000000;hpb=15280273faafb77777eab341909a3f495cf248d9;p=stor4nfv.git

diff --git a/src/ceph/src/msg/async/AsyncConnection.h b/src/ceph/src/msg/async/AsyncConnection.h
new file mode 100644
index 0000000..ab2ff2c
--- /dev/null
+++ b/src/ceph/src/msg/async/AsyncConnection.h
@@ -0,0 +1,408 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
+ *
+ * Author: Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#ifndef CEPH_MSG_ASYNCCONNECTION_H
+#define CEPH_MSG_ASYNCCONNECTION_H
+
+#include <atomic>
+#include <climits>
+#include <deque>
+#include <list>
+#include <map>
+#include <mutex>
+#include <set>
+using namespace std;
+
+#include "auth/AuthSessionHandler.h"
+#include "common/ceph_time.h"
+#include "common/perf_counters.h"
+#include "include/buffer.h"
+#include "msg/Connection.h"
+#include "msg/Messenger.h"
+
+#include "Event.h"
+#include "Stack.h"
+
+class AsyncMessenger;
+class Worker;
+
+static const int ASYNC_IOV_MAX = (IOV_MAX >= 1024 ? IOV_MAX / 4 : IOV_MAX);
+
+/*
+ * AsyncConnection maintains a logical session between two endpoints. In
+ * other words, a pair of addresses maps to exactly one AsyncConnection.
+ * AsyncConnection handles network faults and read/write transactions. If
+ * the underlying file descriptor breaks, AsyncConnection keeps the message
+ * queue and sequence numbers intact and tries to reconnect to the peer
+ * endpoint.
+ */
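+/*
+ * Usage sketch (editorial addition, not part of the original header):
+ * callers do not construct an AsyncConnection directly; they obtain one
+ * from a Messenger and queue messages on it. Assuming a Messenger `msgr`
+ * and a peer entity_inst_t `dest` (hypothetical variables), use looks
+ * roughly like:
+ *
+ *   ConnectionRef con = msgr->get_connection(dest);
+ *   con->send_message(m);  // m lands on out_q; the event loop writes it
+ *
+ * After a socket fault the queue and sequence numbers survive, so queued
+ * messages are replayed in order once the session is re-established.
+ */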
+class AsyncConnection : public Connection {
+
+  ssize_t read_bulk(char *buf, unsigned len);
+  ssize_t do_sendmsg(struct msghdr &msg, unsigned len, bool more);
+  ssize_t try_send(bufferlist &bl, bool more=false) {
+    std::lock_guard<std::mutex> l(write_lock);
+    outcoming_bl.claim_append(bl);
+    return _try_send(more);
+  }
+  ssize_t _try_send(bool more=false);
+  ssize_t _send(Message *m);
+  void prepare_send_message(uint64_t features, Message *m, bufferlist &bl);
+  ssize_t read_until(unsigned needed, char *p);
+  ssize_t _process_connection();
+  void _connect();
+  void _stop();
+  int handle_connect_reply(ceph_msg_connect &connect, ceph_msg_connect_reply &r);
+  ssize_t handle_connect_msg(ceph_msg_connect &m, bufferlist &aubl, bufferlist &bl);
+  void was_session_reset();
+  void fault();
+  void discard_out_queue();
+  void discard_requeued_up_to(uint64_t seq);
+  void requeue_sent();
+  int randomize_out_seq();
+  void handle_ack(uint64_t seq);
+  void _append_keepalive_or_ack(bool ack=false, utime_t *t=NULL);
+  ssize_t write_message(Message *m, bufferlist& bl, bool more);
+  void inject_delay();
+  ssize_t _reply_accept(char tag, ceph_msg_connect &connect, ceph_msg_connect_reply &reply,
+                        bufferlist &authorizer_reply) {
+    bufferlist reply_bl;
+    reply.tag = tag;
+    reply.features = ((uint64_t)connect.features & policy.features_supported) | policy.features_required;
+    reply.authorizer_len = authorizer_reply.length();
+    reply_bl.append((char*)&reply, sizeof(reply));
+    if (reply.authorizer_len) {
+      reply_bl.append(authorizer_reply.c_str(), authorizer_reply.length());
+    }
+    ssize_t r = try_send(reply_bl);
+    if (r < 0) {
+      inject_delay();
+      return -1;
+    }
+
+    state = STATE_ACCEPTING_WAIT_CONNECT_MSG;
+    return 0;
+  }
+  bool is_queued() const {
+    return !out_q.empty() || outcoming_bl.length();
+  }
+  void shutdown_socket() {
+    for (auto &&t : register_time_events)
+      center->delete_time_event(t);
+    register_time_events.clear();
+    if (last_tick_id) {
+      center->delete_time_event(last_tick_id);
+      last_tick_id = 0;
+    }
+    if (cs) {
+      center->delete_file_event(cs.fd(), EVENT_READABLE|EVENT_WRITABLE);
+      cs.shutdown();
+      cs.close();
+    }
+  }
+  // pop the highest-priority outgoing message; its pre-encoded bufferlist,
+  // if any, is swapped into *bl
+  Message *_get_next_outgoing(bufferlist *bl) {
+    Message *m = 0;
+    while (!m && !out_q.empty()) {
+      map<int, list<pair<bufferlist, Message*> > >::reverse_iterator it = out_q.rbegin();
+      if (!it->second.empty()) {
+        list<pair<bufferlist, Message*> >::iterator p = it->second.begin();
+        m = p->second;
+        if (bl)
+          bl->swap(p->first);
+        it->second.erase(p);
+      }
+      if (it->second.empty())
+        out_q.erase(it->first);
+    }
+    return m;
+  }
+  bool _has_next_outgoing() const {
+    return !out_q.empty();
+  }
+  void reset_recv_state();
+
+  /**
+   * DelayedDelivery injects delays into Message delivery off the socket.
+   * It is only enabled if delays are requested; when they are, it pulls
+   * Messages off the DelayQueue and puts them into the AsyncMessenger
+   * event queue.
+   */
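+  /*
+   * Editorial sketch (an inference from the comment above, not an
+   * authoritative reference): delayed delivery is a debugging aid, enabled
+   * only when delay injection is configured (e.g. the ms_inject_delay_type
+   * and ms_inject_delay_max options). When a matching Message is received,
+   * rather than dispatching it immediately the connection does roughly
+   *
+   *   delay_state->queue(delay_period, release_time, m);
+   *
+   * and DelayedDelivery::do_request() hands m to the DispatchQueue once
+   * the corresponding time event fires.
+   */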
+  class DelayedDelivery : public EventCallback {
+    std::set<uint64_t> register_time_events; // need to delete these on stop
+    std::deque<std::pair<utime_t, Message*> > delay_queue;
+    std::mutex delay_lock;
+    AsyncMessenger *msgr;
+    EventCenter *center;
+    DispatchQueue *dispatch_queue;
+    uint64_t conn_id;
+    std::atomic_bool stop_dispatch;
+
+   public:
+    explicit DelayedDelivery(AsyncMessenger *omsgr, EventCenter *c,
+                             DispatchQueue *q, uint64_t cid)
+      : msgr(omsgr), center(c), dispatch_queue(q), conn_id(cid),
+        stop_dispatch(false) { }
+    ~DelayedDelivery() override {
+      assert(register_time_events.empty());
+      assert(delay_queue.empty());
+    }
+    void set_center(EventCenter *c) { center = c; }
+    void do_request(int id) override;
+    void queue(double delay_period, utime_t release, Message *m) {
+      std::lock_guard<std::mutex> l(delay_lock);
+      delay_queue.push_back(std::make_pair(release, m));
+      register_time_events.insert(center->create_time_event(delay_period*1000000, this));
+    }
+    void discard() {
+      stop_dispatch = true;
+      center->submit_to(center->get_id(), [this] () mutable {
+        std::lock_guard<std::mutex> l(delay_lock);
+        while (!delay_queue.empty()) {
+          Message *m = delay_queue.front().second;
+          dispatch_queue->dispatch_throttle_release(m->get_dispatch_throttle_size());
+          m->put();
+          delay_queue.pop_front();
+        }
+        for (auto i : register_time_events)
+          center->delete_time_event(i);
+        register_time_events.clear();
+        stop_dispatch = false;
+      }, true);
+    }
+    bool ready() const { return !stop_dispatch && delay_queue.empty() && register_time_events.empty(); }
+    void flush();
+  } *delay_state;
+
+ public:
+  AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQueue *q, Worker *w);
+  ~AsyncConnection() override;
+  void maybe_start_delay_thread();
+
+  ostream& _conn_prefix(std::ostream *_dout);
+
+  bool is_connected() override {
+    return can_write.load() == WriteStatus::CANWRITE;
+  }
+
+  // Only call when the AsyncConnection is first constructed
+  void connect(const entity_addr_t& addr, int type) {
+    set_peer_type(type);
+    set_peer_addr(addr);
+    policy = msgr->get_policy(type);
+    _connect();
+  }
+  // Only call when the AsyncConnection is first constructed
+  void accept(ConnectedSocket socket, entity_addr_t &addr);
+  int send_message(Message *m) override;
+
+  void send_keepalive() override;
+  void mark_down() override;
+  void mark_disposable() override {
+    std::lock_guard<std::mutex> l(lock);
+    policy.lossy = true;
+  }
+
+ private:
+  enum {
+    STATE_NONE,
+    STATE_OPEN,
+    STATE_OPEN_KEEPALIVE2,
+    STATE_OPEN_KEEPALIVE2_ACK,
+    STATE_OPEN_TAG_ACK,
+    STATE_OPEN_MESSAGE_HEADER,
+    STATE_OPEN_MESSAGE_THROTTLE_MESSAGE,
+    STATE_OPEN_MESSAGE_THROTTLE_BYTES,
+    STATE_OPEN_MESSAGE_THROTTLE_DISPATCH_QUEUE,
+    STATE_OPEN_MESSAGE_READ_FRONT,
+    STATE_OPEN_MESSAGE_READ_MIDDLE,
+    STATE_OPEN_MESSAGE_READ_DATA_PREPARE,
+    STATE_OPEN_MESSAGE_READ_DATA,
+    STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH,
+    STATE_OPEN_TAG_CLOSE,
+    STATE_WAIT_SEND,
+    STATE_CONNECTING,
+    STATE_CONNECTING_RE,
+    STATE_CONNECTING_WAIT_BANNER_AND_IDENTIFY,
+    STATE_CONNECTING_SEND_CONNECT_MSG,
+    STATE_CONNECTING_WAIT_CONNECT_REPLY,
+    STATE_CONNECTING_WAIT_CONNECT_REPLY_AUTH,
+    STATE_CONNECTING_WAIT_ACK_SEQ,
+    STATE_CONNECTING_READY,
+    STATE_ACCEPTING,
+    STATE_ACCEPTING_WAIT_BANNER_ADDR,
+    STATE_ACCEPTING_WAIT_CONNECT_MSG,
+    STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH,
+    STATE_ACCEPTING_WAIT_SEQ,
+    STATE_ACCEPTING_READY,
+    STATE_STANDBY,
+    STATE_CLOSED,
+    STATE_WAIT,  // just wait for racing connection
+  };
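+  /*
+   * Editorial overview, inferred from the state names above (not original
+   * text). The outgoing happy path is roughly:
+   *
+   *   STATE_CONNECTING -> STATE_CONNECTING_RE
+   *     -> STATE_CONNECTING_WAIT_BANNER_AND_IDENTIFY
+   *     -> STATE_CONNECTING_SEND_CONNECT_MSG
+   *     -> STATE_CONNECTING_WAIT_CONNECT_REPLY [-> ..._REPLY_AUTH]
+   *     -> STATE_CONNECTING_WAIT_ACK_SEQ -> STATE_CONNECTING_READY
+   *     -> STATE_OPEN
+   *
+   * and an accepted connection walks the STATE_ACCEPTING_* chain to
+   * STATE_ACCEPTING_READY and then STATE_OPEN. Each incoming message
+   * iterates the STATE_OPEN_MESSAGE_* states; on a fault the connection
+   * falls back to STATE_STANDBY (or STATE_CLOSED for lossy peers).
+   */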
"STATE_OPEN", + "STATE_OPEN_KEEPALIVE2", + "STATE_OPEN_KEEPALIVE2_ACK", + "STATE_OPEN_TAG_ACK", + "STATE_OPEN_MESSAGE_HEADER", + "STATE_OPEN_MESSAGE_THROTTLE_MESSAGE", + "STATE_OPEN_MESSAGE_THROTTLE_BYTES", + "STATE_OPEN_MESSAGE_THROTTLE_DISPATCH_QUEUE", + "STATE_OPEN_MESSAGE_READ_FRONT", + "STATE_OPEN_MESSAGE_READ_MIDDLE", + "STATE_OPEN_MESSAGE_READ_DATA_PREPARE", + "STATE_OPEN_MESSAGE_READ_DATA", + "STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH", + "STATE_OPEN_TAG_CLOSE", + "STATE_WAIT_SEND", + "STATE_CONNECTING", + "STATE_CONNECTING_RE", + "STATE_CONNECTING_WAIT_BANNER_AND_IDENTIFY", + "STATE_CONNECTING_SEND_CONNECT_MSG", + "STATE_CONNECTING_WAIT_CONNECT_REPLY", + "STATE_CONNECTING_WAIT_CONNECT_REPLY_AUTH", + "STATE_CONNECTING_WAIT_ACK_SEQ", + "STATE_CONNECTING_READY", + "STATE_ACCEPTING", + "STATE_ACCEPTING_WAIT_BANNER_ADDR", + "STATE_ACCEPTING_WAIT_CONNECT_MSG", + "STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH", + "STATE_ACCEPTING_WAIT_SEQ", + "STATE_ACCEPTING_READY", + "STATE_STANDBY", + "STATE_CLOSED", + "STATE_WAIT"}; + return statenames[state]; + } + + AsyncMessenger *async_msgr; + uint64_t conn_id; + PerfCounters *logger; + int global_seq; + __u32 connect_seq, peer_global_seq; + std::atomic out_seq{0}; + std::atomic ack_left{0}, in_seq{0}; + int state; + int state_after_send; + ConnectedSocket cs; + int port; + Messenger::Policy policy; + + DispatchQueue *dispatch_queue; + + // lockfree, only used in own thread + bufferlist outcoming_bl; + bool open_write = false; + + std::mutex write_lock; + enum class WriteStatus { + NOWRITE, + REPLACING, + CANWRITE, + CLOSED + }; + std::atomic can_write; + list sent; // the first bufferlist need to inject seq + map > > out_q; // priority queue for outbound msgs + bool keepalive; + + std::mutex lock; + utime_t backoff; // backoff time + EventCallbackRef read_handler; + EventCallbackRef write_handler; + EventCallbackRef wakeup_handler; + EventCallbackRef tick_handler; + struct iovec msgvec[ASYNC_IOV_MAX]; + char *recv_buf; + uint32_t recv_max_prefetch; + uint32_t recv_start; + uint32_t recv_end; + set register_time_events; // need to delete it if stop + ceph::coarse_mono_clock::time_point last_active; + uint64_t last_tick_id = 0; + const uint64_t inactive_timeout_us; + + // Tis section are temp variables used by state transition + + // Open state + utime_t recv_stamp; + utime_t throttle_stamp; + unsigned msg_left; + uint64_t cur_msg_size; + ceph_msg_header current_header; + bufferlist data_buf; + bufferlist::iterator data_blp; + bufferlist front, middle, data; + ceph_msg_connect connect_msg; + // Connecting state + bool got_bad_auth; + AuthAuthorizer *authorizer; + bufferlist authorizer_buf; + ceph_msg_connect_reply connect_reply; + // Accepting state + entity_addr_t socket_addr; + CryptoKey session_key; + bool replacing; // when replacing process happened, we will reply connect + // side with RETRY tag and accept side will clear replaced + // connection. 
+  bool is_reset_from_peer;
+  bool once_ready;
+
+  // used only for local state; it is overwritten on state transitions
+  char *state_buffer;
+  // used only by "read_until"
+  uint64_t state_offset;
+  Worker *worker;
+  EventCenter *center;
+  ceph::shared_ptr<AuthSessionHandler> session_security;
+
+ public:
+  // used by event callbacks
+  void handle_write();
+  void process();
+  void wakeup_from(uint64_t id);
+  void tick(uint64_t id);
+  void local_deliver();
+  void stop(bool queue_reset) {
+    lock.lock();
+    bool need_queue_reset = (state != STATE_CLOSED) && queue_reset;
+    _stop();
+    lock.unlock();
+    if (need_queue_reset)
+      dispatch_queue->queue_reset(this);
+  }
+  void cleanup() {
+    shutdown_socket();
+    delete read_handler;
+    delete write_handler;
+    delete wakeup_handler;
+    delete tick_handler;
+    if (delay_state) {
+      delete delay_state;
+      delay_state = NULL;
+    }
+  }
+  PerfCounters *get_perf_counter() {
+    return logger;
+  }
+}; /* AsyncConnection */
+
+typedef boost::intrusive_ptr<AsyncConnection> AsyncConnectionRef;
+
+#endif