Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / msg / simple / Accepter.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software 
11  * Foundation.  See file COPYING.
12  * 
13  */
14
15 #include "include/compat.h"
16 #include <sys/socket.h>
17 #include <netinet/tcp.h>
18 #include <sys/uio.h>
19 #include <limits.h>
20 #include <poll.h>
21
22 #include "msg/msg_types.h"
23 #include "msg/Message.h"
24
25 #include "Accepter.h"
26 #include "Pipe.h"
27 #include "SimpleMessenger.h"
28
29 #include "common/debug.h"
30 #include "common/errno.h"
31 #include "common/safe_io.h"
32
33 #define dout_subsys ceph_subsys_ms
34
35 #undef dout_prefix
36 #define dout_prefix *_dout << "accepter."
37
38
39 /********************************************
40  * Accepter
41  */
42
43 static int set_close_on_exec(int fd)
44 {
45   int flags = fcntl(fd, F_GETFD, 0);
46   if (flags < 0) {
47     return errno;
48   }
49   if (fcntl(fd, F_SETFD, flags | FD_CLOEXEC)) {
50     return errno;
51   }
52   return 0;
53 }
54
55 int Accepter::create_selfpipe(int *pipe_rd, int *pipe_wr) {
56   int selfpipe[2];
57   int ret = ::pipe2(selfpipe, (O_CLOEXEC|O_NONBLOCK));
58   if (ret < 0 ) {
59     lderr(msgr->cct) << __func__ << " unable to create the selfpipe: "
60                     << cpp_strerror(errno) << dendl;
61     return -errno;
62   }
63   *pipe_rd = selfpipe[0];
64   *pipe_wr = selfpipe[1];
65   return 0;
66 }
67
68 int Accepter::bind(const entity_addr_t &bind_addr, const set<int>& avoid_ports)
69 {
70   const md_config_t *conf = msgr->cct->_conf;
71   // bind to a socket
72   ldout(msgr->cct,10) <<  __func__ << dendl;
73   
74   int family;
75   switch (bind_addr.get_family()) {
76   case AF_INET:
77   case AF_INET6:
78     family = bind_addr.get_family();
79     break;
80
81   default:
82     // bind_addr is empty
83     family = conf->ms_bind_ipv6 ? AF_INET6 : AF_INET;
84   }
85
86   /* socket creation */
87   listen_sd = ::socket(family, SOCK_STREAM, 0);
88   ldout(msgr->cct,10) <<  __func__ << " socket sd: " << listen_sd << dendl;
89   if (listen_sd < 0) {
90     lderr(msgr->cct) << __func__ << " unable to create socket: "
91                      << cpp_strerror(errno) << dendl;
92     return -errno;
93   }
94
95   if (set_close_on_exec(listen_sd)) {
96     lderr(msgr->cct) << __func__ << " unable to set_close_exec(): "
97                      << cpp_strerror(errno) << dendl;
98   }
99   
100
101   // use whatever user specified (if anything)
102   entity_addr_t listen_addr = bind_addr;
103   if (listen_addr.get_type() == entity_addr_t::TYPE_NONE) {
104     listen_addr.set_type(entity_addr_t::TYPE_LEGACY);
105   }
106   listen_addr.set_family(family);
107
108   /* bind to port */
109   int rc = -1;
110   int r = -1;
111
112   for (int i = 0; i < conf->ms_bind_retry_count; i++) {
113
114     if (i > 0) {
115         lderr(msgr->cct) << __func__ << " was unable to bind. Trying again in " 
116                          << conf->ms_bind_retry_delay << " seconds " << dendl;
117         sleep(conf->ms_bind_retry_delay);
118     }
119
120     if (listen_addr.get_port()) {
121         // specific port
122
123         // reuse addr+port when possible
124         int on = 1;
125         rc = ::setsockopt(listen_sd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
126         if (rc < 0) {
127             lderr(msgr->cct) << __func__ << " unable to setsockopt: "
128                              << cpp_strerror(errno) << dendl;
129             r = -errno;
130             continue;
131         }
132
133         rc = ::bind(listen_sd, listen_addr.get_sockaddr(),
134                     listen_addr.get_sockaddr_len());
135         if (rc < 0) {
136             lderr(msgr->cct) << __func__ << " unable to bind to " << listen_addr
137                              << ": " << cpp_strerror(errno) << dendl;
138             r = -errno;
139             continue;
140         }
141     } else {
142         // try a range of ports
143         for (int port = msgr->cct->_conf->ms_bind_port_min; 
144                 port <= msgr->cct->_conf->ms_bind_port_max; port++) {
145             if (avoid_ports.count(port))
146                 continue;
147
148             listen_addr.set_port(port);
149             rc = ::bind(listen_sd, listen_addr.get_sockaddr(),
150                         listen_addr.get_sockaddr_len());
151             if (rc == 0)
152                 break;
153         }
154         if (rc < 0) {
155             lderr(msgr->cct) <<  __func__ << "  unable to bind to " << listen_addr
156                              << " on any port in range " << msgr->cct->_conf->ms_bind_port_min
157                              << "-" << msgr->cct->_conf->ms_bind_port_max
158                              << ": " << cpp_strerror(errno)
159                              << dendl;
160             r = -errno;
161             // Clear port before retry, otherwise we shall fail again.
162             listen_addr.set_port(0); 
163             continue;
164         }
165         ldout(msgr->cct,10) << __func__ << " bound on random port " 
166                             << listen_addr << dendl;
167     }
168
169     if (rc == 0)
170         break;
171   }
172
173   // It seems that binding completely failed, return with that exit status
174   if (rc < 0) {
175       lderr(msgr->cct) << __func__ << " was unable to bind after " 
176                        << conf->ms_bind_retry_count << " attempts: " 
177                        << cpp_strerror(errno) << dendl;
178       ::close(listen_sd);
179       listen_sd = -1;
180       return r;
181   }
182
183   // what port did we get?
184   sockaddr_storage ss;
185   socklen_t llen = sizeof(ss);
186   rc = getsockname(listen_sd, (sockaddr*)&ss, &llen);
187   if (rc < 0) {
188     rc = -errno;
189     lderr(msgr->cct) << __func__ << " failed getsockname: " 
190                      << cpp_strerror(rc) << dendl;
191     ::close(listen_sd);
192     listen_sd = -1;
193     return rc;
194   }
195   listen_addr.set_sockaddr((sockaddr*)&ss);
196   
197   if (msgr->cct->_conf->ms_tcp_rcvbuf) {
198     int size = msgr->cct->_conf->ms_tcp_rcvbuf;
199     rc = ::setsockopt(listen_sd, SOL_SOCKET, SO_RCVBUF, 
200                         (void*)&size, sizeof(size));
201     if (rc < 0)  {
202       rc = -errno;
203       lderr(msgr->cct) <<  __func__ << "  failed to set SO_RCVBUF to " 
204                        << size << ": " << cpp_strerror(rc) << dendl;
205       ::close(listen_sd);
206       listen_sd = -1;
207       return rc;
208     }
209   }
210
211   ldout(msgr->cct,10) <<  __func__ << " bound to " << listen_addr << dendl;
212
213   // listen!
214   rc = ::listen(listen_sd, msgr->cct->_conf->ms_tcp_listen_backlog);
215   if (rc < 0) {
216     rc = -errno;
217     lderr(msgr->cct) <<  __func__ << " unable to listen on " << listen_addr
218                      << ": " << cpp_strerror(rc) << dendl;
219     ::close(listen_sd);
220     listen_sd = -1;
221     return rc;
222   }
223   
224   msgr->set_myaddr(bind_addr);
225   if (bind_addr != entity_addr_t())
226     msgr->learned_addr(bind_addr);
227   else
228     assert(msgr->get_need_addr());  // should still be true.
229
230   if (msgr->get_myaddr().get_port() == 0) {
231     msgr->set_myaddr(listen_addr);
232   }
233   entity_addr_t addr = msgr->get_myaddr();
234   addr.nonce = nonce;
235   msgr->set_myaddr(addr);
236
237   msgr->init_local_connection();
238
239   rc = create_selfpipe(&shutdown_rd_fd, &shutdown_wr_fd);
240   if (rc < 0) {
241     lderr(msgr->cct) <<  __func__ << " unable to create signalling pipe " << listen_addr
242                      << ": " << cpp_strerror(rc) << dendl;
243     return rc;
244   }
245
246   ldout(msgr->cct,1) <<  __func__ << " my_inst.addr is " << msgr->get_myaddr()
247                      << " need_addr=" << msgr->get_need_addr() << dendl;
248   return 0;
249 }
250
251 int Accepter::rebind(const set<int>& avoid_ports)
252 {
253   ldout(msgr->cct,1) << __func__ << " avoid " << avoid_ports << dendl;
254   
255   entity_addr_t addr = msgr->get_myaddr();
256   set<int> new_avoid = avoid_ports;
257   new_avoid.insert(addr.get_port());
258   addr.set_port(0);
259
260   // adjust the nonce; we want our entity_addr_t to be truly unique.
261   nonce += 1000000;
262   msgr->my_inst.addr.nonce = nonce;
263   ldout(msgr->cct,10) << __func__ << " new nonce " << nonce << " and inst " 
264                         << msgr->my_inst << dendl;
265
266   ldout(msgr->cct,10) << " will try " << addr << " and avoid ports " << new_avoid << dendl;
267   int r = bind(addr, new_avoid);
268   if (r == 0)
269     start();
270   return r;
271 }
272
273 int Accepter::start()
274 {
275   ldout(msgr->cct,1) << __func__ << dendl;
276
277   // start thread
278   create("ms_accepter");
279
280   return 0;
281 }
282
283 void *Accepter::entry()
284 {
285   ldout(msgr->cct,1) << __func__ << " start" << dendl;
286   
287   int errors = 0;
288   int ch;
289
290   struct pollfd pfd[2];
291   memset(pfd, 0, sizeof(pfd));
292
293   pfd[0].fd = listen_sd;
294   pfd[0].events = POLLIN | POLLERR | POLLNVAL | POLLHUP;
295   pfd[1].fd = shutdown_rd_fd;
296   pfd[1].events = POLLIN | POLLERR | POLLNVAL | POLLHUP;
297   while (!done) {
298     ldout(msgr->cct,20) << __func__ << " calling poll for sd:" << listen_sd << dendl;
299     int r = poll(pfd, 2, -1);
300     if (r < 0) {
301       if (errno == EINTR) {
302         continue;
303       }
304       ldout(msgr->cct,1) << __func__ << " poll got error"  
305                           << " errno " << errno << " " << cpp_strerror(errno) << dendl;
306       break;
307     }
308     ldout(msgr->cct,10) << __func__ << " poll returned oke: " << r << dendl;
309     ldout(msgr->cct,20) << __func__ <<  " pfd.revents[0]=" << pfd[0].revents << dendl;
310     ldout(msgr->cct,20) << __func__ <<  " pfd.revents[1]=" << pfd[1].revents << dendl;
311
312     if (pfd[0].revents & (POLLERR | POLLNVAL | POLLHUP)) {
313       ldout(msgr->cct,1) << __func__ << " poll got errors in revents "  
314                          <<  pfd[0].revents << dendl;
315       break;
316     }
317     if (pfd[1].revents & (POLLIN | POLLERR | POLLNVAL | POLLHUP)) {
318       // We got "signaled" to exit the poll
319       // clean the selfpipe
320       if (::read(shutdown_rd_fd, &ch, 1) == -1) {
321         if (errno != EAGAIN)
322           ldout(msgr->cct,1) << __func__ << " Cannot read selfpipe: "
323                               << " errno " << errno << " " << cpp_strerror(errno) << dendl;
324         }
325       break;
326     }
327     if (done) break;
328
329     // accept
330     sockaddr_storage ss;
331     socklen_t slen = sizeof(ss);
332     int sd = ::accept(listen_sd, (sockaddr*)&ss, &slen);
333     if (sd >= 0) {
334       int r = set_close_on_exec(sd);
335       if (r) {
336         ldout(msgr->cct,1) << __func__ << " set_close_on_exec() failed "
337               << cpp_strerror(r) << dendl;
338       }
339       errors = 0;
340       ldout(msgr->cct,10) << __func__ << " incoming on sd " << sd << dendl;
341       
342       msgr->add_accept_pipe(sd);
343     } else {
344       ldout(msgr->cct,0) << __func__ << " no incoming connection?  sd = " << sd
345               << " errno " << errno << " " << cpp_strerror(errno) << dendl;
346       if (++errors > 4)
347         break;
348     }
349   }
350
351   ldout(msgr->cct,20) << __func__ << " closing" << dendl;
352   // socket is closed right after the thread has joined.
353   // closing it here might race
354   if (shutdown_rd_fd >= 0) {
355     ::close(shutdown_rd_fd);
356     shutdown_rd_fd = -1;
357   }
358
359   ldout(msgr->cct,10) << __func__ << " stopping" << dendl;
360   return 0;
361 }
362
363 void Accepter::stop()
364 {
365   done = true;
366   ldout(msgr->cct,10) << __func__ << " accept listening on: " << listen_sd << dendl;
367
368   if (shutdown_wr_fd < 0)
369     return;
370
371   // Send a byte to the shutdown pipe that the thread is listening to
372   char buf[1] = { 0x0 };
373   int ret = safe_write(shutdown_wr_fd, buf, 1);
374   if (ret < 0) {
375     ldout(msgr->cct,1) << __func__ << "close failed: "
376              << " errno " << errno << " " << cpp_strerror(errno) << dendl;
377   } else {
378     ldout(msgr->cct,15) << __func__ << " signaled poll" << dendl;
379   }
380   VOID_TEMP_FAILURE_RETRY(close(shutdown_wr_fd));
381   shutdown_wr_fd = -1;
382
383   // wait for thread to stop before closing the socket, to avoid
384   // racing against fd re-use.
385   if (is_started()) {
386     ldout(msgr->cct,5) << __func__ << " wait for thread to join." << dendl;
387     join();
388   }
389
390   if (listen_sd >= 0) {
391     if (::close(listen_sd) < 0) {
392       ldout(msgr->cct,1) << __func__ << "close listen_sd failed: "
393               << " errno " << errno << " " << cpp_strerror(errno) << dendl;
394     }
395     listen_sd = -1;
396   }
397   if (shutdown_rd_fd >= 0) {
398     if (::close(shutdown_rd_fd) < 0) {
399       ldout(msgr->cct,1) << __func__ << "close shutdown_rd_fd failed: "
400               << " errno " << errno << " " << cpp_strerror(errno) << dendl;
401     }
402     shutdown_rd_fd = -1;
403   }
404   done = false;
405 }
406
407
408
409