1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
15 #ifndef CEPH_MSGR_PIPE_H
16 #define CEPH_MSGR_PIPE_H
18 #include "include/memory.h"
19 #include "auth/AuthSessionHandler.h"
21 #include "msg/msg_types.h"
22 #include "msg/Messenger.h"
23 #include "PipeConnection.h"
26 class SimpleMessenger;
// Iovec count limit derived from the platform's IOV_MAX: a quarter of
// IOV_MAX when IOV_MAX is at least 1024, otherwise IOV_MAX itself.
// It sizes Pipe's msgvec array.
29 static const int SM_IOV_MAX = (IOV_MAX >= 1024 ? IOV_MAX / 4 : IOV_MAX);
32 * The Pipe is the most complex SimpleMessenger component. It gets
33 * two threads, one each for reading and writing on a socket it's handed
34 * at creation time, and is responsible for everything that happens on
35 * that socket. Besides message transmission, it's responsible for
36 * propagating socket errors to the SimpleMessenger and then sticking
37 * around in a state where it can provide enough data for the SimpleMessenger
38 * to provide reliable Message delivery when it manages to reconnect.
40 class Pipe : public RefCountedObject {
42 * The Reader thread handles all reads off the socket -- not just
43 * Messages, but also acks and other protocol bits (excepting startup,
44 * when the Writer does a couple of reads).
45 * All the work is implemented in Pipe itself, of course.
47 class Reader : public Thread {
// Remember the Pipe whose reader() this thread object will run.
50 explicit Reader(Pipe *p) : pipe(p) {}
// Overrides the base-class entry(): calls pipe->reader(), then returns 0.
51 void *entry() override { pipe->reader(); return 0; }
55 * The Writer thread handles all writes to the socket (after startup).
56 * All the work is implemented in Pipe itself, of course.
58 class Writer : public Thread {
// Remember the Pipe whose writer() this thread object will run.
61 explicit Writer(Pipe *p) : pipe(p) {}
// Overrides the base-class entry(): calls pipe->writer(), then returns 0.
62 void *entry() override { pipe->writer(); return 0; }
65 class DelayedDelivery;
66 DelayedDelivery *delay_thread;
68 Pipe(SimpleMessenger *r, int st, PipeConnection *con);
71 SimpleMessenger *msgr;
73 ostream& _pipe_prefix(std::ostream &out) const;
76 return static_cast<Pipe*>(RefCountedObject::get());
80 Mutex::Locker l(pipe_lock);
81 return state == STATE_OPEN;
85 size_t recv_max_prefetch;
96 STATE_WAIT // just wait for racing connection
// Translate a pipe state constant into a short lowercase name.
// Any value without its own case label yields "UNKNOWN".
// The returned pointer refers to a string literal; callers must not free it.
99 static const char *get_state_name(int s) {
101 case STATE_ACCEPTING: return "accepting";
102 case STATE_CONNECTING: return "connecting";
103 case STATE_OPEN: return "open";
104 case STATE_STANDBY: return "standby";
105 case STATE_CLOSED: return "closed";
106 case STATE_CLOSING: return "closing";
107 case STATE_WAIT: return "wait";
108 default: return "UNKNOWN";
// Convenience overload: name of this pipe's current `state` member,
// looked up through the static get_state_name(int).
111 const char *get_state_name() {
112 return get_state_name(state);
117 struct iovec msgvec[SM_IOV_MAX];
122 entity_addr_t peer_addr;
123 Messenger::Policy policy;
127 std::atomic<bool> state_closed = { false }; // true iff state = STATE_CLOSED
129 // session_security handles any signing or encryption required for this pipe's messages. PLR
131 ceph::shared_ptr<AuthSessionHandler> session_security;
134 friend class SimpleMessenger;
135 PipeConnectionRef connection_state;
137 utime_t backoff; // backoff time
139 bool reader_running, reader_needs_join;
140 bool reader_dispatching; /// reader thread is dispatching without pipe_lock
141 bool notify_on_dispatch_done; /// something wants a signal when dispatch done
144 map<int, list<Message*> > out_q; // priority queue for outbound msgs
149 bool send_keepalive_ack;
150 utime_t keepalive_ack_stamp;
151 bool halt_delivery; //if a pipe's queue is destroyed, stop adding to it
153 __u32 connect_seq, peer_global_seq;
155 uint64_t in_seq, in_seq_acked;
157 void set_socket_options();
159 int accept(); // server handshake
160 int connect(); // client handshake
163 void unlock_maybe_reap();
165 int randomize_out_seq();
167 int read_message(Message **pm,
168 AuthSessionHandler *session_security_copy);
169 int write_message(const ceph_msg_header& h, const ceph_msg_footer& f, bufferlist& body);
171 * Write the given data (of length len) to the Pipe's socket. This function
172 * will loop until all passed data has been written out.
173 * If more is set, the function will optimize socket writes
174 * for additional data (by passing the MSG_MORE flag, aka TCP_CORK).
176 * @param msg The msghdr to write out
177 * @param len The length of the data in msg
178 * @param more Should be set true if this is one part of a larger message
179 * @return 0, or -1 on failure (unrecoverable -- close the socket).
181 int do_sendmsg(struct msghdr *msg, unsigned len, bool more=false);
182 int write_ack(uint64_t s);
183 int write_keepalive();
184 int write_keepalive2(char tag, const utime_t &t);
186 void fault(bool reader=false);
188 void was_session_reset();
190 /* Clean up sent list */
191 void handle_ack(uint64_t seq);
// Copy constructor and copy assignment are only declared here.
// NOTE(review): presumably left unimplemented on purpose so that Pipe is
// non-copyable -- confirm, and if so consider marking both `= delete`.
194 Pipe(const Pipe& other);
195 const Pipe& operator=(const Pipe& other);
199 void maybe_start_delay_thread();
202 // public constructors
203 static const Pipe& Server(int s);
204 static const Pipe& Client(const entity_addr_t& pi);
// Returns the current value of out_seq.
206 uint64_t get_out_seq() { return out_seq; }
// True if out_q holds any entry, or if either the send_keepalive or the
// send_keepalive_ack flag is set.
208 bool is_queued() { return !out_q.empty() || send_keepalive || send_keepalive_ack; }
// Returns a mutable reference to the peer_addr member (not a copy).
210 entity_addr_t& get_peer_addr() { return peer_addr; }
// Set the peer address from `a`; the same address is also handed to
// connection_state. The address-of comparison detects the case where the
// caller passed a reference to our own peer_addr member.
212 void set_peer_addr(const entity_addr_t& a) {
213 if (&peer_addr != &a) // shut up valgrind
215 connection_state->set_peer_addr(a);
// Set the peer type; the value `t` is also handed to connection_state.
217 void set_peer_type(int t) {
219 connection_state->set_peer_type(t);
222 void register_pipe();
223 void unregister_pipe();
225 /// stop a Pipe by closing its socket and setting it to STATE_CLOSED
227 /// stop() a Pipe if not already done, and wait for it to finish any
228 /// fast_dispatch in progress.
229 void stop_and_wait();
// Queue message `m` for sending by appending it to the out_q list keyed by
// the message's priority. The caller must already hold pipe_lock (asserted).
231 void _send(Message *m) {
232 assert(pipe_lock.is_locked());
// operator[] creates the per-priority list on first use
233 out_q[m->get_priority()].push_back(m);
// Request a keepalive by setting the send_keepalive flag.
// The caller must already hold pipe_lock (asserted).
236 void _send_keepalive() {
237 assert(pipe_lock.is_locked());
238 send_keepalive = true;
// Dequeue the next outbound message: take the front of the list stored
// under the largest priority key in out_q, removing lists that have been
// drained. The caller must already hold pipe_lock (asserted).
241 Message *_get_next_outgoing() {
242 assert(pipe_lock.is_locked());
// keep looking until a message is found or no priority lists remain
244 while (!m && !out_q.empty()) {
// out_q is an ordered map, so rbegin() is the entry with the largest key
245 map<int, list<Message*> >::reverse_iterator p = out_q.rbegin();
246 if (!p->second.empty()) {
247 m = p->second.front();
248 p->second.pop_front();
// erase a drained list so out_q.empty() reflects whether anything is queued
250 if (p->second.empty())
251 out_q.erase(p->first);
256 /// move all messages in the sent list back into the queue at the highest priority.
258 /// discard messages requeued by requeue_sent() up to a given seq
259 void discard_requeued_up_to(uint64_t seq);
260 void discard_out_queue();
// Shut down the socket `sd` in both directions (SHUT_RDWR).
// The return value of ::shutdown() is not checked.
262 void shutdown_socket() {
265 ::shutdown(sd, SHUT_RDWR);
272 ssize_t do_recv(char *buf, size_t len, int flags);
273 ssize_t buffered_recv(char *buf, size_t len, int flags);
// True while recv_len exceeds recv_ofs.
// NOTE(review): presumably this means buffered received bytes have not yet
// been consumed -- confirm against buffered_recv().
274 bool has_pending_data() { return recv_len > recv_ofs; }
277 * do a blocking read of len bytes from socket
279 * @param buf buffer to read into
280 * @param len exact number of bytes to read
281 * @return 0 for success, or -1 on error
283 int tcp_read(char *buf, unsigned len);
286 * wait for bytes to become available on the socket
288 * @return 0 for success, or -1 on error
293 * non-blocking read of available bytes on socket
295 * This is expected to be used after tcp_read_wait(), and will return
296 * an error if there is no data on the socket to consume.
298 * @param buf buffer to read into
299 * @param len maximum number of bytes to read
300 * @return bytes read, or -1 on error or when there is no data
302 ssize_t tcp_read_nonblocking(char *buf, unsigned len);
305 * blocking write of bytes to socket
308 * @param len number of bytes to write
309 * @return 0 for success, or -1 on error
311 int tcp_write(const char *buf, unsigned len);