X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fmsg%2Fsimple%2FPipe.h;fp=src%2Fceph%2Fsrc%2Fmsg%2Fsimple%2FPipe.h;h=0000000000000000000000000000000000000000;hb=7da45d65be36d36b880cc55c5036e96c24b53f00;hp=d8d2a8e0831db13e0fdd672db8e4f0672ff60344;hpb=691462d09d0987b47e112d6ee8740375df3c51b2;p=stor4nfv.git diff --git a/src/ceph/src/msg/simple/Pipe.h b/src/ceph/src/msg/simple/Pipe.h deleted file mode 100644 index d8d2a8e..0000000 --- a/src/ceph/src/msg/simple/Pipe.h +++ /dev/null @@ -1,316 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2004-2006 Sage Weil - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - -#ifndef CEPH_MSGR_PIPE_H -#define CEPH_MSGR_PIPE_H - -#include "include/memory.h" -#include "auth/AuthSessionHandler.h" - -#include "msg/msg_types.h" -#include "msg/Messenger.h" -#include "PipeConnection.h" - - -class SimpleMessenger; -class DispatchQueue; - -static const int SM_IOV_MAX = (IOV_MAX >= 1024 ? IOV_MAX / 4 : IOV_MAX); - - /** - * The Pipe is the most complex SimpleMessenger component. It gets - * two threads, one each for reading and writing on a socket it's handed - * at creation time, and is responsible for everything that happens on - * that socket. Besides message transmission, it's responsible for - * propagating socket errors to the SimpleMessenger and then sticking - * around in a state where it can provide enough data for the SimpleMessenger - * to provide reliable Message delivery when it manages to reconnect. - */ - class Pipe : public RefCountedObject { - /** - * The Reader thread handles all reads off the socket -- not just - * Messages, but also acks and other protocol bits (excepting startup, - * when the Writer does a couple of reads). - * All the work is implemented in Pipe itself, of course. - */ - class Reader : public Thread { - Pipe *pipe; - public: - explicit Reader(Pipe *p) : pipe(p) {} - void *entry() override { pipe->reader(); return 0; } - } reader_thread; - - /** - * The Writer thread handles all writes to the socket (after startup). - * All the work is implemented in Pipe itself, of course. - */ - class Writer : public Thread { - Pipe *pipe; - public: - explicit Writer(Pipe *p) : pipe(p) {} - void *entry() override { pipe->writer(); return 0; } - } writer_thread; - - class DelayedDelivery; - DelayedDelivery *delay_thread; - public: - Pipe(SimpleMessenger *r, int st, PipeConnection *con); - ~Pipe() override; - - SimpleMessenger *msgr; - uint64_t conn_id; - ostream& _pipe_prefix(std::ostream &out) const; - - Pipe* get() { - return static_cast(RefCountedObject::get()); - } - - bool is_connected() { - Mutex::Locker l(pipe_lock); - return state == STATE_OPEN; - } - - char *recv_buf; - size_t recv_max_prefetch; - size_t recv_ofs; - size_t recv_len; - - enum { - STATE_ACCEPTING, - STATE_CONNECTING, - STATE_OPEN, - STATE_STANDBY, - STATE_CLOSED, - STATE_CLOSING, - STATE_WAIT // just wait for racing connection - }; - - static const char *get_state_name(int s) { - switch (s) { - case STATE_ACCEPTING: return "accepting"; - case STATE_CONNECTING: return "connecting"; - case STATE_OPEN: return "open"; - case STATE_STANDBY: return "standby"; - case STATE_CLOSED: return "closed"; - case STATE_CLOSING: return "closing"; - case STATE_WAIT: return "wait"; - default: return "UNKNOWN"; - } - } - const char *get_state_name() { - return get_state_name(state); - } - - private: - int sd; - struct iovec msgvec[SM_IOV_MAX]; - - public: - int port; - int peer_type; - entity_addr_t peer_addr; - Messenger::Policy policy; - - Mutex pipe_lock; - int state; - std::atomic state_closed = { false }; // true iff state = STATE_CLOSED - - // session_security handles any signatures or encryptions required for this pipe's msgs. PLR - - ceph::shared_ptr session_security; - - protected: - friend class SimpleMessenger; - PipeConnectionRef connection_state; - - utime_t backoff; // backoff time - - bool reader_running, reader_needs_join; - bool reader_dispatching; /// reader thread is dispatching without pipe_lock - bool notify_on_dispatch_done; /// something wants a signal when dispatch done - bool writer_running; - - map > out_q; // priority queue for outbound msgs - DispatchQueue *in_q; - list sent; - Cond cond; - bool send_keepalive; - bool send_keepalive_ack; - utime_t keepalive_ack_stamp; - bool halt_delivery; //if a pipe's queue is destroyed, stop adding to it - - __u32 connect_seq, peer_global_seq; - uint64_t out_seq; - uint64_t in_seq, in_seq_acked; - - void set_socket_options(); - - int accept(); // server handshake - int connect(); // client handshake - void reader(); - void writer(); - void unlock_maybe_reap(); - - int randomize_out_seq(); - - int read_message(Message **pm, - AuthSessionHandler *session_security_copy); - int write_message(const ceph_msg_header& h, const ceph_msg_footer& f, bufferlist& body); - /** - * Write the given data (of length len) to the Pipe's socket. This function - * will loop until all passed data has been written out. - * If more is set, the function will optimize socket writes - * for additional data (by passing the MSG_MORE flag, aka TCP_CORK). - * - * @param msg The msghdr to write out - * @param len The length of the data in msg - * @param more Should be set true if this is one part of a larger message - * @return 0, or -1 on failure (unrecoverable -- close the socket). - */ - int do_sendmsg(struct msghdr *msg, unsigned len, bool more=false); - int write_ack(uint64_t s); - int write_keepalive(); - int write_keepalive2(char tag, const utime_t &t); - - void fault(bool reader=false); - - void was_session_reset(); - - /* Clean up sent list */ - void handle_ack(uint64_t seq); - - public: - Pipe(const Pipe& other); - const Pipe& operator=(const Pipe& other); - - void start_reader(); - void start_writer(); - void maybe_start_delay_thread(); - void join_reader(); - - // public constructors - static const Pipe& Server(int s); - static const Pipe& Client(const entity_addr_t& pi); - - uint64_t get_out_seq() { return out_seq; } - - bool is_queued() { return !out_q.empty() || send_keepalive || send_keepalive_ack; } - - entity_addr_t& get_peer_addr() { return peer_addr; } - - void set_peer_addr(const entity_addr_t& a) { - if (&peer_addr != &a) // shut up valgrind - peer_addr = a; - connection_state->set_peer_addr(a); - } - void set_peer_type(int t) { - peer_type = t; - connection_state->set_peer_type(t); - } - - void register_pipe(); - void unregister_pipe(); - void join(); - /// stop a Pipe by closing its socket and setting it to STATE_CLOSED - void stop(); - /// stop() a Pipe if not already done, and wait for it to finish any - /// fast_dispatch in progress. - void stop_and_wait(); - - void _send(Message *m) { - assert(pipe_lock.is_locked()); - out_q[m->get_priority()].push_back(m); - cond.Signal(); - } - void _send_keepalive() { - assert(pipe_lock.is_locked()); - send_keepalive = true; - cond.Signal(); - } - Message *_get_next_outgoing() { - assert(pipe_lock.is_locked()); - Message *m = 0; - while (!m && !out_q.empty()) { - map >::reverse_iterator p = out_q.rbegin(); - if (!p->second.empty()) { - m = p->second.front(); - p->second.pop_front(); - } - if (p->second.empty()) - out_q.erase(p->first); - } - return m; - } - - /// move all messages in the sent list back into the queue at the highest priority. - void requeue_sent(); - /// discard messages requeued by requeued_sent() up to a given seq - void discard_requeued_up_to(uint64_t seq); - void discard_out_queue(); - - void shutdown_socket() { - recv_reset(); - if (sd >= 0) - ::shutdown(sd, SHUT_RDWR); - } - - void recv_reset() { - recv_len = 0; - recv_ofs = 0; - } - ssize_t do_recv(char *buf, size_t len, int flags); - ssize_t buffered_recv(char *buf, size_t len, int flags); - bool has_pending_data() { return recv_len > recv_ofs; } - - /** - * do a blocking read of len bytes from socket - * - * @param buf buffer to read into - * @param len exact number of bytes to read - * @return 0 for success, or -1 on error - */ - int tcp_read(char *buf, unsigned len); - - /** - * wait for bytes to become available on the socket - * - * @return 0 for success, or -1 on error - */ - int tcp_read_wait(); - - /** - * non-blocking read of available bytes on socket - * - * This is expected to be used after tcp_read_wait(), and will return - * an error if there is no data on the socket to consume. - * - * @param buf buffer to read into - * @param len maximum number of bytes to read - * @return bytes read, or -1 on error or when there is no data - */ - ssize_t tcp_read_nonblocking(char *buf, unsigned len); - - /** - * blocking write of bytes to socket - * - * @param buf buffer - * @param len number of bytes to write - * @return 0 for success, or -1 on error - */ - int tcp_write(const char *buf, unsigned len); - - }; - - -#endif