X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fmsg%2Fxio%2FXioConnection.h;fp=src%2Fceph%2Fsrc%2Fmsg%2Fxio%2FXioConnection.h;h=0000000000000000000000000000000000000000;hb=7da45d65be36d36b880cc55c5036e96c24b53f00;hp=1ed88b995d8e867aa010febfc32cbf89937b2ad6;hpb=691462d09d0987b47e112d6ee8740375df3c51b2;p=stor4nfv.git diff --git a/src/ceph/src/msg/xio/XioConnection.h b/src/ceph/src/msg/xio/XioConnection.h deleted file mode 100644 index 1ed88b9..0000000 --- a/src/ceph/src/msg/xio/XioConnection.h +++ /dev/null @@ -1,375 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2004-2006 Sage Weil - * Portions Copyright (C) 2013 CohortFS, LLC - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - -#ifndef XIO_CONNECTION_H -#define XIO_CONNECTION_H - -#include - -#include -#include - -extern "C" { -#include "libxio.h" -} - -#include "XioInSeq.h" -#include "XioSubmit.h" -#include "msg/Connection.h" -#include "msg/Messenger.h" -#include "auth/AuthSessionHandler.h" - -#define XIO_ALL_FEATURES (CEPH_FEATURES_ALL) - - -#define XIO_NOP_TAG_MARKDOWN 0x0001 - -namespace bi = boost::intrusive; - -class XioPortal; -class XioMessenger; -class XioSend; - -class XioConnection : public Connection -{ -public: - enum type { ACTIVE, PASSIVE }; - - enum class session_states : unsigned { - INIT = 0, - START, - UP, - FLOW_CONTROLLED, - DISCONNECTED, - DELETED, - BARRIER - }; - - enum class session_startup_states : unsigned { - IDLE = 0, - CONNECTING, - ACCEPTING, - READY, - FAIL - }; - -private: - XioConnection::type xio_conn_type; - XioPortal *portal; - std::atomic connected = { false }; - entity_inst_t peer; - struct xio_session *session; - struct xio_connection *conn; - pthread_spinlock_t sp; - std::atomic send = { 0 }; - std::atomic recv = { 0 }; - uint32_t n_reqs; // Accelio-initiated reqs in progress (!counting partials) - uint32_t magic; - uint32_t special_handling; - uint64_t scount; - uint32_t send_ctr; - int q_high_mark; - int q_low_mark; - - struct lifecycle { - // different from Pipe states? - enum lf_state { - INIT, - LOCAL_DISCON, - REMOTE_DISCON, - RECONNECTING, - UP, - DEAD } state; - - /* XXX */ - uint32_t reconnects; - uint32_t connect_seq, peer_global_seq; - uint64_t in_seq, out_seq_acked; // atomic, got receipt - std::atomic out_seq = { 0 }; - - lifecycle() : state(lifecycle::INIT), reconnects(0), connect_seq(0), - peer_global_seq(0), in_seq(0), out_seq_acked(0) - {} - - void set_in_seq(uint64_t seq) { - in_seq = seq; - } - - uint64_t next_out_seq() { - return ++out_seq; - } - - } state; - - /* batching */ - XioInSeq in_seq; - - class CState - { - public: - static const int FLAG_NONE = 0x0000; - static const int FLAG_BAD_AUTH = 0x0001; - static const int FLAG_MAPPED = 0x0002; - static const int FLAG_RESET = 0x0004; - - static const int OP_FLAG_NONE = 0x0000; - static const int OP_FLAG_LOCKED = 0x0001; - static const int OP_FLAG_LRU = 0x0002; - - uint64_t features; - Messenger::Policy policy; - - CryptoKey session_key; - ceph::shared_ptr session_security; - AuthAuthorizer *authorizer; - XioConnection *xcon; - uint32_t protocol_version; - - std::atomic session_state = { 0 }; - std::atomic startup_state = { 0 }; - - uint32_t reconnects; - uint32_t connect_seq, global_seq, peer_global_seq; - uint64_t in_seq, out_seq_acked; // atomic, got receipt - std::atomic out_seq = { 0 }; - - uint32_t flags; - - explicit CState(XioConnection* _xcon) - : features(0), - authorizer(NULL), - xcon(_xcon), - protocol_version(0), - session_state(INIT), - startup_state(IDLE), - reconnects(0), - connect_seq(0), - global_seq(0), - peer_global_seq(0), - in_seq(0), - out_seq_acked(0), - flags(FLAG_NONE) {} - - uint64_t get_session_state() { - return session_state; - } - - uint64_t get_startup_state() { - return startup_state; - } - - void set_in_seq(uint64_t seq) { - in_seq = seq; - } - - uint64_t next_out_seq() { - return ++out_seq; - }; - - // state machine - int init_state(); - int next_state(Message* m); -#if 0 // future (session startup) - int msg_connect(MConnect *m); - int msg_connect_reply(MConnectReply *m); - int msg_connect_reply(MConnectAuthReply *m); - int msg_connect_auth(MConnectAuth *m); - int msg_connect_auth_reply(MConnectAuthReply *m); -#endif - int state_up_ready(uint32_t flags); - int state_flow_controlled(uint32_t flags); - int state_discon(); - int state_fail(Message* m, uint32_t flags); - - } cstate; /* CState */ - - // message submission queue - struct SendQ { - bool keepalive; - bool ack; - utime_t ack_time; - Message::Queue mqueue; // deferred - XioSubmit::Queue requeue; - - SendQ():keepalive(false), ack(false){} - } outgoing; - - // conns_entity_map comparison functor - struct EntityComp - { - // for internal ordering - bool operator()(const XioConnection &lhs, const XioConnection &rhs) const - { return lhs.get_peer() < rhs.get_peer(); } - - // for external search by entity_inst_t(peer) - bool operator()(const entity_inst_t &peer, const XioConnection &c) const - { return peer < c.get_peer(); } - - bool operator()(const XioConnection &c, const entity_inst_t &peer) const - { return c.get_peer() < peer; } - }; - - bi::list_member_hook<> conns_hook; - bi::avl_set_member_hook<> conns_entity_map_hook; - - typedef bi::list< XioConnection, - bi::member_hook, - &XioConnection::conns_hook > > ConnList; - - typedef bi::member_hook, - &XioConnection::conns_entity_map_hook> EntityHook; - - typedef bi::avl_set< XioConnection, EntityHook, - bi::compare > EntitySet; - - friend class XioPortal; - friend class XioMessenger; - friend class XioDispatchHook; - friend class XioMarkDownHook; - friend class XioSend; - - int on_disconnect_event() { - connected = false; - pthread_spin_lock(&sp); - discard_out_queues(CState::OP_FLAG_LOCKED); - pthread_spin_unlock(&sp); - return 0; - } - - int on_teardown_event() { - pthread_spin_lock(&sp); - if (conn) - xio_connection_destroy(conn); - conn = NULL; - pthread_spin_unlock(&sp); - this->put(); - return 0; - } - - int xio_qdepth_high_mark() { - return q_high_mark; - } - - int xio_qdepth_low_mark() { - return q_low_mark; - } - -public: - XioConnection(XioMessenger *m, XioConnection::type _type, - const entity_inst_t& peer); - - ~XioConnection() { - if (conn) - xio_connection_destroy(conn); - } - ostream& conn_prefix(std::ostream *_dout); - - bool is_connected() override { return connected; } - - int send_message(Message *m) override; - void send_keepalive() override {send_keepalive_or_ack();} - void send_keepalive_or_ack(bool ack = false, const utime_t *tp = nullptr); - void mark_down() override; - int _mark_down(uint32_t flags); - void mark_disposable() override; - int _mark_disposable(uint32_t flags); - - const entity_inst_t& get_peer() const { return peer; } - - XioConnection* get() { -#if 0 - cout << "XioConnection::get " << this << " " << nref.load() << std::endl; -#endif - RefCountedObject::get(); - return this; - } - - void put() { - RefCountedObject::put(); -#if 0 - cout << "XioConnection::put " << this << " " << nref.load() << std::endl; -#endif - } - - void disconnect() { - if (is_connected()) { - connected = false; - xio_disconnect(conn); // normal teardown will clean up conn - } - } - - uint32_t get_magic() { return magic; } - void set_magic(int _magic) { magic = _magic; } - uint32_t get_special_handling() { return special_handling; } - void set_special_handling(int n) { special_handling = n; } - uint64_t get_scount() { return scount; } - - int passive_setup(); /* XXX */ - - int handle_data_msg(struct xio_session *session, struct xio_msg *msg, - int more_in_batch, void *cb_user_context); - int on_msg(struct xio_session *session, struct xio_msg *msg, - int more_in_batch, void *cb_user_context); - int on_ow_msg_send_complete(struct xio_session *session, struct xio_msg *msg, - void *conn_user_context); - int on_msg_error(struct xio_session *session, enum xio_status error, - struct xio_msg *msg, void *conn_user_context); - void msg_send_fail(XioSend *xsend, int code); - void msg_release_fail(struct xio_msg *msg, int code); -private: - void send_keepalive_or_ack_internal(bool ack = false, const utime_t *tp = nullptr); - int flush_out_queues(uint32_t flags); - int discard_out_queues(uint32_t flags); - int adjust_clru(uint32_t flags); -}; - -typedef boost::intrusive_ptr XioConnectionRef; - -class XioLoopbackConnection : public Connection -{ -private: - std::atomic seq = { 0 }; -public: - explicit XioLoopbackConnection(Messenger *m) : Connection(m->cct, m) - { - const entity_inst_t& m_inst = m->get_myinst(); - peer_addr = m_inst.addr; - peer_type = m_inst.name.type(); - set_features(XIO_ALL_FEATURES); /* XXXX set to ours */ - } - - XioLoopbackConnection* get() { - return static_cast(RefCountedObject::get()); - } - - bool is_connected() override { return true; } - - int send_message(Message *m) override; - void send_keepalive() override; - void mark_down() override {} - void mark_disposable() override {} - - uint64_t get_seq() { - return seq; - } - - uint64_t next_seq() { - return ++seq; - } -}; - -typedef boost::intrusive_ptr XioLoopbackConnectionRef; - -#endif /* XIO_CONNECTION_H */