initial code repo
[stor4nfv.git] / src / ceph / src / msg / xio / XioConnection.h
diff --git a/src/ceph/src/msg/xio/XioConnection.h b/src/ceph/src/msg/xio/XioConnection.h
new file mode 100644 (file)
index 0000000..1ed88b9
--- /dev/null
@@ -0,0 +1,375 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ * Portions Copyright (C) 2013 CohortFS, LLC
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#ifndef XIO_CONNECTION_H
+#define XIO_CONNECTION_H
+
+#include <atomic>
+
+#include <boost/intrusive/avl_set.hpp>
+#include <boost/intrusive/list.hpp>
+
+extern "C" {
+#include "libxio.h"
+}
+
+#include "XioInSeq.h"
+#include "XioSubmit.h"
+#include "msg/Connection.h"
+#include "msg/Messenger.h"
+#include "auth/AuthSessionHandler.h"
+
+#define XIO_ALL_FEATURES (CEPH_FEATURES_ALL)
+
+
+#define XIO_NOP_TAG_MARKDOWN 0x0001
+
+namespace bi = boost::intrusive;
+
+class XioPortal;
+class XioMessenger;
+class XioSend;
+
+class XioConnection : public Connection
+{
+public:
+  enum type { ACTIVE, PASSIVE };
+
+  enum class session_states : unsigned {
+    INIT = 0,
+    START,
+    UP,
+    FLOW_CONTROLLED,
+    DISCONNECTED,
+    DELETED,
+    BARRIER
+  };
+
+  enum class session_startup_states : unsigned {
+    IDLE = 0,
+    CONNECTING,
+    ACCEPTING,
+    READY,
+    FAIL
+  };
+
+private:
+  XioConnection::type xio_conn_type;
+  XioPortal *portal;
+  std::atomic<bool> connected = { false };
+  entity_inst_t peer;
+  struct xio_session *session;
+  struct xio_connection        *conn;
+  pthread_spinlock_t sp;
+  std::atomic<int64_t> send = { 0 };
+  std::atomic<int64_t> recv = { 0 };
+  uint32_t n_reqs; // Accelio-initiated reqs in progress (!counting partials)
+  uint32_t magic;
+  uint32_t special_handling;
+  uint64_t scount;
+  uint32_t send_ctr;
+  int q_high_mark;
+  int q_low_mark;
+
+  struct lifecycle {
+    // different from Pipe states?
+    enum lf_state {
+      INIT,
+      LOCAL_DISCON,
+      REMOTE_DISCON,
+      RECONNECTING,
+      UP,
+      DEAD } state;
+
+    /* XXX */
+    uint32_t reconnects;
+    uint32_t connect_seq, peer_global_seq;
+    uint64_t in_seq, out_seq_acked; // atomic<uint64_t>, got receipt
+    std::atomic<int64_t> out_seq = { 0 }; 
+
+    lifecycle() : state(lifecycle::INIT), reconnects(0), connect_seq(0),
+                 peer_global_seq(0), in_seq(0), out_seq_acked(0)
+                 {}
+
+    void set_in_seq(uint64_t seq) {
+      in_seq = seq;
+    }
+
+    uint64_t next_out_seq() {
+      return ++out_seq;
+    }
+
+  } state;
+
+  /* batching */
+  XioInSeq in_seq;
+
+  class CState
+  {
+  public:
+    static const int FLAG_NONE = 0x0000;
+    static const int FLAG_BAD_AUTH = 0x0001;
+    static const int FLAG_MAPPED = 0x0002;
+    static const int FLAG_RESET = 0x0004;
+
+    static const int OP_FLAG_NONE = 0x0000;
+    static const int OP_FLAG_LOCKED = 0x0001;
+    static const int OP_FLAG_LRU = 0x0002;
+
+    uint64_t features;
+    Messenger::Policy policy;
+
+    CryptoKey session_key;
+    ceph::shared_ptr<AuthSessionHandler> session_security;
+    AuthAuthorizer *authorizer;
+    XioConnection *xcon;
+    uint32_t protocol_version;
+
+    std::atomic<session_states> session_state = { 0 };
+    std::atomic<session_startup_state> startup_state = { 0 };
+
+    uint32_t reconnects;
+    uint32_t connect_seq, global_seq, peer_global_seq;
+    uint64_t in_seq, out_seq_acked; // atomic<uint64_t>, got receipt
+    std::atomic<uint64_t> out_seq = { 0 }; 
+
+    uint32_t flags;
+
+    explicit CState(XioConnection* _xcon)
+      : features(0),
+       authorizer(NULL),
+       xcon(_xcon),
+       protocol_version(0),
+       session_state(INIT),
+       startup_state(IDLE),
+       reconnects(0),
+       connect_seq(0),
+       global_seq(0),
+       peer_global_seq(0),
+       in_seq(0),
+       out_seq_acked(0),
+       flags(FLAG_NONE) {}
+
+    uint64_t get_session_state() {
+      return session_state;
+    }
+
+    uint64_t get_startup_state() {
+      return startup_state;
+    }
+
+    void set_in_seq(uint64_t seq) {
+      in_seq = seq;
+    }
+
+    uint64_t next_out_seq() {
+      return ++out_seq;
+    };
+
+    // state machine
+    int init_state();
+    int next_state(Message* m);
+#if 0 // future (session startup)
+    int msg_connect(MConnect *m);
+    int msg_connect_reply(MConnectReply *m);
+    int msg_connect_reply(MConnectAuthReply *m);
+    int msg_connect_auth(MConnectAuth *m);
+    int msg_connect_auth_reply(MConnectAuthReply *m);
+#endif
+    int state_up_ready(uint32_t flags);
+    int state_flow_controlled(uint32_t flags);
+    int state_discon();
+    int state_fail(Message* m, uint32_t flags);
+
+  } cstate; /* CState */
+
+  // message submission queue
+  struct SendQ {
+    bool keepalive;
+    bool ack;
+    utime_t ack_time;
+    Message::Queue mqueue; // deferred
+    XioSubmit::Queue requeue;
+
+    SendQ():keepalive(false), ack(false){}
+  } outgoing;
+
+  // conns_entity_map comparison functor
+  struct EntityComp
+  {
+    // for internal ordering
+    bool operator()(const XioConnection &lhs,  const XioConnection &rhs) const
+      {  return lhs.get_peer() < rhs.get_peer(); }
+
+    // for external search by entity_inst_t(peer)
+    bool operator()(const entity_inst_t &peer, const XioConnection &c) const
+      {  return peer < c.get_peer(); }
+
+    bool operator()(const XioConnection &c, const entity_inst_t &peer) const
+      {  return c.get_peer() < peer;  }
+  };
+
+  bi::list_member_hook<> conns_hook;
+  bi::avl_set_member_hook<> conns_entity_map_hook;
+
+  typedef bi::list< XioConnection,
+                   bi::member_hook<XioConnection, bi::list_member_hook<>,
+                                   &XioConnection::conns_hook > > ConnList;
+
+  typedef bi::member_hook<XioConnection, bi::avl_set_member_hook<>,
+                         &XioConnection::conns_entity_map_hook> EntityHook;
+
+  typedef bi::avl_set< XioConnection, EntityHook,
+                      bi::compare<EntityComp> > EntitySet;
+
+  friend class XioPortal;
+  friend class XioMessenger;
+  friend class XioDispatchHook;
+  friend class XioMarkDownHook;
+  friend class XioSend;
+
+  int on_disconnect_event() {
+    connected = false;
+    pthread_spin_lock(&sp);
+    discard_out_queues(CState::OP_FLAG_LOCKED);
+    pthread_spin_unlock(&sp);
+    return 0;
+  }
+
+  int on_teardown_event() {
+    pthread_spin_lock(&sp);
+    if (conn)
+      xio_connection_destroy(conn);
+    conn = NULL;
+    pthread_spin_unlock(&sp);
+    this->put();
+    return 0;
+  }
+
+  int xio_qdepth_high_mark() {
+    return q_high_mark;
+  }
+
+  int xio_qdepth_low_mark() {
+    return q_low_mark;
+  }
+
+public:
+  XioConnection(XioMessenger *m, XioConnection::type _type,
+               const entity_inst_t& peer);
+
+  ~XioConnection() {
+    if (conn)
+      xio_connection_destroy(conn);
+  }
+  ostream& conn_prefix(std::ostream *_dout);
+
+  bool is_connected() override { return connected; }
+
+  int send_message(Message *m) override;
+  void send_keepalive() override {send_keepalive_or_ack();}
+  void send_keepalive_or_ack(bool ack = false, const utime_t *tp = nullptr);
+  void mark_down() override;
+  int _mark_down(uint32_t flags);
+  void mark_disposable() override;
+  int _mark_disposable(uint32_t flags);
+
+  const entity_inst_t& get_peer() const { return peer; }
+
+  XioConnection* get() {
+#if 0
+    cout << "XioConnection::get " << this << " " << nref.load() << std::endl;
+#endif
+    RefCountedObject::get();
+    return this;
+  }
+
+  void put() {
+    RefCountedObject::put();
+#if 0
+    cout << "XioConnection::put " << this << " " << nref.load() << std::endl;
+#endif
+  }
+
+  void disconnect() {
+    if (is_connected()) {
+      connected = false;
+      xio_disconnect(conn); // normal teardown will clean up conn
+    }
+  }
+
+  uint32_t get_magic() { return magic; }
+  void set_magic(int _magic) { magic = _magic; }
+  uint32_t get_special_handling() { return special_handling; }
+  void set_special_handling(int n) { special_handling = n; }
+  uint64_t get_scount() { return scount; }
+
+  int passive_setup(); /* XXX */
+
+  int handle_data_msg(struct xio_session *session, struct xio_msg *msg,
+                int more_in_batch, void *cb_user_context);
+  int on_msg(struct xio_session *session, struct xio_msg *msg,
+                int more_in_batch, void *cb_user_context);
+  int on_ow_msg_send_complete(struct xio_session *session, struct xio_msg *msg,
+                             void *conn_user_context);
+  int on_msg_error(struct xio_session *session, enum xio_status error,
+                  struct xio_msg  *msg, void *conn_user_context);
+  void msg_send_fail(XioSend *xsend, int code);
+  void msg_release_fail(struct xio_msg *msg, int code);
+private:
+  void send_keepalive_or_ack_internal(bool ack = false, const utime_t *tp = nullptr);
+  int flush_out_queues(uint32_t flags);
+  int discard_out_queues(uint32_t flags);
+  int adjust_clru(uint32_t flags);
+};
+
+typedef boost::intrusive_ptr<XioConnection> XioConnectionRef;
+
+class XioLoopbackConnection : public Connection
+{
+private:
+  std::atomic<uint64_t> seq = { 0 };
+public:
+  explicit XioLoopbackConnection(Messenger *m) : Connection(m->cct, m)
+    {
+      const entity_inst_t& m_inst = m->get_myinst();
+      peer_addr = m_inst.addr;
+      peer_type = m_inst.name.type();
+      set_features(XIO_ALL_FEATURES); /* XXXX set to ours */
+    }
+
+  XioLoopbackConnection* get() {
+    return static_cast<XioLoopbackConnection*>(RefCountedObject::get());
+  }
+
+  bool is_connected() override { return true; }
+
+  int send_message(Message *m) override;
+  void send_keepalive() override;
+  void mark_down() override {}
+  void mark_disposable() override {}
+
+  uint64_t get_seq() {
+    return seq;
+  }
+
+  uint64_t next_seq() {
+    return ++seq;
+  }
+};
+
+typedef boost::intrusive_ptr<XioLoopbackConnection> XioLoopbackConnectionRef;
+
+#endif /* XIO_CONNECTION_H */