Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / msg / Connection.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software
11  * Foundation.  See file COPYING.
12  *
13  */
14
15 #ifndef CEPH_CONNECTION_H
16 #define CEPH_CONNECTION_H
17
18 #include <stdlib.h>
19 #include <ostream>
20
21 #include <boost/intrusive_ptr.hpp>
22 // Because intusive_ptr clobbers our assert...
23 #include "include/assert.h"
24
25 #include "include/types.h"
26 #include "include/buffer.h"
27
28 #include "common/RefCountedObj.h"
29
30 #include "common/debug.h"
31 #include "common/config.h"
32
33
34 // ======================================================
35
36 // abstract Connection, for keeping per-connection state
37
38 class Message;
39 class Messenger;
40
41 struct Connection : public RefCountedObject {
42   mutable Mutex lock;
43   Messenger *msgr;
44   RefCountedObject *priv;
45   int peer_type;
46   entity_addr_t peer_addr;
47   utime_t last_keepalive, last_keepalive_ack;
48 private:
49   uint64_t features;
50 public:
51   bool failed; // true if we are a lossy connection that has failed.
52
53   int rx_buffers_version;
54   map<ceph_tid_t,pair<bufferlist,int> > rx_buffers;
55
56   friend class boost::intrusive_ptr<Connection>;
57   friend class PipeConnection;
58
59 public:
60   Connection(CephContext *cct, Messenger *m)
61     // we are managed exlusively by ConnectionRef; make it so you can
62     //   ConnectionRef foo = new Connection;
63     : RefCountedObject(cct, 0),
64       lock("Connection::lock"),
65       msgr(m),
66       priv(NULL),
67       peer_type(-1),
68       features(0),
69       failed(false),
70       rx_buffers_version(0) {
71   }
72
73   ~Connection() override {
74     //generic_dout(0) << "~Connection " << this << dendl;
75     if (priv) {
76       //generic_dout(0) << "~Connection " << this << " dropping priv " << priv << dendl;
77       priv->put();
78     }
79   }
80
81   void set_priv(RefCountedObject *o) {
82     Mutex::Locker l(lock);
83     if (priv)
84       priv->put();
85     priv = o;
86   }
87
88   RefCountedObject *get_priv() {
89     Mutex::Locker l(lock);
90     if (priv)
91       return priv->get();
92     return NULL;
93   }
94
95   /**
96    * Used to judge whether this connection is ready to send. Usually, the
97    * implementation need to build a own shakehand or sesson then it can be
98    * ready to send.
99    *
100    * @return true if ready to send, or false otherwise
101    */
102   virtual bool is_connected() = 0;
103
104   Messenger *get_messenger() {
105     return msgr;
106   }
107
108   /**
109    * Queue the given Message to send out on the given Connection.
110    * Success in this function does not guarantee Message delivery, only
111    * success in queueing the Message. Other guarantees may be provided based
112    * on the Connection policy.
113    *
114    * @param m The Message to send. The Messenger consumes a single reference
115    * when you pass it in.
116    *
117    * @return 0 on success, or -errno on failure.
118    */
119   virtual int send_message(Message *m) = 0;
120   /**
121    * Send a "keepalive" ping along the given Connection, if it's working.
122    * If the underlying connection has broken, this function does nothing.
123    *
124    * @return 0, or implementation-defined error numbers.
125    */
126   virtual void send_keepalive() = 0;
127   /**
128    * Mark down the given Connection.
129    *
130    * This will cause us to discard its outgoing queue, and if reset
131    * detection is enabled in the policy and the endpoint tries to
132    * reconnect they will discard their queue when we inform them of
133    * the session reset.
134    *
135    * It does not generate any notifications to the Dispatcher.
136    */
137   virtual void mark_down() = 0;
138
139   /**
140    * Mark a Connection as "disposable", setting it to lossy
141    * (regardless of initial Policy).  This does not immediately close
142    * the Connection once Messages have been delivered, so as long as
143    * there are no errors you can continue to receive responses; but it
144    * will not attempt to reconnect for message delivery or preserve
145    * your old delivery semantics, either.
146    *
147    * TODO: There's some odd stuff going on in our SimpleMessenger
148    * implementation during connect that looks unused; is there
149    * more of a contract that that's enforcing?
150    */
151   virtual void mark_disposable() = 0;
152
153
154   int get_peer_type() const { return peer_type; }
155   void set_peer_type(int t) { peer_type = t; }
156
157   bool peer_is_mon() const { return peer_type == CEPH_ENTITY_TYPE_MON; }
158   bool peer_is_mgr() const { return peer_type == CEPH_ENTITY_TYPE_MGR; }
159   bool peer_is_mds() const { return peer_type == CEPH_ENTITY_TYPE_MDS; }
160   bool peer_is_osd() const { return peer_type == CEPH_ENTITY_TYPE_OSD; }
161   bool peer_is_client() const { return peer_type == CEPH_ENTITY_TYPE_CLIENT; }
162
163   const entity_addr_t& get_peer_addr() const { return peer_addr; }
164   void set_peer_addr(const entity_addr_t& a) { peer_addr = a; }
165
166   uint64_t get_features() const { return features; }
167   bool has_feature(uint64_t f) const { return features & f; }
168   bool has_features(uint64_t f) const {
169     return (features & f) == f;
170   }
171   void set_features(uint64_t f) { features = f; }
172   void set_feature(uint64_t f) { features |= f; }
173
174   void post_rx_buffer(ceph_tid_t tid, bufferlist& bl) {
175     Mutex::Locker l(lock);
176     ++rx_buffers_version;
177     rx_buffers[tid] = pair<bufferlist,int>(bl, rx_buffers_version);
178   }
179
180   void revoke_rx_buffer(ceph_tid_t tid) {
181     Mutex::Locker l(lock);
182     rx_buffers.erase(tid);
183   }
184
185   utime_t get_last_keepalive() const {
186     Mutex::Locker l(lock);
187     return last_keepalive;
188   }
189   void set_last_keepalive(utime_t t) {
190     Mutex::Locker l(lock);
191     last_keepalive = t;
192   }
193   utime_t get_last_keepalive_ack() const {
194     Mutex::Locker l(lock);
195     return last_keepalive_ack;
196   }
197   void set_last_keepalive_ack(utime_t t) {
198     Mutex::Locker l(lock);
199     last_keepalive_ack = t;
200   }
201
202 };
203
204 typedef boost::intrusive_ptr<Connection> ConnectionRef;
205
206
207 #endif /* CEPH_CONNECTION_H */