X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fcommon%2FOutputDataSocket.cc;fp=src%2Fceph%2Fsrc%2Fcommon%2FOutputDataSocket.cc;h=0000000000000000000000000000000000000000;hb=7da45d65be36d36b880cc55c5036e96c24b53f00;hp=bbee667f329f91083cc361c26e4ab86f01e26866;hpb=691462d09d0987b47e112d6ee8740375df3c51b2;p=stor4nfv.git diff --git a/src/ceph/src/common/OutputDataSocket.cc b/src/ceph/src/common/OutputDataSocket.cc deleted file mode 100644 index bbee667..0000000 --- a/src/ceph/src/common/OutputDataSocket.cc +++ /dev/null @@ -1,402 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2011 New Dream Network - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - -#include "common/OutputDataSocket.h" -#include "common/errno.h" -#include "common/pipe.h" -#include "common/safe_io.h" -#include "include/compat.h" - -#include -#include - -// re-include our assert to clobber the system one; fix dout: -#include "include/assert.h" - -#define dout_subsys ceph_subsys_asok -#undef dout_prefix -#define dout_prefix *_dout << "asok(" << (void*)m_cct << ") " - -using std::ostringstream; - -/* - * UNIX domain sockets created by an application persist even after that - * application closes, unless they're explicitly unlinked. This is because the - * directory containing the socket keeps a reference to the socket. - * - * This code makes things a little nicer by unlinking those dead sockets when - * the application exits normally. - */ -static pthread_mutex_t cleanup_lock = PTHREAD_MUTEX_INITIALIZER; -static std::vector cleanup_files; -static bool cleanup_atexit = false; - -static void remove_cleanup_file(const char *file) -{ - pthread_mutex_lock(&cleanup_lock); - VOID_TEMP_FAILURE_RETRY(unlink(file)); - for (std::vector ::iterator i = cleanup_files.begin(); - i != cleanup_files.end(); ++i) { - if (strcmp(file, *i) == 0) { - free((void*)*i); - cleanup_files.erase(i); - break; - } - } - pthread_mutex_unlock(&cleanup_lock); -} - -static void remove_all_cleanup_files() -{ - pthread_mutex_lock(&cleanup_lock); - for (std::vector ::iterator i = cleanup_files.begin(); - i != cleanup_files.end(); ++i) { - VOID_TEMP_FAILURE_RETRY(unlink(*i)); - free((void*)*i); - } - cleanup_files.clear(); - pthread_mutex_unlock(&cleanup_lock); -} - -static void add_cleanup_file(const char *file) -{ - char *fname = strdup(file); - if (!fname) - return; - pthread_mutex_lock(&cleanup_lock); - cleanup_files.push_back(fname); - if (!cleanup_atexit) { - atexit(remove_all_cleanup_files); - cleanup_atexit = true; - } - pthread_mutex_unlock(&cleanup_lock); -} - - -OutputDataSocket::OutputDataSocket(CephContext *cct, uint64_t _backlog) - : m_cct(cct), - data_max_backlog(_backlog), - m_sock_fd(-1), - m_shutdown_rd_fd(-1), - m_shutdown_wr_fd(-1), - going_down(false), - data_size(0), - m_lock("OutputDataSocket::m_lock") -{ -} - -OutputDataSocket::~OutputDataSocket() -{ - shutdown(); -} - -/* - * This thread listens on the UNIX domain socket for incoming connections. - * It only handles one connection at a time at the moment. All I/O is nonblocking, - * so that we can implement sensible timeouts. [TODO: make all I/O nonblocking] - * - * This thread also listens to m_shutdown_rd_fd. If there is any data sent to this - * pipe, the thread terminates itself gracefully, allowing the - * OutputDataSocketConfigObs class to join() it. - */ - -#define PFL_SUCCESS ((void*)(intptr_t)0) -#define PFL_FAIL ((void*)(intptr_t)1) - -std::string OutputDataSocket::create_shutdown_pipe(int *pipe_rd, int *pipe_wr) -{ - int pipefd[2]; - int ret = pipe_cloexec(pipefd); - if (ret < 0) { - ostringstream oss; - oss << "OutputDataSocket::create_shutdown_pipe error: " << cpp_strerror(ret); - return oss.str(); - } - - *pipe_rd = pipefd[0]; - *pipe_wr = pipefd[1]; - return ""; -} - -std::string OutputDataSocket::bind_and_listen(const std::string &sock_path, int *fd) -{ - ldout(m_cct, 5) << "bind_and_listen " << sock_path << dendl; - - struct sockaddr_un address; - if (sock_path.size() > sizeof(address.sun_path) - 1) { - ostringstream oss; - oss << "OutputDataSocket::bind_and_listen: " - << "The UNIX domain socket path " << sock_path << " is too long! The " - << "maximum length on this system is " - << (sizeof(address.sun_path) - 1); - return oss.str(); - } - int sock_fd = socket(PF_UNIX, SOCK_STREAM, 0); - if (sock_fd < 0) { - int err = errno; - ostringstream oss; - oss << "OutputDataSocket::bind_and_listen: " - << "failed to create socket: " << cpp_strerror(err); - return oss.str(); - } - int r = fcntl(sock_fd, F_SETFD, FD_CLOEXEC); - if (r < 0) { - r = errno; - VOID_TEMP_FAILURE_RETRY(::close(sock_fd)); - ostringstream oss; - oss << "OutputDataSocket::bind_and_listen: failed to fcntl on socket: " << cpp_strerror(r); - return oss.str(); - } - memset(&address, 0, sizeof(struct sockaddr_un)); - address.sun_family = AF_UNIX; - snprintf(address.sun_path, sizeof(address.sun_path), - "%s", sock_path.c_str()); - if (::bind(sock_fd, (struct sockaddr*)&address, - sizeof(struct sockaddr_un)) != 0) { - int err = errno; - if (err == EADDRINUSE) { - // The old UNIX domain socket must still be there. - // Let's unlink it and try again. - VOID_TEMP_FAILURE_RETRY(unlink(sock_path.c_str())); - if (::bind(sock_fd, (struct sockaddr*)&address, - sizeof(struct sockaddr_un)) == 0) { - err = 0; - } - else { - err = errno; - } - } - if (err != 0) { - ostringstream oss; - oss << "OutputDataSocket::bind_and_listen: " - << "failed to bind the UNIX domain socket to '" << sock_path - << "': " << cpp_strerror(err); - close(sock_fd); - return oss.str(); - } - } - if (listen(sock_fd, 5) != 0) { - int err = errno; - ostringstream oss; - oss << "OutputDataSocket::bind_and_listen: " - << "failed to listen to socket: " << cpp_strerror(err); - close(sock_fd); - VOID_TEMP_FAILURE_RETRY(unlink(sock_path.c_str())); - return oss.str(); - } - *fd = sock_fd; - return ""; -} - -void* OutputDataSocket::entry() -{ - ldout(m_cct, 5) << "entry start" << dendl; - while (true) { - struct pollfd fds[2]; - memset(fds, 0, sizeof(fds)); - fds[0].fd = m_sock_fd; - fds[0].events = POLLIN | POLLRDBAND; - fds[1].fd = m_shutdown_rd_fd; - fds[1].events = POLLIN | POLLRDBAND; - - int ret = poll(fds, 2, -1); - if (ret < 0) { - int err = errno; - if (err == EINTR) { - continue; - } - lderr(m_cct) << "OutputDataSocket: poll(2) error: '" - << cpp_strerror(err) << dendl; - return PFL_FAIL; - } - - if (fds[0].revents & POLLIN) { - // Send out some data - do_accept(); - } - if (fds[1].revents & POLLIN) { - // Parent wants us to shut down - return PFL_SUCCESS; - } - } - ldout(m_cct, 5) << "entry exit" << dendl; - - return PFL_SUCCESS; // unreachable -} - - -bool OutputDataSocket::do_accept() -{ - struct sockaddr_un address; - socklen_t address_length = sizeof(address); - ldout(m_cct, 30) << "OutputDataSocket: calling accept" << dendl; - int connection_fd = accept(m_sock_fd, (struct sockaddr*) &address, - &address_length); - ldout(m_cct, 30) << "OutputDataSocket: finished accept" << dendl; - if (connection_fd < 0) { - int err = errno; - lderr(m_cct) << "OutputDataSocket: do_accept error: '" - << cpp_strerror(err) << dendl; - return false; - } - - handle_connection(connection_fd); - close_connection(connection_fd); - - return 0; -} - -void OutputDataSocket::handle_connection(int fd) -{ - bufferlist bl; - - m_lock.Lock(); - init_connection(bl); - m_lock.Unlock(); - - if (bl.length()) { - /* need to special case the connection init buffer output, as it needs - * to be dumped before any data, including older data that was sent - * before the connection was established, or before we identified - * older connection was broken - */ - int ret = safe_write(fd, bl.c_str(), bl.length()); - if (ret < 0) { - return; - } - } - - int ret = dump_data(fd); - if (ret < 0) - return; - - do { - m_lock.Lock(); - cond.Wait(m_lock); - - if (going_down) { - m_lock.Unlock(); - break; - } - m_lock.Unlock(); - - ret = dump_data(fd); - } while (ret >= 0); -} - -int OutputDataSocket::dump_data(int fd) -{ - m_lock.Lock(); - list l = std::move(data); - data.clear(); - data_size = 0; - m_lock.Unlock(); - - for (list::iterator iter = l.begin(); iter != l.end(); ++iter) { - bufferlist& bl = *iter; - int ret = safe_write(fd, bl.c_str(), bl.length()); - if (ret >= 0) { - ret = safe_write(fd, delim.c_str(), delim.length()); - } - if (ret < 0) { - for (; iter != l.end(); ++iter) { - bufferlist& bl = *iter; - data.push_back(bl); - data_size += bl.length(); - } - return ret; - } - } - - return 0; -} - -void OutputDataSocket::close_connection(int fd) -{ - VOID_TEMP_FAILURE_RETRY(close(fd)); -} - -bool OutputDataSocket::init(const std::string &path) -{ - ldout(m_cct, 5) << "init " << path << dendl; - - /* Set up things for the new thread */ - std::string err; - int pipe_rd = -1, pipe_wr = -1; - err = create_shutdown_pipe(&pipe_rd, &pipe_wr); - if (!err.empty()) { - lderr(m_cct) << "OutputDataSocketConfigObs::init: error: " << err << dendl; - return false; - } - int sock_fd; - err = bind_and_listen(path, &sock_fd); - if (!err.empty()) { - lderr(m_cct) << "OutputDataSocketConfigObs::init: failed: " << err << dendl; - close(pipe_rd); - close(pipe_wr); - return false; - } - - /* Create new thread */ - m_sock_fd = sock_fd; - m_shutdown_rd_fd = pipe_rd; - m_shutdown_wr_fd = pipe_wr; - m_path = path; - create("out_data_socket"); - add_cleanup_file(m_path.c_str()); - return true; -} - -void OutputDataSocket::shutdown() -{ - m_lock.Lock(); - going_down = true; - cond.Signal(); - m_lock.Unlock(); - - if (m_shutdown_wr_fd < 0) - return; - - ldout(m_cct, 5) << "shutdown" << dendl; - - // Send a byte to the shutdown pipe that the thread is listening to - char buf[1] = { 0x0 }; - int ret = safe_write(m_shutdown_wr_fd, buf, sizeof(buf)); - VOID_TEMP_FAILURE_RETRY(close(m_shutdown_wr_fd)); - m_shutdown_wr_fd = -1; - - if (ret == 0) { - join(); - } else { - lderr(m_cct) << "OutputDataSocket::shutdown: failed to write " - "to thread shutdown pipe: error " << ret << dendl; - } - - remove_cleanup_file(m_path.c_str()); - m_path.clear(); -} - -void OutputDataSocket::append_output(bufferlist& bl) -{ - Mutex::Locker l(m_lock); - - if (data_size + bl.length() > data_max_backlog) { - ldout(m_cct, 20) << "dropping data output, max backlog reached" << dendl; - } - data.push_back(bl); - - data_size += bl.length(); - - cond.Signal(); -}