Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / common / OutputDataSocket.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2011 New Dream Network
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software 
11  * Foundation.  See file COPYING.
12  * 
13  */
14
15 #include "common/OutputDataSocket.h"
16 #include "common/errno.h"
17 #include "common/pipe.h"
18 #include "common/safe_io.h"
19 #include "include/compat.h"
20
21 #include <poll.h>
22 #include <sys/un.h>
23
24 // re-include our assert to clobber the system one; fix dout:
25 #include "include/assert.h"
26
27 #define dout_subsys ceph_subsys_asok
28 #undef dout_prefix
29 #define dout_prefix *_dout << "asok(" << (void*)m_cct << ") "
30
31 using std::ostringstream;
32
33 /*
34  * UNIX domain sockets created by an application persist even after that
35  * application closes, unless they're explicitly unlinked. This is because the
36  * directory containing the socket keeps a reference to the socket.
37  *
38  * This code makes things a little nicer by unlinking those dead sockets when
39  * the application exits normally.
40  */
41 static pthread_mutex_t cleanup_lock = PTHREAD_MUTEX_INITIALIZER;
42 static std::vector <const char*> cleanup_files;
43 static bool cleanup_atexit = false;
44
45 static void remove_cleanup_file(const char *file)
46 {
47   pthread_mutex_lock(&cleanup_lock);
48   VOID_TEMP_FAILURE_RETRY(unlink(file));
49   for (std::vector <const char*>::iterator i = cleanup_files.begin();
50        i != cleanup_files.end(); ++i) {
51     if (strcmp(file, *i) == 0) {
52       free((void*)*i);
53       cleanup_files.erase(i);
54       break;
55     }
56   }
57   pthread_mutex_unlock(&cleanup_lock);
58 }
59
60 static void remove_all_cleanup_files()
61 {
62   pthread_mutex_lock(&cleanup_lock);
63   for (std::vector <const char*>::iterator i = cleanup_files.begin();
64        i != cleanup_files.end(); ++i) {
65     VOID_TEMP_FAILURE_RETRY(unlink(*i));
66     free((void*)*i);
67   }
68   cleanup_files.clear();
69   pthread_mutex_unlock(&cleanup_lock);
70 }
71
72 static void add_cleanup_file(const char *file)
73 {
74   char *fname = strdup(file);
75   if (!fname)
76     return;
77   pthread_mutex_lock(&cleanup_lock);
78   cleanup_files.push_back(fname);
79   if (!cleanup_atexit) {
80     atexit(remove_all_cleanup_files);
81     cleanup_atexit = true;
82   }
83   pthread_mutex_unlock(&cleanup_lock);
84 }
85
86
87 OutputDataSocket::OutputDataSocket(CephContext *cct, uint64_t _backlog)
88   : m_cct(cct),
89     data_max_backlog(_backlog),
90     m_sock_fd(-1),
91     m_shutdown_rd_fd(-1),
92     m_shutdown_wr_fd(-1),
93     going_down(false),
94     data_size(0),
95     m_lock("OutputDataSocket::m_lock")
96 {
97 }
98
99 OutputDataSocket::~OutputDataSocket()
100 {
101   shutdown();
102 }
103
104 /*
105  * This thread listens on the UNIX domain socket for incoming connections.
106  * It only handles one connection at a time at the moment. All I/O is nonblocking,
107  * so that we can implement sensible timeouts. [TODO: make all I/O nonblocking]
108  *
109  * This thread also listens to m_shutdown_rd_fd. If there is any data sent to this
110  * pipe, the thread terminates itself gracefully, allowing the
111  * OutputDataSocketConfigObs class to join() it.
112  */
113
114 #define PFL_SUCCESS ((void*)(intptr_t)0)
115 #define PFL_FAIL ((void*)(intptr_t)1)
116
117 std::string OutputDataSocket::create_shutdown_pipe(int *pipe_rd, int *pipe_wr)
118 {
119   int pipefd[2];
120   int ret = pipe_cloexec(pipefd);
121   if (ret < 0) {
122     ostringstream oss;
123     oss << "OutputDataSocket::create_shutdown_pipe error: " << cpp_strerror(ret);
124     return oss.str();
125   }
126   
127   *pipe_rd = pipefd[0];
128   *pipe_wr = pipefd[1];
129   return "";
130 }
131
132 std::string OutputDataSocket::bind_and_listen(const std::string &sock_path, int *fd)
133 {
134   ldout(m_cct, 5) << "bind_and_listen " << sock_path << dendl;
135
136   struct sockaddr_un address;
137   if (sock_path.size() > sizeof(address.sun_path) - 1) {
138     ostringstream oss;
139     oss << "OutputDataSocket::bind_and_listen: "
140         << "The UNIX domain socket path " << sock_path << " is too long! The "
141         << "maximum length on this system is "
142         << (sizeof(address.sun_path) - 1);
143     return oss.str();
144   }
145   int sock_fd = socket(PF_UNIX, SOCK_STREAM, 0);
146   if (sock_fd < 0) {
147     int err = errno;
148     ostringstream oss;
149     oss << "OutputDataSocket::bind_and_listen: "
150         << "failed to create socket: " << cpp_strerror(err);
151     return oss.str();
152   }
153   int r = fcntl(sock_fd, F_SETFD, FD_CLOEXEC);
154   if (r < 0) {
155     r = errno;
156     VOID_TEMP_FAILURE_RETRY(::close(sock_fd));
157     ostringstream oss;
158     oss << "OutputDataSocket::bind_and_listen: failed to fcntl on socket: " << cpp_strerror(r);
159     return oss.str();
160   }
161   memset(&address, 0, sizeof(struct sockaddr_un));
162   address.sun_family = AF_UNIX;
163   snprintf(address.sun_path, sizeof(address.sun_path),
164            "%s", sock_path.c_str());
165   if (::bind(sock_fd, (struct sockaddr*)&address,
166            sizeof(struct sockaddr_un)) != 0) {
167     int err = errno;
168     if (err == EADDRINUSE) {
169       // The old UNIX domain socket must still be there.
170       // Let's unlink it and try again.
171       VOID_TEMP_FAILURE_RETRY(unlink(sock_path.c_str()));
172       if (::bind(sock_fd, (struct sockaddr*)&address,
173                sizeof(struct sockaddr_un)) == 0) {
174         err = 0;
175       }
176       else {
177         err = errno;
178       }
179     }
180     if (err != 0) {
181       ostringstream oss;
182       oss << "OutputDataSocket::bind_and_listen: "
183           << "failed to bind the UNIX domain socket to '" << sock_path
184           << "': " << cpp_strerror(err);
185       close(sock_fd);
186       return oss.str();
187     }
188   }
189   if (listen(sock_fd, 5) != 0) {
190     int err = errno;
191     ostringstream oss;
192     oss << "OutputDataSocket::bind_and_listen: "
193           << "failed to listen to socket: " << cpp_strerror(err);
194     close(sock_fd);
195     VOID_TEMP_FAILURE_RETRY(unlink(sock_path.c_str()));
196     return oss.str();
197   }
198   *fd = sock_fd;
199   return "";
200 }
201
202 void* OutputDataSocket::entry()
203 {
204   ldout(m_cct, 5) << "entry start" << dendl;
205   while (true) {
206     struct pollfd fds[2];
207     memset(fds, 0, sizeof(fds));
208     fds[0].fd = m_sock_fd;
209     fds[0].events = POLLIN | POLLRDBAND;
210     fds[1].fd = m_shutdown_rd_fd;
211     fds[1].events = POLLIN | POLLRDBAND;
212
213     int ret = poll(fds, 2, -1);
214     if (ret < 0) {
215       int err = errno;
216       if (err == EINTR) {
217         continue;
218       }
219       lderr(m_cct) << "OutputDataSocket: poll(2) error: '"
220                    << cpp_strerror(err) << dendl;
221       return PFL_FAIL;
222     }
223
224     if (fds[0].revents & POLLIN) {
225       // Send out some data
226       do_accept();
227     }
228     if (fds[1].revents & POLLIN) {
229       // Parent wants us to shut down
230       return PFL_SUCCESS;
231     }
232   }
233   ldout(m_cct, 5) << "entry exit" << dendl;
234
235   return PFL_SUCCESS; // unreachable
236 }
237
238
239 bool OutputDataSocket::do_accept()
240 {
241   struct sockaddr_un address;
242   socklen_t address_length = sizeof(address);
243   ldout(m_cct, 30) << "OutputDataSocket: calling accept" << dendl;
244   int connection_fd = accept(m_sock_fd, (struct sockaddr*) &address,
245                              &address_length);
246   ldout(m_cct, 30) << "OutputDataSocket: finished accept" << dendl;
247   if (connection_fd < 0) {
248     int err = errno;
249     lderr(m_cct) << "OutputDataSocket: do_accept error: '"
250                            << cpp_strerror(err) << dendl;
251     return false;
252   }
253
254   handle_connection(connection_fd);
255   close_connection(connection_fd);
256
257   return 0;
258 }
259
260 void OutputDataSocket::handle_connection(int fd)
261 {
262   bufferlist bl;
263
264   m_lock.Lock();
265   init_connection(bl);
266   m_lock.Unlock();
267
268   if (bl.length()) {
269     /* need to special case the connection init buffer output, as it needs
270      * to be dumped before any data, including older data that was sent
271      * before the connection was established, or before we identified
272      * older connection was broken
273      */
274     int ret = safe_write(fd, bl.c_str(), bl.length());
275     if (ret < 0) {
276       return;
277     }
278   }
279
280   int ret = dump_data(fd);
281   if (ret < 0)
282     return;
283
284   do {
285     m_lock.Lock();
286     cond.Wait(m_lock);
287
288     if (going_down) {
289       m_lock.Unlock();
290       break;
291     }
292     m_lock.Unlock();
293
294     ret = dump_data(fd);
295   } while (ret >= 0);
296 }
297
298 int OutputDataSocket::dump_data(int fd)
299 {
300   m_lock.Lock(); 
301   list<bufferlist> l = std::move(data);
302   data.clear();
303   data_size = 0;
304   m_lock.Unlock();
305
306   for (list<bufferlist>::iterator iter = l.begin(); iter != l.end(); ++iter) {
307     bufferlist& bl = *iter;
308     int ret = safe_write(fd, bl.c_str(), bl.length());
309     if (ret >= 0) {
310       ret = safe_write(fd, delim.c_str(), delim.length());
311     }
312     if (ret < 0) {
313       for (; iter != l.end(); ++iter) {
314         bufferlist& bl = *iter;
315         data.push_back(bl);
316         data_size += bl.length();
317       }
318       return ret;
319     }
320   }
321
322   return 0;
323 }
324
325 void OutputDataSocket::close_connection(int fd)
326 {
327   VOID_TEMP_FAILURE_RETRY(close(fd));
328 }
329
330 bool OutputDataSocket::init(const std::string &path)
331 {
332   ldout(m_cct, 5) << "init " << path << dendl;
333
334   /* Set up things for the new thread */
335   std::string err;
336   int pipe_rd = -1, pipe_wr = -1;
337   err = create_shutdown_pipe(&pipe_rd, &pipe_wr);
338   if (!err.empty()) {
339     lderr(m_cct) << "OutputDataSocketConfigObs::init: error: " << err << dendl;
340     return false;
341   }
342   int sock_fd;
343   err = bind_and_listen(path, &sock_fd);
344   if (!err.empty()) {
345     lderr(m_cct) << "OutputDataSocketConfigObs::init: failed: " << err << dendl;
346     close(pipe_rd);
347     close(pipe_wr);
348     return false;
349   }
350
351   /* Create new thread */
352   m_sock_fd = sock_fd;
353   m_shutdown_rd_fd = pipe_rd;
354   m_shutdown_wr_fd = pipe_wr;
355   m_path = path;
356   create("out_data_socket");
357   add_cleanup_file(m_path.c_str());
358   return true;
359 }
360
361 void OutputDataSocket::shutdown()
362 {
363   m_lock.Lock();
364   going_down = true;
365   cond.Signal();
366   m_lock.Unlock();
367
368   if (m_shutdown_wr_fd < 0)
369     return;
370
371   ldout(m_cct, 5) << "shutdown" << dendl;
372
373   // Send a byte to the shutdown pipe that the thread is listening to
374   char buf[1] = { 0x0 };
375   int ret = safe_write(m_shutdown_wr_fd, buf, sizeof(buf));
376   VOID_TEMP_FAILURE_RETRY(close(m_shutdown_wr_fd));
377   m_shutdown_wr_fd = -1;
378
379   if (ret == 0) {
380     join();
381   } else {
382     lderr(m_cct) << "OutputDataSocket::shutdown: failed to write "
383       "to thread shutdown pipe: error " << ret << dendl;
384   }
385
386   remove_cleanup_file(m_path.c_str());
387   m_path.clear();
388 }
389
390 void OutputDataSocket::append_output(bufferlist& bl)
391 {
392   Mutex::Locker l(m_lock);
393
394   if (data_size + bl.length() > data_max_backlog) {
395     ldout(m_cct, 20) << "dropping data output, max backlog reached" << dendl;
396   }
397   data.push_back(bl);
398
399   data_size += bl.length();
400
401   cond.Signal();
402 }