X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fmsg%2Fsimple%2FAccepter.cc;fp=src%2Fceph%2Fsrc%2Fmsg%2Fsimple%2FAccepter.cc;h=0000000000000000000000000000000000000000;hb=7da45d65be36d36b880cc55c5036e96c24b53f00;hp=5779cc7abec69b8d52c66962e1092d56dbb6799e;hpb=691462d09d0987b47e112d6ee8740375df3c51b2;p=stor4nfv.git diff --git a/src/ceph/src/msg/simple/Accepter.cc b/src/ceph/src/msg/simple/Accepter.cc deleted file mode 100644 index 5779cc7..0000000 --- a/src/ceph/src/msg/simple/Accepter.cc +++ /dev/null @@ -1,409 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2004-2006 Sage Weil - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - -#include "include/compat.h" -#include -#include -#include -#include -#include - -#include "msg/msg_types.h" -#include "msg/Message.h" - -#include "Accepter.h" -#include "Pipe.h" -#include "SimpleMessenger.h" - -#include "common/debug.h" -#include "common/errno.h" -#include "common/safe_io.h" - -#define dout_subsys ceph_subsys_ms - -#undef dout_prefix -#define dout_prefix *_dout << "accepter." - - -/******************************************** - * Accepter - */ - -static int set_close_on_exec(int fd) -{ - int flags = fcntl(fd, F_GETFD, 0); - if (flags < 0) { - return errno; - } - if (fcntl(fd, F_SETFD, flags | FD_CLOEXEC)) { - return errno; - } - return 0; -} - -int Accepter::create_selfpipe(int *pipe_rd, int *pipe_wr) { - int selfpipe[2]; - int ret = ::pipe2(selfpipe, (O_CLOEXEC|O_NONBLOCK)); - if (ret < 0 ) { - lderr(msgr->cct) << __func__ << " unable to create the selfpipe: " - << cpp_strerror(errno) << dendl; - return -errno; - } - *pipe_rd = selfpipe[0]; - *pipe_wr = selfpipe[1]; - return 0; -} - -int Accepter::bind(const entity_addr_t &bind_addr, const set& avoid_ports) -{ - const md_config_t *conf = msgr->cct->_conf; - // bind to a socket - ldout(msgr->cct,10) << __func__ << dendl; - - int family; - switch (bind_addr.get_family()) { - case AF_INET: - case AF_INET6: - family = bind_addr.get_family(); - break; - - default: - // bind_addr is empty - family = conf->ms_bind_ipv6 ? AF_INET6 : AF_INET; - } - - /* socket creation */ - listen_sd = ::socket(family, SOCK_STREAM, 0); - ldout(msgr->cct,10) << __func__ << " socket sd: " << listen_sd << dendl; - if (listen_sd < 0) { - lderr(msgr->cct) << __func__ << " unable to create socket: " - << cpp_strerror(errno) << dendl; - return -errno; - } - - if (set_close_on_exec(listen_sd)) { - lderr(msgr->cct) << __func__ << " unable to set_close_exec(): " - << cpp_strerror(errno) << dendl; - } - - - // use whatever user specified (if anything) - entity_addr_t listen_addr = bind_addr; - if (listen_addr.get_type() == entity_addr_t::TYPE_NONE) { - listen_addr.set_type(entity_addr_t::TYPE_LEGACY); - } - listen_addr.set_family(family); - - /* bind to port */ - int rc = -1; - int r = -1; - - for (int i = 0; i < conf->ms_bind_retry_count; i++) { - - if (i > 0) { - lderr(msgr->cct) << __func__ << " was unable to bind. Trying again in " - << conf->ms_bind_retry_delay << " seconds " << dendl; - sleep(conf->ms_bind_retry_delay); - } - - if (listen_addr.get_port()) { - // specific port - - // reuse addr+port when possible - int on = 1; - rc = ::setsockopt(listen_sd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)); - if (rc < 0) { - lderr(msgr->cct) << __func__ << " unable to setsockopt: " - << cpp_strerror(errno) << dendl; - r = -errno; - continue; - } - - rc = ::bind(listen_sd, listen_addr.get_sockaddr(), - listen_addr.get_sockaddr_len()); - if (rc < 0) { - lderr(msgr->cct) << __func__ << " unable to bind to " << listen_addr - << ": " << cpp_strerror(errno) << dendl; - r = -errno; - continue; - } - } else { - // try a range of ports - for (int port = msgr->cct->_conf->ms_bind_port_min; - port <= msgr->cct->_conf->ms_bind_port_max; port++) { - if (avoid_ports.count(port)) - continue; - - listen_addr.set_port(port); - rc = ::bind(listen_sd, listen_addr.get_sockaddr(), - listen_addr.get_sockaddr_len()); - if (rc == 0) - break; - } - if (rc < 0) { - lderr(msgr->cct) << __func__ << " unable to bind to " << listen_addr - << " on any port in range " << msgr->cct->_conf->ms_bind_port_min - << "-" << msgr->cct->_conf->ms_bind_port_max - << ": " << cpp_strerror(errno) - << dendl; - r = -errno; - // Clear port before retry, otherwise we shall fail again. - listen_addr.set_port(0); - continue; - } - ldout(msgr->cct,10) << __func__ << " bound on random port " - << listen_addr << dendl; - } - - if (rc == 0) - break; - } - - // It seems that binding completely failed, return with that exit status - if (rc < 0) { - lderr(msgr->cct) << __func__ << " was unable to bind after " - << conf->ms_bind_retry_count << " attempts: " - << cpp_strerror(errno) << dendl; - ::close(listen_sd); - listen_sd = -1; - return r; - } - - // what port did we get? - sockaddr_storage ss; - socklen_t llen = sizeof(ss); - rc = getsockname(listen_sd, (sockaddr*)&ss, &llen); - if (rc < 0) { - rc = -errno; - lderr(msgr->cct) << __func__ << " failed getsockname: " - << cpp_strerror(rc) << dendl; - ::close(listen_sd); - listen_sd = -1; - return rc; - } - listen_addr.set_sockaddr((sockaddr*)&ss); - - if (msgr->cct->_conf->ms_tcp_rcvbuf) { - int size = msgr->cct->_conf->ms_tcp_rcvbuf; - rc = ::setsockopt(listen_sd, SOL_SOCKET, SO_RCVBUF, - (void*)&size, sizeof(size)); - if (rc < 0) { - rc = -errno; - lderr(msgr->cct) << __func__ << " failed to set SO_RCVBUF to " - << size << ": " << cpp_strerror(rc) << dendl; - ::close(listen_sd); - listen_sd = -1; - return rc; - } - } - - ldout(msgr->cct,10) << __func__ << " bound to " << listen_addr << dendl; - - // listen! - rc = ::listen(listen_sd, msgr->cct->_conf->ms_tcp_listen_backlog); - if (rc < 0) { - rc = -errno; - lderr(msgr->cct) << __func__ << " unable to listen on " << listen_addr - << ": " << cpp_strerror(rc) << dendl; - ::close(listen_sd); - listen_sd = -1; - return rc; - } - - msgr->set_myaddr(bind_addr); - if (bind_addr != entity_addr_t()) - msgr->learned_addr(bind_addr); - else - assert(msgr->get_need_addr()); // should still be true. - - if (msgr->get_myaddr().get_port() == 0) { - msgr->set_myaddr(listen_addr); - } - entity_addr_t addr = msgr->get_myaddr(); - addr.nonce = nonce; - msgr->set_myaddr(addr); - - msgr->init_local_connection(); - - rc = create_selfpipe(&shutdown_rd_fd, &shutdown_wr_fd); - if (rc < 0) { - lderr(msgr->cct) << __func__ << " unable to create signalling pipe " << listen_addr - << ": " << cpp_strerror(rc) << dendl; - return rc; - } - - ldout(msgr->cct,1) << __func__ << " my_inst.addr is " << msgr->get_myaddr() - << " need_addr=" << msgr->get_need_addr() << dendl; - return 0; -} - -int Accepter::rebind(const set& avoid_ports) -{ - ldout(msgr->cct,1) << __func__ << " avoid " << avoid_ports << dendl; - - entity_addr_t addr = msgr->get_myaddr(); - set new_avoid = avoid_ports; - new_avoid.insert(addr.get_port()); - addr.set_port(0); - - // adjust the nonce; we want our entity_addr_t to be truly unique. - nonce += 1000000; - msgr->my_inst.addr.nonce = nonce; - ldout(msgr->cct,10) << __func__ << " new nonce " << nonce << " and inst " - << msgr->my_inst << dendl; - - ldout(msgr->cct,10) << " will try " << addr << " and avoid ports " << new_avoid << dendl; - int r = bind(addr, new_avoid); - if (r == 0) - start(); - return r; -} - -int Accepter::start() -{ - ldout(msgr->cct,1) << __func__ << dendl; - - // start thread - create("ms_accepter"); - - return 0; -} - -void *Accepter::entry() -{ - ldout(msgr->cct,1) << __func__ << " start" << dendl; - - int errors = 0; - int ch; - - struct pollfd pfd[2]; - memset(pfd, 0, sizeof(pfd)); - - pfd[0].fd = listen_sd; - pfd[0].events = POLLIN | POLLERR | POLLNVAL | POLLHUP; - pfd[1].fd = shutdown_rd_fd; - pfd[1].events = POLLIN | POLLERR | POLLNVAL | POLLHUP; - while (!done) { - ldout(msgr->cct,20) << __func__ << " calling poll for sd:" << listen_sd << dendl; - int r = poll(pfd, 2, -1); - if (r < 0) { - if (errno == EINTR) { - continue; - } - ldout(msgr->cct,1) << __func__ << " poll got error" - << " errno " << errno << " " << cpp_strerror(errno) << dendl; - break; - } - ldout(msgr->cct,10) << __func__ << " poll returned oke: " << r << dendl; - ldout(msgr->cct,20) << __func__ << " pfd.revents[0]=" << pfd[0].revents << dendl; - ldout(msgr->cct,20) << __func__ << " pfd.revents[1]=" << pfd[1].revents << dendl; - - if (pfd[0].revents & (POLLERR | POLLNVAL | POLLHUP)) { - ldout(msgr->cct,1) << __func__ << " poll got errors in revents " - << pfd[0].revents << dendl; - break; - } - if (pfd[1].revents & (POLLIN | POLLERR | POLLNVAL | POLLHUP)) { - // We got "signaled" to exit the poll - // clean the selfpipe - if (::read(shutdown_rd_fd, &ch, 1) == -1) { - if (errno != EAGAIN) - ldout(msgr->cct,1) << __func__ << " Cannot read selfpipe: " - << " errno " << errno << " " << cpp_strerror(errno) << dendl; - } - break; - } - if (done) break; - - // accept - sockaddr_storage ss; - socklen_t slen = sizeof(ss); - int sd = ::accept(listen_sd, (sockaddr*)&ss, &slen); - if (sd >= 0) { - int r = set_close_on_exec(sd); - if (r) { - ldout(msgr->cct,1) << __func__ << " set_close_on_exec() failed " - << cpp_strerror(r) << dendl; - } - errors = 0; - ldout(msgr->cct,10) << __func__ << " incoming on sd " << sd << dendl; - - msgr->add_accept_pipe(sd); - } else { - ldout(msgr->cct,0) << __func__ << " no incoming connection? sd = " << sd - << " errno " << errno << " " << cpp_strerror(errno) << dendl; - if (++errors > 4) - break; - } - } - - ldout(msgr->cct,20) << __func__ << " closing" << dendl; - // socket is closed right after the thread has joined. - // closing it here might race - if (shutdown_rd_fd >= 0) { - ::close(shutdown_rd_fd); - shutdown_rd_fd = -1; - } - - ldout(msgr->cct,10) << __func__ << " stopping" << dendl; - return 0; -} - -void Accepter::stop() -{ - done = true; - ldout(msgr->cct,10) << __func__ << " accept listening on: " << listen_sd << dendl; - - if (shutdown_wr_fd < 0) - return; - - // Send a byte to the shutdown pipe that the thread is listening to - char buf[1] = { 0x0 }; - int ret = safe_write(shutdown_wr_fd, buf, 1); - if (ret < 0) { - ldout(msgr->cct,1) << __func__ << "close failed: " - << " errno " << errno << " " << cpp_strerror(errno) << dendl; - } else { - ldout(msgr->cct,15) << __func__ << " signaled poll" << dendl; - } - VOID_TEMP_FAILURE_RETRY(close(shutdown_wr_fd)); - shutdown_wr_fd = -1; - - // wait for thread to stop before closing the socket, to avoid - // racing against fd re-use. - if (is_started()) { - ldout(msgr->cct,5) << __func__ << " wait for thread to join." << dendl; - join(); - } - - if (listen_sd >= 0) { - if (::close(listen_sd) < 0) { - ldout(msgr->cct,1) << __func__ << "close listen_sd failed: " - << " errno " << errno << " " << cpp_strerror(errno) << dendl; - } - listen_sd = -1; - } - if (shutdown_rd_fd >= 0) { - if (::close(shutdown_rd_fd) < 0) { - ldout(msgr->cct,1) << __func__ << "close shutdown_rd_fd failed: " - << " errno " << errno << " " << cpp_strerror(errno) << dendl; - } - shutdown_rd_fd = -1; - } - done = false; -} - - - -