initial code repo
[stor4nfv.git] / src / ceph / src / msg / simple / Pipe.h
diff --git a/src/ceph/src/msg/simple/Pipe.h b/src/ceph/src/msg/simple/Pipe.h
new file mode 100644 (file)
index 0000000..d8d2a8e
--- /dev/null
@@ -0,0 +1,316 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+#ifndef CEPH_MSGR_PIPE_H
+#define CEPH_MSGR_PIPE_H
+
+#include "include/memory.h"
+#include "auth/AuthSessionHandler.h"
+
+#include "msg/msg_types.h"
+#include "msg/Messenger.h"
+#include "PipeConnection.h"
+
+
+class SimpleMessenger;
+class DispatchQueue;
+
+static const int SM_IOV_MAX = (IOV_MAX >= 1024 ? IOV_MAX / 4 : IOV_MAX);
+
+  /**
+   * The Pipe is the most complex SimpleMessenger component. It gets
+   * two threads, one each for reading and writing on a socket it's handed
+   * at creation time, and is responsible for everything that happens on
+   * that socket. Besides message transmission, it's responsible for
+   * propagating socket errors to the SimpleMessenger and then sticking
+   * around in a state where it can provide enough data for the SimpleMessenger
+   * to provide reliable Message delivery when it manages to reconnect.
+   */
+  class Pipe : public RefCountedObject {
+    /**
+     * The Reader thread handles all reads off the socket -- not just
+     * Messages, but also acks and other protocol bits (excepting startup,
+     * when the Writer does a couple of reads).
+     * All the work is implemented in Pipe itself, of course.
+     */
+    class Reader : public Thread {
+      Pipe *pipe;
+    public:
+      explicit Reader(Pipe *p) : pipe(p) {}
+      void *entry() override { pipe->reader(); return 0; }
+    } reader_thread;
+
+    /**
+     * The Writer thread handles all writes to the socket (after startup).
+     * All the work is implemented in Pipe itself, of course.
+     */
+    class Writer : public Thread {
+      Pipe *pipe;
+    public:
+      explicit Writer(Pipe *p) : pipe(p) {}
+      void *entry() override { pipe->writer(); return 0; }
+    } writer_thread;
+
+    class DelayedDelivery;
+    DelayedDelivery *delay_thread;
+  public:
+    Pipe(SimpleMessenger *r, int st, PipeConnection *con);
+    ~Pipe() override;
+
+    SimpleMessenger *msgr;
+    uint64_t conn_id;
+    ostream& _pipe_prefix(std::ostream &out) const;
+
+    Pipe* get() {
+      return static_cast<Pipe*>(RefCountedObject::get());
+    }
+
+    bool is_connected() {
+      Mutex::Locker l(pipe_lock);
+      return state == STATE_OPEN;
+    }
+
+    char *recv_buf;
+    size_t recv_max_prefetch;
+    size_t recv_ofs;
+    size_t recv_len;
+
+    enum {
+      STATE_ACCEPTING,
+      STATE_CONNECTING,
+      STATE_OPEN,
+      STATE_STANDBY,
+      STATE_CLOSED,
+      STATE_CLOSING,
+      STATE_WAIT       // just wait for racing connection
+    };
+
+    static const char *get_state_name(int s) {
+      switch (s) {
+      case STATE_ACCEPTING: return "accepting";
+      case STATE_CONNECTING: return "connecting";
+      case STATE_OPEN: return "open";
+      case STATE_STANDBY: return "standby";
+      case STATE_CLOSED: return "closed";
+      case STATE_CLOSING: return "closing";
+      case STATE_WAIT: return "wait";
+      default: return "UNKNOWN";
+      }
+    }
+    const char *get_state_name() {
+      return get_state_name(state);
+    }
+
+  private:
+    int sd;
+    struct iovec msgvec[SM_IOV_MAX];
+
+  public:
+    int port;
+    int peer_type;
+    entity_addr_t peer_addr;
+    Messenger::Policy policy;
+    
+    Mutex pipe_lock;
+    int state;
+    std::atomic<bool> state_closed = { false }; // true iff state = STATE_CLOSED
+
+    // session_security handles any signatures or encryptions required for this pipe's msgs. PLR
+
+    ceph::shared_ptr<AuthSessionHandler> session_security;
+
+  protected:
+    friend class SimpleMessenger;
+    PipeConnectionRef connection_state;
+
+    utime_t backoff;         // backoff time
+
+    bool reader_running, reader_needs_join;
+    bool reader_dispatching; /// reader thread is dispatching without pipe_lock
+    bool notify_on_dispatch_done; /// something wants a signal when dispatch done
+    bool writer_running;
+
+    map<int, list<Message*> > out_q;  // priority queue for outbound msgs
+    DispatchQueue *in_q;
+    list<Message*> sent;
+    Cond cond;
+    bool send_keepalive;
+    bool send_keepalive_ack;
+    utime_t keepalive_ack_stamp;
+    bool halt_delivery; //if a pipe's queue is destroyed, stop adding to it
+    
+    __u32 connect_seq, peer_global_seq;
+    uint64_t out_seq;
+    uint64_t in_seq, in_seq_acked;
+    
+    void set_socket_options();
+
+    int accept();   // server handshake
+    int connect();  // client handshake
+    void reader();
+    void writer();
+    void unlock_maybe_reap();
+
+    int randomize_out_seq();
+
+    int read_message(Message **pm,
+                    AuthSessionHandler *session_security_copy);
+    int write_message(const ceph_msg_header& h, const ceph_msg_footer& f, bufferlist& body);
+    /**
+     * Write the given data (of length len) to the Pipe's socket. This function
+     * will loop until all passed data has been written out.
+     * If more is set, the function will optimize socket writes
+     * for additional data (by passing the MSG_MORE flag, aka TCP_CORK).
+     *
+     * @param msg The msghdr to write out
+     * @param len The length of the data in msg
+     * @param more Should be set true if this is one part of a larger message
+     * @return 0, or -1 on failure (unrecoverable -- close the socket).
+     */
+    int do_sendmsg(struct msghdr *msg, unsigned len, bool more=false);
+    int write_ack(uint64_t s);
+    int write_keepalive();
+    int write_keepalive2(char tag, const utime_t &t);
+
+    void fault(bool reader=false);
+
+    void was_session_reset();
+
+    /* Clean up sent list */
+    void handle_ack(uint64_t seq);
+
+    public:
+    Pipe(const Pipe& other);
+    const Pipe& operator=(const Pipe& other);
+
+    void start_reader();
+    void start_writer();
+    void maybe_start_delay_thread();
+    void join_reader();
+
+    // public constructors
+    static const Pipe& Server(int s);
+    static const Pipe& Client(const entity_addr_t& pi);
+
+    uint64_t get_out_seq() { return out_seq; }
+
+    bool is_queued() { return !out_q.empty() || send_keepalive || send_keepalive_ack; }
+
+    entity_addr_t& get_peer_addr() { return peer_addr; }
+
+    void set_peer_addr(const entity_addr_t& a) {
+      if (&peer_addr != &a)  // shut up valgrind
+        peer_addr = a;
+      connection_state->set_peer_addr(a);
+    }
+    void set_peer_type(int t) {
+      peer_type = t;
+      connection_state->set_peer_type(t);
+    }
+
+    void register_pipe();
+    void unregister_pipe();
+    void join();
+    /// stop a Pipe by closing its socket and setting it to STATE_CLOSED
+    void stop();
+    /// stop() a Pipe if not already done, and wait for it to finish any
+    /// fast_dispatch in progress.
+    void stop_and_wait();
+
+    void _send(Message *m) {
+      assert(pipe_lock.is_locked());
+      out_q[m->get_priority()].push_back(m);
+      cond.Signal();
+    }
+    void _send_keepalive() {
+      assert(pipe_lock.is_locked());
+      send_keepalive = true;
+      cond.Signal();
+    }
+    Message *_get_next_outgoing() {
+      assert(pipe_lock.is_locked());
+      Message *m = 0;
+      while (!m && !out_q.empty()) {
+        map<int, list<Message*> >::reverse_iterator p = out_q.rbegin();
+        if (!p->second.empty()) {
+          m = p->second.front();
+          p->second.pop_front();
+        }
+        if (p->second.empty())
+          out_q.erase(p->first);
+      }
+      return m;
+    }
+
+    /// move all messages in the sent list back into the queue at the highest priority.
+    void requeue_sent();
+    /// discard messages requeued by requeued_sent() up to a given seq
+    void discard_requeued_up_to(uint64_t seq);
+    void discard_out_queue();
+
+    void shutdown_socket() {
+      recv_reset();
+      if (sd >= 0)
+        ::shutdown(sd, SHUT_RDWR);
+    }
+
+    void recv_reset() {
+      recv_len = 0;
+      recv_ofs = 0;
+    }
+    ssize_t do_recv(char *buf, size_t len, int flags);
+    ssize_t buffered_recv(char *buf, size_t len, int flags);
+    bool has_pending_data() { return recv_len > recv_ofs; }
+
+    /**
+     * do a blocking read of len bytes from socket
+     *
+     * @param buf buffer to read into
+     * @param len exact number of bytes to read
+     * @return 0 for success, or -1 on error
+     */
+    int tcp_read(char *buf, unsigned len);
+
+    /**
+     * wait for bytes to become available on the socket
+     *
+     * @return 0 for success, or -1 on error
+     */
+    int tcp_read_wait();
+
+    /**
+     * non-blocking read of available bytes on socket
+     *
+     * This is expected to be used after tcp_read_wait(), and will return
+     * an error if there is no data on the socket to consume.
+     *
+     * @param buf buffer to read into
+     * @param len maximum number of bytes to read
+     * @return bytes read, or -1 on error or when there is no data
+     */
+    ssize_t tcp_read_nonblocking(char *buf, unsigned len);
+
+    /**
+     * blocking write of bytes to socket
+     *
+     * @param buf buffer
+     * @param len number of bytes to write
+     * @return 0 for success, or -1 on error
+     */
+    int tcp_write(const char *buf, unsigned len);
+
+  };
+
+
+#endif