Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / msg / simple / Pipe.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software 
11  * Foundation.  See file COPYING.
12  * 
13  */
14
15 #ifndef CEPH_MSGR_PIPE_H
16 #define CEPH_MSGR_PIPE_H
17
18 #include "include/memory.h"
19 #include "auth/AuthSessionHandler.h"
20
21 #include "msg/msg_types.h"
22 #include "msg/Messenger.h"
23 #include "PipeConnection.h"
24
25
26 class SimpleMessenger;
27 class DispatchQueue;
28
29 static const int SM_IOV_MAX = (IOV_MAX >= 1024 ? IOV_MAX / 4 : IOV_MAX);
30
31   /**
32    * The Pipe is the most complex SimpleMessenger component. It gets
33    * two threads, one each for reading and writing on a socket it's handed
34    * at creation time, and is responsible for everything that happens on
35    * that socket. Besides message transmission, it's responsible for
36    * propagating socket errors to the SimpleMessenger and then sticking
37    * around in a state where it can provide enough data for the SimpleMessenger
38    * to provide reliable Message delivery when it manages to reconnect.
39    */
40   class Pipe : public RefCountedObject {
41     /**
42      * The Reader thread handles all reads off the socket -- not just
43      * Messages, but also acks and other protocol bits (excepting startup,
44      * when the Writer does a couple of reads).
45      * All the work is implemented in Pipe itself, of course.
46      */
47     class Reader : public Thread {
48       Pipe *pipe;
49     public:
50       explicit Reader(Pipe *p) : pipe(p) {}
51       void *entry() override { pipe->reader(); return 0; }
52     } reader_thread;
53
54     /**
55      * The Writer thread handles all writes to the socket (after startup).
56      * All the work is implemented in Pipe itself, of course.
57      */
58     class Writer : public Thread {
59       Pipe *pipe;
60     public:
61       explicit Writer(Pipe *p) : pipe(p) {}
62       void *entry() override { pipe->writer(); return 0; }
63     } writer_thread;
64
65     class DelayedDelivery;
66     DelayedDelivery *delay_thread;
67   public:
68     Pipe(SimpleMessenger *r, int st, PipeConnection *con);
69     ~Pipe() override;
70
71     SimpleMessenger *msgr;
72     uint64_t conn_id;
73     ostream& _pipe_prefix(std::ostream &out) const;
74
75     Pipe* get() {
76       return static_cast<Pipe*>(RefCountedObject::get());
77     }
78
79     bool is_connected() {
80       Mutex::Locker l(pipe_lock);
81       return state == STATE_OPEN;
82     }
83
84     char *recv_buf;
85     size_t recv_max_prefetch;
86     size_t recv_ofs;
87     size_t recv_len;
88
89     enum {
90       STATE_ACCEPTING,
91       STATE_CONNECTING,
92       STATE_OPEN,
93       STATE_STANDBY,
94       STATE_CLOSED,
95       STATE_CLOSING,
96       STATE_WAIT       // just wait for racing connection
97     };
98
99     static const char *get_state_name(int s) {
100       switch (s) {
101       case STATE_ACCEPTING: return "accepting";
102       case STATE_CONNECTING: return "connecting";
103       case STATE_OPEN: return "open";
104       case STATE_STANDBY: return "standby";
105       case STATE_CLOSED: return "closed";
106       case STATE_CLOSING: return "closing";
107       case STATE_WAIT: return "wait";
108       default: return "UNKNOWN";
109       }
110     }
111     const char *get_state_name() {
112       return get_state_name(state);
113     }
114
115   private:
116     int sd;
117     struct iovec msgvec[SM_IOV_MAX];
118
119   public:
120     int port;
121     int peer_type;
122     entity_addr_t peer_addr;
123     Messenger::Policy policy;
124     
125     Mutex pipe_lock;
126     int state;
127     std::atomic<bool> state_closed = { false }; // true iff state = STATE_CLOSED
128
129     // session_security handles any signatures or encryptions required for this pipe's msgs. PLR
130
131     ceph::shared_ptr<AuthSessionHandler> session_security;
132
133   protected:
134     friend class SimpleMessenger;
135     PipeConnectionRef connection_state;
136
137     utime_t backoff;         // backoff time
138
139     bool reader_running, reader_needs_join;
140     bool reader_dispatching; /// reader thread is dispatching without pipe_lock
141     bool notify_on_dispatch_done; /// something wants a signal when dispatch done
142     bool writer_running;
143
144     map<int, list<Message*> > out_q;  // priority queue for outbound msgs
145     DispatchQueue *in_q;
146     list<Message*> sent;
147     Cond cond;
148     bool send_keepalive;
149     bool send_keepalive_ack;
150     utime_t keepalive_ack_stamp;
151     bool halt_delivery; //if a pipe's queue is destroyed, stop adding to it
152     
153     __u32 connect_seq, peer_global_seq;
154     uint64_t out_seq;
155     uint64_t in_seq, in_seq_acked;
156     
157     void set_socket_options();
158
159     int accept();   // server handshake
160     int connect();  // client handshake
161     void reader();
162     void writer();
163     void unlock_maybe_reap();
164
165     int randomize_out_seq();
166
167     int read_message(Message **pm,
168                      AuthSessionHandler *session_security_copy);
169     int write_message(const ceph_msg_header& h, const ceph_msg_footer& f, bufferlist& body);
170     /**
171      * Write the given data (of length len) to the Pipe's socket. This function
172      * will loop until all passed data has been written out.
173      * If more is set, the function will optimize socket writes
174      * for additional data (by passing the MSG_MORE flag, aka TCP_CORK).
175      *
176      * @param msg The msghdr to write out
177      * @param len The length of the data in msg
178      * @param more Should be set true if this is one part of a larger message
179      * @return 0, or -1 on failure (unrecoverable -- close the socket).
180      */
181     int do_sendmsg(struct msghdr *msg, unsigned len, bool more=false);
182     int write_ack(uint64_t s);
183     int write_keepalive();
184     int write_keepalive2(char tag, const utime_t &t);
185
186     void fault(bool reader=false);
187
188     void was_session_reset();
189
190     /* Clean up sent list */
191     void handle_ack(uint64_t seq);
192
193     public:
194     Pipe(const Pipe& other);
195     const Pipe& operator=(const Pipe& other);
196
197     void start_reader();
198     void start_writer();
199     void maybe_start_delay_thread();
200     void join_reader();
201
202     // public constructors
203     static const Pipe& Server(int s);
204     static const Pipe& Client(const entity_addr_t& pi);
205
206     uint64_t get_out_seq() { return out_seq; }
207
208     bool is_queued() { return !out_q.empty() || send_keepalive || send_keepalive_ack; }
209
210     entity_addr_t& get_peer_addr() { return peer_addr; }
211
212     void set_peer_addr(const entity_addr_t& a) {
213       if (&peer_addr != &a)  // shut up valgrind
214         peer_addr = a;
215       connection_state->set_peer_addr(a);
216     }
217     void set_peer_type(int t) {
218       peer_type = t;
219       connection_state->set_peer_type(t);
220     }
221
222     void register_pipe();
223     void unregister_pipe();
224     void join();
225     /// stop a Pipe by closing its socket and setting it to STATE_CLOSED
226     void stop();
227     /// stop() a Pipe if not already done, and wait for it to finish any
228     /// fast_dispatch in progress.
229     void stop_and_wait();
230
231     void _send(Message *m) {
232       assert(pipe_lock.is_locked());
233       out_q[m->get_priority()].push_back(m);
234       cond.Signal();
235     }
236     void _send_keepalive() {
237       assert(pipe_lock.is_locked());
238       send_keepalive = true;
239       cond.Signal();
240     }
241     Message *_get_next_outgoing() {
242       assert(pipe_lock.is_locked());
243       Message *m = 0;
244       while (!m && !out_q.empty()) {
245         map<int, list<Message*> >::reverse_iterator p = out_q.rbegin();
246         if (!p->second.empty()) {
247           m = p->second.front();
248           p->second.pop_front();
249         }
250         if (p->second.empty())
251           out_q.erase(p->first);
252       }
253       return m;
254     }
255
256     /// move all messages in the sent list back into the queue at the highest priority.
257     void requeue_sent();
258     /// discard messages requeued by requeued_sent() up to a given seq
259     void discard_requeued_up_to(uint64_t seq);
260     void discard_out_queue();
261
262     void shutdown_socket() {
263       recv_reset();
264       if (sd >= 0)
265         ::shutdown(sd, SHUT_RDWR);
266     }
267
268     void recv_reset() {
269       recv_len = 0;
270       recv_ofs = 0;
271     }
272     ssize_t do_recv(char *buf, size_t len, int flags);
273     ssize_t buffered_recv(char *buf, size_t len, int flags);
274     bool has_pending_data() { return recv_len > recv_ofs; }
275
276     /**
277      * do a blocking read of len bytes from socket
278      *
279      * @param buf buffer to read into
280      * @param len exact number of bytes to read
281      * @return 0 for success, or -1 on error
282      */
283     int tcp_read(char *buf, unsigned len);
284
285     /**
286      * wait for bytes to become available on the socket
287      *
288      * @return 0 for success, or -1 on error
289      */
290     int tcp_read_wait();
291
292     /**
293      * non-blocking read of available bytes on socket
294      *
295      * This is expected to be used after tcp_read_wait(), and will return
296      * an error if there is no data on the socket to consume.
297      *
298      * @param buf buffer to read into
299      * @param len maximum number of bytes to read
300      * @return bytes read, or -1 on error or when there is no data
301      */
302     ssize_t tcp_read_nonblocking(char *buf, unsigned len);
303
304     /**
305      * blocking write of bytes to socket
306      *
307      * @param buf buffer
308      * @param len number of bytes to write
309      * @return 0 for success, or -1 on error
310      */
311     int tcp_write(const char *buf, unsigned len);
312
313   };
314
315
316 #endif