Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / msg / async / AsyncConnection.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
7  *
8  * Author: Haomai Wang <haomaiwang@gmail.com>
9  *
10  * This is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU Lesser General Public
12  * License version 2.1, as published by the Free Software
13  * Foundation.  See file COPYING.
14  *
15  */
16
17 #ifndef CEPH_MSG_ASYNCCONNECTION_H
18 #define CEPH_MSG_ASYNCCONNECTION_H
19
20 #include <atomic>
21 #include <pthread.h>
22 #include <climits>
23 #include <list>
24 #include <mutex>
25 #include <map>
26 using namespace std;
27
28 #include "auth/AuthSessionHandler.h"
29 #include "common/ceph_time.h"
30 #include "common/perf_counters.h"
31 #include "include/buffer.h"
32 #include "msg/Connection.h"
33 #include "msg/Messenger.h"
34
35 #include "Event.h"
36 #include "Stack.h"
37
38 class AsyncMessenger;
39 class Worker;
40
// Number of iovec slots AsyncConnection reserves in its msgvec array:
// IOV_MAX itself when the platform limit is below 1024, otherwise a quarter
// of IOV_MAX.
static const int ASYNC_IOV_MAX = (IOV_MAX < 1024) ? IOV_MAX : (IOV_MAX / 4);
42
43 /*
44  * AsyncConnection maintains a logical session between two endpoints. In other
45  * words, a pair of addresses maps to exactly one AsyncConnection. AsyncConnection
46  * handles network faults and read/write transactions. If the underlying file
47  * descriptor breaks, AsyncConnection keeps the message queue and sequence
48  * and tries to reconnect to the peer endpoint.
49  */
50 class AsyncConnection : public Connection {
51
52   ssize_t read_bulk(char *buf, unsigned len);
53   ssize_t do_sendmsg(struct msghdr &msg, unsigned len, bool more);
54   ssize_t try_send(bufferlist &bl, bool more=false) {
55     std::lock_guard<std::mutex> l(write_lock);
56     outcoming_bl.claim_append(bl);
57     return _try_send(more);
58   }
59   ssize_t _try_send(bool more=false);
60   ssize_t _send(Message *m);
61   void prepare_send_message(uint64_t features, Message *m, bufferlist &bl);
62   ssize_t read_until(unsigned needed, char *p);
63   ssize_t _process_connection();
64   void _connect();
65   void _stop();
66   int handle_connect_reply(ceph_msg_connect &connect, ceph_msg_connect_reply &r);
67   ssize_t handle_connect_msg(ceph_msg_connect &m, bufferlist &aubl, bufferlist &bl);
68   void was_session_reset();
69   void fault();
70   void discard_out_queue();
71   void discard_requeued_up_to(uint64_t seq);
72   void requeue_sent();
73   int randomize_out_seq();
74   void handle_ack(uint64_t seq);
75   void _append_keepalive_or_ack(bool ack=false, utime_t *t=NULL);
76   ssize_t write_message(Message *m, bufferlist& bl, bool more);
77   void inject_delay();
78   ssize_t _reply_accept(char tag, ceph_msg_connect &connect, ceph_msg_connect_reply &reply,
79                     bufferlist &authorizer_reply) {
80     bufferlist reply_bl;
81     reply.tag = tag;
82     reply.features = ((uint64_t)connect.features & policy.features_supported) | policy.features_required;
83     reply.authorizer_len = authorizer_reply.length();
84     reply_bl.append((char*)&reply, sizeof(reply));
85     if (reply.authorizer_len) {
86       reply_bl.append(authorizer_reply.c_str(), authorizer_reply.length());
87     }
88     ssize_t r = try_send(reply_bl);
89     if (r < 0) {
90       inject_delay();
91       return -1;
92     }
93
94     state = STATE_ACCEPTING_WAIT_CONNECT_MSG;
95     return 0;
96   }
97   bool is_queued() const {
98     return !out_q.empty() || outcoming_bl.length();
99   }
100   void shutdown_socket() {
101     for (auto &&t : register_time_events)
102       center->delete_time_event(t);
103     register_time_events.clear();
104     if (last_tick_id) {
105       center->delete_time_event(last_tick_id);
106       last_tick_id = 0;
107     }
108     if (cs) {
109       center->delete_file_event(cs.fd(), EVENT_READABLE|EVENT_WRITABLE);
110       cs.shutdown();
111       cs.close();
112     }
113   }
114   Message *_get_next_outgoing(bufferlist *bl) {
115     Message *m = 0;
116     while (!m && !out_q.empty()) {
117       map<int, list<pair<bufferlist, Message*> > >::reverse_iterator it = out_q.rbegin();
118       if (!it->second.empty()) {
119         list<pair<bufferlist, Message*> >::iterator p = it->second.begin();
120         m = p->second;
121         if (bl)
122           bl->swap(p->first);
123         it->second.erase(p);
124       }
125       if (it->second.empty())
126         out_q.erase(it->first);
127     }
128     return m;
129   }
130   bool _has_next_outgoing() const {
131     return !out_q.empty();
132   }
133   void reset_recv_state();
134
135    /**
136    * The DelayedDelivery is for injecting delays into Message delivery off
137    * the socket. It is only enabled if delays are requested, and if they
138    * are then it pulls Messages off the DelayQueue and puts them into the
139    * AsyncMessenger event queue.
140    */
141   class DelayedDelivery : public EventCallback {
142     std::set<uint64_t> register_time_events; // need to delete it if stop
143     std::deque<std::pair<utime_t, Message*> > delay_queue;
144     std::mutex delay_lock;
145     AsyncMessenger *msgr;
146     EventCenter *center;
147     DispatchQueue *dispatch_queue;
148     uint64_t conn_id;
149     std::atomic_bool stop_dispatch;
150
151    public:
152     explicit DelayedDelivery(AsyncMessenger *omsgr, EventCenter *c,
153                              DispatchQueue *q, uint64_t cid)
154       : msgr(omsgr), center(c), dispatch_queue(q), conn_id(cid),
155         stop_dispatch(false) { }
156     ~DelayedDelivery() override {
157       assert(register_time_events.empty());
158       assert(delay_queue.empty());
159     }
160     void set_center(EventCenter *c) { center = c; }
161     void do_request(int id) override;
162     void queue(double delay_period, utime_t release, Message *m) {
163       std::lock_guard<std::mutex> l(delay_lock);
164       delay_queue.push_back(std::make_pair(release, m));
165       register_time_events.insert(center->create_time_event(delay_period*1000000, this));
166     }
167     void discard() {
168       stop_dispatch = true;
169       center->submit_to(center->get_id(), [this] () mutable {
170         std::lock_guard<std::mutex> l(delay_lock);
171         while (!delay_queue.empty()) {
172           Message *m = delay_queue.front().second;
173           dispatch_queue->dispatch_throttle_release(m->get_dispatch_throttle_size());
174           m->put();
175           delay_queue.pop_front();
176         }
177         for (auto i : register_time_events)
178           center->delete_time_event(i);
179         register_time_events.clear();
180         stop_dispatch = false;
181       }, true);
182     }
183     bool ready() const { return !stop_dispatch && delay_queue.empty() && register_time_events.empty(); }
184     void flush();
185   } *delay_state;
186
187  public:
188   AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQueue *q, Worker *w);
189   ~AsyncConnection() override;
190   void maybe_start_delay_thread();
191
192   ostream& _conn_prefix(std::ostream *_dout);
193
194   bool is_connected() override {
195     return can_write.load() == WriteStatus::CANWRITE;
196   }
197
198   // Only call when AsyncConnection first construct
199   void connect(const entity_addr_t& addr, int type) {
200     set_peer_type(type);
201     set_peer_addr(addr);
202     policy = msgr->get_policy(type);
203     _connect();
204   }
205   // Only call right after the AsyncConnection is first constructed
206   void accept(ConnectedSocket socket, entity_addr_t &addr);
207   int send_message(Message *m) override;
208
209   void send_keepalive() override;
210   void mark_down() override;
211   void mark_disposable() override {
212     std::lock_guard<std::mutex> l(lock);
213     policy.lossy = true;
214   }
215   
216  private:
217   enum {
    // Values held in the `state` / `state_after_send` members.
    // get_state_name() indexes its name table directly with these values, so
    // the enumerator order here must match the order of that table; add new
    // states in both places.
218     STATE_NONE,
219     STATE_OPEN,
220     STATE_OPEN_KEEPALIVE2,
221     STATE_OPEN_KEEPALIVE2_ACK,
222     STATE_OPEN_TAG_ACK,
223     STATE_OPEN_MESSAGE_HEADER,
224     STATE_OPEN_MESSAGE_THROTTLE_MESSAGE,
225     STATE_OPEN_MESSAGE_THROTTLE_BYTES,
226     STATE_OPEN_MESSAGE_THROTTLE_DISPATCH_QUEUE,
227     STATE_OPEN_MESSAGE_READ_FRONT,
228     STATE_OPEN_MESSAGE_READ_MIDDLE,
229     STATE_OPEN_MESSAGE_READ_DATA_PREPARE,
230     STATE_OPEN_MESSAGE_READ_DATA,
231     STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH,
232     STATE_OPEN_TAG_CLOSE,
233     STATE_WAIT_SEND,
234     STATE_CONNECTING,
235     STATE_CONNECTING_RE,
236     STATE_CONNECTING_WAIT_BANNER_AND_IDENTIFY,
237     STATE_CONNECTING_SEND_CONNECT_MSG,
238     STATE_CONNECTING_WAIT_CONNECT_REPLY,
239     STATE_CONNECTING_WAIT_CONNECT_REPLY_AUTH,
240     STATE_CONNECTING_WAIT_ACK_SEQ,
241     STATE_CONNECTING_READY,
242     STATE_ACCEPTING,
243     STATE_ACCEPTING_WAIT_BANNER_ADDR,
244     STATE_ACCEPTING_WAIT_CONNECT_MSG,
245     STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH,
246     STATE_ACCEPTING_WAIT_SEQ,
247     STATE_ACCEPTING_READY,
248     STATE_STANDBY,
249     STATE_CLOSED,
250     STATE_WAIT,       // just wait for racing connection
251   };
252
253   static const int TCP_PREFETCH_MIN_SIZE;
254   static const char *get_state_name(int state) {
255       const char* const statenames[] = {"STATE_NONE",
256                                         "STATE_OPEN",
257                                         "STATE_OPEN_KEEPALIVE2",
258                                         "STATE_OPEN_KEEPALIVE2_ACK",
259                                         "STATE_OPEN_TAG_ACK",
260                                         "STATE_OPEN_MESSAGE_HEADER",
261                                         "STATE_OPEN_MESSAGE_THROTTLE_MESSAGE",
262                                         "STATE_OPEN_MESSAGE_THROTTLE_BYTES",
263                                         "STATE_OPEN_MESSAGE_THROTTLE_DISPATCH_QUEUE",
264                                         "STATE_OPEN_MESSAGE_READ_FRONT",
265                                         "STATE_OPEN_MESSAGE_READ_MIDDLE",
266                                         "STATE_OPEN_MESSAGE_READ_DATA_PREPARE",
267                                         "STATE_OPEN_MESSAGE_READ_DATA",
268                                         "STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH",
269                                         "STATE_OPEN_TAG_CLOSE",
270                                         "STATE_WAIT_SEND",
271                                         "STATE_CONNECTING",
272                                         "STATE_CONNECTING_RE",
273                                         "STATE_CONNECTING_WAIT_BANNER_AND_IDENTIFY",
274                                         "STATE_CONNECTING_SEND_CONNECT_MSG",
275                                         "STATE_CONNECTING_WAIT_CONNECT_REPLY",
276                                         "STATE_CONNECTING_WAIT_CONNECT_REPLY_AUTH",
277                                         "STATE_CONNECTING_WAIT_ACK_SEQ",
278                                         "STATE_CONNECTING_READY",
279                                         "STATE_ACCEPTING",
280                                         "STATE_ACCEPTING_WAIT_BANNER_ADDR",
281                                         "STATE_ACCEPTING_WAIT_CONNECT_MSG",
282                                         "STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH",
283                                         "STATE_ACCEPTING_WAIT_SEQ",
284                                         "STATE_ACCEPTING_READY",
285                                         "STATE_STANDBY",
286                                         "STATE_CLOSED",
287                                         "STATE_WAIT"};
288       return statenames[state];
289   }
290
291   AsyncMessenger *async_msgr;
292   uint64_t conn_id;
293   PerfCounters *logger;
294   int global_seq;
295   __u32 connect_seq, peer_global_seq;
296   std::atomic<uint64_t> out_seq{0};
297   std::atomic<uint64_t> ack_left{0}, in_seq{0};
298   int state;
299   int state_after_send;
300   ConnectedSocket cs;
301   int port;
302   Messenger::Policy policy;
303
304   DispatchQueue *dispatch_queue;
305
306   // lockfree, only used in own thread
307   bufferlist outcoming_bl;
308   bool open_write = false;
309
310   std::mutex write_lock;
311   enum class WriteStatus {
312     NOWRITE,
313     REPLACING,
314     CANWRITE,
315     CLOSED
316   };
317   std::atomic<WriteStatus> can_write;
318   list<Message*> sent; // the first bufferlist need to inject seq
319   map<int, list<pair<bufferlist, Message*> > > out_q;  // priority queue for outbound msgs
320   bool keepalive;
321
322   std::mutex lock;
323   utime_t backoff;         // backoff time
324   EventCallbackRef read_handler;
325   EventCallbackRef write_handler;
326   EventCallbackRef wakeup_handler;
327   EventCallbackRef tick_handler;
328   struct iovec msgvec[ASYNC_IOV_MAX];
329   char *recv_buf;
330   uint32_t recv_max_prefetch;
331   uint32_t recv_start;
332   uint32_t recv_end;
333   set<uint64_t> register_time_events; // need to delete it if stop
334   ceph::coarse_mono_clock::time_point last_active;
335   uint64_t last_tick_id = 0;
336   const uint64_t inactive_timeout_us;
337
338   // This section holds temporary variables used during state transitions
339
340   // Open state
341   utime_t recv_stamp;
342   utime_t throttle_stamp;
343   unsigned msg_left;
344   uint64_t cur_msg_size;
345   ceph_msg_header current_header;
346   bufferlist data_buf;
347   bufferlist::iterator data_blp;
348   bufferlist front, middle, data;
349   ceph_msg_connect connect_msg;
350   // Connecting state
351   bool got_bad_auth;
352   AuthAuthorizer *authorizer;
353   bufferlist authorizer_buf;
354   ceph_msg_connect_reply connect_reply;
355   // Accepting state
356   entity_addr_t socket_addr;
357   CryptoKey session_key;
358   bool replacing;    // when replacing process happened, we will reply connect
359                      // side with RETRY tag and accept side will clear replaced
360                      // connection. So when connect side reissue connect_msg,
361                      // there won't exists conflicting connection so we use
362                      // "replacing" to skip RESETSESSION to avoid detect wrong
363                      // presentation
364   bool is_reset_from_peer;
365   bool once_ready;
366
367   // used only for local state; it will be overwritten on state transition
368   char *state_buffer;
369   // used only by "read_until"
370   uint64_t state_offset;
371   Worker *worker;
372   EventCenter *center;
373   ceph::shared_ptr<AuthSessionHandler> session_security;
374
375  public:
376   // used by eventcallback
377   void handle_write();
378   void process();
379   void wakeup_from(uint64_t id);
380   void tick(uint64_t id);
381   void local_deliver();
382   void stop(bool queue_reset) {
383     lock.lock();
384     bool need_queue_reset = (state != STATE_CLOSED) && queue_reset;
385     _stop();
386     lock.unlock();
387     if (need_queue_reset)
388       dispatch_queue->queue_reset(this);
389   }
390   void cleanup() {
391     shutdown_socket();
392     delete read_handler;
393     delete write_handler;
394     delete wakeup_handler;
395     delete tick_handler;
396     if (delay_state) {
397       delete delay_state;
398       delay_state = NULL;
399     }
400   }
401   PerfCounters *get_perf_counter() {
402     return logger;
403   }
404 }; /* AsyncConnection */
405
406 typedef boost::intrusive_ptr<AsyncConnection> AsyncConnectionRef;
407
408 #endif