Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / mon / Monitor.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software 
11  * Foundation.  See file COPYING.
12  * 
13  */
14
15
16 #include <sstream>
17 #include <stdlib.h>
18 #include <signal.h>
19 #include <limits.h>
20 #include <cstring>
21 #include <boost/scope_exit.hpp>
22 #include <boost/algorithm/string/predicate.hpp>
23
24 #include "Monitor.h"
25 #include "common/version.h"
26
27 #include "osd/OSDMap.h"
28
29 #include "MonitorDBStore.h"
30
31 #include "messages/PaxosServiceMessage.h"
32 #include "messages/MMonMap.h"
33 #include "messages/MMonGetMap.h"
34 #include "messages/MMonGetVersion.h"
35 #include "messages/MMonGetVersionReply.h"
36 #include "messages/MGenericMessage.h"
37 #include "messages/MMonCommand.h"
38 #include "messages/MMonCommandAck.h"
39 #include "messages/MMonHealth.h"
40 #include "messages/MMonMetadata.h"
41 #include "messages/MMonSync.h"
42 #include "messages/MMonScrub.h"
43 #include "messages/MMonProbe.h"
44 #include "messages/MMonJoin.h"
45 #include "messages/MMonPaxos.h"
46 #include "messages/MRoute.h"
47 #include "messages/MForward.h"
48 #include "messages/MStatfs.h"
49
50 #include "messages/MMonSubscribe.h"
51 #include "messages/MMonSubscribeAck.h"
52
53 #include "messages/MAuthReply.h"
54
55 #include "messages/MTimeCheck.h"
56 #include "messages/MPing.h"
57
58 #include "common/strtol.h"
59 #include "common/ceph_argparse.h"
60 #include "common/Timer.h"
61 #include "common/Clock.h"
62 #include "common/errno.h"
63 #include "common/perf_counters.h"
64 #include "common/admin_socket.h"
65 #include "global/signal_handler.h"
66 #include "common/Formatter.h"
67 #include "include/stringify.h"
68 #include "include/color.h"
69 #include "include/ceph_fs.h"
70 #include "include/str_list.h"
71
72 #include "OSDMonitor.h"
73 #include "MDSMonitor.h"
74 #include "MonmapMonitor.h"
75 #include "PGMonitor.h"
76 #include "LogMonitor.h"
77 #include "AuthMonitor.h"
78 #include "MgrMonitor.h"
79 #include "MgrStatMonitor.h"
80 #include "mon/QuorumService.h"
81 #include "mon/OldHealthMonitor.h"
82 #include "mon/HealthMonitor.h"
83 #include "mon/ConfigKeyService.h"
84 #include "common/config.h"
85 #include "common/cmdparse.h"
86 #include "include/assert.h"
87 #include "include/compat.h"
88 #include "perfglue/heap_profiler.h"
89
90 #include "auth/none/AuthNoneClientHandler.h"
91
92 #define dout_subsys ceph_subsys_mon
93 #undef dout_prefix
94 #define dout_prefix _prefix(_dout, this)
95 static ostream& _prefix(std::ostream *_dout, const Monitor *mon) {
96   return *_dout << "mon." << mon->name << "@" << mon->rank
97                 << "(" << mon->get_state_name() << ") e" << mon->monmap->get_epoch() << " ";
98 }
99
100 const string Monitor::MONITOR_NAME = "monitor";
101 const string Monitor::MONITOR_STORE_PREFIX = "monitor_store";
102
103
104 #undef FLAG
105 #undef COMMAND
106 #undef COMMAND_WITH_FLAG
107 #define FLAG(f) (MonCommand::FLAG_##f)
108 #define COMMAND(parsesig, helptext, modulename, req_perms, avail)       \
109   {parsesig, helptext, modulename, req_perms, avail, FLAG(NONE)},
110 #define COMMAND_WITH_FLAG(parsesig, helptext, modulename, req_perms, avail, flags) \
111   {parsesig, helptext, modulename, req_perms, avail, flags},
112 MonCommand mon_commands[] = {
113 #include <mon/MonCommands.h>
114 };
115 MonCommand pgmonitor_commands[] = {
116 #include <mon/PGMonitorCommands.h>
117 };
118 #undef COMMAND
119 #undef COMMAND_WITH_FLAG
120
121
122 void C_MonContext::finish(int r) {
123   if (mon->is_shutdown())
124     return;
125   FunctionContext::finish(r);
126 }
127
128 Monitor::Monitor(CephContext* cct_, string nm, MonitorDBStore *s,
129                  Messenger *m, Messenger *mgr_m, MonMap *map) :
130   Dispatcher(cct_),
131   name(nm),
132   rank(-1), 
133   messenger(m),
134   con_self(m ? m->get_loopback_connection() : NULL),
135   lock("Monitor::lock"),
136   timer(cct_, lock),
137   finisher(cct_, "mon_finisher", "fin"),
138   cpu_tp(cct, "Monitor::cpu_tp", "cpu_tp", g_conf->mon_cpu_threads),
139   has_ever_joined(false),
140   logger(NULL), cluster_logger(NULL), cluster_logger_registered(false),
141   monmap(map),
142   log_client(cct_, messenger, monmap, LogClient::FLAG_MON),
143   key_server(cct, &keyring),
144   auth_cluster_required(cct,
145                         cct->_conf->auth_supported.empty() ?
146                         cct->_conf->auth_cluster_required : cct->_conf->auth_supported),
147   auth_service_required(cct,
148                         cct->_conf->auth_supported.empty() ?
149                         cct->_conf->auth_service_required : cct->_conf->auth_supported ),
150   mgr_messenger(mgr_m),
151   mgr_client(cct_, mgr_m),
152   pgservice(nullptr),
153   store(s),
154   
155   state(STATE_PROBING),
156   
157   elector(this),
158   required_features(0),
159   leader(0),
160   quorum_con_features(0),
161   // scrub
162   scrub_version(0),
163   scrub_event(NULL),
164   scrub_timeout_event(NULL),
165
166   // sync state
167   sync_provider_count(0),
168   sync_cookie(0),
169   sync_full(false),
170   sync_start_version(0),
171   sync_timeout_event(NULL),
172   sync_last_committed_floor(0),
173
174   timecheck_round(0),
175   timecheck_acks(0),
176   timecheck_rounds_since_clean(0),
177   timecheck_event(NULL),
178
179   paxos_service(PAXOS_NUM),
180   admin_hook(NULL),
181   routed_request_tid(0),
182   op_tracker(cct, true, 1)
183 {
184   clog = log_client.create_channel(CLOG_CHANNEL_CLUSTER);
185   audit_clog = log_client.create_channel(CLOG_CHANNEL_AUDIT);
186
187   update_log_clients();
188
189   paxos = new Paxos(this, "paxos");
190
191   paxos_service[PAXOS_MDSMAP] = new MDSMonitor(this, paxos, "mdsmap");
192   paxos_service[PAXOS_MONMAP] = new MonmapMonitor(this, paxos, "monmap");
193   paxos_service[PAXOS_OSDMAP] = new OSDMonitor(cct, this, paxos, "osdmap");
194   paxos_service[PAXOS_PGMAP] = new PGMonitor(this, paxos, "pgmap");
195   paxos_service[PAXOS_LOG] = new LogMonitor(this, paxos, "logm");
196   paxos_service[PAXOS_AUTH] = new AuthMonitor(this, paxos, "auth");
197   paxos_service[PAXOS_MGR] = new MgrMonitor(this, paxos, "mgr");
198   paxos_service[PAXOS_MGRSTAT] = new MgrStatMonitor(this, paxos, "mgrstat");
199   paxos_service[PAXOS_HEALTH] = new HealthMonitor(this, paxos, "health");
200
201   health_monitor = new OldHealthMonitor(this);
202   config_key_service = new ConfigKeyService(this, paxos);
203
204   mon_caps = new MonCap();
205   bool r = mon_caps->parse("allow *", NULL);
206   assert(r);
207
208   exited_quorum = ceph_clock_now();
209
210   // prepare local commands
211   local_mon_commands.resize(ARRAY_SIZE(mon_commands));
212   for (unsigned i = 0; i < ARRAY_SIZE(mon_commands); ++i) {
213     local_mon_commands[i] = mon_commands[i];
214   }
215   MonCommand::encode_vector(local_mon_commands, local_mon_commands_bl);
216
217   local_upgrading_mon_commands = local_mon_commands;
218   for (unsigned i = 0; i < ARRAY_SIZE(pgmonitor_commands); ++i) {
219     local_upgrading_mon_commands.push_back(pgmonitor_commands[i]);
220   }
221   MonCommand::encode_vector(local_upgrading_mon_commands,
222                             local_upgrading_mon_commands_bl);
223
224   // assume our commands until we have an election.  this only means
225   // we won't reply with EINVAL before the election; any command that
226   // actually matters will wait until we have quorum etc and then
227   // retry (and revalidate).
228   leader_mon_commands = local_mon_commands;
229
230   // note: OSDMonitor may update this based on the luminous flag.
231   pgservice = mgrstatmon()->get_pg_stat_service();
232 }
233
234 Monitor::~Monitor()
235 {
236   for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p)
237     delete *p;
238   delete health_monitor;
239   delete config_key_service;
240   delete paxos;
241   assert(session_map.sessions.empty());
242   delete mon_caps;
243 }
244
245
246 class AdminHook : public AdminSocketHook {
247   Monitor *mon;
248 public:
249   explicit AdminHook(Monitor *m) : mon(m) {}
250   bool call(std::string command, cmdmap_t& cmdmap, std::string format,
251             bufferlist& out) override {
252     stringstream ss;
253     mon->do_admin_command(command, cmdmap, format, ss);
254     out.append(ss);
255     return true;
256   }
257 };
258
259 void Monitor::do_admin_command(string command, cmdmap_t& cmdmap, string format,
260                                ostream& ss)
261 {
262   Mutex::Locker l(lock);
263
264   boost::scoped_ptr<Formatter> f(Formatter::create(format));
265
266   string args;
267   for (cmdmap_t::iterator p = cmdmap.begin();
268        p != cmdmap.end(); ++p) {
269     if (p->first == "prefix")
270       continue;
271     if (!args.empty())
272       args += ", ";
273     args += cmd_vartype_stringify(p->second);
274   }
275   args = "[" + args + "]";
276
277   bool read_only = (command == "mon_status" ||
278                     command == "mon metadata" ||
279                     command == "quorum_status" ||
280                     command == "ops" ||
281                     command == "sessions");
282
283   (read_only ? audit_clog->debug() : audit_clog->info())
284     << "from='admin socket' entity='admin socket' "
285     << "cmd='" << command << "' args=" << args << ": dispatch";
286
287   if (command == "mon_status") {
288     get_mon_status(f.get(), ss);
289     if (f)
290       f->flush(ss);
291   } else if (command == "quorum_status") {
292     _quorum_status(f.get(), ss);
293   } else if (command == "sync_force") {
294     string validate;
295     if ((!cmd_getval(g_ceph_context, cmdmap, "validate", validate)) ||
296         (validate != "--yes-i-really-mean-it")) {
297       ss << "are you SURE? this will mean the monitor store will be erased "
298             "the next time the monitor is restarted.  pass "
299             "'--yes-i-really-mean-it' if you really do.";
300       goto abort;
301     }
302     sync_force(f.get(), ss);
303   } else if (command.compare(0, 23, "add_bootstrap_peer_hint") == 0) {
304     if (!_add_bootstrap_peer_hint(command, cmdmap, ss))
305       goto abort;
306   } else if (command == "quorum enter") {
307     elector.start_participating();
308     start_election();
309     ss << "started responding to quorum, initiated new election";
310   } else if (command == "quorum exit") {
311     start_election();
312     elector.stop_participating();
313     ss << "stopped responding to quorum, initiated new election";
314   } else if (command == "ops") {
315     (void)op_tracker.dump_ops_in_flight(f.get());
316     if (f) {
317       f->flush(ss);
318     }
319   } else if (command == "sessions") {
320
321     if (f) {
322       f->open_array_section("sessions");
323       for (auto p : session_map.sessions) {
324         f->dump_stream("session") << *p;
325       }
326       f->close_section();
327       f->flush(ss);
328     }
329
330   } else {
331     assert(0 == "bad AdminSocket command binding");
332   }
333   (read_only ? audit_clog->debug() : audit_clog->info())
334     << "from='admin socket' "
335     << "entity='admin socket' "
336     << "cmd=" << command << " "
337     << "args=" << args << ": finished";
338   return;
339
340 abort:
341   (read_only ? audit_clog->debug() : audit_clog->info())
342     << "from='admin socket' "
343     << "entity='admin socket' "
344     << "cmd=" << command << " "
345     << "args=" << args << ": aborted";
346 }
347
348 void Monitor::handle_signal(int signum)
349 {
350   assert(signum == SIGINT || signum == SIGTERM);
351   derr << "*** Got Signal " << sig_str(signum) << " ***" << dendl;
352   shutdown();
353 }
354
355 CompatSet Monitor::get_initial_supported_features()
356 {
357   CompatSet::FeatureSet ceph_mon_feature_compat;
358   CompatSet::FeatureSet ceph_mon_feature_ro_compat;
359   CompatSet::FeatureSet ceph_mon_feature_incompat;
360   ceph_mon_feature_incompat.insert(CEPH_MON_FEATURE_INCOMPAT_BASE);
361   ceph_mon_feature_incompat.insert(CEPH_MON_FEATURE_INCOMPAT_SINGLE_PAXOS);
362   return CompatSet(ceph_mon_feature_compat, ceph_mon_feature_ro_compat,
363                    ceph_mon_feature_incompat);
364 }
365
366 CompatSet Monitor::get_supported_features()
367 {
368   CompatSet compat = get_initial_supported_features();
369   compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES);
370   compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC);
371   compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2);
372   compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3);
373   compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_KRAKEN);
374   compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_LUMINOUS);
375   return compat;
376 }
377
378 CompatSet Monitor::get_legacy_features()
379 {
380   CompatSet::FeatureSet ceph_mon_feature_compat;
381   CompatSet::FeatureSet ceph_mon_feature_ro_compat;
382   CompatSet::FeatureSet ceph_mon_feature_incompat;
383   ceph_mon_feature_incompat.insert(CEPH_MON_FEATURE_INCOMPAT_BASE);
384   return CompatSet(ceph_mon_feature_compat, ceph_mon_feature_ro_compat,
385                    ceph_mon_feature_incompat);
386 }
387
388 int Monitor::check_features(MonitorDBStore *store)
389 {
390   CompatSet required = get_supported_features();
391   CompatSet ondisk;
392
393   read_features_off_disk(store, &ondisk);
394
395   if (!required.writeable(ondisk)) {
396     CompatSet diff = required.unsupported(ondisk);
397     generic_derr << "ERROR: on disk data includes unsupported features: " << diff << dendl;
398     return -EPERM;
399   }
400
401   return 0;
402 }
403
404 void Monitor::read_features_off_disk(MonitorDBStore *store, CompatSet *features)
405 {
406   bufferlist featuresbl;
407   store->get(MONITOR_NAME, COMPAT_SET_LOC, featuresbl);
408   if (featuresbl.length() == 0) {
409     generic_dout(0) << "WARNING: mon fs missing feature list.\n"
410             << "Assuming it is old-style and introducing one." << dendl;
411     //we only want the baseline ~v.18 features assumed to be on disk.
412     //If new features are introduced this code needs to disappear or
413     //be made smarter.
414     *features = get_legacy_features();
415
416     features->encode(featuresbl);
417     auto t(std::make_shared<MonitorDBStore::Transaction>());
418     t->put(MONITOR_NAME, COMPAT_SET_LOC, featuresbl);
419     store->apply_transaction(t);
420   } else {
421     bufferlist::iterator it = featuresbl.begin();
422     features->decode(it);
423   }
424 }
425
426 void Monitor::read_features()
427 {
428   read_features_off_disk(store, &features);
429   dout(10) << "features " << features << dendl;
430
431   calc_quorum_requirements();
432   dout(10) << "required_features " << required_features << dendl;
433 }
434
435 void Monitor::write_features(MonitorDBStore::TransactionRef t)
436 {
437   bufferlist bl;
438   features.encode(bl);
439   t->put(MONITOR_NAME, COMPAT_SET_LOC, bl);
440 }
441
442 const char** Monitor::get_tracked_conf_keys() const
443 {
444   static const char* KEYS[] = {
445     "crushtool", // helpful for testing
446     "mon_election_timeout",
447     "mon_lease",
448     "mon_lease_renew_interval_factor",
449     "mon_lease_ack_timeout_factor",
450     "mon_accept_timeout_factor",
451     // clog & admin clog
452     "clog_to_monitors",
453     "clog_to_syslog",
454     "clog_to_syslog_facility",
455     "clog_to_syslog_level",
456     "clog_to_graylog",
457     "clog_to_graylog_host",
458     "clog_to_graylog_port",
459     "host",
460     "fsid",
461     // periodic health to clog
462     "mon_health_to_clog",
463     "mon_health_to_clog_interval",
464     "mon_health_to_clog_tick_interval",
465     // scrub interval
466     "mon_scrub_interval",
467     NULL
468   };
469   return KEYS;
470 }
471
472 void Monitor::handle_conf_change(const struct md_config_t *conf,
473                                  const std::set<std::string> &changed)
474 {
475   sanitize_options();
476
477   dout(10) << __func__ << " " << changed << dendl;
478
479   if (changed.count("clog_to_monitors") ||
480       changed.count("clog_to_syslog") ||
481       changed.count("clog_to_syslog_level") ||
482       changed.count("clog_to_syslog_facility") ||
483       changed.count("clog_to_graylog") ||
484       changed.count("clog_to_graylog_host") ||
485       changed.count("clog_to_graylog_port") ||
486       changed.count("host") ||
487       changed.count("fsid")) {
488     update_log_clients();
489   }
490
491   if (changed.count("mon_health_to_clog") ||
492       changed.count("mon_health_to_clog_interval") ||
493       changed.count("mon_health_to_clog_tick_interval")) {
494     health_to_clog_update_conf(changed);
495   }
496
497   if (changed.count("mon_scrub_interval")) {
498     scrub_update_interval(conf->mon_scrub_interval);
499   }
500 }
501
502 void Monitor::update_log_clients()
503 {
504   map<string,string> log_to_monitors;
505   map<string,string> log_to_syslog;
506   map<string,string> log_channel;
507   map<string,string> log_prio;
508   map<string,string> log_to_graylog;
509   map<string,string> log_to_graylog_host;
510   map<string,string> log_to_graylog_port;
511   uuid_d fsid;
512   string host;
513
514   if (parse_log_client_options(g_ceph_context, log_to_monitors, log_to_syslog,
515                                log_channel, log_prio, log_to_graylog,
516                                log_to_graylog_host, log_to_graylog_port,
517                                fsid, host))
518     return;
519
520   clog->update_config(log_to_monitors, log_to_syslog,
521                       log_channel, log_prio, log_to_graylog,
522                       log_to_graylog_host, log_to_graylog_port,
523                       fsid, host);
524
525   audit_clog->update_config(log_to_monitors, log_to_syslog,
526                             log_channel, log_prio, log_to_graylog,
527                             log_to_graylog_host, log_to_graylog_port,
528                             fsid, host);
529 }
530
531 int Monitor::sanitize_options()
532 {
533   int r = 0;
534
535   // mon_lease must be greater than mon_lease_renewal; otherwise we
536   // may incur in leases expiring before they are renewed.
537   if (g_conf->mon_lease_renew_interval_factor >= 1.0) {
538     clog->error() << "mon_lease_renew_interval_factor ("
539                   << g_conf->mon_lease_renew_interval_factor
540                   << ") must be less than 1.0";
541     r = -EINVAL;
542   }
543
544   // mon_lease_ack_timeout must be greater than mon_lease to make sure we've
545   // got time to renew the lease and get an ack for it. Having both options
546   // with the same value, for a given small vale, could mean timing out if
547   // the monitors happened to be overloaded -- or even under normal load for
548   // a small enough value.
549   if (g_conf->mon_lease_ack_timeout_factor <= 1.0) {
550     clog->error() << "mon_lease_ack_timeout_factor ("
551                   << g_conf->mon_lease_ack_timeout_factor
552                   << ") must be greater than 1.0";
553     r = -EINVAL;
554   }
555
556   return r;
557 }
558
559 int Monitor::preinit()
560 {
561   lock.Lock();
562
563   dout(1) << "preinit fsid " << monmap->fsid << dendl;
564
565   int r = sanitize_options();
566   if (r < 0) {
567     derr << "option sanitization failed!" << dendl;
568     lock.Unlock();
569     return r;
570   }
571
572   assert(!logger);
573   {
574     PerfCountersBuilder pcb(g_ceph_context, "mon", l_mon_first, l_mon_last);
575     pcb.add_u64(l_mon_num_sessions, "num_sessions", "Open sessions", "sess",
576         PerfCountersBuilder::PRIO_USEFUL);
577     pcb.add_u64_counter(l_mon_session_add, "session_add", "Created sessions",
578         "sadd", PerfCountersBuilder::PRIO_INTERESTING);
579     pcb.add_u64_counter(l_mon_session_rm, "session_rm", "Removed sessions",
580         "srm", PerfCountersBuilder::PRIO_INTERESTING);
581     pcb.add_u64_counter(l_mon_session_trim, "session_trim", "Trimmed sessions",
582         "strm", PerfCountersBuilder::PRIO_USEFUL);
583     pcb.add_u64_counter(l_mon_num_elections, "num_elections", "Elections participated in",
584         "ecnt", PerfCountersBuilder::PRIO_USEFUL);
585     pcb.add_u64_counter(l_mon_election_call, "election_call", "Elections started",
586         "estt", PerfCountersBuilder::PRIO_INTERESTING);
587     pcb.add_u64_counter(l_mon_election_win, "election_win", "Elections won",
588         "ewon", PerfCountersBuilder::PRIO_INTERESTING);
589     pcb.add_u64_counter(l_mon_election_lose, "election_lose", "Elections lost",
590         "elst", PerfCountersBuilder::PRIO_INTERESTING);
591     logger = pcb.create_perf_counters();
592     cct->get_perfcounters_collection()->add(logger);
593   }
594
595   assert(!cluster_logger);
596   {
597     PerfCountersBuilder pcb(g_ceph_context, "cluster", l_cluster_first, l_cluster_last);
598     pcb.add_u64(l_cluster_num_mon, "num_mon", "Monitors");
599     pcb.add_u64(l_cluster_num_mon_quorum, "num_mon_quorum", "Monitors in quorum");
600     pcb.add_u64(l_cluster_num_osd, "num_osd", "OSDs");
601     pcb.add_u64(l_cluster_num_osd_up, "num_osd_up", "OSDs that are up");
602     pcb.add_u64(l_cluster_num_osd_in, "num_osd_in", "OSD in state \"in\" (they are in cluster)");
603     pcb.add_u64(l_cluster_osd_epoch, "osd_epoch", "Current epoch of OSD map");
604     pcb.add_u64(l_cluster_osd_bytes, "osd_bytes", "Total capacity of cluster");
605     pcb.add_u64(l_cluster_osd_bytes_used, "osd_bytes_used", "Used space");
606     pcb.add_u64(l_cluster_osd_bytes_avail, "osd_bytes_avail", "Available space");
607     pcb.add_u64(l_cluster_num_pool, "num_pool", "Pools");
608     pcb.add_u64(l_cluster_num_pg, "num_pg", "Placement groups");
609     pcb.add_u64(l_cluster_num_pg_active_clean, "num_pg_active_clean", "Placement groups in active+clean state");
610     pcb.add_u64(l_cluster_num_pg_active, "num_pg_active", "Placement groups in active state");
611     pcb.add_u64(l_cluster_num_pg_peering, "num_pg_peering", "Placement groups in peering state");
612     pcb.add_u64(l_cluster_num_object, "num_object", "Objects");
613     pcb.add_u64(l_cluster_num_object_degraded, "num_object_degraded", "Degraded (missing replicas) objects");
614     pcb.add_u64(l_cluster_num_object_misplaced, "num_object_misplaced", "Misplaced (wrong location in the cluster) objects");
615     pcb.add_u64(l_cluster_num_object_unfound, "num_object_unfound", "Unfound objects");
616     pcb.add_u64(l_cluster_num_bytes, "num_bytes", "Size of all objects");
617     pcb.add_u64(l_cluster_num_mds_up, "num_mds_up", "MDSs that are up");
618     pcb.add_u64(l_cluster_num_mds_in, "num_mds_in", "MDS in state \"in\" (they are in cluster)");
619     pcb.add_u64(l_cluster_num_mds_failed, "num_mds_failed", "Failed MDS");
620     pcb.add_u64(l_cluster_mds_epoch, "mds_epoch", "Current epoch of MDS map");
621     cluster_logger = pcb.create_perf_counters();
622   }
623
624   paxos->init_logger();
625
626   // verify cluster_uuid
627   {
628     int r = check_fsid();
629     if (r == -ENOENT)
630       r = write_fsid();
631     if (r < 0) {
632       lock.Unlock();
633       return r;
634     }
635   }
636
637   // open compatset
638   read_features();
639
640   // have we ever joined a quorum?
641   has_ever_joined = (store->get(MONITOR_NAME, "joined") != 0);
642   dout(10) << "has_ever_joined = " << (int)has_ever_joined << dendl;
643
644   if (!has_ever_joined) {
645     // impose initial quorum restrictions?
646     list<string> initial_members;
647     get_str_list(g_conf->mon_initial_members, initial_members);
648
649     if (!initial_members.empty()) {
650       dout(1) << " initial_members " << initial_members << ", filtering seed monmap" << dendl;
651
652       monmap->set_initial_members(g_ceph_context, initial_members, name, messenger->get_myaddr(),
653                                   &extra_probe_peers);
654
655       dout(10) << " monmap is " << *monmap << dendl;
656       dout(10) << " extra probe peers " << extra_probe_peers << dendl;
657     }
658   } else if (!monmap->contains(name)) {
659     derr << "not in monmap and have been in a quorum before; "
660          << "must have been removed" << dendl;
661     if (g_conf->mon_force_quorum_join) {
662       dout(0) << "we should have died but "
663               << "'mon_force_quorum_join' is set -- allowing boot" << dendl;
664     } else {
665       derr << "commit suicide!" << dendl;
666       lock.Unlock();
667       return -ENOENT;
668     }
669   }
670
671   {
672     // We have a potentially inconsistent store state in hands. Get rid of it
673     // and start fresh.
674     bool clear_store = false;
675     if (store->exists("mon_sync", "in_sync")) {
676       dout(1) << __func__ << " clean up potentially inconsistent store state"
677               << dendl;
678       clear_store = true;
679     }
680
681     if (store->get("mon_sync", "force_sync") > 0) {
682       dout(1) << __func__ << " force sync by clearing store state" << dendl;
683       clear_store = true;
684     }
685
686     if (clear_store) {
687       set<string> sync_prefixes = get_sync_targets_names();
688       store->clear(sync_prefixes);
689     }
690   }
691
692   sync_last_committed_floor = store->get("mon_sync", "last_committed_floor");
693   dout(10) << "sync_last_committed_floor " << sync_last_committed_floor << dendl;
694
695   init_paxos();
696   health_monitor->init();
697
698   if (is_keyring_required()) {
699     // we need to bootstrap authentication keys so we can form an
700     // initial quorum.
701     if (authmon()->get_last_committed() == 0) {
702       dout(10) << "loading initial keyring to bootstrap authentication for mkfs" << dendl;
703       bufferlist bl;
704       int err = store->get("mkfs", "keyring", bl);
705       if (err == 0 && bl.length() > 0) {
706         // Attempt to decode and extract keyring only if it is found.
707         KeyRing keyring;
708         bufferlist::iterator p = bl.begin();
709         ::decode(keyring, p);
710         extract_save_mon_key(keyring);
711       }
712     }
713
714     string keyring_loc = g_conf->mon_data + "/keyring";
715
716     r = keyring.load(cct, keyring_loc);
717     if (r < 0) {
718       EntityName mon_name;
719       mon_name.set_type(CEPH_ENTITY_TYPE_MON);
720       EntityAuth mon_key;
721       if (key_server.get_auth(mon_name, mon_key)) {
722         dout(1) << "copying mon. key from old db to external keyring" << dendl;
723         keyring.add(mon_name, mon_key);
724         bufferlist bl;
725         keyring.encode_plaintext(bl);
726         write_default_keyring(bl);
727       } else {
728         derr << "unable to load initial keyring " << g_conf->keyring << dendl;
729         lock.Unlock();
730         return r;
731       }
732     }
733   }
734
735   admin_hook = new AdminHook(this);
736   AdminSocket* admin_socket = cct->get_admin_socket();
737
738   // unlock while registering to avoid mon_lock -> admin socket lock dependency.
739   lock.Unlock();
740   r = admin_socket->register_command("mon_status", "mon_status", admin_hook,
741                                      "show current monitor status");
742   assert(r == 0);
743   r = admin_socket->register_command("quorum_status", "quorum_status",
744                                      admin_hook, "show current quorum status");
745   assert(r == 0);
746   r = admin_socket->register_command("sync_force",
747                                      "sync_force name=validate,"
748                                      "type=CephChoices,"
749                                      "strings=--yes-i-really-mean-it",
750                                      admin_hook,
751                                      "force sync of and clear monitor store");
752   assert(r == 0);
753   r = admin_socket->register_command("add_bootstrap_peer_hint",
754                                      "add_bootstrap_peer_hint name=addr,"
755                                      "type=CephIPAddr",
756                                      admin_hook,
757                                      "add peer address as potential bootstrap"
758                                      " peer for cluster bringup");
759   assert(r == 0);
760   r = admin_socket->register_command("quorum enter", "quorum enter",
761                                      admin_hook,
762                                      "force monitor back into quorum");
763   assert(r == 0);
764   r = admin_socket->register_command("quorum exit", "quorum exit",
765                                      admin_hook,
766                                      "force monitor out of the quorum");
767   assert(r == 0);
768   r = admin_socket->register_command("ops",
769                                      "ops",
770                                      admin_hook,
771                                      "show the ops currently in flight");
772   assert(r == 0);
773   r = admin_socket->register_command("sessions",
774                                      "sessions",
775                                      admin_hook,
776                                      "list existing sessions");
777   assert(r == 0);
778
779   lock.Lock();
780
781   // add ourselves as a conf observer
782   g_conf->add_observer(this);
783
784   lock.Unlock();
785   return 0;
786 }
787
788 int Monitor::init()
789 {
790   dout(2) << "init" << dendl;
791   Mutex::Locker l(lock);
792
793   finisher.start();
794
795   // start ticker
796   timer.init();
797   new_tick();
798
799   cpu_tp.start();
800
801   // i'm ready!
802   messenger->add_dispatcher_tail(this);
803
804   mgr_client.init();
805   mgr_messenger->add_dispatcher_tail(&mgr_client);
806   mgr_messenger->add_dispatcher_tail(this);  // for auth ms_* calls
807
808   bootstrap();
809   // add features of myself into feature_map
810   session_map.feature_map.add_mon(con_self->get_features());
811   return 0;
812 }
813
814 void Monitor::init_paxos()
815 {
816   dout(10) << __func__ << dendl;
817   paxos->init();
818
819   // init services
820   for (int i = 0; i < PAXOS_NUM; ++i) {
821     paxos_service[i]->init();
822   }
823
824   refresh_from_paxos(NULL);
825 }
826
827 void Monitor::refresh_from_paxos(bool *need_bootstrap)
828 {
829   dout(10) << __func__ << dendl;
830
831   bufferlist bl;
832   int r = store->get(MONITOR_NAME, "cluster_fingerprint", bl);
833   if (r >= 0) {
834     try {
835       bufferlist::iterator p = bl.begin();
836       ::decode(fingerprint, p);
837     }
838     catch (buffer::error& e) {
839       dout(10) << __func__ << " failed to decode cluster_fingerprint" << dendl;
840     }
841   } else {
842     dout(10) << __func__ << " no cluster_fingerprint" << dendl;
843   }
844
845   for (int i = 0; i < PAXOS_NUM; ++i) {
846     paxos_service[i]->refresh(need_bootstrap);
847   }
848   for (int i = 0; i < PAXOS_NUM; ++i) {
849     paxos_service[i]->post_refresh();
850   }
851   load_metadata();
852 }
853
854 void Monitor::register_cluster_logger()
855 {
856   if (!cluster_logger_registered) {
857     dout(10) << "register_cluster_logger" << dendl;
858     cluster_logger_registered = true;
859     cct->get_perfcounters_collection()->add(cluster_logger);
860   } else {
861     dout(10) << "register_cluster_logger - already registered" << dendl;
862   }
863 }
864
865 void Monitor::unregister_cluster_logger()
866 {
867   if (cluster_logger_registered) {
868     dout(10) << "unregister_cluster_logger" << dendl;
869     cluster_logger_registered = false;
870     cct->get_perfcounters_collection()->remove(cluster_logger);
871   } else {
872     dout(10) << "unregister_cluster_logger - not registered" << dendl;
873   }
874 }
875
876 void Monitor::update_logger()
877 {
878   cluster_logger->set(l_cluster_num_mon, monmap->size());
879   cluster_logger->set(l_cluster_num_mon_quorum, quorum.size());
880 }
881
882 void Monitor::shutdown()
883 {
884   dout(1) << "shutdown" << dendl;
885
886   lock.Lock();
887
888   wait_for_paxos_write();
889
890   state = STATE_SHUTDOWN;
891
892   g_conf->remove_observer(this);
893
894   if (admin_hook) {
895     AdminSocket* admin_socket = cct->get_admin_socket();
896     admin_socket->unregister_command("mon_status");
897     admin_socket->unregister_command("quorum_status");
898     admin_socket->unregister_command("sync_force");
899     admin_socket->unregister_command("add_bootstrap_peer_hint");
900     admin_socket->unregister_command("quorum enter");
901     admin_socket->unregister_command("quorum exit");
902     admin_socket->unregister_command("ops");
903     admin_socket->unregister_command("sessions");
904     delete admin_hook;
905     admin_hook = NULL;
906   }
907
908   elector.shutdown();
909
910   mgr_client.shutdown();
911
912   lock.Unlock();
913   finisher.wait_for_empty();
914   finisher.stop();
915   lock.Lock();
916
917   // clean up
918   paxos->shutdown();
919   for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p)
920     (*p)->shutdown();
921   health_monitor->shutdown();
922
923   finish_contexts(g_ceph_context, waitfor_quorum, -ECANCELED);
924   finish_contexts(g_ceph_context, maybe_wait_for_quorum, -ECANCELED);
925
926   timer.shutdown();
927
928   cpu_tp.stop();
929
930   remove_all_sessions();
931
932   if (logger) {
933     cct->get_perfcounters_collection()->remove(logger);
934     delete logger;
935     logger = NULL;
936   }
937   if (cluster_logger) {
938     if (cluster_logger_registered)
939       cct->get_perfcounters_collection()->remove(cluster_logger);
940     delete cluster_logger;
941     cluster_logger = NULL;
942   }
943
944   log_client.shutdown();
945
946   // unlock before msgr shutdown...
947   lock.Unlock();
948
949   messenger->shutdown();  // last thing!  ceph_mon.cc will delete mon.
950   mgr_messenger->shutdown();
951 }
952
953 void Monitor::wait_for_paxos_write()
954 {
955   if (paxos->is_writing() || paxos->is_writing_previous()) {
956     dout(10) << __func__ << " flushing pending write" << dendl;
957     lock.Unlock();
958     store->flush();
959     lock.Lock();
960     dout(10) << __func__ << " flushed pending write" << dendl;
961   }
962 }
963
964 void Monitor::bootstrap()
965 {
966   dout(10) << "bootstrap" << dendl;
967   wait_for_paxos_write();
968
969   sync_reset_requester();
970   unregister_cluster_logger();
971   cancel_probe_timeout();
972
973   // note my rank
974   int newrank = monmap->get_rank(messenger->get_myaddr());
975   if (newrank < 0 && rank >= 0) {
976     // was i ever part of the quorum?
977     if (has_ever_joined) {
978       dout(0) << " removed from monmap, suicide." << dendl;
979       exit(0);
980     }
981   }
982   if (newrank != rank) {
983     dout(0) << " my rank is now " << newrank << " (was " << rank << ")" << dendl;
984     messenger->set_myname(entity_name_t::MON(newrank));
985     rank = newrank;
986
987     // reset all connections, or else our peers will think we are someone else.
988     messenger->mark_down_all();
989   }
990
991   // reset
992   state = STATE_PROBING;
993
994   _reset();
995
996   // sync store
997   if (g_conf->mon_compact_on_bootstrap) {
998     dout(10) << "bootstrap -- triggering compaction" << dendl;
999     store->compact();
1000     dout(10) << "bootstrap -- finished compaction" << dendl;
1001   }
1002
1003   // singleton monitor?
1004   if (monmap->size() == 1 && rank == 0) {
1005     win_standalone_election();
1006     return;
1007   }
1008
1009   reset_probe_timeout();
1010
1011   // i'm outside the quorum
1012   if (monmap->contains(name))
1013     outside_quorum.insert(name);
1014
1015   // probe monitors
1016   dout(10) << "probing other monitors" << dendl;
1017   for (unsigned i = 0; i < monmap->size(); i++) {
1018     if ((int)i != rank)
1019       messenger->send_message(new MMonProbe(monmap->fsid, MMonProbe::OP_PROBE, name, has_ever_joined),
1020                               monmap->get_inst(i));
1021   }
1022   for (set<entity_addr_t>::iterator p = extra_probe_peers.begin();
1023        p != extra_probe_peers.end();
1024        ++p) {
1025     if (*p != messenger->get_myaddr()) {
1026       entity_inst_t i;
1027       i.name = entity_name_t::MON(-1);
1028       i.addr = *p;
1029       messenger->send_message(new MMonProbe(monmap->fsid, MMonProbe::OP_PROBE, name, has_ever_joined), i);
1030     }
1031   }
1032 }
1033
1034 bool Monitor::_add_bootstrap_peer_hint(string cmd, cmdmap_t& cmdmap, ostream& ss)
1035 {
1036   string addrstr;
1037   if (!cmd_getval(g_ceph_context, cmdmap, "addr", addrstr)) {
1038     ss << "unable to parse address string value '"
1039          << cmd_vartype_stringify(cmdmap["addr"]) << "'";
1040     return false;
1041   }
1042   dout(10) << "_add_bootstrap_peer_hint '" << cmd << "' '"
1043            << addrstr << "'" << dendl;
1044
1045   entity_addr_t addr;
1046   const char *end = 0;
1047   if (!addr.parse(addrstr.c_str(), &end)) {
1048     ss << "failed to parse addr '" << addrstr << "'; syntax is 'add_bootstrap_peer_hint ip[:port]'";
1049     return false;
1050   }
1051
1052   if (is_leader() || is_peon()) {
1053     ss << "mon already active; ignoring bootstrap hint";
1054     return true;
1055   }
1056
1057   if (addr.get_port() == 0)
1058     addr.set_port(CEPH_MON_PORT);
1059
1060   extra_probe_peers.insert(addr);
1061   ss << "adding peer " << addr << " to list: " << extra_probe_peers;
1062   return true;
1063 }
1064
1065 // called by bootstrap(), or on leader|peon -> electing
1066 void Monitor::_reset()
1067 {
1068   dout(10) << __func__ << dendl;
1069
1070   cancel_probe_timeout();
1071   timecheck_finish();
1072   health_events_cleanup();
1073   health_check_log_times.clear();
1074   scrub_event_cancel();
1075
1076   leader_since = utime_t();
1077   if (!quorum.empty()) {
1078     exited_quorum = ceph_clock_now();
1079   }
1080   quorum.clear();
1081   outside_quorum.clear();
1082   quorum_feature_map.clear();
1083
1084   scrub_reset();
1085
1086   paxos->restart();
1087
1088   for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p)
1089     (*p)->restart();
1090   health_monitor->finish();
1091 }
1092
1093
1094 // -----------------------------------------------------------
1095 // sync
1096
1097 set<string> Monitor::get_sync_targets_names()
1098 {
1099   set<string> targets;
1100   targets.insert(paxos->get_name());
1101   for (int i = 0; i < PAXOS_NUM; ++i)
1102     paxos_service[i]->get_store_prefixes(targets);
1103   ConfigKeyService *config_key_service_ptr = dynamic_cast<ConfigKeyService*>(config_key_service);
1104   assert(config_key_service_ptr);
1105   config_key_service_ptr->get_store_prefixes(targets);
1106   return targets;
1107 }
1108
1109
1110 void Monitor::sync_timeout()
1111 {
1112   dout(10) << __func__ << dendl;
1113   assert(state == STATE_SYNCHRONIZING);
1114   bootstrap();
1115 }
1116
1117 void Monitor::sync_obtain_latest_monmap(bufferlist &bl)
1118 {
1119   dout(1) << __func__ << dendl;
1120
1121   MonMap latest_monmap;
1122
1123   // Grab latest monmap from MonmapMonitor
1124   bufferlist monmon_bl;
1125   int err = monmon()->get_monmap(monmon_bl);
1126   if (err < 0) {
1127     if (err != -ENOENT) {
1128       derr << __func__
1129            << " something wrong happened while reading the store: "
1130            << cpp_strerror(err) << dendl;
1131       assert(0 == "error reading the store");
1132     }
1133   } else {
1134     latest_monmap.decode(monmon_bl);
1135   }
1136
1137   // Grab last backed up monmap (if any) and compare epochs
1138   if (store->exists("mon_sync", "latest_monmap")) {
1139     bufferlist backup_bl;
1140     int err = store->get("mon_sync", "latest_monmap", backup_bl);
1141     if (err < 0) {
1142       derr << __func__
1143            << " something wrong happened while reading the store: "
1144            << cpp_strerror(err) << dendl;
1145       assert(0 == "error reading the store");
1146     }
1147     assert(backup_bl.length() > 0);
1148
1149     MonMap backup_monmap;
1150     backup_monmap.decode(backup_bl);
1151
1152     if (backup_monmap.epoch > latest_monmap.epoch)
1153       latest_monmap = backup_monmap;
1154   }
1155
1156   // Check if our current monmap's epoch is greater than the one we've
1157   // got so far.
1158   if (monmap->epoch > latest_monmap.epoch)
1159     latest_monmap = *monmap;
1160
1161   dout(1) << __func__ << " obtained monmap e" << latest_monmap.epoch << dendl;
1162
1163   latest_monmap.encode(bl, CEPH_FEATURES_ALL);
1164 }
1165
1166 void Monitor::sync_reset_requester()
1167 {
1168   dout(10) << __func__ << dendl;
1169
1170   if (sync_timeout_event) {
1171     timer.cancel_event(sync_timeout_event);
1172     sync_timeout_event = NULL;
1173   }
1174
1175   sync_provider = entity_inst_t();
1176   sync_cookie = 0;
1177   sync_full = false;
1178   sync_start_version = 0;
1179 }
1180
1181 void Monitor::sync_reset_provider()
1182 {
1183   dout(10) << __func__ << dendl;
1184   sync_providers.clear();
1185 }
1186
1187 void Monitor::sync_start(entity_inst_t &other, bool full)
1188 {
1189   dout(10) << __func__ << " " << other << (full ? " full" : " recent") << dendl;
1190
1191   assert(state == STATE_PROBING ||
1192          state == STATE_SYNCHRONIZING);
1193   state = STATE_SYNCHRONIZING;
1194
1195   // make sure are not a provider for anyone!
1196   sync_reset_provider();
1197
1198   sync_full = full;
1199
1200   if (sync_full) {
1201     // stash key state, and mark that we are syncing
1202     auto t(std::make_shared<MonitorDBStore::Transaction>());
1203     sync_stash_critical_state(t);
1204     t->put("mon_sync", "in_sync", 1);
1205
1206     sync_last_committed_floor = MAX(sync_last_committed_floor, paxos->get_version());
1207     dout(10) << __func__ << " marking sync in progress, storing sync_last_committed_floor "
1208              << sync_last_committed_floor << dendl;
1209     t->put("mon_sync", "last_committed_floor", sync_last_committed_floor);
1210
1211     store->apply_transaction(t);
1212
1213     assert(g_conf->mon_sync_requester_kill_at != 1);
1214
1215     // clear the underlying store
1216     set<string> targets = get_sync_targets_names();
1217     dout(10) << __func__ << " clearing prefixes " << targets << dendl;
1218     store->clear(targets);
1219
1220     // make sure paxos knows it has been reset.  this prevents a
1221     // bootstrap and then different probe reply order from possibly
1222     // deciding a partial or no sync is needed.
1223     paxos->init();
1224
1225     assert(g_conf->mon_sync_requester_kill_at != 2);
1226   }
1227
1228   // assume 'other' as the leader. We will update the leader once we receive
1229   // a reply to the sync start.
1230   sync_provider = other;
1231
1232   sync_reset_timeout();
1233
1234   MMonSync *m = new MMonSync(sync_full ? MMonSync::OP_GET_COOKIE_FULL : MMonSync::OP_GET_COOKIE_RECENT);
1235   if (!sync_full)
1236     m->last_committed = paxos->get_version();
1237   messenger->send_message(m, sync_provider);
1238 }
1239
1240 void Monitor::sync_stash_critical_state(MonitorDBStore::TransactionRef t)
1241 {
1242   dout(10) << __func__ << dendl;
1243   bufferlist backup_monmap;
1244   sync_obtain_latest_monmap(backup_monmap);
1245   assert(backup_monmap.length() > 0);
1246   t->put("mon_sync", "latest_monmap", backup_monmap);
1247 }
1248
1249 void Monitor::sync_reset_timeout()
1250 {
1251   dout(10) << __func__ << dendl;
1252   if (sync_timeout_event)
1253     timer.cancel_event(sync_timeout_event);
1254   sync_timeout_event = timer.add_event_after(
1255     g_conf->mon_sync_timeout,
1256     new C_MonContext(this, [this](int) {
1257         sync_timeout();
1258       }));
1259 }
1260
1261 void Monitor::sync_finish(version_t last_committed)
1262 {
1263   dout(10) << __func__ << " lc " << last_committed << " from " << sync_provider << dendl;
1264
1265   assert(g_conf->mon_sync_requester_kill_at != 7);
1266
1267   if (sync_full) {
1268     // finalize the paxos commits
1269     auto tx(std::make_shared<MonitorDBStore::Transaction>());
1270     paxos->read_and_prepare_transactions(tx, sync_start_version,
1271                                          last_committed);
1272     tx->put(paxos->get_name(), "last_committed", last_committed);
1273
1274     dout(30) << __func__ << " final tx dump:\n";
1275     JSONFormatter f(true);
1276     tx->dump(&f);
1277     f.flush(*_dout);
1278     *_dout << dendl;
1279
1280     store->apply_transaction(tx);
1281   }
1282
1283   assert(g_conf->mon_sync_requester_kill_at != 8);
1284
1285   auto t(std::make_shared<MonitorDBStore::Transaction>());
1286   t->erase("mon_sync", "in_sync");
1287   t->erase("mon_sync", "force_sync");
1288   t->erase("mon_sync", "last_committed_floor");
1289   store->apply_transaction(t);
1290
1291   assert(g_conf->mon_sync_requester_kill_at != 9);
1292
1293   init_paxos();
1294
1295   assert(g_conf->mon_sync_requester_kill_at != 10);
1296
1297   bootstrap();
1298 }
1299
1300 void Monitor::handle_sync(MonOpRequestRef op)
1301 {
1302   MMonSync *m = static_cast<MMonSync*>(op->get_req());
1303   dout(10) << __func__ << " " << *m << dendl;
1304   switch (m->op) {
1305
1306     // provider ---------
1307
1308   case MMonSync::OP_GET_COOKIE_FULL:
1309   case MMonSync::OP_GET_COOKIE_RECENT:
1310     handle_sync_get_cookie(op);
1311     break;
1312   case MMonSync::OP_GET_CHUNK:
1313     handle_sync_get_chunk(op);
1314     break;
1315
1316     // client -----------
1317
1318   case MMonSync::OP_COOKIE:
1319     handle_sync_cookie(op);
1320     break;
1321
1322   case MMonSync::OP_CHUNK:
1323   case MMonSync::OP_LAST_CHUNK:
1324     handle_sync_chunk(op);
1325     break;
1326   case MMonSync::OP_NO_COOKIE:
1327     handle_sync_no_cookie(op);
1328     break;
1329
1330   default:
1331     dout(0) << __func__ << " unknown op " << m->op << dendl;
1332     assert(0 == "unknown op");
1333   }
1334 }
1335
1336 // leader
1337
1338 void Monitor::_sync_reply_no_cookie(MonOpRequestRef op)
1339 {
1340   MMonSync *m = static_cast<MMonSync*>(op->get_req());
1341   MMonSync *reply = new MMonSync(MMonSync::OP_NO_COOKIE, m->cookie);
1342   m->get_connection()->send_message(reply);
1343 }
1344
1345 void Monitor::handle_sync_get_cookie(MonOpRequestRef op)
1346 {
1347   MMonSync *m = static_cast<MMonSync*>(op->get_req());
1348   if (is_synchronizing()) {
1349     _sync_reply_no_cookie(op);
1350     return;
1351   }
1352
1353   assert(g_conf->mon_sync_provider_kill_at != 1);
1354
1355   // make sure they can understand us.
1356   if ((required_features ^ m->get_connection()->get_features()) &
1357       required_features) {
1358     dout(5) << " ignoring peer mon." << m->get_source().num()
1359             << " has features " << std::hex
1360             << m->get_connection()->get_features()
1361             << " but we require " << required_features << std::dec << dendl;
1362     return;
1363   }
1364
1365   // make up a unique cookie.  include election epoch (which persists
1366   // across restarts for the whole cluster) and a counter for this
1367   // process instance.  there is no need to be unique *across*
1368   // monitors, though.
1369   uint64_t cookie = ((unsigned long long)elector.get_epoch() << 24) + ++sync_provider_count;
1370   assert(sync_providers.count(cookie) == 0);
1371
1372   dout(10) << __func__ << " cookie " << cookie << " for " << m->get_source_inst() << dendl;
1373
1374   SyncProvider& sp = sync_providers[cookie];
1375   sp.cookie = cookie;
1376   sp.entity = m->get_source_inst();
1377   sp.reset_timeout(g_ceph_context, g_conf->mon_sync_timeout * 2);
1378
1379   set<string> sync_targets;
1380   if (m->op == MMonSync::OP_GET_COOKIE_FULL) {
1381     // full scan
1382     sync_targets = get_sync_targets_names();
1383     sp.last_committed = paxos->get_version();
1384     sp.synchronizer = store->get_synchronizer(sp.last_key, sync_targets);
1385     sp.full = true;
1386     dout(10) << __func__ << " will sync prefixes " << sync_targets << dendl;
1387   } else {
1388     // just catch up paxos
1389     sp.last_committed = m->last_committed;
1390   }
1391   dout(10) << __func__ << " will sync from version " << sp.last_committed << dendl;
1392
1393   MMonSync *reply = new MMonSync(MMonSync::OP_COOKIE, sp.cookie);
1394   reply->last_committed = sp.last_committed;
1395   m->get_connection()->send_message(reply);
1396 }
1397
1398 void Monitor::handle_sync_get_chunk(MonOpRequestRef op)
1399 {
1400   MMonSync *m = static_cast<MMonSync*>(op->get_req());
1401   dout(10) << __func__ << " " << *m << dendl;
1402
1403   if (sync_providers.count(m->cookie) == 0) {
1404     dout(10) << __func__ << " no cookie " << m->cookie << dendl;
1405     _sync_reply_no_cookie(op);
1406     return;
1407   }
1408
1409   assert(g_conf->mon_sync_provider_kill_at != 2);
1410
1411   SyncProvider& sp = sync_providers[m->cookie];
1412   sp.reset_timeout(g_ceph_context, g_conf->mon_sync_timeout * 2);
1413
1414   if (sp.last_committed < paxos->get_first_committed() &&
1415       paxos->get_first_committed() > 1) {
1416     dout(10) << __func__ << " sync requester fell behind paxos, their lc " << sp.last_committed
1417              << " < our fc " << paxos->get_first_committed() << dendl;
1418     sync_providers.erase(m->cookie);
1419     _sync_reply_no_cookie(op);
1420     return;
1421   }
1422
1423   MMonSync *reply = new MMonSync(MMonSync::OP_CHUNK, sp.cookie);
1424   auto tx(std::make_shared<MonitorDBStore::Transaction>());
1425
1426   int left = g_conf->mon_sync_max_payload_size;
1427   while (sp.last_committed < paxos->get_version() && left > 0) {
1428     bufferlist bl;
1429     sp.last_committed++;
1430
1431     int err = store->get(paxos->get_name(), sp.last_committed, bl);
1432     assert(err == 0);
1433
1434     tx->put(paxos->get_name(), sp.last_committed, bl);
1435     left -= bl.length();
1436     dout(20) << __func__ << " including paxos state " << sp.last_committed
1437              << dendl;
1438   }
1439   reply->last_committed = sp.last_committed;
1440
1441   if (sp.full && left > 0) {
1442     sp.synchronizer->get_chunk_tx(tx, left);
1443     sp.last_key = sp.synchronizer->get_last_key();
1444     reply->last_key = sp.last_key;
1445   }
1446
1447   if ((sp.full && sp.synchronizer->has_next_chunk()) ||
1448       sp.last_committed < paxos->get_version()) {
1449     dout(10) << __func__ << " chunk, through version " << sp.last_committed
1450              << " key " << sp.last_key << dendl;
1451   } else {
1452     dout(10) << __func__ << " last chunk, through version " << sp.last_committed
1453              << " key " << sp.last_key << dendl;
1454     reply->op = MMonSync::OP_LAST_CHUNK;
1455
1456     assert(g_conf->mon_sync_provider_kill_at != 3);
1457
1458     // clean up our local state
1459     sync_providers.erase(sp.cookie);
1460   }
1461
1462   ::encode(*tx, reply->chunk_bl);
1463
1464   m->get_connection()->send_message(reply);
1465 }
1466
1467 // requester
1468
1469 void Monitor::handle_sync_cookie(MonOpRequestRef op)
1470 {
1471   MMonSync *m = static_cast<MMonSync*>(op->get_req());
1472   dout(10) << __func__ << " " << *m << dendl;
1473   if (sync_cookie) {
1474     dout(10) << __func__ << " already have a cookie, ignoring" << dendl;
1475     return;
1476   }
1477   if (m->get_source_inst() != sync_provider) {
1478     dout(10) << __func__ << " source does not match, discarding" << dendl;
1479     return;
1480   }
1481   sync_cookie = m->cookie;
1482   sync_start_version = m->last_committed;
1483
1484   sync_reset_timeout();
1485   sync_get_next_chunk();
1486
1487   assert(g_conf->mon_sync_requester_kill_at != 3);
1488 }
1489
1490 void Monitor::sync_get_next_chunk()
1491 {
1492   dout(20) << __func__ << " cookie " << sync_cookie << " provider " << sync_provider << dendl;
1493   if (g_conf->mon_inject_sync_get_chunk_delay > 0) {
1494     dout(20) << __func__ << " injecting delay of " << g_conf->mon_inject_sync_get_chunk_delay << dendl;
1495     usleep((long long)(g_conf->mon_inject_sync_get_chunk_delay * 1000000.0));
1496   }
1497   MMonSync *r = new MMonSync(MMonSync::OP_GET_CHUNK, sync_cookie);
1498   messenger->send_message(r, sync_provider);
1499
1500   assert(g_conf->mon_sync_requester_kill_at != 4);
1501 }
1502
1503 void Monitor::handle_sync_chunk(MonOpRequestRef op)
1504 {
1505   MMonSync *m = static_cast<MMonSync*>(op->get_req());
1506   dout(10) << __func__ << " " << *m << dendl;
1507
1508   if (m->cookie != sync_cookie) {
1509     dout(10) << __func__ << " cookie does not match, discarding" << dendl;
1510     return;
1511   }
1512   if (m->get_source_inst() != sync_provider) {
1513     dout(10) << __func__ << " source does not match, discarding" << dendl;
1514     return;
1515   }
1516
1517   assert(state == STATE_SYNCHRONIZING);
1518   assert(g_conf->mon_sync_requester_kill_at != 5);
1519
1520   auto tx(std::make_shared<MonitorDBStore::Transaction>());
1521   tx->append_from_encoded(m->chunk_bl);
1522
1523   dout(30) << __func__ << " tx dump:\n";
1524   JSONFormatter f(true);
1525   tx->dump(&f);
1526   f.flush(*_dout);
1527   *_dout << dendl;
1528
1529   store->apply_transaction(tx);
1530
1531   assert(g_conf->mon_sync_requester_kill_at != 6);
1532
1533   if (!sync_full) {
1534     dout(10) << __func__ << " applying recent paxos transactions as we go" << dendl;
1535     auto tx(std::make_shared<MonitorDBStore::Transaction>());
1536     paxos->read_and_prepare_transactions(tx, paxos->get_version() + 1,
1537                                          m->last_committed);
1538     tx->put(paxos->get_name(), "last_committed", m->last_committed);
1539
1540     dout(30) << __func__ << " tx dump:\n";
1541     JSONFormatter f(true);
1542     tx->dump(&f);
1543     f.flush(*_dout);
1544     *_dout << dendl;
1545
1546     store->apply_transaction(tx);
1547     paxos->init();  // to refresh what we just wrote
1548   }
1549
1550   if (m->op == MMonSync::OP_CHUNK) {
1551     sync_reset_timeout();
1552     sync_get_next_chunk();
1553   } else if (m->op == MMonSync::OP_LAST_CHUNK) {
1554     sync_finish(m->last_committed);
1555   }
1556 }
1557
1558 void Monitor::handle_sync_no_cookie(MonOpRequestRef op)
1559 {
1560   dout(10) << __func__ << dendl;
1561   bootstrap();
1562 }
1563
1564 void Monitor::sync_trim_providers()
1565 {
1566   dout(20) << __func__ << dendl;
1567
1568   utime_t now = ceph_clock_now();
1569   map<uint64_t,SyncProvider>::iterator p = sync_providers.begin();
1570   while (p != sync_providers.end()) {
1571     if (now > p->second.timeout) {
1572       dout(10) << __func__ << " expiring cookie " << p->second.cookie << " for " << p->second.entity << dendl;
1573       sync_providers.erase(p++);
1574     } else {
1575       ++p;
1576     }
1577   }
1578 }
1579
1580 // ---------------------------------------------------
1581 // probe
1582
1583 void Monitor::cancel_probe_timeout()
1584 {
1585   if (probe_timeout_event) {
1586     dout(10) << "cancel_probe_timeout " << probe_timeout_event << dendl;
1587     timer.cancel_event(probe_timeout_event);
1588     probe_timeout_event = NULL;
1589   } else {
1590     dout(10) << "cancel_probe_timeout (none scheduled)" << dendl;
1591   }
1592 }
1593
1594 void Monitor::reset_probe_timeout()
1595 {
1596   cancel_probe_timeout();
1597   probe_timeout_event = new C_MonContext(this, [this](int r) {
1598       probe_timeout(r);
1599     });
1600   double t = g_conf->mon_probe_timeout;
1601   if (timer.add_event_after(t, probe_timeout_event)) {
1602     dout(10) << "reset_probe_timeout " << probe_timeout_event
1603              << " after " << t << " seconds" << dendl;
1604   } else {
1605     probe_timeout_event = nullptr;
1606   }
1607 }
1608
1609 void Monitor::probe_timeout(int r)
1610 {
1611   dout(4) << "probe_timeout " << probe_timeout_event << dendl;
1612   assert(is_probing() || is_synchronizing());
1613   assert(probe_timeout_event);
1614   probe_timeout_event = NULL;
1615   bootstrap();
1616 }
1617
1618 void Monitor::handle_probe(MonOpRequestRef op)
1619 {
1620   MMonProbe *m = static_cast<MMonProbe*>(op->get_req());
1621   dout(10) << "handle_probe " << *m << dendl;
1622
1623   if (m->fsid != monmap->fsid) {
1624     dout(0) << "handle_probe ignoring fsid " << m->fsid << " != " << monmap->fsid << dendl;
1625     return;
1626   }
1627
1628   switch (m->op) {
1629   case MMonProbe::OP_PROBE:
1630     handle_probe_probe(op);
1631     break;
1632
1633   case MMonProbe::OP_REPLY:
1634     handle_probe_reply(op);
1635     break;
1636
1637   case MMonProbe::OP_MISSING_FEATURES:
1638     derr << __func__ << " missing features, have " << CEPH_FEATURES_ALL
1639          << ", required " << m->required_features
1640          << ", missing " << (m->required_features & ~CEPH_FEATURES_ALL)
1641          << dendl;
1642     break;
1643   }
1644 }
1645
1646 void Monitor::handle_probe_probe(MonOpRequestRef op)
1647 {
1648   MMonProbe *m = static_cast<MMonProbe*>(op->get_req());
1649
1650   dout(10) << "handle_probe_probe " << m->get_source_inst() << *m
1651            << " features " << m->get_connection()->get_features() << dendl;
1652   uint64_t missing = required_features & ~m->get_connection()->get_features();
1653   if (missing) {
1654     dout(1) << " peer " << m->get_source_addr() << " missing features "
1655             << missing << dendl;
1656     if (m->get_connection()->has_feature(CEPH_FEATURE_OSD_PRIMARY_AFFINITY)) {
1657       MMonProbe *r = new MMonProbe(monmap->fsid, MMonProbe::OP_MISSING_FEATURES,
1658                                    name, has_ever_joined);
1659       m->required_features = required_features;
1660       m->get_connection()->send_message(r);
1661     }
1662     goto out;
1663   }
1664
1665   if (!is_probing() && !is_synchronizing()) {
1666     // If the probing mon is way ahead of us, we need to re-bootstrap.
1667     // Normally we capture this case when we initially bootstrap, but
1668     // it is possible we pass those checks (we overlap with
1669     // quorum-to-be) but fail to join a quorum before it moves past
1670     // us.  We need to be kicked back to bootstrap so we can
1671     // synchonize, not keep calling elections.
1672     if (paxos->get_version() + 1 < m->paxos_first_version) {
1673       dout(1) << " peer " << m->get_source_addr() << " has first_committed "
1674               << "ahead of us, re-bootstrapping" << dendl;
1675       bootstrap();
1676       goto out;
1677
1678     }
1679   }
1680   
1681   MMonProbe *r;
1682   r = new MMonProbe(monmap->fsid, MMonProbe::OP_REPLY, name, has_ever_joined);
1683   r->name = name;
1684   r->quorum = quorum;
1685   monmap->encode(r->monmap_bl, m->get_connection()->get_features());
1686   r->paxos_first_version = paxos->get_first_committed();
1687   r->paxos_last_version = paxos->get_version();
1688   m->get_connection()->send_message(r);
1689
1690   // did we discover a peer here?
1691   if (!monmap->contains(m->get_source_addr())) {
1692     dout(1) << " adding peer " << m->get_source_addr()
1693             << " to list of hints" << dendl;
1694     extra_probe_peers.insert(m->get_source_addr());
1695   }
1696
1697  out:
1698   return;
1699 }
1700
1701 void Monitor::handle_probe_reply(MonOpRequestRef op)
1702 {
1703   MMonProbe *m = static_cast<MMonProbe*>(op->get_req());
1704   dout(10) << "handle_probe_reply " << m->get_source_inst() << *m << dendl;
1705   dout(10) << " monmap is " << *monmap << dendl;
1706
1707   // discover name and addrs during probing or electing states.
1708   if (!is_probing() && !is_electing()) {
1709     return;
1710   }
1711
1712   // newer map, or they've joined a quorum and we haven't?
1713   bufferlist mybl;
1714   monmap->encode(mybl, m->get_connection()->get_features());
1715   // make sure it's actually different; the checks below err toward
1716   // taking the other guy's map, which could cause us to loop.
1717   if (!mybl.contents_equal(m->monmap_bl)) {
1718     MonMap *newmap = new MonMap;
1719     newmap->decode(m->monmap_bl);
1720     if (m->has_ever_joined && (newmap->get_epoch() > monmap->get_epoch() ||
1721                                !has_ever_joined)) {
1722       dout(10) << " got newer/committed monmap epoch " << newmap->get_epoch()
1723                << ", mine was " << monmap->get_epoch() << dendl;
1724       delete newmap;
1725       monmap->decode(m->monmap_bl);
1726
1727       bootstrap();
1728       return;
1729     }
1730     delete newmap;
1731   }
1732
1733   // rename peer?
1734   string peer_name = monmap->get_name(m->get_source_addr());
1735   if (monmap->get_epoch() == 0 && peer_name.compare(0, 7, "noname-") == 0) {
1736     dout(10) << " renaming peer " << m->get_source_addr() << " "
1737              << peer_name << " -> " << m->name << " in my monmap"
1738              << dendl;
1739     monmap->rename(peer_name, m->name);
1740
1741     if (is_electing()) {
1742       bootstrap();
1743       return;
1744     }
1745   } else {
1746     dout(10) << " peer name is " << peer_name << dendl;
1747   }
1748
1749   // new initial peer?
1750   if (monmap->get_epoch() == 0 &&
1751       monmap->contains(m->name) &&
1752       monmap->get_addr(m->name).is_blank_ip()) {
1753     dout(1) << " learned initial mon " << m->name << " addr " << m->get_source_addr() << dendl;
1754     monmap->set_addr(m->name, m->get_source_addr());
1755
1756     bootstrap();
1757     return;
1758   }
1759
1760   // end discover phase
1761   if (!is_probing()) {
1762     return;
1763   }
1764
1765   assert(paxos != NULL);
1766
1767   if (is_synchronizing()) {
1768     dout(10) << " currently syncing" << dendl;
1769     return;
1770   }
1771
1772   entity_inst_t other = m->get_source_inst();
1773
1774   if (m->paxos_last_version < sync_last_committed_floor) {
1775     dout(10) << " peer paxos versions [" << m->paxos_first_version
1776              << "," << m->paxos_last_version << "] < my sync_last_committed_floor "
1777              << sync_last_committed_floor << ", ignoring"
1778              << dendl;
1779   } else {
1780     if (paxos->get_version() < m->paxos_first_version &&
1781         m->paxos_first_version > 1) {  // no need to sync if we're 0 and they start at 1.
1782       dout(10) << " peer paxos first versions [" << m->paxos_first_version
1783                << "," << m->paxos_last_version << "]"
1784                << " vs my version " << paxos->get_version()
1785                << " (too far ahead)"
1786                << dendl;
1787       cancel_probe_timeout();
1788       sync_start(other, true);
1789       return;
1790     }
1791     if (paxos->get_version() + g_conf->paxos_max_join_drift < m->paxos_last_version) {
1792       dout(10) << " peer paxos last version " << m->paxos_last_version
1793                << " vs my version " << paxos->get_version()
1794                << " (too far ahead)"
1795                << dendl;
1796       cancel_probe_timeout();
1797       sync_start(other, false);
1798       return;
1799     }
1800   }
1801
1802   // is there an existing quorum?
1803   if (m->quorum.size()) {
1804     dout(10) << " existing quorum " << m->quorum << dendl;
1805
1806     dout(10) << " peer paxos version " << m->paxos_last_version
1807              << " vs my version " << paxos->get_version()
1808              << " (ok)"
1809              << dendl;
1810
1811     if (monmap->contains(name) &&
1812         !monmap->get_addr(name).is_blank_ip()) {
1813       // i'm part of the cluster; just initiate a new election
1814       start_election();
1815     } else {
1816       dout(10) << " ready to join, but i'm not in the monmap or my addr is blank, trying to join" << dendl;
1817       messenger->send_message(new MMonJoin(monmap->fsid, name, messenger->get_myaddr()),
1818                               monmap->get_inst(*m->quorum.begin()));
1819     }
1820   } else {
1821     if (monmap->contains(m->name)) {
1822       dout(10) << " mon." << m->name << " is outside the quorum" << dendl;
1823       outside_quorum.insert(m->name);
1824     } else {
1825       dout(10) << " mostly ignoring mon." << m->name << ", not part of monmap" << dendl;
1826       return;
1827     }
1828
1829     unsigned need = monmap->size() / 2 + 1;
1830     dout(10) << " outside_quorum now " << outside_quorum << ", need " << need << dendl;
1831     if (outside_quorum.size() >= need) {
1832       if (outside_quorum.count(name)) {
1833         dout(10) << " that's enough to form a new quorum, calling election" << dendl;
1834         start_election();
1835       } else {
1836         dout(10) << " that's enough to form a new quorum, but it does not include me; waiting" << dendl;
1837       }
1838     } else {
1839       dout(10) << " that's not yet enough for a new quorum, waiting" << dendl;
1840     }
1841   }
1842 }
1843
1844 void Monitor::join_election()
1845 {
1846   dout(10) << __func__ << dendl;
1847   wait_for_paxos_write();
1848   _reset();
1849   state = STATE_ELECTING;
1850
1851   logger->inc(l_mon_num_elections);
1852 }
1853
1854 void Monitor::start_election()
1855 {
1856   dout(10) << "start_election" << dendl;
1857   wait_for_paxos_write();
1858   _reset();
1859   state = STATE_ELECTING;
1860
1861   logger->inc(l_mon_num_elections);
1862   logger->inc(l_mon_election_call);
1863
1864   clog->info() << "mon." << name << " calling new monitor election";
1865   elector.call_election();
1866 }
1867
1868 void Monitor::win_standalone_election()
1869 {
1870   dout(1) << "win_standalone_election" << dendl;
1871
1872   // bump election epoch, in case the previous epoch included other
1873   // monitors; we need to be able to make the distinction.
1874   elector.init();
1875   elector.advance_epoch();
1876
1877   rank = monmap->get_rank(name);
1878   assert(rank == 0);
1879   set<int> q;
1880   q.insert(rank);
1881
1882   map<int,Metadata> metadata;
1883   collect_metadata(&metadata[0]);
1884
1885   win_election(elector.get_epoch(), q,
1886                CEPH_FEATURES_ALL,
1887                ceph::features::mon::get_supported(),
1888                metadata);
1889 }
1890
1891 const utime_t& Monitor::get_leader_since() const
1892 {
1893   assert(state == STATE_LEADER);
1894   return leader_since;
1895 }
1896
1897 epoch_t Monitor::get_epoch()
1898 {
1899   return elector.get_epoch();
1900 }
1901
1902 void Monitor::_finish_svc_election()
1903 {
1904   assert(state == STATE_LEADER || state == STATE_PEON);
1905
1906   for (auto p : paxos_service) {
1907     // we already called election_finished() on monmon(); avoid callig twice
1908     if (state == STATE_LEADER && p == monmon())
1909       continue;
1910     p->election_finished();
1911   }
1912 }
1913
1914 void Monitor::win_election(epoch_t epoch, set<int>& active, uint64_t features,
1915                            const mon_feature_t& mon_features,
1916                            const map<int,Metadata>& metadata)
1917 {
1918   dout(10) << __func__ << " epoch " << epoch << " quorum " << active
1919            << " features " << features
1920            << " mon_features " << mon_features
1921            << dendl;
1922   assert(is_electing());
1923   state = STATE_LEADER;
1924   leader_since = ceph_clock_now();
1925   leader = rank;
1926   quorum = active;
1927   quorum_con_features = features;
1928   quorum_mon_features = mon_features;
1929   pending_metadata = metadata;
1930   outside_quorum.clear();
1931
1932   clog->info() << "mon." << name << "@" << rank
1933                 << " won leader election with quorum " << quorum;
1934
1935   set_leader_commands(get_local_commands(mon_features));
1936
1937   paxos->leader_init();
1938   // NOTE: tell monmap monitor first.  This is important for the
1939   // bootstrap case to ensure that the very first paxos proposal
1940   // codifies the monmap.  Otherwise any manner of chaos can ensue
1941   // when monitors are call elections or participating in a paxos
1942   // round without agreeing on who the participants are.
1943   monmon()->election_finished();
1944   _finish_svc_election();
1945   health_monitor->start(epoch);
1946
1947   logger->inc(l_mon_election_win);
1948
1949   // inject new metadata in first transaction.
1950   {
1951     // include previous metadata for missing mons (that aren't part of
1952     // the current quorum).
1953     map<int,Metadata> m = metadata;
1954     for (unsigned rank = 0; rank < monmap->size(); ++rank) {
1955       if (m.count(rank) == 0 &&
1956           mon_metadata.count(rank)) {
1957         m[rank] = mon_metadata[rank];
1958       }
1959     }
1960
1961     // FIXME: This is a bit sloppy because we aren't guaranteed to submit
1962     // a new transaction immediately after the election finishes.  We should
1963     // do that anyway for other reasons, though.
1964     MonitorDBStore::TransactionRef t = paxos->get_pending_transaction();
1965     bufferlist bl;
1966     ::encode(m, bl);
1967     t->put(MONITOR_STORE_PREFIX, "last_metadata", bl);
1968   }
1969
1970   finish_election();
1971   if (monmap->size() > 1 &&
1972       monmap->get_epoch() > 0) {
1973     timecheck_start();
1974     health_tick_start();
1975     do_health_to_clog_interval();
1976     scrub_event_start();
1977   }
1978 }
1979
1980 void Monitor::lose_election(epoch_t epoch, set<int> &q, int l,
1981                             uint64_t features,
1982                             const mon_feature_t& mon_features)
1983 {
1984   state = STATE_PEON;
1985   leader_since = utime_t();
1986   leader = l;
1987   quorum = q;
1988   outside_quorum.clear();
1989   quorum_con_features = features;
1990   quorum_mon_features = mon_features;
1991   dout(10) << "lose_election, epoch " << epoch << " leader is mon" << leader
1992            << " quorum is " << quorum << " features are " << quorum_con_features
1993            << " mon_features are " << quorum_mon_features
1994            << dendl;
1995
1996   paxos->peon_init();
1997   _finish_svc_election();
1998   health_monitor->start(epoch);
1999
2000   logger->inc(l_mon_election_lose);
2001
2002   finish_election();
2003
2004   if ((quorum_con_features & CEPH_FEATURE_MON_METADATA) &&
2005       !HAVE_FEATURE(quorum_con_features, SERVER_LUMINOUS)) {
2006     // for pre-luminous mons only
2007     Metadata sys_info;
2008     collect_metadata(&sys_info);
2009     messenger->send_message(new MMonMetadata(sys_info),
2010                             monmap->get_inst(get_leader()));
2011   }
2012 }
2013
2014 void Monitor::collect_metadata(Metadata *m)
2015 {
2016   collect_sys_info(m, g_ceph_context);
2017   (*m)["addr"] = stringify(messenger->get_myaddr());
2018 }
2019
2020 void Monitor::finish_election()
2021 {
2022   apply_quorum_to_compatset_features();
2023   apply_monmap_to_compatset_features();
2024   timecheck_finish();
2025   exited_quorum = utime_t();
2026   finish_contexts(g_ceph_context, waitfor_quorum);
2027   finish_contexts(g_ceph_context, maybe_wait_for_quorum);
2028   resend_routed_requests();
2029   update_logger();
2030   register_cluster_logger();
2031
2032   // am i named properly?
2033   string cur_name = monmap->get_name(messenger->get_myaddr());
2034   if (cur_name != name) {
2035     dout(10) << " renaming myself from " << cur_name << " -> " << name << dendl;
2036     messenger->send_message(new MMonJoin(monmap->fsid, name, messenger->get_myaddr()),
2037                             monmap->get_inst(*quorum.begin()));
2038   }
2039 }
2040
2041 void Monitor::_apply_compatset_features(CompatSet &new_features)
2042 {
2043   if (new_features.compare(features) != 0) {
2044     CompatSet diff = features.unsupported(new_features);
2045     dout(1) << __func__ << " enabling new quorum features: " << diff << dendl;
2046     features = new_features;
2047
2048     auto t = std::make_shared<MonitorDBStore::Transaction>();
2049     write_features(t);
2050     store->apply_transaction(t);
2051
2052     calc_quorum_requirements();
2053   }
2054 }
2055
2056 void Monitor::apply_quorum_to_compatset_features()
2057 {
2058   CompatSet new_features(features);
2059   if (quorum_con_features & CEPH_FEATURE_OSD_ERASURE_CODES) {
2060     new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES);
2061   }
2062   if (quorum_con_features & CEPH_FEATURE_OSDMAP_ENC) {
2063     new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC);
2064   }
2065   if (quorum_con_features & CEPH_FEATURE_ERASURE_CODE_PLUGINS_V2) {
2066     new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2);
2067   }
2068   if (quorum_con_features & CEPH_FEATURE_ERASURE_CODE_PLUGINS_V3) {
2069     new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3);
2070   }
2071   dout(5) << __func__ << dendl;
2072   _apply_compatset_features(new_features);
2073 }
2074
2075 void Monitor::apply_monmap_to_compatset_features()
2076 {
2077   CompatSet new_features(features);
2078   mon_feature_t monmap_features = monmap->get_required_features();
2079
2080   /* persistent monmap features may go into the compatset.
2081    * optional monmap features may not - why?
2082    *   because optional monmap features may be set/unset by the admin,
2083    *   and possibly by other means that haven't yet been thought out,
2084    *   so we can't make the monitor enforce them on start - because they
2085    *   may go away.
2086    *   this, of course, does not invalidate setting a compatset feature
2087    *   for an optional feature - as long as you make sure to clean it up
2088    *   once you unset it.
2089    */
2090   if (monmap_features.contains_all(ceph::features::mon::FEATURE_KRAKEN)) {
2091     assert(ceph::features::mon::get_persistent().contains_all(
2092            ceph::features::mon::FEATURE_KRAKEN));
2093     // this feature should only ever be set if the quorum supports it.
2094     assert(HAVE_FEATURE(quorum_con_features, SERVER_KRAKEN));
2095     new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_KRAKEN);
2096   }
2097   if (monmap_features.contains_all(ceph::features::mon::FEATURE_LUMINOUS)) {
2098     assert(ceph::features::mon::get_persistent().contains_all(
2099            ceph::features::mon::FEATURE_LUMINOUS));
2100     // this feature should only ever be set if the quorum supports it.
2101     assert(HAVE_FEATURE(quorum_con_features, SERVER_LUMINOUS));
2102     new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_LUMINOUS);
2103   }
2104
2105   dout(5) << __func__ << dendl;
2106   _apply_compatset_features(new_features);
2107 }
2108
2109 void Monitor::calc_quorum_requirements()
2110 {
2111   required_features = 0;
2112
2113   // compatset
2114   if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES)) {
2115     required_features |= CEPH_FEATURE_OSD_ERASURE_CODES;
2116   }
2117   if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC)) {
2118     required_features |= CEPH_FEATURE_OSDMAP_ENC;
2119   }
2120   if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2)) {
2121     required_features |= CEPH_FEATURE_ERASURE_CODE_PLUGINS_V2;
2122   }
2123   if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3)) {
2124     required_features |= CEPH_FEATURE_ERASURE_CODE_PLUGINS_V3;
2125   }
2126   if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_KRAKEN)) {
2127     required_features |= CEPH_FEATUREMASK_SERVER_KRAKEN;
2128   }
2129   if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_LUMINOUS)) {
2130     required_features |= CEPH_FEATUREMASK_SERVER_LUMINOUS;
2131   }
2132
2133   // monmap
2134   if (monmap->get_required_features().contains_all(
2135         ceph::features::mon::FEATURE_KRAKEN)) {
2136     required_features |= CEPH_FEATUREMASK_SERVER_KRAKEN;
2137   }
2138   if (monmap->get_required_features().contains_all(
2139         ceph::features::mon::FEATURE_LUMINOUS)) {
2140     required_features |= CEPH_FEATUREMASK_SERVER_LUMINOUS;
2141   }
2142   dout(10) << __func__ << " required_features " << required_features << dendl;
2143 }
2144
2145 void Monitor::get_combined_feature_map(FeatureMap *fm)
2146 {
2147   *fm += session_map.feature_map;
2148   for (auto id : quorum) {
2149     if (id != rank) {
2150       *fm += quorum_feature_map[id];
2151     }
2152   }
2153 }
2154
2155 void Monitor::sync_force(Formatter *f, ostream& ss)
2156 {
2157   bool free_formatter = false;
2158
2159   if (!f) {
2160     // louzy/lazy hack: default to json if no formatter has been defined
2161     f = new JSONFormatter();
2162     free_formatter = true;
2163   }
2164
2165   auto tx(std::make_shared<MonitorDBStore::Transaction>());
2166   sync_stash_critical_state(tx);
2167   tx->put("mon_sync", "force_sync", 1);
2168   store->apply_transaction(tx);
2169
2170   f->open_object_section("sync_force");
2171   f->dump_int("ret", 0);
2172   f->dump_stream("msg") << "forcing store sync the next time the monitor starts";
2173   f->close_section(); // sync_force
2174   f->flush(ss);
2175   if (free_formatter)
2176     delete f;
2177 }
2178
2179 void Monitor::_quorum_status(Formatter *f, ostream& ss)
2180 {
2181   bool free_formatter = false;
2182
2183   if (!f) {
2184     // louzy/lazy hack: default to json if no formatter has been defined
2185     f = new JSONFormatter();
2186     free_formatter = true;
2187   }
2188   f->open_object_section("quorum_status");
2189   f->dump_int("election_epoch", get_epoch());
2190
2191   f->open_array_section("quorum");
2192   for (set<int>::iterator p = quorum.begin(); p != quorum.end(); ++p)
2193     f->dump_int("mon", *p);
2194   f->close_section(); // quorum
2195
2196   list<string> quorum_names = get_quorum_names();
2197   f->open_array_section("quorum_names");
2198   for (list<string>::iterator p = quorum_names.begin(); p != quorum_names.end(); ++p)
2199     f->dump_string("mon", *p);
2200   f->close_section(); // quorum_names
2201
2202   f->dump_string("quorum_leader_name", quorum.empty() ? string() : monmap->get_name(*quorum.begin()));
2203
2204   f->open_object_section("monmap");
2205   monmap->dump(f);
2206   f->close_section(); // monmap
2207
2208   f->close_section(); // quorum_status
2209   f->flush(ss);
2210   if (free_formatter)
2211     delete f;
2212 }
2213
2214 void Monitor::get_mon_status(Formatter *f, ostream& ss)
2215 {
2216   bool free_formatter = false;
2217
2218   if (!f) {
2219     // louzy/lazy hack: default to json if no formatter has been defined
2220     f = new JSONFormatter();
2221     free_formatter = true;
2222   }
2223
2224   f->open_object_section("mon_status");
2225   f->dump_string("name", name);
2226   f->dump_int("rank", rank);
2227   f->dump_string("state", get_state_name());
2228   f->dump_int("election_epoch", get_epoch());
2229
2230   f->open_array_section("quorum");
2231   for (set<int>::iterator p = quorum.begin(); p != quorum.end(); ++p) {
2232     f->dump_int("mon", *p);
2233   }
2234
2235   f->close_section(); // quorum
2236
2237   f->open_object_section("features");
2238   f->dump_stream("required_con") << required_features;
2239   mon_feature_t req_mon_features = get_required_mon_features();
2240   req_mon_features.dump(f, "required_mon");
2241   f->dump_stream("quorum_con") << quorum_con_features;
2242   quorum_mon_features.dump(f, "quorum_mon");
2243   f->close_section(); // features
2244
2245   f->open_array_section("outside_quorum");
2246   for (set<string>::iterator p = outside_quorum.begin(); p != outside_quorum.end(); ++p)
2247     f->dump_string("mon", *p);
2248   f->close_section(); // outside_quorum
2249
2250   f->open_array_section("extra_probe_peers");
2251   for (set<entity_addr_t>::iterator p = extra_probe_peers.begin();
2252        p != extra_probe_peers.end();
2253        ++p)
2254     f->dump_stream("peer") << *p;
2255   f->close_section(); // extra_probe_peers
2256
2257   f->open_array_section("sync_provider");
2258   for (map<uint64_t,SyncProvider>::const_iterator p = sync_providers.begin();
2259        p != sync_providers.end();
2260        ++p) {
2261     f->dump_unsigned("cookie", p->second.cookie);
2262     f->dump_stream("entity") << p->second.entity;
2263     f->dump_stream("timeout") << p->second.timeout;
2264     f->dump_unsigned("last_committed", p->second.last_committed);
2265     f->dump_stream("last_key") << p->second.last_key;
2266   }
2267   f->close_section();
2268
2269   if (is_synchronizing()) {
2270     f->open_object_section("sync");
2271     f->dump_stream("sync_provider") << sync_provider;
2272     f->dump_unsigned("sync_cookie", sync_cookie);
2273     f->dump_unsigned("sync_start_version", sync_start_version);
2274     f->close_section();
2275   }
2276
2277   if (g_conf->mon_sync_provider_kill_at > 0)
2278     f->dump_int("provider_kill_at", g_conf->mon_sync_provider_kill_at);
2279   if (g_conf->mon_sync_requester_kill_at > 0)
2280     f->dump_int("requester_kill_at", g_conf->mon_sync_requester_kill_at);
2281
2282   f->open_object_section("monmap");
2283   monmap->dump(f);
2284   f->close_section();
2285
2286   f->dump_object("feature_map", session_map.feature_map);
2287   f->close_section(); // mon_status
2288
2289   if (free_formatter) {
2290     // flush formatter to ss and delete it iff we created the formatter
2291     f->flush(ss);
2292     delete f;
2293   }
2294 }
2295
2296
2297 // health status to clog
2298
2299 void Monitor::health_tick_start()
2300 {
2301   if (!cct->_conf->mon_health_to_clog ||
2302       cct->_conf->mon_health_to_clog_tick_interval <= 0)
2303     return;
2304
2305   dout(15) << __func__ << dendl;
2306
2307   health_tick_stop();
2308   health_tick_event = timer.add_event_after(
2309     cct->_conf->mon_health_to_clog_tick_interval,
2310     new C_MonContext(this, [this](int r) {
2311         if (r < 0)
2312           return;
2313         do_health_to_clog();
2314         health_tick_start();
2315       }));
2316 }
2317
2318 void Monitor::health_tick_stop()
2319 {
2320   dout(15) << __func__ << dendl;
2321
2322   if (health_tick_event) {
2323     timer.cancel_event(health_tick_event);
2324     health_tick_event = NULL;
2325   }
2326 }
2327
2328 utime_t Monitor::health_interval_calc_next_update()
2329 {
2330   utime_t now = ceph_clock_now();
2331
2332   time_t secs = now.sec();
2333   int remainder = secs % cct->_conf->mon_health_to_clog_interval;
2334   int adjustment = cct->_conf->mon_health_to_clog_interval - remainder;
2335   utime_t next = utime_t(secs + adjustment, 0);
2336
2337   dout(20) << __func__
2338     << " now: " << now << ","
2339     << " next: " << next << ","
2340     << " interval: " << cct->_conf->mon_health_to_clog_interval
2341     << dendl;
2342
2343   return next;
2344 }
2345
2346 void Monitor::health_interval_start()
2347 {
2348   dout(15) << __func__ << dendl;
2349
2350   if (!cct->_conf->mon_health_to_clog ||
2351       cct->_conf->mon_health_to_clog_interval <= 0) {
2352     return;
2353   }
2354
2355   health_interval_stop();
2356   utime_t next = health_interval_calc_next_update();
2357   health_interval_event = new C_MonContext(this, [this](int r) {
2358       if (r < 0)
2359         return;
2360       do_health_to_clog_interval();
2361     });
2362   if (!timer.add_event_at(next, health_interval_event)) {
2363     health_interval_event = nullptr;
2364   }
2365 }
2366
2367 void Monitor::health_interval_stop()
2368 {
2369   dout(15) << __func__ << dendl;
2370   if (health_interval_event) {
2371     timer.cancel_event(health_interval_event);
2372   }
2373   health_interval_event = NULL;
2374 }
2375
2376 void Monitor::health_events_cleanup()
2377 {
2378   health_tick_stop();
2379   health_interval_stop();
2380   health_status_cache.reset();
2381 }
2382
2383 void Monitor::health_to_clog_update_conf(const std::set<std::string> &changed)
2384 {
2385   dout(20) << __func__ << dendl;
2386
2387   if (changed.count("mon_health_to_clog")) {
2388     if (!cct->_conf->mon_health_to_clog) {
2389       health_events_cleanup();
2390     } else {
2391       if (!health_tick_event) {
2392         health_tick_start();
2393       }
2394       if (!health_interval_event) {
2395         health_interval_start();
2396       }
2397     }
2398   }
2399
2400   if (changed.count("mon_health_to_clog_interval")) {
2401     if (cct->_conf->mon_health_to_clog_interval <= 0) {
2402       health_interval_stop();
2403     } else {
2404       health_interval_start();
2405     }
2406   }
2407
2408   if (changed.count("mon_health_to_clog_tick_interval")) {
2409     if (cct->_conf->mon_health_to_clog_tick_interval <= 0) {
2410       health_tick_stop();
2411     } else {
2412       health_tick_start();
2413     }
2414   }
2415 }
2416
2417 void Monitor::do_health_to_clog_interval()
2418 {
2419   // outputting to clog may have been disabled in the conf
2420   // since we were scheduled.
2421   if (!cct->_conf->mon_health_to_clog ||
2422       cct->_conf->mon_health_to_clog_interval <= 0)
2423     return;
2424
2425   dout(10) << __func__ << dendl;
2426
2427   // do we have a cached value for next_clog_update?  if not,
2428   // do we know when the last update was?
2429
2430   do_health_to_clog(true);
2431   health_interval_start();
2432 }
2433
2434 void Monitor::do_health_to_clog(bool force)
2435 {
2436   // outputting to clog may have been disabled in the conf
2437   // since we were scheduled.
2438   if (!cct->_conf->mon_health_to_clog ||
2439       cct->_conf->mon_health_to_clog_interval <= 0)
2440     return;
2441
2442   dout(10) << __func__ << (force ? " (force)" : "") << dendl;
2443
2444   if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
2445     string summary;
2446     health_status_t level = get_health_status(false, nullptr, &summary);
2447     if (!force &&
2448         summary == health_status_cache.summary &&
2449         level == health_status_cache.overall)
2450       return;
2451     clog->health(level) << "overall " << summary;
2452     health_status_cache.summary = summary;
2453     health_status_cache.overall = level;
2454   } else {
2455     // for jewel only
2456     list<string> status;
2457     health_status_t overall = get_health(status, NULL, NULL);
2458     dout(25) << __func__
2459              << (force ? " (force)" : "")
2460              << dendl;
2461
2462     string summary = joinify(status.begin(), status.end(), string("; "));
2463
2464     if (!force &&
2465         overall == health_status_cache.overall &&
2466         !health_status_cache.summary.empty() &&
2467         health_status_cache.summary == summary) {
2468       // we got a dup!
2469       return;
2470     }
2471
2472     clog->info() << summary;
2473
2474     health_status_cache.overall = overall;
2475     health_status_cache.summary = summary;
2476   }
2477 }
2478
2479 health_status_t Monitor::get_health_status(
2480   bool want_detail,
2481   Formatter *f,
2482   std::string *plain,
2483   const char *sep1,
2484   const char *sep2)
2485 {
2486   health_status_t r = HEALTH_OK;
2487   bool compat = g_conf->mon_health_preluminous_compat;
2488   bool compat_warn = g_conf->get_val<bool>("mon_health_preluminous_compat_warning");
2489   if (f) {
2490     f->open_object_section("health");
2491     f->open_object_section("checks");
2492   }
2493
2494   string summary;
2495   string *psummary = f ? nullptr : &summary;
2496   for (auto& svc : paxos_service) {
2497     r = std::min(r, svc->get_health_checks().dump_summary(
2498                    f, psummary, sep2, want_detail));
2499   }
2500
2501   if (f) {
2502     f->close_section();
2503     f->dump_stream("status") << r;
2504   } else {
2505     // one-liner: HEALTH_FOO[ thing1[; thing2 ...]]
2506     *plain = stringify(r);
2507     if (summary.size()) {
2508       *plain += sep1;
2509       *plain += summary;
2510     }
2511     *plain += "\n";
2512   }
2513
2514   const std::string old_fields_message = "'ceph health' JSON format has "
2515     "changed in luminous. If you see this your monitoring system is "
2516     "scraping the wrong fields. Disable this with 'mon health preluminous "
2517     "compat warning = false'";
2518
2519   if (f && (compat || compat_warn)) {
2520     health_status_t cr = compat_warn ? min(HEALTH_WARN, r) : r;
2521     f->open_array_section("summary");
2522     if (compat_warn) {
2523       f->open_object_section("item");
2524       f->dump_stream("severity") << HEALTH_WARN;
2525       f->dump_string("summary", old_fields_message);
2526       f->close_section();
2527     }
2528     if (compat) {
2529       for (auto& svc : paxos_service) {
2530         svc->get_health_checks().dump_summary_compat(f);
2531       }
2532     }
2533     f->close_section();
2534     f->dump_stream("overall_status") << cr;
2535   }
2536
2537   if (want_detail) {
2538     if (f && (compat || compat_warn)) {
2539       f->open_array_section("detail");
2540       if (compat_warn) {
2541         f->dump_string("item", old_fields_message);
2542       }
2543     }
2544
2545     for (auto& svc : paxos_service) {
2546       svc->get_health_checks().dump_detail(f, plain, compat);
2547     }
2548
2549     if (f && (compat || compat_warn)) {
2550       f->close_section();
2551     }
2552   }
2553   if (f) {
2554     f->close_section();
2555   }
2556   return r;
2557 }
2558
2559 void Monitor::log_health(
2560   const health_check_map_t& updated,
2561   const health_check_map_t& previous,
2562   MonitorDBStore::TransactionRef t)
2563 {
2564   if (!g_conf->mon_health_to_clog) {
2565     return;
2566   }
2567
2568   const utime_t now = ceph_clock_now();
2569
2570   // FIXME: log atomically as part of @t instead of using clog.
2571   dout(10) << __func__ << " updated " << updated.checks.size()
2572            << " previous " << previous.checks.size()
2573            << dendl;
2574   const auto min_log_period = g_conf->get_val<int64_t>(
2575       "mon_health_log_update_period");
2576   for (auto& p : updated.checks) {
2577     auto q = previous.checks.find(p.first);
2578     bool logged = false;
2579     if (q == previous.checks.end()) {
2580       // new
2581       ostringstream ss;
2582       ss << "Health check failed: " << p.second.summary << " ("
2583          << p.first << ")";
2584       clog->health(p.second.severity) << ss.str();
2585
2586       logged = true;
2587     } else {
2588       if (p.second.summary != q->second.summary ||
2589           p.second.severity != q->second.severity) {
2590
2591         auto status_iter = health_check_log_times.find(p.first);
2592         if (status_iter != health_check_log_times.end()) {
2593           if (p.second.severity == q->second.severity &&
2594               now - status_iter->second.updated_at < min_log_period) {
2595             // We already logged this recently and the severity is unchanged,
2596             // so skip emitting an update of the summary string.
2597             // We'll get an update out of tick() later if the check
2598             // is still failing.
2599             continue;
2600           }
2601         }
2602
2603         // summary or severity changed (ignore detail changes at this level)
2604         ostringstream ss;
2605         ss << "Health check update: " << p.second.summary << " (" << p.first << ")";
2606         clog->health(p.second.severity) << ss.str();
2607
2608         logged = true;
2609       }
2610     }
2611     // Record the time at which we last logged, so that we can check this
2612     // when considering whether/when to print update messages.
2613     if (logged) {
2614       auto iter = health_check_log_times.find(p.first);
2615       if (iter == health_check_log_times.end()) {
2616         health_check_log_times.emplace(p.first, HealthCheckLogStatus(
2617           p.second.severity, p.second.summary, now));
2618       } else {
2619         iter->second = HealthCheckLogStatus(
2620           p.second.severity, p.second.summary, now);
2621       }
2622     }
2623   }
2624   for (auto& p : previous.checks) {
2625     if (!updated.checks.count(p.first)) {
2626       // cleared
2627       ostringstream ss;
2628       if (p.first == "DEGRADED_OBJECTS") {
2629         clog->info() << "All degraded objects recovered";
2630       } else if (p.first == "OSD_FLAGS") {
2631         clog->info() << "OSD flags cleared";
2632       } else {
2633         clog->info() << "Health check cleared: " << p.first << " (was: "
2634                      << p.second.summary << ")";
2635       }
2636
2637       if (health_check_log_times.count(p.first)) {
2638         health_check_log_times.erase(p.first);
2639       }
2640     }
2641   }
2642
2643   if (previous.checks.size() && updated.checks.size() == 0) {
2644     // We might be going into a fully healthy state, check
2645     // other subsystems
2646     bool any_checks = false;
2647     for (auto& svc : paxos_service) {
2648       if (&(svc->get_health_checks()) == &(previous)) {
2649         // Ignore the ones we're clearing right now
2650         continue;
2651       }
2652
2653       if (svc->get_health_checks().checks.size() > 0) {
2654         any_checks = true;
2655         break;
2656       }
2657     }
2658     if (!any_checks) {
2659       clog->info() << "Cluster is now healthy";
2660     }
2661   }
2662 }
2663
2664 health_status_t Monitor::get_health(list<string>& status,
2665                                     bufferlist *detailbl,
2666                                     Formatter *f)
2667 {
2668   list<pair<health_status_t,string> > summary;
2669   list<pair<health_status_t,string> > detail;
2670
2671   if (f)
2672     f->open_object_section("health");
2673
2674   for (vector<PaxosService*>::iterator p = paxos_service.begin();
2675        p != paxos_service.end();
2676        ++p) {
2677     PaxosService *s = *p;
2678     s->get_health(summary, detailbl ? &detail : NULL, cct);
2679   }
2680
2681   health_monitor->get_health(summary, (detailbl ? &detail : NULL));
2682
2683   health_status_t overall = HEALTH_OK;
2684   if (!timecheck_skews.empty()) {
2685     list<string> warns;
2686     for (map<entity_inst_t,double>::iterator i = timecheck_skews.begin();
2687          i != timecheck_skews.end(); ++i) {
2688       entity_inst_t inst = i->first;
2689       double skew = i->second;
2690       double latency = timecheck_latencies[inst];
2691       string name = monmap->get_name(inst.addr);
2692       ostringstream tcss;
2693       health_status_t tcstatus = timecheck_status(tcss, skew, latency);
2694       if (tcstatus != HEALTH_OK) {
2695         if (overall > tcstatus)
2696           overall = tcstatus;
2697         warns.push_back(name);
2698         ostringstream tmp_ss;
2699         tmp_ss << "mon." << name
2700                << " addr " << inst.addr << " " << tcss.str()
2701                << " (latency " << latency << "s)";
2702         detail.push_back(make_pair(tcstatus, tmp_ss.str()));
2703       }
2704     }
2705     if (!warns.empty()) {
2706       ostringstream ss;
2707       ss << "clock skew detected on";
2708       while (!warns.empty()) {
2709         ss << " mon." << warns.front();
2710         warns.pop_front();
2711         if (!warns.empty())
2712           ss << ",";
2713       }
2714       status.push_back(ss.str());
2715       summary.push_back(make_pair(HEALTH_WARN, "Monitor clock skew detected "));
2716     }
2717   }
2718
2719   if (f)
2720     f->open_array_section("summary");
2721   if (!summary.empty()) {
2722     while (!summary.empty()) {
2723       if (overall > summary.front().first)
2724         overall = summary.front().first;
2725       status.push_back(summary.front().second);
2726       if (f) {
2727         f->open_object_section("item");
2728         f->dump_stream("severity") <<  summary.front().first;
2729         f->dump_string("summary", summary.front().second);
2730         f->close_section();
2731       }
2732       summary.pop_front();
2733     }
2734   }
2735   if (f)
2736     f->close_section();
2737
2738   stringstream fss;
2739   fss << overall;
2740   status.push_front(fss.str());
2741   if (f)
2742     f->dump_stream("overall_status") << overall;
2743
2744   if (f)
2745     f->open_array_section("detail");
2746   while (!detail.empty()) {
2747     if (f)
2748       f->dump_string("item", detail.front().second);
2749     else if (detailbl != NULL) {
2750       detailbl->append(detail.front().second);
2751       detailbl->append('\n');
2752     }
2753     detail.pop_front();
2754   }
2755   if (f)
2756     f->close_section();
2757
2758   if (f)
2759     f->close_section();
2760
2761   return overall;
2762 }
2763
2764 void Monitor::get_cluster_status(stringstream &ss, Formatter *f)
2765 {
2766   if (f)
2767     f->open_object_section("status");
2768
2769   if (f) {
2770     f->dump_stream("fsid") << monmap->get_fsid();
2771     if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
2772       get_health_status(false, f, nullptr);
2773     } else {
2774       list<string> health_str;
2775       get_health(health_str, nullptr, f);
2776     }
2777     f->dump_unsigned("election_epoch", get_epoch());
2778     {
2779       f->open_array_section("quorum");
2780       for (set<int>::iterator p = quorum.begin(); p != quorum.end(); ++p)
2781         f->dump_int("rank", *p);
2782       f->close_section();
2783       f->open_array_section("quorum_names");
2784       for (set<int>::iterator p = quorum.begin(); p != quorum.end(); ++p)
2785         f->dump_string("id", monmap->get_name(*p));
2786       f->close_section();
2787     }
2788     f->open_object_section("monmap");
2789     monmap->dump(f);
2790     f->close_section();
2791     f->open_object_section("osdmap");
2792     osdmon()->osdmap.print_summary(f, cout, string(12, ' '));
2793     f->close_section();
2794     f->open_object_section("pgmap");
2795     pgservice->print_summary(f, NULL);
2796     f->close_section();
2797     f->open_object_section("fsmap");
2798     mdsmon()->get_fsmap().print_summary(f, NULL);
2799     f->close_section();
2800     f->open_object_section("mgrmap");
2801     mgrmon()->get_map().print_summary(f, nullptr);
2802     f->close_section();
2803
2804     f->dump_object("servicemap", mgrstatmon()->get_service_map());
2805     f->close_section();
2806   } else {
2807     ss << "  cluster:\n";
2808     ss << "    id:     " << monmap->get_fsid() << "\n";
2809
2810     string health;
2811     if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
2812       get_health_status(false, nullptr, &health,
2813                         "\n            ", "\n            ");
2814     } else {
2815       list<string> ls;
2816       get_health(ls, NULL, f);
2817       health = joinify(ls.begin(), ls.end(),
2818                        string("\n            "));
2819     }
2820     ss << "    health: " << health << "\n";
2821
2822     ss << "\n \n  services:\n";
2823     {
2824       size_t maxlen = 3;
2825       auto& service_map = mgrstatmon()->get_service_map();
2826       for (auto& p : service_map.services) {
2827         maxlen = std::max(maxlen, p.first.size());
2828       }
2829       string spacing(maxlen - 3, ' ');
2830       const auto quorum_names = get_quorum_names();
2831       const auto mon_count = monmap->mon_info.size();
2832       ss << "    mon: " << spacing << mon_count << " daemons, quorum "
2833          << quorum_names;
2834       if (quorum_names.size() != mon_count) {
2835         std::list<std::string> out_of_q;
2836         for (size_t i = 0; i < monmap->ranks.size(); ++i) {
2837           if (quorum.count(i) == 0) {
2838             out_of_q.push_back(monmap->ranks[i]);
2839           }
2840         }
2841         ss << ", out of quorum: " << joinify(out_of_q.begin(),
2842                                              out_of_q.end(), std::string(", "));
2843       }
2844       ss << "\n";
2845       if (mgrmon()->in_use()) {
2846         ss << "    mgr: " << spacing;
2847         mgrmon()->get_map().print_summary(nullptr, &ss);
2848         ss << "\n";
2849       }
2850       if (mdsmon()->get_fsmap().filesystem_count() > 0) {
2851         ss << "    mds: " << spacing << mdsmon()->get_fsmap() << "\n";
2852       }
2853       ss << "    osd: " << spacing;
2854       osdmon()->osdmap.print_summary(NULL, ss, string(maxlen + 6, ' '));
2855       ss << "\n";
2856       for (auto& p : service_map.services) {
2857         ss << "    " << p.first << ": " << string(maxlen - p.first.size(), ' ')
2858            << p.second.get_summary() << "\n";
2859       }
2860     }
2861
2862     ss << "\n \n  data:\n";
2863     pgservice->print_summary(NULL, &ss);
2864     ss << "\n ";
2865   }
2866 }
2867
2868 void Monitor::_generate_command_map(map<string,cmd_vartype>& cmdmap,
2869                                     map<string,string> &param_str_map)
2870 {
2871   for (map<string,cmd_vartype>::const_iterator p = cmdmap.begin();
2872        p != cmdmap.end(); ++p) {
2873     if (p->first == "prefix")
2874       continue;
2875     if (p->first == "caps") {
2876       vector<string> cv;
2877       if (cmd_getval(g_ceph_context, cmdmap, "caps", cv) &&
2878           cv.size() % 2 == 0) {
2879         for (unsigned i = 0; i < cv.size(); i += 2) {
2880           string k = string("caps_") + cv[i];
2881           param_str_map[k] = cv[i + 1];
2882         }
2883         continue;
2884       }
2885     }
2886     param_str_map[p->first] = cmd_vartype_stringify(p->second);
2887   }
2888 }
2889
2890 const MonCommand *Monitor::_get_moncommand(
2891   const string &cmd_prefix,
2892   const vector<MonCommand>& cmds)
2893 {
2894   for (auto& c : cmds) {
2895     if (c.cmdstring.compare(0, cmd_prefix.size(), cmd_prefix) == 0) {
2896       return &c;
2897     }
2898   }
2899   return nullptr;
2900 }
2901
2902 bool Monitor::_allowed_command(MonSession *s, string &module, string &prefix,
2903                                const map<string,cmd_vartype>& cmdmap,
2904                                const map<string,string>& param_str_map,
2905                                const MonCommand *this_cmd) {
2906
2907   bool cmd_r = this_cmd->requires_perm('r');
2908   bool cmd_w = this_cmd->requires_perm('w');
2909   bool cmd_x = this_cmd->requires_perm('x');
2910
2911   bool capable = s->caps.is_capable(
2912     g_ceph_context,
2913     CEPH_ENTITY_TYPE_MON,
2914     s->entity_name,
2915     module, prefix, param_str_map,
2916     cmd_r, cmd_w, cmd_x);
2917
2918   dout(10) << __func__ << " " << (capable ? "" : "not ") << "capable" << dendl;
2919   return capable;
2920 }
2921
2922 void Monitor::format_command_descriptions(const std::vector<MonCommand> &commands,
2923                                           Formatter *f,
2924                                           bufferlist *rdata,
2925                                           bool hide_mgr_flag)
2926 {
2927   int cmdnum = 0;
2928   f->open_object_section("command_descriptions");
2929   for (const auto &cmd : commands) {
2930     unsigned flags = cmd.flags;
2931     if (hide_mgr_flag) {
2932       flags &= ~MonCommand::FLAG_MGR;
2933     }
2934     ostringstream secname;
2935     secname << "cmd" << setfill('0') << std::setw(3) << cmdnum;
2936     dump_cmddesc_to_json(f, secname.str(),
2937                          cmd.cmdstring, cmd.helpstring, cmd.module,
2938                          cmd.req_perms, cmd.availability, flags);
2939     cmdnum++;
2940   }
2941   f->close_section();   // command_descriptions
2942
2943   f->flush(*rdata);
2944 }
2945
2946 bool Monitor::is_keyring_required()
2947 {
2948   string auth_cluster_required = g_conf->auth_supported.empty() ?
2949     g_conf->auth_cluster_required : g_conf->auth_supported;
2950   string auth_service_required = g_conf->auth_supported.empty() ?
2951     g_conf->auth_service_required : g_conf->auth_supported;
2952
2953   return auth_service_required == "cephx" ||
2954     auth_cluster_required == "cephx";
2955 }
2956
2957 struct C_MgrProxyCommand : public Context {
2958   Monitor *mon;
2959   MonOpRequestRef op;
2960   uint64_t size;
2961   bufferlist outbl;
2962   string outs;
2963   C_MgrProxyCommand(Monitor *mon, MonOpRequestRef op, uint64_t s)
2964     : mon(mon), op(op), size(s) { }
2965   void finish(int r) {
2966     Mutex::Locker l(mon->lock);
2967     mon->mgr_proxy_bytes -= size;
2968     mon->reply_command(op, r, outs, outbl, 0);
2969   }
2970 };
2971
2972 void Monitor::handle_command(MonOpRequestRef op)
2973 {
2974   assert(op->is_type_command());
2975   MMonCommand *m = static_cast<MMonCommand*>(op->get_req());
2976   if (m->fsid != monmap->fsid) {
2977     dout(0) << "handle_command on fsid " << m->fsid << " != " << monmap->fsid << dendl;
2978     reply_command(op, -EPERM, "wrong fsid", 0);
2979     return;
2980   }
2981
2982   MonSession *session = static_cast<MonSession *>(
2983     m->get_connection()->get_priv());
2984   if (!session) {
2985     dout(5) << __func__ << " dropping stray message " << *m << dendl;
2986     return;
2987   }
2988   BOOST_SCOPE_EXIT_ALL(=) {
2989     session->put();
2990   };
2991
2992   if (m->cmd.empty()) {
2993     string rs = "No command supplied";
2994     reply_command(op, -EINVAL, rs, 0);
2995     return;
2996   }
2997
2998   string prefix;
2999   vector<string> fullcmd;
3000   map<string, cmd_vartype> cmdmap;
3001   stringstream ss, ds;
3002   bufferlist rdata;
3003   string rs;
3004   int r = -EINVAL;
3005   rs = "unrecognized command";
3006
3007   if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) {
3008     // ss has reason for failure
3009     r = -EINVAL;
3010     rs = ss.str();
3011     if (!m->get_source().is_mon())  // don't reply to mon->mon commands
3012       reply_command(op, r, rs, 0);
3013     return;
3014   }
3015
3016   // check return value. If no prefix parameter provided,
3017   // return value will be false, then return error info.
3018   if (!cmd_getval(g_ceph_context, cmdmap, "prefix", prefix)) {
3019     reply_command(op, -EINVAL, "command prefix not found", 0);
3020     return;
3021   }
3022
3023   // check prefix is empty
3024   if (prefix.empty()) {
3025     reply_command(op, -EINVAL, "command prefix must not be empty", 0);
3026     return;
3027   }
3028
3029   if (prefix == "get_command_descriptions") {
3030     bufferlist rdata;
3031     Formatter *f = Formatter::create("json");
3032     // hide mgr commands until luminous upgrade is complete
3033     bool hide_mgr_flag =
3034       osdmon()->osdmap.require_osd_release < CEPH_RELEASE_LUMINOUS;
3035
3036     std::vector<MonCommand> commands;
3037
3038     // only include mgr commands once all mons are upgrade (and we've dropped
3039     // the hard-coded PGMonitor commands)
3040     if (quorum_mon_features.contains_all(ceph::features::mon::FEATURE_LUMINOUS)) {
3041       commands = static_cast<MgrMonitor*>(
3042         paxos_service[PAXOS_MGR])->get_command_descs();
3043     }
3044
3045     for (auto& c : leader_mon_commands) {
3046       commands.push_back(c);
3047     }
3048
3049     format_command_descriptions(commands, f, &rdata, hide_mgr_flag);
3050     delete f;
3051     reply_command(op, 0, "", rdata, 0);
3052     return;
3053   }
3054
3055   string module;
3056   string err;
3057
3058   dout(0) << "handle_command " << *m << dendl;
3059
3060   string format;
3061   cmd_getval(g_ceph_context, cmdmap, "format", format, string("plain"));
3062   boost::scoped_ptr<Formatter> f(Formatter::create(format));
3063
3064   get_str_vec(prefix, fullcmd);
3065
3066   // make sure fullcmd is not empty.
3067   // invalid prefix will cause empty vector fullcmd.
3068   // such as, prefix=";,,;"
3069   if (fullcmd.empty()) {
3070     reply_command(op, -EINVAL, "command requires a prefix to be valid", 0);
3071     return;
3072   }
3073
3074   module = fullcmd[0];
3075
3076   // validate command is in leader map
3077
3078   const MonCommand *leader_cmd;
3079   const auto& mgr_cmds = mgrmon()->get_command_descs();
3080   const MonCommand *mgr_cmd = nullptr;
3081   if (!mgr_cmds.empty()) {
3082     mgr_cmd = _get_moncommand(prefix, mgr_cmds);
3083   }
3084   leader_cmd = _get_moncommand(prefix, leader_mon_commands);
3085   if (!leader_cmd) {
3086     leader_cmd = mgr_cmd;
3087     if (!leader_cmd) {
3088       reply_command(op, -EINVAL, "command not known", 0);
3089       return;
3090     }
3091   }
3092   // validate command is in our map & matches, or forward if it is allowed
3093   const MonCommand *mon_cmd = _get_moncommand(
3094     prefix,
3095     get_local_commands(quorum_mon_features));
3096   if (!mon_cmd) {
3097     mon_cmd = mgr_cmd;
3098   }
3099   if (!is_leader()) {
3100     if (!mon_cmd) {
3101       if (leader_cmd->is_noforward()) {
3102         reply_command(op, -EINVAL,
3103                       "command not locally supported and not allowed to forward",
3104                       0);
3105         return;
3106       }
3107       dout(10) << "Command not locally supported, forwarding request "
3108                << m << dendl;
3109       forward_request_leader(op);
3110       return;
3111     } else if (!mon_cmd->is_compat(leader_cmd)) {
3112       if (mon_cmd->is_noforward()) {
3113         reply_command(op, -EINVAL,
3114                       "command not compatible with leader and not allowed to forward",
3115                       0);
3116         return;
3117       }
3118       dout(10) << "Command not compatible with leader, forwarding request "
3119                << m << dendl;
3120       forward_request_leader(op);
3121       return;
3122     }
3123   }
3124
3125   if (mon_cmd->is_obsolete() ||
3126       (cct->_conf->mon_debug_deprecated_as_obsolete
3127        && mon_cmd->is_deprecated())) {
3128     reply_command(op, -ENOTSUP,
3129                   "command is obsolete; please check usage and/or man page",
3130                   0);
3131     return;
3132   }
3133
3134   if (session->proxy_con && mon_cmd->is_noforward()) {
3135     dout(10) << "Got forward for noforward command " << m << dendl;
3136     reply_command(op, -EINVAL, "forward for noforward command", rdata, 0);
3137     return;
3138   }
3139
3140   /* what we perceive as being the service the command falls under */
3141   string service(mon_cmd->module);
3142
3143   dout(25) << __func__ << " prefix='" << prefix
3144            << "' module='" << module
3145            << "' service='" << service << "'" << dendl;
3146
3147   bool cmd_is_rw =
3148     (mon_cmd->requires_perm('w') || mon_cmd->requires_perm('x'));
3149
3150   // validate user's permissions for requested command
3151   map<string,string> param_str_map;
3152   _generate_command_map(cmdmap, param_str_map);
3153   if (!_allowed_command(session, service, prefix, cmdmap,
3154                         param_str_map, mon_cmd)) {
3155     dout(1) << __func__ << " access denied" << dendl;
3156     (cmd_is_rw ? audit_clog->info() : audit_clog->debug())
3157       << "from='" << session->inst << "' "
3158       << "entity='" << session->entity_name << "' "
3159       << "cmd=" << m->cmd << ":  access denied";
3160     reply_command(op, -EACCES, "access denied", 0);
3161     return;
3162   }
3163
3164   (cmd_is_rw ? audit_clog->info() : audit_clog->debug())
3165     << "from='" << session->inst << "' "
3166     << "entity='" << session->entity_name << "' "
3167     << "cmd=" << m->cmd << ": dispatch";
3168
3169   if (mon_cmd->is_mgr() &&
3170       osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
3171     const auto& hdr = m->get_header();
3172     uint64_t size = hdr.front_len + hdr.middle_len + hdr.data_len;
3173     uint64_t max = g_conf->get_val<uint64_t>("mon_client_bytes")
3174                  * g_conf->get_val<double>("mon_mgr_proxy_client_bytes_ratio");
3175     if (mgr_proxy_bytes + size > max) {
3176       dout(10) << __func__ << " current mgr proxy bytes " << mgr_proxy_bytes
3177                << " + " << size << " > max " << max << dendl;
3178       reply_command(op, -EAGAIN, "hit limit on proxied mgr commands", rdata, 0);
3179       return;
3180     }
3181     mgr_proxy_bytes += size;
3182     dout(10) << __func__ << " proxying mgr command (+" << size
3183              << " -> " << mgr_proxy_bytes << ")" << dendl;
3184     C_MgrProxyCommand *fin = new C_MgrProxyCommand(this, op, size);
3185     mgr_client.start_command(m->cmd,
3186                              m->get_data(),
3187                              &fin->outbl,
3188                              &fin->outs,
3189                              new C_OnFinisher(fin, &finisher));
3190     return;
3191   }
3192
3193   if ((module == "mds" || module == "fs")  &&
3194       prefix != "fs authorize") {
3195     mdsmon()->dispatch(op);
3196     return;
3197   }
3198   if ((module == "osd" || prefix == "pg map") &&
3199       prefix != "osd last-stat-seq") {
3200     osdmon()->dispatch(op);
3201     return;
3202   }
3203
3204   if (module == "pg") {
3205     pgmon()->dispatch(op);
3206     return;
3207   }
3208   if (module == "mon" &&
3209       /* Let the Monitor class handle the following commands:
3210        *  'mon compact'
3211        *  'mon scrub'
3212        *  'mon sync force'
3213        */
3214       prefix != "mon compact" &&
3215       prefix != "mon scrub" &&
3216       prefix != "mon sync force" &&
3217       prefix != "mon metadata" &&
3218       prefix != "mon versions" &&
3219       prefix != "mon count-metadata") {
3220     monmon()->dispatch(op);
3221     return;
3222   }
3223   if (module == "auth" || prefix == "fs authorize") {
3224     authmon()->dispatch(op);
3225     return;
3226   }
3227   if (module == "log") {
3228     logmon()->dispatch(op);
3229     return;
3230   }
3231
3232   if (module == "config-key") {
3233     config_key_service->dispatch(op);
3234     return;
3235   }
3236
3237   if (module == "mgr") {
3238     mgrmon()->dispatch(op);
3239     return;
3240   }
3241
3242   if (prefix == "fsid") {
3243     if (f) {
3244       f->open_object_section("fsid");
3245       f->dump_stream("fsid") << monmap->fsid;
3246       f->close_section();
3247       f->flush(rdata);
3248     } else {
3249       ds << monmap->fsid;
3250       rdata.append(ds);
3251     }
3252     reply_command(op, 0, "", rdata, 0);
3253     return;
3254   }
3255
3256   if (prefix == "scrub" || prefix == "mon scrub") {
3257     wait_for_paxos_write();
3258     if (is_leader()) {
3259       int r = scrub_start();
3260       reply_command(op, r, "", rdata, 0);
3261     } else if (is_peon()) {
3262       forward_request_leader(op);
3263     } else {
3264       reply_command(op, -EAGAIN, "no quorum", rdata, 0);
3265     }
3266     return;
3267   }
3268
3269   if (prefix == "compact" || prefix == "mon compact") {
3270     dout(1) << "triggering manual compaction" << dendl;
3271     utime_t start = ceph_clock_now();
3272     store->compact();
3273     utime_t end = ceph_clock_now();
3274     end -= start;
3275     dout(1) << "finished manual compaction in " << end << " seconds" << dendl;
3276     ostringstream oss;
3277     oss << "compacted " << g_conf->get_val<std::string>("mon_keyvaluedb") << " in " << end << " seconds";
3278     rs = oss.str();
3279     r = 0;
3280   }
3281   else if (prefix == "injectargs") {
3282     vector<string> injected_args;
3283     cmd_getval(g_ceph_context, cmdmap, "injected_args", injected_args);
3284     if (!injected_args.empty()) {
3285       dout(0) << "parsing injected options '" << injected_args << "'" << dendl;
3286       ostringstream oss;
3287       r = g_conf->injectargs(str_join(injected_args, " "), &oss);
3288       ss << "injectargs:"  << oss.str();
3289       rs = ss.str();
3290       goto out;
3291     } else {
3292       rs = "must supply options to be parsed in a single string";
3293       r = -EINVAL;
3294     }
3295   } else if (prefix == "time-sync-status") {
3296     if (!f)
3297       f.reset(Formatter::create("json-pretty"));
3298     f->open_object_section("time_sync");
3299     if (!timecheck_skews.empty()) {
3300       f->open_object_section("time_skew_status");
3301       for (auto& i : timecheck_skews) {
3302         entity_inst_t inst = i.first;
3303         double skew = i.second;
3304         double latency = timecheck_latencies[inst];
3305         string name = monmap->get_name(inst.addr);
3306         ostringstream tcss;
3307         health_status_t tcstatus = timecheck_status(tcss, skew, latency);
3308         f->open_object_section(name.c_str());
3309         f->dump_float("skew", skew);
3310         f->dump_float("latency", latency);
3311         f->dump_stream("health") << tcstatus;
3312         if (tcstatus != HEALTH_OK) {
3313           f->dump_stream("details") << tcss.str();
3314         }
3315         f->close_section();
3316       }
3317       f->close_section();
3318     }
3319     f->open_object_section("timechecks");
3320     f->dump_unsigned("epoch", get_epoch());
3321     f->dump_int("round", timecheck_round);
3322     f->dump_stream("round_status") << ((timecheck_round%2) ?
3323                                        "on-going" : "finished");
3324     f->close_section();
3325     f->close_section();
3326     f->flush(rdata);
3327     r = 0;
3328     rs = "";
3329   } else if (prefix == "config set") {
3330     std::string key;
3331     cmd_getval(cct, cmdmap, "key", key);
3332     std::string val;
3333     cmd_getval(cct, cmdmap, "value", val);
3334     r = g_conf->set_val(key, val, true, &ss);
3335     if (r == 0) {
3336       g_conf->apply_changes(nullptr);
3337     }
3338     rs = ss.str();
3339     goto out;
3340   } else if (prefix == "status" ||
3341              prefix == "health" ||
3342              prefix == "df") {
3343     string detail;
3344     cmd_getval(g_ceph_context, cmdmap, "detail", detail);
3345
3346     if (prefix == "status") {
3347       // get_cluster_status handles f == NULL
3348       get_cluster_status(ds, f.get());
3349
3350       if (f) {
3351         f->flush(ds);
3352         ds << '\n';
3353       }
3354       rdata.append(ds);
3355     } else if (prefix == "health") {
3356       if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
3357         string plain;
3358         get_health_status(detail == "detail", f.get(), f ? nullptr : &plain);
3359         if (f) {
3360           f->flush(rdata);
3361         } else {
3362           rdata.append(plain);
3363         }
3364       } else {
3365         list<string> health_str;
3366         get_health(health_str, detail == "detail" ? &rdata : NULL, f.get());
3367         if (f) {
3368           f->flush(ds);
3369           ds << '\n';
3370         } else {
3371           assert(!health_str.empty());
3372           ds << health_str.front();
3373           health_str.pop_front();
3374           if (!health_str.empty()) {
3375             ds << ' ';
3376             ds << joinify(health_str.begin(), health_str.end(), string("; "));
3377           }
3378         }
3379         bufferlist comb;
3380         comb.append(ds);
3381         if (detail == "detail")
3382           comb.append(rdata);
3383         rdata = comb;
3384       }
3385     } else if (prefix == "df") {
3386       bool verbose = (detail == "detail");
3387       if (f)
3388         f->open_object_section("stats");
3389
3390       pgservice->dump_fs_stats(&ds, f.get(), verbose);
3391       if (!f)
3392         ds << '\n';
3393       pgservice->dump_pool_stats(osdmon()->osdmap, &ds, f.get(), verbose);
3394
3395       if (f) {
3396         f->close_section();
3397         f->flush(ds);
3398         ds << '\n';
3399       }
3400     } else {
3401       assert(0 == "We should never get here!");
3402       return;
3403     }
3404     rdata.append(ds);
3405     rs = "";
3406     r = 0;
3407   } else if (prefix == "report") {
3408
3409     // this must be formatted, in its current form
3410     if (!f)
3411       f.reset(Formatter::create("json-pretty"));
3412     f->open_object_section("report");
3413     f->dump_stream("cluster_fingerprint") << fingerprint;
3414     f->dump_string("version", ceph_version_to_str());
3415     f->dump_string("commit", git_version_to_str());
3416     f->dump_stream("timestamp") << ceph_clock_now();
3417
3418     vector<string> tagsvec;
3419     cmd_getval(g_ceph_context, cmdmap, "tags", tagsvec);
3420     string tagstr = str_join(tagsvec, " ");
3421     if (!tagstr.empty())
3422       tagstr = tagstr.substr(0, tagstr.find_last_of(' '));
3423     f->dump_string("tag", tagstr);
3424
3425     if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
3426       get_health_status(true, f.get(), nullptr);
3427     } else {
3428       list<string> health_str;
3429       get_health(health_str, nullptr, f.get());
3430     }
3431
3432     monmon()->dump_info(f.get());
3433     osdmon()->dump_info(f.get());
3434     mdsmon()->dump_info(f.get());
3435     authmon()->dump_info(f.get());
3436     pgservice->dump_info(f.get());
3437
3438     paxos->dump_info(f.get());
3439
3440     f->close_section();
3441     f->flush(rdata);
3442
3443     ostringstream ss2;
3444     ss2 << "report " << rdata.crc32c(CEPH_MON_PORT);
3445     rs = ss2.str();
3446     r = 0;
3447   } else if (prefix == "osd last-stat-seq") {
3448     int64_t osd;
3449     cmd_getval(g_ceph_context, cmdmap, "id", osd);
3450     uint64_t seq = mgrstatmon()->get_last_osd_stat_seq(osd);
3451     if (f) {
3452       f->dump_unsigned("seq", seq);
3453       f->flush(ds);
3454     } else {
3455       ds << seq;
3456       rdata.append(ds);
3457     }
3458     rs = "";
3459     r = 0;
3460   } else if (prefix == "node ls") {
3461     string node_type("all");
3462     cmd_getval(g_ceph_context, cmdmap, "type", node_type);
3463     if (!f)
3464       f.reset(Formatter::create("json-pretty"));
3465     if (node_type == "all") {
3466       f->open_object_section("nodes");
3467       print_nodes(f.get(), ds);
3468       osdmon()->print_nodes(f.get());
3469       mdsmon()->print_nodes(f.get());
3470       f->close_section();
3471     } else if (node_type == "mon") {
3472       print_nodes(f.get(), ds);
3473     } else if (node_type == "osd") {
3474       osdmon()->print_nodes(f.get());
3475     } else if (node_type == "mds") {
3476       mdsmon()->print_nodes(f.get());
3477     }
3478     f->flush(ds);
3479     rdata.append(ds);
3480     rs = "";
3481     r = 0;
3482   } else if (prefix == "features") {
3483     if (!is_leader() && !is_peon()) {
3484       dout(10) << " waiting for quorum" << dendl;
3485       waitfor_quorum.push_back(new C_RetryMessage(this, op));
3486       return;
3487     }
3488     if (!is_leader()) {
3489       forward_request_leader(op);
3490       return;
3491     }
3492     if (!f)
3493       f.reset(Formatter::create("json-pretty"));
3494     FeatureMap fm;
3495     get_combined_feature_map(&fm);
3496     f->dump_object("features", fm);
3497     f->flush(rdata);
3498     rs = "";
3499     r = 0;
3500   } else if (prefix == "mon metadata") {
3501     if (!f)
3502       f.reset(Formatter::create("json-pretty"));
3503
3504     string name;
3505     bool all = !cmd_getval(g_ceph_context, cmdmap, "id", name);
3506     if (!all) {
3507       // Dump a single mon's metadata
3508       int mon = monmap->get_rank(name);
3509       if (mon < 0) {
3510         rs = "requested mon not found";
3511         r = -ENOENT;
3512         goto out;
3513       }
3514       f->open_object_section("mon_metadata");
3515       r = get_mon_metadata(mon, f.get(), ds);
3516       f->close_section();
3517     } else {
3518       // Dump all mons' metadata
3519       r = 0;
3520       f->open_array_section("mon_metadata");
3521       for (unsigned int rank = 0; rank < monmap->size(); ++rank) {
3522         std::ostringstream get_err;
3523         f->open_object_section("mon");
3524         f->dump_string("name", monmap->get_name(rank));
3525         r = get_mon_metadata(rank, f.get(), get_err);
3526         f->close_section();
3527         if (r == -ENOENT || r == -EINVAL) {
3528           dout(1) << get_err.str() << dendl;
3529           // Drop error, list what metadata we do have
3530           r = 0;
3531         } else if (r != 0) {
3532           derr << "Unexpected error from get_mon_metadata: "
3533                << cpp_strerror(r) << dendl;
3534           ds << get_err.str();
3535           break;
3536         }
3537       }
3538       f->close_section();
3539     }
3540
3541     f->flush(ds);
3542     rdata.append(ds);
3543     rs = "";
3544   } else if (prefix == "mon versions") {
3545     if (!f)
3546       f.reset(Formatter::create("json-pretty"));
3547     count_metadata("ceph_version", f.get());
3548     f->flush(ds);
3549     rdata.append(ds);
3550     rs = "";
3551     r = 0;
3552   } else if (prefix == "mon count-metadata") {
3553     if (!f)
3554       f.reset(Formatter::create("json-pretty"));
3555     string field;
3556     cmd_getval(g_ceph_context, cmdmap, "property", field);
3557     count_metadata(field, f.get());
3558     f->flush(ds);
3559     rdata.append(ds);
3560     rs = "";
3561     r = 0;
3562   } else if (prefix == "quorum_status") {
3563     // make sure our map is readable and up to date
3564     if (!is_leader() && !is_peon()) {
3565       dout(10) << " waiting for quorum" << dendl;
3566       waitfor_quorum.push_back(new C_RetryMessage(this, op));
3567       return;
3568     }
3569     _quorum_status(f.get(), ds);
3570     rdata.append(ds);
3571     rs = "";
3572     r = 0;
3573   } else if (prefix == "mon_status") {
3574     get_mon_status(f.get(), ds);
3575     if (f)
3576       f->flush(ds);
3577     rdata.append(ds);
3578     rs = "";
3579     r = 0;
3580   } else if (prefix == "sync force" ||
3581              prefix == "mon sync force") {
3582     string validate1, validate2;
3583     cmd_getval(g_ceph_context, cmdmap, "validate1", validate1);
3584     cmd_getval(g_ceph_context, cmdmap, "validate2", validate2);
3585     if (validate1 != "--yes-i-really-mean-it" ||
3586         validate2 != "--i-know-what-i-am-doing") {
3587       r = -EINVAL;
3588       rs = "are you SURE? this will mean the monitor store will be "
3589            "erased.  pass '--yes-i-really-mean-it "
3590            "--i-know-what-i-am-doing' if you really do.";
3591       goto out;
3592     }
3593     sync_force(f.get(), ds);
3594     rs = ds.str();
3595     r = 0;
3596   } else if (prefix == "heap") {
3597     if (!ceph_using_tcmalloc())
3598       rs = "tcmalloc not enabled, can't use heap profiler commands\n";
3599     else {
3600       string heapcmd;
3601       cmd_getval(g_ceph_context, cmdmap, "heapcmd", heapcmd);
3602       // XXX 1-element vector, change at callee or make vector here?
3603       vector<string> heapcmd_vec;
3604       get_str_vec(heapcmd, heapcmd_vec);
3605       ceph_heap_profiler_handle_command(heapcmd_vec, ds);
3606       rdata.append(ds);
3607       rs = "";
3608       r = 0;
3609     }
3610   } else if (prefix == "quorum") {
3611     string quorumcmd;
3612     cmd_getval(g_ceph_context, cmdmap, "quorumcmd", quorumcmd);
3613     if (quorumcmd == "exit") {
3614       start_election();
3615       elector.stop_participating();
3616       rs = "stopped responding to quorum, initiated new election";
3617       r = 0;
3618     } else if (quorumcmd == "enter") {
3619       elector.start_participating();
3620       start_election();
3621       rs = "started responding to quorum, initiated new election";
3622       r = 0;
3623     } else {
3624       rs = "needs a valid 'quorum' command";
3625       r = -EINVAL;
3626     }
3627   } else if (prefix == "version") {
3628     if (f) {
3629       f->open_object_section("version");
3630       f->dump_string("version", pretty_version_to_str());
3631       f->close_section();
3632       f->flush(ds);
3633     } else {
3634       ds << pretty_version_to_str();
3635     }
3636     rdata.append(ds);
3637     rs = "";
3638     r = 0;
3639   } else if (prefix == "versions") {
3640     if (!f)
3641       f.reset(Formatter::create("json-pretty"));
3642     map<string,int> overall;
3643     f->open_object_section("version");
3644     map<string,int> mon, mgr, osd, mds;
3645
3646     count_metadata("ceph_version", &mon);
3647     f->open_object_section("mon");
3648     for (auto& p : mon) {
3649       f->dump_int(p.first.c_str(), p.second);
3650       overall[p.first] += p.second;
3651     }
3652     f->close_section();
3653
3654     mgrmon()->count_metadata("ceph_version", &mgr);
3655     f->open_object_section("mgr");
3656     for (auto& p : mgr) {
3657       f->dump_int(p.first.c_str(), p.second);
3658       overall[p.first] += p.second;
3659     }
3660     f->close_section();
3661
3662     osdmon()->count_metadata("ceph_version", &osd);
3663     f->open_object_section("osd");
3664     for (auto& p : osd) {
3665       f->dump_int(p.first.c_str(), p.second);
3666       overall[p.first] += p.second;
3667     }
3668     f->close_section();
3669
3670     mdsmon()->count_metadata("ceph_version", &mds);
3671     f->open_object_section("mds");
3672     for (auto& p : mds) {
3673       f->dump_int(p.first.c_str(), p.second);
3674       overall[p.first] += p.second;
3675     }
3676     f->close_section();
3677
3678     for (auto& p : mgrstatmon()->get_service_map().services) {
3679       f->open_object_section(p.first.c_str());
3680       map<string,int> m;
3681       p.second.count_metadata("ceph_version", &m);
3682       for (auto& q : m) {
3683         f->dump_int(q.first.c_str(), q.second);
3684         overall[q.first] += q.second;
3685       }
3686       f->close_section();
3687     }
3688
3689     f->open_object_section("overall");
3690     for (auto& p : overall) {
3691       f->dump_int(p.first.c_str(), p.second);
3692     }
3693     f->close_section();
3694     f->close_section();
3695     f->flush(rdata);
3696     rs = "";
3697     r = 0;
3698   }
3699
3700  out:
3701   if (!m->get_source().is_mon())  // don't reply to mon->mon commands
3702     reply_command(op, r, rs, rdata, 0);
3703 }
3704
3705 void Monitor::reply_command(MonOpRequestRef op, int rc, const string &rs, version_t version)
3706 {
3707   bufferlist rdata;
3708   reply_command(op, rc, rs, rdata, version);
3709 }
3710
3711 void Monitor::reply_command(MonOpRequestRef op, int rc, const string &rs,
3712                             bufferlist& rdata, version_t version)
3713 {
3714   MMonCommand *m = static_cast<MMonCommand*>(op->get_req());
3715   assert(m->get_type() == MSG_MON_COMMAND);
3716   MMonCommandAck *reply = new MMonCommandAck(m->cmd, rc, rs, version);
3717   reply->set_tid(m->get_tid());
3718   reply->set_data(rdata);
3719   send_reply(op, reply);
3720 }
3721
3722
3723 // ------------------------
3724 // request/reply routing
3725 //
3726 // a client/mds/osd will connect to a random monitor.  we need to forward any
3727 // messages requiring state updates to the leader, and then route any replies
3728 // back via the correct monitor and back to them.  (the monitor will not
3729 // initiate any connections.)
3730
3731 void Monitor::forward_request_leader(MonOpRequestRef op)
3732 {
3733   op->mark_event(__func__);
3734
3735   int mon = get_leader();
3736   MonSession *session = op->get_session();
3737   PaxosServiceMessage *req = op->get_req<PaxosServiceMessage>();
3738   
3739   if (req->get_source().is_mon() && req->get_source_addr() != messenger->get_myaddr()) {
3740     dout(10) << "forward_request won't forward (non-local) mon request " << *req << dendl;
3741   } else if (session->proxy_con) {
3742     dout(10) << "forward_request won't double fwd request " << *req << dendl;
3743   } else if (!session->closed) {
3744     RoutedRequest *rr = new RoutedRequest;
3745     rr->tid = ++routed_request_tid;
3746     rr->client_inst = req->get_source_inst();
3747     rr->con = req->get_connection();
3748     rr->con_features = rr->con->get_features();
3749     encode_message(req, CEPH_FEATURES_ALL, rr->request_bl);   // for my use only; use all features
3750     rr->session = static_cast<MonSession *>(session->get());
3751     rr->op = op;
3752     routed_requests[rr->tid] = rr;
3753     session->routed_request_tids.insert(rr->tid);
3754     
3755     dout(10) << "forward_request " << rr->tid << " request " << *req
3756              << " features " << rr->con_features << dendl;
3757
3758     MForward *forward = new MForward(rr->tid,
3759                                      req,
3760                                      rr->con_features,
3761                                      rr->session->caps);
3762     forward->set_priority(req->get_priority());
3763     if (session->auth_handler) {
3764       forward->entity_name = session->entity_name;
3765     } else if (req->get_source().is_mon()) {
3766       forward->entity_name.set_type(CEPH_ENTITY_TYPE_MON);
3767     }
3768     messenger->send_message(forward, monmap->get_inst(mon));
3769     op->mark_forwarded();
3770     assert(op->get_req()->get_type() != 0);
3771   } else {
3772     dout(10) << "forward_request no session for request " << *req << dendl;
3773   }
3774 }
3775
3776 // fake connection attached to forwarded messages
3777 struct AnonConnection : public Connection {
3778   explicit AnonConnection(CephContext *cct) : Connection(cct, NULL) {}
3779
3780   int send_message(Message *m) override {
3781     assert(!"send_message on anonymous connection");
3782   }
3783   void send_keepalive() override {
3784     assert(!"send_keepalive on anonymous connection");
3785   }
3786   void mark_down() override {
3787     // silently ignore
3788   }
3789   void mark_disposable() override {
3790     // silengtly ignore
3791   }
3792   bool is_connected() override { return false; }
3793 };
3794
3795 //extract the original message and put it into the regular dispatch function
3796 void Monitor::handle_forward(MonOpRequestRef op)
3797 {
3798   MForward *m = static_cast<MForward*>(op->get_req());
3799   dout(10) << "received forwarded message from " << m->client
3800            << " via " << m->get_source_inst() << dendl;
3801   MonSession *session = op->get_session();
3802   assert(session);
3803
3804   if (!session->is_capable("mon", MON_CAP_X)) {
3805     dout(0) << "forward from entity with insufficient caps! " 
3806             << session->caps << dendl;
3807   } else {
3808     // see PaxosService::dispatch(); we rely on this being anon
3809     // (c->msgr == NULL)
3810     PaxosServiceMessage *req = m->claim_message();
3811     assert(req != NULL);
3812
3813     ConnectionRef c(new AnonConnection(cct));
3814     MonSession *s = new MonSession(req->get_source_inst(),
3815                                    static_cast<Connection*>(c.get()));
3816     c->set_priv(s->get());
3817     c->set_peer_addr(m->client.addr);
3818     c->set_peer_type(m->client.name.type());
3819     c->set_features(m->con_features);
3820
3821     s->caps = m->client_caps;
3822     dout(10) << " caps are " << s->caps << dendl;
3823     s->entity_name = m->entity_name;
3824     dout(10) << " entity name '" << s->entity_name << "' type "
3825              << s->entity_name.get_type() << dendl;
3826     s->proxy_con = m->get_connection();
3827     s->proxy_tid = m->tid;
3828
3829     req->set_connection(c);
3830
3831     // not super accurate, but better than nothing.
3832     req->set_recv_stamp(m->get_recv_stamp());
3833
3834     /*
3835      * note which election epoch this is; we will drop the message if
3836      * there is a future election since our peers will resend routed
3837      * requests in that case.
3838      */
3839     req->rx_election_epoch = get_epoch();
3840
3841     /* Because this is a special fake connection, we need to break
3842        the ref loop between Connection and MonSession differently
3843        than we normally do. Here, the Message refers to the Connection
3844        which refers to the Session, and nobody else refers to the Connection
3845        or the Session. And due to the special nature of this message,
3846        nobody refers to the Connection via the Session. So, clear out that
3847        half of the ref loop.*/
3848     s->con.reset(NULL);
3849
3850     dout(10) << " mesg " << req << " from " << m->get_source_addr() << dendl;
3851
3852     _ms_dispatch(req);
3853     s->put();
3854   }
3855 }
3856
3857 void Monitor::try_send_message(Message *m, const entity_inst_t& to)
3858 {
3859   dout(10) << "try_send_message " << *m << " to " << to << dendl;
3860
3861   bufferlist bl;
3862   encode_message(m, quorum_con_features, bl);
3863
3864   messenger->send_message(m, to);
3865
3866   for (int i=0; i<(int)monmap->size(); i++) {
3867     if (i != rank)
3868       messenger->send_message(new MRoute(bl, to), monmap->get_inst(i));
3869   }
3870 }
3871
3872 void Monitor::send_reply(MonOpRequestRef op, Message *reply)
3873 {
3874   op->mark_event(__func__);
3875
3876   MonSession *session = op->get_session();
3877   assert(session);
3878   Message *req = op->get_req();
3879   ConnectionRef con = op->get_connection();
3880
3881   reply->set_cct(g_ceph_context);
3882   dout(2) << __func__ << " " << op << " " << reply << " " << *reply << dendl;
3883
3884   if (!con) {
3885     dout(2) << "send_reply no connection, dropping reply " << *reply
3886             << " to " << req << " " << *req << dendl;
3887     reply->put();
3888     op->mark_event("reply: no connection");
3889     return;
3890   }
3891
3892   if (!session->con && !session->proxy_con) {
3893     dout(2) << "send_reply no connection, dropping reply " << *reply
3894             << " to " << req << " " << *req << dendl;
3895     reply->put();
3896     op->mark_event("reply: no connection");
3897     return;
3898   }
3899
3900   if (session->proxy_con) {
3901     dout(15) << "send_reply routing reply to " << con->get_peer_addr()
3902              << " via " << session->proxy_con->get_peer_addr()
3903              << " for request " << *req << dendl;
3904     session->proxy_con->send_message(new MRoute(session->proxy_tid, reply));
3905     op->mark_event("reply: send routed request");
3906   } else {
3907     session->con->send_message(reply);
3908     op->mark_event("reply: send");
3909   }
3910 }
3911
3912 void Monitor::no_reply(MonOpRequestRef op)
3913 {
3914   MonSession *session = op->get_session();
3915   Message *req = op->get_req();
3916
3917   if (session->proxy_con) {
3918     dout(10) << "no_reply to " << req->get_source_inst()
3919              << " via " << session->proxy_con->get_peer_addr()
3920              << " for request " << *req << dendl;
3921     session->proxy_con->send_message(new MRoute(session->proxy_tid, NULL));
3922     op->mark_event("no_reply: send routed request");
3923   } else {
3924     dout(10) << "no_reply to " << req->get_source_inst()
3925              << " " << *req << dendl;
3926     op->mark_event("no_reply");
3927   }
3928 }
3929
3930 void Monitor::handle_route(MonOpRequestRef op)
3931 {
3932   MRoute *m = static_cast<MRoute*>(op->get_req());
3933   MonSession *session = op->get_session();
3934   //check privileges
3935   if (!session->is_capable("mon", MON_CAP_X)) {
3936     dout(0) << "MRoute received from entity without appropriate perms! "
3937             << dendl;
3938     return;
3939   }
3940   if (m->msg)
3941     dout(10) << "handle_route " << *m->msg << " to " << m->dest << dendl;
3942   else
3943     dout(10) << "handle_route null to " << m->dest << dendl;
3944   
3945   // look it up
3946   if (m->session_mon_tid) {
3947     if (routed_requests.count(m->session_mon_tid)) {
3948       RoutedRequest *rr = routed_requests[m->session_mon_tid];
3949
3950       // reset payload, in case encoding is dependent on target features
3951       if (m->msg) {
3952         m->msg->clear_payload();
3953         rr->con->send_message(m->msg);
3954         m->msg = NULL;
3955       }
3956       if (m->send_osdmap_first) {
3957         dout(10) << " sending osdmaps from " << m->send_osdmap_first << dendl;
3958         osdmon()->send_incremental(m->send_osdmap_first, rr->session,
3959                                    true, MonOpRequestRef());
3960       }
3961       assert(rr->tid == m->session_mon_tid && rr->session->routed_request_tids.count(m->session_mon_tid));
3962       routed_requests.erase(m->session_mon_tid);
3963       rr->session->routed_request_tids.erase(m->session_mon_tid);
3964       delete rr;
3965     } else {
3966       dout(10) << " don't have routed request tid " << m->session_mon_tid << dendl;
3967     }
3968   } else {
3969     dout(10) << " not a routed request, trying to send anyway" << dendl;
3970     if (m->msg) {
3971       messenger->send_message(m->msg, m->dest);
3972       m->msg = NULL;
3973     }
3974   }
3975 }
3976
3977 void Monitor::resend_routed_requests()
3978 {
3979   dout(10) << "resend_routed_requests" << dendl;
3980   int mon = get_leader();
3981   list<Context*> retry;
3982   for (map<uint64_t, RoutedRequest*>::iterator p = routed_requests.begin();
3983        p != routed_requests.end();
3984        ++p) {
3985     RoutedRequest *rr = p->second;
3986
3987     if (mon == rank) {
3988       dout(10) << " requeue for self tid " << rr->tid << dendl;
3989       rr->op->mark_event("retry routed request");
3990       retry.push_back(new C_RetryMessage(this, rr->op));
3991       if (rr->session) {
3992         assert(rr->session->routed_request_tids.count(p->first));
3993         rr->session->routed_request_tids.erase(p->first);
3994       }
3995       delete rr;
3996     } else {
3997       bufferlist::iterator q = rr->request_bl.begin();
3998       PaxosServiceMessage *req = (PaxosServiceMessage *)decode_message(cct, 0, q);
3999       rr->op->mark_event("resend forwarded message to leader");
4000       dout(10) << " resend to mon." << mon << " tid " << rr->tid << " " << *req << dendl;
4001       MForward *forward = new MForward(rr->tid, req, rr->con_features,
4002                                        rr->session->caps);
4003       req->put();  // forward takes its own ref; drop ours.
4004       forward->client = rr->client_inst;
4005       forward->set_priority(req->get_priority());
4006       messenger->send_message(forward, monmap->get_inst(mon));
4007     }
4008   }
4009   if (mon == rank) {
4010     routed_requests.clear();
4011     finish_contexts(g_ceph_context, retry);
4012   }
4013 }
4014
4015 void Monitor::remove_session(MonSession *s)
4016 {
4017   dout(10) << "remove_session " << s << " " << s->inst
4018            << " features 0x" << std::hex << s->con_features << std::dec << dendl;
4019   assert(s->con);
4020   assert(!s->closed);
4021   for (set<uint64_t>::iterator p = s->routed_request_tids.begin();
4022        p != s->routed_request_tids.end();
4023        ++p) {
4024     assert(routed_requests.count(*p));
4025     RoutedRequest *rr = routed_requests[*p];
4026     dout(10) << " dropping routed request " << rr->tid << dendl;
4027     delete rr;
4028     routed_requests.erase(*p);
4029   }
4030   s->routed_request_tids.clear();
4031   s->con->set_priv(NULL);
4032   session_map.remove_session(s);
4033   logger->set(l_mon_num_sessions, session_map.get_size());
4034   logger->inc(l_mon_session_rm);
4035 }
4036
4037 void Monitor::remove_all_sessions()
4038 {
4039   Mutex::Locker l(session_map_lock);
4040   while (!session_map.sessions.empty()) {
4041     MonSession *s = session_map.sessions.front();
4042     remove_session(s);
4043     if (logger)
4044       logger->inc(l_mon_session_rm);
4045   }
4046   if (logger)
4047     logger->set(l_mon_num_sessions, session_map.get_size());
4048 }
4049
4050 void Monitor::send_command(const entity_inst_t& inst,
4051                            const vector<string>& com)
4052 {
4053   dout(10) << "send_command " << inst << "" << com << dendl;
4054   MMonCommand *c = new MMonCommand(monmap->fsid);
4055   c->cmd = com;
4056   try_send_message(c, inst);
4057 }
4058
4059 void Monitor::waitlist_or_zap_client(MonOpRequestRef op)
4060 {
4061   /**
4062    * Wait list the new session until we're in the quorum, assuming it's
4063    * sufficiently new.
4064    * tick() will periodically send them back through so we can send
4065    * the client elsewhere if we don't think we're getting back in.
4066    *
4067    * But we whitelist a few sorts of messages:
4068    * 1) Monitors can talk to us at any time, of course.
4069    * 2) auth messages. It's unlikely to go through much faster, but
4070    * it's possible we've just lost our quorum status and we want to take...
4071    * 3) command messages. We want to accept these under all possible
4072    * circumstances.
4073    */
4074   Message *m = op->get_req();
4075   MonSession *s = op->get_session();
4076   ConnectionRef con = op->get_connection();
4077   utime_t too_old = ceph_clock_now();
4078   too_old -= g_ceph_context->_conf->mon_lease;
4079   if (m->get_recv_stamp() > too_old &&
4080       con->is_connected()) {
4081     dout(5) << "waitlisting message " << *m << dendl;
4082     maybe_wait_for_quorum.push_back(new C_RetryMessage(this, op));
4083     op->mark_wait_for_quorum();
4084   } else {
4085     dout(5) << "discarding message " << *m << " and sending client elsewhere" << dendl;
4086     con->mark_down();
4087     // proxied sessions aren't registered and don't have a con; don't remove
4088     // those.
4089     if (!s->proxy_con) {
4090       Mutex::Locker l(session_map_lock);
4091       remove_session(s);
4092     }
4093     op->mark_zap();
4094   }
4095 }
4096
4097 void Monitor::_ms_dispatch(Message *m)
4098 {
4099   if (is_shutdown()) {
4100     m->put();
4101     return;
4102   }
4103
4104   MonOpRequestRef op = op_tracker.create_request<MonOpRequest>(m);
4105   bool src_is_mon = op->is_src_mon();
4106   op->mark_event("mon:_ms_dispatch");
4107   MonSession *s = op->get_session();
4108   if (s && s->closed) {
4109     return;
4110   }
4111
4112   if (src_is_mon && s) {
4113     ConnectionRef con = m->get_connection();
4114     if (con->get_messenger() && con->get_features() != s->con_features) {
4115       // only update features if this is a non-anonymous connection
4116       dout(10) << __func__ << " feature change for " << m->get_source_inst()
4117                << " (was " << s->con_features
4118                << ", now " << con->get_features() << ")" << dendl;
4119       // connection features changed - recreate session.
4120       if (s->con && s->con != con) {
4121         dout(10) << __func__ << " connection for " << m->get_source_inst()
4122                  << " changed from session; mark down and replace" << dendl;
4123         s->con->mark_down();
4124       }
4125       if (s->item.is_on_list()) {
4126         // forwarded messages' sessions are not in the sessions map and
4127         // exist only while the op is being handled.
4128         remove_session(s);
4129       }
4130       s->put();
4131       s = nullptr;
4132     }
4133   }
4134
4135   if (!s) {
4136     // if the sender is not a monitor, make sure their first message for a
4137     // session is an MAuth.  If it is not, assume it's a stray message,
4138     // and considering that we are creating a new session it is safe to
4139     // assume that the sender hasn't authenticated yet, so we have no way
4140     // of assessing whether we should handle it or not.
4141     if (!src_is_mon && (m->get_type() != CEPH_MSG_AUTH &&
4142                         m->get_type() != CEPH_MSG_MON_GET_MAP &&
4143                         m->get_type() != CEPH_MSG_PING)) {
4144       dout(1) << __func__ << " dropping stray message " << *m
4145               << " from " << m->get_source_inst() << dendl;
4146       return;
4147     }
4148
4149     ConnectionRef con = m->get_connection();
4150     {
4151       Mutex::Locker l(session_map_lock);
4152       s = session_map.new_session(m->get_source_inst(), con.get());
4153     }
4154     assert(s);
4155     con->set_priv(s->get());
4156     dout(10) << __func__ << " new session " << s << " " << *s
4157              << " features 0x" << std::hex
4158              << s->con_features << std::dec << dendl;
4159     op->set_session(s);
4160
4161     logger->set(l_mon_num_sessions, session_map.get_size());
4162     logger->inc(l_mon_session_add);
4163
4164     if (src_is_mon) {
4165       // give it monitor caps; the peer type has been authenticated
4166       dout(5) << __func__ << " setting monitor caps on this connection" << dendl;
4167       if (!s->caps.is_allow_all()) // but no need to repeatedly copy
4168         s->caps = *mon_caps;
4169     }
4170     s->put();
4171   } else {
4172     dout(20) << __func__ << " existing session " << s << " for " << s->inst
4173              << dendl;
4174   }
4175
4176   assert(s);
4177
4178   s->session_timeout = ceph_clock_now();
4179   s->session_timeout += g_conf->mon_session_timeout;
4180
4181   if (s->auth_handler) {
4182     s->entity_name = s->auth_handler->get_entity_name();
4183   }
4184   dout(20) << " caps " << s->caps.get_str() << dendl;
4185
4186   if ((is_synchronizing() ||
4187        (s->global_id == 0 && !exited_quorum.is_zero())) &&
4188       !src_is_mon &&
4189       m->get_type() != CEPH_MSG_PING) {
4190     waitlist_or_zap_client(op);
4191   } else {
4192     dispatch_op(op);
4193   }
4194   return;
4195 }
4196
4197 void Monitor::dispatch_op(MonOpRequestRef op)
4198 {
4199   op->mark_event("mon:dispatch_op");
4200   MonSession *s = op->get_session();
4201   assert(s);
4202   if (s->closed) {
4203     dout(10) << " session closed, dropping " << op->get_req() << dendl;
4204     return;
4205   }
4206
4207   /* we will consider the default type as being 'monitor' until proven wrong */
4208   op->set_type_monitor();
4209   /* deal with all messages that do not necessarily need caps */
4210   bool dealt_with = true;
4211   switch (op->get_req()->get_type()) {
4212     // auth
4213     case MSG_MON_GLOBAL_ID:
4214     case CEPH_MSG_AUTH:
4215       op->set_type_service();
4216       /* no need to check caps here */
4217       paxos_service[PAXOS_AUTH]->dispatch(op);
4218       break;
4219
4220     case CEPH_MSG_PING:
4221       handle_ping(op);
4222       break;
4223
4224     /* MMonGetMap may be used by clients to obtain a monmap *before*
4225      * authenticating with the monitor.  We need to handle these without
4226      * checking caps because, even on a cluster without cephx, we only set
4227      * session caps *after* the auth handshake.  A good example of this
4228      * is when a client calls MonClient::get_monmap_privately(), which does
4229      * not authenticate when obtaining a monmap.
4230      */
4231     case CEPH_MSG_MON_GET_MAP:
4232       handle_mon_get_map(op);
4233       break;
4234
4235     case CEPH_MSG_MON_METADATA:
4236       return handle_mon_metadata(op);
4237
4238     default:
4239       dealt_with = false;
4240       break;
4241   }
4242   if (dealt_with)
4243     return;
4244
4245   /* well, maybe the op belongs to a service... */
4246   op->set_type_service();
4247   /* deal with all messages which caps should be checked somewhere else */
4248   dealt_with = true;
4249   switch (op->get_req()->get_type()) {
4250
4251     // OSDs
4252     case CEPH_MSG_MON_GET_OSDMAP:
4253     case CEPH_MSG_POOLOP:
4254     case MSG_OSD_BEACON:
4255     case MSG_OSD_MARK_ME_DOWN:
4256     case MSG_OSD_FULL:
4257     case MSG_OSD_FAILURE:
4258     case MSG_OSD_BOOT:
4259     case MSG_OSD_ALIVE:
4260     case MSG_OSD_PGTEMP:
4261     case MSG_OSD_PG_CREATED:
4262     case MSG_REMOVE_SNAPS:
4263       paxos_service[PAXOS_OSDMAP]->dispatch(op);
4264       break;
4265
4266     // MDSs
4267     case MSG_MDS_BEACON:
4268     case MSG_MDS_OFFLOAD_TARGETS:
4269       paxos_service[PAXOS_MDSMAP]->dispatch(op);
4270       break;
4271
4272     // Mgrs
4273     case MSG_MGR_BEACON:
4274       paxos_service[PAXOS_MGR]->dispatch(op);
4275       break;
4276
4277     // MgrStat
4278     case CEPH_MSG_STATFS:
4279       // this is an ugly hack, sorry!  force the version to 1 so that we do
4280       // not run afoul of the is_readable() paxos check.  the client is going
4281       // by the pgmonitor version and the MgrStatMonitor version will lag behind
4282       // that until we complete the upgrade.  The paxos ordering crap really
4283       // doesn't matter for statfs results, so just kludge around it here.
4284       if (osdmon()->osdmap.require_osd_release < CEPH_RELEASE_LUMINOUS) {
4285         ((MStatfs*)op->get_req())->version = 1;
4286       }
4287     case MSG_MON_MGR_REPORT:
4288     case MSG_GETPOOLSTATS:
4289       paxos_service[PAXOS_MGRSTAT]->dispatch(op);
4290       break;
4291
4292     // pg
4293     case MSG_PGSTATS:
4294       paxos_service[PAXOS_PGMAP]->dispatch(op);
4295       break;
4296
4297     // log
4298     case MSG_LOG:
4299       paxos_service[PAXOS_LOG]->dispatch(op);
4300       break;
4301
4302     // handle_command() does its own caps checking
4303     case MSG_MON_COMMAND:
4304       op->set_type_command();
4305       handle_command(op);
4306       break;
4307
4308     default:
4309       dealt_with = false;
4310       break;
4311   }
4312   if (dealt_with)
4313     return;
4314
4315   /* nop, looks like it's not a service message; revert back to monitor */
4316   op->set_type_monitor();
4317
4318   /* messages we, the Monitor class, need to deal with
4319    * but may be sent by clients. */
4320
4321   if (!op->get_session()->is_capable("mon", MON_CAP_R)) {
4322     dout(5) << __func__ << " " << op->get_req()->get_source_inst()
4323             << " not enough caps for " << *(op->get_req()) << " -- dropping"
4324             << dendl;
4325     goto drop;
4326   }
4327
4328   dealt_with = true;
4329   switch (op->get_req()->get_type()) {
4330
4331     // misc
4332     case CEPH_MSG_MON_GET_VERSION:
4333       handle_get_version(op);
4334       break;
4335
4336     case CEPH_MSG_MON_SUBSCRIBE:
4337       /* FIXME: check what's being subscribed, filter accordingly */
4338       handle_subscribe(op);
4339       break;
4340
4341     default:
4342       dealt_with = false;
4343       break;
4344   }
4345   if (dealt_with)
4346     return;
4347
4348   if (!op->is_src_mon()) {
4349     dout(1) << __func__ << " unexpected monitor message from"
4350             << " non-monitor entity " << op->get_req()->get_source_inst()
4351             << " " << *(op->get_req()) << " -- dropping" << dendl;
4352     goto drop;
4353   }
4354
4355   /* messages that should only be sent by another monitor */
4356   dealt_with = true;
4357   switch (op->get_req()->get_type()) {
4358
4359     case MSG_ROUTE:
4360       handle_route(op);
4361       break;
4362
4363     case MSG_MON_PROBE:
4364       handle_probe(op);
4365       break;
4366
4367     // Sync (i.e., the new slurp, but on steroids)
4368     case MSG_MON_SYNC:
4369       handle_sync(op);
4370       break;
4371     case MSG_MON_SCRUB:
4372       handle_scrub(op);
4373       break;
4374
4375     /* log acks are sent from a monitor we sent the MLog to, and are
4376        never sent by clients to us. */
4377     case MSG_LOGACK:
4378       log_client.handle_log_ack((MLogAck*)op->get_req());
4379       break;
4380
4381     // monmap
4382     case MSG_MON_JOIN:
4383       op->set_type_service();
4384       paxos_service[PAXOS_MONMAP]->dispatch(op);
4385       break;
4386
4387     // paxos
4388     case MSG_MON_PAXOS:
4389       {
4390         op->set_type_paxos();
4391         MMonPaxos *pm = static_cast<MMonPaxos*>(op->get_req());
4392         if (!op->get_session()->is_capable("mon", MON_CAP_X)) {
4393           //can't send these!
4394           break;
4395         }
4396
4397         if (state == STATE_SYNCHRONIZING) {
4398           // we are synchronizing. These messages would do us no
4399           // good, thus just drop them and ignore them.
4400           dout(10) << __func__ << " ignore paxos msg from "
4401             << pm->get_source_inst() << dendl;
4402           break;
4403         }
4404
4405         // sanitize
4406         if (pm->epoch > get_epoch()) {
4407           bootstrap();
4408           break;
4409         }
4410         if (pm->epoch != get_epoch()) {
4411           break;
4412         }
4413
4414         paxos->dispatch(op);
4415       }
4416       break;
4417
4418     // elector messages
4419     case MSG_MON_ELECTION:
4420       op->set_type_election();
4421       //check privileges here for simplicity
4422       if (!op->get_session()->is_capable("mon", MON_CAP_X)) {
4423         dout(0) << "MMonElection received from entity without enough caps!"
4424           << op->get_session()->caps << dendl;
4425         break;
4426       }
4427       if (!is_probing() && !is_synchronizing()) {
4428         elector.dispatch(op);
4429       }
4430       break;
4431
4432     case MSG_FORWARD:
4433       handle_forward(op);
4434       break;
4435
4436     case MSG_TIMECHECK:
4437       handle_timecheck(op);
4438       break;
4439
4440     case MSG_MON_HEALTH:
4441       health_monitor->dispatch(op);
4442       break;
4443
4444     case MSG_MON_HEALTH_CHECKS:
4445       op->set_type_service();
4446       paxos_service[PAXOS_HEALTH]->dispatch(op);
4447       break;
4448
4449     default:
4450       dealt_with = false;
4451       break;
4452   }
4453   if (!dealt_with) {
4454     dout(1) << "dropping unexpected " << *(op->get_req()) << dendl;
4455     goto drop;
4456   }
4457   return;
4458
4459 drop:
4460   return;
4461 }
4462
4463 void Monitor::handle_ping(MonOpRequestRef op)
4464 {
4465   MPing *m = static_cast<MPing*>(op->get_req());
4466   dout(10) << __func__ << " " << *m << dendl;
4467   MPing *reply = new MPing;
4468   entity_inst_t inst = m->get_source_inst();
4469   bufferlist payload;
4470   boost::scoped_ptr<Formatter> f(new JSONFormatter(true));
4471   f->open_object_section("pong");
4472
4473   if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
4474     get_health_status(false, f.get(), nullptr);
4475   } else {
4476     list<string> health_str;
4477     get_health(health_str, nullptr, f.get());
4478   }
4479
4480   {
4481     stringstream ss;
4482     get_mon_status(f.get(), ss);
4483   }
4484
4485   f->close_section();
4486   stringstream ss;
4487   f->flush(ss);
4488   ::encode(ss.str(), payload);
4489   reply->set_payload(payload);
4490   dout(10) << __func__ << " reply payload len " << reply->get_payload().length() << dendl;
4491   messenger->send_message(reply, inst);
4492 }
4493
4494 void Monitor::timecheck_start()
4495 {
4496   dout(10) << __func__ << dendl;
4497   timecheck_cleanup();
4498   timecheck_start_round();
4499 }
4500
4501 void Monitor::timecheck_finish()
4502 {
4503   dout(10) << __func__ << dendl;
4504   timecheck_cleanup();
4505 }
4506
4507 void Monitor::timecheck_start_round()
4508 {
4509   dout(10) << __func__ << " curr " << timecheck_round << dendl;
4510   assert(is_leader());
4511
4512   if (monmap->size() == 1) {
4513     assert(0 == "We are alone; this shouldn't have been scheduled!");
4514     return;
4515   }
4516
4517   if (timecheck_round % 2) {
4518     dout(10) << __func__ << " there's a timecheck going on" << dendl;
4519     utime_t curr_time = ceph_clock_now();
4520     double max = g_conf->mon_timecheck_interval*3;
4521     if (curr_time - timecheck_round_start < max) {
4522       dout(10) << __func__ << " keep current round going" << dendl;
4523       goto out;
4524     } else {
4525       dout(10) << __func__
4526                << " finish current timecheck and start new" << dendl;
4527       timecheck_cancel_round();
4528     }
4529   }
4530
4531   assert(timecheck_round % 2 == 0);
4532   timecheck_acks = 0;
4533   timecheck_round ++;
4534   timecheck_round_start = ceph_clock_now();
4535   dout(10) << __func__ << " new " << timecheck_round << dendl;
4536
4537   timecheck();
4538 out:
4539   dout(10) << __func__ << " setting up next event" << dendl;
4540   timecheck_reset_event();
4541 }
4542
4543 void Monitor::timecheck_finish_round(bool success)
4544 {
4545   dout(10) << __func__ << " curr " << timecheck_round << dendl;
4546   assert(timecheck_round % 2);
4547   timecheck_round ++;
4548   timecheck_round_start = utime_t();
4549
4550   if (success) {
4551     assert(timecheck_waiting.empty());
4552     assert(timecheck_acks == quorum.size());
4553     timecheck_report();
4554     timecheck_check_skews();
4555     return;
4556   }
4557
4558   dout(10) << __func__ << " " << timecheck_waiting.size()
4559            << " peers still waiting:";
4560   for (map<entity_inst_t,utime_t>::iterator p = timecheck_waiting.begin();
4561       p != timecheck_waiting.end(); ++p) {
4562     *_dout << " " << p->first.name;
4563   }
4564   *_dout << dendl;
4565   timecheck_waiting.clear();
4566
4567   dout(10) << __func__ << " finished to " << timecheck_round << dendl;
4568 }
4569
4570 void Monitor::timecheck_cancel_round()
4571 {
4572   timecheck_finish_round(false);
4573 }
4574
4575 void Monitor::timecheck_cleanup()
4576 {
4577   timecheck_round = 0;
4578   timecheck_acks = 0;
4579   timecheck_round_start = utime_t();
4580
4581   if (timecheck_event) {
4582     timer.cancel_event(timecheck_event);
4583     timecheck_event = NULL;
4584   }
4585   timecheck_waiting.clear();
4586   timecheck_skews.clear();
4587   timecheck_latencies.clear();
4588
4589   timecheck_rounds_since_clean = 0;
4590 }
4591
4592 void Monitor::timecheck_reset_event()
4593 {
4594   if (timecheck_event) {
4595     timer.cancel_event(timecheck_event);
4596     timecheck_event = NULL;
4597   }
4598
4599   double delay =
4600     cct->_conf->mon_timecheck_skew_interval * timecheck_rounds_since_clean;
4601
4602   if (delay <= 0 || delay > cct->_conf->mon_timecheck_interval) {
4603     delay = cct->_conf->mon_timecheck_interval;
4604   }
4605
4606   dout(10) << __func__ << " delay " << delay
4607            << " rounds_since_clean " << timecheck_rounds_since_clean
4608            << dendl;
4609
4610   timecheck_event = timer.add_event_after(
4611     delay,
4612     new C_MonContext(this, [this](int) {
4613         timecheck_start_round();
4614       }));
4615 }
4616
4617 void Monitor::timecheck_check_skews()
4618 {
4619   dout(10) << __func__ << dendl;
4620   assert(is_leader());
4621   assert((timecheck_round % 2) == 0);
4622   if (monmap->size() == 1) {
4623     assert(0 == "We are alone; we shouldn't have gotten here!");
4624     return;
4625   }
4626   assert(timecheck_latencies.size() == timecheck_skews.size());
4627
4628   bool found_skew = false;
4629   for (map<entity_inst_t, double>::iterator p = timecheck_skews.begin();
4630        p != timecheck_skews.end(); ++p) {
4631
4632     double abs_skew;
4633     if (timecheck_has_skew(p->second, &abs_skew)) {
4634       dout(10) << __func__
4635                << " " << p->first << " skew " << abs_skew << dendl;
4636       found_skew = true;
4637     }
4638   }
4639
4640   if (found_skew) {
4641     ++timecheck_rounds_since_clean;
4642     timecheck_reset_event();
4643   } else if (timecheck_rounds_since_clean > 0) {
4644     dout(1) << __func__
4645       << " no clock skews found after " << timecheck_rounds_since_clean
4646       << " rounds" << dendl;
4647     // make sure the skews are really gone and not just a transient success
4648     // this will run just once if not in the presence of skews again.
4649     timecheck_rounds_since_clean = 1;
4650     timecheck_reset_event();
4651     timecheck_rounds_since_clean = 0;
4652   }
4653
4654 }
4655
4656 void Monitor::timecheck_report()
4657 {
4658   dout(10) << __func__ << dendl;
4659   assert(is_leader());
4660   assert((timecheck_round % 2) == 0);
4661   if (monmap->size() == 1) {
4662     assert(0 == "We are alone; we shouldn't have gotten here!");
4663     return;
4664   }
4665
4666   assert(timecheck_latencies.size() == timecheck_skews.size());
4667   bool do_output = true; // only output report once
4668   for (set<int>::iterator q = quorum.begin(); q != quorum.end(); ++q) {
4669     if (monmap->get_name(*q) == name)
4670       continue;
4671
4672     MTimeCheck *m = new MTimeCheck(MTimeCheck::OP_REPORT);
4673     m->epoch = get_epoch();
4674     m->round = timecheck_round;
4675
4676     for (map<entity_inst_t, double>::iterator it = timecheck_skews.begin();
4677          it != timecheck_skews.end(); ++it) {
4678       double skew = it->second;
4679       double latency = timecheck_latencies[it->first];
4680
4681       m->skews[it->first] = skew;
4682       m->latencies[it->first] = latency;
4683
4684       if (do_output) {
4685         dout(25) << __func__ << " " << it->first
4686                  << " latency " << latency
4687                  << " skew " << skew << dendl;
4688       }
4689     }
4690     do_output = false;
4691     entity_inst_t inst = monmap->get_inst(*q);
4692     dout(10) << __func__ << " send report to " << inst << dendl;
4693     messenger->send_message(m, inst);
4694   }
4695 }
4696
4697 void Monitor::timecheck()
4698 {
4699   dout(10) << __func__ << dendl;
4700   assert(is_leader());
4701   if (monmap->size() == 1) {
4702     assert(0 == "We are alone; we shouldn't have gotten here!");
4703     return;
4704   }
4705   assert(timecheck_round % 2 != 0);
4706
4707   timecheck_acks = 1; // we ack ourselves
4708
4709   dout(10) << __func__ << " start timecheck epoch " << get_epoch()
4710            << " round " << timecheck_round << dendl;
4711
4712   // we are at the eye of the storm; the point of reference
4713   timecheck_skews[messenger->get_myinst()] = 0.0;
4714   timecheck_latencies[messenger->get_myinst()] = 0.0;
4715
4716   for (set<int>::iterator it = quorum.begin(); it != quorum.end(); ++it) {
4717     if (monmap->get_name(*it) == name)
4718       continue;
4719
4720     entity_inst_t inst = monmap->get_inst(*it);
4721     utime_t curr_time = ceph_clock_now();
4722     timecheck_waiting[inst] = curr_time;
4723     MTimeCheck *m = new MTimeCheck(MTimeCheck::OP_PING);
4724     m->epoch = get_epoch();
4725     m->round = timecheck_round;
4726     dout(10) << __func__ << " send " << *m << " to " << inst << dendl;
4727     messenger->send_message(m, inst);
4728   }
4729 }
4730
4731 health_status_t Monitor::timecheck_status(ostringstream &ss,
4732                                           const double skew_bound,
4733                                           const double latency)
4734 {
4735   health_status_t status = HEALTH_OK;
4736   assert(latency >= 0);
4737
4738   double abs_skew;
4739   if (timecheck_has_skew(skew_bound, &abs_skew)) {
4740     status = HEALTH_WARN;
4741     ss << "clock skew " << abs_skew << "s"
4742        << " > max " << g_conf->mon_clock_drift_allowed << "s";
4743   }
4744
4745   return status;
4746 }
4747
4748 void Monitor::handle_timecheck_leader(MonOpRequestRef op)
4749 {
4750   MTimeCheck *m = static_cast<MTimeCheck*>(op->get_req());
4751   dout(10) << __func__ << " " << *m << dendl;
4752   /* handles PONG's */
4753   assert(m->op == MTimeCheck::OP_PONG);
4754
4755   entity_inst_t other = m->get_source_inst();
4756   if (m->epoch < get_epoch()) {
4757     dout(1) << __func__ << " got old timecheck epoch " << m->epoch
4758             << " from " << other
4759             << " curr " << get_epoch()
4760             << " -- severely lagged? discard" << dendl;
4761     return;
4762   }
4763   assert(m->epoch == get_epoch());
4764
4765   if (m->round < timecheck_round) {
4766     dout(1) << __func__ << " got old round " << m->round
4767             << " from " << other
4768             << " curr " << timecheck_round << " -- discard" << dendl;
4769     return;
4770   }
4771
4772   utime_t curr_time = ceph_clock_now();
4773
4774   assert(timecheck_waiting.count(other) > 0);
4775   utime_t timecheck_sent = timecheck_waiting[other];
4776   timecheck_waiting.erase(other);
4777   if (curr_time < timecheck_sent) {
4778     // our clock was readjusted -- drop everything until it all makes sense.
4779     dout(1) << __func__ << " our clock was readjusted --"
4780             << " bump round and drop current check"
4781             << dendl;
4782     timecheck_cancel_round();
4783     return;
4784   }
4785
4786   /* update peer latencies */
4787   double latency = (double)(curr_time - timecheck_sent);
4788
4789   if (timecheck_latencies.count(other) == 0)
4790     timecheck_latencies[other] = latency;
4791   else {
4792     double avg_latency = ((timecheck_latencies[other]*0.8)+(latency*0.2));
4793     timecheck_latencies[other] = avg_latency;
4794   }
4795
4796   /*
4797    * update skews
4798    *
4799    * some nasty thing goes on if we were to do 'a - b' between two utime_t,
4800    * and 'a' happens to be lower than 'b'; so we use double instead.
4801    *
4802    * latency is always expected to be >= 0.
4803    *
4804    * delta, the difference between theirs timestamp and ours, may either be
4805    * lower or higher than 0; will hardly ever be 0.
4806    *
4807    * The absolute skew is the absolute delta minus the latency, which is
4808    * taken as a whole instead of an rtt given that there is some queueing
4809    * and dispatch times involved and it's hard to assess how long exactly
4810    * it took for the message to travel to the other side and be handled. So
4811    * we call it a bounded skew, the worst case scenario.
4812    *
4813    * Now, to math!
4814    *
4815    * Given that the latency is always positive, we can establish that the
4816    * bounded skew will be:
4817    *
4818    *  1. positive if the absolute delta is higher than the latency and
4819    *     delta is positive
4820    *  2. negative if the absolute delta is higher than the latency and
4821    *     delta is negative.
4822    *  3. zero if the absolute delta is lower than the latency.
4823    *
4824    * On 3. we make a judgement call and treat the skew as non-existent.
4825    * This is because that, if the absolute delta is lower than the
4826    * latency, then the apparently existing skew is nothing more than a
4827    * side-effect of the high latency at work.
4828    *
4829    * This may not be entirely true though, as a severely skewed clock
4830    * may be masked by an even higher latency, but with high latencies
4831    * we probably have worse issues to deal with than just skewed clocks.
4832    */
4833   assert(latency >= 0);
4834
4835   double delta = ((double) m->timestamp) - ((double) curr_time);
4836   double abs_delta = (delta > 0 ? delta : -delta);
4837   double skew_bound = abs_delta - latency;
4838   if (skew_bound < 0)
4839     skew_bound = 0;
4840   else if (delta < 0)
4841     skew_bound = -skew_bound;
4842
4843   ostringstream ss;
4844   health_status_t status = timecheck_status(ss, skew_bound, latency);
4845   clog->health(status) << other << " " << ss.str();
4846
4847   dout(10) << __func__ << " from " << other << " ts " << m->timestamp
4848            << " delta " << delta << " skew_bound " << skew_bound
4849            << " latency " << latency << dendl;
4850
4851   timecheck_skews[other] = skew_bound;
4852
4853   timecheck_acks++;
4854   if (timecheck_acks == quorum.size()) {
4855     dout(10) << __func__ << " got pongs from everybody ("
4856              << timecheck_acks << " total)" << dendl;
4857     assert(timecheck_skews.size() == timecheck_acks);
4858     assert(timecheck_waiting.empty());
4859     // everyone has acked, so bump the round to finish it.
4860     timecheck_finish_round();
4861   }
4862 }
4863
4864 void Monitor::handle_timecheck_peon(MonOpRequestRef op)
4865 {
4866   MTimeCheck *m = static_cast<MTimeCheck*>(op->get_req());
4867   dout(10) << __func__ << " " << *m << dendl;
4868
4869   assert(is_peon());
4870   assert(m->op == MTimeCheck::OP_PING || m->op == MTimeCheck::OP_REPORT);
4871
4872   if (m->epoch != get_epoch()) {
4873     dout(1) << __func__ << " got wrong epoch "
4874             << "(ours " << get_epoch()
4875             << " theirs: " << m->epoch << ") -- discarding" << dendl;
4876     return;
4877   }
4878
4879   if (m->round < timecheck_round) {
4880     dout(1) << __func__ << " got old round " << m->round
4881             << " current " << timecheck_round
4882             << " (epoch " << get_epoch() << ") -- discarding" << dendl;
4883     return;
4884   }
4885
4886   timecheck_round = m->round;
4887
4888   if (m->op == MTimeCheck::OP_REPORT) {
4889     assert((timecheck_round % 2) == 0);
4890     timecheck_latencies.swap(m->latencies);
4891     timecheck_skews.swap(m->skews);
4892     return;
4893   }
4894
4895   assert((timecheck_round % 2) != 0);
4896   MTimeCheck *reply = new MTimeCheck(MTimeCheck::OP_PONG);
4897   utime_t curr_time = ceph_clock_now();
4898   reply->timestamp = curr_time;
4899   reply->epoch = m->epoch;
4900   reply->round = m->round;
4901   dout(10) << __func__ << " send " << *m
4902            << " to " << m->get_source_inst() << dendl;
4903   m->get_connection()->send_message(reply);
4904 }
4905
4906 void Monitor::handle_timecheck(MonOpRequestRef op)
4907 {
4908   MTimeCheck *m = static_cast<MTimeCheck*>(op->get_req());
4909   dout(10) << __func__ << " " << *m << dendl;
4910
4911   if (is_leader()) {
4912     if (m->op != MTimeCheck::OP_PONG) {
4913       dout(1) << __func__ << " drop unexpected msg (not pong)" << dendl;
4914     } else {
4915       handle_timecheck_leader(op);
4916     }
4917   } else if (is_peon()) {
4918     if (m->op != MTimeCheck::OP_PING && m->op != MTimeCheck::OP_REPORT) {
4919       dout(1) << __func__ << " drop unexpected msg (not ping or report)" << dendl;
4920     } else {
4921       handle_timecheck_peon(op);
4922     }
4923   } else {
4924     dout(1) << __func__ << " drop unexpected msg" << dendl;
4925   }
4926 }
4927
4928 void Monitor::handle_subscribe(MonOpRequestRef op)
4929 {
4930   MMonSubscribe *m = static_cast<MMonSubscribe*>(op->get_req());
4931   dout(10) << "handle_subscribe " << *m << dendl;
4932   
4933   bool reply = false;
4934
4935   MonSession *s = op->get_session();
4936   assert(s);
4937
4938   for (map<string,ceph_mon_subscribe_item>::iterator p = m->what.begin();
4939        p != m->what.end();
4940        ++p) {
4941     // if there are any non-onetime subscriptions, we need to reply to start the resubscribe timer
4942     if ((p->second.flags & CEPH_SUBSCRIBE_ONETIME) == 0)
4943       reply = true;
4944
4945     // remove conflicting subscribes
4946     if (logmon()->sub_name_to_id(p->first) >= 0) {
4947       for (map<string, Subscription*>::iterator it = s->sub_map.begin();
4948            it != s->sub_map.end(); ) {
4949         if (it->first != p->first && logmon()->sub_name_to_id(it->first) >= 0) {
4950           Mutex::Locker l(session_map_lock);
4951           session_map.remove_sub((it++)->second);
4952         } else {
4953           ++it;
4954         }
4955       }
4956     }
4957
4958     {
4959       Mutex::Locker l(session_map_lock);
4960       session_map.add_update_sub(s, p->first, p->second.start,
4961                                  p->second.flags & CEPH_SUBSCRIBE_ONETIME,
4962                                  m->get_connection()->has_feature(CEPH_FEATURE_INCSUBOSDMAP));
4963     }
4964
4965     if (p->first.compare(0, 6, "mdsmap") == 0 || p->first.compare(0, 5, "fsmap") == 0) {
4966       dout(10) << __func__ << ": MDS sub '" << p->first << "'" << dendl;
4967       if ((int)s->is_capable("mds", MON_CAP_R)) {
4968         Subscription *sub = s->sub_map[p->first];
4969         assert(sub != nullptr);
4970         mdsmon()->check_sub(sub);
4971       }
4972     } else if (p->first == "osdmap") {
4973       if ((int)s->is_capable("osd", MON_CAP_R)) {
4974         if (s->osd_epoch > p->second.start) {
4975           // client needs earlier osdmaps on purpose, so reset the sent epoch
4976           s->osd_epoch = 0;
4977         }
4978         osdmon()->check_osdmap_sub(s->sub_map["osdmap"]);
4979       }
4980     } else if (p->first == "osd_pg_creates") {
4981       if ((int)s->is_capable("osd", MON_CAP_W)) {
4982         if (monmap->get_required_features().contains_all(
4983               ceph::features::mon::FEATURE_LUMINOUS)) {
4984           osdmon()->check_pg_creates_sub(s->sub_map["osd_pg_creates"]);
4985         } else {
4986           pgmon()->check_sub(s->sub_map["osd_pg_creates"]);
4987         }
4988       }
4989     } else if (p->first == "monmap") {
4990       monmon()->check_sub(s->sub_map[p->first]);
4991     } else if (logmon()->sub_name_to_id(p->first) >= 0) {
4992       logmon()->check_sub(s->sub_map[p->first]);
4993     } else if (p->first == "mgrmap" || p->first == "mgrdigest") {
4994       mgrmon()->check_sub(s->sub_map[p->first]);
4995     } else if (p->first == "servicemap") {
4996       mgrstatmon()->check_sub(s->sub_map[p->first]);
4997     }
4998   }
4999
5000   if (reply) {
5001     // we only need to reply if the client is old enough to think it
5002     // has to send renewals.
5003     ConnectionRef con = m->get_connection();
5004     if (!con->has_feature(CEPH_FEATURE_MON_STATEFUL_SUB))
5005       m->get_connection()->send_message(new MMonSubscribeAck(
5006         monmap->get_fsid(), (int)g_conf->mon_subscribe_interval));
5007   }
5008
5009 }
5010
5011 void Monitor::handle_get_version(MonOpRequestRef op)
5012 {
5013   MMonGetVersion *m = static_cast<MMonGetVersion*>(op->get_req());
5014   dout(10) << "handle_get_version " << *m << dendl;
5015   PaxosService *svc = NULL;
5016
5017   MonSession *s = op->get_session();
5018   assert(s);
5019
5020   if (!is_leader() && !is_peon()) {
5021     dout(10) << " waiting for quorum" << dendl;
5022     waitfor_quorum.push_back(new C_RetryMessage(this, op));
5023     goto out;
5024   }
5025
5026   if (m->what == "mdsmap") {
5027     svc = mdsmon();
5028   } else if (m->what == "fsmap") {
5029     svc = mdsmon();
5030   } else if (m->what == "osdmap") {
5031     svc = osdmon();
5032   } else if (m->what == "monmap") {
5033     svc = monmon();
5034   } else {
5035     derr << "invalid map type " << m->what << dendl;
5036   }
5037
5038   if (svc) {
5039     if (!svc->is_readable()) {
5040       svc->wait_for_readable(op, new C_RetryMessage(this, op));
5041       goto out;
5042     }
5043
5044     MMonGetVersionReply *reply = new MMonGetVersionReply();
5045     reply->handle = m->handle;
5046     reply->version = svc->get_last_committed();
5047     reply->oldest_version = svc->get_first_committed();
5048     reply->set_tid(m->get_tid());
5049
5050     m->get_connection()->send_message(reply);
5051   }
5052  out:
5053   return;
5054 }
5055
5056 bool Monitor::ms_handle_reset(Connection *con)
5057 {
5058   dout(10) << "ms_handle_reset " << con << " " << con->get_peer_addr() << dendl;
5059
5060   // ignore lossless monitor sessions
5061   if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON)
5062     return false;
5063
5064   MonSession *s = static_cast<MonSession *>(con->get_priv());
5065   if (!s)
5066     return false;
5067
5068   // break any con <-> session ref cycle
5069   s->con->set_priv(NULL);
5070
5071   if (is_shutdown())
5072     return false;
5073
5074   Mutex::Locker l(lock);
5075
5076   dout(10) << "reset/close on session " << s->inst << dendl;
5077   if (!s->closed) {
5078     Mutex::Locker l(session_map_lock);
5079     remove_session(s);
5080   }
5081   s->put();
5082   return true;
5083 }
5084
5085 bool Monitor::ms_handle_refused(Connection *con)
5086 {
5087   // just log for now...
5088   dout(10) << "ms_handle_refused " << con << " " << con->get_peer_addr() << dendl;
5089   return false;
5090 }
5091
5092 // -----
5093
5094 void Monitor::send_latest_monmap(Connection *con)
5095 {
5096   bufferlist bl;
5097   monmap->encode(bl, con->get_features());
5098   con->send_message(new MMonMap(bl));
5099 }
5100
5101 void Monitor::handle_mon_get_map(MonOpRequestRef op)
5102 {
5103   MMonGetMap *m = static_cast<MMonGetMap*>(op->get_req());
5104   dout(10) << "handle_mon_get_map" << dendl;
5105   send_latest_monmap(m->get_connection().get());
5106 }
5107
5108 void Monitor::handle_mon_metadata(MonOpRequestRef op)
5109 {
5110   MMonMetadata *m = static_cast<MMonMetadata*>(op->get_req());
5111   if (is_leader()) {
5112     dout(10) << __func__ << dendl;
5113     update_mon_metadata(m->get_source().num(), std::move(m->data));
5114   }
5115 }
5116
5117 void Monitor::update_mon_metadata(int from, Metadata&& m)
5118 {
5119   // NOTE: this is now for legacy (kraken or jewel) mons only.
5120   pending_metadata[from] = std::move(m);
5121
5122   MonitorDBStore::TransactionRef t = paxos->get_pending_transaction();
5123   bufferlist bl;
5124   ::encode(pending_metadata, bl);
5125   t->put(MONITOR_STORE_PREFIX, "last_metadata", bl);
5126   paxos->trigger_propose();
5127 }
5128
5129 int Monitor::load_metadata()
5130 {
5131   bufferlist bl;
5132   int r = store->get(MONITOR_STORE_PREFIX, "last_metadata", bl);
5133   if (r)
5134     return r;
5135   bufferlist::iterator it = bl.begin();
5136   ::decode(mon_metadata, it);
5137
5138   pending_metadata = mon_metadata;
5139   return 0;
5140 }
5141
5142 int Monitor::get_mon_metadata(int mon, Formatter *f, ostream& err)
5143 {
5144   assert(f);
5145   if (!mon_metadata.count(mon)) {
5146     err << "mon." << mon << " not found";
5147     return -EINVAL;
5148   }
5149   const Metadata& m = mon_metadata[mon];
5150   for (Metadata::const_iterator p = m.begin(); p != m.end(); ++p) {
5151     f->dump_string(p->first.c_str(), p->second);
5152   }
5153   return 0;
5154 }
5155
5156 void Monitor::count_metadata(const string& field, map<string,int> *out)
5157 {
5158   for (auto& p : mon_metadata) {
5159     auto q = p.second.find(field);
5160     if (q == p.second.end()) {
5161       (*out)["unknown"]++;
5162     } else {
5163       (*out)[q->second]++;
5164     }
5165   }
5166 }
5167
5168 void Monitor::count_metadata(const string& field, Formatter *f)
5169 {
5170   map<string,int> by_val;
5171   count_metadata(field, &by_val);
5172   f->open_object_section(field.c_str());
5173   for (auto& p : by_val) {
5174     f->dump_int(p.first.c_str(), p.second);
5175   }
5176   f->close_section();
5177 }
5178
5179 int Monitor::print_nodes(Formatter *f, ostream& err)
5180 {
5181   map<string, list<int> > mons; // hostname => mon
5182   for (map<int, Metadata>::iterator it = mon_metadata.begin();
5183        it != mon_metadata.end(); ++it) {
5184     const Metadata& m = it->second;
5185     Metadata::const_iterator hostname = m.find("hostname");
5186     if (hostname == m.end()) {
5187       // not likely though
5188       continue;
5189     }
5190     mons[hostname->second].push_back(it->first);
5191   }
5192
5193   dump_services(f, mons, "mon");
5194   return 0;
5195 }
5196
5197 // ----------------------------------------------
5198 // scrub
5199
5200 int Monitor::scrub_start()
5201 {
5202   dout(10) << __func__ << dendl;
5203   assert(is_leader());
5204
5205   if (!scrub_result.empty()) {
5206     clog->info() << "scrub already in progress";
5207     return -EBUSY;
5208   }
5209
5210   scrub_event_cancel();
5211   scrub_result.clear();
5212   scrub_state.reset(new ScrubState);
5213
5214   scrub();
5215   return 0;
5216 }
5217
5218 int Monitor::scrub()
5219 {
5220   assert(is_leader());
5221   assert(scrub_state);
5222
5223   scrub_cancel_timeout();
5224   wait_for_paxos_write();
5225   scrub_version = paxos->get_version();
5226
5227
5228   // scrub all keys if we're the only monitor in the quorum
5229   int32_t num_keys =
5230     (quorum.size() == 1 ? -1 : cct->_conf->mon_scrub_max_keys);
5231
5232   for (set<int>::iterator p = quorum.begin();
5233        p != quorum.end();
5234        ++p) {
5235     if (*p == rank)
5236       continue;
5237     MMonScrub *r = new MMonScrub(MMonScrub::OP_SCRUB, scrub_version,
5238                                  num_keys);
5239     r->key = scrub_state->last_key;
5240     messenger->send_message(r, monmap->get_inst(*p));
5241   }
5242
5243   // scrub my keys
5244   bool r = _scrub(&scrub_result[rank],
5245                   &scrub_state->last_key,
5246                   &num_keys);
5247
5248   scrub_state->finished = !r;
5249
5250   // only after we got our scrub results do we really care whether the
5251   // other monitors are late on their results.  Also, this way we avoid
5252   // triggering the timeout if we end up getting stuck in _scrub() for
5253   // longer than the duration of the timeout.
5254   scrub_reset_timeout();
5255
5256   if (quorum.size() == 1) {
5257     assert(scrub_state->finished == true);
5258     scrub_finish();
5259   }
5260   return 0;
5261 }
5262
5263 void Monitor::handle_scrub(MonOpRequestRef op)
5264 {
5265   MMonScrub *m = static_cast<MMonScrub*>(op->get_req());
5266   dout(10) << __func__ << " " << *m << dendl;
5267   switch (m->op) {
5268   case MMonScrub::OP_SCRUB:
5269     {
5270       if (!is_peon())
5271         break;
5272
5273       wait_for_paxos_write();
5274
5275       if (m->version != paxos->get_version())
5276         break;
5277
5278       MMonScrub *reply = new MMonScrub(MMonScrub::OP_RESULT,
5279                                        m->version,
5280                                        m->num_keys);
5281
5282       reply->key = m->key;
5283       _scrub(&reply->result, &reply->key, &reply->num_keys);
5284       m->get_connection()->send_message(reply);
5285     }
5286     break;
5287
5288   case MMonScrub::OP_RESULT:
5289     {
5290       if (!is_leader())
5291         break;
5292       if (m->version != scrub_version)
5293         break;
5294       // reset the timeout each time we get a result
5295       scrub_reset_timeout();
5296
5297       int from = m->get_source().num();
5298       assert(scrub_result.count(from) == 0);
5299       scrub_result[from] = m->result;
5300
5301       if (scrub_result.size() == quorum.size()) {
5302         scrub_check_results();
5303         scrub_result.clear();
5304         if (scrub_state->finished)
5305           scrub_finish();
5306         else
5307           scrub();
5308       }
5309     }
5310     break;
5311   }
5312 }
5313
5314 bool Monitor::_scrub(ScrubResult *r,
5315                      pair<string,string> *start,
5316                      int *num_keys)
5317 {
5318   assert(r != NULL);
5319   assert(start != NULL);
5320   assert(num_keys != NULL);
5321
5322   set<string> prefixes = get_sync_targets_names();
5323   prefixes.erase("paxos");  // exclude paxos, as this one may have extra states for proposals, etc.
5324
5325   dout(10) << __func__ << " start (" << *start << ")"
5326            << " num_keys " << *num_keys << dendl;
5327
5328   MonitorDBStore::Synchronizer it = store->get_synchronizer(*start, prefixes);
5329
5330   int scrubbed_keys = 0;
5331   pair<string,string> last_key;
5332
5333   while (it->has_next_chunk()) {
5334
5335     if (*num_keys > 0 && scrubbed_keys == *num_keys)
5336       break;
5337
5338     pair<string,string> k = it->get_next_key();
5339     if (prefixes.count(k.first) == 0)
5340       continue;
5341
5342     if (cct->_conf->mon_scrub_inject_missing_keys > 0.0 &&
5343         (rand() % 10000 < cct->_conf->mon_scrub_inject_missing_keys*10000.0)) {
5344       dout(10) << __func__ << " inject missing key, skipping (" << k << ")"
5345                << dendl;
5346       continue;
5347     }
5348
5349     bufferlist bl;
5350     int err = store->get(k.first, k.second, bl);
5351     assert(err == 0);
5352     
5353     uint32_t key_crc = bl.crc32c(0);
5354     dout(30) << __func__ << " " << k << " bl " << bl.length() << " bytes"
5355                                      << " crc " << key_crc << dendl;
5356     r->prefix_keys[k.first]++;
5357     if (r->prefix_crc.count(k.first) == 0) {
5358       r->prefix_crc[k.first] = 0;
5359     }
5360     r->prefix_crc[k.first] = bl.crc32c(r->prefix_crc[k.first]);
5361
5362     if (cct->_conf->mon_scrub_inject_crc_mismatch > 0.0 &&
5363         (rand() % 10000 < cct->_conf->mon_scrub_inject_crc_mismatch*10000.0)) {
5364       dout(10) << __func__ << " inject failure at (" << k << ")" << dendl;
5365       r->prefix_crc[k.first] += 1;
5366     }
5367
5368     ++scrubbed_keys;
5369     last_key = k;
5370   }
5371
5372   dout(20) << __func__ << " last_key (" << last_key << ")"
5373                        << " scrubbed_keys " << scrubbed_keys
5374                        << " has_next " << it->has_next_chunk() << dendl;
5375
5376   *start = last_key;
5377   *num_keys = scrubbed_keys;
5378
5379   return it->has_next_chunk();
5380 }
5381
5382 void Monitor::scrub_check_results()
5383 {
5384   dout(10) << __func__ << dendl;
5385
5386   // compare
5387   int errors = 0;
5388   ScrubResult& mine = scrub_result[rank];
5389   for (map<int,ScrubResult>::iterator p = scrub_result.begin();
5390        p != scrub_result.end();
5391        ++p) {
5392     if (p->first == rank)
5393       continue;
5394     if (p->second != mine) {
5395       ++errors;
5396       clog->error() << "scrub mismatch";
5397       clog->error() << " mon." << rank << " " << mine;
5398       clog->error() << " mon." << p->first << " " << p->second;
5399     }
5400   }
5401   if (!errors)
5402     clog->debug() << "scrub ok on " << quorum << ": " << mine;
5403 }
5404
5405 inline void Monitor::scrub_timeout()
5406 {
5407   dout(1) << __func__ << " restarting scrub" << dendl;
5408   scrub_reset();
5409   scrub_start();
5410 }
5411
5412 void Monitor::scrub_finish()
5413 {
5414   dout(10) << __func__ << dendl;
5415   scrub_reset();
5416   scrub_event_start();
5417 }
5418
5419 void Monitor::scrub_reset()
5420 {
5421   dout(10) << __func__ << dendl;
5422   scrub_cancel_timeout();
5423   scrub_version = 0;
5424   scrub_result.clear();
5425   scrub_state.reset();
5426 }
5427
5428 inline void Monitor::scrub_update_interval(int secs)
5429 {
5430   // we don't care about changes if we are not the leader.
5431   // changes will be visible if we become the leader.
5432   if (!is_leader())
5433     return;
5434
5435   dout(1) << __func__ << " new interval = " << secs << dendl;
5436
5437   // if scrub already in progress, all changes will already be visible during
5438   // the next round.  Nothing to do.
5439   if (scrub_state != NULL)
5440     return;
5441
5442   scrub_event_cancel();
5443   scrub_event_start();
5444 }
5445
5446 void Monitor::scrub_event_start()
5447 {
5448   dout(10) << __func__ << dendl;
5449
5450   if (scrub_event)
5451     scrub_event_cancel();
5452
5453   if (cct->_conf->mon_scrub_interval <= 0) {
5454     dout(1) << __func__ << " scrub event is disabled"
5455             << " (mon_scrub_interval = " << cct->_conf->mon_scrub_interval
5456             << ")" << dendl;
5457     return;
5458   }
5459
5460   scrub_event = timer.add_event_after(
5461     cct->_conf->mon_scrub_interval,
5462     new C_MonContext(this, [this](int) {
5463       scrub_start();
5464       }));
5465 }
5466
5467 void Monitor::scrub_event_cancel()
5468 {
5469   dout(10) << __func__ << dendl;
5470   if (scrub_event) {
5471     timer.cancel_event(scrub_event);
5472     scrub_event = NULL;
5473   }
5474 }
5475
5476 inline void Monitor::scrub_cancel_timeout()
5477 {
5478   if (scrub_timeout_event) {
5479     timer.cancel_event(scrub_timeout_event);
5480     scrub_timeout_event = NULL;
5481   }
5482 }
5483
5484 void Monitor::scrub_reset_timeout()
5485 {
5486   dout(15) << __func__ << " reset timeout event" << dendl;
5487   scrub_cancel_timeout();
5488   scrub_timeout_event = timer.add_event_after(
5489     g_conf->mon_scrub_timeout,
5490     new C_MonContext(this, [this](int) {
5491       scrub_timeout();
5492     }));
5493 }
5494
5495 /************ TICK ***************/
5496 void Monitor::new_tick()
5497 {
5498   timer.add_event_after(g_conf->mon_tick_interval, new C_MonContext(this, [this](int) {
5499         tick();
5500       }));
5501 }
5502
5503 void Monitor::tick()
5504 {
5505   // ok go.
5506   dout(11) << "tick" << dendl;
5507   const utime_t now = ceph_clock_now();
5508   
5509   // Check if we need to emit any delayed health check updated messages
5510   if (is_leader()) {
5511     const auto min_period = g_conf->get_val<int64_t>(
5512                               "mon_health_log_update_period");
5513     for (auto& svc : paxos_service) {
5514       auto health = svc->get_health_checks();
5515
5516       for (const auto &i : health.checks) {
5517         const std::string &code = i.first;
5518         const std::string &summary = i.second.summary;
5519         const health_status_t severity = i.second.severity;
5520
5521         auto status_iter = health_check_log_times.find(code);
5522         if (status_iter == health_check_log_times.end()) {
5523           continue;
5524         }
5525
5526         auto &log_status = status_iter->second;
5527         bool const changed = log_status.last_message != summary
5528                              || log_status.severity != severity;
5529
5530         if (changed && now - log_status.updated_at > min_period) {
5531           log_status.last_message = summary;
5532           log_status.updated_at = now;
5533           log_status.severity = severity;
5534
5535           ostringstream ss;
5536           ss << "Health check update: " << summary << " (" << code << ")";
5537           clog->health(severity) << ss.str();
5538         }
5539       }
5540     }
5541   }
5542
5543
5544   for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p) {
5545     (*p)->tick();
5546     (*p)->maybe_trim();
5547   }
5548   
5549   // trim sessions
5550   {
5551     Mutex::Locker l(session_map_lock);
5552     auto p = session_map.sessions.begin();
5553
5554     bool out_for_too_long = (!exited_quorum.is_zero() &&
5555                              now > (exited_quorum + 2*g_conf->mon_lease));
5556
5557     while (!p.end()) {
5558       MonSession *s = *p;
5559       ++p;
5560     
5561       // don't trim monitors
5562       if (s->inst.name.is_mon())
5563         continue;
5564
5565       if (s->session_timeout < now && s->con) {
5566         // check keepalive, too
5567         s->session_timeout = s->con->get_last_keepalive();
5568         s->session_timeout += g_conf->mon_session_timeout;
5569       }
5570       if (s->session_timeout < now) {
5571         dout(10) << " trimming session " << s->con << " " << s->inst
5572                  << " (timeout " << s->session_timeout
5573                  << " < now " << now << ")" << dendl;
5574       } else if (out_for_too_long) {
5575         // boot the client Session because we've taken too long getting back in
5576         dout(10) << " trimming session " << s->con << " " << s->inst
5577                  << " because we've been out of quorum too long" << dendl;
5578       } else {
5579         continue;
5580       }
5581
5582       s->con->mark_down();
5583       remove_session(s);
5584       logger->inc(l_mon_session_trim);
5585     }
5586   }
5587   sync_trim_providers();
5588
5589   if (!maybe_wait_for_quorum.empty()) {
5590     finish_contexts(g_ceph_context, maybe_wait_for_quorum);
5591   }
5592
5593   if (is_leader() && paxos->is_active() && fingerprint.is_zero()) {
5594     // this is only necessary on upgraded clusters.
5595     MonitorDBStore::TransactionRef t = paxos->get_pending_transaction();
5596     prepare_new_fingerprint(t);
5597     paxos->trigger_propose();
5598   }
5599
5600   new_tick();
5601 }
5602
5603 void Monitor::prepare_new_fingerprint(MonitorDBStore::TransactionRef t)
5604 {
5605   uuid_d nf;
5606   nf.generate_random();
5607   dout(10) << __func__ << " proposing cluster_fingerprint " << nf << dendl;
5608
5609   bufferlist bl;
5610   ::encode(nf, bl);
5611   t->put(MONITOR_NAME, "cluster_fingerprint", bl);
5612 }
5613
5614 int Monitor::check_fsid()
5615 {
5616   bufferlist ebl;
5617   int r = store->get(MONITOR_NAME, "cluster_uuid", ebl);
5618   if (r == -ENOENT)
5619     return r;
5620   assert(r == 0);
5621
5622   string es(ebl.c_str(), ebl.length());
5623
5624   // only keep the first line
5625   size_t pos = es.find_first_of('\n');
5626   if (pos != string::npos)
5627     es.resize(pos);
5628
5629   dout(10) << "check_fsid cluster_uuid contains '" << es << "'" << dendl;
5630   uuid_d ondisk;
5631   if (!ondisk.parse(es.c_str())) {
5632     derr << "error: unable to parse uuid" << dendl;
5633     return -EINVAL;
5634   }
5635
5636   if (monmap->get_fsid() != ondisk) {
5637     derr << "error: cluster_uuid file exists with value " << ondisk
5638          << ", != our uuid " << monmap->get_fsid() << dendl;
5639     return -EEXIST;
5640   }
5641
5642   return 0;
5643 }
5644
5645 int Monitor::write_fsid()
5646 {
5647   auto t(std::make_shared<MonitorDBStore::Transaction>());
5648   write_fsid(t);
5649   int r = store->apply_transaction(t);
5650   return r;
5651 }
5652
5653 int Monitor::write_fsid(MonitorDBStore::TransactionRef t)
5654 {
5655   ostringstream ss;
5656   ss << monmap->get_fsid() << "\n";
5657   string us = ss.str();
5658
5659   bufferlist b;
5660   b.append(us);
5661
5662   t->put(MONITOR_NAME, "cluster_uuid", b);
5663   return 0;
5664 }
5665
5666 /*
5667  * this is the closest thing to a traditional 'mkfs' for ceph.
5668  * initialize the monitor state machines to their initial values.
5669  */
5670 int Monitor::mkfs(bufferlist& osdmapbl)
5671 {
5672   auto t(std::make_shared<MonitorDBStore::Transaction>());
5673
5674   // verify cluster fsid
5675   int r = check_fsid();
5676   if (r < 0 && r != -ENOENT)
5677     return r;
5678
5679   bufferlist magicbl;
5680   magicbl.append(CEPH_MON_ONDISK_MAGIC);
5681   magicbl.append("\n");
5682   t->put(MONITOR_NAME, "magic", magicbl);
5683
5684
5685   features = get_initial_supported_features();
5686   write_features(t);
5687
5688   // save monmap, osdmap, keyring.
5689   bufferlist monmapbl;
5690   monmap->encode(monmapbl, CEPH_FEATURES_ALL);
5691   monmap->set_epoch(0);     // must be 0 to avoid confusing first MonmapMonitor::update_from_paxos()
5692   t->put("mkfs", "monmap", monmapbl);
5693
5694   if (osdmapbl.length()) {
5695     // make sure it's a valid osdmap
5696     try {
5697       OSDMap om;
5698       om.decode(osdmapbl);
5699     }
5700     catch (buffer::error& e) {
5701       derr << "error decoding provided osdmap: " << e.what() << dendl;
5702       return -EINVAL;
5703     }
5704     t->put("mkfs", "osdmap", osdmapbl);
5705   }
5706
5707   if (is_keyring_required()) {
5708     KeyRing keyring;
5709     string keyring_filename;
5710
5711     r = ceph_resolve_file_search(g_conf->keyring, keyring_filename);
5712     if (r) {
5713       derr << "unable to find a keyring file on " << g_conf->keyring
5714            << ": " << cpp_strerror(r) << dendl;
5715       if (g_conf->key != "") {
5716         string keyring_plaintext = "[mon.]\n\tkey = " + g_conf->key +
5717           "\n\tcaps mon = \"allow *\"\n";
5718         bufferlist bl;
5719         bl.append(keyring_plaintext);
5720         try {
5721           bufferlist::iterator i = bl.begin();
5722           keyring.decode_plaintext(i);
5723         }
5724         catch (const buffer::error& e) {
5725           derr << "error decoding keyring " << keyring_plaintext
5726                << ": " << e.what() << dendl;
5727           return -EINVAL;
5728         }
5729       } else {
5730         return -ENOENT;
5731       }
5732     } else {
5733       r = keyring.load(g_ceph_context, keyring_filename);
5734       if (r < 0) {
5735         derr << "unable to load initial keyring " << g_conf->keyring << dendl;
5736         return r;
5737       }
5738     }
5739
5740     // put mon. key in external keyring; seed with everything else.
5741     extract_save_mon_key(keyring);
5742
5743     bufferlist keyringbl;
5744     keyring.encode_plaintext(keyringbl);
5745     t->put("mkfs", "keyring", keyringbl);
5746   }
5747   write_fsid(t);
5748   store->apply_transaction(t);
5749
5750   return 0;
5751 }
5752
5753 int Monitor::write_default_keyring(bufferlist& bl)
5754 {
5755   ostringstream os;
5756   os << g_conf->mon_data << "/keyring";
5757
5758   int err = 0;
5759   int fd = ::open(os.str().c_str(), O_WRONLY|O_CREAT, 0600);
5760   if (fd < 0) {
5761     err = -errno;
5762     dout(0) << __func__ << " failed to open " << os.str() 
5763             << ": " << cpp_strerror(err) << dendl;
5764     return err;
5765   }
5766
5767   err = bl.write_fd(fd);
5768   if (!err)
5769     ::fsync(fd);
5770   VOID_TEMP_FAILURE_RETRY(::close(fd));
5771
5772   return err;
5773 }
5774
5775 void Monitor::extract_save_mon_key(KeyRing& keyring)
5776 {
5777   EntityName mon_name;
5778   mon_name.set_type(CEPH_ENTITY_TYPE_MON);
5779   EntityAuth mon_key;
5780   if (keyring.get_auth(mon_name, mon_key)) {
5781     dout(10) << "extract_save_mon_key moving mon. key to separate keyring" << dendl;
5782     KeyRing pkey;
5783     pkey.add(mon_name, mon_key);
5784     bufferlist bl;
5785     pkey.encode_plaintext(bl);
5786     write_default_keyring(bl);
5787     keyring.remove(mon_name);
5788   }
5789 }
5790
5791 bool Monitor::ms_get_authorizer(int service_id, AuthAuthorizer **authorizer,
5792                                 bool force_new)
5793 {
5794   dout(10) << "ms_get_authorizer for " << ceph_entity_type_name(service_id)
5795            << dendl;
5796
5797   if (is_shutdown())
5798     return false;
5799
5800   // we only connect to other monitors and mgr; every else connects to us.
5801   if (service_id != CEPH_ENTITY_TYPE_MON &&
5802       service_id != CEPH_ENTITY_TYPE_MGR)
5803     return false;
5804
5805   if (!auth_cluster_required.is_supported_auth(CEPH_AUTH_CEPHX)) {
5806     // auth_none
5807     dout(20) << __func__ << " building auth_none authorizer" << dendl;
5808     AuthNoneClientHandler handler(g_ceph_context, nullptr);
5809     handler.set_global_id(0);
5810     *authorizer = handler.build_authorizer(service_id);
5811     return true;
5812   }
5813
5814   CephXServiceTicketInfo auth_ticket_info;
5815   CephXSessionAuthInfo info;
5816   int ret;
5817
5818   EntityName name;
5819   name.set_type(CEPH_ENTITY_TYPE_MON);
5820   auth_ticket_info.ticket.name = name;
5821   auth_ticket_info.ticket.global_id = 0;
5822
5823   if (service_id == CEPH_ENTITY_TYPE_MON) {
5824     // mon to mon authentication uses the private monitor shared key and not the
5825     // rotating key
5826     CryptoKey secret;
5827     if (!keyring.get_secret(name, secret) &&
5828         !key_server.get_secret(name, secret)) {
5829       dout(0) << " couldn't get secret for mon service from keyring or keyserver"
5830               << dendl;
5831       stringstream ss, ds;
5832       int err = key_server.list_secrets(ds);
5833       if (err < 0)
5834         ss << "no installed auth entries!";
5835       else
5836         ss << "installed auth entries:";
5837       dout(0) << ss.str() << "\n" << ds.str() << dendl;
5838       return false;
5839     }
5840
5841     ret = key_server.build_session_auth_info(service_id, auth_ticket_info, info,
5842                                              secret, (uint64_t)-1);
5843     if (ret < 0) {
5844       dout(0) << __func__ << " failed to build mon session_auth_info "
5845               << cpp_strerror(ret) << dendl;
5846       return false;
5847     }
5848   } else if (service_id == CEPH_ENTITY_TYPE_MGR) {
5849     // mgr
5850     ret = key_server.build_session_auth_info(service_id, auth_ticket_info, info);
5851     if (ret < 0) {
5852       derr << __func__ << " failed to build mgr service session_auth_info "
5853            << cpp_strerror(ret) << dendl;
5854       return false;
5855     }
5856   } else {
5857     ceph_abort();  // see check at top of fn
5858   }
5859
5860   CephXTicketBlob blob;
5861   if (!cephx_build_service_ticket_blob(cct, info, blob)) {
5862     dout(0) << "ms_get_authorizer failed to build service ticket" << dendl;
5863     return false;
5864   }
5865   bufferlist ticket_data;
5866   ::encode(blob, ticket_data);
5867
5868   bufferlist::iterator iter = ticket_data.begin();
5869   CephXTicketHandler handler(g_ceph_context, service_id);
5870   ::decode(handler.ticket, iter);
5871
5872   handler.session_key = info.session_key;
5873
5874   *authorizer = handler.build_authorizer(0);
5875   
5876   return true;
5877 }
5878
5879 bool Monitor::ms_verify_authorizer(Connection *con, int peer_type,
5880                                    int protocol, bufferlist& authorizer_data,
5881                                    bufferlist& authorizer_reply,
5882                                    bool& isvalid, CryptoKey& session_key)
5883 {
5884   dout(10) << "ms_verify_authorizer " << con->get_peer_addr()
5885            << " " << ceph_entity_type_name(peer_type)
5886            << " protocol " << protocol << dendl;
5887
5888   if (is_shutdown())
5889     return false;
5890
5891   if (peer_type == CEPH_ENTITY_TYPE_MON &&
5892       auth_cluster_required.is_supported_auth(CEPH_AUTH_CEPHX)) {
5893     // monitor, and cephx is enabled
5894     isvalid = false;
5895     if (protocol == CEPH_AUTH_CEPHX) {
5896       bufferlist::iterator iter = authorizer_data.begin();
5897       CephXServiceTicketInfo auth_ticket_info;
5898       
5899       if (authorizer_data.length()) {
5900         bool ret = cephx_verify_authorizer(g_ceph_context, &keyring, iter,
5901                                           auth_ticket_info, authorizer_reply);
5902         if (ret) {
5903           session_key = auth_ticket_info.session_key;
5904           isvalid = true;
5905         } else {
5906           dout(0) << "ms_verify_authorizer bad authorizer from mon " << con->get_peer_addr() << dendl;
5907         }
5908       }
5909     } else {
5910       dout(0) << "ms_verify_authorizer cephx enabled, but no authorizer (required for mon)" << dendl;
5911     }
5912   } else {
5913     // who cares.
5914     isvalid = true;
5915   }
5916   return true;
5917 }