initial code repo
[stor4nfv.git] / src / ceph / src / msg / simple / Accepter.cc
diff --git a/src/ceph/src/msg/simple/Accepter.cc b/src/ceph/src/msg/simple/Accepter.cc
new file mode 100644 (file)
index 0000000..5779cc7
--- /dev/null
@@ -0,0 +1,409 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+#include "include/compat.h"
+#include <sys/socket.h>
+#include <netinet/tcp.h>
+#include <sys/uio.h>
+#include <limits.h>
+#include <poll.h>
+
+#include "msg/msg_types.h"
+#include "msg/Message.h"
+
+#include "Accepter.h"
+#include "Pipe.h"
+#include "SimpleMessenger.h"
+
+#include "common/debug.h"
+#include "common/errno.h"
+#include "common/safe_io.h"
+
+#define dout_subsys ceph_subsys_ms
+
+#undef dout_prefix
+#define dout_prefix *_dout << "accepter."
+
+
+/********************************************
+ * Accepter
+ */
+
+static int set_close_on_exec(int fd)
+{
+  int flags = fcntl(fd, F_GETFD, 0);
+  if (flags < 0) {
+    return errno;
+  }
+  if (fcntl(fd, F_SETFD, flags | FD_CLOEXEC)) {
+    return errno;
+  }
+  return 0;
+}
+
+int Accepter::create_selfpipe(int *pipe_rd, int *pipe_wr) {
+  int selfpipe[2];
+  int ret = ::pipe2(selfpipe, (O_CLOEXEC|O_NONBLOCK));
+  if (ret < 0 ) {
+    lderr(msgr->cct) << __func__ << " unable to create the selfpipe: "
+                    << cpp_strerror(errno) << dendl;
+    return -errno;
+  }
+  *pipe_rd = selfpipe[0];
+  *pipe_wr = selfpipe[1];
+  return 0;
+}
+
+int Accepter::bind(const entity_addr_t &bind_addr, const set<int>& avoid_ports)
+{
+  const md_config_t *conf = msgr->cct->_conf;
+  // bind to a socket
+  ldout(msgr->cct,10) <<  __func__ << dendl;
+  
+  int family;
+  switch (bind_addr.get_family()) {
+  case AF_INET:
+  case AF_INET6:
+    family = bind_addr.get_family();
+    break;
+
+  default:
+    // bind_addr is empty
+    family = conf->ms_bind_ipv6 ? AF_INET6 : AF_INET;
+  }
+
+  /* socket creation */
+  listen_sd = ::socket(family, SOCK_STREAM, 0);
+  ldout(msgr->cct,10) <<  __func__ << " socket sd: " << listen_sd << dendl;
+  if (listen_sd < 0) {
+    lderr(msgr->cct) << __func__ << " unable to create socket: "
+                    << cpp_strerror(errno) << dendl;
+    return -errno;
+  }
+
+  if (set_close_on_exec(listen_sd)) {
+    lderr(msgr->cct) << __func__ << " unable to set_close_exec(): "
+                    << cpp_strerror(errno) << dendl;
+  }
+  
+
+  // use whatever user specified (if anything)
+  entity_addr_t listen_addr = bind_addr;
+  if (listen_addr.get_type() == entity_addr_t::TYPE_NONE) {
+    listen_addr.set_type(entity_addr_t::TYPE_LEGACY);
+  }
+  listen_addr.set_family(family);
+
+  /* bind to port */
+  int rc = -1;
+  int r = -1;
+
+  for (int i = 0; i < conf->ms_bind_retry_count; i++) {
+
+    if (i > 0) {
+        lderr(msgr->cct) << __func__ << " was unable to bind. Trying again in " 
+                        << conf->ms_bind_retry_delay << " seconds " << dendl;
+        sleep(conf->ms_bind_retry_delay);
+    }
+
+    if (listen_addr.get_port()) {
+        // specific port
+
+        // reuse addr+port when possible
+        int on = 1;
+        rc = ::setsockopt(listen_sd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
+        if (rc < 0) {
+            lderr(msgr->cct) << __func__ << " unable to setsockopt: "
+                             << cpp_strerror(errno) << dendl;
+            r = -errno;
+            continue;
+        }
+
+        rc = ::bind(listen_sd, listen_addr.get_sockaddr(),
+                   listen_addr.get_sockaddr_len());
+        if (rc < 0) {
+            lderr(msgr->cct) << __func__ << " unable to bind to " << listen_addr
+                            << ": " << cpp_strerror(errno) << dendl;
+            r = -errno;
+            continue;
+        }
+    } else {
+        // try a range of ports
+        for (int port = msgr->cct->_conf->ms_bind_port_min; 
+               port <= msgr->cct->_conf->ms_bind_port_max; port++) {
+            if (avoid_ports.count(port))
+                continue;
+
+            listen_addr.set_port(port);
+            rc = ::bind(listen_sd, listen_addr.get_sockaddr(),
+                       listen_addr.get_sockaddr_len());
+            if (rc == 0)
+                break;
+        }
+        if (rc < 0) {
+            lderr(msgr->cct) <<  __func__ << "  unable to bind to " << listen_addr
+                             << " on any port in range " << msgr->cct->_conf->ms_bind_port_min
+                             << "-" << msgr->cct->_conf->ms_bind_port_max
+                             << ": " << cpp_strerror(errno)
+                             << dendl;
+            r = -errno;
+            // Clear port before retry, otherwise we shall fail again.
+            listen_addr.set_port(0); 
+            continue;
+        }
+        ldout(msgr->cct,10) << __func__ << " bound on random port " 
+                           << listen_addr << dendl;
+    }
+
+    if (rc == 0)
+        break;
+  }
+
+  // It seems that binding completely failed, return with that exit status
+  if (rc < 0) {
+      lderr(msgr->cct) << __func__ << " was unable to bind after " 
+                      << conf->ms_bind_retry_count << " attempts: " 
+                      << cpp_strerror(errno) << dendl;
+      ::close(listen_sd);
+      listen_sd = -1;
+      return r;
+  }
+
+  // what port did we get?
+  sockaddr_storage ss;
+  socklen_t llen = sizeof(ss);
+  rc = getsockname(listen_sd, (sockaddr*)&ss, &llen);
+  if (rc < 0) {
+    rc = -errno;
+    lderr(msgr->cct) << __func__ << " failed getsockname: " 
+                    << cpp_strerror(rc) << dendl;
+    ::close(listen_sd);
+    listen_sd = -1;
+    return rc;
+  }
+  listen_addr.set_sockaddr((sockaddr*)&ss);
+  
+  if (msgr->cct->_conf->ms_tcp_rcvbuf) {
+    int size = msgr->cct->_conf->ms_tcp_rcvbuf;
+    rc = ::setsockopt(listen_sd, SOL_SOCKET, SO_RCVBUF, 
+                       (void*)&size, sizeof(size));
+    if (rc < 0)  {
+      rc = -errno;
+      lderr(msgr->cct) <<  __func__ << "  failed to set SO_RCVBUF to " 
+                      << size << ": " << cpp_strerror(rc) << dendl;
+      ::close(listen_sd);
+      listen_sd = -1;
+      return rc;
+    }
+  }
+
+  ldout(msgr->cct,10) <<  __func__ << " bound to " << listen_addr << dendl;
+
+  // listen!
+  rc = ::listen(listen_sd, msgr->cct->_conf->ms_tcp_listen_backlog);
+  if (rc < 0) {
+    rc = -errno;
+    lderr(msgr->cct) <<  __func__ << " unable to listen on " << listen_addr
+                    << ": " << cpp_strerror(rc) << dendl;
+    ::close(listen_sd);
+    listen_sd = -1;
+    return rc;
+  }
+  
+  msgr->set_myaddr(bind_addr);
+  if (bind_addr != entity_addr_t())
+    msgr->learned_addr(bind_addr);
+  else
+    assert(msgr->get_need_addr());  // should still be true.
+
+  if (msgr->get_myaddr().get_port() == 0) {
+    msgr->set_myaddr(listen_addr);
+  }
+  entity_addr_t addr = msgr->get_myaddr();
+  addr.nonce = nonce;
+  msgr->set_myaddr(addr);
+
+  msgr->init_local_connection();
+
+  rc = create_selfpipe(&shutdown_rd_fd, &shutdown_wr_fd);
+  if (rc < 0) {
+    lderr(msgr->cct) <<  __func__ << " unable to create signalling pipe " << listen_addr
+                    << ": " << cpp_strerror(rc) << dendl;
+    return rc;
+  }
+
+  ldout(msgr->cct,1) <<  __func__ << " my_inst.addr is " << msgr->get_myaddr()
+                    << " need_addr=" << msgr->get_need_addr() << dendl;
+  return 0;
+}
+
+int Accepter::rebind(const set<int>& avoid_ports)
+{
+  ldout(msgr->cct,1) << __func__ << " avoid " << avoid_ports << dendl;
+  
+  entity_addr_t addr = msgr->get_myaddr();
+  set<int> new_avoid = avoid_ports;
+  new_avoid.insert(addr.get_port());
+  addr.set_port(0);
+
+  // adjust the nonce; we want our entity_addr_t to be truly unique.
+  nonce += 1000000;
+  msgr->my_inst.addr.nonce = nonce;
+  ldout(msgr->cct,10) << __func__ << " new nonce " << nonce << " and inst " 
+                       << msgr->my_inst << dendl;
+
+  ldout(msgr->cct,10) << " will try " << addr << " and avoid ports " << new_avoid << dendl;
+  int r = bind(addr, new_avoid);
+  if (r == 0)
+    start();
+  return r;
+}
+
+int Accepter::start()
+{
+  ldout(msgr->cct,1) << __func__ << dendl;
+
+  // start thread
+  create("ms_accepter");
+
+  return 0;
+}
+
+void *Accepter::entry()
+{
+  ldout(msgr->cct,1) << __func__ << " start" << dendl;
+  
+  int errors = 0;
+  int ch;
+
+  struct pollfd pfd[2];
+  memset(pfd, 0, sizeof(pfd));
+
+  pfd[0].fd = listen_sd;
+  pfd[0].events = POLLIN | POLLERR | POLLNVAL | POLLHUP;
+  pfd[1].fd = shutdown_rd_fd;
+  pfd[1].events = POLLIN | POLLERR | POLLNVAL | POLLHUP;
+  while (!done) {
+    ldout(msgr->cct,20) << __func__ << " calling poll for sd:" << listen_sd << dendl;
+    int r = poll(pfd, 2, -1);
+    if (r < 0) {
+      if (errno == EINTR) {
+        continue;
+      }
+      ldout(msgr->cct,1) << __func__ << " poll got error"  
+                         << " errno " << errno << " " << cpp_strerror(errno) << dendl;
+      break;
+    }
+    ldout(msgr->cct,10) << __func__ << " poll returned oke: " << r << dendl;
+    ldout(msgr->cct,20) << __func__ <<  " pfd.revents[0]=" << pfd[0].revents << dendl;
+    ldout(msgr->cct,20) << __func__ <<  " pfd.revents[1]=" << pfd[1].revents << dendl;
+
+    if (pfd[0].revents & (POLLERR | POLLNVAL | POLLHUP)) {
+      ldout(msgr->cct,1) << __func__ << " poll got errors in revents "  
+                        <<  pfd[0].revents << dendl;
+      break;
+    }
+    if (pfd[1].revents & (POLLIN | POLLERR | POLLNVAL | POLLHUP)) {
+      // We got "signaled" to exit the poll
+      // clean the selfpipe
+      if (::read(shutdown_rd_fd, &ch, 1) == -1) {
+        if (errno != EAGAIN)
+          ldout(msgr->cct,1) << __func__ << " Cannot read selfpipe: "
+                             << " errno " << errno << " " << cpp_strerror(errno) << dendl;
+        }
+      break;
+    }
+    if (done) break;
+
+    // accept
+    sockaddr_storage ss;
+    socklen_t slen = sizeof(ss);
+    int sd = ::accept(listen_sd, (sockaddr*)&ss, &slen);
+    if (sd >= 0) {
+      int r = set_close_on_exec(sd);
+      if (r) {
+       ldout(msgr->cct,1) << __func__ << " set_close_on_exec() failed "
+             << cpp_strerror(r) << dendl;
+      }
+      errors = 0;
+      ldout(msgr->cct,10) << __func__ << " incoming on sd " << sd << dendl;
+      
+      msgr->add_accept_pipe(sd);
+    } else {
+      ldout(msgr->cct,0) << __func__ << " no incoming connection?  sd = " << sd
+             << " errno " << errno << " " << cpp_strerror(errno) << dendl;
+      if (++errors > 4)
+       break;
+    }
+  }
+
+  ldout(msgr->cct,20) << __func__ << " closing" << dendl;
+  // socket is closed right after the thread has joined.
+  // closing it here might race
+  if (shutdown_rd_fd >= 0) {
+    ::close(shutdown_rd_fd);
+    shutdown_rd_fd = -1;
+  }
+
+  ldout(msgr->cct,10) << __func__ << " stopping" << dendl;
+  return 0;
+}
+
+void Accepter::stop()
+{
+  done = true;
+  ldout(msgr->cct,10) << __func__ << " accept listening on: " << listen_sd << dendl;
+
+  if (shutdown_wr_fd < 0)
+    return;
+
+  // Send a byte to the shutdown pipe that the thread is listening to
+  char buf[1] = { 0x0 };
+  int ret = safe_write(shutdown_wr_fd, buf, 1);
+  if (ret < 0) {
+    ldout(msgr->cct,1) << __func__ << "close failed: "
+             << " errno " << errno << " " << cpp_strerror(errno) << dendl;
+  } else {
+    ldout(msgr->cct,15) << __func__ << " signaled poll" << dendl;
+  }
+  VOID_TEMP_FAILURE_RETRY(close(shutdown_wr_fd));
+  shutdown_wr_fd = -1;
+
+  // wait for thread to stop before closing the socket, to avoid
+  // racing against fd re-use.
+  if (is_started()) {
+    ldout(msgr->cct,5) << __func__ << " wait for thread to join." << dendl;
+    join();
+  }
+
+  if (listen_sd >= 0) {
+    if (::close(listen_sd) < 0) {
+      ldout(msgr->cct,1) << __func__ << "close listen_sd failed: "
+             << " errno " << errno << " " << cpp_strerror(errno) << dendl;
+    }
+    listen_sd = -1;
+  }
+  if (shutdown_rd_fd >= 0) {
+    if (::close(shutdown_rd_fd) < 0) {
+      ldout(msgr->cct,1) << __func__ << "close shutdown_rd_fd failed: "
+             << " errno " << errno << " " << cpp_strerror(errno) << dendl;
+    }
+    shutdown_rd_fd = -1;
+  }
+  done = false;
+}
+
+
+
+