1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 * Portions Copyright (C) 2013 CohortFS, LLC
9 * This is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Lesser General Public
11 * License version 2.1, as published by the Free Software
12 * Foundation. See file COPYING.
16 #ifndef XIO_CONNECTION_H
17 #define XIO_CONNECTION_H
21 #include <boost/intrusive/avl_set.hpp>
22 #include <boost/intrusive/list.hpp>
29 #include "XioSubmit.h"
30 #include "msg/Connection.h"
31 #include "msg/Messenger.h"
32 #include "auth/AuthSessionHandler.h"
// Feature set handed to set_features() by XioLoopbackConnection's constructor;
// expands to CEPH_FEATURES_ALL, which is declared outside this header.
34 #define XIO_ALL_FEATURES (CEPH_FEATURES_ALL)
// NOTE(review): presumably a tag marking a no-op message used for mark-down
// handling -- no user of this macro is visible in this header; confirm.
37 #define XIO_NOP_TAG_MARKDOWN 0x0001
39 namespace bi = boost::intrusive;
45 class XioConnection : public Connection
48 enum type { ACTIVE, PASSIVE };
50 enum class session_states : unsigned {
60 enum class session_startup_states : unsigned {
// Role of this connection: one of the `type` enumerators (ACTIVE or PASSIVE).
69 XioConnection::type xio_conn_type;
// Connection-up flag; this is the value is_connected() returns.
71 std::atomic<bool> connected = { false };
// Accelio session and connection handles. `conn` is the handle passed to
// xio_disconnect() and xio_connection_destroy() elsewhere in this class.
73 struct xio_session *session;
74 struct xio_connection *conn;
// Spinlock taken by the disconnect/teardown event handlers around
// discard_out_queues() and xio_connection_destroy().
75 pthread_spinlock_t sp;
// NOTE(review): presumably running counts of sent/received messages; no user
// of these counters is visible in this header -- confirm.
76 std::atomic<int64_t> send = { 0 };
77 std::atomic<int64_t> recv = { 0 };
78 uint32_t n_reqs; // Accelio-initiated requests in progress (not counting partials)
// Value read and written through get_special_handling()/set_special_handling().
80 uint32_t special_handling;
87 // different from Pipe states?
98 uint32_t connect_seq, peer_global_seq;
99 uint64_t in_seq, out_seq_acked; // atomic<uint64_t>, got receipt
100 std::atomic<int64_t> out_seq = { 0 };
102 lifecycle() : state(lifecycle::INIT), reconnects(0), connect_seq(0),
103 peer_global_seq(0), in_seq(0), out_seq_acked(0)
106 void set_in_seq(uint64_t seq) {
110 uint64_t next_out_seq() {
// Connection-state flag bits. Each non-zero value occupies a distinct bit, so
// they can be combined with bitwise OR; FLAG_NONE is the empty set.
122 static const int FLAG_NONE = 0x0000;
123 static const int FLAG_BAD_AUTH = 0x0001;
124 static const int FLAG_MAPPED = 0x0002;
125 static const int FLAG_RESET = 0x0004;
// Per-operation flag bits for the `uint32_t flags` arguments taken by methods
// in this header. on_disconnect_event() passes OP_FLAG_LOCKED to
// discard_out_queues() while it holds the spinlock `sp`.
// NOTE(review): OP_FLAG_LRU presumably relates to adjust_clru(); confirm.
127 static const int OP_FLAG_NONE = 0x0000;
128 static const int OP_FLAG_LOCKED = 0x0001;
129 static const int OP_FLAG_LRU = 0x0002;
132 Messenger::Policy policy;
134 CryptoKey session_key;
135 ceph::shared_ptr<AuthSessionHandler> session_security;
136 AuthAuthorizer *authorizer;
138 uint32_t protocol_version;
// Current positions in the session and session-startup state machines.
// std::atomic<E> cannot be initialized from a bare integer when E is a scoped
// enum, so each member is initialized from a value-initialized enumerator
// (numeric value 0), which is what the former "= { 0 }" intended. The startup
// state uses the enum as it is actually declared: session_startup_states.
140 std::atomic<session_states> session_state = { session_states{} };
141 std::atomic<session_startup_states> startup_state = { session_startup_states{} };
// Sequence bookkeeping for the session.
// NOTE(review): connect_seq/global_seq/peer_global_seq presumably mirror the
// messenger handshake sequence numbers; their users are not visible here.
144 uint32_t connect_seq, global_seq, peer_global_seq;
// in_seq is assigned through set_in_seq().
// NOTE(review): the original note ("atomic<uint64_t>, got receipt") presumably
// means these may need to become atomic and that out_seq_acked is the last
// outgoing sequence for which a receipt arrived -- confirm.
145 uint64_t in_seq, out_seq_acked; // atomic<uint64_t>, got receipt
// Outgoing sequence counter, starts at 0; next_out_seq() hands out values.
146 std::atomic<uint64_t> out_seq = { 0 };
150 explicit CState(XioConnection* _xcon)
// Returns the numeric value of the current session state. A scoped enum has
// no implicit conversion to an integer, so the atomically loaded value is
// converted with an explicit cast.
165 uint64_t get_session_state() {
166 return static_cast<uint64_t>(session_state.load());
// Returns the numeric value of the current startup state, using the same
// explicit load-and-cast as get_session_state().
169 uint64_t get_startup_state() {
170 return static_cast<uint64_t>(startup_state.load());
173 void set_in_seq(uint64_t seq) {
177 uint64_t next_out_seq() {
183 int next_state(Message* m);
184 #if 0 // future (session startup)
185 int msg_connect(MConnect *m);
186 int msg_connect_reply(MConnectReply *m);
187 int msg_connect_reply(MConnectAuthReply *m);
188 int msg_connect_auth(MConnectAuth *m);
189 int msg_connect_auth_reply(MConnectAuthReply *m);
191 int state_up_ready(uint32_t flags);
192 int state_flow_controlled(uint32_t flags);
194 int state_fail(Message* m, uint32_t flags);
196 } cstate; /* CState */
198 // message submission queue
203 Message::Queue mqueue; // deferred
204 XioSubmit::Queue requeue;
206 SendQ():keepalive(false), ack(false){}
209 // conns_entity_map comparison functor: orders connections by get_peer()
212 // for internal ordering: connection-vs-connection comparison of peers
213 bool operator()(const XioConnection &lhs, const XioConnection &rhs) const
214 { return lhs.get_peer() < rhs.get_peer(); }
216 // for external search by entity_inst_t(peer): key-vs-connection overloads
// in both argument orders, so a lookup can pass a bare entity_inst_t key
217 bool operator()(const entity_inst_t &peer, const XioConnection &c) const
218 { return peer < c.get_peer(); }
220 bool operator()(const XioConnection &c, const entity_inst_t &peer) const
221 { return c.get_peer() < peer; }
// Intrusive hooks embedded in every XioConnection: conns_hook links the object
// into a ConnList, conns_entity_map_hook links it into an EntitySet.
224 bi::list_member_hook<> conns_hook;
225 bi::avl_set_member_hook<> conns_entity_map_hook;
// Intrusive doubly-linked list of connections, threaded through conns_hook.
227 typedef bi::list< XioConnection,
228 bi::member_hook<XioConnection, bi::list_member_hook<>,
229 &XioConnection::conns_hook > > ConnList;
// Hook option selecting conns_entity_map_hook for the AVL set below.
231 typedef bi::member_hook<XioConnection, bi::avl_set_member_hook<>,
232 &XioConnection::conns_entity_map_hook> EntityHook;
// Intrusive AVL set of connections ordered by EntityComp (i.e. by get_peer()).
234 typedef bi::avl_set< XioConnection, EntityHook,
235 bi::compare<EntityComp> > EntitySet;
237 friend class XioPortal;
238 friend class XioMessenger;
239 friend class XioDispatchHook;
240 friend class XioMarkDownHook;
241 friend class XioSend;
243 int on_disconnect_event() {
245 pthread_spin_lock(&sp);
246 discard_out_queues(CState::OP_FLAG_LOCKED);
247 pthread_spin_unlock(&sp);
251 int on_teardown_event() {
252 pthread_spin_lock(&sp);
254 xio_connection_destroy(conn);
256 pthread_spin_unlock(&sp);
261 int xio_qdepth_high_mark() {
265 int xio_qdepth_low_mark() {
270 XioConnection(XioMessenger *m, XioConnection::type _type,
271 const entity_inst_t& peer);
275 xio_connection_destroy(conn);
// NOTE(review): presumably writes a per-connection log prefix into *_dout and
// returns that stream; the definition lives outside this header -- confirm.
// The return type is spelled std::ostream to match the parameter and to avoid
// depending on a using-directive being visible when this header is included.
277 std::ostream& conn_prefix(std::ostream *_dout);
// Connection interface overrides and their flag-taking internal variants.
// Reports the current value of the atomic `connected` flag.
279 bool is_connected() override { return connected; }
// Defined outside this header.
281 int send_message(Message *m) override;
// Forwards to send_keepalive_or_ack() with its defaults (ack = false, tp = nullptr).
282 void send_keepalive() override {send_keepalive_or_ack();}
// NOTE(review): presumably sends a keepalive, or a keepalive ack carrying the
// timestamp *tp when ack is true; defined outside this header -- confirm.
283 void send_keepalive_or_ack(bool ack = false, const utime_t *tp = nullptr);
// mark_down()/mark_disposable() are the public overrides; the underscore
// variants take a `flags` word (see the OP_FLAG_* constants) and return int.
284 void mark_down() override;
285 int _mark_down(uint32_t flags);
286 void mark_disposable() override;
287 int _mark_disposable(uint32_t flags);
// Returns the stored peer identity; used by EntityComp to order connections.
289 const entity_inst_t& get_peer() const { return peer; }
291 XioConnection* get() {
293 cout << "XioConnection::get " << this << " " << nref.load() << std::endl;
295 RefCountedObject::get();
300 RefCountedObject::put();
302 cout << "XioConnection::put " << this << " " << nref.load() << std::endl;
307 if (is_connected()) {
309 xio_disconnect(conn); // normal teardown will clean up conn
// Trivial accessors. The setters take int; special_handling is stored as
// uint32_t, so a negative argument is converted to a large unsigned value.
// NOTE(review): the declarations of `magic` and `scount` are not visible in
// this excerpt; their types are assumed to match the getter return types.
313 uint32_t get_magic() { return magic; }
314 void set_magic(int _magic) { magic = _magic; }
315 uint32_t get_special_handling() { return special_handling; }
316 void set_special_handling(int n) { special_handling = n; }
317 uint64_t get_scount() { return scount; }
// All functions below are only declared here; definitions live elsewhere.
// NOTE(review): presumably the setup step for PASSIVE-type connections; confirm.
319 int passive_setup(); /* XXX */
// Message handlers taking the Accelio session/message plus an opaque user
// context pointer.
// NOTE(review): the signatures look like they mirror Accelio session
// callbacks (on_msg, on_ow_msg_send_complete, on_msg_error) -- confirm against
// the code that registers them.
321 int handle_data_msg(struct xio_session *session, struct xio_msg *msg,
322 int more_in_batch, void *cb_user_context);
323 int on_msg(struct xio_session *session, struct xio_msg *msg,
324 int more_in_batch, void *cb_user_context);
325 int on_ow_msg_send_complete(struct xio_session *session, struct xio_msg *msg,
326 void *conn_user_context);
327 int on_msg_error(struct xio_session *session, enum xio_status error,
328 struct xio_msg *msg, void *conn_user_context);
// Failure hooks taking the failed send/message and an integer code.
329 void msg_send_fail(XioSend *xsend, int code);
330 void msg_release_fail(struct xio_msg *msg, int code);
// Same parameters and defaults as the public send_keepalive_or_ack().
332 void send_keepalive_or_ack_internal(bool ack = false, const utime_t *tp = nullptr);
// Queue maintenance; `flags` takes OP_FLAG_* bits. discard_out_queues() is
// called with OP_FLAG_LOCKED from on_disconnect_event() while `sp` is held.
333 int flush_out_queues(uint32_t flags);
334 int discard_out_queues(uint32_t flags);
335 int adjust_clru(uint32_t flags);
338 typedef boost::intrusive_ptr<XioConnection> XioConnectionRef;
340 class XioLoopbackConnection : public Connection
343 std::atomic<uint64_t> seq = { 0 };
345 explicit XioLoopbackConnection(Messenger *m) : Connection(m->cct, m)
347 const entity_inst_t& m_inst = m->get_myinst();
348 peer_addr = m_inst.addr;
349 peer_type = m_inst.name.type();
350 set_features(XIO_ALL_FEATURES); /* XXXX set to ours */
353 XioLoopbackConnection* get() {
354 return static_cast<XioLoopbackConnection*>(RefCountedObject::get());
// Connection interface overrides for the loopback connection.
// Always reports connected.
357 bool is_connected() override { return true; }
// Both defined outside this header.
359 int send_message(Message *m) override;
360 void send_keepalive() override;
// Intentionally empty: marking a loopback connection down or disposable is a no-op.
361 void mark_down() override {}
362 void mark_disposable() override {}
368 uint64_t next_seq() {
373 typedef boost::intrusive_ptr<XioLoopbackConnection> XioLoopbackConnectionRef;
375 #endif /* XIO_CONNECTION_H */