Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / mds / MDSDaemon.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software
11  * Foundation.  See file COPYING.
12  *
13  */
14
15 #include <unistd.h>
16
17 #include "include/compat.h"
18 #include "include/types.h"
19 #include "include/str_list.h"
20
21 #include "common/Clock.h"
22 #include "common/HeartbeatMap.h"
23 #include "common/Timer.h"
24 #include "common/backport14.h"
25 #include "common/ceph_argparse.h"
26 #include "common/config.h"
27 #include "common/entity_name.h"
28 #include "common/errno.h"
29 #include "common/perf_counters.h"
30 #include "common/signal.h"
31 #include "common/version.h"
32
33 #include "global/signal_handler.h"
34
35 #include "msg/Messenger.h"
36 #include "mon/MonClient.h"
37
38 #include "osdc/Objecter.h"
39
40 #include "MDSMap.h"
41
42 #include "MDSDaemon.h"
43 #include "Server.h"
44 #include "Locker.h"
45
46 #include "SnapServer.h"
47 #include "SnapClient.h"
48
49 #include "events/ESession.h"
50 #include "events/ESubtreeMap.h"
51
52 #include "messages/MMDSMap.h"
53
54 #include "messages/MGenericMessage.h"
55
56 #include "messages/MMonCommand.h"
57 #include "messages/MCommand.h"
58 #include "messages/MCommandReply.h"
59
60 #include "auth/AuthAuthorizeHandler.h"
61 #include "auth/RotatingKeyRing.h"
62 #include "auth/KeyRing.h"
63
64 #include "perfglue/cpu_profiler.h"
65 #include "perfglue/heap_profiler.h"
66
67 #define dout_context g_ceph_context
68 #define dout_subsys ceph_subsys_mds
69 #undef dout_prefix
70 #define dout_prefix *_dout << "mds." << name << ' '
71
72 // cons/des
73 MDSDaemon::MDSDaemon(const std::string &n, Messenger *m, MonClient *mc) :
74   Dispatcher(m->cct),
75   mds_lock("MDSDaemon::mds_lock"),
76   stopping(false),
77   timer(m->cct, mds_lock),
78   beacon(m->cct, mc, n),
79   authorize_handler_cluster_registry(new AuthAuthorizeHandlerRegistry(m->cct,
80                                                                       m->cct->_conf->auth_supported.empty() ?
81                                                                       m->cct->_conf->auth_cluster_required :
82                                                                       m->cct->_conf->auth_supported)),
83   authorize_handler_service_registry(new AuthAuthorizeHandlerRegistry(m->cct,
84                                                                       m->cct->_conf->auth_supported.empty() ?
85                                                                       m->cct->_conf->auth_service_required :
86                                                                       m->cct->_conf->auth_supported)),
87   name(n),
88   messenger(m),
89   monc(mc),
90   mgrc(m->cct, m),
91   log_client(m->cct, messenger, &mc->monmap, LogClient::NO_FLAGS),
92   mds_rank(NULL),
93   asok_hook(NULL)
94 {
95   orig_argc = 0;
96   orig_argv = NULL;
97
98   clog = log_client.create_channel();
99
100   monc->set_messenger(messenger);
101
102   mdsmap = new MDSMap;
103 }
104
105 MDSDaemon::~MDSDaemon() {
106   Mutex::Locker lock(mds_lock);
107
108   delete mds_rank;
109   mds_rank = NULL;
110   delete mdsmap;
111   mdsmap = NULL;
112
113   delete authorize_handler_service_registry;
114   delete authorize_handler_cluster_registry;
115 }
116
117 class MDSSocketHook : public AdminSocketHook {
118   MDSDaemon *mds;
119 public:
120   explicit MDSSocketHook(MDSDaemon *m) : mds(m) {}
121   bool call(std::string command, cmdmap_t& cmdmap, std::string format,
122             bufferlist& out) override {
123     stringstream ss;
124     bool r = mds->asok_command(command, cmdmap, format, ss);
125     out.append(ss);
126     return r;
127   }
128 };
129
130 bool MDSDaemon::asok_command(string command, cmdmap_t& cmdmap, string format,
131                     ostream& ss)
132 {
133   dout(1) << "asok_command: " << command << " (starting...)" << dendl;
134
135   Formatter *f = Formatter::create(format, "json-pretty", "json-pretty");
136   bool handled = false;
137   if (command == "status") {
138     dump_status(f);
139     handled = true;
140   } else {
141     if (mds_rank == NULL) {
142       dout(1) << "Can't run that command on an inactive MDS!" << dendl;
143       f->dump_string("error", "mds_not_active");
144     } else {
145       handled = mds_rank->handle_asok_command(command, cmdmap, f, ss);
146     }
147   }
148   f->flush(ss);
149   delete f;
150
151   dout(1) << "asok_command: " << command << " (complete)" << dendl;
152
153   return handled;
154 }
155
156 void MDSDaemon::dump_status(Formatter *f)
157 {
158   f->open_object_section("status");
159   f->dump_stream("cluster_fsid") << monc->get_fsid();
160   if (mds_rank) {
161     f->dump_int("whoami", mds_rank->get_nodeid());
162   } else {
163     f->dump_int("whoami", MDS_RANK_NONE);
164   }
165
166   f->dump_int("id", monc->get_global_id());
167   f->dump_string("want_state", ceph_mds_state_name(beacon.get_want_state()));
168   f->dump_string("state", ceph_mds_state_name(mdsmap->get_state_gid(mds_gid_t(
169             monc->get_global_id()))));
170   if (mds_rank) {
171     Mutex::Locker l(mds_lock);
172     mds_rank->dump_status(f);
173   }
174
175   f->dump_unsigned("mdsmap_epoch", mdsmap->get_epoch());
176   if (mds_rank) {
177     f->dump_unsigned("osdmap_epoch", mds_rank->get_osd_epoch());
178     f->dump_unsigned("osdmap_epoch_barrier", mds_rank->get_osd_epoch_barrier());
179   } else {
180     f->dump_unsigned("osdmap_epoch", 0);
181     f->dump_unsigned("osdmap_epoch_barrier", 0);
182   }
183   f->close_section(); // status
184 }
185
186 void MDSDaemon::set_up_admin_socket()
187 {
188   int r;
189   AdminSocket *admin_socket = g_ceph_context->get_admin_socket();
190   assert(asok_hook == nullptr);
191   asok_hook = new MDSSocketHook(this);
192   r = admin_socket->register_command("status", "status", asok_hook,
193                                      "high-level status of MDS");
194   assert(r == 0);
195   r = admin_socket->register_command("dump_ops_in_flight",
196                                      "dump_ops_in_flight", asok_hook,
197                                      "show the ops currently in flight");
198   assert(r == 0);
199   r = admin_socket->register_command("ops",
200                                      "ops", asok_hook,
201                                      "show the ops currently in flight");
202   assert(r == 0);
203   r = admin_socket->register_command("dump_blocked_ops", "dump_blocked_ops",
204       asok_hook,
205       "show the blocked ops currently in flight");
206   assert(r == 0);
207   r = admin_socket->register_command("dump_historic_ops", "dump_historic_ops",
208                                      asok_hook,
209                                      "show slowest recent ops");
210   assert(r == 0);
211   r = admin_socket->register_command("dump_historic_ops_by_duration", "dump_historic_ops_by_duration",
212                                      asok_hook,
213                                      "show slowest recent ops, sorted by op duration");
214   assert(r == 0);
215   r = admin_socket->register_command("scrub_path",
216                                      "scrub_path name=path,type=CephString "
217                                      "name=scrubops,type=CephChoices,"
218                                      "strings=force|recursive|repair,n=N,req=false",
219                                      asok_hook,
220                                      "scrub an inode and output results");
221   assert(r == 0);
222   r = admin_socket->register_command("tag path",
223                                      "tag path name=path,type=CephString"
224                                      " name=tag,type=CephString",
225                                      asok_hook,
226                                      "Apply scrub tag recursively");
227    assert(r == 0);
228   r = admin_socket->register_command("flush_path",
229                                      "flush_path name=path,type=CephString",
230                                      asok_hook,
231                                      "flush an inode (and its dirfrags)");
232   assert(r == 0);
233   r = admin_socket->register_command("export dir",
234                                      "export dir "
235                                      "name=path,type=CephString "
236                                      "name=rank,type=CephInt",
237                                      asok_hook,
238                                      "migrate a subtree to named MDS");
239   assert(r == 0);
240   r = admin_socket->register_command("dump cache",
241                                      "dump cache name=path,type=CephString,req=false",
242                                      asok_hook,
243                                      "dump metadata cache (optionally to a file)");
244   assert(r == 0);
245   r = admin_socket->register_command("cache status",
246                                      "cache status",
247                                      asok_hook,
248                                      "show cache status");
249   assert(r == 0);
250   r = admin_socket->register_command("dump tree",
251                                      "dump tree "
252                                      "name=root,type=CephString,req=true "
253                                      "name=depth,type=CephInt,req=false ",
254                                      asok_hook,
255                                      "dump metadata cache for subtree");
256   assert(r == 0);
257   r = admin_socket->register_command("session evict",
258                                      "session evict name=client_id,type=CephString",
259                                      asok_hook,
260                                      "Evict a CephFS client");
261   assert(r == 0);
262   r = admin_socket->register_command("osdmap barrier",
263                                      "osdmap barrier name=target_epoch,type=CephInt",
264                                      asok_hook,
265                                      "Wait until the MDS has this OSD map epoch");
266   assert(r == 0);
267   r = admin_socket->register_command("session ls",
268                                      "session ls",
269                                      asok_hook,
270                                      "Enumerate connected CephFS clients");
271   assert(r == 0);
272   r = admin_socket->register_command("flush journal",
273                                      "flush journal",
274                                      asok_hook,
275                                      "Flush the journal to the backing store");
276   assert(r == 0);
277   r = admin_socket->register_command("force_readonly",
278                                      "force_readonly",
279                                      asok_hook,
280                                      "Force MDS to read-only mode");
281   assert(r == 0);
282   r = admin_socket->register_command("get subtrees",
283                                      "get subtrees",
284                                      asok_hook,
285                                      "Return the subtree map");
286   assert(r == 0);
287   r = admin_socket->register_command("dirfrag split",
288                                      "dirfrag split "
289                                      "name=path,type=CephString,req=true "
290                                      "name=frag,type=CephString,req=true "
291                                      "name=bits,type=CephInt,req=true ",
292                                      asok_hook,
293                                      "Fragment directory by path");
294   assert(r == 0);
295   r = admin_socket->register_command("dirfrag merge",
296                                      "dirfrag merge "
297                                      "name=path,type=CephString,req=true "
298                                      "name=frag,type=CephString,req=true",
299                                      asok_hook,
300                                      "De-fragment directory by path");
301   assert(r == 0);
302   r = admin_socket->register_command("dirfrag ls",
303                                      "dirfrag ls "
304                                      "name=path,type=CephString,req=true",
305                                      asok_hook,
306                                      "List fragments in directory");
307   assert(r == 0);
308 }
309
310 void MDSDaemon::clean_up_admin_socket()
311 {
312   AdminSocket *admin_socket = g_ceph_context->get_admin_socket();
313   admin_socket->unregister_command("status");
314   admin_socket->unregister_command("dump_ops_in_flight");
315   admin_socket->unregister_command("ops");
316   admin_socket->unregister_command("dump_blocked_ops");
317   admin_socket->unregister_command("dump_historic_ops");
318   admin_socket->unregister_command("dump_historic_ops_by_duration");
319   admin_socket->unregister_command("scrub_path");
320   admin_socket->unregister_command("tag path");
321   admin_socket->unregister_command("flush_path");
322   admin_socket->unregister_command("export dir");
323   admin_socket->unregister_command("dump cache");
324   admin_socket->unregister_command("cache status");
325   admin_socket->unregister_command("dump tree");
326   admin_socket->unregister_command("session evict");
327   admin_socket->unregister_command("osdmap barrier");
328   admin_socket->unregister_command("session ls");
329   admin_socket->unregister_command("flush journal");
330   admin_socket->unregister_command("force_readonly");
331   admin_socket->unregister_command("get subtrees");
332   admin_socket->unregister_command("dirfrag split");
333   admin_socket->unregister_command("dirfrag merge");
334   admin_socket->unregister_command("dirfrag ls");
335   delete asok_hook;
336   asok_hook = NULL;
337 }
338
339 const char** MDSDaemon::get_tracked_conf_keys() const
340 {
341   static const char* KEYS[] = {
342     "mds_op_complaint_time", "mds_op_log_threshold",
343     "mds_op_history_size", "mds_op_history_duration",
344     "mds_enable_op_tracker",
345     "mds_log_pause",
346     // clog & admin clog
347     "clog_to_monitors",
348     "clog_to_syslog",
349     "clog_to_syslog_facility",
350     "clog_to_syslog_level",
351     // PurgeQueue
352     "mds_max_purge_ops",
353     "mds_max_purge_ops_per_pg",
354     "mds_max_purge_files",
355     "clog_to_graylog",
356     "clog_to_graylog_host",
357     "clog_to_graylog_port",
358     "host",
359     "fsid",
360     NULL
361   };
362   return KEYS;
363 }
364
365 void MDSDaemon::handle_conf_change(const struct md_config_t *conf,
366                              const std::set <std::string> &changed)
367 {
368   // We may be called within mds_lock (via `tell`) or outwith the
369   // lock (via admin socket `config set`), so handle either case.
370   const bool initially_locked = mds_lock.is_locked_by_me();
371   if (!initially_locked) {
372     mds_lock.Lock();
373   }
374
375   if (changed.count("mds_op_complaint_time") ||
376       changed.count("mds_op_log_threshold")) {
377     if (mds_rank) {
378       mds_rank->op_tracker.set_complaint_and_threshold(conf->mds_op_complaint_time,
379                                              conf->mds_op_log_threshold);
380     }
381   }
382   if (changed.count("mds_op_history_size") ||
383       changed.count("mds_op_history_duration")) {
384     if (mds_rank) {
385       mds_rank->op_tracker.set_history_size_and_duration(conf->mds_op_history_size,
386                                                conf->mds_op_history_duration);
387     }
388   }
389   if (changed.count("mds_enable_op_tracker")) {
390     if (mds_rank) {
391       mds_rank->op_tracker.set_tracking(conf->mds_enable_op_tracker);
392     }
393   }
394   if (changed.count("clog_to_monitors") ||
395       changed.count("clog_to_syslog") ||
396       changed.count("clog_to_syslog_level") ||
397       changed.count("clog_to_syslog_facility") ||
398       changed.count("clog_to_graylog") ||
399       changed.count("clog_to_graylog_host") ||
400       changed.count("clog_to_graylog_port") ||
401       changed.count("host") ||
402       changed.count("fsid")) {
403     if (mds_rank) {
404       mds_rank->update_log_config();
405     }
406   }
407
408   if (!g_conf->mds_log_pause && changed.count("mds_log_pause")) {
409     if (mds_rank) {
410       mds_rank->mdlog->kick_submitter();
411     }
412   }
413
414   if (mds_rank) {
415     mds_rank->handle_conf_change(conf, changed);
416   }
417
418   if (!initially_locked) {
419     mds_lock.Unlock();
420   }
421 }
422
423
424 int MDSDaemon::init()
425 {
426   dout(10) << sizeof(MDSCacheObject) << "\tMDSCacheObject" << dendl;
427   dout(10) << sizeof(CInode) << "\tCInode" << dendl;
428   dout(10) << sizeof(elist<void*>::item) << "\t elist<>::item   *7=" << 7*sizeof(elist<void*>::item) << dendl;
429   dout(10) << sizeof(inode_t) << "\t inode_t " << dendl;
430   dout(10) << sizeof(nest_info_t) << "\t  nest_info_t " << dendl;
431   dout(10) << sizeof(frag_info_t) << "\t  frag_info_t " << dendl;
432   dout(10) << sizeof(SimpleLock) << "\t SimpleLock   *5=" << 5*sizeof(SimpleLock) << dendl;
433   dout(10) << sizeof(ScatterLock) << "\t ScatterLock  *3=" << 3*sizeof(ScatterLock) << dendl;
434   dout(10) << sizeof(CDentry) << "\tCDentry" << dendl;
435   dout(10) << sizeof(elist<void*>::item) << "\t elist<>::item" << dendl;
436   dout(10) << sizeof(SimpleLock) << "\t SimpleLock" << dendl;
437   dout(10) << sizeof(CDir) << "\tCDir " << dendl;
438   dout(10) << sizeof(elist<void*>::item) << "\t elist<>::item   *2=" << 2*sizeof(elist<void*>::item) << dendl;
439   dout(10) << sizeof(fnode_t) << "\t fnode_t " << dendl;
440   dout(10) << sizeof(nest_info_t) << "\t  nest_info_t *2" << dendl;
441   dout(10) << sizeof(frag_info_t) << "\t  frag_info_t *2" << dendl;
442   dout(10) << sizeof(Capability) << "\tCapability " << dendl;
443   dout(10) << sizeof(xlist<void*>::item) << "\t xlist<>::item   *2=" << 2*sizeof(xlist<void*>::item) << dendl;
444
445   messenger->add_dispatcher_tail(&beacon);
446   messenger->add_dispatcher_tail(this);
447
448   // get monmap
449   monc->set_messenger(messenger);
450
451   monc->set_want_keys(CEPH_ENTITY_TYPE_MON | CEPH_ENTITY_TYPE_OSD |
452                       CEPH_ENTITY_TYPE_MDS | CEPH_ENTITY_TYPE_MGR);
453   int r = 0;
454   r = monc->init();
455   if (r < 0) {
456     derr << "ERROR: failed to get monmap: " << cpp_strerror(-r) << dendl;
457     mds_lock.Lock();
458     suicide();
459     mds_lock.Unlock();
460     return r;
461   }
462
463   // tell monc about log_client so it will know about mon session resets
464   monc->set_log_client(&log_client);
465
466   r = monc->authenticate();
467   if (r < 0) {
468     derr << "ERROR: failed to authenticate: " << cpp_strerror(-r) << dendl;
469     mds_lock.Lock();
470     suicide();
471     mds_lock.Unlock();
472     return r;
473   }
474
475   int rotating_auth_attempts = 0;
476   while (monc->wait_auth_rotating(30.0) < 0) {
477     if (++rotating_auth_attempts <= g_conf->max_rotating_auth_attempts) {
478       derr << "unable to obtain rotating service keys; retrying" << dendl;
479       continue;
480     }
481     derr << "ERROR: failed to refresh rotating keys, "
482          << "maximum retry time reached." << dendl;
483     mds_lock.Lock();
484     suicide();
485     mds_lock.Unlock();
486     return -ETIMEDOUT;
487   }
488
489   mgrc.init();
490   messenger->add_dispatcher_head(&mgrc);
491
492   mds_lock.Lock();
493   if (beacon.get_want_state() == CEPH_MDS_STATE_DNE) {
494     dout(4) << __func__ << ": terminated already, dropping out" << dendl;
495     mds_lock.Unlock();
496     return 0;
497   }
498
499   monc->sub_want("mdsmap", 0, 0);
500   monc->sub_want("mgrmap", 0, 0);
501   monc->renew_subs();
502
503   mds_lock.Unlock();
504
505   // Set up admin socket before taking mds_lock, so that ordering
506   // is consistent (later we take mds_lock within asok callbacks)
507   set_up_admin_socket();
508   g_conf->add_observer(this);
509   mds_lock.Lock();
510   if (beacon.get_want_state() == MDSMap::STATE_DNE) {
511     suicide();  // we could do something more graceful here
512     dout(4) << __func__ << ": terminated already, dropping out" << dendl;
513     mds_lock.Unlock();
514     return 0; 
515   }
516
517   timer.init();
518
519   beacon.init(mdsmap);
520   messenger->set_myname(entity_name_t::MDS(MDS_RANK_NONE));
521
522   // schedule tick
523   reset_tick();
524   mds_lock.Unlock();
525
526   return 0;
527 }
528
529 void MDSDaemon::reset_tick()
530 {
531   // cancel old
532   if (tick_event) timer.cancel_event(tick_event);
533
534   // schedule
535   tick_event = timer.add_event_after(
536     g_conf->mds_tick_interval,
537     new FunctionContext([this](int) {
538         assert(mds_lock.is_locked_by_me());
539         tick();
540       }));
541 }
542
543 void MDSDaemon::tick()
544 {
545   // reschedule
546   reset_tick();
547
548   // Call through to subsystems' tick functions
549   if (mds_rank) {
550     mds_rank->tick();
551   }
552 }
553
554 void MDSDaemon::send_command_reply(MCommand *m, MDSRank *mds_rank,
555                                    int r, bufferlist outbl,
556                                    const std::string& outs)
557 {
558   Session *session = static_cast<Session *>(m->get_connection()->get_priv());
559   assert(session != NULL);
560   // If someone is using a closed session for sending commands (e.g.
561   // the ceph CLI) then we should feel free to clean up this connection
562   // as soon as we've sent them a response.
563   const bool live_session = mds_rank &&
564     mds_rank->sessionmap.get_session(session->info.inst.name) != nullptr
565     && session->get_state_seq() > 0;
566
567   if (!live_session) {
568     // This session only existed to issue commands, so terminate it
569     // as soon as we can.
570     assert(session->is_closed());
571     session->connection->mark_disposable();
572     session->put();
573   }
574
575   MCommandReply *reply = new MCommandReply(r, outs);
576   reply->set_tid(m->get_tid());
577   reply->set_data(outbl);
578   m->get_connection()->send_message(reply);
579 }
580
581 /* This function DOES put the passed message before returning*/
582 void MDSDaemon::handle_command(MCommand *m)
583 {
584   Session *session = static_cast<Session *>(m->get_connection()->get_priv());
585   assert(session != NULL);
586
587   int r = 0;
588   cmdmap_t cmdmap;
589   std::stringstream ss;
590   std::string outs;
591   bufferlist outbl;
592   Context *run_after = NULL;
593   bool need_reply = true;
594
595   if (!session->auth_caps.allow_all()) {
596     dout(1) << __func__
597       << ": received command from client without `tell` capability: "
598       << m->get_connection()->peer_addr << dendl;
599
600     ss << "permission denied";
601     r = -EPERM;
602   } else if (m->cmd.empty()) {
603     r = -EINVAL;
604     ss << "no command given";
605     outs = ss.str();
606   } else if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) {
607     r = -EINVAL;
608     outs = ss.str();
609   } else {
610     r = _handle_command(cmdmap, m, &outbl, &outs, &run_after, &need_reply);
611   }
612
613   if (need_reply) {
614     send_command_reply(m, mds_rank, r, outbl, outs);
615   }
616
617   if (run_after) {
618     run_after->complete(0);
619   }
620
621   m->put();
622 }
623
624
625 struct MDSCommand {
626   string cmdstring;
627   string helpstring;
628   string module;
629   string perm;
630   string availability;
631 } mds_commands[] = {
632
633 #define COMMAND(parsesig, helptext, module, perm, availability) \
634   {parsesig, helptext, module, perm, availability},
635
636 COMMAND("injectargs " \
637         "name=injected_args,type=CephString,n=N",
638         "inject configuration arguments into running MDS",
639         "mds", "*", "cli,rest")
640 COMMAND("config set " \
641         "name=key,type=CephString name=value,type=CephString",
642         "Set a configuration option at runtime (not persistent)",
643         "mds", "*", "cli,rest")
644 COMMAND("exit",
645         "Terminate this MDS",
646         "mds", "*", "cli,rest")
647 COMMAND("respawn",
648         "Restart this MDS",
649         "mds", "*", "cli,rest")
650 COMMAND("session kill " \
651         "name=session_id,type=CephInt",
652         "End a client session",
653         "mds", "*", "cli,rest")
654 COMMAND("cpu_profiler " \
655         "name=arg,type=CephChoices,strings=status|flush",
656         "run cpu profiling on daemon", "mds", "rw", "cli,rest")
657 COMMAND("session ls " \
658         "name=filters,type=CephString,n=N,req=false",
659         "List client sessions", "mds", "r", "cli,rest")
660 COMMAND("client ls " \
661         "name=filters,type=CephString,n=N,req=false",
662         "List client sessions", "mds", "r", "cli,rest")
663 COMMAND("session evict " \
664         "name=filters,type=CephString,n=N,req=false",
665         "Evict client session(s)", "mds", "rw", "cli,rest")
666 COMMAND("client evict " \
667         "name=filters,type=CephString,n=N,req=false",
668         "Evict client session(s)", "mds", "rw", "cli,rest")
669 COMMAND("damage ls",
670         "List detected metadata damage", "mds", "r", "cli,rest")
671 COMMAND("damage rm name=damage_id,type=CephInt",
672         "Remove a damage table entry", "mds", "rw", "cli,rest")
673 COMMAND("version", "report version of MDS", "mds", "r", "cli,rest")
674 COMMAND("heap " \
675         "name=heapcmd,type=CephChoices,strings=dump|start_profiler|stop_profiler|release|stats", \
676         "show heap usage info (available only if compiled with tcmalloc)", \
677         "mds", "*", "cli,rest")
678 };
679
680
681 int MDSDaemon::_handle_command(
682     const cmdmap_t &cmdmap,
683     MCommand *m,
684     bufferlist *outbl,
685     std::string *outs,
686     Context **run_later,
687     bool *need_reply)
688 {
689   assert(outbl != NULL);
690   assert(outs != NULL);
691
692   class SuicideLater : public Context
693   {
694     MDSDaemon *mds;
695
696     public:
697     explicit SuicideLater(MDSDaemon *mds_) : mds(mds_) {}
698     void finish(int r) override {
699       // Wait a little to improve chances of caller getting
700       // our response before seeing us disappear from mdsmap
701       sleep(1);
702
703       mds->suicide();
704     }
705   };
706
707
708   class RespawnLater : public Context
709   {
710     MDSDaemon *mds;
711
712     public:
713
714     explicit RespawnLater(MDSDaemon *mds_) : mds(mds_) {}
715     void finish(int r) override {
716       // Wait a little to improve chances of caller getting
717       // our response before seeing us disappear from mdsmap
718       sleep(1);
719
720       mds->respawn();
721     }
722   };
723
724   std::stringstream ds;
725   std::stringstream ss;
726   std::string prefix;
727   std::string format;
728   std::unique_ptr<Formatter> f(Formatter::create(format));
729   cmd_getval(cct, cmdmap, "prefix", prefix);
730
731   int r = 0;
732
733   if (prefix == "get_command_descriptions") {
734     int cmdnum = 0;
735     std::unique_ptr<JSONFormatter> f(ceph::make_unique<JSONFormatter>());
736     f->open_object_section("command_descriptions");
737     for (MDSCommand *cp = mds_commands;
738          cp < &mds_commands[ARRAY_SIZE(mds_commands)]; cp++) {
739
740       ostringstream secname;
741       secname << "cmd" << setfill('0') << std::setw(3) << cmdnum;
742       dump_cmddesc_to_json(f.get(), secname.str(), cp->cmdstring, cp->helpstring,
743                            cp->module, cp->perm, cp->availability, 0);
744       cmdnum++;
745     }
746     f->close_section(); // command_descriptions
747
748     f->flush(ds);
749     goto out; 
750   }
751
752   cmd_getval(cct, cmdmap, "format", format);
753   if (prefix == "version") {
754     if (f) {
755       f->open_object_section("version");
756       f->dump_string("version", pretty_version_to_str());
757       f->close_section();
758       f->flush(ds);
759     } else {
760       ds << pretty_version_to_str();
761     }
762   } else if (prefix == "injectargs") {
763     vector<string> argsvec;
764     cmd_getval(cct, cmdmap, "injected_args", argsvec);
765
766     if (argsvec.empty()) {
767       r = -EINVAL;
768       ss << "ignoring empty injectargs";
769       goto out;
770     }
771     string args = argsvec.front();
772     for (vector<string>::iterator a = ++argsvec.begin(); a != argsvec.end(); ++a)
773       args += " " + *a;
774     r = cct->_conf->injectargs(args, &ss);
775   } else if (prefix == "config set") {
776     std::string key;
777     cmd_getval(cct, cmdmap, "key", key);
778     std::string val;
779     cmd_getval(cct, cmdmap, "value", val);
780     r = cct->_conf->set_val(key, val, true, &ss);
781     if (r == 0) {
782       cct->_conf->apply_changes(nullptr);
783     }
784   } else if (prefix == "exit") {
785     // We will send response before executing
786     ss << "Exiting...";
787     *run_later = new SuicideLater(this);
788   } else if (prefix == "respawn") {
789     // We will send response before executing
790     ss << "Respawning...";
791     *run_later = new RespawnLater(this);
792   } else if (prefix == "session kill") {
793     if (mds_rank == NULL) {
794       r = -EINVAL;
795       ss << "MDS not active";
796       goto out;
797     }
798     // FIXME harmonize `session kill` with admin socket session evict
799     int64_t session_id = 0;
800     bool got = cmd_getval(cct, cmdmap, "session_id", session_id);
801     assert(got);
802     bool killed = mds_rank->evict_client(session_id, false,
803                                          g_conf->mds_session_blacklist_on_evict,
804                                          ss);
805     if (!killed)
806       r = -ENOENT;
807   } else if (prefix == "heap") {
808     if (!ceph_using_tcmalloc()) {
809       r = -EOPNOTSUPP;
810       ss << "could not issue heap profiler command -- not using tcmalloc!";
811     } else {
812       string heapcmd;
813       cmd_getval(cct, cmdmap, "heapcmd", heapcmd);
814       vector<string> heapcmd_vec;
815       get_str_vec(heapcmd, heapcmd_vec);
816       ceph_heap_profiler_handle_command(heapcmd_vec, ds);
817     }
818   } else if (prefix == "cpu_profiler") {
819     string arg;
820     cmd_getval(cct, cmdmap, "arg", arg);
821     vector<string> argvec;
822     get_str_vec(arg, argvec);
823     cpu_profiler_handle_command(argvec, ds);
824   } else {
825     // Give MDSRank a shot at the command
826     if (mds_rank) {
827       bool handled = mds_rank->handle_command(cmdmap, m, &r, &ds, &ss,
828                                               need_reply);
829       if (handled) {
830         goto out;
831       }
832     }
833
834     // Neither MDSDaemon nor MDSRank know this command
835     std::ostringstream ss;
836     ss << "unrecognized command! " << prefix;
837     r = -EINVAL;
838   }
839
840 out:
841   *outs = ss.str();
842   outbl->append(ds);
843   return r;
844 }
845
846 /* This function deletes the passed message before returning. */
847
848 void MDSDaemon::handle_mds_map(MMDSMap *m)
849 {
850   version_t epoch = m->get_epoch();
851   dout(5) << "handle_mds_map epoch " << epoch << " from " << m->get_source() << dendl;
852
853   // is it new?
854   if (epoch <= mdsmap->get_epoch()) {
855     dout(5) << " old map epoch " << epoch << " <= " << mdsmap->get_epoch()
856             << ", discarding" << dendl;
857     m->put();
858     return;
859   }
860
861   entity_addr_t addr;
862
863   // keep old map, for a moment
864   MDSMap *oldmap = mdsmap;
865
866   // decode and process
867   mdsmap = new MDSMap;
868   mdsmap->decode(m->get_encoded());
869   const MDSMap::DaemonState new_state = mdsmap->get_state_gid(mds_gid_t(monc->get_global_id()));
870   const int incarnation = mdsmap->get_inc_gid(mds_gid_t(monc->get_global_id()));
871
872   monc->sub_got("mdsmap", mdsmap->get_epoch());
873
874   // Calculate my effective rank (either my owned rank or my
875   // standby_for_rank if in standby replay)
876   mds_rank_t whoami = mdsmap->get_rank_gid(mds_gid_t(monc->get_global_id()));
877
878   // verify compatset
879   CompatSet mdsmap_compat(get_mdsmap_compat_set_all());
880   dout(10) << "     my compat " << mdsmap_compat << dendl;
881   dout(10) << " mdsmap compat " << mdsmap->compat << dendl;
882   if (!mdsmap_compat.writeable(mdsmap->compat)) {
883     dout(0) << "handle_mds_map mdsmap compatset " << mdsmap->compat
884             << " not writeable with daemon features " << mdsmap_compat
885             << ", killing myself" << dendl;
886     suicide();
887     goto out;
888   }
889
890   // mark down any failed peers
891   for (map<mds_gid_t,MDSMap::mds_info_t>::const_iterator p = oldmap->get_mds_info().begin();
892        p != oldmap->get_mds_info().end();
893        ++p) {
894     if (mdsmap->get_mds_info().count(p->first) == 0) {
895       dout(10) << " peer mds gid " << p->first << " removed from map" << dendl;
896       messenger->mark_down(p->second.addr);
897     }
898   }
899
900   if (whoami == MDS_RANK_NONE && 
901       new_state == MDSMap::STATE_STANDBY_REPLAY) {
902     whoami = mdsmap->get_mds_info_gid(mds_gid_t(monc->get_global_id())).standby_for_rank;
903   }
904
905   // see who i am
906   addr = messenger->get_myaddr();
907   dout(10) << "map says I am " << addr << " mds." << whoami << "." << incarnation
908            << " state " << ceph_mds_state_name(new_state) << dendl;
909
910   if (whoami == MDS_RANK_NONE) {
911     if (mds_rank != NULL) {
912       const auto myid = monc->get_global_id();
913       // We have entered a rank-holding state, we shouldn't be back
914       // here!
915       if (g_conf->mds_enforce_unique_name) {
916         if (mds_gid_t existing = mdsmap->find_mds_gid_by_name(name)) {
917           const MDSMap::mds_info_t& i = mdsmap->get_info_gid(existing);
918           if (i.global_id > myid) {
919             dout(1) << "map replaced me with another mds." << whoami
920                     << " with gid (" << i.global_id << ") larger than myself ("
921                     << myid << "); quitting!" << dendl;
922             // Call suicide() rather than respawn() because if someone else
923             // has taken our ID, we don't want to keep restarting and
924             // fighting them for the ID.
925             suicide();
926             m->put();
927             return;
928           }
929         }
930       }
931
932       dout(1) << "map removed me (mds." << whoami << " gid:"
933               << myid << ") from cluster due to lost contact; respawning" << dendl;
934       respawn();
935     }
936     // MDSRank not active: process the map here to see if we have
937     // been assigned a rank.
938     dout(10) <<  __func__ << ": handling map in rankless mode" << dendl;
939     _handle_mds_map(oldmap);
940   } else {
941
942     // Did we already hold a different rank?  MDSMonitor shouldn't try
943     // to change that out from under me!
944     if (mds_rank && whoami != mds_rank->get_nodeid()) {
945       derr << "Invalid rank transition " << mds_rank->get_nodeid() << "->"
946            << whoami << dendl;
947       respawn();
948     }
949
950     // Did I previously not hold a rank?  Initialize!
951     if (mds_rank == NULL) {
952       mds_rank = new MDSRankDispatcher(whoami, mds_lock, clog,
953           timer, beacon, mdsmap, messenger, monc,
954           new FunctionContext([this](int r){respawn();}),
955           new FunctionContext([this](int r){suicide();}));
956       dout(10) <<  __func__ << ": initializing MDS rank "
957                << mds_rank->get_nodeid() << dendl;
958       mds_rank->init();
959     }
960
961     // MDSRank is active: let him process the map, we have no say.
962     dout(10) <<  __func__ << ": handling map as rank "
963              << mds_rank->get_nodeid() << dendl;
964     mds_rank->handle_mds_map(m, oldmap);
965   }
966
967 out:
968   beacon.notify_mdsmap(mdsmap);
969   m->put();
970   delete oldmap;
971 }
972
973 void MDSDaemon::_handle_mds_map(MDSMap *oldmap)
974 {
975   MDSMap::DaemonState new_state = mdsmap->get_state_gid(mds_gid_t(monc->get_global_id()));
976
977   // Normal rankless case, we're marked as standby
978   if (new_state == MDSMap::STATE_STANDBY) {
979     beacon.set_want_state(mdsmap, new_state);
980     dout(1) << "handle_mds_map standby" << dendl;
981
982     return;
983   }
984
985   // Case where we thought we were standby, but MDSMap disagrees
986   if (beacon.get_want_state() == MDSMap::STATE_STANDBY) {
987     dout(10) << "dropped out of mdsmap, try to re-add myself" << dendl;
988     new_state = MDSMap::STATE_BOOT;
989     beacon.set_want_state(mdsmap, new_state);
990     return;
991   }
992
993   // Case where we have sent a boot beacon that isn't reflected yet
994   if (beacon.get_want_state() == MDSMap::STATE_BOOT) {
995     dout(10) << "not in map yet" << dendl;
996   }
997 }
998
999 void MDSDaemon::handle_signal(int signum)
1000 {
1001   assert(signum == SIGINT || signum == SIGTERM);
1002   derr << "*** got signal " << sig_str(signum) << " ***" << dendl;
1003   {
1004     Mutex::Locker l(mds_lock);
1005     if (stopping) {
1006       return;
1007     }
1008     suicide();
1009   }
1010 }
1011
1012 void MDSDaemon::suicide()
1013 {
1014   assert(mds_lock.is_locked());
1015   
1016   // make sure we don't suicide twice
1017   assert(stopping == false);
1018   stopping = true;
1019
1020   dout(1) << "suicide.  wanted state "
1021           << ceph_mds_state_name(beacon.get_want_state()) << dendl;
1022
1023   if (tick_event) {
1024     timer.cancel_event(tick_event);
1025     tick_event = 0;
1026   }
1027
1028   //because add_observer is called after set_up_admin_socket
1029   //so we can use asok_hook to avoid assert in the remove_observer
1030   if (asok_hook != NULL)
1031     g_conf->remove_observer(this);
1032
1033   clean_up_admin_socket();
1034
1035   // Inform MDS we are going away, then shut down beacon
1036   beacon.set_want_state(mdsmap, MDSMap::STATE_DNE);
1037   if (!mdsmap->is_dne_gid(mds_gid_t(monc->get_global_id()))) {
1038     // Notify the MDSMonitor that we're dying, so that it doesn't have to
1039     // wait for us to go laggy.  Only do this if we're actually in the
1040     // MDSMap, because otherwise the MDSMonitor will drop our message.
1041     beacon.send_and_wait(1);
1042   }
1043   beacon.shutdown();
1044
1045   mgrc.shutdown();
1046
1047   if (mds_rank) {
1048     mds_rank->shutdown();
1049   } else {
1050     timer.shutdown();
1051
1052     monc->shutdown();
1053     messenger->shutdown();
1054   }
1055 }
1056
1057 void MDSDaemon::respawn()
1058 {
1059   dout(1) << "respawn" << dendl;
1060
1061   char *new_argv[orig_argc+1];
1062   dout(1) << " e: '" << orig_argv[0] << "'" << dendl;
1063   for (int i=0; i<orig_argc; i++) {
1064     new_argv[i] = (char *)orig_argv[i];
1065     dout(1) << " " << i << ": '" << orig_argv[i] << "'" << dendl;
1066   }
1067   new_argv[orig_argc] = NULL;
1068
1069   /* Determine the path to our executable, test if Linux /proc/self/exe exists.
1070    * This allows us to exec the same executable even if it has since been
1071    * unlinked.
1072    */
1073   char exe_path[PATH_MAX] = "";
1074   if (readlink(PROCPREFIX "/proc/self/exe", exe_path, PATH_MAX-1) == -1) {
1075     /* Print CWD for the user's interest */
1076     char buf[PATH_MAX];
1077     char *cwd = getcwd(buf, sizeof(buf));
1078     assert(cwd);
1079     dout(1) << " cwd " << cwd << dendl;
1080
1081     /* Fall back to a best-effort: just running in our CWD */
1082     strncpy(exe_path, orig_argv[0], PATH_MAX-1);
1083   } else {
1084     dout(1) << "respawning with exe " << exe_path << dendl;
1085     strcpy(exe_path, PROCPREFIX "/proc/self/exe");
1086   }
1087
1088   dout(1) << " exe_path " << exe_path << dendl;
1089
1090   unblock_all_signals(NULL);
1091   execv(exe_path, new_argv);
1092
1093   dout(0) << "respawn execv " << orig_argv[0]
1094           << " failed with " << cpp_strerror(errno) << dendl;
1095
1096   // We have to assert out here, because suicide() returns, and callers
1097   // to respawn expect it never to return.
1098   ceph_abort();
1099 }
1100
1101
1102
1103 bool MDSDaemon::ms_dispatch(Message *m)
1104 {
1105   Mutex::Locker l(mds_lock);
1106   if (stopping) {
1107     return false;
1108   }
1109
1110   // Drop out early if shutting down
1111   if (beacon.get_want_state() == CEPH_MDS_STATE_DNE) {
1112     dout(10) << " stopping, discarding " << *m << dendl;
1113     m->put();
1114     return true;
1115   }
1116
1117   // First see if it's a daemon message
1118   const bool handled_core = handle_core_message(m);
1119   if (handled_core) {
1120     return true;
1121   }
1122
1123   // Not core, try it as a rank message
1124   if (mds_rank) {
1125     return mds_rank->ms_dispatch(m);
1126   } else {
1127     return false;
1128   }
1129 }
1130
1131 bool MDSDaemon::ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, bool force_new)
1132 {
1133   dout(10) << "MDSDaemon::ms_get_authorizer type="
1134            << ceph_entity_type_name(dest_type) << dendl;
1135
1136   /* monitor authorization is being handled on different layer */
1137   if (dest_type == CEPH_ENTITY_TYPE_MON)
1138     return true;
1139
1140   if (force_new) {
1141     if (monc->wait_auth_rotating(10) < 0)
1142       return false;
1143   }
1144
1145   *authorizer = monc->build_authorizer(dest_type);
1146   return *authorizer != NULL;
1147 }
1148
1149
1150 /*
1151  * high priority messages we always process
1152  */
1153 bool MDSDaemon::handle_core_message(Message *m)
1154 {
1155   switch (m->get_type()) {
1156   case CEPH_MSG_MON_MAP:
1157     ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MON);
1158     m->put();
1159     break;
1160
1161     // MDS
1162   case CEPH_MSG_MDS_MAP:
1163     ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MON | CEPH_ENTITY_TYPE_MDS);
1164     handle_mds_map(static_cast<MMDSMap*>(m));
1165     break;
1166
1167     // OSD
1168   case MSG_COMMAND:
1169     handle_command(static_cast<MCommand*>(m));
1170     break;
1171   case CEPH_MSG_OSD_MAP:
1172     ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MON | CEPH_ENTITY_TYPE_OSD);
1173
1174     if (mds_rank) {
1175       mds_rank->handle_osd_map();
1176     }
1177     m->put();
1178     break;
1179
1180   case MSG_MON_COMMAND:
1181     ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MON);
1182     clog->warn() << "dropping `mds tell` command from legacy monitor";
1183     m->put();
1184     break;
1185
1186   default:
1187     return false;
1188   }
1189   return true;
1190 }
1191
1192 void MDSDaemon::ms_handle_connect(Connection *con)
1193 {
1194 }
1195
1196 bool MDSDaemon::ms_handle_reset(Connection *con)
1197 {
1198   if (con->get_peer_type() != CEPH_ENTITY_TYPE_CLIENT)
1199     return false;
1200
1201   Mutex::Locker l(mds_lock);
1202   if (stopping) {
1203     return false;
1204   }
1205   dout(5) << "ms_handle_reset on " << con->get_peer_addr() << dendl;
1206   if (beacon.get_want_state() == CEPH_MDS_STATE_DNE)
1207     return false;
1208
1209   Session *session = static_cast<Session *>(con->get_priv());
1210   if (session) {
1211     if (session->is_closed()) {
1212       dout(3) << "ms_handle_reset closing connection for session " << session->info.inst << dendl;
1213       con->mark_down();
1214       con->set_priv(NULL);
1215     }
1216     session->put();
1217   } else {
1218     con->mark_down();
1219   }
1220   return false;
1221 }
1222
1223
1224 void MDSDaemon::ms_handle_remote_reset(Connection *con)
1225 {
1226   if (con->get_peer_type() != CEPH_ENTITY_TYPE_CLIENT)
1227     return;
1228
1229   Mutex::Locker l(mds_lock);
1230   if (stopping) {
1231     return;
1232   }
1233
1234   dout(5) << "ms_handle_remote_reset on " << con->get_peer_addr() << dendl;
1235   if (beacon.get_want_state() == CEPH_MDS_STATE_DNE)
1236     return;
1237
1238   Session *session = static_cast<Session *>(con->get_priv());
1239   if (session) {
1240     if (session->is_closed()) {
1241       dout(3) << "ms_handle_remote_reset closing connection for session " << session->info.inst << dendl;
1242       con->mark_down();
1243       con->set_priv(NULL);
1244     }
1245     session->put();
1246   }
1247 }
1248
1249 bool MDSDaemon::ms_handle_refused(Connection *con)
1250 {
1251   // do nothing for now
1252   return false;
1253 }
1254
1255 bool MDSDaemon::ms_verify_authorizer(Connection *con, int peer_type,
1256                                int protocol, bufferlist& authorizer_data, bufferlist& authorizer_reply,
1257                                bool& is_valid, CryptoKey& session_key)
1258 {
1259   Mutex::Locker l(mds_lock);
1260   if (stopping) {
1261     return false;
1262   }
1263   if (beacon.get_want_state() == CEPH_MDS_STATE_DNE)
1264     return false;
1265
1266   AuthAuthorizeHandler *authorize_handler = 0;
1267   switch (peer_type) {
1268   case CEPH_ENTITY_TYPE_MDS:
1269     authorize_handler = authorize_handler_cluster_registry->get_handler(protocol);
1270     break;
1271   default:
1272     authorize_handler = authorize_handler_service_registry->get_handler(protocol);
1273   }
1274   if (!authorize_handler) {
1275     dout(0) << "No AuthAuthorizeHandler found for protocol " << protocol << dendl;
1276     is_valid = false;
1277     return true;
1278   }
1279
1280   AuthCapsInfo caps_info;
1281   EntityName name;
1282   uint64_t global_id;
1283
1284   RotatingKeyRing *keys = monc->rotating_secrets.get();
1285   if (keys) {
1286     is_valid = authorize_handler->verify_authorizer(
1287       cct, keys,
1288       authorizer_data, authorizer_reply, name, global_id, caps_info,
1289       session_key);
1290   } else {
1291     dout(10) << __func__ << " no rotating_keys (yet), denied" << dendl;
1292     is_valid = false;
1293   }
1294
1295   if (is_valid) {
1296     entity_name_t n(con->get_peer_type(), global_id);
1297
1298     // We allow connections and assign Session instances to connections
1299     // even if we have not been assigned a rank, because clients with
1300     // "allow *" are allowed to connect and do 'tell' operations before
1301     // we have a rank.
1302     Session *s = NULL;
1303     if (mds_rank) {
1304       // If we do hold a rank, see if this is an existing client establishing
1305       // a new connection, rather than a new client
1306       s = mds_rank->sessionmap.get_session(n);
1307     }
1308
1309     // Wire up a Session* to this connection
1310     // It doesn't go into a SessionMap instance until it sends an explicit
1311     // request to open a session (initial state of Session is `closed`)
1312     if (!s) {
1313       s = new Session;
1314       s->info.auth_name = name;
1315       s->info.inst.addr = con->get_peer_addr();
1316       s->info.inst.name = n;
1317       dout(10) << " new session " << s << " for " << s->info.inst << " con " << con << dendl;
1318       con->set_priv(s);
1319       s->connection = con;
1320     } else {
1321       dout(10) << " existing session " << s << " for " << s->info.inst << " existing con " << s->connection
1322                << ", new/authorizing con " << con << dendl;
1323       con->set_priv(s->get());
1324
1325
1326
1327       // Wait until we fully accept the connection before setting
1328       // s->connection.  In particular, if there are multiple incoming
1329       // connection attempts, they will all get their authorizer
1330       // validated, but some of them may "lose the race" and get
1331       // dropped.  We only want to consider the winner(s).  See
1332       // ms_handle_accept().  This is important for Sessions we replay
1333       // from the journal on recovery that don't have established
1334       // messenger state; we want the con from only the winning
1335       // connect attempt(s).  (Normal reconnects that don't follow MDS
1336       // recovery are reconnected to the existing con by the
1337       // messenger.)
1338     }
1339
1340     if (caps_info.allow_all) {
1341       // Flag for auth providers that don't provide cap strings
1342       s->auth_caps.set_allow_all();
1343     } else {
1344       bufferlist::iterator p = caps_info.caps.begin();
1345       string auth_cap_str;
1346       try {
1347         ::decode(auth_cap_str, p);
1348
1349         dout(10) << __func__ << ": parsing auth_cap_str='" << auth_cap_str << "'" << dendl;
1350         std::ostringstream errstr;
1351         if (!s->auth_caps.parse(g_ceph_context, auth_cap_str, &errstr)) {
1352           dout(1) << __func__ << ": auth cap parse error: " << errstr.str()
1353                   << " parsing '" << auth_cap_str << "'" << dendl;
1354           clog->warn() << name << " mds cap '" << auth_cap_str
1355                        << "' does not parse: " << errstr.str();
1356           is_valid = false;
1357         }
1358       } catch (buffer::error& e) {
1359         // Assume legacy auth, defaults to:
1360         //  * permit all filesystem ops
1361         //  * permit no `tell` ops
1362         dout(1) << __func__ << ": cannot decode auth caps bl of length " << caps_info.caps.length() << dendl;
1363         is_valid = false;
1364       }
1365     }
1366   }
1367
1368   return true;  // we made a decision (see is_valid)
1369 }
1370
1371
1372 void MDSDaemon::ms_handle_accept(Connection *con)
1373 {
1374   Mutex::Locker l(mds_lock);
1375   if (stopping) {
1376     return;
1377   }
1378
1379   Session *s = static_cast<Session *>(con->get_priv());
1380   dout(10) << "ms_handle_accept " << con->get_peer_addr() << " con " << con << " session " << s << dendl;
1381   if (s) {
1382     if (s->connection != con) {
1383       dout(10) << " session connection " << s->connection << " -> " << con << dendl;
1384       s->connection = con;
1385
1386       // send out any queued messages
1387       while (!s->preopen_out_queue.empty()) {
1388         con->send_message(s->preopen_out_queue.front());
1389         s->preopen_out_queue.pop_front();
1390       }
1391     }
1392     s->put();
1393   }
1394 }
1395
1396 bool MDSDaemon::is_clean_shutdown()
1397 {
1398   if (mds_rank) {
1399     return mds_rank->is_stopped();
1400   } else {
1401     return true;
1402   }
1403 }