Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / test / direct_messenger / DirectMessenger.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software
11  * Foundation.  See file COPYING.
12  *
13  */
14
15 #include "DirectMessenger.h"
16 #include "msg/DispatchStrategy.h"
17
18
19 class DirectConnection : public Connection {
20   /// sent messages are dispatched here
21   DispatchStrategy *const dispatchers;
22
23   /// the connection that will be attached to outgoing messages, so that replies
24   /// can be dispatched back to the sender. the pointer is atomic for
25   /// thread-safety between mark_down() and send_message(). no reference is held
26   /// on this Connection to avoid cyclical refs. we don't need a reference
27   /// because its owning DirectMessenger will mark both connections down (and
28   /// clear this pointer) before dropping its own reference
29   std::atomic<Connection*> reply_connection{nullptr};
30
31  public:
32   DirectConnection(CephContext *cct, DirectMessenger *m,
33                    DispatchStrategy *dispatchers)
34     : Connection(cct, m),
35       dispatchers(dispatchers)
36   {}
37
38   /// sets the Connection that will receive replies to outgoing messages
39   void set_direct_reply_connection(ConnectionRef conn);
40
41   /// return true if a peer connection exists
42   bool is_connected() override;
43
44   /// pass the given message directly to our dispatchers
45   int send_message(Message *m) override;
46
47   /// release our pointer to the peer connection. later calls to is_connected()
48   /// will return false, and send_message() will fail with -ENOTCONN
49   void mark_down() override;
50
51   /// noop - keepalive messages are not needed within a process
52   void send_keepalive() override {}
53
54   /// noop - reconnect/recovery semantics are not needed within a process
55   void mark_disposable() override {}
56 };
57
58 void DirectConnection::set_direct_reply_connection(ConnectionRef conn)
59 {
60   reply_connection.store(conn.get());
61 }
62
63 bool DirectConnection::is_connected()
64 {
65   // true between calls to set_direct_reply_connection() and mark_down()
66   return reply_connection.load() != nullptr;
67 }
68
69 int DirectConnection::send_message(Message *m)
70 {
71   // read reply_connection atomically and take a reference
72   ConnectionRef conn = reply_connection.load();
73   if (!conn) {
74     m->put();
75     return -ENOTCONN;
76   }
77   // attach reply_connection to the Message, so that calls to
78   // m->get_connection()->send_message() can be dispatched back to the sender
79   m->set_connection(conn);
80
81   dispatchers->ds_dispatch(m);
82   return 0;
83 }
84
85 void DirectConnection::mark_down()
86 {
87   Connection *conn = reply_connection.load();
88   if (!conn) {
89     return; // already marked down
90   }
91   if (!reply_connection.compare_exchange_weak(conn, nullptr)) {
92     return; // lost the race to mark down
93   }
94   // called only once to avoid loops
95   conn->mark_down();
96 }
97
98
99 static ConnectionRef create_loopback(DirectMessenger *m,
100                                      entity_name_t name,
101                                      DispatchStrategy *dispatchers)
102 {
103   auto loopback = boost::intrusive_ptr<DirectConnection>(
104       new DirectConnection(m->cct, m, dispatchers));
105   // loopback replies go to itself
106   loopback->set_direct_reply_connection(loopback);
107   loopback->set_peer_type(name.type());
108   loopback->set_features(CEPH_FEATURES_ALL);
109   return loopback;
110 }
111
112 DirectMessenger::DirectMessenger(CephContext *cct, entity_name_t name,
113                                  string mname, uint64_t nonce,
114                                  DispatchStrategy *dispatchers)
115   : SimplePolicyMessenger(cct, name, mname, nonce),
116     dispatchers(dispatchers),
117     loopback_connection(create_loopback(this, name, dispatchers))
118 {
119   dispatchers->set_messenger(this);
120 }
121
122 DirectMessenger::~DirectMessenger()
123 {
124 }
125
126 int DirectMessenger::set_direct_peer(DirectMessenger *peer)
127 {
128   if (get_myinst() == peer->get_myinst()) {
129     return -EADDRINUSE; // must have a different entity instance
130   }
131   peer_inst = peer->get_myinst();
132
133   // allocate a Connection that dispatches to the peer messenger
134   auto direct_connection = boost::intrusive_ptr<DirectConnection>(
135       new DirectConnection(cct, peer, peer->dispatchers.get()));
136
137   direct_connection->set_peer_addr(peer_inst.addr);
138   direct_connection->set_peer_type(peer_inst.name.type());
139   direct_connection->set_features(CEPH_FEATURES_ALL);
140
141   // if set_direct_peer() was already called on the peer messenger, we can
142   // finish by attaching their connections. if not, the later call to
143   // peer->set_direct_peer() will attach their connection to ours
144   auto connection = peer->get_connection(get_myinst());
145   if (connection) {
146     auto p = static_cast<DirectConnection*>(connection.get());
147
148     p->set_direct_reply_connection(direct_connection);
149     direct_connection->set_direct_reply_connection(p);
150   }
151
152   peer_connection = std::move(direct_connection);
153   return 0;
154 }
155
156 int DirectMessenger::bind(const entity_addr_t &bind_addr)
157 {
158   if (peer_connection) {
159     return -EINVAL; // can't change address after sharing it with the peer
160   }
161   set_myaddr(bind_addr);
162   loopback_connection->set_peer_addr(bind_addr);
163   return 0;
164 }
165
166 int DirectMessenger::client_bind(const entity_addr_t &bind_addr)
167 {
168   // same as bind
169   return bind(bind_addr);
170 }
171
172 int DirectMessenger::start()
173 {
174   if (!peer_connection) {
175     return -EINVAL; // did not connect to a peer
176   }
177   if (started) {
178     return -EINVAL; // already started
179   }
180
181   dispatchers->start();
182   return SimplePolicyMessenger::start();
183 }
184
185 int DirectMessenger::shutdown()
186 {
187   if (!started) {
188     return -EINVAL; // not started
189   }
190
191   mark_down_all();
192   peer_connection.reset();
193   loopback_connection.reset();
194
195   dispatchers->shutdown();
196   SimplePolicyMessenger::shutdown();
197   sem.Put(); // signal wait()
198   return 0;
199 }
200
201 void DirectMessenger::wait()
202 {
203   sem.Get(); // wait on signal from shutdown()
204   dispatchers->wait();
205 }
206
207 ConnectionRef DirectMessenger::get_connection(const entity_inst_t& dst)
208 {
209   if (dst == peer_inst) {
210     return peer_connection;
211   }
212   if (dst == get_myinst()) {
213     return loopback_connection;
214   }
215   return nullptr;
216 }
217
218 ConnectionRef DirectMessenger::get_loopback_connection()
219 {
220   return loopback_connection;
221 }
222
223 int DirectMessenger::send_message(Message *m, const entity_inst_t& dst)
224 {
225   auto conn = get_connection(dst);
226   if (!conn) {
227     m->put();
228     return -ENOTCONN;
229   }
230   return conn->send_message(m);
231 }
232
233 void DirectMessenger::mark_down(const entity_addr_t& addr)
234 {
235   ConnectionRef conn;
236   if (addr == peer_inst.addr) {
237     conn = peer_connection;
238   } else if (addr == get_myaddr()) {
239     conn = loopback_connection;
240   }
241   if (conn) {
242     conn->mark_down();
243   }
244 }
245
246 void DirectMessenger::mark_down_all()
247 {
248   if (peer_connection) {
249     peer_connection->mark_down();
250   }
251   loopback_connection->mark_down();
252 }