Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / msg / xio / XioConnection.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7  * Portions Copyright (C) 2013 CohortFS, LLC
8  *
9  * This is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Lesser General Public
11  * License version 2.1, as published by the Free Software
12  * Foundation.  See file COPYING.
13  *
14  */
15
16 #ifndef XIO_CONNECTION_H
17 #define XIO_CONNECTION_H
18
19 #include <atomic>
20
21 #include <boost/intrusive/avl_set.hpp>
22 #include <boost/intrusive/list.hpp>
23
24 extern "C" {
25 #include "libxio.h"
26 }
27
28 #include "XioInSeq.h"
29 #include "XioSubmit.h"
30 #include "msg/Connection.h"
31 #include "msg/Messenger.h"
32 #include "auth/AuthSessionHandler.h"
33
34 #define XIO_ALL_FEATURES (CEPH_FEATURES_ALL)
35
36
37 #define XIO_NOP_TAG_MARKDOWN 0x0001
38
39 namespace bi = boost::intrusive;
40
41 class XioPortal;
42 class XioMessenger;
43 class XioSend;
44
45 class XioConnection : public Connection
46 {
47 public:
48   enum type { ACTIVE, PASSIVE };
49
50   enum class session_states : unsigned {
51     INIT = 0,
52     START,
53     UP,
54     FLOW_CONTROLLED,
55     DISCONNECTED,
56     DELETED,
57     BARRIER
58   };
59
60   enum class session_startup_states : unsigned {
61     IDLE = 0,
62     CONNECTING,
63     ACCEPTING,
64     READY,
65     FAIL
66   };
67
68 private:
69   XioConnection::type xio_conn_type;
70   XioPortal *portal;
71   std::atomic<bool> connected = { false };
72   entity_inst_t peer;
73   struct xio_session *session;
74   struct xio_connection *conn;
75   pthread_spinlock_t sp;
76   std::atomic<int64_t> send = { 0 };
77   std::atomic<int64_t> recv = { 0 };
78   uint32_t n_reqs; // Accelio-initiated reqs in progress (!counting partials)
79   uint32_t magic;
80   uint32_t special_handling;
81   uint64_t scount;
82   uint32_t send_ctr;
83   int q_high_mark;
84   int q_low_mark;
85
86   struct lifecycle {
87     // different from Pipe states?
88     enum lf_state {
89       INIT,
90       LOCAL_DISCON,
91       REMOTE_DISCON,
92       RECONNECTING,
93       UP,
94       DEAD } state;
95
96     /* XXX */
97     uint32_t reconnects;
98     uint32_t connect_seq, peer_global_seq;
99     uint64_t in_seq, out_seq_acked; // atomic<uint64_t>, got receipt
100     std::atomic<int64_t> out_seq = { 0 }; 
101
102     lifecycle() : state(lifecycle::INIT), reconnects(0), connect_seq(0),
103                   peer_global_seq(0), in_seq(0), out_seq_acked(0)
104                   {}
105
106     void set_in_seq(uint64_t seq) {
107       in_seq = seq;
108     }
109
110     uint64_t next_out_seq() {
111       return ++out_seq;
112     }
113
114   } state;
115
116   /* batching */
117   XioInSeq in_seq;
118
119   class CState
120   {
121   public:
122     static const int FLAG_NONE = 0x0000;
123     static const int FLAG_BAD_AUTH = 0x0001;
124     static const int FLAG_MAPPED = 0x0002;
125     static const int FLAG_RESET = 0x0004;
126
127     static const int OP_FLAG_NONE = 0x0000;
128     static const int OP_FLAG_LOCKED = 0x0001;
129     static const int OP_FLAG_LRU = 0x0002;
130
131     uint64_t features;
132     Messenger::Policy policy;
133
134     CryptoKey session_key;
135     ceph::shared_ptr<AuthSessionHandler> session_security;
136     AuthAuthorizer *authorizer;
137     XioConnection *xcon;
138     uint32_t protocol_version;
139
140     std::atomic<session_states> session_state = { 0 };
141     std::atomic<session_startup_state> startup_state = { 0 };
142
143     uint32_t reconnects;
144     uint32_t connect_seq, global_seq, peer_global_seq;
145     uint64_t in_seq, out_seq_acked; // atomic<uint64_t>, got receipt
146     std::atomic<uint64_t> out_seq = { 0 }; 
147
148     uint32_t flags;
149
150     explicit CState(XioConnection* _xcon)
151       : features(0),
152         authorizer(NULL),
153         xcon(_xcon),
154         protocol_version(0),
155         session_state(INIT),
156         startup_state(IDLE),
157         reconnects(0),
158         connect_seq(0),
159         global_seq(0),
160         peer_global_seq(0),
161         in_seq(0),
162         out_seq_acked(0),
163         flags(FLAG_NONE) {}
164
165     uint64_t get_session_state() {
166       return session_state;
167     }
168
169     uint64_t get_startup_state() {
170       return startup_state;
171     }
172
173     void set_in_seq(uint64_t seq) {
174       in_seq = seq;
175     }
176
177     uint64_t next_out_seq() {
178       return ++out_seq;
179     };
180
181     // state machine
182     int init_state();
183     int next_state(Message* m);
184 #if 0 // future (session startup)
185     int msg_connect(MConnect *m);
186     int msg_connect_reply(MConnectReply *m);
187     int msg_connect_reply(MConnectAuthReply *m);
188     int msg_connect_auth(MConnectAuth *m);
189     int msg_connect_auth_reply(MConnectAuthReply *m);
190 #endif
191     int state_up_ready(uint32_t flags);
192     int state_flow_controlled(uint32_t flags);
193     int state_discon();
194     int state_fail(Message* m, uint32_t flags);
195
196   } cstate; /* CState */
197
198   // message submission queue
199   struct SendQ {
200     bool keepalive;
201     bool ack;
202     utime_t ack_time;
203     Message::Queue mqueue; // deferred
204     XioSubmit::Queue requeue;
205
206     SendQ():keepalive(false), ack(false){}
207   } outgoing;
208
209   // conns_entity_map comparison functor
210   struct EntityComp
211   {
212     // for internal ordering
213     bool operator()(const XioConnection &lhs,  const XioConnection &rhs) const
214       {  return lhs.get_peer() < rhs.get_peer(); }
215
216     // for external search by entity_inst_t(peer)
217     bool operator()(const entity_inst_t &peer, const XioConnection &c) const
218       {  return peer < c.get_peer(); }
219
220     bool operator()(const XioConnection &c, const entity_inst_t &peer) const
221       {  return c.get_peer() < peer;  }
222   };
223
224   bi::list_member_hook<> conns_hook;
225   bi::avl_set_member_hook<> conns_entity_map_hook;
226
227   typedef bi::list< XioConnection,
228                     bi::member_hook<XioConnection, bi::list_member_hook<>,
229                                     &XioConnection::conns_hook > > ConnList;
230
231   typedef bi::member_hook<XioConnection, bi::avl_set_member_hook<>,
232                           &XioConnection::conns_entity_map_hook> EntityHook;
233
234   typedef bi::avl_set< XioConnection, EntityHook,
235                        bi::compare<EntityComp> > EntitySet;
236
237   friend class XioPortal;
238   friend class XioMessenger;
239   friend class XioDispatchHook;
240   friend class XioMarkDownHook;
241   friend class XioSend;
242
243   int on_disconnect_event() {
244     connected = false;
245     pthread_spin_lock(&sp);
246     discard_out_queues(CState::OP_FLAG_LOCKED);
247     pthread_spin_unlock(&sp);
248     return 0;
249   }
250
251   int on_teardown_event() {
252     pthread_spin_lock(&sp);
253     if (conn)
254       xio_connection_destroy(conn);
255     conn = NULL;
256     pthread_spin_unlock(&sp);
257     this->put();
258     return 0;
259   }
260
261   int xio_qdepth_high_mark() {
262     return q_high_mark;
263   }
264
265   int xio_qdepth_low_mark() {
266     return q_low_mark;
267   }
268
269 public:
270   XioConnection(XioMessenger *m, XioConnection::type _type,
271                 const entity_inst_t& peer);
272
273   ~XioConnection() {
274     if (conn)
275       xio_connection_destroy(conn);
276   }
277   ostream& conn_prefix(std::ostream *_dout);
278
279   bool is_connected() override { return connected; }
280
281   int send_message(Message *m) override;
282   void send_keepalive() override {send_keepalive_or_ack();}
283   void send_keepalive_or_ack(bool ack = false, const utime_t *tp = nullptr);
284   void mark_down() override;
285   int _mark_down(uint32_t flags);
286   void mark_disposable() override;
287   int _mark_disposable(uint32_t flags);
288
289   const entity_inst_t& get_peer() const { return peer; }
290
291   XioConnection* get() {
292 #if 0
293     cout << "XioConnection::get " << this << " " << nref.load() << std::endl;
294 #endif
295     RefCountedObject::get();
296     return this;
297   }
298
299   void put() {
300     RefCountedObject::put();
301 #if 0
302     cout << "XioConnection::put " << this << " " << nref.load() << std::endl;
303 #endif
304   }
305
306   void disconnect() {
307     if (is_connected()) {
308       connected = false;
309       xio_disconnect(conn); // normal teardown will clean up conn
310     }
311   }
312
313   uint32_t get_magic() { return magic; }
314   void set_magic(int _magic) { magic = _magic; }
315   uint32_t get_special_handling() { return special_handling; }
316   void set_special_handling(int n) { special_handling = n; }
317   uint64_t get_scount() { return scount; }
318
319   int passive_setup(); /* XXX */
320
321   int handle_data_msg(struct xio_session *session, struct xio_msg *msg,
322                  int more_in_batch, void *cb_user_context);
323   int on_msg(struct xio_session *session, struct xio_msg *msg,
324                  int more_in_batch, void *cb_user_context);
325   int on_ow_msg_send_complete(struct xio_session *session, struct xio_msg *msg,
326                               void *conn_user_context);
327   int on_msg_error(struct xio_session *session, enum xio_status error,
328                    struct xio_msg  *msg, void *conn_user_context);
329   void msg_send_fail(XioSend *xsend, int code);
330   void msg_release_fail(struct xio_msg *msg, int code);
331 private:
332   void send_keepalive_or_ack_internal(bool ack = false, const utime_t *tp = nullptr);
333   int flush_out_queues(uint32_t flags);
334   int discard_out_queues(uint32_t flags);
335   int adjust_clru(uint32_t flags);
336 };
337
338 typedef boost::intrusive_ptr<XioConnection> XioConnectionRef;
339
340 class XioLoopbackConnection : public Connection
341 {
342 private:
343   std::atomic<uint64_t> seq = { 0 };
344 public:
345   explicit XioLoopbackConnection(Messenger *m) : Connection(m->cct, m)
346     {
347       const entity_inst_t& m_inst = m->get_myinst();
348       peer_addr = m_inst.addr;
349       peer_type = m_inst.name.type();
350       set_features(XIO_ALL_FEATURES); /* XXXX set to ours */
351     }
352
353   XioLoopbackConnection* get() {
354     return static_cast<XioLoopbackConnection*>(RefCountedObject::get());
355   }
356
357   bool is_connected() override { return true; }
358
359   int send_message(Message *m) override;
360   void send_keepalive() override;
361   void mark_down() override {}
362   void mark_disposable() override {}
363
364   uint64_t get_seq() {
365     return seq;
366   }
367
368   uint64_t next_seq() {
369     return ++seq;
370   }
371 };
372
373 typedef boost::intrusive_ptr<XioLoopbackConnection> XioLoopbackConnectionRef;
374
375 #endif /* XIO_CONNECTION_H */