remove ceph code
[stor4nfv.git] / src / ceph / src / msg / async / AsyncConnection.h
diff --git a/src/ceph/src/msg/async/AsyncConnection.h b/src/ceph/src/msg/async/AsyncConnection.h
deleted file mode 100644 (file)
index ab2ff2c..0000000
+++ /dev/null
@@ -1,408 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
- *
- * Author: Haomai Wang <haomaiwang@gmail.com>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation.  See file COPYING.
- *
- */
-
-#ifndef CEPH_MSG_ASYNCCONNECTION_H
-#define CEPH_MSG_ASYNCCONNECTION_H
-
-#include <atomic>
-#include <pthread.h>
-#include <climits>
-#include <list>
-#include <mutex>
-#include <map>
-using namespace std;
-
-#include "auth/AuthSessionHandler.h"
-#include "common/ceph_time.h"
-#include "common/perf_counters.h"
-#include "include/buffer.h"
-#include "msg/Connection.h"
-#include "msg/Messenger.h"
-
-#include "Event.h"
-#include "Stack.h"
-
-class AsyncMessenger;
-class Worker;
-
-static const int ASYNC_IOV_MAX = (IOV_MAX >= 1024 ? IOV_MAX / 4 : IOV_MAX);
-
-/*
- * AsyncConnection maintains a logic session between two endpoints. In other
- * word, a pair of addresses can find the only AsyncConnection. AsyncConnection
- * will handle with network fault or read/write transactions. If one file
- * descriptor broken, AsyncConnection will maintain the message queue and
- * sequence, try to reconnect peer endpoint.
- */
-class AsyncConnection : public Connection {
-
-  ssize_t read_bulk(char *buf, unsigned len);
-  ssize_t do_sendmsg(struct msghdr &msg, unsigned len, bool more);
-  ssize_t try_send(bufferlist &bl, bool more=false) {
-    std::lock_guard<std::mutex> l(write_lock);
-    outcoming_bl.claim_append(bl);
-    return _try_send(more);
-  }
-  ssize_t _try_send(bool more=false);
-  ssize_t _send(Message *m);
-  void prepare_send_message(uint64_t features, Message *m, bufferlist &bl);
-  ssize_t read_until(unsigned needed, char *p);
-  ssize_t _process_connection();
-  void _connect();
-  void _stop();
-  int handle_connect_reply(ceph_msg_connect &connect, ceph_msg_connect_reply &r);
-  ssize_t handle_connect_msg(ceph_msg_connect &m, bufferlist &aubl, bufferlist &bl);
-  void was_session_reset();
-  void fault();
-  void discard_out_queue();
-  void discard_requeued_up_to(uint64_t seq);
-  void requeue_sent();
-  int randomize_out_seq();
-  void handle_ack(uint64_t seq);
-  void _append_keepalive_or_ack(bool ack=false, utime_t *t=NULL);
-  ssize_t write_message(Message *m, bufferlist& bl, bool more);
-  void inject_delay();
-  ssize_t _reply_accept(char tag, ceph_msg_connect &connect, ceph_msg_connect_reply &reply,
-                    bufferlist &authorizer_reply) {
-    bufferlist reply_bl;
-    reply.tag = tag;
-    reply.features = ((uint64_t)connect.features & policy.features_supported) | policy.features_required;
-    reply.authorizer_len = authorizer_reply.length();
-    reply_bl.append((char*)&reply, sizeof(reply));
-    if (reply.authorizer_len) {
-      reply_bl.append(authorizer_reply.c_str(), authorizer_reply.length());
-    }
-    ssize_t r = try_send(reply_bl);
-    if (r < 0) {
-      inject_delay();
-      return -1;
-    }
-
-    state = STATE_ACCEPTING_WAIT_CONNECT_MSG;
-    return 0;
-  }
-  bool is_queued() const {
-    return !out_q.empty() || outcoming_bl.length();
-  }
-  void shutdown_socket() {
-    for (auto &&t : register_time_events)
-      center->delete_time_event(t);
-    register_time_events.clear();
-    if (last_tick_id) {
-      center->delete_time_event(last_tick_id);
-      last_tick_id = 0;
-    }
-    if (cs) {
-      center->delete_file_event(cs.fd(), EVENT_READABLE|EVENT_WRITABLE);
-      cs.shutdown();
-      cs.close();
-    }
-  }
-  Message *_get_next_outgoing(bufferlist *bl) {
-    Message *m = 0;
-    while (!m && !out_q.empty()) {
-      map<int, list<pair<bufferlist, Message*> > >::reverse_iterator it = out_q.rbegin();
-      if (!it->second.empty()) {
-        list<pair<bufferlist, Message*> >::iterator p = it->second.begin();
-        m = p->second;
-        if (bl)
-          bl->swap(p->first);
-        it->second.erase(p);
-      }
-      if (it->second.empty())
-        out_q.erase(it->first);
-    }
-    return m;
-  }
-  bool _has_next_outgoing() const {
-    return !out_q.empty();
-  }
-  void reset_recv_state();
-
-   /**
-   * The DelayedDelivery is for injecting delays into Message delivery off
-   * the socket. It is only enabled if delays are requested, and if they
-   * are then it pulls Messages off the DelayQueue and puts them into the
-   * AsyncMessenger event queue.
-   */
-  class DelayedDelivery : public EventCallback {
-    std::set<uint64_t> register_time_events; // need to delete it if stop
-    std::deque<std::pair<utime_t, Message*> > delay_queue;
-    std::mutex delay_lock;
-    AsyncMessenger *msgr;
-    EventCenter *center;
-    DispatchQueue *dispatch_queue;
-    uint64_t conn_id;
-    std::atomic_bool stop_dispatch;
-
-   public:
-    explicit DelayedDelivery(AsyncMessenger *omsgr, EventCenter *c,
-                             DispatchQueue *q, uint64_t cid)
-      : msgr(omsgr), center(c), dispatch_queue(q), conn_id(cid),
-        stop_dispatch(false) { }
-    ~DelayedDelivery() override {
-      assert(register_time_events.empty());
-      assert(delay_queue.empty());
-    }
-    void set_center(EventCenter *c) { center = c; }
-    void do_request(int id) override;
-    void queue(double delay_period, utime_t release, Message *m) {
-      std::lock_guard<std::mutex> l(delay_lock);
-      delay_queue.push_back(std::make_pair(release, m));
-      register_time_events.insert(center->create_time_event(delay_period*1000000, this));
-    }
-    void discard() {
-      stop_dispatch = true;
-      center->submit_to(center->get_id(), [this] () mutable {
-        std::lock_guard<std::mutex> l(delay_lock);
-        while (!delay_queue.empty()) {
-          Message *m = delay_queue.front().second;
-          dispatch_queue->dispatch_throttle_release(m->get_dispatch_throttle_size());
-          m->put();
-          delay_queue.pop_front();
-        }
-        for (auto i : register_time_events)
-          center->delete_time_event(i);
-        register_time_events.clear();
-        stop_dispatch = false;
-      }, true);
-    }
-    bool ready() const { return !stop_dispatch && delay_queue.empty() && register_time_events.empty(); }
-    void flush();
-  } *delay_state;
-
- public:
-  AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQueue *q, Worker *w);
-  ~AsyncConnection() override;
-  void maybe_start_delay_thread();
-
-  ostream& _conn_prefix(std::ostream *_dout);
-
-  bool is_connected() override {
-    return can_write.load() == WriteStatus::CANWRITE;
-  }
-
-  // Only call when AsyncConnection first construct
-  void connect(const entity_addr_t& addr, int type) {
-    set_peer_type(type);
-    set_peer_addr(addr);
-    policy = msgr->get_policy(type);
-    _connect();
-  }
-  // Only call when AsyncConnection first construct
-  void accept(ConnectedSocket socket, entity_addr_t &addr);
-  int send_message(Message *m) override;
-
-  void send_keepalive() override;
-  void mark_down() override;
-  void mark_disposable() override {
-    std::lock_guard<std::mutex> l(lock);
-    policy.lossy = true;
-  }
-  
- private:
-  enum {
-    STATE_NONE,
-    STATE_OPEN,
-    STATE_OPEN_KEEPALIVE2,
-    STATE_OPEN_KEEPALIVE2_ACK,
-    STATE_OPEN_TAG_ACK,
-    STATE_OPEN_MESSAGE_HEADER,
-    STATE_OPEN_MESSAGE_THROTTLE_MESSAGE,
-    STATE_OPEN_MESSAGE_THROTTLE_BYTES,
-    STATE_OPEN_MESSAGE_THROTTLE_DISPATCH_QUEUE,
-    STATE_OPEN_MESSAGE_READ_FRONT,
-    STATE_OPEN_MESSAGE_READ_MIDDLE,
-    STATE_OPEN_MESSAGE_READ_DATA_PREPARE,
-    STATE_OPEN_MESSAGE_READ_DATA,
-    STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH,
-    STATE_OPEN_TAG_CLOSE,
-    STATE_WAIT_SEND,
-    STATE_CONNECTING,
-    STATE_CONNECTING_RE,
-    STATE_CONNECTING_WAIT_BANNER_AND_IDENTIFY,
-    STATE_CONNECTING_SEND_CONNECT_MSG,
-    STATE_CONNECTING_WAIT_CONNECT_REPLY,
-    STATE_CONNECTING_WAIT_CONNECT_REPLY_AUTH,
-    STATE_CONNECTING_WAIT_ACK_SEQ,
-    STATE_CONNECTING_READY,
-    STATE_ACCEPTING,
-    STATE_ACCEPTING_WAIT_BANNER_ADDR,
-    STATE_ACCEPTING_WAIT_CONNECT_MSG,
-    STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH,
-    STATE_ACCEPTING_WAIT_SEQ,
-    STATE_ACCEPTING_READY,
-    STATE_STANDBY,
-    STATE_CLOSED,
-    STATE_WAIT,       // just wait for racing connection
-  };
-
-  static const int TCP_PREFETCH_MIN_SIZE;
-  static const char *get_state_name(int state) {
-      const char* const statenames[] = {"STATE_NONE",
-                                        "STATE_OPEN",
-                                        "STATE_OPEN_KEEPALIVE2",
-                                        "STATE_OPEN_KEEPALIVE2_ACK",
-                                        "STATE_OPEN_TAG_ACK",
-                                        "STATE_OPEN_MESSAGE_HEADER",
-                                        "STATE_OPEN_MESSAGE_THROTTLE_MESSAGE",
-                                        "STATE_OPEN_MESSAGE_THROTTLE_BYTES",
-                                        "STATE_OPEN_MESSAGE_THROTTLE_DISPATCH_QUEUE",
-                                        "STATE_OPEN_MESSAGE_READ_FRONT",
-                                        "STATE_OPEN_MESSAGE_READ_MIDDLE",
-                                        "STATE_OPEN_MESSAGE_READ_DATA_PREPARE",
-                                        "STATE_OPEN_MESSAGE_READ_DATA",
-                                        "STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH",
-                                        "STATE_OPEN_TAG_CLOSE",
-                                        "STATE_WAIT_SEND",
-                                        "STATE_CONNECTING",
-                                        "STATE_CONNECTING_RE",
-                                        "STATE_CONNECTING_WAIT_BANNER_AND_IDENTIFY",
-                                        "STATE_CONNECTING_SEND_CONNECT_MSG",
-                                        "STATE_CONNECTING_WAIT_CONNECT_REPLY",
-                                        "STATE_CONNECTING_WAIT_CONNECT_REPLY_AUTH",
-                                        "STATE_CONNECTING_WAIT_ACK_SEQ",
-                                        "STATE_CONNECTING_READY",
-                                        "STATE_ACCEPTING",
-                                        "STATE_ACCEPTING_WAIT_BANNER_ADDR",
-                                        "STATE_ACCEPTING_WAIT_CONNECT_MSG",
-                                        "STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH",
-                                        "STATE_ACCEPTING_WAIT_SEQ",
-                                        "STATE_ACCEPTING_READY",
-                                        "STATE_STANDBY",
-                                        "STATE_CLOSED",
-                                        "STATE_WAIT"};
-      return statenames[state];
-  }
-
-  AsyncMessenger *async_msgr;
-  uint64_t conn_id;
-  PerfCounters *logger;
-  int global_seq;
-  __u32 connect_seq, peer_global_seq;
-  std::atomic<uint64_t> out_seq{0};
-  std::atomic<uint64_t> ack_left{0}, in_seq{0};
-  int state;
-  int state_after_send;
-  ConnectedSocket cs;
-  int port;
-  Messenger::Policy policy;
-
-  DispatchQueue *dispatch_queue;
-
-  // lockfree, only used in own thread
-  bufferlist outcoming_bl;
-  bool open_write = false;
-
-  std::mutex write_lock;
-  enum class WriteStatus {
-    NOWRITE,
-    REPLACING,
-    CANWRITE,
-    CLOSED
-  };
-  std::atomic<WriteStatus> can_write;
-  list<Message*> sent; // the first bufferlist need to inject seq
-  map<int, list<pair<bufferlist, Message*> > > out_q;  // priority queue for outbound msgs
-  bool keepalive;
-
-  std::mutex lock;
-  utime_t backoff;         // backoff time
-  EventCallbackRef read_handler;
-  EventCallbackRef write_handler;
-  EventCallbackRef wakeup_handler;
-  EventCallbackRef tick_handler;
-  struct iovec msgvec[ASYNC_IOV_MAX];
-  char *recv_buf;
-  uint32_t recv_max_prefetch;
-  uint32_t recv_start;
-  uint32_t recv_end;
-  set<uint64_t> register_time_events; // need to delete it if stop
-  ceph::coarse_mono_clock::time_point last_active;
-  uint64_t last_tick_id = 0;
-  const uint64_t inactive_timeout_us;
-
-  // Tis section are temp variables used by state transition
-
-  // Open state
-  utime_t recv_stamp;
-  utime_t throttle_stamp;
-  unsigned msg_left;
-  uint64_t cur_msg_size;
-  ceph_msg_header current_header;
-  bufferlist data_buf;
-  bufferlist::iterator data_blp;
-  bufferlist front, middle, data;
-  ceph_msg_connect connect_msg;
-  // Connecting state
-  bool got_bad_auth;
-  AuthAuthorizer *authorizer;
-  bufferlist authorizer_buf;
-  ceph_msg_connect_reply connect_reply;
-  // Accepting state
-  entity_addr_t socket_addr;
-  CryptoKey session_key;
-  bool replacing;    // when replacing process happened, we will reply connect
-                     // side with RETRY tag and accept side will clear replaced
-                     // connection. So when connect side reissue connect_msg,
-                     // there won't exists conflicting connection so we use
-                     // "replacing" to skip RESETSESSION to avoid detect wrong
-                     // presentation
-  bool is_reset_from_peer;
-  bool once_ready;
-
-  // used only for local state, it will be overwrite when state transition
-  char *state_buffer;
-  // used only by "read_until"
-  uint64_t state_offset;
-  Worker *worker;
-  EventCenter *center;
-  ceph::shared_ptr<AuthSessionHandler> session_security;
-
- public:
-  // used by eventcallback
-  void handle_write();
-  void process();
-  void wakeup_from(uint64_t id);
-  void tick(uint64_t id);
-  void local_deliver();
-  void stop(bool queue_reset) {
-    lock.lock();
-    bool need_queue_reset = (state != STATE_CLOSED) && queue_reset;
-    _stop();
-    lock.unlock();
-    if (need_queue_reset)
-      dispatch_queue->queue_reset(this);
-  }
-  void cleanup() {
-    shutdown_socket();
-    delete read_handler;
-    delete write_handler;
-    delete wakeup_handler;
-    delete tick_handler;
-    if (delay_state) {
-      delete delay_state;
-      delay_state = NULL;
-    }
-  }
-  PerfCounters *get_perf_counter() {
-    return logger;
-  }
-}; /* AsyncConnection */
-
-typedef boost::intrusive_ptr<AsyncConnection> AsyncConnectionRef;
-
-#endif