Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / mgr / DaemonServer.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2016 John Spray <john.spray@redhat.com>
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software
11  * Foundation.  See file COPYING.
12  */
13
14 #include "DaemonServer.h"
15
16 #include "include/str_list.h"
17 #include "auth/RotatingKeyRing.h"
18 #include "json_spirit/json_spirit_writer.h"
19
20 #include "mgr/mgr_commands.h"
21 #include "mon/MonCommand.h"
22
23 #include "messages/MMgrOpen.h"
24 #include "messages/MMgrConfigure.h"
25 #include "messages/MMonMgrReport.h"
26 #include "messages/MCommand.h"
27 #include "messages/MCommandReply.h"
28 #include "messages/MPGStats.h"
29 #include "messages/MOSDScrub.h"
30 #include "messages/MOSDForceRecovery.h"
31 #include "common/errno.h"
32
33 #define dout_context g_ceph_context
34 #define dout_subsys ceph_subsys_mgr
35 #undef dout_prefix
36 #define dout_prefix *_dout << "mgr.server " << __func__ << " "
37
38
39
40 DaemonServer::DaemonServer(MonClient *monc_,
41                            Finisher &finisher_,
42                            DaemonStateIndex &daemon_state_,
43                            ClusterState &cluster_state_,
44                            PyModuleRegistry &py_modules_,
45                            LogChannelRef clog_,
46                            LogChannelRef audit_clog_)
47     : Dispatcher(g_ceph_context),
48       client_byte_throttler(new Throttle(g_ceph_context, "mgr_client_bytes",
49                                          g_conf->get_val<uint64_t>("mgr_client_bytes"))),
50       client_msg_throttler(new Throttle(g_ceph_context, "mgr_client_messages",
51                                         g_conf->get_val<uint64_t>("mgr_client_messages"))),
52       osd_byte_throttler(new Throttle(g_ceph_context, "mgr_osd_bytes",
53                                       g_conf->get_val<uint64_t>("mgr_osd_bytes"))),
54       osd_msg_throttler(new Throttle(g_ceph_context, "mgr_osd_messsages",
55                                      g_conf->get_val<uint64_t>("mgr_osd_messages"))),
56       mds_byte_throttler(new Throttle(g_ceph_context, "mgr_mds_bytes",
57                                       g_conf->get_val<uint64_t>("mgr_mds_bytes"))),
58       mds_msg_throttler(new Throttle(g_ceph_context, "mgr_mds_messsages",
59                                      g_conf->get_val<uint64_t>("mgr_mds_messages"))),
60       mon_byte_throttler(new Throttle(g_ceph_context, "mgr_mon_bytes",
61                                       g_conf->get_val<uint64_t>("mgr_mon_bytes"))),
62       mon_msg_throttler(new Throttle(g_ceph_context, "mgr_mon_messsages",
63                                      g_conf->get_val<uint64_t>("mgr_mon_messages"))),
64       msgr(nullptr),
65       monc(monc_),
66       finisher(finisher_),
67       daemon_state(daemon_state_),
68       cluster_state(cluster_state_),
69       py_modules(py_modules_),
70       clog(clog_),
71       audit_clog(audit_clog_),
72       auth_registry(g_ceph_context,
73                     g_conf->auth_supported.empty() ?
74                       g_conf->auth_cluster_required :
75                       g_conf->auth_supported),
76       lock("DaemonServer"),
77       pgmap_ready(false)
78 {
79   g_conf->add_observer(this);
80 }
81
82 DaemonServer::~DaemonServer() {
83   delete msgr;
84   g_conf->remove_observer(this);
85 }
86
87 int DaemonServer::init(uint64_t gid, entity_addr_t client_addr)
88 {
89   // Initialize Messenger
90   std::string public_msgr_type = g_conf->ms_public_type.empty() ?
91     g_conf->get_val<std::string>("ms_type") : g_conf->ms_public_type;
92   msgr = Messenger::create(g_ceph_context, public_msgr_type,
93                            entity_name_t::MGR(gid),
94                            "mgr",
95                            getpid(), 0);
96   msgr->set_default_policy(Messenger::Policy::stateless_server(0));
97
98   // throttle clients
99   msgr->set_policy_throttlers(entity_name_t::TYPE_CLIENT,
100                               client_byte_throttler.get(),
101                               client_msg_throttler.get());
102
103   // servers
104   msgr->set_policy_throttlers(entity_name_t::TYPE_OSD,
105                               osd_byte_throttler.get(),
106                               osd_msg_throttler.get());
107   msgr->set_policy_throttlers(entity_name_t::TYPE_MDS,
108                               mds_byte_throttler.get(),
109                               mds_msg_throttler.get());
110   msgr->set_policy_throttlers(entity_name_t::TYPE_MON,
111                               mon_byte_throttler.get(),
112                               mon_msg_throttler.get());
113
114   int r = msgr->bind(g_conf->public_addr);
115   if (r < 0) {
116     derr << "unable to bind mgr to " << g_conf->public_addr << dendl;
117     return r;
118   }
119
120   msgr->set_myname(entity_name_t::MGR(gid));
121   msgr->set_addr_unknowns(client_addr);
122
123   msgr->start();
124   msgr->add_dispatcher_tail(this);
125
126   started_at = ceph_clock_now();
127
128   return 0;
129 }
130
131 entity_addr_t DaemonServer::get_myaddr() const
132 {
133   return msgr->get_myaddr();
134 }
135
136
137 bool DaemonServer::ms_verify_authorizer(Connection *con,
138     int peer_type,
139     int protocol,
140     ceph::bufferlist& authorizer_data,
141     ceph::bufferlist& authorizer_reply,
142     bool& is_valid,
143     CryptoKey& session_key)
144 {
145   auto handler = auth_registry.get_handler(protocol);
146   if (!handler) {
147     dout(0) << "No AuthAuthorizeHandler found for protocol " << protocol << dendl;
148     is_valid = false;
149     return true;
150   }
151
152   MgrSessionRef s(new MgrSession(cct));
153   s->inst.addr = con->get_peer_addr();
154   AuthCapsInfo caps_info;
155
156   RotatingKeyRing *keys = monc->rotating_secrets.get();
157   if (keys) {
158     is_valid = handler->verify_authorizer(
159       cct, keys,
160       authorizer_data,
161       authorizer_reply, s->entity_name,
162       s->global_id, caps_info,
163       session_key);
164   } else {
165     dout(10) << __func__ << " no rotating_keys (yet), denied" << dendl;
166     is_valid = false;
167   }
168
169   if (is_valid) {
170     if (caps_info.allow_all) {
171       dout(10) << " session " << s << " " << s->entity_name
172                << " allow_all" << dendl;
173       s->caps.set_allow_all();
174     }
175     if (caps_info.caps.length() > 0) {
176       bufferlist::iterator p = caps_info.caps.begin();
177       string str;
178       try {
179         ::decode(str, p);
180       }
181       catch (buffer::error& e) {
182       }
183       bool success = s->caps.parse(str);
184       if (success) {
185         dout(10) << " session " << s << " " << s->entity_name
186                  << " has caps " << s->caps << " '" << str << "'" << dendl;
187       } else {
188         dout(10) << " session " << s << " " << s->entity_name
189                  << " failed to parse caps '" << str << "'" << dendl;
190         is_valid = false;
191       }
192     }
193     con->set_priv(s->get());
194
195     if (peer_type == CEPH_ENTITY_TYPE_OSD) {
196       Mutex::Locker l(lock);
197       s->osd_id = atoi(s->entity_name.get_id().c_str());
198       dout(10) << "registering osd." << s->osd_id << " session "
199                << s << " con " << con << dendl;
200       osd_cons[s->osd_id].insert(con);
201     }
202   }
203
204   return true;
205 }
206
207
208 bool DaemonServer::ms_get_authorizer(int dest_type,
209     AuthAuthorizer **authorizer, bool force_new)
210 {
211   dout(10) << "type=" << ceph_entity_type_name(dest_type) << dendl;
212
213   if (dest_type == CEPH_ENTITY_TYPE_MON) {
214     return true;
215   }
216
217   if (force_new) {
218     if (monc->wait_auth_rotating(10) < 0)
219       return false;
220   }
221
222   *authorizer = monc->build_authorizer(dest_type);
223   dout(20) << "got authorizer " << *authorizer << dendl;
224   return *authorizer != NULL;
225 }
226
227 bool DaemonServer::ms_handle_reset(Connection *con)
228 {
229   if (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD) {
230     MgrSessionRef session(static_cast<MgrSession*>(con->get_priv()));
231     if (!session) {
232       return false;
233     }
234     session->put(); // SessionRef takes a ref
235     Mutex::Locker l(lock);
236     dout(10) << "unregistering osd." << session->osd_id
237              << "  session " << session << " con " << con << dendl;
238     osd_cons[session->osd_id].erase(con);
239
240     auto iter = daemon_connections.find(con);
241     if (iter != daemon_connections.end()) {
242       daemon_connections.erase(iter);
243     }
244   }
245   return false;
246 }
247
248 bool DaemonServer::ms_handle_refused(Connection *con)
249 {
250   // do nothing for now
251   return false;
252 }
253
254 bool DaemonServer::ms_dispatch(Message *m)
255 {
256   // Note that we do *not* take ::lock here, in order to avoid
257   // serializing all message handling.  It's up to each handler
258   // to take whatever locks it needs.
259   switch (m->get_type()) {
260     case MSG_PGSTATS:
261       cluster_state.ingest_pgstats(static_cast<MPGStats*>(m));
262       maybe_ready(m->get_source().num());
263       m->put();
264       return true;
265     case MSG_MGR_REPORT:
266       return handle_report(static_cast<MMgrReport*>(m));
267     case MSG_MGR_OPEN:
268       return handle_open(static_cast<MMgrOpen*>(m));
269     case MSG_COMMAND:
270       return handle_command(static_cast<MCommand*>(m));
271     default:
272       dout(1) << "Unhandled message type " << m->get_type() << dendl;
273       return false;
274   };
275 }
276
277 void DaemonServer::maybe_ready(int32_t osd_id)
278 {
279   if (pgmap_ready.load()) {
280     // Fast path: we don't need to take lock because pgmap_ready
281     // is already set
282   } else {
283     Mutex::Locker l(lock);
284
285     if (reported_osds.find(osd_id) == reported_osds.end()) {
286       dout(4) << "initial report from osd " << osd_id << dendl;
287       reported_osds.insert(osd_id);
288       std::set<int32_t> up_osds;
289
290       cluster_state.with_osdmap([&](const OSDMap& osdmap) {
291           osdmap.get_up_osds(up_osds);
292       });
293
294       std::set<int32_t> unreported_osds;
295       std::set_difference(up_osds.begin(), up_osds.end(),
296                           reported_osds.begin(), reported_osds.end(),
297                           std::inserter(unreported_osds, unreported_osds.begin()));
298
299       if (unreported_osds.size() == 0) {
300         dout(4) << "all osds have reported, sending PG state to mon" << dendl;
301         pgmap_ready = true;
302         reported_osds.clear();
303         // Avoid waiting for next tick
304         send_report();
305       } else {
306         dout(4) << "still waiting for " << unreported_osds.size() << " osds"
307                    " to report in before PGMap is ready" << dendl;
308       }
309     }
310   }
311 }
312
313 void DaemonServer::shutdown()
314 {
315   dout(10) << "begin" << dendl;
316   msgr->shutdown();
317   msgr->wait();
318   dout(10) << "done" << dendl;
319 }
320
321
322
323 bool DaemonServer::handle_open(MMgrOpen *m)
324 {
325   Mutex::Locker l(lock);
326
327   DaemonKey key;
328   if (!m->service_name.empty()) {
329     key.first = m->service_name;
330   } else {
331     key.first = ceph_entity_type_name(m->get_connection()->get_peer_type());
332   }
333   key.second = m->daemon_name;
334
335   dout(4) << "from " << m->get_connection() << "  " << key << dendl;
336
337   _send_configure(m->get_connection());
338
339   DaemonStatePtr daemon;
340   if (daemon_state.exists(key)) {
341     daemon = daemon_state.get(key);
342   }
343   if (daemon) {
344     dout(20) << "updating existing DaemonState for " << m->daemon_name << dendl;
345     Mutex::Locker l(daemon->lock);
346     daemon->perf_counters.clear();
347   }
348
349   if (m->service_daemon) {
350     if (!daemon) {
351       dout(4) << "constructing new DaemonState for " << key << dendl;
352       daemon = std::make_shared<DaemonState>(daemon_state.types);
353       daemon->key = key;
354       if (m->daemon_metadata.count("hostname")) {
355         daemon->hostname = m->daemon_metadata["hostname"];
356       }
357       daemon_state.insert(daemon);
358     }
359     Mutex::Locker l(daemon->lock);
360     daemon->service_daemon = true;
361     daemon->metadata = m->daemon_metadata;
362     daemon->service_status = m->daemon_status;
363
364     utime_t now = ceph_clock_now();
365     auto d = pending_service_map.get_daemon(m->service_name,
366                                             m->daemon_name);
367     if (d->gid != (uint64_t)m->get_source().num()) {
368       dout(10) << "registering " << key << " in pending_service_map" << dendl;
369       d->gid = m->get_source().num();
370       d->addr = m->get_source_addr();
371       d->start_epoch = pending_service_map.epoch;
372       d->start_stamp = now;
373       d->metadata = m->daemon_metadata;
374       pending_service_map_dirty = pending_service_map.epoch;
375     }
376   }
377
378   if (m->get_connection()->get_peer_type() != entity_name_t::TYPE_CLIENT &&
379       m->service_name.empty())
380   {
381     // Store in set of the daemon/service connections, i.e. those
382     // connections that require an update in the event of stats
383     // configuration changes.
384     daemon_connections.insert(m->get_connection());
385   }
386
387   m->put();
388   return true;
389 }
390
391 bool DaemonServer::handle_report(MMgrReport *m)
392 {
393   DaemonKey key;
394   if (!m->service_name.empty()) {
395     key.first = m->service_name;
396   } else {
397     key.first = ceph_entity_type_name(m->get_connection()->get_peer_type());
398   }
399   key.second = m->daemon_name;
400
401   dout(4) << "from " << m->get_connection() << " " << key << dendl;
402
403   if (m->get_connection()->get_peer_type() == entity_name_t::TYPE_CLIENT &&
404       m->service_name.empty()) {
405     // Clients should not be sending us stats unless they are declaring
406     // themselves to be a daemon for some service.
407     dout(4) << "rejecting report from non-daemon client " << m->daemon_name
408             << dendl;
409     m->put();
410     return true;
411   }
412
413   // Look up the DaemonState
414   DaemonStatePtr daemon;
415   if (daemon_state.exists(key)) {
416     dout(20) << "updating existing DaemonState for " << key << dendl;
417     daemon = daemon_state.get(key);
418   } else {
419     dout(4) << "constructing new DaemonState for " << key << dendl;
420     daemon = std::make_shared<DaemonState>(daemon_state.types);
421     // FIXME: crap, we don't know the hostname at this stage.
422     daemon->key = key;
423     daemon_state.insert(daemon);
424     // FIXME: we should avoid this case by rejecting MMgrReport from
425     // daemons without sessions, and ensuring that session open
426     // always contains metadata.
427   }
428
429   // Update the DaemonState
430   assert(daemon != nullptr);
431   {
432     Mutex::Locker l(daemon->lock);
433     auto &daemon_counters = daemon->perf_counters;
434     daemon_counters.update(m);
435
436     if (daemon->service_daemon) {
437       utime_t now = ceph_clock_now();
438       if (m->daemon_status) {
439         daemon->service_status = *m->daemon_status;
440         daemon->service_status_stamp = now;
441       }
442       daemon->last_service_beacon = now;
443     } else if (m->daemon_status) {
444       derr << "got status from non-daemon " << key << dendl;
445     }
446   }
447
448   // if there are any schema updates, notify the python modules
449   if (!m->declare_types.empty() || !m->undeclare_types.empty()) {
450     ostringstream oss;
451     oss << key.first << '.' << key.second;
452     py_modules.notify_all("perf_schema_update", oss.str());
453   }
454
455   m->put();
456   return true;
457 }
458
459
460 void DaemonServer::_generate_command_map(
461   map<string,cmd_vartype>& cmdmap,
462   map<string,string> &param_str_map)
463 {
464   for (map<string,cmd_vartype>::const_iterator p = cmdmap.begin();
465        p != cmdmap.end(); ++p) {
466     if (p->first == "prefix")
467       continue;
468     if (p->first == "caps") {
469       vector<string> cv;
470       if (cmd_getval(g_ceph_context, cmdmap, "caps", cv) &&
471           cv.size() % 2 == 0) {
472         for (unsigned i = 0; i < cv.size(); i += 2) {
473           string k = string("caps_") + cv[i];
474           param_str_map[k] = cv[i + 1];
475         }
476         continue;
477       }
478     }
479     param_str_map[p->first] = cmd_vartype_stringify(p->second);
480   }
481 }
482
483 const MonCommand *DaemonServer::_get_mgrcommand(
484   const string &cmd_prefix,
485   const std::vector<MonCommand> &cmds)
486 {
487   const MonCommand *this_cmd = nullptr;
488   for (const auto &cmd : cmds) {
489     if (cmd.cmdstring.compare(0, cmd_prefix.size(), cmd_prefix) == 0) {
490       this_cmd = &cmd;
491       break;
492     }
493   }
494   return this_cmd;
495 }
496
497 bool DaemonServer::_allowed_command(
498   MgrSession *s,
499   const string &module,
500   const string &prefix,
501   const map<string,cmd_vartype>& cmdmap,
502   const map<string,string>& param_str_map,
503   const MonCommand *this_cmd) {
504
505   if (s->entity_name.is_mon()) {
506     // mon is all-powerful.  even when it is forwarding commands on behalf of
507     // old clients; we expect the mon is validating commands before proxying!
508     return true;
509   }
510
511   bool cmd_r = this_cmd->requires_perm('r');
512   bool cmd_w = this_cmd->requires_perm('w');
513   bool cmd_x = this_cmd->requires_perm('x');
514
515   bool capable = s->caps.is_capable(
516     g_ceph_context,
517     CEPH_ENTITY_TYPE_MGR,
518     s->entity_name,
519     module, prefix, param_str_map,
520     cmd_r, cmd_w, cmd_x);
521
522   dout(10) << " " << s->entity_name << " "
523            << (capable ? "" : "not ") << "capable" << dendl;
524   return capable;
525 }
526
527 bool DaemonServer::handle_command(MCommand *m)
528 {
529   Mutex::Locker l(lock);
530   int r = 0;
531   std::stringstream ss;
532   std::string prefix;
533
534   assert(lock.is_locked_by_me());
535
536   /**
537    * The working data for processing an MCommand.  This lives in
538    * a class to enable passing it into other threads for processing
539    * outside of the thread/locks that called handle_command.
540    */
541   class CommandContext
542   {
543     public:
544     MCommand *m;
545     bufferlist odata;
546     cmdmap_t cmdmap;
547
548     CommandContext(MCommand *m_)
549       : m(m_)
550     {
551     }
552
553     ~CommandContext()
554     {
555       m->put();
556     }
557
558     void reply(int r, const std::stringstream &ss)
559     {
560       reply(r, ss.str());
561     }
562
563     void reply(int r, const std::string &rs)
564     {
565       // Let the connection drop as soon as we've sent our response
566       ConnectionRef con = m->get_connection();
567       if (con) {
568         con->mark_disposable();
569       }
570
571       dout(1) << "handle_command " << cpp_strerror(r) << " " << rs << dendl;
572       if (con) {
573         MCommandReply *reply = new MCommandReply(r, rs);
574         reply->set_tid(m->get_tid());
575         reply->set_data(odata);
576         con->send_message(reply);
577       }
578     }
579   };
580
581   /**
582    * A context for receiving a bufferlist/error string from a background
583    * function and then calling back to a CommandContext when it's done
584    */
585   class ReplyOnFinish : public Context {
586     std::shared_ptr<CommandContext> cmdctx;
587
588   public:
589     bufferlist from_mon;
590     string outs;
591
592     ReplyOnFinish(std::shared_ptr<CommandContext> cmdctx_)
593       : cmdctx(cmdctx_)
594     {}
595     void finish(int r) override {
596       cmdctx->odata.claim_append(from_mon);
597       cmdctx->reply(r, outs);
598     }
599   };
600
601   std::shared_ptr<CommandContext> cmdctx = std::make_shared<CommandContext>(m);
602
603   MgrSessionRef session(static_cast<MgrSession*>(m->get_connection()->get_priv()));
604   if (!session) {
605     return true;
606   }
607   session->put(); // SessionRef takes a ref
608   if (session->inst.name == entity_name_t())
609     session->inst.name = m->get_source();
610
611   std::string format;
612   boost::scoped_ptr<Formatter> f;
613   map<string,string> param_str_map;
614
615   if (!cmdmap_from_json(m->cmd, &(cmdctx->cmdmap), ss)) {
616     cmdctx->reply(-EINVAL, ss);
617     return true;
618   }
619
620   {
621     cmd_getval(g_ceph_context, cmdctx->cmdmap, "format", format, string("plain"));
622     f.reset(Formatter::create(format));
623   }
624
625   cmd_getval(cct, cmdctx->cmdmap, "prefix", prefix);
626
627   dout(4) << "decoded " << cmdctx->cmdmap.size() << dendl;
628   dout(4) << "prefix=" << prefix << dendl;
629
630   if (prefix == "get_command_descriptions") {
631     dout(10) << "reading commands from python modules" << dendl;
632     const auto py_commands = py_modules.get_commands();
633
634     int cmdnum = 0;
635     JSONFormatter f;
636     f.open_object_section("command_descriptions");
637
638     auto dump_cmd = [&cmdnum, &f](const MonCommand &mc){
639       ostringstream secname;
640       secname << "cmd" << setfill('0') << std::setw(3) << cmdnum;
641       dump_cmddesc_to_json(&f, secname.str(), mc.cmdstring, mc.helpstring,
642                            mc.module, mc.req_perms, mc.availability, 0);
643       cmdnum++;
644     };
645
646     for (const auto &pyc : py_commands) {
647       dump_cmd(pyc);
648     }
649
650     for (const auto &mgr_cmd : mgr_commands) {
651       dump_cmd(mgr_cmd);
652     }
653
654     f.close_section();  // command_descriptions
655     f.flush(cmdctx->odata);
656     cmdctx->reply(0, ss);
657     return true;
658   }
659
660   // lookup command
661   const MonCommand *mgr_cmd = _get_mgrcommand(prefix, mgr_commands);
662   _generate_command_map(cmdctx->cmdmap, param_str_map);
663   if (!mgr_cmd) {
664     MonCommand py_command = {"", "", "py", "rw", "cli"};
665     if (!_allowed_command(session.get(), py_command.module, prefix, cmdctx->cmdmap,
666                           param_str_map, &py_command)) {
667       dout(1) << " access denied" << dendl;
668       ss << "access denied; does your client key have mgr caps?"
669         " See http://docs.ceph.com/docs/master/mgr/administrator/#client-authentication";
670       cmdctx->reply(-EACCES, ss);
671       return true;
672     }
673   } else {
674     // validate user's permissions for requested command
675     if (!_allowed_command(session.get(), mgr_cmd->module, prefix, cmdctx->cmdmap,
676                           param_str_map, mgr_cmd)) {
677       dout(1) << " access denied" << dendl;
678       audit_clog->info() << "from='" << session->inst << "' "
679                          << "entity='" << session->entity_name << "' "
680                          << "cmd=" << m->cmd << ":  access denied";
681       ss << "access denied' does your client key have mgr caps?"
682         " See http://docs.ceph.com/docs/master/mgr/administrator/#client-authentication";
683       cmdctx->reply(-EACCES, ss);
684       return true;
685     }
686   }
687
688   audit_clog->debug()
689     << "from='" << session->inst << "' "
690     << "entity='" << session->entity_name << "' "
691     << "cmd=" << m->cmd << ": dispatch";
692
693   // ----------------
694   // service map commands
695   if (prefix == "service dump") {
696     if (!f)
697       f.reset(Formatter::create("json-pretty"));
698     cluster_state.with_servicemap([&](const ServiceMap &service_map) {
699         f->dump_object("service_map", service_map);
700       });
701     f->flush(cmdctx->odata);
702     cmdctx->reply(0, ss);
703     return true;
704   }
705   if (prefix == "service status") {
706     if (!f)
707       f.reset(Formatter::create("json-pretty"));
708     // only include state from services that are in the persisted service map
709     f->open_object_section("service_status");
710     ServiceMap s;
711     cluster_state.with_servicemap([&](const ServiceMap& service_map) {
712         s = service_map;
713       });
714     for (auto& p : s.services) {
715       f->open_object_section(p.first.c_str());
716       for (auto& q : p.second.daemons) {
717         f->open_object_section(q.first.c_str());
718         DaemonKey key(p.first, q.first);
719         assert(daemon_state.exists(key));
720         auto daemon = daemon_state.get(key);
721         Mutex::Locker l(daemon->lock);
722         f->dump_stream("status_stamp") << daemon->service_status_stamp;
723         f->dump_stream("last_beacon") << daemon->last_service_beacon;
724         f->open_object_section("status");
725         for (auto& r : daemon->service_status) {
726           f->dump_string(r.first.c_str(), r.second);
727         }
728         f->close_section();
729         f->close_section();
730       }
731       f->close_section();
732     }
733     f->close_section();
734     f->flush(cmdctx->odata);
735     cmdctx->reply(0, ss);
736     return true;
737   }
738
739   if (prefix == "config set") {
740     std::string key;
741     std::string val;
742     cmd_getval(cct, cmdctx->cmdmap, "key", key);
743     cmd_getval(cct, cmdctx->cmdmap, "value", val);
744     r = cct->_conf->set_val(key, val, true, &ss);
745     if (r == 0) {
746       cct->_conf->apply_changes(nullptr);
747     }
748     cmdctx->reply(0, ss);
749     return true;
750   }
751
752   // -----------
753   // PG commands
754
755   if (prefix == "pg scrub" ||
756       prefix == "pg repair" ||
757       prefix == "pg deep-scrub") {
758     string scrubop = prefix.substr(3, string::npos);
759     pg_t pgid;
760     string pgidstr;
761     cmd_getval(g_ceph_context, cmdctx->cmdmap, "pgid", pgidstr);
762     if (!pgid.parse(pgidstr.c_str())) {
763       ss << "invalid pgid '" << pgidstr << "'";
764       cmdctx->reply(-EINVAL, ss);
765       return true;
766     }
767     bool pg_exists = false;
768     cluster_state.with_osdmap([&](const OSDMap& osdmap) {
769         pg_exists = osdmap.pg_exists(pgid);
770       });
771     if (!pg_exists) {
772       ss << "pg " << pgid << " dne";
773       cmdctx->reply(-ENOENT, ss);
774       return true;
775     }
776     int acting_primary = -1;
777     cluster_state.with_osdmap([&](const OSDMap& osdmap) {
778         acting_primary = osdmap.get_pg_acting_primary(pgid);
779       });
780     if (acting_primary == -1) {
781       ss << "pg " << pgid << " has no primary osd";
782       cmdctx->reply(-EAGAIN, ss);
783       return true;
784     }
785     auto p = osd_cons.find(acting_primary);
786     if (p == osd_cons.end()) {
787       ss << "pg " << pgid << " primary osd." << acting_primary
788          << " is not currently connected";
789       cmdctx->reply(-EAGAIN, ss);
790     }
791     vector<pg_t> pgs = { pgid };
792     for (auto& con : p->second) {
793       con->send_message(new MOSDScrub(monc->get_fsid(),
794                                       pgs,
795                                       scrubop == "repair",
796                                       scrubop == "deep-scrub"));
797     }
798     ss << "instructing pg " << pgid << " on osd." << acting_primary
799        << " to " << scrubop;
800     cmdctx->reply(0, ss);
801     return true;
802   } else if (prefix == "osd scrub" ||
803               prefix == "osd deep-scrub" ||
804               prefix == "osd repair") {
805     string whostr;
806     cmd_getval(g_ceph_context, cmdctx->cmdmap, "who", whostr);
807     vector<string> pvec;
808     get_str_vec(prefix, pvec);
809
810     set<int> osds;
811     if (whostr == "*" || whostr == "all" || whostr == "any") {
812       cluster_state.with_osdmap([&](const OSDMap& osdmap) {
813           for (int i = 0; i < osdmap.get_max_osd(); i++)
814             if (osdmap.is_up(i)) {
815               osds.insert(i);
816             }
817         });
818     } else {
819       long osd = parse_osd_id(whostr.c_str(), &ss);
820       if (osd < 0) {
821         ss << "invalid osd '" << whostr << "'";
822         cmdctx->reply(-EINVAL, ss);
823         return true;
824       }
825       cluster_state.with_osdmap([&](const OSDMap& osdmap) {
826           if (osdmap.is_up(osd)) {
827             osds.insert(osd);
828           }
829         });
830       if (osds.empty()) {
831         ss << "osd." << osd << " is not up";
832         cmdctx->reply(-EAGAIN, ss);
833         return true;
834       }
835     }
836     set<int> sent_osds, failed_osds;
837     for (auto osd : osds) {
838       auto p = osd_cons.find(osd);
839       if (p == osd_cons.end()) {
840         failed_osds.insert(osd);
841       } else {
842         sent_osds.insert(osd);
843         for (auto& con : p->second) {
844           con->send_message(new MOSDScrub(monc->get_fsid(),
845                                           pvec.back() == "repair",
846                                           pvec.back() == "deep-scrub"));
847         }
848       }
849     }
850     if (failed_osds.size() == osds.size()) {
851       ss << "failed to instruct osd(s) " << osds << " to " << pvec.back()
852          << " (not connected)";
853       r = -EAGAIN;
854     } else {
855       ss << "instructed osd(s) " << sent_osds << " to " << pvec.back();
856       if (!failed_osds.empty()) {
857         ss << "; osd(s) " << failed_osds << " were not connected";
858       }
859       r = 0;
860     }
861     cmdctx->reply(0, ss);
862     return true;
863   } else if (prefix == "osd reweight-by-pg" ||
864              prefix == "osd reweight-by-utilization" ||
865              prefix == "osd test-reweight-by-pg" ||
866              prefix == "osd test-reweight-by-utilization") {
867     bool by_pg =
868       prefix == "osd reweight-by-pg" || prefix == "osd test-reweight-by-pg";
869     bool dry_run =
870       prefix == "osd test-reweight-by-pg" ||
871       prefix == "osd test-reweight-by-utilization";
872     int64_t oload;
873     cmd_getval(g_ceph_context, cmdctx->cmdmap, "oload", oload, int64_t(120));
874     set<int64_t> pools;
875     vector<string> poolnames;
876     cmd_getval(g_ceph_context, cmdctx->cmdmap, "pools", poolnames);
877     cluster_state.with_osdmap([&](const OSDMap& osdmap) {
878         for (const auto& poolname : poolnames) {
879           int64_t pool = osdmap.lookup_pg_pool_name(poolname);
880           if (pool < 0) {
881             ss << "pool '" << poolname << "' does not exist";
882             r = -ENOENT;
883           }
884           pools.insert(pool);
885         }
886       });
887     if (r) {
888       cmdctx->reply(r, ss);
889       return true;
890     }
891     double max_change = g_conf->mon_reweight_max_change;
892     cmd_getval(g_ceph_context, cmdctx->cmdmap, "max_change", max_change);
893     if (max_change <= 0.0) {
894       ss << "max_change " << max_change << " must be positive";
895       cmdctx->reply(-EINVAL, ss);
896       return true;
897     }
898     int64_t max_osds = g_conf->mon_reweight_max_osds;
899     cmd_getval(g_ceph_context, cmdctx->cmdmap, "max_osds", max_osds);
900     if (max_osds <= 0) {
901       ss << "max_osds " << max_osds << " must be positive";
902       cmdctx->reply(-EINVAL, ss);
903       return true;
904     }
905     string no_increasing;
906     cmd_getval(g_ceph_context, cmdctx->cmdmap, "no_increasing", no_increasing);
907     string out_str;
908     mempool::osdmap::map<int32_t, uint32_t> new_weights;
909     r = cluster_state.with_pgmap([&](const PGMap& pgmap) {
910         return cluster_state.with_osdmap([&](const OSDMap& osdmap) {
911             return reweight::by_utilization(osdmap, pgmap,
912                                             oload,
913                                             max_change,
914                                             max_osds,
915                                             by_pg,
916                                             pools.empty() ? NULL : &pools,
917                                             no_increasing == "--no-increasing",
918                                             &new_weights,
919                                             &ss, &out_str, f.get());
920           });
921       });
922     if (r >= 0) {
923       dout(10) << "reweight::by_utilization: finished with " << out_str << dendl;
924     }
925     if (f) {
926       f->flush(cmdctx->odata);
927     } else {
928       cmdctx->odata.append(out_str);
929     }
930     if (r < 0) {
931       ss << "FAILED reweight-by-pg";
932       cmdctx->reply(r, ss);
933       return true;
934     } else if (r == 0 || dry_run) {
935       ss << "no change";
936       cmdctx->reply(r, ss);
937       return true;
938     } else {
939       json_spirit::Object json_object;
940       for (const auto& osd_weight : new_weights) {
941         json_spirit::Config::add(json_object,
942                                  std::to_string(osd_weight.first),
943                                  std::to_string(osd_weight.second));
944       }
945       string s = json_spirit::write(json_object);
946       std::replace(begin(s), end(s), '\"', '\'');
947       const string cmd =
948         "{"
949         "\"prefix\": \"osd reweightn\", "
950         "\"weights\": \"" + s + "\""
951         "}";
952       auto on_finish = new ReplyOnFinish(cmdctx);
953       monc->start_mon_command({cmd}, {},
954                               &on_finish->from_mon, &on_finish->outs, on_finish);
955       return true;
956     }
957   } else if (prefix == "osd df") {
958     string method;
959     cmd_getval(g_ceph_context, cmdctx->cmdmap, "output_method", method);
960     r = cluster_state.with_pgservice([&](const PGMapStatService& pgservice) {
961         return cluster_state.with_osdmap([&](const OSDMap& osdmap) {
962             print_osd_utilization(osdmap, &pgservice, ss,
963                                   f.get(), method == "tree");
964                                   
965             cmdctx->odata.append(ss);
966             return 0;
967           });
968       });
969     cmdctx->reply(r, "");
970     return true;
971   } else if (prefix == "osd safe-to-destroy") {
972     vector<string> ids;
973     cmd_getval(g_ceph_context, cmdctx->cmdmap, "ids", ids);
974     set<int> osds;
975     int r;
976     cluster_state.with_osdmap([&](const OSDMap& osdmap) {
977         r = osdmap.parse_osd_id_list(ids, &osds, &ss);
978       });
979     if (!r && osds.empty()) {
980       ss << "must specify one or more OSDs";
981       r = -EINVAL;
982     }
983     if (r < 0) {
984       cmdctx->reply(r, ss);
985       return true;
986     }
987     set<int> active_osds, missing_stats, stored_pgs;
988     int affected_pgs = 0;
989     cluster_state.with_pgmap([&](const PGMap& pg_map) {
990         if (pg_map.num_pg_unknown > 0) {
991           ss << pg_map.num_pg_unknown << " pgs have unknown state; cannot draw"
992              << " any conclusions";
993           r = -EAGAIN;
994           return;
995         }
996         int num_active_clean = 0;
997         for (auto& p : pg_map.num_pg_by_state) {
998           unsigned want = PG_STATE_ACTIVE|PG_STATE_CLEAN;
999           if ((p.first & want) == want) {
1000             num_active_clean += p.second;
1001           }
1002         }
1003         cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1004             for (auto osd : osds) {
1005               if (!osdmap.exists(osd)) {
1006                 continue;  // clearly safe to destroy
1007               }
1008               auto q = pg_map.num_pg_by_osd.find(osd);
1009               if (q != pg_map.num_pg_by_osd.end()) {
1010                 if (q->second.acting > 0 || q->second.up > 0) {
1011                   active_osds.insert(osd);
1012                   affected_pgs += q->second.acting + q->second.up;
1013                   continue;
1014                 }
1015               }
1016               if (num_active_clean < pg_map.num_pg) {
1017                 // all pgs aren't active+clean; we need to be careful.
1018                 auto p = pg_map.osd_stat.find(osd);
1019                 if (p == pg_map.osd_stat.end()) {
1020                   missing_stats.insert(osd);
1021                 }
1022                 if (p->second.num_pgs > 0) {
1023                   stored_pgs.insert(osd);
1024                 }
1025               }
1026             }
1027           });
1028       });
1029     if (!r && !active_osds.empty()) {
1030       ss << "OSD(s) " << active_osds << " have " << affected_pgs
1031          << " pgs currently mapped to them";
1032       r = -EBUSY;
1033     } else if (!missing_stats.empty()) {
1034       ss << "OSD(s) " << missing_stats << " have no reported stats, and not all"
1035          << " PGs are active+clean; we cannot draw any conclusions";
1036       r = -EAGAIN;
1037     } else if (!stored_pgs.empty()) {
1038       ss << "OSD(s) " << stored_pgs << " last reported they still store some PG"
1039          << " data, and not all PGs are active+clean; we cannot be sure they"
1040          << " aren't still needed.";
1041       r = -EBUSY;
1042     }
1043     if (r) {
1044       cmdctx->reply(r, ss);
1045       return true;
1046     }
1047     ss << "OSD(s) " << osds << " are safe to destroy without reducing data"
1048        << " durability.";
1049     cmdctx->reply(0, ss);
1050     return true;
1051   } else if (prefix == "osd ok-to-stop") {
1052     vector<string> ids;
1053     cmd_getval(g_ceph_context, cmdctx->cmdmap, "ids", ids);
1054     set<int> osds;
1055     int r;
1056     cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1057         r = osdmap.parse_osd_id_list(ids, &osds, &ss);
1058       });
1059     if (!r && osds.empty()) {
1060       ss << "must specify one or more OSDs";
1061       r = -EINVAL;
1062     }
1063     if (r < 0) {
1064       cmdctx->reply(r, ss);
1065       return true;
1066     }
1067     map<pg_t,int> pg_delta;  // pgid -> net acting set size change
1068     int dangerous_pgs = 0;
1069     cluster_state.with_pgmap([&](const PGMap& pg_map) {
1070         return cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1071             if (pg_map.num_pg_unknown > 0) {
1072               ss << pg_map.num_pg_unknown << " pgs have unknown state; "
1073                  << "cannot draw any conclusions";
1074               r = -EAGAIN;
1075               return;
1076             }
1077             for (auto osd : osds) {
1078               auto p = pg_map.pg_by_osd.find(osd);
1079               if (p != pg_map.pg_by_osd.end()) {
1080                 for (auto& pgid : p->second) {
1081                   --pg_delta[pgid];
1082                 }
1083               }
1084             }
1085             for (auto& p : pg_delta) {
1086               auto q = pg_map.pg_stat.find(p.first);
1087               if (q == pg_map.pg_stat.end()) {
1088                 ss << "missing information about " << p.first << "; cannot draw"
1089                    << " any conclusions";
1090                 r = -EAGAIN;
1091                 return;
1092               }
1093               if (!(q->second.state & PG_STATE_ACTIVE) ||
1094                   (q->second.state & PG_STATE_DEGRADED)) {
1095                 // we don't currently have a good way to tell *how* degraded
1096                 // a degraded PG is, so we have to assume we cannot remove
1097                 // any more replicas/shards.
1098                 ++dangerous_pgs;
1099                 continue;
1100               }
1101               const pg_pool_t *pi = osdmap.get_pg_pool(p.first.pool());
1102               if (!pi) {
1103                 ++dangerous_pgs; // pool is creating or deleting
1104               } else {
1105                 if (q->second.acting.size() + p.second < pi->min_size) {
1106                   ++dangerous_pgs;
1107                 }
1108               }
1109             }
1110           });
1111       });
1112     if (r) {
1113       cmdctx->reply(r, ss);
1114       return true;
1115     }
1116     if (dangerous_pgs) {
1117       ss << dangerous_pgs << " PGs are already degraded or might become "
1118          << "unavailable";
1119       cmdctx->reply(-EBUSY, ss);
1120       return true;
1121     }
1122     ss << "OSD(s) " << osds << " are ok to stop without reducing"
1123        << " availability, provided there are no other concurrent failures"
1124        << " or interventions. " << pg_delta.size() << " PGs are likely to be"
1125        << " degraded (but remain available) as a result.";
1126     cmdctx->reply(0, ss);
1127     return true;
1128   } else if (prefix == "pg force-recovery" ||
1129                prefix == "pg force-backfill" ||
1130                prefix == "pg cancel-force-recovery" ||
1131                prefix == "pg cancel-force-backfill") {
1132     string forceop = prefix.substr(3, string::npos);
1133     list<pg_t> parsed_pgs;
1134     map<int, list<pg_t> > osdpgs;
1135
1136     // figure out actual op just once
1137     int actual_op = 0;
1138     if (forceop == "force-recovery") {
1139       actual_op = OFR_RECOVERY;
1140     } else if (forceop == "force-backfill") {
1141       actual_op = OFR_BACKFILL;
1142     } else if (forceop == "cancel-force-backfill") {
1143       actual_op = OFR_BACKFILL | OFR_CANCEL;
1144     } else if (forceop == "cancel-force-recovery") {
1145       actual_op = OFR_RECOVERY | OFR_CANCEL;
1146     }
1147
1148     // covnert pg names to pgs, discard any invalid ones while at it
1149     {
1150       // we don't want to keep pgidstr and pgidstr_nodup forever
1151       vector<string> pgidstr;
1152       // get pgids to process and prune duplicates
1153       cmd_getval(g_ceph_context, cmdctx->cmdmap, "pgid", pgidstr);
1154       set<string> pgidstr_nodup(pgidstr.begin(), pgidstr.end());
1155       if (pgidstr.size() != pgidstr_nodup.size()) {
1156         // move elements only when there were duplicates, as this
1157         // reorders them
1158         pgidstr.resize(pgidstr_nodup.size());
1159         auto it = pgidstr_nodup.begin();
1160         for (size_t i = 0 ; i < pgidstr_nodup.size(); i++) {
1161           pgidstr[i] = std::move(*it++);
1162         }
1163       }
1164
1165       cluster_state.with_pgmap([&](const PGMap& pg_map) {
1166         for (auto& pstr : pgidstr) {
1167           pg_t parsed_pg;
1168           if (!parsed_pg.parse(pstr.c_str())) {
1169             ss << "invalid pgid '" << pstr << "'; ";
1170             r = -EINVAL;
1171           } else {
1172             auto workit = pg_map.pg_stat.find(parsed_pg);
1173             if (workit == pg_map.pg_stat.end()) {
1174               ss << "pg " << pstr << " not exists; ";
1175               r = -ENOENT;
1176             } else {
1177               pg_stat_t workpg = workit->second;
1178
1179               // discard pgs for which user requests are pointless
1180               switch (actual_op)
1181               {
1182                 case OFR_RECOVERY:
1183                   if ((workpg.state & (PG_STATE_DEGRADED | PG_STATE_RECOVERY_WAIT | PG_STATE_RECOVERING)) == 0) {
1184                     // don't return error, user script may be racing with cluster. not fatal.
1185                     ss << "pg " << pstr << " doesn't require recovery; ";
1186                     continue;
1187                   } else  if (workpg.state & PG_STATE_FORCED_RECOVERY) {
1188                     ss << "pg " << pstr << " recovery already forced; ";
1189                     // return error, as it may be a bug in user script
1190                     r = -EINVAL;
1191                     continue;
1192                   }
1193                   break;
1194                 case OFR_BACKFILL:
1195                   if ((workpg.state & (PG_STATE_DEGRADED | PG_STATE_BACKFILL_WAIT | PG_STATE_BACKFILLING)) == 0) {
1196                     ss << "pg " << pstr << " doesn't require backfilling; ";
1197                     continue;
1198                   } else  if (workpg.state & PG_STATE_FORCED_BACKFILL) {
1199                     ss << "pg " << pstr << " backfill already forced; ";
1200                     r = -EINVAL;
1201                     continue;
1202                   }
1203                   break;
1204                 case OFR_BACKFILL | OFR_CANCEL:
1205                   if ((workpg.state & PG_STATE_FORCED_BACKFILL) == 0) {
1206                     ss << "pg " << pstr << " backfill not forced; ";
1207                     continue;
1208                   }
1209                   break;
1210                 case OFR_RECOVERY | OFR_CANCEL:
1211                   if ((workpg.state & PG_STATE_FORCED_RECOVERY) == 0) {
1212                     ss << "pg " << pstr << " recovery not forced; ";
1213                     continue;
1214                   }
1215                   break;
1216                 default:
1217                   assert(0 == "actual_op value is not supported");
1218               }
1219
1220               parsed_pgs.push_back(std::move(parsed_pg));
1221             }
1222           }
1223         }
1224
1225         // group pgs to process by osd
1226         for (auto& pgid : parsed_pgs) {
1227           auto workit = pg_map.pg_stat.find(pgid);
1228           if (workit != pg_map.pg_stat.end()) {
1229             pg_stat_t workpg = workit->second;
1230             set<int32_t> osds(workpg.up.begin(), workpg.up.end());
1231             osds.insert(workpg.acting.begin(), workpg.acting.end());
1232             for (auto i : osds) {
1233               osdpgs[i].push_back(pgid);
1234             }
1235           }
1236         }
1237
1238       });
1239     }
1240
1241     // respond with error only when no pgs are correct
1242     // yes, in case of mixed errors, only the last one will be emitted,
1243     // but the message presented will be fine
1244     if (parsed_pgs.size() != 0) {
1245       // clear error to not confuse users/scripts
1246       r = 0;
1247     }
1248
1249     // optimize the command -> messages conversion, use only one message per distinct OSD
1250     cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1251       for (auto& i : osdpgs) {
1252         if (osdmap.is_up(i.first)) {
1253           vector<pg_t> pgvec(make_move_iterator(i.second.begin()), make_move_iterator(i.second.end()));
1254           auto p = osd_cons.find(i.first);
1255           if (p == osd_cons.end()) {
1256             ss << "osd." << i.first << " is not currently connected";
1257             r = -EAGAIN;
1258             continue;
1259           }
1260           for (auto& con : p->second) {
1261             con->send_message(new MOSDForceRecovery(monc->get_fsid(), pgvec, actual_op));
1262           }
1263           ss << "instructing pg(s) " << i.second << " on osd." << i.first << " to " << forceop << "; ";
1264         }
1265       }
1266     });
1267     ss << std::endl;
1268     cmdctx->reply(r, ss);
1269     return true;
1270   } else {
1271     r = cluster_state.with_pgmap([&](const PGMap& pg_map) {
1272         return cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1273             return process_pg_map_command(prefix, cmdctx->cmdmap, pg_map, osdmap,
1274                                           f.get(), &ss, &cmdctx->odata);
1275           });
1276       });
1277
1278     if (r != -EOPNOTSUPP) {
1279       cmdctx->reply(r, ss);
1280       return true;
1281     }
1282   }
1283
1284   // None of the special native commands, 
1285   ActivePyModule *handler = nullptr;
1286   auto py_commands = py_modules.get_py_commands();
1287   for (const auto &pyc : py_commands) {
1288     auto pyc_prefix = cmddesc_get_prefix(pyc.cmdstring);
1289     dout(1) << "pyc_prefix: '" << pyc_prefix << "'" << dendl;
1290     if (pyc_prefix == prefix) {
1291       handler = pyc.handler;
1292       break;
1293     }
1294   }
1295
1296   if (handler == nullptr) {
1297     ss << "No handler found for '" << prefix << "'";
1298     dout(4) << "No handler found for '" << prefix << "'" << dendl;
1299     cmdctx->reply(-EINVAL, ss);
1300     return true;
1301   } else {
1302     // Okay, now we have a handler to call, but we must not call it
1303     // in this thread, because the python handlers can do anything,
1304     // including blocking, and including calling back into mgr.
1305     dout(4) << "passing through " << cmdctx->cmdmap.size() << dendl;
1306     finisher.queue(new FunctionContext([cmdctx, handler](int r_) {
1307       std::stringstream ds;
1308       std::stringstream ss;
1309       int r = handler->handle_command(cmdctx->cmdmap, &ds, &ss);
1310       cmdctx->odata.append(ds);
1311       cmdctx->reply(r, ss);
1312     }));
1313     return true;
1314   }
1315 }
1316
1317 void DaemonServer::_prune_pending_service_map()
1318 {
1319   utime_t cutoff = ceph_clock_now();
1320   cutoff -= g_conf->get_val<double>("mgr_service_beacon_grace");
1321   auto p = pending_service_map.services.begin();
1322   while (p != pending_service_map.services.end()) {
1323     auto q = p->second.daemons.begin();
1324     while (q != p->second.daemons.end()) {
1325       DaemonKey key(p->first, q->first);
1326       if (!daemon_state.exists(key)) {
1327         derr << "missing key " << key << dendl;
1328         ++q;
1329         continue;
1330       }
1331       auto daemon = daemon_state.get(key);
1332       Mutex::Locker l(daemon->lock);
1333       if (daemon->last_service_beacon == utime_t()) {
1334         // we must have just restarted; assume they are alive now.
1335         daemon->last_service_beacon = ceph_clock_now();
1336         ++q;
1337         continue;
1338       }
1339       if (daemon->last_service_beacon < cutoff) {
1340         dout(10) << "pruning stale " << p->first << "." << q->first
1341                  << " last_beacon " << daemon->last_service_beacon << dendl;
1342         q = p->second.daemons.erase(q);
1343         pending_service_map_dirty = pending_service_map.epoch;
1344       } else {
1345         ++q;
1346       }
1347     }
1348     if (p->second.daemons.empty()) {
1349       p = pending_service_map.services.erase(p);
1350       pending_service_map_dirty = pending_service_map.epoch;
1351     } else {
1352       ++p;
1353     }
1354   }
1355 }
1356
1357 void DaemonServer::send_report()
1358 {
1359   if (!pgmap_ready) {
1360     if (ceph_clock_now() - started_at > g_conf->get_val<int64_t>("mgr_stats_period") * 4.0) {
1361       pgmap_ready = true;
1362       reported_osds.clear();
1363       dout(1) << "Giving up on OSDs that haven't reported yet, sending "
1364               << "potentially incomplete PG state to mon" << dendl;
1365     } else {
1366       dout(1) << "Not sending PG status to monitor yet, waiting for OSDs"
1367               << dendl;
1368       return;
1369     }
1370   }
1371
1372   auto m = new MMonMgrReport();
1373   py_modules.get_health_checks(&m->health_checks);
1374
1375   cluster_state.with_pgmap([&](const PGMap& pg_map) {
1376       cluster_state.update_delta_stats();
1377
1378       if (pending_service_map.epoch) {
1379         _prune_pending_service_map();
1380         if (pending_service_map_dirty >= pending_service_map.epoch) {
1381           pending_service_map.modified = ceph_clock_now();
1382           ::encode(pending_service_map, m->service_map_bl, CEPH_FEATURES_ALL);
1383           dout(10) << "sending service_map e" << pending_service_map.epoch
1384                    << dendl;
1385           pending_service_map.epoch++;
1386         }
1387       }
1388
1389       cluster_state.with_osdmap([&](const OSDMap& osdmap) {
1390           // FIXME: no easy way to get mon features here.  this will do for
1391           // now, though, as long as we don't make a backward-incompat change.
1392           pg_map.encode_digest(osdmap, m->get_data(), CEPH_FEATURES_ALL);
1393           dout(10) << pg_map << dendl;
1394
1395           pg_map.get_health_checks(g_ceph_context, osdmap,
1396                                    &m->health_checks);
1397
1398           dout(10) << m->health_checks.checks.size() << " health checks"
1399                    << dendl;
1400           dout(20) << "health checks:\n";
1401           JSONFormatter jf(true);
1402           jf.dump_object("health_checks", m->health_checks);
1403           jf.flush(*_dout);
1404           *_dout << dendl;
1405         });
1406     });
1407   // TODO? We currently do not notify the PyModules
1408   // TODO: respect needs_send, so we send the report only if we are asked to do
1409   //       so, or the state is updated.
1410   monc->send_mon_message(m);
1411 }
1412
1413 void DaemonServer::got_service_map()
1414 {
1415   Mutex::Locker l(lock);
1416
1417   cluster_state.with_servicemap([&](const ServiceMap& service_map) {
1418       if (pending_service_map.epoch == 0) {
1419         // we just started up
1420         dout(10) << "got initial map e" << service_map.epoch << dendl;
1421         pending_service_map = service_map;
1422       } else {
1423         // we we already active and therefore must have persisted it,
1424         // which means ours is the same or newer.
1425         dout(10) << "got updated map e" << service_map.epoch << dendl;
1426       }
1427       pending_service_map.epoch = service_map.epoch + 1;
1428     });
1429
1430   // cull missing daemons, populate new ones
1431   for (auto& p : pending_service_map.services) {
1432     std::set<std::string> names;
1433     for (auto& q : p.second.daemons) {
1434       names.insert(q.first);
1435       DaemonKey key(p.first, q.first);
1436       if (!daemon_state.exists(key)) {
1437         auto daemon = std::make_shared<DaemonState>(daemon_state.types);
1438         daemon->key = key;
1439         daemon->metadata = q.second.metadata;
1440         if (q.second.metadata.count("hostname")) {
1441           daemon->hostname = q.second.metadata["hostname"];
1442         }
1443         daemon->service_daemon = true;
1444         daemon_state.insert(daemon);
1445         dout(10) << "added missing " << key << dendl;
1446       }
1447     }
1448     daemon_state.cull(p.first, names);
1449   }
1450 }
1451
1452
1453 const char** DaemonServer::get_tracked_conf_keys() const
1454 {
1455   static const char *KEYS[] = {
1456     "mgr_stats_threshold",
1457     "mgr_stats_period",
1458     nullptr
1459   };
1460
1461   return KEYS;
1462 }
1463
1464 void DaemonServer::handle_conf_change(const struct md_config_t *conf,
1465                                               const std::set <std::string> &changed)
1466 {
1467   dout(4) << "ohai" << dendl;
1468   // We may be called within lock (via MCommand `config set`) or outwith the
1469   // lock (via admin socket `config set`), so handle either case.
1470   const bool initially_locked = lock.is_locked_by_me();
1471   if (!initially_locked) {
1472     lock.Lock();
1473   }
1474
1475   if (changed.count("mgr_stats_threshold") || changed.count("mgr_stats_period")) {
1476     dout(4) << "Updating stats threshold/period on "
1477             << daemon_connections.size() << " clients" << dendl;
1478     // Send a fresh MMgrConfigure to all clients, so that they can follow
1479     // the new policy for transmitting stats
1480     for (auto &c : daemon_connections) {
1481       _send_configure(c);
1482     }
1483   }
1484 }
1485
1486 void DaemonServer::_send_configure(ConnectionRef c)
1487 {
1488   assert(lock.is_locked_by_me());
1489
1490   auto configure = new MMgrConfigure();
1491   configure->stats_period = g_conf->get_val<int64_t>("mgr_stats_period");
1492   configure->stats_threshold = g_conf->get_val<int64_t>("mgr_stats_threshold");
1493   c->send_message(configure);
1494 }
1495