1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2011 New Dream Network
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
15 #include "common/OutputDataSocket.h"
16 #include "common/errno.h"
17 #include "common/pipe.h"
18 #include "common/safe_io.h"
19 #include "include/compat.h"
24 // re-include our assert to clobber the system one; fix dout:
25 #include "include/assert.h"
27 #define dout_subsys ceph_subsys_asok
29 #define dout_prefix *_dout << "asok(" << (void*)m_cct << ") "
31 using std::ostringstream;
34 * UNIX domain sockets created by an application persist even after that
35 * application closes, unless they're explicitly unlinked. This is because the
36 * directory containing the socket keeps a reference to the socket.
38 * This code makes things a little nicer by unlinking those dead sockets when
39 * the application exits normally.
// State shared by the cleanup helpers below.
// cleanup_lock is taken by each helper around its use of cleanup_files.
41 static pthread_mutex_t cleanup_lock = PTHREAD_MUTEX_INITIALIZER;
// Paths to unlink at exit; add_cleanup_file() pushes strdup() copies here.
42 static std::vector <const char*> cleanup_files;
// Becomes true once remove_all_cleanup_files() has been passed to atexit().
43 static bool cleanup_atexit = false;
// Unlink file and drop the matching entry from cleanup_files.
//   file - path to remove; compared against the stored paths with strcmp().
// The whole operation runs with cleanup_lock held.
45 static void remove_cleanup_file(const char *file)
47 pthread_mutex_lock(&cleanup_lock);
// The unlink is attempted first, whether or not the path is registered.
48 VOID_TEMP_FAILURE_RETRY(unlink(file));
49 for (std::vector <const char*>::iterator i = cleanup_files.begin();
50 i != cleanup_files.end(); ++i) {
51 if (strcmp(file, *i) == 0) {
// NOTE(review): erase() invalidates i, so the loop must stop right after
// this call - confirm against the full function body.
53 cleanup_files.erase(i);
57 pthread_mutex_unlock(&cleanup_lock);
// Unlink every registered path and empty cleanup_files, with cleanup_lock
// held. add_cleanup_file() registers this function with atexit().
60 static void remove_all_cleanup_files()
62 pthread_mutex_lock(&cleanup_lock);
63 for (std::vector <const char*>::iterator i = cleanup_files.begin();
64 i != cleanup_files.end(); ++i) {
65 VOID_TEMP_FAILURE_RETRY(unlink(*i));
68 cleanup_files.clear();
69 pthread_mutex_unlock(&cleanup_lock);
// Register file to be unlinked when the process exits.
//   file - path to remember; a private strdup() copy is stored, so the
//          caller keeps ownership of its own buffer.
72 static void add_cleanup_file(const char *file)
// NOTE(review): strdup() can return NULL - confirm the result is checked
// before it is pushed below.
74 char *fname = strdup(file);
77 pthread_mutex_lock(&cleanup_lock);
78 cleanup_files.push_back(fname);
// Install the atexit() handler only once, on the first registration.
79 if (!cleanup_atexit) {
80 atexit(remove_all_cleanup_files);
81 cleanup_atexit = true;
83 pthread_mutex_unlock(&cleanup_lock);
// Construct the socket object.
//   cct      - context. NOTE(review): presumably stored as m_cct, which the
//              logging in this file uses - confirm.
//   _backlog - stored as data_max_backlog, the limit that append_output()
//              compares the queued size against.
87 OutputDataSocket::OutputDataSocket(CephContext *cct, uint64_t _backlog)
89 data_max_backlog(_backlog),
// The string is the name given to the mutex.
95 m_lock("OutputDataSocket::m_lock")
// Destructor.
// NOTE(review): presumably stops the listener thread via shutdown() - confirm.
99 OutputDataSocket::~OutputDataSocket()
105 * This thread listens on the UNIX domain socket for incoming connections.
106 * It only handles one connection at a time at the moment. The intent is for all
107 * I/O to be nonblocking so that we can implement sensible timeouts. [TODO: make all I/O nonblocking]
109 * This thread also listens to m_shutdown_rd_fd. If there is any data sent to this
110 * pipe, the thread terminates itself gracefully, allowing the
111 * OutputDataSocketConfigObs class to join() it.
// Thread result codes, encoded as void* to match the return type of entry().
114 #define PFL_SUCCESS ((void*)(intptr_t)0)
115 #define PFL_FAIL ((void*)(intptr_t)1)
// Create the pipe used to tell the listener thread to stop.
//   pipe_rd - out: receives pipefd[0]
//   pipe_wr - out: receives pipefd[1]
// Returns a string; on failure it carries a message built from
// cpp_strerror(ret).
// NOTE(review): presumably the string is empty on success - confirm.
117 std::string OutputDataSocket::create_shutdown_pipe(int *pipe_rd, int *pipe_wr)
120 int ret = pipe_cloexec(pipefd);
123 oss << "OutputDataSocket::create_shutdown_pipe error: " << cpp_strerror(ret);
127 *pipe_rd = pipefd[0];
128 *pipe_wr = pipefd[1];
// Create a UNIX domain stream socket, bind it to sock_path and start
// listening on it with a backlog of 5.
//   sock_path - filesystem path of the socket; must fit in sun_path.
//   fd        - out parameter. NOTE(review): presumably receives the
//               listening descriptor on success - confirm.
// Returns a string; each failure path builds a message that starts with the
// function name.
// NOTE(review): presumably the string is empty on success - confirm.
132 std::string OutputDataSocket::bind_and_listen(const std::string &sock_path, int *fd)
134 ldout(m_cct, 5) << "bind_and_listen " << sock_path << dendl;
136 struct sockaddr_un address;
// One byte of sun_path is reserved for the terminating NUL, hence the - 1.
137 if (sock_path.size() > sizeof(address.sun_path) - 1) {
139 oss << "OutputDataSocket::bind_and_listen: "
140 << "The UNIX domain socket path " << sock_path << " is too long! The "
141 << "maximum length on this system is "
142 << (sizeof(address.sun_path) - 1);
145 int sock_fd = socket(PF_UNIX, SOCK_STREAM, 0);
149 oss << "OutputDataSocket::bind_and_listen: "
150 << "failed to create socket: " << cpp_strerror(err);
// Mark the socket close-on-exec.
153 int r = fcntl(sock_fd, F_SETFD, FD_CLOEXEC);
156 VOID_TEMP_FAILURE_RETRY(::close(sock_fd));
158 oss << "OutputDataSocket::bind_and_listen: failed to fcntl on socket: " << cpp_strerror(r);
// Zero the address first; snprintf() then copies the path with truncation
// bounded by the size of sun_path.
161 memset(&address, 0, sizeof(struct sockaddr_un));
162 address.sun_family = AF_UNIX;
163 snprintf(address.sun_path, sizeof(address.sun_path),
164 "%s", sock_path.c_str());
165 if (::bind(sock_fd, (struct sockaddr*)&address,
166 sizeof(struct sockaddr_un)) != 0) {
168 if (err == EADDRINUSE) {
169 // The old UNIX domain socket must still be there.
170 // Let's unlink it and try again.
171 VOID_TEMP_FAILURE_RETRY(unlink(sock_path.c_str()));
172 if (::bind(sock_fd, (struct sockaddr*)&address,
173 sizeof(struct sockaddr_un)) == 0) {
182 oss << "OutputDataSocket::bind_and_listen: "
183 << "failed to bind the UNIX domain socket to '" << sock_path
184 << "': " << cpp_strerror(err);
189 if (listen(sock_fd, 5) != 0) {
192 oss << "OutputDataSocket::bind_and_listen: "
193 << "failed to listen to socket: " << cpp_strerror(err);
// Listening failed: remove the socket file that the bind created.
195 VOID_TEMP_FAILURE_RETRY(unlink(sock_path.c_str()));
// Listener thread body. Polls two descriptors with no timeout:
//   fds[0] - the listening socket (m_sock_fd); POLLIN means there is data
//            to send out to a client.
//   fds[1] - the read end of the shutdown pipe (m_shutdown_rd_fd); POLLIN
//            means the parent wants this thread to stop.
// Returns PFL_SUCCESS.
202 void* OutputDataSocket::entry()
204 ldout(m_cct, 5) << "entry start" << dendl;
206 struct pollfd fds[2];
207 memset(fds, 0, sizeof(fds));
208 fds[0].fd = m_sock_fd;
209 fds[0].events = POLLIN | POLLRDBAND;
210 fds[1].fd = m_shutdown_rd_fd;
211 fds[1].events = POLLIN | POLLRDBAND;
// A timeout of -1 blocks until one of the two descriptors has an event.
213 int ret = poll(fds, 2, -1);
// Close the quote that is opened in front of the error text, so the logged
// message is balanced.
219 lderr(m_cct) << "OutputDataSocket: poll(2) error: '"
220 << cpp_strerror(err) << "'" << dendl;
224 if (fds[0].revents & POLLIN) {
225 // Send out some data
228 if (fds[1].revents & POLLIN) {
229 // Parent wants us to shut down
233 ldout(m_cct, 5) << "entry exit" << dendl;
235 return PFL_SUCCESS; // unreachable
// Accept one pending connection on m_sock_fd, serve it with
// handle_connection() and then close it with close_connection().
// A failed accept() (negative descriptor) is logged through lderr together
// with the cpp_strerror() text of the error.
239 bool OutputDataSocket::do_accept()
241 struct sockaddr_un address;
242 socklen_t address_length = sizeof(address);
243 ldout(m_cct, 30) << "OutputDataSocket: calling accept" << dendl;
244 int connection_fd = accept(m_sock_fd, (struct sockaddr*) &address,
246 ldout(m_cct, 30) << "OutputDataSocket: finished accept" << dendl;
247 if (connection_fd < 0) {
// Close the quote that is opened in front of the error text, so the logged
// message is balanced.
249 lderr(m_cct) << "OutputDataSocket: do_accept error: '"
250 << cpp_strerror(err) << "'" << dendl;
// One connection at a time: serve it fully, then release the descriptor.
254 handle_connection(connection_fd);
255 close_connection(connection_fd);
// Serve one accepted connection on fd: first write the connection init
// buffer, then send queued data to the client with dump_data().
//   fd - descriptor of the accepted connection.
260 void OutputDataSocket::handle_connection(int fd)
269 /* need to special case the connection init buffer output, as it needs
270 * to be dumped before any data, including older data that was sent
271 * before the connection was established, or before we identified
272 * older connection was broken
274 int ret = safe_write(fd, bl.c_str(), bl.length());
280 int ret = dump_data(fd);
// Write the queued buffers to fd, each one followed by delim.
//   fd - descriptor of the connected client.
// Returns an int. NOTE(review): presumably 0 on success and the negative
// safe_write() result on failure - confirm.
298 int OutputDataSocket::dump_data(int fd)
// Take the whole queue into a local list so the writes work on a private copy.
// NOTE(review): confirm m_lock is held around this access to data.
301 list<bufferlist> l = std::move(data);
306 for (list<bufferlist>::iterator iter = l.begin(); iter != l.end(); ++iter) {
307 bufferlist& bl = *iter;
308 int ret = safe_write(fd, bl.c_str(), bl.length());
310 ret = safe_write(fd, delim.c_str(), delim.length());
// Walk the buffers from the current one to the end and add their lengths
// back to data_size.
// NOTE(review): this loop advances the same iterator up to l.end(); confirm
// the function returns right after it, otherwise the outer ++iter would step
// past the end. Also confirm the locking around data_size here.
313 for (; iter != l.end(); ++iter) {
314 bufferlist& bl = *iter;
316 data_size += bl.length();
// Close a connection descriptor.
//   fd - descriptor to close; the close() call is wrapped in
//        VOID_TEMP_FAILURE_RETRY and its result is not checked here.
325 void OutputDataSocket::close_connection(int fd)
327 VOID_TEMP_FAILURE_RETRY(close(fd));
330 bool OutputDataSocket::init(const std::string &path)
332 ldout(m_cct, 5) << "init " << path << dendl;
334 /* Set up things for the new thread */
336 int pipe_rd = -1, pipe_wr = -1;
337 err = create_shutdown_pipe(&pipe_rd, &pipe_wr);
339 lderr(m_cct) << "OutputDataSocketConfigObs::init: error: " << err << dendl;
343 err = bind_and_listen(path, &sock_fd);
345 lderr(m_cct) << "OutputDataSocketConfigObs::init: failed: " << err << dendl;
351 /* Create new thread */
353 m_shutdown_rd_fd = pipe_rd;
354 m_shutdown_wr_fd = pipe_wr;
356 create("out_data_socket");
357 add_cleanup_file(m_path.c_str());
// Ask the listener thread to stop and remove the socket file.
// The write end of the shutdown pipe doubles as the state flag: a negative
// m_shutdown_wr_fd is tested first, and the descriptor is reset to -1 once
// the pipe has been closed.
361 void OutputDataSocket::shutdown()
368 if (m_shutdown_wr_fd < 0)
371 ldout(m_cct, 5) << "shutdown" << dendl;
373 // Send a byte to the shutdown pipe that the thread is listening to
374 char buf[1] = { 0x0 };
375 int ret = safe_write(m_shutdown_wr_fd, buf, sizeof(buf));
376 VOID_TEMP_FAILURE_RETRY(close(m_shutdown_wr_fd));
377 m_shutdown_wr_fd = -1;
// The message below reports the safe_write() result kept in ret.
382 lderr(m_cct) << "OutputDataSocket::shutdown: failed to write "
383 "to thread shutdown pipe: error " << ret << dendl;
// Unlink the socket file and drop it from the exit-time cleanup list.
386 remove_cleanup_file(m_path.c_str());
// Queue bl for delivery to a client. Runs with m_lock held.
//   bl - buffer to queue; its length is added to data_size.
// When data_size plus the new length would exceed data_max_backlog, a
// level-20 message about dropping the output is logged.
// NOTE(review): confirm the over-limit branch really skips the accounting
// below; if it falls through, the backlog limit is not enforced.
390 void OutputDataSocket::append_output(bufferlist& bl)
392 Mutex::Locker l(m_lock);
394 if (data_size + bl.length() > data_max_backlog) {
395 ldout(m_cct, 20) << "dropping data output, max backlog reached" << dendl;
399 data_size += bl.length();