Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / common / admin_socket.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2011 New Dream Network
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software 
11  * Foundation.  See file COPYING.
12  * 
13  */
14
15 #include "common/admin_socket.h"
16 #include "common/admin_socket_client.h"
17 #include "common/errno.h"
18 #include "common/pipe.h"
19 #include "common/safe_io.h"
20 #include "common/version.h"
21 #include "include/compat.h"
22
23 #include <poll.h>
24 #include <sys/un.h>
25
26 // re-include our assert to clobber the system one; fix dout:
27 #include "include/assert.h"
28
29 #define dout_subsys ceph_subsys_asok
30 #undef dout_prefix
31 #define dout_prefix *_dout << "asok(" << (void*)m_cct << ") "
32
33
34 using std::ostringstream;
35
36 /*
37  * UNIX domain sockets created by an application persist even after that
38  * application closes, unless they're explicitly unlinked. This is because the
39  * directory containing the socket keeps a reference to the socket.
40  *
41  * This code makes things a little nicer by unlinking those dead sockets when
42  * the application exits normally.
43  */
44 static pthread_mutex_t cleanup_lock = PTHREAD_MUTEX_INITIALIZER;
45 static std::vector <const char*> cleanup_files;
46 static bool cleanup_atexit = false;
47
48 static void remove_cleanup_file(const char *file)
49 {
50   pthread_mutex_lock(&cleanup_lock);
51   VOID_TEMP_FAILURE_RETRY(unlink(file));
52   for (std::vector <const char*>::iterator i = cleanup_files.begin();
53        i != cleanup_files.end(); ++i) {
54     if (strcmp(file, *i) == 0) {
55       free((void*)*i);
56       cleanup_files.erase(i);
57       break;
58     }
59   }
60   pthread_mutex_unlock(&cleanup_lock);
61 }
62
63 static void remove_all_cleanup_files()
64 {
65   pthread_mutex_lock(&cleanup_lock);
66   for (std::vector <const char*>::iterator i = cleanup_files.begin();
67        i != cleanup_files.end(); ++i) {
68     VOID_TEMP_FAILURE_RETRY(unlink(*i));
69     free((void*)*i);
70   }
71   cleanup_files.clear();
72   pthread_mutex_unlock(&cleanup_lock);
73 }
74
75 static void add_cleanup_file(const char *file)
76 {
77   char *fname = strdup(file);
78   if (!fname)
79     return;
80   pthread_mutex_lock(&cleanup_lock);
81   cleanup_files.push_back(fname);
82   if (!cleanup_atexit) {
83     atexit(remove_all_cleanup_files);
84     cleanup_atexit = true;
85   }
86   pthread_mutex_unlock(&cleanup_lock);
87 }
88
89
90 AdminSocket::AdminSocket(CephContext *cct)
91   : m_cct(cct),
92     m_sock_fd(-1),
93     m_shutdown_rd_fd(-1),
94     m_shutdown_wr_fd(-1),
95     in_hook(false),
96     m_lock("AdminSocket::m_lock"),
97     m_version_hook(NULL),
98     m_help_hook(NULL),
99     m_getdescs_hook(NULL)
100 {
101 }
102
103 AdminSocket::~AdminSocket()
104 {
105   shutdown();
106 }
107
108 /*
109  * This thread listens on the UNIX domain socket for incoming connections.
110  * It only handles one connection at a time at the moment. All I/O is nonblocking,
111  * so that we can implement sensible timeouts. [TODO: make all I/O nonblocking]
112  *
113  * This thread also listens to m_shutdown_rd_fd. If there is any data sent to this
114  * pipe, the thread terminates itself gracefully, allowing the
115  * AdminSocketConfigObs class to join() it.
116  */
117
118 #define PFL_SUCCESS ((void*)(intptr_t)0)
119 #define PFL_FAIL ((void*)(intptr_t)1)
120
121 std::string AdminSocket::create_shutdown_pipe(int *pipe_rd, int *pipe_wr)
122 {
123   int pipefd[2];
124   int ret = pipe_cloexec(pipefd);
125   if (ret < 0) {
126     ostringstream oss;
127     oss << "AdminSocket::create_shutdown_pipe error: " << cpp_strerror(ret);
128     return oss.str();
129   }
130   
131   *pipe_rd = pipefd[0];
132   *pipe_wr = pipefd[1];
133   return "";
134 }
135
136 std::string AdminSocket::destroy_shutdown_pipe()
137 {
138   // Send a byte to the shutdown pipe that the thread is listening to
139   char buf[1] = { 0x0 };
140   int ret = safe_write(m_shutdown_wr_fd, buf, sizeof(buf));
141
142   // Close write end
143   VOID_TEMP_FAILURE_RETRY(close(m_shutdown_wr_fd));
144   m_shutdown_wr_fd = -1;
145
146   if (ret != 0) {
147     ostringstream oss;
148     oss << "AdminSocket::destroy_shutdown_pipe error: failed to write"
149       "to thread shutdown pipe: error " << ret;
150     return oss.str();
151   }
152
153   join();
154
155   // Close read end. Doing this before join() blocks the listenter and prevents
156   // joining.
157   VOID_TEMP_FAILURE_RETRY(close(m_shutdown_rd_fd));
158   m_shutdown_rd_fd = -1;
159
160   return "";
161 }
162
163 std::string AdminSocket::bind_and_listen(const std::string &sock_path, int *fd)
164 {
165   ldout(m_cct, 5) << "bind_and_listen " << sock_path << dendl;
166
167   struct sockaddr_un address;
168   if (sock_path.size() > sizeof(address.sun_path) - 1) {
169     ostringstream oss;
170     oss << "AdminSocket::bind_and_listen: "
171         << "The UNIX domain socket path " << sock_path << " is too long! The "
172         << "maximum length on this system is "
173         << (sizeof(address.sun_path) - 1);
174     return oss.str();
175   }
176   int sock_fd = socket(PF_UNIX, SOCK_STREAM, 0);
177   if (sock_fd < 0) {
178     int err = errno;
179     ostringstream oss;
180     oss << "AdminSocket::bind_and_listen: "
181         << "failed to create socket: " << cpp_strerror(err);
182     return oss.str();
183   }
184   int r = fcntl(sock_fd, F_SETFD, FD_CLOEXEC);
185   if (r < 0) {
186     r = errno;
187     VOID_TEMP_FAILURE_RETRY(::close(sock_fd));
188     ostringstream oss;
189     oss << "AdminSocket::bind_and_listen: failed to fcntl on socket: " << cpp_strerror(r);
190     return oss.str();
191   }
192   memset(&address, 0, sizeof(struct sockaddr_un));
193   address.sun_family = AF_UNIX;
194   snprintf(address.sun_path, sizeof(address.sun_path),
195            "%s", sock_path.c_str());
196   if (::bind(sock_fd, (struct sockaddr*)&address,
197            sizeof(struct sockaddr_un)) != 0) {
198     int err = errno;
199     if (err == EADDRINUSE) {
200       AdminSocketClient client(sock_path);
201       bool ok;
202       client.ping(&ok);
203       if (ok) {
204         ldout(m_cct, 20) << "socket " << sock_path << " is in use" << dendl;
205         err = EEXIST;
206       } else {
207         ldout(m_cct, 20) << "unlink stale file " << sock_path << dendl;
208         VOID_TEMP_FAILURE_RETRY(unlink(sock_path.c_str()));
209         if (::bind(sock_fd, (struct sockaddr*)&address,
210                  sizeof(struct sockaddr_un)) == 0) {
211           err = 0;
212         } else {
213           err = errno;
214         }
215       }
216     }
217     if (err != 0) {
218       ostringstream oss;
219       oss << "AdminSocket::bind_and_listen: "
220           << "failed to bind the UNIX domain socket to '" << sock_path
221           << "': " << cpp_strerror(err);
222       close(sock_fd);
223       return oss.str();
224     }
225   }
226   if (listen(sock_fd, 5) != 0) {
227     int err = errno;
228     ostringstream oss;
229     oss << "AdminSocket::bind_and_listen: "
230           << "failed to listen to socket: " << cpp_strerror(err);
231     close(sock_fd);
232     VOID_TEMP_FAILURE_RETRY(unlink(sock_path.c_str()));
233     return oss.str();
234   }
235   *fd = sock_fd;
236   return "";
237 }
238
239 void* AdminSocket::entry()
240 {
241   ldout(m_cct, 5) << "entry start" << dendl;
242   while (true) {
243     struct pollfd fds[2];
244     memset(fds, 0, sizeof(fds));
245     fds[0].fd = m_sock_fd;
246     fds[0].events = POLLIN | POLLRDBAND;
247     fds[1].fd = m_shutdown_rd_fd;
248     fds[1].events = POLLIN | POLLRDBAND;
249
250     int ret = poll(fds, 2, -1);
251     if (ret < 0) {
252       int err = errno;
253       if (err == EINTR) {
254         continue;
255       }
256       lderr(m_cct) << "AdminSocket: poll(2) error: '"
257                    << cpp_strerror(err) << dendl;
258       return PFL_FAIL;
259     }
260
261     if (fds[0].revents & POLLIN) {
262       // Send out some data
263       do_accept();
264     }
265     if (fds[1].revents & POLLIN) {
266       // Parent wants us to shut down
267       return PFL_SUCCESS;
268     }
269   }
270   ldout(m_cct, 5) << "entry exit" << dendl;
271 }
272
273 void AdminSocket::chown(uid_t uid, gid_t gid)
274 {
275   if (m_sock_fd >= 0) {
276     int r = ::chown(m_path.c_str(), uid, gid);
277     if (r < 0) {
278       r = -errno;
279       lderr(m_cct) << "AdminSocket: failed to chown socket: "
280                    << cpp_strerror(r) << dendl;
281     }
282   }
283 }
284
285 void AdminSocket::chmod(mode_t mode)
286 {
287   if (m_sock_fd >= 0) {
288     int r = ::chmod(m_path.c_str(), mode);
289     if (r < 0) {
290       r = -errno;
291       lderr(m_cct) << "AdminSocket: failed to chmod socket: "
292                    << cpp_strerror(r) << dendl;
293     }
294   }
295 }
296
297 bool AdminSocket::do_accept()
298 {
299   struct sockaddr_un address;
300   socklen_t address_length = sizeof(address);
301   ldout(m_cct, 30) << "AdminSocket: calling accept" << dendl;
302   int connection_fd = accept(m_sock_fd, (struct sockaddr*) &address,
303                              &address_length);
304   ldout(m_cct, 30) << "AdminSocket: finished accept" << dendl;
305   if (connection_fd < 0) {
306     int err = errno;
307     lderr(m_cct) << "AdminSocket: do_accept error: '"
308                            << cpp_strerror(err) << dendl;
309     return false;
310   }
311
312   char cmd[1024];
313   unsigned pos = 0;
314   string c;
315   while (1) {
316     int ret = safe_read(connection_fd, &cmd[pos], 1);
317     if (ret <= 0) {
318       if (ret < 0) {
319         lderr(m_cct) << "AdminSocket: error reading request code: "
320                      << cpp_strerror(ret) << dendl;
321       }
322       VOID_TEMP_FAILURE_RETRY(close(connection_fd));
323       return false;
324     }
325     //ldout(m_cct, 0) << "AdminSocket read byte " << (int)cmd[pos] << " pos " << pos << dendl;
326     if (cmd[0] == '\0') {
327       // old protocol: __be32
328       if (pos == 3 && cmd[0] == '\0') {
329         switch (cmd[3]) {
330         case 0:
331           c = "0";
332           break;
333         case 1:
334           c = "perfcounters_dump";
335           break;
336         case 2:
337           c = "perfcounters_schema";
338           break;
339         default:
340           c = "foo";
341           break;
342         }
343         break;
344       }
345     } else {
346       // new protocol: null or \n terminated string
347       if (cmd[pos] == '\n' || cmd[pos] == '\0') {
348         cmd[pos] = '\0';
349         c = cmd;
350         break;
351       }
352     }
353     if (++pos >= sizeof(cmd)) {
354       lderr(m_cct) << "AdminSocket: error reading request too long" << dendl;
355       VOID_TEMP_FAILURE_RETRY(close(connection_fd));
356       return false;
357     }
358   }
359
360   bool rval = false;
361
362   map<string, cmd_vartype> cmdmap;
363   string format;
364   vector<string> cmdvec;
365   stringstream errss;
366   cmdvec.push_back(cmd);
367   if (!cmdmap_from_json(cmdvec, &cmdmap, errss)) {
368     ldout(m_cct, 0) << "AdminSocket: " << errss.rdbuf() << dendl;
369     VOID_TEMP_FAILURE_RETRY(close(connection_fd));
370     return false;
371   }
372   cmd_getval(m_cct, cmdmap, "format", format);
373   if (format != "json" && format != "json-pretty" &&
374       format != "xml" && format != "xml-pretty")
375     format = "json-pretty";
376   cmd_getval(m_cct, cmdmap, "prefix", c);
377
378   m_lock.Lock();
379   map<string,AdminSocketHook*>::iterator p;
380   string match = c;
381   while (match.size()) {
382     p = m_hooks.find(match);
383     if (p != m_hooks.end())
384       break;
385     
386     // drop right-most word
387     size_t pos = match.rfind(' ');
388     if (pos == std::string::npos) {
389       match.clear();  // we fail
390       break;
391     } else {
392       match.resize(pos);
393     }
394   }
395
396   bufferlist out;
397   if (p == m_hooks.end()) {
398     lderr(m_cct) << "AdminSocket: request '" << c << "' not defined" << dendl;
399   } else {
400     string args;
401     if (match != c) {
402       args = c.substr(match.length() + 1);
403     }
404
405     // Drop lock to avoid cycles in cases where the hook takes
406     // the same lock that was held during calls to register/unregister,
407     // and set in_hook to allow unregister to wait for us before
408     // removing this hook.
409     in_hook = true;
410     auto match_hook = p->second;
411     m_lock.Unlock();
412     bool success = match_hook->call(match, cmdmap, format, out);
413     m_lock.Lock();
414     in_hook = false;
415     in_hook_cond.Signal();
416
417     if (!success) {
418       ldout(m_cct, 0) << "AdminSocket: request '" << match << "' args '" << args
419                       << "' to " << p->second << " failed" << dendl;
420       out.append("failed");
421     } else {
422       ldout(m_cct, 5) << "AdminSocket: request '" << match << "' '" << args
423                        << "' to " << p->second
424                        << " returned " << out.length() << " bytes" << dendl;
425     }
426     uint32_t len = htonl(out.length());
427     int ret = safe_write(connection_fd, &len, sizeof(len));
428     if (ret < 0) {
429       lderr(m_cct) << "AdminSocket: error writing response length "
430                    << cpp_strerror(ret) << dendl;
431     } else {
432       if (out.write_fd(connection_fd) >= 0)
433         rval = true;
434     }
435   }
436   m_lock.Unlock();
437
438   VOID_TEMP_FAILURE_RETRY(close(connection_fd));
439   return rval;
440 }
441
442 int AdminSocket::register_command(std::string command, std::string cmddesc, AdminSocketHook *hook, std::string help)
443 {
444   int ret;
445   m_lock.Lock();
446   if (m_hooks.count(command)) {
447     ldout(m_cct, 5) << "register_command " << command << " hook " << hook << " EEXIST" << dendl;
448     ret = -EEXIST;
449   } else {
450     ldout(m_cct, 5) << "register_command " << command << " hook " << hook << dendl;
451     m_hooks[command] = hook;
452     m_descs[command] = cmddesc;
453     m_help[command] = help;
454     ret = 0;
455   }  
456   m_lock.Unlock();
457   return ret;
458 }
459
460 int AdminSocket::unregister_command(std::string command)
461 {
462   int ret;
463   m_lock.Lock();
464   if (m_hooks.count(command)) {
465     ldout(m_cct, 5) << "unregister_command " << command << dendl;
466     m_hooks.erase(command);
467     m_descs.erase(command);
468     m_help.erase(command);
469
470     // If we are currently processing a command, wait for it to
471     // complete in case it referenced the hook that we are
472     // unregistering.
473     if (in_hook) {
474       in_hook_cond.Wait(m_lock);
475     }
476
477     ret = 0;
478   } else {
479     ldout(m_cct, 5) << "unregister_command " << command << " ENOENT" << dendl;
480     ret = -ENOENT;
481   }  
482   m_lock.Unlock();
483   return ret;
484 }
485
486 class VersionHook : public AdminSocketHook {
487 public:
488   bool call(std::string command, cmdmap_t &cmdmap, std::string format,
489                     bufferlist& out) override {
490     if (command == "0") {
491       out.append(CEPH_ADMIN_SOCK_VERSION);
492     } else {
493       JSONFormatter jf;
494       jf.open_object_section("version");
495       if (command == "version") {
496         jf.dump_string("version", ceph_version_to_str());
497         jf.dump_string("release", ceph_release_name(ceph_release()));
498         jf.dump_string("release_type", ceph_release_type());
499       } else if (command == "git_version") {
500         jf.dump_string("git_version", git_version_to_str());
501       }
502       ostringstream ss;
503       jf.close_section();
504       jf.flush(ss);
505       out.append(ss.str());
506     }
507     return true;
508   }
509 };
510
511 class HelpHook : public AdminSocketHook {
512   AdminSocket *m_as;
513 public:
514   explicit HelpHook(AdminSocket *as) : m_as(as) {}
515   bool call(string command, cmdmap_t &cmdmap, string format, bufferlist& out) override {
516     Formatter *f = Formatter::create(format, "json-pretty", "json-pretty");
517     f->open_object_section("help");
518     for (map<string,string>::iterator p = m_as->m_help.begin();
519          p != m_as->m_help.end();
520          ++p) {
521       if (p->second.length())
522         f->dump_string(p->first.c_str(), p->second);
523     }
524     f->close_section();
525     ostringstream ss;
526     f->flush(ss);
527     out.append(ss.str());
528     delete f;
529     return true;
530   }
531 };
532
533 class GetdescsHook : public AdminSocketHook {
534   AdminSocket *m_as;
535 public:
536   explicit GetdescsHook(AdminSocket *as) : m_as(as) {}
537   bool call(string command, cmdmap_t &cmdmap, string format, bufferlist& out) override {
538     int cmdnum = 0;
539     JSONFormatter jf;
540     jf.open_object_section("command_descriptions");
541     for (map<string,string>::iterator p = m_as->m_descs.begin();
542          p != m_as->m_descs.end();
543          ++p) {
544       ostringstream secname;
545       secname << "cmd" << setfill('0') << std::setw(3) << cmdnum;
546       dump_cmd_and_help_to_json(&jf,
547                                 secname.str().c_str(),
548                                 p->second.c_str(),
549                                 m_as->m_help[p->first]);
550       cmdnum++;
551     }
552     jf.close_section(); // command_descriptions
553     jf.enable_line_break();
554     ostringstream ss;
555     jf.flush(ss);
556     out.append(ss.str());
557     return true;
558   }
559 };
560
561 bool AdminSocket::init(const std::string &path)
562 {
563   ldout(m_cct, 5) << "init " << path << dendl;
564
565   /* Set up things for the new thread */
566   std::string err;
567   int pipe_rd = -1, pipe_wr = -1;
568   err = create_shutdown_pipe(&pipe_rd, &pipe_wr);
569   if (!err.empty()) {
570     lderr(m_cct) << "AdminSocketConfigObs::init: error: " << err << dendl;
571     return false;
572   }
573   int sock_fd;
574   err = bind_and_listen(path, &sock_fd);
575   if (!err.empty()) {
576     lderr(m_cct) << "AdminSocketConfigObs::init: failed: " << err << dendl;
577     close(pipe_rd);
578     close(pipe_wr);
579     return false;
580   }
581
582   /* Create new thread */
583   m_sock_fd = sock_fd;
584   m_shutdown_rd_fd = pipe_rd;
585   m_shutdown_wr_fd = pipe_wr;
586   m_path = path;
587
588   m_version_hook = new VersionHook;
589   register_command("0", "0", m_version_hook, "");
590   register_command("version", "version", m_version_hook, "get ceph version");
591   register_command("git_version", "git_version", m_version_hook, "get git sha1");
592   m_help_hook = new HelpHook(this);
593   register_command("help", "help", m_help_hook, "list available commands");
594   m_getdescs_hook = new GetdescsHook(this);
595   register_command("get_command_descriptions", "get_command_descriptions",
596                    m_getdescs_hook, "list available commands");
597
598   create("admin_socket");
599   add_cleanup_file(m_path.c_str());
600   return true;
601 }
602
603 void AdminSocket::shutdown()
604 {
605   std::string err;
606
607   // Under normal operation this is unlikely to occur.  However for some unit
608   // tests, some object members are not initialized and so cannot be deleted
609   // without fault.
610   if (m_shutdown_wr_fd < 0)
611     return;
612
613   ldout(m_cct, 5) << "shutdown" << dendl;
614
615   err = destroy_shutdown_pipe();
616   if (!err.empty()) {
617     lderr(m_cct) << "AdminSocket::shutdown: error: " << err << dendl;
618   }
619
620   VOID_TEMP_FAILURE_RETRY(close(m_sock_fd));
621
622   unregister_command("version");
623   unregister_command("git_version");
624   unregister_command("0");
625   delete m_version_hook;
626
627   unregister_command("help");
628   delete m_help_hook;
629
630   unregister_command("get_command_descriptions");
631   delete m_getdescs_hook;
632
633   remove_cleanup_file(m_path.c_str());
634   m_path.clear();
635 }