X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fmsg%2Fasync%2FAsyncConnection.h;fp=src%2Fceph%2Fsrc%2Fmsg%2Fasync%2FAsyncConnection.h;h=0000000000000000000000000000000000000000;hb=7da45d65be36d36b880cc55c5036e96c24b53f00;hp=ab2ff2c4ab05054a8a2b579d0eed26c33a97f9ad;hpb=691462d09d0987b47e112d6ee8740375df3c51b2;p=stor4nfv.git diff --git a/src/ceph/src/msg/async/AsyncConnection.h b/src/ceph/src/msg/async/AsyncConnection.h deleted file mode 100644 index ab2ff2c..0000000 --- a/src/ceph/src/msg/async/AsyncConnection.h +++ /dev/null @@ -1,408 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2014 UnitedStack - * - * Author: Haomai Wang - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - -#ifndef CEPH_MSG_ASYNCCONNECTION_H -#define CEPH_MSG_ASYNCCONNECTION_H - -#include -#include -#include -#include -#include -#include -using namespace std; - -#include "auth/AuthSessionHandler.h" -#include "common/ceph_time.h" -#include "common/perf_counters.h" -#include "include/buffer.h" -#include "msg/Connection.h" -#include "msg/Messenger.h" - -#include "Event.h" -#include "Stack.h" - -class AsyncMessenger; -class Worker; - -static const int ASYNC_IOV_MAX = (IOV_MAX >= 1024 ? IOV_MAX / 4 : IOV_MAX); - -/* - * AsyncConnection maintains a logic session between two endpoints. In other - * word, a pair of addresses can find the only AsyncConnection. AsyncConnection - * will handle with network fault or read/write transactions. If one file - * descriptor broken, AsyncConnection will maintain the message queue and - * sequence, try to reconnect peer endpoint. - */ -class AsyncConnection : public Connection { - - ssize_t read_bulk(char *buf, unsigned len); - ssize_t do_sendmsg(struct msghdr &msg, unsigned len, bool more); - ssize_t try_send(bufferlist &bl, bool more=false) { - std::lock_guard l(write_lock); - outcoming_bl.claim_append(bl); - return _try_send(more); - } - ssize_t _try_send(bool more=false); - ssize_t _send(Message *m); - void prepare_send_message(uint64_t features, Message *m, bufferlist &bl); - ssize_t read_until(unsigned needed, char *p); - ssize_t _process_connection(); - void _connect(); - void _stop(); - int handle_connect_reply(ceph_msg_connect &connect, ceph_msg_connect_reply &r); - ssize_t handle_connect_msg(ceph_msg_connect &m, bufferlist &aubl, bufferlist &bl); - void was_session_reset(); - void fault(); - void discard_out_queue(); - void discard_requeued_up_to(uint64_t seq); - void requeue_sent(); - int randomize_out_seq(); - void handle_ack(uint64_t seq); - void _append_keepalive_or_ack(bool ack=false, utime_t *t=NULL); - ssize_t write_message(Message *m, bufferlist& bl, bool more); - void inject_delay(); - ssize_t _reply_accept(char tag, ceph_msg_connect &connect, ceph_msg_connect_reply &reply, - bufferlist &authorizer_reply) { - bufferlist reply_bl; - reply.tag = tag; - reply.features = ((uint64_t)connect.features & policy.features_supported) | policy.features_required; - reply.authorizer_len = authorizer_reply.length(); - reply_bl.append((char*)&reply, sizeof(reply)); - if (reply.authorizer_len) { - reply_bl.append(authorizer_reply.c_str(), authorizer_reply.length()); - } - ssize_t r = try_send(reply_bl); - if (r < 0) { - inject_delay(); - return -1; - } - - state = STATE_ACCEPTING_WAIT_CONNECT_MSG; - return 0; - } - bool is_queued() const { - return !out_q.empty() || outcoming_bl.length(); - } - void shutdown_socket() { - for (auto &&t : register_time_events) - center->delete_time_event(t); - register_time_events.clear(); - if (last_tick_id) { - center->delete_time_event(last_tick_id); - last_tick_id = 0; - } - if (cs) { - center->delete_file_event(cs.fd(), EVENT_READABLE|EVENT_WRITABLE); - cs.shutdown(); - cs.close(); - } - } - Message *_get_next_outgoing(bufferlist *bl) { - Message *m = 0; - while (!m && !out_q.empty()) { - map > >::reverse_iterator it = out_q.rbegin(); - if (!it->second.empty()) { - list >::iterator p = it->second.begin(); - m = p->second; - if (bl) - bl->swap(p->first); - it->second.erase(p); - } - if (it->second.empty()) - out_q.erase(it->first); - } - return m; - } - bool _has_next_outgoing() const { - return !out_q.empty(); - } - void reset_recv_state(); - - /** - * The DelayedDelivery is for injecting delays into Message delivery off - * the socket. It is only enabled if delays are requested, and if they - * are then it pulls Messages off the DelayQueue and puts them into the - * AsyncMessenger event queue. - */ - class DelayedDelivery : public EventCallback { - std::set register_time_events; // need to delete it if stop - std::deque > delay_queue; - std::mutex delay_lock; - AsyncMessenger *msgr; - EventCenter *center; - DispatchQueue *dispatch_queue; - uint64_t conn_id; - std::atomic_bool stop_dispatch; - - public: - explicit DelayedDelivery(AsyncMessenger *omsgr, EventCenter *c, - DispatchQueue *q, uint64_t cid) - : msgr(omsgr), center(c), dispatch_queue(q), conn_id(cid), - stop_dispatch(false) { } - ~DelayedDelivery() override { - assert(register_time_events.empty()); - assert(delay_queue.empty()); - } - void set_center(EventCenter *c) { center = c; } - void do_request(int id) override; - void queue(double delay_period, utime_t release, Message *m) { - std::lock_guard l(delay_lock); - delay_queue.push_back(std::make_pair(release, m)); - register_time_events.insert(center->create_time_event(delay_period*1000000, this)); - } - void discard() { - stop_dispatch = true; - center->submit_to(center->get_id(), [this] () mutable { - std::lock_guard l(delay_lock); - while (!delay_queue.empty()) { - Message *m = delay_queue.front().second; - dispatch_queue->dispatch_throttle_release(m->get_dispatch_throttle_size()); - m->put(); - delay_queue.pop_front(); - } - for (auto i : register_time_events) - center->delete_time_event(i); - register_time_events.clear(); - stop_dispatch = false; - }, true); - } - bool ready() const { return !stop_dispatch && delay_queue.empty() && register_time_events.empty(); } - void flush(); - } *delay_state; - - public: - AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQueue *q, Worker *w); - ~AsyncConnection() override; - void maybe_start_delay_thread(); - - ostream& _conn_prefix(std::ostream *_dout); - - bool is_connected() override { - return can_write.load() == WriteStatus::CANWRITE; - } - - // Only call when AsyncConnection first construct - void connect(const entity_addr_t& addr, int type) { - set_peer_type(type); - set_peer_addr(addr); - policy = msgr->get_policy(type); - _connect(); - } - // Only call when AsyncConnection first construct - void accept(ConnectedSocket socket, entity_addr_t &addr); - int send_message(Message *m) override; - - void send_keepalive() override; - void mark_down() override; - void mark_disposable() override { - std::lock_guard l(lock); - policy.lossy = true; - } - - private: - enum { - STATE_NONE, - STATE_OPEN, - STATE_OPEN_KEEPALIVE2, - STATE_OPEN_KEEPALIVE2_ACK, - STATE_OPEN_TAG_ACK, - STATE_OPEN_MESSAGE_HEADER, - STATE_OPEN_MESSAGE_THROTTLE_MESSAGE, - STATE_OPEN_MESSAGE_THROTTLE_BYTES, - STATE_OPEN_MESSAGE_THROTTLE_DISPATCH_QUEUE, - STATE_OPEN_MESSAGE_READ_FRONT, - STATE_OPEN_MESSAGE_READ_MIDDLE, - STATE_OPEN_MESSAGE_READ_DATA_PREPARE, - STATE_OPEN_MESSAGE_READ_DATA, - STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH, - STATE_OPEN_TAG_CLOSE, - STATE_WAIT_SEND, - STATE_CONNECTING, - STATE_CONNECTING_RE, - STATE_CONNECTING_WAIT_BANNER_AND_IDENTIFY, - STATE_CONNECTING_SEND_CONNECT_MSG, - STATE_CONNECTING_WAIT_CONNECT_REPLY, - STATE_CONNECTING_WAIT_CONNECT_REPLY_AUTH, - STATE_CONNECTING_WAIT_ACK_SEQ, - STATE_CONNECTING_READY, - STATE_ACCEPTING, - STATE_ACCEPTING_WAIT_BANNER_ADDR, - STATE_ACCEPTING_WAIT_CONNECT_MSG, - STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH, - STATE_ACCEPTING_WAIT_SEQ, - STATE_ACCEPTING_READY, - STATE_STANDBY, - STATE_CLOSED, - STATE_WAIT, // just wait for racing connection - }; - - static const int TCP_PREFETCH_MIN_SIZE; - static const char *get_state_name(int state) { - const char* const statenames[] = {"STATE_NONE", - "STATE_OPEN", - "STATE_OPEN_KEEPALIVE2", - "STATE_OPEN_KEEPALIVE2_ACK", - "STATE_OPEN_TAG_ACK", - "STATE_OPEN_MESSAGE_HEADER", - "STATE_OPEN_MESSAGE_THROTTLE_MESSAGE", - "STATE_OPEN_MESSAGE_THROTTLE_BYTES", - "STATE_OPEN_MESSAGE_THROTTLE_DISPATCH_QUEUE", - "STATE_OPEN_MESSAGE_READ_FRONT", - "STATE_OPEN_MESSAGE_READ_MIDDLE", - "STATE_OPEN_MESSAGE_READ_DATA_PREPARE", - "STATE_OPEN_MESSAGE_READ_DATA", - "STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH", - "STATE_OPEN_TAG_CLOSE", - "STATE_WAIT_SEND", - "STATE_CONNECTING", - "STATE_CONNECTING_RE", - "STATE_CONNECTING_WAIT_BANNER_AND_IDENTIFY", - "STATE_CONNECTING_SEND_CONNECT_MSG", - "STATE_CONNECTING_WAIT_CONNECT_REPLY", - "STATE_CONNECTING_WAIT_CONNECT_REPLY_AUTH", - "STATE_CONNECTING_WAIT_ACK_SEQ", - "STATE_CONNECTING_READY", - "STATE_ACCEPTING", - "STATE_ACCEPTING_WAIT_BANNER_ADDR", - "STATE_ACCEPTING_WAIT_CONNECT_MSG", - "STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH", - "STATE_ACCEPTING_WAIT_SEQ", - "STATE_ACCEPTING_READY", - "STATE_STANDBY", - "STATE_CLOSED", - "STATE_WAIT"}; - return statenames[state]; - } - - AsyncMessenger *async_msgr; - uint64_t conn_id; - PerfCounters *logger; - int global_seq; - __u32 connect_seq, peer_global_seq; - std::atomic out_seq{0}; - std::atomic ack_left{0}, in_seq{0}; - int state; - int state_after_send; - ConnectedSocket cs; - int port; - Messenger::Policy policy; - - DispatchQueue *dispatch_queue; - - // lockfree, only used in own thread - bufferlist outcoming_bl; - bool open_write = false; - - std::mutex write_lock; - enum class WriteStatus { - NOWRITE, - REPLACING, - CANWRITE, - CLOSED - }; - std::atomic can_write; - list sent; // the first bufferlist need to inject seq - map > > out_q; // priority queue for outbound msgs - bool keepalive; - - std::mutex lock; - utime_t backoff; // backoff time - EventCallbackRef read_handler; - EventCallbackRef write_handler; - EventCallbackRef wakeup_handler; - EventCallbackRef tick_handler; - struct iovec msgvec[ASYNC_IOV_MAX]; - char *recv_buf; - uint32_t recv_max_prefetch; - uint32_t recv_start; - uint32_t recv_end; - set register_time_events; // need to delete it if stop - ceph::coarse_mono_clock::time_point last_active; - uint64_t last_tick_id = 0; - const uint64_t inactive_timeout_us; - - // Tis section are temp variables used by state transition - - // Open state - utime_t recv_stamp; - utime_t throttle_stamp; - unsigned msg_left; - uint64_t cur_msg_size; - ceph_msg_header current_header; - bufferlist data_buf; - bufferlist::iterator data_blp; - bufferlist front, middle, data; - ceph_msg_connect connect_msg; - // Connecting state - bool got_bad_auth; - AuthAuthorizer *authorizer; - bufferlist authorizer_buf; - ceph_msg_connect_reply connect_reply; - // Accepting state - entity_addr_t socket_addr; - CryptoKey session_key; - bool replacing; // when replacing process happened, we will reply connect - // side with RETRY tag and accept side will clear replaced - // connection. So when connect side reissue connect_msg, - // there won't exists conflicting connection so we use - // "replacing" to skip RESETSESSION to avoid detect wrong - // presentation - bool is_reset_from_peer; - bool once_ready; - - // used only for local state, it will be overwrite when state transition - char *state_buffer; - // used only by "read_until" - uint64_t state_offset; - Worker *worker; - EventCenter *center; - ceph::shared_ptr session_security; - - public: - // used by eventcallback - void handle_write(); - void process(); - void wakeup_from(uint64_t id); - void tick(uint64_t id); - void local_deliver(); - void stop(bool queue_reset) { - lock.lock(); - bool need_queue_reset = (state != STATE_CLOSED) && queue_reset; - _stop(); - lock.unlock(); - if (need_queue_reset) - dispatch_queue->queue_reset(this); - } - void cleanup() { - shutdown_socket(); - delete read_handler; - delete write_handler; - delete wakeup_handler; - delete tick_handler; - if (delay_state) { - delete delay_state; - delay_state = NULL; - } - } - PerfCounters *get_perf_counter() { - return logger; - } -}; /* AsyncConnection */ - -typedef boost::intrusive_ptr AsyncConnectionRef; - -#endif