1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
21 #include <boost/scope_exit.hpp>
22 #include <boost/algorithm/string/predicate.hpp>
25 #include "common/version.h"
27 #include "osd/OSDMap.h"
29 #include "MonitorDBStore.h"
31 #include "messages/PaxosServiceMessage.h"
32 #include "messages/MMonMap.h"
33 #include "messages/MMonGetMap.h"
34 #include "messages/MMonGetVersion.h"
35 #include "messages/MMonGetVersionReply.h"
36 #include "messages/MGenericMessage.h"
37 #include "messages/MMonCommand.h"
38 #include "messages/MMonCommandAck.h"
39 #include "messages/MMonHealth.h"
40 #include "messages/MMonMetadata.h"
41 #include "messages/MMonSync.h"
42 #include "messages/MMonScrub.h"
43 #include "messages/MMonProbe.h"
44 #include "messages/MMonJoin.h"
45 #include "messages/MMonPaxos.h"
46 #include "messages/MRoute.h"
47 #include "messages/MForward.h"
48 #include "messages/MStatfs.h"
50 #include "messages/MMonSubscribe.h"
51 #include "messages/MMonSubscribeAck.h"
53 #include "messages/MAuthReply.h"
55 #include "messages/MTimeCheck.h"
56 #include "messages/MPing.h"
58 #include "common/strtol.h"
59 #include "common/ceph_argparse.h"
60 #include "common/Timer.h"
61 #include "common/Clock.h"
62 #include "common/errno.h"
63 #include "common/perf_counters.h"
64 #include "common/admin_socket.h"
65 #include "global/signal_handler.h"
66 #include "common/Formatter.h"
67 #include "include/stringify.h"
68 #include "include/color.h"
69 #include "include/ceph_fs.h"
70 #include "include/str_list.h"
72 #include "OSDMonitor.h"
73 #include "MDSMonitor.h"
74 #include "MonmapMonitor.h"
75 #include "PGMonitor.h"
76 #include "LogMonitor.h"
77 #include "AuthMonitor.h"
78 #include "MgrMonitor.h"
79 #include "MgrStatMonitor.h"
80 #include "mon/QuorumService.h"
81 #include "mon/OldHealthMonitor.h"
82 #include "mon/HealthMonitor.h"
83 #include "mon/ConfigKeyService.h"
84 #include "common/config.h"
85 #include "common/cmdparse.h"
86 #include "include/assert.h"
87 #include "include/compat.h"
88 #include "perfglue/heap_profiler.h"
90 #include "auth/none/AuthNoneClientHandler.h"
// Logging setup for this file: dout()/derr log to the mon subsystem and every
// line is prefixed via _prefix() below.
92 #define dout_subsys ceph_subsys_mon
94 #define dout_prefix _prefix(_dout, this)
// Writes the standard log prefix "mon.<name>@<rank>(<state>) e<monmap epoch> "
// to *_dout and returns the stream so callers can keep inserting.
95 static ostream& _prefix(std::ostream *_dout, const Monitor *mon) {
96 return *_dout << "mon." << mon->name << "@" << mon->rank
97 << "(" << mon->get_state_name() << ") e" << mon->monmap->get_epoch() << " ";
// MonitorDBStore key prefixes. MONITOR_NAME ("monitor") is the prefix this
// file uses for monitor-level keys such as the compat set, the "joined"
// marker and "cluster_fingerprint".
100 const string Monitor::MONITOR_NAME = "monitor";
101 const string Monitor::MONITOR_STORE_PREFIX = "monitor_store";
// Static command tables. COMMAND()/COMMAND_WITH_FLAG() are redefined so each
// entry in the included command headers expands to a MonCommand aggregate
// initializer; plain COMMAND() entries get FLAG(NONE).
106 #undef COMMAND_WITH_FLAG
107 #define FLAG(f) (MonCommand::FLAG_##f)
108 #define COMMAND(parsesig, helptext, modulename, req_perms, avail) \
109 {parsesig, helptext, modulename, req_perms, avail, FLAG(NONE)},
110 #define COMMAND_WITH_FLAG(parsesig, helptext, modulename, req_perms, avail, flags) \
111 {parsesig, helptext, modulename, req_perms, avail, flags},
112 MonCommand mon_commands[] = {
113 #include <mon/MonCommands.h>
// PGMonitor command table; the Monitor constructor appends these entries to
// local_upgrading_mon_commands.
115 MonCommand pgmonitor_commands[] = {
116 #include <mon/PGMonitorCommands.h>
119 #undef COMMAND_WITH_FLAG
// Completion for monitor callbacks: checks mon->is_shutdown() before
// delegating to FunctionContext::finish(r).
// NOTE(review): the statement controlled by the is_shutdown() check is not
// present in this listing (presumably an early return) — confirm.
122 void C_MonContext::finish(int r) {
123 if (mon->is_shutdown())
125 FunctionContext::finish(r);
// Monitor constructor. After the member initializers it:
//  - creates the cluster and audit clog channels and applies log config,
//  - creates the shared Paxos instance and one PaxosService per PAXOS_* slot,
//  - creates the OldHealthMonitor and ConfigKeyService,
//  - builds an "allow *" MonCap,
//  - copies/encodes the local command tables (plus an "upgrading" variant
//    that also includes the PGMonitor commands).
// NOTE(review): several initializer lines are absent from this listing.
128 Monitor::Monitor(CephContext* cct_, string nm, MonitorDBStore *s,
129 Messenger *m, Messenger *mgr_m, MonMap *map) :
134 con_self(m ? m->get_loopback_connection() : NULL),
135 lock("Monitor::lock"),
137 finisher(cct_, "mon_finisher", "fin"),
// NOTE(review): reads g_conf here while the auth initializers below read
// cct->_conf; consider using cct->_conf consistently.
138 cpu_tp(cct, "Monitor::cpu_tp", "cpu_tp", g_conf->mon_cpu_threads),
139 has_ever_joined(false),
140 logger(NULL), cluster_logger(NULL), cluster_logger_registered(false),
142 log_client(cct_, messenger, monmap, LogClient::FLAG_MON),
143 key_server(cct, &keyring),
// A non-empty auth_supported overrides both auth_*_required settings.
144 auth_cluster_required(cct,
145 cct->_conf->auth_supported.empty() ?
146 cct->_conf->auth_cluster_required : cct->_conf->auth_supported),
147 auth_service_required(cct,
148 cct->_conf->auth_supported.empty() ?
149 cct->_conf->auth_service_required : cct->_conf->auth_supported ),
150 mgr_messenger(mgr_m),
151 mgr_client(cct_, mgr_m),
155 state(STATE_PROBING),
158 required_features(0),
160 quorum_con_features(0),
164 scrub_timeout_event(NULL),
167 sync_provider_count(0),
170 sync_start_version(0),
171 sync_timeout_event(NULL),
172 sync_last_committed_floor(0),
176 timecheck_rounds_since_clean(0),
177 timecheck_event(NULL),
179 paxos_service(PAXOS_NUM),
181 routed_request_tid(0),
182 op_tracker(cct, true, 1)
// Cluster and audit log channels, configured from the clog_* options.
184 clog = log_client.create_channel(CLOG_CHANNEL_CLUSTER);
185 audit_clog = log_client.create_channel(CLOG_CHANNEL_AUDIT);
187 update_log_clients();
// One Paxos instance shared by every service created below.
189 paxos = new Paxos(this, "paxos");
191 paxos_service[PAXOS_MDSMAP] = new MDSMonitor(this, paxos, "mdsmap");
192 paxos_service[PAXOS_MONMAP] = new MonmapMonitor(this, paxos, "monmap");
193 paxos_service[PAXOS_OSDMAP] = new OSDMonitor(cct, this, paxos, "osdmap");
194 paxos_service[PAXOS_PGMAP] = new PGMonitor(this, paxos, "pgmap");
195 paxos_service[PAXOS_LOG] = new LogMonitor(this, paxos, "logm");
196 paxos_service[PAXOS_AUTH] = new AuthMonitor(this, paxos, "auth");
197 paxos_service[PAXOS_MGR] = new MgrMonitor(this, paxos, "mgr");
198 paxos_service[PAXOS_MGRSTAT] = new MgrStatMonitor(this, paxos, "mgrstat");
199 paxos_service[PAXOS_HEALTH] = new HealthMonitor(this, paxos, "health");
201 health_monitor = new OldHealthMonitor(this);
202 config_key_service = new ConfigKeyService(this, paxos);
// Unrestricted capability set ("allow *").
// NOTE(review): how `r` is checked is not shown in this listing
// (presumably an assert) — confirm.
204 mon_caps = new MonCap();
205 bool r = mon_caps->parse("allow *", NULL);
208 exited_quorum = ceph_clock_now();
210 // prepare local commands
211 local_mon_commands.resize(ARRAY_SIZE(mon_commands));
212 for (unsigned i = 0; i < ARRAY_SIZE(mon_commands); ++i) {
213 local_mon_commands[i] = mon_commands[i];
215 MonCommand::encode_vector(local_mon_commands, local_mon_commands_bl);
// Same command set with the legacy PGMonitor commands appended.
217 local_upgrading_mon_commands = local_mon_commands;
218 for (unsigned i = 0; i < ARRAY_SIZE(pgmonitor_commands); ++i) {
219 local_upgrading_mon_commands.push_back(pgmonitor_commands[i]);
221 MonCommand::encode_vector(local_upgrading_mon_commands,
222 local_upgrading_mon_commands_bl);
224 // assume our commands until we have an election. this only means
225 // we won't reply with EINVAL before the election; any command that
226 // actually matters will wait until we have quorum etc and then
227 // retry (and revalidate).
228 leader_mon_commands = local_mon_commands;
230 // note: OSDMonitor may update this based on the luminous flag.
231 pgservice = mgrstatmon()->get_pg_stat_service();
// Destructor body. NOTE(review): the Monitor::~Monitor() header and the
// loop body are not present in this listing. The loop walks paxos_service,
// presumably deleting each service.
// Also deletes the old health monitor and the config-key service, and
// asserts that session_map.sessions is empty.
236 for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p)
238 delete health_monitor;
239 delete config_key_service;
241 assert(session_map.sessions.empty());
// Admin-socket hook that forwards every command it receives to
// Monitor::do_admin_command().
// NOTE(review): `ss` is declared, and `out` filled, on lines not shown in
// this listing (presumably from `ss`) — confirm.
246 class AdminHook : public AdminSocketHook {
249 explicit AdminHook(Monitor *m) : mon(m) {}
250 bool call(std::string command, cmdmap_t& cmdmap, std::string format,
251 bufferlist& out) override {
253 mon->do_admin_command(command, cmdmap, format, ss);
// Executes an admin-socket command while holding the monitor lock. Output
// goes through a Formatter created for `format`.
// Each command is recorded in the audit log on dispatch and again when it
// finishes or aborts. Read-only commands (mon_status, mon metadata,
// quorum_status, sessions, ...) are logged at debug level; all others at info.
259 void Monitor::do_admin_command(string command, cmdmap_t& cmdmap, string format,
262 Mutex::Locker l(lock);
264 boost::scoped_ptr<Formatter> f(Formatter::create(format));
// Build a bracketed, stringified argument list for the audit log. The
// "prefix" entry is special-cased (presumably skipped; its branch body is
// not shown in this listing).
267 for (cmdmap_t::iterator p = cmdmap.begin();
268 p != cmdmap.end(); ++p) {
269 if (p->first == "prefix")
273 args += cmd_vartype_stringify(p->second);
275 args = "[" + args + "]";
277 bool read_only = (command == "mon_status" ||
278 command == "mon metadata" ||
279 command == "quorum_status" ||
281 command == "sessions");
283 (read_only ? audit_clog->debug() : audit_clog->info())
284 << "from='admin socket' entity='admin socket' "
285 << "cmd='" << command << "' args=" << args << ": dispatch";
287 if (command == "mon_status") {
288 get_mon_status(f.get(), ss);
291 } else if (command == "quorum_status") {
292 _quorum_status(f.get(), ss);
// sync_force only proceeds when validate == "--yes-i-really-mean-it".
293 } else if (command == "sync_force") {
295 if ((!cmd_getval(g_ceph_context, cmdmap, "validate", validate)) ||
296 (validate != "--yes-i-really-mean-it")) {
297 ss << "are you SURE? this will mean the monitor store will be erased "
298 "the next time the monitor is restarted. pass "
299 "'--yes-i-really-mean-it' if you really do.";
302 sync_force(f.get(), ss);
// Prefix match: 23 == strlen("add_bootstrap_peer_hint").
303 } else if (command.compare(0, 23, "add_bootstrap_peer_hint") == 0) {
304 if (!_add_bootstrap_peer_hint(command, cmdmap, ss))
306 } else if (command == "quorum enter") {
307 elector.start_participating();
309 ss << "started responding to quorum, initiated new election";
310 } else if (command == "quorum exit") {
312 elector.stop_participating();
313 ss << "stopped responding to quorum, initiated new election";
314 } else if (command == "ops") {
315 (void)op_tracker.dump_ops_in_flight(f.get());
319 } else if (command == "sessions") {
322 f->open_array_section("sessions");
323 for (auto p : session_map.sessions) {
324 f->dump_stream("session") << *p;
// Any command that reaches here was registered but has no handler above.
331 assert(0 == "bad AdminSocket command binding");
// NOTE(review): the "finished" and "aborted" records print cmd=<command>
// without the quotes used by the "dispatch" record (cmd='<command>');
// consider making the three audit messages consistent.
333 (read_only ? audit_clog->debug() : audit_clog->info())
334 << "from='admin socket' "
335 << "entity='admin socket' "
336 << "cmd=" << command << " "
337 << "args=" << args << ": finished";
// Audit record for the abort path.
341 (read_only ? audit_clog->debug() : audit_clog->info())
342 << "from='admin socket' "
343 << "entity='admin socket' "
344 << "cmd=" << command << " "
345 << "args=" << args << ": aborted";
// Signal entry point: only SIGINT and SIGTERM are expected (asserted), and
// the received signal is logged via derr.
// NOTE(review): any further handling is on lines not shown in this listing.
348 void Monitor::handle_signal(int signum)
350 assert(signum == SIGINT || signum == SIGTERM);
351 derr << "*** Got Signal " << sig_str(signum) << " ***" << dendl;
// Baseline feature set: empty compat and ro_compat sets, with the BASE and
// SINGLE_PAXOS incompat features.
355 CompatSet Monitor::get_initial_supported_features()
357 CompatSet::FeatureSet ceph_mon_feature_compat;
358 CompatSet::FeatureSet ceph_mon_feature_ro_compat;
359 CompatSet::FeatureSet ceph_mon_feature_incompat;
360 ceph_mon_feature_incompat.insert(CEPH_MON_FEATURE_INCOMPAT_BASE);
361 ceph_mon_feature_incompat.insert(CEPH_MON_FEATURE_INCOMPAT_SINGLE_PAXOS);
362 return CompatSet(ceph_mon_feature_compat, ceph_mon_feature_ro_compat,
363 ceph_mon_feature_incompat);
// Full feature set supported by this build: the initial set plus the later
// incompat features (erasure codes, OSDMap encoding, EC plugins v2/v3,
// kraken, luminous).
// NOTE(review): the return line is not shown in this listing (presumably
// returns `compat`).
366 CompatSet Monitor::get_supported_features()
368 CompatSet compat = get_initial_supported_features();
369 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES);
370 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC);
371 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2);
372 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3);
373 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_KRAKEN);
374 compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_LUMINOUS);
// Feature set assumed for stores that have no compat set on disk (used by
// read_features_off_disk()): only the BASE incompat feature.
378 CompatSet Monitor::get_legacy_features()
380 CompatSet::FeatureSet ceph_mon_feature_compat;
381 CompatSet::FeatureSet ceph_mon_feature_ro_compat;
382 CompatSet::FeatureSet ceph_mon_feature_incompat;
383 ceph_mon_feature_incompat.insert(CEPH_MON_FEATURE_INCOMPAT_BASE);
384 return CompatSet(ceph_mon_feature_compat, ceph_mon_feature_ro_compat,
385 ceph_mon_feature_incompat);
// Checks that this build can operate on the given store. It reads the
// on-disk compat set and, when required.writeable(ondisk) is false, logs
// the unsupported features.
// NOTE(review): the `ondisk` declaration and the return values are on lines
// not shown in this listing.
388 int Monitor::check_features(MonitorDBStore *store)
390 CompatSet required = get_supported_features();
393 read_features_off_disk(store, &ondisk);
395 if (!required.writeable(ondisk)) {
396 CompatSet diff = required.unsupported(ondisk);
397 generic_derr << "ERROR: on disk data includes unsupported features: " << diff << dendl;
// Loads the compat set stored under (MONITOR_NAME, COMPAT_SET_LOC) into
// *features.
// If nothing is stored, the store is treated as old-style: the legacy
// feature set is assumed and immediately written back in its own
// transaction. Otherwise the stored bufferlist is decoded.
404 void Monitor::read_features_off_disk(MonitorDBStore *store, CompatSet *features)
406 bufferlist featuresbl;
407 store->get(MONITOR_NAME, COMPAT_SET_LOC, featuresbl);
408 if (featuresbl.length() == 0) {
409 generic_dout(0) << "WARNING: mon fs missing feature list.\n"
410 << "Assuming it is old-style and introducing one." << dendl;
411 //we only want the baseline ~v.18 features assumed to be on disk.
412 //If new features are introduced this code needs to disappear or
414 *features = get_legacy_features();
416 features->encode(featuresbl);
417 auto t(std::make_shared<MonitorDBStore::Transaction>());
418 t->put(MONITOR_NAME, COMPAT_SET_LOC, featuresbl);
419 store->apply_transaction(t);
// Existing compat set: decode it.
421 bufferlist::iterator it = featuresbl.begin();
422 features->decode(it);
// Reads this monitor's on-disk compat set into `features`, then calls
// calc_quorum_requirements() and logs the resulting required_features.
426 void Monitor::read_features()
428 read_features_off_disk(store, &features);
429 dout(10) << "features " << features << dendl;
431 calc_quorum_requirements();
432 dout(10) << "required_features " << required_features << dendl;
// Stores `bl` under (MONITOR_NAME, COMPAT_SET_LOC) in transaction t.
// NOTE(review): the lines that build `bl` (presumably by encoding
// `features`) are not shown in this listing.
435 void Monitor::write_features(MonitorDBStore::TransactionRef t)
439 t->put(MONITOR_NAME, COMPAT_SET_LOC, bl);
// Config keys this monitor observes as a config observer. Changes to them
// are handled in handle_conf_change().
// NOTE(review): some entries and the array terminator are not present in
// this listing.
442 const char** Monitor::get_tracked_conf_keys() const
444 static const char* KEYS[] = {
445 "crushtool", // helpful for testing
446 "mon_election_timeout",
448 "mon_lease_renew_interval_factor",
449 "mon_lease_ack_timeout_factor",
450 "mon_accept_timeout_factor",
454 "clog_to_syslog_facility",
455 "clog_to_syslog_level",
457 "clog_to_graylog_host",
458 "clog_to_graylog_port",
461 // periodic health to clog
462 "mon_health_to_clog",
463 "mon_health_to_clog_interval",
464 "mon_health_to_clog_tick_interval",
466 "mon_scrub_interval",
// Config observer callback. Reacts to three groups of changed options:
//  - any clog_* destination option, host or fsid: re-apply log-client
//    settings;
//  - any mon_health_to_clog* option: update the health-to-clog settings;
//  - mon_scrub_interval: pass the new value to scrub_update_interval().
472 void Monitor::handle_conf_change(const struct md_config_t *conf,
473 const std::set<std::string> &changed)
477 dout(10) << __func__ << " " << changed << dendl;
479 if (changed.count("clog_to_monitors") ||
480 changed.count("clog_to_syslog") ||
481 changed.count("clog_to_syslog_level") ||
482 changed.count("clog_to_syslog_facility") ||
483 changed.count("clog_to_graylog") ||
484 changed.count("clog_to_graylog_host") ||
485 changed.count("clog_to_graylog_port") ||
486 changed.count("host") ||
487 changed.count("fsid")) {
488 update_log_clients();
491 if (changed.count("mon_health_to_clog") ||
492 changed.count("mon_health_to_clog_interval") ||
493 changed.count("mon_health_to_clog_tick_interval")) {
494 health_to_clog_update_conf(changed);
497 if (changed.count("mon_scrub_interval")) {
498 scrub_update_interval(conf->mon_scrub_interval);
// Parses the clog_* configuration into per-channel maps and applies the
// result to both the cluster (clog) and audit (audit_clog) channels.
// NOTE(review): the trailing arguments and the handling of a
// parse_log_client_options() failure are on lines not shown in this listing.
502 void Monitor::update_log_clients()
504 map<string,string> log_to_monitors;
505 map<string,string> log_to_syslog;
506 map<string,string> log_channel;
507 map<string,string> log_prio;
508 map<string,string> log_to_graylog;
509 map<string,string> log_to_graylog_host;
510 map<string,string> log_to_graylog_port;
514 if (parse_log_client_options(g_ceph_context, log_to_monitors, log_to_syslog,
515 log_channel, log_prio, log_to_graylog,
516 log_to_graylog_host, log_to_graylog_port,
520 clog->update_config(log_to_monitors, log_to_syslog,
521 log_channel, log_prio, log_to_graylog,
522 log_to_graylog_host, log_to_graylog_port,
525 audit_clog->update_config(log_to_monitors, log_to_syslog,
526 log_channel, log_prio, log_to_graylog,
527 log_to_graylog_host, log_to_graylog_port,
// Validates the lease-related factors and reports each violation to the
// cluster log.
// NOTE(review): the error bookkeeping and return value are on lines not
// shown in this listing.
531 int Monitor::sanitize_options()
535 // mon_lease must be greater than mon_lease_renewal; otherwise we
536 // may end up with leases expiring before they are renewed.
537 if (g_conf->mon_lease_renew_interval_factor >= 1.0) {
538 clog->error() << "mon_lease_renew_interval_factor ("
539 << g_conf->mon_lease_renew_interval_factor
540 << ") must be less than 1.0";
544 // mon_lease_ack_timeout must be greater than mon_lease to make sure we've
545 // got time to renew the lease and get an ack for it. Having both options
546 // with the same value, for a given small value, could mean timing out if
547 // the monitors happened to be overloaded -- or even under normal load for
548 // a small enough value.
549 if (g_conf->mon_lease_ack_timeout_factor <= 1.0) {
550 clog->error() << "mon_lease_ack_timeout_factor ("
551 << g_conf->mon_lease_ack_timeout_factor
552 << ") must be greater than 1.0";
// One-time setup before init(). In order, it:
//  - sanitizes options;
//  - builds the "mon" and "cluster" perf counters;
//  - verifies the store fsid;
//  - determines whether this monitor has ever joined a quorum;
//  - cleans up an interrupted or forced sync;
//  - loads the initial keyring when a keyring is required;
//  - registers admin-socket commands;
//  - registers itself as a config observer.
559 int Monitor::preinit()
563 dout(1) << "preinit fsid " << monmap->fsid << dendl;
565 int r = sanitize_options();
567 derr << "option sanitization failed!" << dendl;
// "mon" perf counters: sessions and elections. Registered immediately.
574 PerfCountersBuilder pcb(g_ceph_context, "mon", l_mon_first, l_mon_last);
575 pcb.add_u64(l_mon_num_sessions, "num_sessions", "Open sessions", "sess",
576 PerfCountersBuilder::PRIO_USEFUL);
577 pcb.add_u64_counter(l_mon_session_add, "session_add", "Created sessions",
578 "sadd", PerfCountersBuilder::PRIO_INTERESTING);
579 pcb.add_u64_counter(l_mon_session_rm, "session_rm", "Removed sessions",
580 "srm", PerfCountersBuilder::PRIO_INTERESTING);
581 pcb.add_u64_counter(l_mon_session_trim, "session_trim", "Trimmed sessions",
582 "strm", PerfCountersBuilder::PRIO_USEFUL);
583 pcb.add_u64_counter(l_mon_num_elections, "num_elections", "Elections participated in",
584 "ecnt", PerfCountersBuilder::PRIO_USEFUL);
585 pcb.add_u64_counter(l_mon_election_call, "election_call", "Elections started",
586 "estt", PerfCountersBuilder::PRIO_INTERESTING);
587 pcb.add_u64_counter(l_mon_election_win, "election_win", "Elections won",
588 "ewon", PerfCountersBuilder::PRIO_INTERESTING);
589 pcb.add_u64_counter(l_mon_election_lose, "election_lose", "Elections lost",
590 "elst", PerfCountersBuilder::PRIO_INTERESTING);
591 logger = pcb.create_perf_counters();
592 cct->get_perfcounters_collection()->add(logger);
// "cluster" perf counters: created here, but only added to the collection
// by register_cluster_logger().
595 assert(!cluster_logger);
597 PerfCountersBuilder pcb(g_ceph_context, "cluster", l_cluster_first, l_cluster_last);
598 pcb.add_u64(l_cluster_num_mon, "num_mon", "Monitors");
599 pcb.add_u64(l_cluster_num_mon_quorum, "num_mon_quorum", "Monitors in quorum");
600 pcb.add_u64(l_cluster_num_osd, "num_osd", "OSDs");
601 pcb.add_u64(l_cluster_num_osd_up, "num_osd_up", "OSDs that are up");
602 pcb.add_u64(l_cluster_num_osd_in, "num_osd_in", "OSD in state \"in\" (they are in cluster)");
603 pcb.add_u64(l_cluster_osd_epoch, "osd_epoch", "Current epoch of OSD map");
604 pcb.add_u64(l_cluster_osd_bytes, "osd_bytes", "Total capacity of cluster");
605 pcb.add_u64(l_cluster_osd_bytes_used, "osd_bytes_used", "Used space");
606 pcb.add_u64(l_cluster_osd_bytes_avail, "osd_bytes_avail", "Available space");
607 pcb.add_u64(l_cluster_num_pool, "num_pool", "Pools");
608 pcb.add_u64(l_cluster_num_pg, "num_pg", "Placement groups");
609 pcb.add_u64(l_cluster_num_pg_active_clean, "num_pg_active_clean", "Placement groups in active+clean state");
610 pcb.add_u64(l_cluster_num_pg_active, "num_pg_active", "Placement groups in active state");
611 pcb.add_u64(l_cluster_num_pg_peering, "num_pg_peering", "Placement groups in peering state");
612 pcb.add_u64(l_cluster_num_object, "num_object", "Objects");
613 pcb.add_u64(l_cluster_num_object_degraded, "num_object_degraded", "Degraded (missing replicas) objects");
614 pcb.add_u64(l_cluster_num_object_misplaced, "num_object_misplaced", "Misplaced (wrong location in the cluster) objects");
615 pcb.add_u64(l_cluster_num_object_unfound, "num_object_unfound", "Unfound objects");
616 pcb.add_u64(l_cluster_num_bytes, "num_bytes", "Size of all objects");
617 pcb.add_u64(l_cluster_num_mds_up, "num_mds_up", "MDSs that are up");
618 pcb.add_u64(l_cluster_num_mds_in, "num_mds_in", "MDS in state \"in\" (they are in cluster)");
619 pcb.add_u64(l_cluster_num_mds_failed, "num_mds_failed", "Failed MDS");
620 pcb.add_u64(l_cluster_mds_epoch, "mds_epoch", "Current epoch of MDS map");
621 cluster_logger = pcb.create_perf_counters();
624 paxos->init_logger();
626 // verify cluster_uuid
628 int r = check_fsid();
640 // have we ever joined a quorum?
641 has_ever_joined = (store->get(MONITOR_NAME, "joined") != 0);
642 dout(10) << "has_ever_joined = " << (int)has_ever_joined << dendl;
644 if (!has_ever_joined) {
645 // impose initial quorum restrictions?
646 list<string> initial_members;
647 get_str_list(g_conf->mon_initial_members, initial_members);
649 if (!initial_members.empty()) {
650 dout(1) << " initial_members " << initial_members << ", filtering seed monmap" << dendl;
652 monmap->set_initial_members(g_ceph_context, initial_members, name, messenger->get_myaddr(),
655 dout(10) << " monmap is " << *monmap << dendl;
656 dout(10) << " extra probe peers " << extra_probe_peers << dendl;
// Previously joined but no longer in the monmap: only continue when
// mon_force_quorum_join is set.
658 } else if (!monmap->contains(name)) {
659 derr << "not in monmap and have been in a quorum before; "
660 << "must have been removed" << dendl;
661 if (g_conf->mon_force_quorum_join) {
662 dout(0) << "we should have died but "
663 << "'mon_force_quorum_join' is set -- allowing boot" << dendl;
665 derr << "commit suicide!" << dendl;
672 // The store may be in an inconsistent state (interrupted sync). Get rid of it
674 bool clear_store = false;
675 if (store->exists("mon_sync", "in_sync")) {
676 dout(1) << __func__ << " clean up potentially inconsistent store state"
681 if (store->get("mon_sync", "force_sync") > 0) {
682 dout(1) << __func__ << " force sync by clearing store state" << dendl;
// Clear every prefix that a sync would repopulate.
687 set<string> sync_prefixes = get_sync_targets_names();
688 store->clear(sync_prefixes);
692 sync_last_committed_floor = store->get("mon_sync", "last_committed_floor");
693 dout(10) << "sync_last_committed_floor " << sync_last_committed_floor << dendl;
696 health_monitor->init();
698 if (is_keyring_required()) {
699 // we need to bootstrap authentication keys so we can form an
701 if (authmon()->get_last_committed() == 0) {
702 dout(10) << "loading initial keyring to bootstrap authentication for mkfs" << dendl;
704 int err = store->get("mkfs", "keyring", bl);
705 if (err == 0 && bl.length() > 0) {
706 // Attempt to decode and extract keyring only if it is found.
708 bufferlist::iterator p = bl.begin();
709 ::decode(keyring, p);
710 extract_save_mon_key(keyring);
// Keyring file inside the monitor data directory; if it cannot be loaded,
// fall back to the mon. key held by key_server and write it out.
714 string keyring_loc = g_conf->mon_data + "/keyring";
716 r = keyring.load(cct, keyring_loc);
719 mon_name.set_type(CEPH_ENTITY_TYPE_MON);
721 if (key_server.get_auth(mon_name, mon_key)) {
722 dout(1) << "copying mon. key from old db to external keyring" << dendl;
723 keyring.add(mon_name, mon_key);
725 keyring.encode_plaintext(bl);
726 write_default_keyring(bl);
// NOTE(review): this message prints g_conf->keyring, but the path that
// failed to load is keyring_loc (mon_data + "/keyring"); consider logging
// keyring_loc instead.
728 derr << "unable to load initial keyring " << g_conf->keyring << dendl;
735 admin_hook = new AdminHook(this);
736 AdminSocket* admin_socket = cct->get_admin_socket();
738 // unlock while registering to avoid mon_lock -> admin socket lock dependency.
740 r = admin_socket->register_command("mon_status", "mon_status", admin_hook,
741 "show current monitor status");
743 r = admin_socket->register_command("quorum_status", "quorum_status",
744 admin_hook, "show current quorum status");
746 r = admin_socket->register_command("sync_force",
747 "sync_force name=validate,"
749 "strings=--yes-i-really-mean-it",
751 "force sync of and clear monitor store");
753 r = admin_socket->register_command("add_bootstrap_peer_hint",
754 "add_bootstrap_peer_hint name=addr,"
757 "add peer address as potential bootstrap"
758 " peer for cluster bringup");
760 r = admin_socket->register_command("quorum enter", "quorum enter",
762 "force monitor back into quorum");
764 r = admin_socket->register_command("quorum exit", "quorum exit",
766 "force monitor out of the quorum");
768 r = admin_socket->register_command("ops",
771 "show the ops currently in flight");
773 r = admin_socket->register_command("sessions",
776 "list existing sessions");
781 // add ourselves as a conf observer
782 g_conf->add_observer(this);
// Body of what is presumably Monitor::init() (the function header is not
// present in this listing). Under the monitor lock it:
//  - adds this monitor as a dispatcher on the main messenger;
//  - adds the mgr client and this monitor on the mgr messenger;
//  - records the loopback connection's features in the session map's
//    feature map.
790 dout(2) << "init" << dendl;
791 Mutex::Locker l(lock);
802 messenger->add_dispatcher_tail(this);
805 mgr_messenger->add_dispatcher_tail(&mgr_client);
806 mgr_messenger->add_dispatcher_tail(this); // for auth ms_* calls
809 // add features of myself into feature_map
810 session_map.feature_map.add_mon(con_self->get_features());
// Calls init() on every PaxosService, then loads state through
// refresh_from_paxos(NULL), i.e. without a need_bootstrap out-flag.
// NOTE(review): some lines before the loop are not shown in this listing.
814 void Monitor::init_paxos()
816 dout(10) << __func__ << dendl;
820 for (int i = 0; i < PAXOS_NUM; ++i) {
821 paxos_service[i]->init();
824 refresh_from_paxos(NULL);
// Reloads monitor state from the store in three steps:
//  1. decode "cluster_fingerprint" if present (decode failures and a
//     missing key are only logged at level 10);
//  2. refresh every PaxosService, passing need_bootstrap through;
//  3. once all services have refreshed, call post_refresh() on each.
827 void Monitor::refresh_from_paxos(bool *need_bootstrap)
829 dout(10) << __func__ << dendl;
832 int r = store->get(MONITOR_NAME, "cluster_fingerprint", bl);
835 bufferlist::iterator p = bl.begin();
836 ::decode(fingerprint, p);
838 catch (buffer::error& e) {
839 dout(10) << __func__ << " failed to decode cluster_fingerprint" << dendl;
842 dout(10) << __func__ << " no cluster_fingerprint" << dendl;
845 for (int i = 0; i < PAXOS_NUM; ++i) {
846 paxos_service[i]->refresh(need_bootstrap);
848 for (int i = 0; i < PAXOS_NUM; ++i) {
849 paxos_service[i]->post_refresh();
// Adds cluster_logger to the perf-counter collection if it is not already
// registered; otherwise only logs that it is already registered.
854 void Monitor::register_cluster_logger()
856 if (!cluster_logger_registered) {
857 dout(10) << "register_cluster_logger" << dendl;
858 cluster_logger_registered = true;
859 cct->get_perfcounters_collection()->add(cluster_logger);
861 dout(10) << "register_cluster_logger - already registered" << dendl;
// Removes cluster_logger from the perf-counter collection if it is
// registered; otherwise only logs that it is not registered.
865 void Monitor::unregister_cluster_logger()
867 if (cluster_logger_registered) {
868 dout(10) << "unregister_cluster_logger" << dendl;
869 cluster_logger_registered = false;
870 cct->get_perfcounters_collection()->remove(cluster_logger);
872 dout(10) << "unregister_cluster_logger - not registered" << dendl;
// Publishes the monmap size and the current quorum size to the cluster
// perf counters.
876 void Monitor::update_logger()
878 cluster_logger->set(l_cluster_num_mon, monmap->size());
879 cluster_logger->set(l_cluster_num_mon_quorum, quorum.size());
// Shuts the monitor down, in this order:
//  1. flush any pending paxos write and set state to STATE_SHUTDOWN;
//  2. detach the config observer and unregister the admin-socket commands;
//  3. stop the mgr client and drain the finisher;
//  4. walk the paxos services (loop body not shown; presumably shuts each
//     down) and shut down the old health monitor;
//  5. cancel quorum waiters with -ECANCELED and remove all sessions;
//  6. release perf counters and shut down the log client;
//  7. shut down both messengers.
882 void Monitor::shutdown()
884 dout(1) << "shutdown" << dendl;
888 wait_for_paxos_write();
890 state = STATE_SHUTDOWN;
892 g_conf->remove_observer(this);
895 AdminSocket* admin_socket = cct->get_admin_socket();
896 admin_socket->unregister_command("mon_status");
897 admin_socket->unregister_command("quorum_status");
898 admin_socket->unregister_command("sync_force");
899 admin_socket->unregister_command("add_bootstrap_peer_hint");
900 admin_socket->unregister_command("quorum enter");
901 admin_socket->unregister_command("quorum exit");
902 admin_socket->unregister_command("ops");
903 admin_socket->unregister_command("sessions");
910 mgr_client.shutdown();
913 finisher.wait_for_empty();
919 for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p)
921 health_monitor->shutdown();
923 finish_contexts(g_ceph_context, waitfor_quorum, -ECANCELED);
924 finish_contexts(g_ceph_context, maybe_wait_for_quorum, -ECANCELED);
930 remove_all_sessions();
933 cct->get_perfcounters_collection()->remove(logger);
// The cluster logger is only in the collection while registered.
937 if (cluster_logger) {
938 if (cluster_logger_registered)
939 cct->get_perfcounters_collection()->remove(cluster_logger);
940 delete cluster_logger;
941 cluster_logger = NULL;
944 log_client.shutdown();
946 // unlock before msgr shutdown...
949 messenger->shutdown(); // last thing! ceph_mon.cc will delete mon.
950 mgr_messenger->shutdown();
// When paxos has a write in flight (current or previous), waits for it to
// be flushed.
// NOTE(review): the waiting itself happens on lines not shown in this
// listing.
953 void Monitor::wait_for_paxos_write()
955 if (paxos->is_writing() || paxos->is_writing_previous()) {
956 dout(10) << __func__ << " flushing pending write" << dendl;
960 dout(10) << __func__ << " flushed pending write" << dendl;
// (Re)starts the join process, in this order:
//  1. flush pending paxos writes, reset the sync requester, unregister the
//     cluster logger and cancel the probe timeout;
//  2. recompute our rank from the monmap; on a rank change, rename the
//     messenger and mark down all connections;
//  3. enter STATE_PROBING and optionally compact the store;
//  4. either win a standalone election (single-member monmap with us at
//     rank 0), or probe the monmap members and every extra probe peer
//     other than our own address.
// NOTE(review): the member filter inside the monmap probe loop is not shown
// in this listing.
964 void Monitor::bootstrap()
966 dout(10) << "bootstrap" << dendl;
967 wait_for_paxos_write();
969 sync_reset_requester();
970 unregister_cluster_logger();
971 cancel_probe_timeout();
974 int newrank = monmap->get_rank(messenger->get_myaddr());
975 if (newrank < 0 && rank >= 0) {
976 // was i ever part of the quorum?
977 if (has_ever_joined) {
978 dout(0) << " removed from monmap, suicide." << dendl;
982 if (newrank != rank) {
983 dout(0) << " my rank is now " << newrank << " (was " << rank << ")" << dendl;
984 messenger->set_myname(entity_name_t::MON(newrank));
987 // reset all connections, or else our peers will think we are someone else.
988 messenger->mark_down_all();
992 state = STATE_PROBING;
997 if (g_conf->mon_compact_on_bootstrap) {
998 dout(10) << "bootstrap -- triggering compaction" << dendl;
1000 dout(10) << "bootstrap -- finished compaction" << dendl;
1003 // singleton monitor?
1004 if (monmap->size() == 1 && rank == 0) {
1005 win_standalone_election();
1009 reset_probe_timeout();
1011 // i'm outside the quorum
1012 if (monmap->contains(name))
1013 outside_quorum.insert(name);
1016 dout(10) << "probing other monitors" << dendl;
1017 for (unsigned i = 0; i < monmap->size(); i++) {
1019 messenger->send_message(new MMonProbe(monmap->fsid, MMonProbe::OP_PROBE, name, has_ever_joined),
1020 monmap->get_inst(i));
// Extra peers get probed with an unranked MON(-1) name.
1022 for (set<entity_addr_t>::iterator p = extra_probe_peers.begin();
1023 p != extra_probe_peers.end();
1025 if (*p != messenger->get_myaddr()) {
1027 i.name = entity_name_t::MON(-1);
1029 messenger->send_message(new MMonProbe(monmap->fsid, MMonProbe::OP_PROBE, name, has_ever_joined), i);
// Admin-command handler: parses the "addr" argument and adds it to
// extra_probe_peers, using CEPH_MON_PORT when no port is given. Messages
// for the caller are written to ss.
// The hint is rejected when "addr" is missing or unparsable, and ignored
// while this monitor is leader or peon (the return statements are not shown
// in this listing).
1034 bool Monitor::_add_bootstrap_peer_hint(string cmd, cmdmap_t& cmdmap, ostream& ss)
1037 if (!cmd_getval(g_ceph_context, cmdmap, "addr", addrstr)) {
// NOTE(review): if cmdmap_t is a std::map, operator[] below inserts an
// empty "addr" entry into the caller's map when it is absent; a find()
// lookup would avoid mutating cmdmap.
1038 ss << "unable to parse address string value '"
1039 << cmd_vartype_stringify(cmdmap["addr"]) << "'";
1042 dout(10) << "_add_bootstrap_peer_hint '" << cmd << "' '"
1043 << addrstr << "'" << dendl;
1046 const char *end = 0;
1047 if (!addr.parse(addrstr.c_str(), &end)) {
1048 ss << "failed to parse addr '" << addrstr << "'; syntax is 'add_bootstrap_peer_hint ip[:port]'";
1052 if (is_leader() || is_peon()) {
1053 ss << "mon already active; ignoring bootstrap hint";
1057 if (addr.get_port() == 0)
1058 addr.set_port(CEPH_MON_PORT);
1060 extra_probe_peers.insert(addr);
1061 ss << "adding peer " << addr << " to list: " << extra_probe_peers;
1065 // called by bootstrap(), or on leader|peon -> electing
// Drops quorum-related state:
//  - cancels the probe timeout, cleans up health events and health-check
//    log times, and cancels the scrub event;
//  - clears leader_since and stamps exited_quorum if a quorum existed;
//  - clears outside_quorum and quorum_feature_map;
//  - walks the paxos services (loop body not shown in this listing) and
//    finishes the old health monitor.
1066 void Monitor::_reset()
1068 dout(10) << __func__ << dendl;
1070 cancel_probe_timeout();
1072 health_events_cleanup();
1073 health_check_log_times.clear();
1074 scrub_event_cancel();
1076 leader_since = utime_t();
1077 if (!quorum.empty()) {
1078 exited_quorum = ceph_clock_now();
1081 outside_quorum.clear();
1082 quorum_feature_map.clear();
1088 for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p)
1090 health_monitor->finish();
1094 // -----------------------------------------------------------
// Collects the store prefixes that are cleared before a sync:
//  - the paxos prefix;
//  - every PaxosService's prefixes;
//  - the ConfigKeyService's prefixes.
// config_key_service must actually be a ConfigKeyService (asserted).
1097 set<string> Monitor::get_sync_targets_names()
1099 set<string> targets;
1100 targets.insert(paxos->get_name());
1101 for (int i = 0; i < PAXOS_NUM; ++i)
1102 paxos_service[i]->get_store_prefixes(targets);
1103 ConfigKeyService *config_key_service_ptr = dynamic_cast<ConfigKeyService*>(config_key_service);
1104 assert(config_key_service_ptr);
1105 config_key_service_ptr->get_store_prefixes(targets);
// Fires when a sync times out. Only valid while synchronizing (asserted).
// NOTE(review): the recovery action is on lines not shown in this listing.
1110 void Monitor::sync_timeout()
1112 dout(10) << __func__ << dendl;
1113 assert(state == STATE_SYNCHRONIZING);
// Encodes into bl (with CEPH_FEATURES_ALL) the highest-epoch monmap among:
//  - the MonmapMonitor's latest map;
//  - the backup stored under ("mon_sync", "latest_monmap"), if present;
//  - the in-memory monmap.
// Store read errors other than -ENOENT are fatal (asserted).
1117 void Monitor::sync_obtain_latest_monmap(bufferlist &bl)
1119 dout(1) << __func__ << dendl;
1121 MonMap latest_monmap;
1123 // Grab latest monmap from MonmapMonitor
1124 bufferlist monmon_bl;
1125 int err = monmon()->get_monmap(monmon_bl);
1127 if (err != -ENOENT) {
1129 << " something wrong happened while reading the store: "
1130 << cpp_strerror(err) << dendl;
1131 assert(0 == "error reading the store");
1134 latest_monmap.decode(monmon_bl);
1137 // Grab last backed up monmap (if any) and compare epochs
1138 if (store->exists("mon_sync", "latest_monmap")) {
1139 bufferlist backup_bl;
1140 int err = store->get("mon_sync", "latest_monmap", backup_bl);
1143 << " something wrong happened while reading the store: "
1144 << cpp_strerror(err) << dendl;
1145 assert(0 == "error reading the store");
1147 assert(backup_bl.length() > 0);
1149 MonMap backup_monmap;
1150 backup_monmap.decode(backup_bl);
1152 if (backup_monmap.epoch > latest_monmap.epoch)
1153 latest_monmap = backup_monmap;
1156 // Check if our current monmap's epoch is greater than the one we've
1158 if (monmap->epoch > latest_monmap.epoch)
1159 latest_monmap = *monmap;
1161 dout(1) << __func__ << " obtained monmap e" << latest_monmap.epoch << dendl;
1163 latest_monmap.encode(bl, CEPH_FEATURES_ALL);
// Clears requester-side sync state: cancels any pending sync timeout,
// forgets the sync provider and zeroes sync_start_version.
// NOTE(review): other resets may be on lines not shown in this listing.
1166 void Monitor::sync_reset_requester()
1168 dout(10) << __func__ << dendl;
1170 if (sync_timeout_event) {
1171 timer.cancel_event(sync_timeout_event);
1172 sync_timeout_event = NULL;
1175 sync_provider = entity_inst_t();
1178 sync_start_version = 0;
// Clears provider-side sync state by dropping all tracked sync_providers.
1181 void Monitor::sync_reset_provider()
1183 dout(10) << __func__ << dendl;
1184 sync_providers.clear();
// Begins synchronizing from `other`, either full or recent. Must be called
// while probing or already synchronizing. Steps:
//  1. enter STATE_SYNCHRONIZING and stop acting as a sync provider;
//  2. durably record the sync: stash the latest monmap, set
//     mon_sync/in_sync, and persist sync_last_committed_floor (raised to at
//     least the current paxos version);
//  3. clear the sync target prefixes;
//  4. arm the sync timeout and ask `other` for a sync cookie.
// The mon_sync_requester_kill_at asserts appear to be fault-injection
// points for testing.
1187 void Monitor::sync_start(entity_inst_t &other, bool full)
1189 dout(10) << __func__ << " " << other << (full ? " full" : " recent") << dendl;
1191 assert(state == STATE_PROBING ||
1192 state == STATE_SYNCHRONIZING);
1193 state = STATE_SYNCHRONIZING;
1195 // make sure are not a provider for anyone!
1196 sync_reset_provider();
1201 // stash key state, and mark that we are syncing
1202 auto t(std::make_shared<MonitorDBStore::Transaction>());
1203 sync_stash_critical_state(t);
1204 t->put("mon_sync", "in_sync", 1);
1206 sync_last_committed_floor = MAX(sync_last_committed_floor, paxos->get_version());
1207 dout(10) << __func__ << " marking sync in progress, storing sync_last_committed_floor "
1208 << sync_last_committed_floor << dendl;
1209 t->put("mon_sync", "last_committed_floor", sync_last_committed_floor);
1211 store->apply_transaction(t);
1213 assert(g_conf->mon_sync_requester_kill_at != 1);
1215 // clear the underlying store
1216 set<string> targets = get_sync_targets_names();
1217 dout(10) << __func__ << " clearing prefixes " << targets << dendl;
1218 store->clear(targets);
1220 // make sure paxos knows it has been reset. this prevents a
1221 // bootstrap and then different probe reply order from possibly
1222 // deciding a partial or no sync is needed.
1225 assert(g_conf->mon_sync_requester_kill_at != 2);
1228 // assume 'other' as the leader. We will update the leader once we receive
1229 // a reply to the sync start.
1230 sync_provider = other;
1232 sync_reset_timeout();
1234 MMonSync *m = new MMonSync(sync_full ? MMonSync::OP_GET_COOKIE_FULL : MMonSync::OP_GET_COOKIE_RECENT);
1236 m->last_committed = paxos->get_version();
1237 messenger->send_message(m, sync_provider);
// Stash the most recent monmap we know of (as selected by
// sync_obtain_latest_monmap) into transaction `t` under the
// "mon_sync"/"latest_monmap" key. Asserts the encoded map is non-empty.
// The transaction is only populated here; callers apply it themselves.
1240 void Monitor::sync_stash_critical_state(MonitorDBStore::TransactionRef t)
1242 dout(10) << __func__ << dendl;
1243 bufferlist backup_monmap;
1244 sync_obtain_latest_monmap(backup_monmap);
1245 assert(backup_monmap.length() > 0);
1246 t->put("mon_sync", "latest_monmap", backup_monmap);
// (Re)arm the requester-side sync timeout: cancel any pending
// sync_timeout_event, then schedule a new C_MonContext on the monitor timer
// to fire after mon_sync_timeout seconds.
1249 void Monitor::sync_reset_timeout()
1251 dout(10) << __func__ << dendl;
1252 if (sync_timeout_event)
1253 timer.cancel_event(sync_timeout_event);
1254 sync_timeout_event = timer.add_event_after(
1255 g_conf->mon_sync_timeout,
1256 new C_MonContext(this, [this](int) {
// Finish a sync whose final chunk brought us to paxos version
// `last_committed`.
//
// Replays the paxos transactions starting at sync_start_version into the
// store and records `last_committed` as paxos' "last_committed". It then
// erases the "mon_sync" in_sync / force_sync / last_committed_floor markers.
// The mon_sync_requester_kill_at asserts (7..10) are debug kill points that
// abort when the option equals the step number.
1261 void Monitor::sync_finish(version_t last_committed)
1263 dout(10) << __func__ << " lc " << last_committed << " from " << sync_provider << dendl;
1265 assert(g_conf->mon_sync_requester_kill_at != 7);
1268 // finalize the paxos commits
1269 auto tx(std::make_shared<MonitorDBStore::Transaction>());
1270 paxos->read_and_prepare_transactions(tx, sync_start_version,
1272 tx->put(paxos->get_name(), "last_committed", last_committed);
1274 dout(30) << __func__ << " final tx dump:\n";
1275 JSONFormatter f(true);
1280 store->apply_transaction(tx);
1283 assert(g_conf->mon_sync_requester_kill_at != 8);
// Sync is complete: remove the in-progress markers written by sync_start()
// (and the force flag written by sync_force()).
1285 auto t(std::make_shared<MonitorDBStore::Transaction>());
1286 t->erase("mon_sync", "in_sync");
1287 t->erase("mon_sync", "force_sync");
1288 t->erase("mon_sync", "last_committed_floor");
1289 store->apply_transaction(t);
1291 assert(g_conf->mon_sync_requester_kill_at != 9);
1295 assert(g_conf->mon_sync_requester_kill_at != 10);
// Dispatch an MMonSync message by op.
// Provider-side ops (GET_COOKIE_FULL/RECENT, GET_CHUNK) are served to a
// requesting peer. Requester-side ops (COOKIE, CHUNK/LAST_CHUNK, NO_COOKIE)
// drive our own sync. Any other op is logged and asserts.
1300 void Monitor::handle_sync(MonOpRequestRef op)
1302 MMonSync *m = static_cast<MMonSync*>(op->get_req());
1303 dout(10) << __func__ << " " << *m << dendl;
1306 // provider ---------
1308 case MMonSync::OP_GET_COOKIE_FULL:
1309 case MMonSync::OP_GET_COOKIE_RECENT:
1310 handle_sync_get_cookie(op);
1312 case MMonSync::OP_GET_CHUNK:
1313 handle_sync_get_chunk(op);
1316 // client -----------
1318 case MMonSync::OP_COOKIE:
1319 handle_sync_cookie(op);
1322 case MMonSync::OP_CHUNK:
1323 case MMonSync::OP_LAST_CHUNK:
1324 handle_sync_chunk(op);
1326 case MMonSync::OP_NO_COOKIE:
1327 handle_sync_no_cookie(op);
1331 dout(0) << __func__ << " unknown op " << m->op << dendl;
1332 assert(0 == "unknown op");
// Answer an MMonSync request with OP_NO_COOKIE on the request's own
// connection, echoing the cookie the requester sent.
1338 void Monitor::_sync_reply_no_cookie(MonOpRequestRef op)
1340 MMonSync *m = static_cast<MMonSync*>(op->get_req());
1341 MMonSync *reply = new MMonSync(MMonSync::OP_NO_COOKIE, m->cookie);
1342 m->get_connection()->send_message(reply);
// Provider side: a peer asks us for a sync cookie.
//  - While we are synchronizing ourselves, the peer gets OP_NO_COOKIE.
//  - A peer whose connection lacks any of our required_features is ignored.
//  - Otherwise a new SyncProvider session is registered under a fresh cookie.
//    OP_GET_COOKIE_FULL syncs every sync-target prefix starting from our
//    current paxos version. OP_GET_COOKIE_RECENT only catches the peer up on
//    paxos from the version it reported. The reply is OP_COOKIE carrying the
//    version the session starts from.
1345 void Monitor::handle_sync_get_cookie(MonOpRequestRef op)
1347 MMonSync *m = static_cast<MMonSync*>(op->get_req());
1348 if (is_synchronizing()) {
1349 _sync_reply_no_cookie(op);
1353 assert(g_conf->mon_sync_provider_kill_at != 1);
// (required ^ peer) & required is the set of required bits the peer lacks.
1355 // make sure they can understand us.
1356 if ((required_features ^ m->get_connection()->get_features()) &
1357 required_features) {
1358 dout(5) << " ignoring peer mon." << m->get_source().num()
1359 << " has features " << std::hex
1360 << m->get_connection()->get_features()
1361 << " but we require " << required_features << std::dec << dendl;
1365 // make up a unique cookie. include election epoch (which persists
1366 // across restarts for the whole cluster) and a counter for this
1367 // process instance. there is no need to be unique *across*
1368 // monitors, though.
1369 uint64_t cookie = ((unsigned long long)elector.get_epoch() << 24) + ++sync_provider_count;
1370 assert(sync_providers.count(cookie) == 0);
1372 dout(10) << __func__ << " cookie " << cookie << " for " << m->get_source_inst() << dendl;
1374 SyncProvider& sp = sync_providers[cookie];
1376 sp.entity = m->get_source_inst();
// Provider sessions expire after 2 * mon_sync_timeout; sync_trim_providers()
// drops sessions whose timeout has passed.
1377 sp.reset_timeout(g_ceph_context, g_conf->mon_sync_timeout * 2);
1379 set<string> sync_targets;
1380 if (m->op == MMonSync::OP_GET_COOKIE_FULL) {
1382 sync_targets = get_sync_targets_names();
1383 sp.last_committed = paxos->get_version();
1384 sp.synchronizer = store->get_synchronizer(sp.last_key, sync_targets);
1386 dout(10) << __func__ << " will sync prefixes " << sync_targets << dendl;
1388 // just catch up paxos
1389 sp.last_committed = m->last_committed;
1391 dout(10) << __func__ << " will sync from version " << sp.last_committed << dendl;
1393 MMonSync *reply = new MMonSync(MMonSync::OP_COOKIE, sp.cookie);
1394 reply->last_committed = sp.last_committed;
1395 m->get_connection()->send_message(reply);
// Provider side: build and send the next chunk for an existing sync session.
//
// An unknown cookie gets OP_NO_COOKIE. So does a session whose last_committed
// has fallen below our first committed paxos version; that session is also
// erased. Otherwise paxos versions after sp.last_committed are packed into a
// transaction while the mon_sync_max_payload_size budget (in bufferlist bytes)
// is positive. For full syncs, remaining budget is filled with store keys
// from the session's synchronizer. When neither paxos versions nor
// synchronizer chunks remain, the reply becomes OP_LAST_CHUNK and the session
// is dropped.
1398 void Monitor::handle_sync_get_chunk(MonOpRequestRef op)
1400 MMonSync *m = static_cast<MMonSync*>(op->get_req());
1401 dout(10) << __func__ << " " << *m << dendl;
1403 if (sync_providers.count(m->cookie) == 0) {
1404 dout(10) << __func__ << " no cookie " << m->cookie << dendl;
1405 _sync_reply_no_cookie(op);
1409 assert(g_conf->mon_sync_provider_kill_at != 2);
1411 SyncProvider& sp = sync_providers[m->cookie];
1412 sp.reset_timeout(g_ceph_context, g_conf->mon_sync_timeout * 2);
1414 if (sp.last_committed < paxos->get_first_committed() &&
1415 paxos->get_first_committed() > 1) {
1416 dout(10) << __func__ << " sync requester fell behind paxos, their lc " << sp.last_committed
1417 << " < our fc " << paxos->get_first_committed() << dendl;
1418 sync_providers.erase(m->cookie);
1419 _sync_reply_no_cookie(op);
1423 MMonSync *reply = new MMonSync(MMonSync::OP_CHUNK, sp.cookie);
1424 auto tx(std::make_shared<MonitorDBStore::Transaction>());
// The budget is checked before each version is added, so the last paxos
// version included may push the payload past mon_sync_max_payload_size.
1426 int left = g_conf->mon_sync_max_payload_size;
1427 while (sp.last_committed < paxos->get_version() && left > 0) {
1429 sp.last_committed++;
1431 int err = store->get(paxos->get_name(), sp.last_committed, bl);
1434 tx->put(paxos->get_name(), sp.last_committed, bl);
1435 left -= bl.length();
1436 dout(20) << __func__ << " including paxos state " << sp.last_committed
1439 reply->last_committed = sp.last_committed;
1441 if (sp.full && left > 0) {
1442 sp.synchronizer->get_chunk_tx(tx, left);
1443 sp.last_key = sp.synchronizer->get_last_key();
1444 reply->last_key = sp.last_key;
1447 if ((sp.full && sp.synchronizer->has_next_chunk()) ||
1448 sp.last_committed < paxos->get_version()) {
1449 dout(10) << __func__ << " chunk, through version " << sp.last_committed
1450 << " key " << sp.last_key << dendl;
1452 dout(10) << __func__ << " last chunk, through version " << sp.last_committed
1453 << " key " << sp.last_key << dendl;
1454 reply->op = MMonSync::OP_LAST_CHUNK;
1456 assert(g_conf->mon_sync_provider_kill_at != 3);
// NOTE: `sp` refers into sync_providers and dangles after this erase; the
// code below must not touch it.
1458 // clean up our local state
1459 sync_providers.erase(sp.cookie);
1462 ::encode(*tx, reply->chunk_bl);
1464 m->get_connection()->send_message(reply);
// Requester side: the provider answered with a cookie.
// The cookie is ignored if we already hold one, and discarded if it did not
// come from sync_provider. Otherwise we record the cookie and the version the
// provider will sync from, re-arm the sync timeout and request the first chunk.
1469 void Monitor::handle_sync_cookie(MonOpRequestRef op)
1471 MMonSync *m = static_cast<MMonSync*>(op->get_req());
1472 dout(10) << __func__ << " " << *m << dendl;
1474 dout(10) << __func__ << " already have a cookie, ignoring" << dendl;
1477 if (m->get_source_inst() != sync_provider) {
1478 dout(10) << __func__ << " source does not match, discarding" << dendl;
1481 sync_cookie = m->cookie;
1482 sync_start_version = m->last_committed;
1484 sync_reset_timeout();
1485 sync_get_next_chunk();
1487 assert(g_conf->mon_sync_requester_kill_at != 3);
// Requester side: ask sync_provider for the next chunk of session sync_cookie.
// If mon_inject_sync_get_chunk_delay (seconds) is positive, first sleep that
// long.
// NOTE(review): usleep() blocks the calling thread for the whole delay;
// presumably this option is only meant for testing.
1490 void Monitor::sync_get_next_chunk()
1492 dout(20) << __func__ << " cookie " << sync_cookie << " provider " << sync_provider << dendl;
1493 if (g_conf->mon_inject_sync_get_chunk_delay > 0) {
1494 dout(20) << __func__ << " injecting delay of " << g_conf->mon_inject_sync_get_chunk_delay << dendl;
1495 usleep((long long)(g_conf->mon_inject_sync_get_chunk_delay * 1000000.0));
1497 MMonSync *r = new MMonSync(MMonSync::OP_GET_CHUNK, sync_cookie);
1498 messenger->send_message(r, sync_provider);
1500 assert(g_conf->mon_sync_requester_kill_at != 4);
// Requester side: apply a chunk received from the provider.
//
// Chunks whose cookie or source do not match our current sync are discarded.
// The chunk's encoded transaction is applied to the store. Paxos transactions
// newer than our current version are also applied as we go, together with
// the provider's last_committed, and paxos is re-initialized to pick them up.
// OP_CHUNK re-arms the timeout and requests the next chunk; OP_LAST_CHUNK
// finishes the sync.
1503 void Monitor::handle_sync_chunk(MonOpRequestRef op)
1505 MMonSync *m = static_cast<MMonSync*>(op->get_req());
1506 dout(10) << __func__ << " " << *m << dendl;
1508 if (m->cookie != sync_cookie) {
1509 dout(10) << __func__ << " cookie does not match, discarding" << dendl;
1512 if (m->get_source_inst() != sync_provider) {
1513 dout(10) << __func__ << " source does not match, discarding" << dendl;
1517 assert(state == STATE_SYNCHRONIZING);
1518 assert(g_conf->mon_sync_requester_kill_at != 5);
1520 auto tx(std::make_shared<MonitorDBStore::Transaction>());
1521 tx->append_from_encoded(m->chunk_bl);
1523 dout(30) << __func__ << " tx dump:\n";
1524 JSONFormatter f(true);
1529 store->apply_transaction(tx);
1531 assert(g_conf->mon_sync_requester_kill_at != 6);
1534 dout(10) << __func__ << " applying recent paxos transactions as we go" << dendl;
1535 auto tx(std::make_shared<MonitorDBStore::Transaction>());
1536 paxos->read_and_prepare_transactions(tx, paxos->get_version() + 1,
1538 tx->put(paxos->get_name(), "last_committed", m->last_committed);
1540 dout(30) << __func__ << " tx dump:\n";
1541 JSONFormatter f(true);
1546 store->apply_transaction(tx);
1547 paxos->init(); // to refresh what we just wrote
1550 if (m->op == MMonSync::OP_CHUNK) {
1551 sync_reset_timeout();
1552 sync_get_next_chunk();
1553 } else if (m->op == MMonSync::OP_LAST_CHUNK) {
1554 sync_finish(m->last_committed);
// Requester side: handle OP_NO_COOKIE from the provider, i.e. the provider
// has no (or no longer has a) sync session for our request.
1558 void Monitor::handle_sync_no_cookie(MonOpRequestRef op)
1560 dout(10) << __func__ << dendl;
// Provider side: drop every sync session whose timeout is earlier than now.
1564 void Monitor::sync_trim_providers()
1566 dout(20) << __func__ << dendl;
1568 utime_t now = ceph_clock_now();
1569 map<uint64_t,SyncProvider>::iterator p = sync_providers.begin();
1570 while (p != sync_providers.end()) {
1571 if (now > p->second.timeout) {
1572 dout(10) << __func__ << " expiring cookie " << p->second.cookie << " for " << p->second.entity << dendl;
// erase(p++) advances the iterator before the node is removed, keeping p valid.
1573 sync_providers.erase(p++);
1580 // ---------------------------------------------------
// Cancel the pending probe timeout event, if one is scheduled, and clear
// probe_timeout_event. Otherwise it only logs that none was scheduled.
1583 void Monitor::cancel_probe_timeout()
1585 if (probe_timeout_event) {
1586 dout(10) << "cancel_probe_timeout " << probe_timeout_event << dendl;
1587 timer.cancel_event(probe_timeout_event);
1588 probe_timeout_event = NULL;
1590 dout(10) << "cancel_probe_timeout (none scheduled)" << dendl;
// Replace any pending probe timeout with a new C_MonContext scheduled
// mon_probe_timeout seconds from now. When the timer's add_event_after()
// reports failure, probe_timeout_event is cleared to nullptr instead.
1594 void Monitor::reset_probe_timeout()
1596 cancel_probe_timeout();
1597 probe_timeout_event = new C_MonContext(this, [this](int r) {
1600 double t = g_conf->mon_probe_timeout;
1601 if (timer.add_event_after(t, probe_timeout_event)) {
1602 dout(10) << "reset_probe_timeout " << probe_timeout_event
1603 << " after " << t << " seconds" << dendl;
1605 probe_timeout_event = nullptr;
// Probe timeout callback. Valid only while probing or synchronizing and while
// an event is recorded (both asserted); clears probe_timeout_event.
1609 void Monitor::probe_timeout(int r)
1611 dout(4) << "probe_timeout " << probe_timeout_event << dendl;
1612 assert(is_probing() || is_synchronizing());
1613 assert(probe_timeout_event);
1614 probe_timeout_event = NULL;
// Entry point for MMonProbe messages.
// Probes carrying a different cluster fsid are ignored. Otherwise OP_PROBE
// and OP_REPLY go to their handlers. OP_MISSING_FEATURES logs the peer's
// required_features and which of them CEPH_FEATURES_ALL does not include.
1618 void Monitor::handle_probe(MonOpRequestRef op)
1620 MMonProbe *m = static_cast<MMonProbe*>(op->get_req());
1621 dout(10) << "handle_probe " << *m << dendl;
1623 if (m->fsid != monmap->fsid) {
1624 dout(0) << "handle_probe ignoring fsid " << m->fsid << " != " << monmap->fsid << dendl;
1629 case MMonProbe::OP_PROBE:
1630 handle_probe_probe(op);
1633 case MMonProbe::OP_REPLY:
1634 handle_probe_reply(op);
1637 case MMonProbe::OP_MISSING_FEATURES:
1638 derr << __func__ << " missing features, have " << CEPH_FEATURES_ALL
1639 << ", required " << m->required_features
1640 << ", missing " << (m->required_features & ~CEPH_FEATURES_ALL)
// Handle an OP_PROBE sent by another monitor.
//
// If the prober lacks any of our required_features, we log it. When its
// connection advertises CEPH_FEATURE_OSD_PRIMARY_AFFINITY, we also answer with
// OP_MISSING_FEATURES carrying our required_features.
// If we are neither probing nor synchronizing and the prober's first
// committed paxos version is beyond our version + 1, we re-bootstrap.
// Otherwise we reply OP_REPLY with our monmap (encoded for the peer's
// connection features) and our first/last committed paxos versions. A prober
// whose address is not in our monmap is remembered in extra_probe_peers.
1646 void Monitor::handle_probe_probe(MonOpRequestRef op)
1648 MMonProbe *m = static_cast<MMonProbe*>(op->get_req());
1650 dout(10) << "handle_probe_probe " << m->get_source_inst() << *m
1651 << " features " << m->get_connection()->get_features() << dendl;
1652 uint64_t missing = required_features & ~m->get_connection()->get_features();
1654 dout(1) << " peer " << m->get_source_addr() << " missing features "
1655 << missing << dendl;
1656 if (m->get_connection()->has_feature(CEPH_FEATURE_OSD_PRIMARY_AFFINITY)) {
1657 MMonProbe *r = new MMonProbe(monmap->fsid, MMonProbe::OP_MISSING_FEATURES,
1658 name, has_ever_joined);
// Fill in the reply, not the incoming probe. The peer's OP_MISSING_FEATURES
// handler reads required_features from the message it receives; setting it
// on `m` had no effect on what was sent.
1659 r->required_features = required_features;
1660 m->get_connection()->send_message(r);
1665 if (!is_probing() && !is_synchronizing()) {
1666 // If the probing mon is way ahead of us, we need to re-bootstrap.
1667 // Normally we capture this case when we initially bootstrap, but
1668 // it is possible we pass those checks (we overlap with
1669 // quorum-to-be) but fail to join a quorum before it moves past
1670 // us. We need to be kicked back to bootstrap so we can
1671 // synchronize, not keep calling elections.
1672 if (paxos->get_version() + 1 < m->paxos_first_version) {
1673 dout(1) << " peer " << m->get_source_addr() << " has first_committed "
1674 << "ahead of us, re-bootstrapping" << dendl;
1682 r = new MMonProbe(monmap->fsid, MMonProbe::OP_REPLY, name, has_ever_joined);
1685 monmap->encode(r->monmap_bl, m->get_connection()->get_features());
1686 r->paxos_first_version = paxos->get_first_committed();
1687 r->paxos_last_version = paxos->get_version();
1688 m->get_connection()->send_message(r);
1690 // did we discover a peer here?
1691 if (!monmap->contains(m->get_source_addr())) {
1692 dout(1) << " adding peer " << m->get_source_addr()
1693 << " to list of hints" << dendl;
1694 extra_probe_peers.insert(m->get_source_addr());
// Handle an OP_REPLY to one of our probes.
//
// Discovery (only while probing or electing):
//  - adopt the peer's monmap if it differs from ours and the peer has joined
//    a quorum, and either its epoch is newer or we have never joined;
//  - in an epoch-0 monmap, rename a "noname-*" entry to the peer's real name
//    and fill in a blank address for a named initial peer.
// Probing only:
//  - start a sync when the peer is too far ahead. If its first committed
//    version is beyond ours we do a full sync. If its last version exceeds
//    ours by more than paxos_max_join_drift we do a recent sync. Peers whose
//    last version is below sync_last_committed_floor are ignored;
//  - if the peer reports an existing quorum, join it (or send MMonJoin when
//    we are not in the monmap or our address is blank);
//  - otherwise track the peer in outside_quorum. An election may be called
//    once a strict majority of the monmap (including us) is outside quorum.
1701 void Monitor::handle_probe_reply(MonOpRequestRef op)
1703 MMonProbe *m = static_cast<MMonProbe*>(op->get_req());
1704 dout(10) << "handle_probe_reply " << m->get_source_inst() << *m << dendl;
1705 dout(10) << " monmap is " << *monmap << dendl;
1707 // discover name and addrs during probing or electing states.
1708 if (!is_probing() && !is_electing()) {
1712 // newer map, or they've joined a quorum and we haven't?
1714 monmap->encode(mybl, m->get_connection()->get_features());
1715 // make sure it's actually different; the checks below err toward
1716 // taking the other guy's map, which could cause us to loop.
1717 if (!mybl.contents_equal(m->monmap_bl)) {
1718 MonMap *newmap = new MonMap;
1719 newmap->decode(m->monmap_bl);
1720 if (m->has_ever_joined && (newmap->get_epoch() > monmap->get_epoch() ||
1721 !has_ever_joined)) {
1722 dout(10) << " got newer/committed monmap epoch " << newmap->get_epoch()
1723 << ", mine was " << monmap->get_epoch() << dendl;
1725 monmap->decode(m->monmap_bl);
1734 string peer_name = monmap->get_name(m->get_source_addr());
1735 if (monmap->get_epoch() == 0 && peer_name.compare(0, 7, "noname-") == 0) {
1736 dout(10) << " renaming peer " << m->get_source_addr() << " "
1737 << peer_name << " -> " << m->name << " in my monmap"
1739 monmap->rename(peer_name, m->name);
1741 if (is_electing()) {
1746 dout(10) << " peer name is " << peer_name << dendl;
1749 // new initial peer?
1750 if (monmap->get_epoch() == 0 &&
1751 monmap->contains(m->name) &&
1752 monmap->get_addr(m->name).is_blank_ip()) {
1753 dout(1) << " learned initial mon " << m->name << " addr " << m->get_source_addr() << dendl;
1754 monmap->set_addr(m->name, m->get_source_addr());
1760 // end discover phase
1761 if (!is_probing()) {
1765 assert(paxos != NULL);
1767 if (is_synchronizing()) {
1768 dout(10) << " currently syncing" << dendl;
1772 entity_inst_t other = m->get_source_inst();
1774 if (m->paxos_last_version < sync_last_committed_floor) {
1775 dout(10) << " peer paxos versions [" << m->paxos_first_version
1776 << "," << m->paxos_last_version << "] < my sync_last_committed_floor "
1777 << sync_last_committed_floor << ", ignoring"
1780 if (paxos->get_version() < m->paxos_first_version &&
1781 m->paxos_first_version > 1) { // no need to sync if we're 0 and they start at 1.
1782 dout(10) << " peer paxos first versions [" << m->paxos_first_version
1783 << "," << m->paxos_last_version << "]"
1784 << " vs my version " << paxos->get_version()
1785 << " (too far ahead)"
1787 cancel_probe_timeout();
1788 sync_start(other, true);
1791 if (paxos->get_version() + g_conf->paxos_max_join_drift < m->paxos_last_version) {
1792 dout(10) << " peer paxos last version " << m->paxos_last_version
1793 << " vs my version " << paxos->get_version()
1794 << " (too far ahead)"
1796 cancel_probe_timeout();
1797 sync_start(other, false);
1802 // is there an existing quorum?
1803 if (m->quorum.size()) {
1804 dout(10) << " existing quorum " << m->quorum << dendl;
1806 dout(10) << " peer paxos version " << m->paxos_last_version
1807 << " vs my version " << paxos->get_version()
1811 if (monmap->contains(name) &&
1812 !monmap->get_addr(name).is_blank_ip()) {
1813 // i'm part of the cluster; just initiate a new election
1816 dout(10) << " ready to join, but i'm not in the monmap or my addr is blank, trying to join" << dendl;
1817 messenger->send_message(new MMonJoin(monmap->fsid, name, messenger->get_myaddr()),
1818 monmap->get_inst(*m->quorum.begin()));
1821 if (monmap->contains(m->name)) {
1822 dout(10) << " mon." << m->name << " is outside the quorum" << dendl;
1823 outside_quorum.insert(m->name);
1825 dout(10) << " mostly ignoring mon." << m->name << ", not part of monmap" << dendl;
// strict majority of the monmap
1829 unsigned need = monmap->size() / 2 + 1;
1830 dout(10) << " outside_quorum now " << outside_quorum << ", need " << need << dendl;
1831 if (outside_quorum.size() >= need) {
1832 if (outside_quorum.count(name)) {
1833 dout(10) << " that's enough to form a new quorum, calling election" << dendl;
1836 dout(10) << " that's enough to form a new quorum, but it does not include me; waiting" << dendl;
1839 dout(10) << " that's not yet enough for a new quorum, waiting" << dendl;
// Take part in an election started by another monitor: wait for in-flight
// paxos writes, enter STATE_ELECTING and count the election.
1844 void Monitor::join_election()
1846 dout(10) << __func__ << dendl;
1847 wait_for_paxos_write();
1849 state = STATE_ELECTING;
1851 logger->inc(l_mon_num_elections);
// Call a new election ourselves. It waits for in-flight paxos writes, enters
// STATE_ELECTING and bumps the election and election-call counters. It then
// announces the election on the cluster log and asks the elector to call it.
1854 void Monitor::start_election()
1856 dout(10) << "start_election" << dendl;
1857 wait_for_paxos_write();
1859 state = STATE_ELECTING;
1861 logger->inc(l_mon_num_elections);
1862 logger->inc(l_mon_election_call);
1864 clog->info() << "mon." << name << " calling new monitor election";
1865 elector.call_election();
// Declare ourselves leader without an election round. It advances the
// election epoch, looks up our rank, collects our own metadata under key 0
// and calls win_election() with the new epoch and the supported mon features.
1868 void Monitor::win_standalone_election()
1870 dout(1) << "win_standalone_election" << dendl;
1872 // bump election epoch, in case the previous epoch included other
1873 // monitors; we need to be able to make the distinction.
1875 elector.advance_epoch();
1877 rank = monmap->get_rank(name);
1882 map<int,Metadata> metadata;
1883 collect_metadata(&metadata[0]);
1885 win_election(elector.get_epoch(), q,
1887 ceph::features::mon::get_supported(),
// Time at which we became leader (set in win_election). Only meaningful, and
// asserted, while in STATE_LEADER.
1891 const utime_t& Monitor::get_leader_since() const
1893 assert(state == STATE_LEADER);
1894 return leader_since;
// Current election epoch, as tracked by the elector.
1897 epoch_t Monitor::get_epoch()
1899 return elector.get_epoch();
// Notify every paxos service that the election has finished. Only valid as
// leader or peon (asserted).
1902 void Monitor::_finish_svc_election()
1904 assert(state == STATE_LEADER || state == STATE_PEON);
1906 for (auto p : paxos_service) {
1907 // we already called election_finished() on monmon(); avoid calling twice
1908 if (state == STATE_LEADER && p == monmon())
1910 p->election_finished();
// We won the election for `epoch` with quorum `active`.
//
// Becomes STATE_LEADER and records the quorum's connection/mon features and
// the reported metadata. It installs the leader command set and initializes
// paxos as leader, then finishes the election for the monmap service first
// and the remaining services after it, and starts the health monitor. It also
// stages "last_metadata" (the quorum's metadata, plus previously known
// metadata for mons outside the quorum) in the pending paxos transaction.
// Multi-mon clusters with a committed monmap (epoch > 0) additionally start
// the health-to-clog tick and the scrub event.
1914 void Monitor::win_election(epoch_t epoch, set<int>& active, uint64_t features,
1915 const mon_feature_t& mon_features,
1916 const map<int,Metadata>& metadata)
1918 dout(10) << __func__ << " epoch " << epoch << " quorum " << active
1919 << " features " << features
1920 << " mon_features " << mon_features
1922 assert(is_electing());
1923 state = STATE_LEADER;
1924 leader_since = ceph_clock_now();
1927 quorum_con_features = features;
1928 quorum_mon_features = mon_features;
1929 pending_metadata = metadata;
1930 outside_quorum.clear();
1932 clog->info() << "mon." << name << "@" << rank
1933 << " won leader election with quorum " << quorum;
1935 set_leader_commands(get_local_commands(mon_features));
1937 paxos->leader_init();
1938 // NOTE: tell monmap monitor first. This is important for the
1939 // bootstrap case to ensure that the very first paxos proposal
1940 // codifies the monmap. Otherwise any manner of chaos can ensue
1941 // when monitors are calling elections or participating in a paxos
1942 // round without agreeing on who the participants are.
1943 monmon()->election_finished();
1944 _finish_svc_election();
1945 health_monitor->start(epoch);
1947 logger->inc(l_mon_election_win);
1949 // inject new metadata in first transaction.
1951 // include previous metadata for missing mons (that aren't part of
1952 // the current quorum).
1953 map<int,Metadata> m = metadata;
// NOTE: this loop variable `rank` shadows the member `rank` inside the loop.
1954 for (unsigned rank = 0; rank < monmap->size(); ++rank) {
1955 if (m.count(rank) == 0 &&
1956 mon_metadata.count(rank)) {
1957 m[rank] = mon_metadata[rank];
1961 // FIXME: This is a bit sloppy because we aren't guaranteed to submit
1962 // a new transaction immediately after the election finishes. We should
1963 // do that anyway for other reasons, though.
1964 MonitorDBStore::TransactionRef t = paxos->get_pending_transaction();
1967 t->put(MONITOR_STORE_PREFIX, "last_metadata", bl);
1971 if (monmap->size() > 1 &&
1972 monmap->get_epoch() > 0) {
1974 health_tick_start();
1975 do_health_to_clog_interval();
1976 scrub_event_start();
// We lost the election for `epoch`; `l` is the winning leader and `q` the
// quorum.
// Clears leader_since and outside_quorum and records the quorum's
// connection/mon features. It then finishes the election for every paxos
// service, starts the health monitor and counts the loss. When the quorum
// supports MON_METADATA but is not yet luminous, our metadata is sent to the
// leader in an MMonMetadata message.
1980 void Monitor::lose_election(epoch_t epoch, set<int> &q, int l,
1982 const mon_feature_t& mon_features)
1985 leader_since = utime_t();
1988 outside_quorum.clear();
1989 quorum_con_features = features;
1990 quorum_mon_features = mon_features;
1991 dout(10) << "lose_election, epoch " << epoch << " leader is mon" << leader
1992 << " quorum is " << quorum << " features are " << quorum_con_features
1993 << " mon_features are " << quorum_mon_features
1997 _finish_svc_election();
1998 health_monitor->start(epoch);
2000 logger->inc(l_mon_election_lose);
2004 if ((quorum_con_features & CEPH_FEATURE_MON_METADATA) &&
2005 !HAVE_FEATURE(quorum_con_features, SERVER_LUMINOUS)) {
2006 // for pre-luminous mons only
2008 collect_metadata(&sys_info);
2009 messenger->send_message(new MMonMetadata(sys_info),
2010 monmap->get_inst(get_leader()));
// Fill `*m` with this host's system info (collect_sys_info) and add our
// messenger address under the "addr" key.
2014 void Monitor::collect_metadata(Metadata *m)
2016 collect_sys_info(m, g_ceph_context);
2017 (*m)["addr"] = stringify(messenger->get_myaddr());
// Common post-election work for leader and peon.
// It folds quorum and monmap features into our compat set and clears
// exited_quorum. It then completes contexts waiting for quorum, resends
// routed requests and registers the cluster logger. If our monmap name (looked
// up by our address) differs from our configured name, it sends an MMonJoin
// to the first quorum member to get renamed.
2020 void Monitor::finish_election()
2022 apply_quorum_to_compatset_features();
2023 apply_monmap_to_compatset_features();
2025 exited_quorum = utime_t();
2026 finish_contexts(g_ceph_context, waitfor_quorum);
2027 finish_contexts(g_ceph_context, maybe_wait_for_quorum);
2028 resend_routed_requests();
2030 register_cluster_logger();
2032 // am i named properly?
2033 string cur_name = monmap->get_name(messenger->get_myaddr());
2034 if (cur_name != name) {
2035 dout(10) << " renaming myself from " << cur_name << " -> " << name << dendl;
2036 messenger->send_message(new MMonJoin(monmap->fsid, name, messenger->get_myaddr()),
2037 monmap->get_inst(*quorum.begin()));
// Adopt `new_features` as our compat set if it differs from the current one.
// It logs the newly enabled features and applies a store transaction
// (presumably persisting the new compat set; confirm the writes populating
// `t`). It then recomputes required_features via calc_quorum_requirements().
2041 void Monitor::_apply_compatset_features(CompatSet &new_features)
2043 if (new_features.compare(features) != 0) {
2044 CompatSet diff = features.unsupported(new_features);
2045 dout(1) << __func__ << " enabling new quorum features: " << diff << dendl;
2046 features = new_features;
2048 auto t = std::make_shared<MonitorDBStore::Transaction>();
2050 store->apply_transaction(t);
2052 calc_quorum_requirements();
// Map connection features shared by the whole quorum (quorum_con_features)
// onto the matching mon incompat compat-set features, and apply the result.
// Features are only ever added here, never removed.
2056 void Monitor::apply_quorum_to_compatset_features()
2058 CompatSet new_features(features);
2059 if (quorum_con_features & CEPH_FEATURE_OSD_ERASURE_CODES) {
2060 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES);
2062 if (quorum_con_features & CEPH_FEATURE_OSDMAP_ENC) {
2063 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC);
2065 if (quorum_con_features & CEPH_FEATURE_ERASURE_CODE_PLUGINS_V2) {
2066 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2);
2068 if (quorum_con_features & CEPH_FEATURE_ERASURE_CODE_PLUGINS_V3) {
2069 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3);
2071 dout(5) << __func__ << dendl;
2072 _apply_compatset_features(new_features);
// Map persistent monmap required features (KRAKEN, LUMINOUS) onto the
// matching mon incompat compat-set features, and apply the result. Each
// mapping asserts the feature is persistent and supported by the quorum.
2075 void Monitor::apply_monmap_to_compatset_features()
2077 CompatSet new_features(features);
2078 mon_feature_t monmap_features = monmap->get_required_features();
2080 /* persistent monmap features may go into the compatset.
2081 * optional monmap features may not - why?
2082 * because optional monmap features may be set/unset by the admin,
2083 * and possibly by other means that haven't yet been thought out,
2084 * so we can't make the monitor enforce them on start - because they
2086 * this, of course, does not invalidate setting a compatset feature
2087 * for an optional feature - as long as you make sure to clean it up
2088 * once you unset it.
2090 if (monmap_features.contains_all(ceph::features::mon::FEATURE_KRAKEN)) {
2091 assert(ceph::features::mon::get_persistent().contains_all(
2092 ceph::features::mon::FEATURE_KRAKEN));
2093 // this feature should only ever be set if the quorum supports it.
2094 assert(HAVE_FEATURE(quorum_con_features, SERVER_KRAKEN));
2095 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_KRAKEN);
2097 if (monmap_features.contains_all(ceph::features::mon::FEATURE_LUMINOUS)) {
2098 assert(ceph::features::mon::get_persistent().contains_all(
2099 ceph::features::mon::FEATURE_LUMINOUS));
2100 // this feature should only ever be set if the quorum supports it.
2101 assert(HAVE_FEATURE(quorum_con_features, SERVER_LUMINOUS));
2102 new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_LUMINOUS);
2105 dout(5) << __func__ << dendl;
2106 _apply_compatset_features(new_features);
// Recompute required_features from scratch. These are the connection features
// a peer must have, derived from our compat-set incompat features and from the
// monmap's required mon features (KRAKEN / LUMINOUS). The result is checked
// against peers in handle_sync_get_cookie() and handle_probe_probe().
2109 void Monitor::calc_quorum_requirements()
2111 required_features = 0;
2114 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES)) {
2115 required_features |= CEPH_FEATURE_OSD_ERASURE_CODES;
2117 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC)) {
2118 required_features |= CEPH_FEATURE_OSDMAP_ENC;
2120 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2)) {
2121 required_features |= CEPH_FEATURE_ERASURE_CODE_PLUGINS_V2;
2123 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3)) {
2124 required_features |= CEPH_FEATURE_ERASURE_CODE_PLUGINS_V3;
2126 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_KRAKEN)) {
2127 required_features |= CEPH_FEATUREMASK_SERVER_KRAKEN;
2129 if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_LUMINOUS)) {
2130 required_features |= CEPH_FEATUREMASK_SERVER_LUMINOUS;
2134 if (monmap->get_required_features().contains_all(
2135 ceph::features::mon::FEATURE_KRAKEN)) {
2136 required_features |= CEPH_FEATUREMASK_SERVER_KRAKEN;
2138 if (monmap->get_required_features().contains_all(
2139 ceph::features::mon::FEATURE_LUMINOUS)) {
2140 required_features |= CEPH_FEATUREMASK_SERVER_LUMINOUS;
2142 dout(10) << __func__ << " required_features " << required_features << dendl;
// Accumulate into *fm our own session feature map plus the feature maps
// recorded in quorum_feature_map for quorum members.
// NOTE(review): quorum_feature_map[id] default-inserts an empty entry for any
// quorum id that has not reported yet.
2145 void Monitor::get_combined_feature_map(FeatureMap *fm)
2147 *fm += session_map.feature_map;
2148 for (auto id : quorum) {
2150 *fm += quorum_feature_map[id];
// Mark the store so that a full sync is forced the next time the monitor
// starts. It stashes the latest monmap and sets "mon_sync"/"force_sync" in one
// transaction, then reports {"ret": 0, "msg": ...} through `f`. A
// JSONFormatter is created when the caller supplied none.
2155 void Monitor::sync_force(Formatter *f, ostream& ss)
2157 bool free_formatter = false;
2160 // lousy/lazy hack: default to json if no formatter has been defined
2161 f = new JSONFormatter();
2162 free_formatter = true;
2165 auto tx(std::make_shared<MonitorDBStore::Transaction>());
2166 sync_stash_critical_state(tx);
2167 tx->put("mon_sync", "force_sync", 1);
2168 store->apply_transaction(tx);
2170 f->open_object_section("sync_force");
2171 f->dump_int("ret", 0);
2172 f->dump_stream("msg") << "forcing store sync the next time the monitor starts";
2173 f->close_section(); // sync_force
// Dump quorum status to `f`: election epoch, quorum ranks and names, the
// quorum leader's name, and the monmap. A JSONFormatter is created when the
// caller supplied none.
2179 void Monitor::_quorum_status(Formatter *f, ostream& ss)
2181 bool free_formatter = false;
2184 // lousy/lazy hack: default to json if no formatter has been defined
2185 f = new JSONFormatter();
2186 free_formatter = true;
2188 f->open_object_section("quorum_status");
2189 f->dump_int("election_epoch", get_epoch());
2191 f->open_array_section("quorum");
2192 for (set<int>::iterator p = quorum.begin(); p != quorum.end(); ++p)
2193 f->dump_int("mon", *p);
2194 f->close_section(); // quorum
2196 list<string> quorum_names = get_quorum_names();
2197 f->open_array_section("quorum_names");
2198 for (list<string>::iterator p = quorum_names.begin(); p != quorum_names.end(); ++p)
2199 f->dump_string("mon", *p);
2200 f->close_section(); // quorum_names
// The leader is reported as the lowest rank in the (ordered) quorum set, or
// as an empty string when there is no quorum.
2202 f->dump_string("quorum_leader_name", quorum.empty() ? string() : monmap->get_name(*quorum.begin()));
2204 f->open_object_section("monmap");
2206 f->close_section(); // monmap
2208 f->close_section(); // quorum_status
// Dump this monitor's status to `f`. The output covers name, rank, state,
// election epoch, quorum, required/quorum features, outside_quorum, extra
// probe peers, active provider sync sessions, and our own sync state when
// synchronizing. It also includes any non-zero sync kill-point options, the
// monmap and the session feature map. A JSONFormatter is created (and later
// flushed to `ss`) when the caller supplied none.
2214 void Monitor::get_mon_status(Formatter *f, ostream& ss)
2216 bool free_formatter = false;
2219 // lousy/lazy hack: default to json if no formatter has been defined
2220 f = new JSONFormatter();
2221 free_formatter = true;
2224 f->open_object_section("mon_status");
2225 f->dump_string("name", name);
2226 f->dump_int("rank", rank);
2227 f->dump_string("state", get_state_name());
2228 f->dump_int("election_epoch", get_epoch());
2230 f->open_array_section("quorum");
2231 for (set<int>::iterator p = quorum.begin(); p != quorum.end(); ++p) {
2232 f->dump_int("mon", *p);
2235 f->close_section(); // quorum
2237 f->open_object_section("features");
2238 f->dump_stream("required_con") << required_features;
2239 mon_feature_t req_mon_features = get_required_mon_features();
2240 req_mon_features.dump(f, "required_mon");
2241 f->dump_stream("quorum_con") << quorum_con_features;
2242 quorum_mon_features.dump(f, "quorum_mon");
2243 f->close_section(); // features
2245 f->open_array_section("outside_quorum");
2246 for (set<string>::iterator p = outside_quorum.begin(); p != outside_quorum.end(); ++p)
2247 f->dump_string("mon", *p);
2248 f->close_section(); // outside_quorum
2250 f->open_array_section("extra_probe_peers");
2251 for (set<entity_addr_t>::iterator p = extra_probe_peers.begin();
2252 p != extra_probe_peers.end();
2254 f->dump_stream("peer") << *p;
2255 f->close_section(); // extra_probe_peers
2257 f->open_array_section("sync_provider");
2258 for (map<uint64_t,SyncProvider>::const_iterator p = sync_providers.begin();
2259 p != sync_providers.end();
2261 f->dump_unsigned("cookie", p->second.cookie);
2262 f->dump_stream("entity") << p->second.entity;
2263 f->dump_stream("timeout") << p->second.timeout;
2264 f->dump_unsigned("last_committed", p->second.last_committed);
2265 f->dump_stream("last_key") << p->second.last_key;
2269 if (is_synchronizing()) {
2270 f->open_object_section("sync");
2271 f->dump_stream("sync_provider") << sync_provider;
2272 f->dump_unsigned("sync_cookie", sync_cookie);
2273 f->dump_unsigned("sync_start_version", sync_start_version);
2277 if (g_conf->mon_sync_provider_kill_at > 0)
2278 f->dump_int("provider_kill_at", g_conf->mon_sync_provider_kill_at);
2279 if (g_conf->mon_sync_requester_kill_at > 0)
2280 f->dump_int("requester_kill_at", g_conf->mon_sync_requester_kill_at);
2282 f->open_object_section("monmap");
2286 f->dump_object("feature_map", session_map.feature_map);
2287 f->close_section(); // mon_status
2289 if (free_formatter) {
2290 // flush formatter to ss and delete it iff we created the formatter
2297 // health status to clog
// Schedule the recurring health-to-clog tick. This is guarded by
// mon_health_to_clog and a positive mon_health_to_clog_tick_interval. Each
// tick fires after that many seconds, calls do_health_to_clog() and re-arms
// itself via health_tick_start().
2299 void Monitor::health_tick_start()
2301 if (!cct->_conf->mon_health_to_clog ||
2302 cct->_conf->mon_health_to_clog_tick_interval <= 0)
2305 dout(15) << __func__ << dendl;
2308 health_tick_event = timer.add_event_after(
2309 cct->_conf->mon_health_to_clog_tick_interval,
2310 new C_MonContext(this, [this](int r) {
2313 do_health_to_clog();
2314 health_tick_start();
2318 void Monitor::health_tick_stop()
2320 dout(15) << __func__ << dendl;
2322 if (health_tick_event) {
2323 timer.cancel_event(health_tick_event);
2324 health_tick_event = NULL;
2328 utime_t Monitor::health_interval_calc_next_update()
2330 utime_t now = ceph_clock_now();
2332 time_t secs = now.sec();
2333 int remainder = secs % cct->_conf->mon_health_to_clog_interval;
2334 int adjustment = cct->_conf->mon_health_to_clog_interval - remainder;
2335 utime_t next = utime_t(secs + adjustment, 0);
2337 dout(20) << __func__
2338 << " now: " << now << ","
2339 << " next: " << next << ","
2340 << " interval: " << cct->_conf->mon_health_to_clog_interval
2346 void Monitor::health_interval_start()
2348 dout(15) << __func__ << dendl;
2350 if (!cct->_conf->mon_health_to_clog ||
2351 cct->_conf->mon_health_to_clog_interval <= 0) {
2355 health_interval_stop();
2356 utime_t next = health_interval_calc_next_update();
2357 health_interval_event = new C_MonContext(this, [this](int r) {
2360 do_health_to_clog_interval();
2362 if (!timer.add_event_at(next, health_interval_event)) {
2363 health_interval_event = nullptr;
2367 void Monitor::health_interval_stop()
2369 dout(15) << __func__ << dendl;
2370 if (health_interval_event) {
2371 timer.cancel_event(health_interval_event);
2373 health_interval_event = NULL;
2376 void Monitor::health_events_cleanup()
2379 health_interval_stop();
2380 health_status_cache.reset();
2383 void Monitor::health_to_clog_update_conf(const std::set<std::string> &changed)
2385 dout(20) << __func__ << dendl;
2387 if (changed.count("mon_health_to_clog")) {
2388 if (!cct->_conf->mon_health_to_clog) {
2389 health_events_cleanup();
2391 if (!health_tick_event) {
2392 health_tick_start();
2394 if (!health_interval_event) {
2395 health_interval_start();
2400 if (changed.count("mon_health_to_clog_interval")) {
2401 if (cct->_conf->mon_health_to_clog_interval <= 0) {
2402 health_interval_stop();
2404 health_interval_start();
2408 if (changed.count("mon_health_to_clog_tick_interval")) {
2409 if (cct->_conf->mon_health_to_clog_tick_interval <= 0) {
2412 health_tick_start();
2417 void Monitor::do_health_to_clog_interval()
2419 // outputting to clog may have been disabled in the conf
2420 // since we were scheduled.
2421 if (!cct->_conf->mon_health_to_clog ||
2422 cct->_conf->mon_health_to_clog_interval <= 0)
2425 dout(10) << __func__ << dendl;
2427 // do we have a cached value for next_clog_update? if not,
2428 // do we know when the last update was?
2430 do_health_to_clog(true);
2431 health_interval_start();
2434 void Monitor::do_health_to_clog(bool force)
2436 // outputting to clog may have been disabled in the conf
2437 // since we were scheduled.
2438 if (!cct->_conf->mon_health_to_clog ||
2439 cct->_conf->mon_health_to_clog_interval <= 0)
2442 dout(10) << __func__ << (force ? " (force)" : "") << dendl;
2444 if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
2446 health_status_t level = get_health_status(false, nullptr, &summary);
2448 summary == health_status_cache.summary &&
2449 level == health_status_cache.overall)
2451 clog->health(level) << "overall " << summary;
2452 health_status_cache.summary = summary;
2453 health_status_cache.overall = level;
2456 list<string> status;
2457 health_status_t overall = get_health(status, NULL, NULL);
2458 dout(25) << __func__
2459 << (force ? " (force)" : "")
2462 string summary = joinify(status.begin(), status.end(), string("; "));
2465 overall == health_status_cache.overall &&
2466 !health_status_cache.summary.empty() &&
2467 health_status_cache.summary == summary) {
2472 clog->info() << summary;
2474 health_status_cache.overall = overall;
2475 health_status_cache.summary = summary;
2479 health_status_t Monitor::get_health_status(
2486 health_status_t r = HEALTH_OK;
2487 bool compat = g_conf->mon_health_preluminous_compat;
2488 bool compat_warn = g_conf->get_val<bool>("mon_health_preluminous_compat_warning");
2490 f->open_object_section("health");
2491 f->open_object_section("checks");
2495 string *psummary = f ? nullptr : &summary;
2496 for (auto& svc : paxos_service) {
2497 r = std::min(r, svc->get_health_checks().dump_summary(
2498 f, psummary, sep2, want_detail));
2503 f->dump_stream("status") << r;
2505 // one-liner: HEALTH_FOO[ thing1[; thing2 ...]]
2506 *plain = stringify(r);
2507 if (summary.size()) {
2514 const std::string old_fields_message = "'ceph health' JSON format has "
2515 "changed in luminous. If you see this your monitoring system is "
2516 "scraping the wrong fields. Disable this with 'mon health preluminous "
2517 "compat warning = false'";
2519 if (f && (compat || compat_warn)) {
2520 health_status_t cr = compat_warn ? min(HEALTH_WARN, r) : r;
2521 f->open_array_section("summary");
2523 f->open_object_section("item");
2524 f->dump_stream("severity") << HEALTH_WARN;
2525 f->dump_string("summary", old_fields_message);
2529 for (auto& svc : paxos_service) {
2530 svc->get_health_checks().dump_summary_compat(f);
2534 f->dump_stream("overall_status") << cr;
2538 if (f && (compat || compat_warn)) {
2539 f->open_array_section("detail");
2541 f->dump_string("item", old_fields_message);
2545 for (auto& svc : paxos_service) {
2546 svc->get_health_checks().dump_detail(f, plain, compat);
2549 if (f && (compat || compat_warn)) {
2559 void Monitor::log_health(
2560 const health_check_map_t& updated,
2561 const health_check_map_t& previous,
2562 MonitorDBStore::TransactionRef t)
2564 if (!g_conf->mon_health_to_clog) {
2568 const utime_t now = ceph_clock_now();
2570 // FIXME: log atomically as part of @t instead of using clog.
2571 dout(10) << __func__ << " updated " << updated.checks.size()
2572 << " previous " << previous.checks.size()
2574 const auto min_log_period = g_conf->get_val<int64_t>(
2575 "mon_health_log_update_period");
2576 for (auto& p : updated.checks) {
2577 auto q = previous.checks.find(p.first);
2578 bool logged = false;
2579 if (q == previous.checks.end()) {
2582 ss << "Health check failed: " << p.second.summary << " ("
2584 clog->health(p.second.severity) << ss.str();
2588 if (p.second.summary != q->second.summary ||
2589 p.second.severity != q->second.severity) {
2591 auto status_iter = health_check_log_times.find(p.first);
2592 if (status_iter != health_check_log_times.end()) {
2593 if (p.second.severity == q->second.severity &&
2594 now - status_iter->second.updated_at < min_log_period) {
2595 // We already logged this recently and the severity is unchanged,
2596 // so skip emitting an update of the summary string.
2597 // We'll get an update out of tick() later if the check
2598 // is still failing.
2603 // summary or severity changed (ignore detail changes at this level)
2605 ss << "Health check update: " << p.second.summary << " (" << p.first << ")";
2606 clog->health(p.second.severity) << ss.str();
2611 // Record the time at which we last logged, so that we can check this
2612 // when considering whether/when to print update messages.
2614 auto iter = health_check_log_times.find(p.first);
2615 if (iter == health_check_log_times.end()) {
2616 health_check_log_times.emplace(p.first, HealthCheckLogStatus(
2617 p.second.severity, p.second.summary, now));
2619 iter->second = HealthCheckLogStatus(
2620 p.second.severity, p.second.summary, now);
2624 for (auto& p : previous.checks) {
2625 if (!updated.checks.count(p.first)) {
2628 if (p.first == "DEGRADED_OBJECTS") {
2629 clog->info() << "All degraded objects recovered";
2630 } else if (p.first == "OSD_FLAGS") {
2631 clog->info() << "OSD flags cleared";
2633 clog->info() << "Health check cleared: " << p.first << " (was: "
2634 << p.second.summary << ")";
2637 if (health_check_log_times.count(p.first)) {
2638 health_check_log_times.erase(p.first);
2643 if (previous.checks.size() && updated.checks.size() == 0) {
2644 // We might be going into a fully healthy state, check
2646 bool any_checks = false;
2647 for (auto& svc : paxos_service) {
2648 if (&(svc->get_health_checks()) == &(previous)) {
2649 // Ignore the ones we're clearing right now
2653 if (svc->get_health_checks().checks.size() > 0) {
2659 clog->info() << "Cluster is now healthy";
2664 health_status_t Monitor::get_health(list<string>& status,
2665 bufferlist *detailbl,
2668 list<pair<health_status_t,string> > summary;
2669 list<pair<health_status_t,string> > detail;
2672 f->open_object_section("health");
2674 for (vector<PaxosService*>::iterator p = paxos_service.begin();
2675 p != paxos_service.end();
2677 PaxosService *s = *p;
2678 s->get_health(summary, detailbl ? &detail : NULL, cct);
2681 health_monitor->get_health(summary, (detailbl ? &detail : NULL));
2683 health_status_t overall = HEALTH_OK;
2684 if (!timecheck_skews.empty()) {
2686 for (map<entity_inst_t,double>::iterator i = timecheck_skews.begin();
2687 i != timecheck_skews.end(); ++i) {
2688 entity_inst_t inst = i->first;
2689 double skew = i->second;
2690 double latency = timecheck_latencies[inst];
2691 string name = monmap->get_name(inst.addr);
2693 health_status_t tcstatus = timecheck_status(tcss, skew, latency);
2694 if (tcstatus != HEALTH_OK) {
2695 if (overall > tcstatus)
2697 warns.push_back(name);
2698 ostringstream tmp_ss;
2699 tmp_ss << "mon." << name
2700 << " addr " << inst.addr << " " << tcss.str()
2701 << " (latency " << latency << "s)";
2702 detail.push_back(make_pair(tcstatus, tmp_ss.str()));
2705 if (!warns.empty()) {
2707 ss << "clock skew detected on";
2708 while (!warns.empty()) {
2709 ss << " mon." << warns.front();
2714 status.push_back(ss.str());
2715 summary.push_back(make_pair(HEALTH_WARN, "Monitor clock skew detected "));
2720 f->open_array_section("summary");
2721 if (!summary.empty()) {
2722 while (!summary.empty()) {
2723 if (overall > summary.front().first)
2724 overall = summary.front().first;
2725 status.push_back(summary.front().second);
2727 f->open_object_section("item");
2728 f->dump_stream("severity") << summary.front().first;
2729 f->dump_string("summary", summary.front().second);
2732 summary.pop_front();
2740 status.push_front(fss.str());
2742 f->dump_stream("overall_status") << overall;
2745 f->open_array_section("detail");
2746 while (!detail.empty()) {
2748 f->dump_string("item", detail.front().second);
2749 else if (detailbl != NULL) {
2750 detailbl->append(detail.front().second);
2751 detailbl->append('\n');
// Render the cluster status ("status" command).  With a Formatter `f`, a
// structured "status" object is dumped.  When `f` is null, a human-readable
// text report is written into `ss`.  Both paths use get_health_status() when
// require_osd_release >= CEPH_RELEASE_LUMINOUS and fall back to the legacy
// get_health() otherwise.
// NOTE(review): this copy carries embedded line-number prefixes, and the gaps
// in that numbering show missing lines (braces, if-guards, section closes,
// declarations such as `maxlen`, `ls`, `health`).  It does not compile as-is;
// restore it from upstream before building.
2764 void Monitor::get_cluster_status(stringstream &ss, Formatter *f)
// ---- structured output ----
2767 f->open_object_section("status");
2770 f->dump_stream("fsid") << monmap->get_fsid();
2771 if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
2772 get_health_status(false, f, nullptr);
2774 list<string> health_str;
2775 get_health(health_str, nullptr, f);
// Election epoch, then quorum membership listed by rank and by name.
2777 f->dump_unsigned("election_epoch", get_epoch());
2779 f->open_array_section("quorum");
2780 for (set<int>::iterator p = quorum.begin(); p != quorum.end(); ++p)
2781 f->dump_int("rank", *p);
2783 f->open_array_section("quorum_names");
2784 for (set<int>::iterator p = quorum.begin(); p != quorum.end(); ++p)
2785 f->dump_string("id", monmap->get_name(*p));
// One summary section per map (monmap, osdmap, pgmap, fsmap, mgrmap).
2788 f->open_object_section("monmap");
2791 f->open_object_section("osdmap");
2792 osdmon()->osdmap.print_summary(f, cout, string(12, ' '));
2794 f->open_object_section("pgmap");
2795 pgservice->print_summary(f, NULL);
2797 f->open_object_section("fsmap");
2798 mdsmon()->get_fsmap().print_summary(f, NULL);
2800 f->open_object_section("mgrmap");
2801 mgrmon()->get_map().print_summary(f, nullptr);
2804 f->dump_object("servicemap", mgrstatmon()->get_service_map());
// ---- plain-text output ----
// NOTE(review): the whitespace inside these literals looks collapsed in this
// copy (the rows are meant to be column-aligned); confirm against upstream.
2807 ss << " cluster:\n";
2808 ss << " id: " << monmap->get_fsid() << "\n";
2811 if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
2812 get_health_status(false, nullptr, &health,
2816 get_health(ls, NULL, f);
2817 health = joinify(ls.begin(), ls.end(),
2820 ss << " health: " << health << "\n";
// Services section.  maxlen tracks the longest service-map service name and
// is used to column-align the daemon rows (its initial value is not visible
// here; `spacing` assumes it is at least 3).
2822 ss << "\n \n services:\n";
2825 auto& service_map = mgrstatmon()->get_service_map();
2826 for (auto& p : service_map.services) {
2827 maxlen = std::max(maxlen, p.first.size());
2829 string spacing(maxlen - 3, ' ');
2830 const auto quorum_names = get_quorum_names();
2831 const auto mon_count = monmap->mon_info.size();
2832 ss << " mon: " << spacing << mon_count << " daemons, quorum "
// When not every monmap rank is in quorum, list the out-of-quorum mons.
2834 if (quorum_names.size() != mon_count) {
2835 std::list<std::string> out_of_q;
2836 for (size_t i = 0; i < monmap->ranks.size(); ++i) {
2837 if (quorum.count(i) == 0) {
2838 out_of_q.push_back(monmap->ranks[i]);
2841 ss << ", out of quorum: " << joinify(out_of_q.begin(),
2842 out_of_q.end(), std::string(", "));
// The mgr row is shown only when the mgr is in use, and the mds row only
// when at least one filesystem exists.
2845 if (mgrmon()->in_use()) {
2846 ss << " mgr: " << spacing;
2847 mgrmon()->get_map().print_summary(nullptr, &ss);
2850 if (mdsmon()->get_fsmap().filesystem_count() > 0) {
2851 ss << " mds: " << spacing << mdsmon()->get_fsmap() << "\n";
2853 ss << " osd: " << spacing;
2854 osdmon()->osdmap.print_summary(NULL, ss, string(maxlen + 6, ' '));
// One row per service registered in the mgr service map.
2856 for (auto& p : service_map.services) {
2857 ss << " " << p.first << ": " << string(maxlen - p.first.size(), ' ')
2858 << p.second.get_summary() << "\n";
// Data section: PG / pool usage summary from the PG service.
2862 ss << "\n \n data:\n";
2863 pgservice->print_summary(NULL, &ss);
2868 void Monitor::_generate_command_map(map<string,cmd_vartype>& cmdmap,
2869 map<string,string> ¶m_str_map)
2871 for (map<string,cmd_vartype>::const_iterator p = cmdmap.begin();
2872 p != cmdmap.end(); ++p) {
2873 if (p->first == "prefix")
2875 if (p->first == "caps") {
2877 if (cmd_getval(g_ceph_context, cmdmap, "caps", cv) &&
2878 cv.size() % 2 == 0) {
2879 for (unsigned i = 0; i < cv.size(); i += 2) {
2880 string k = string("caps_") + cv[i];
2881 param_str_map[k] = cv[i + 1];
2886 param_str_map[p->first] = cmd_vartype_stringify(p->second);
2890 const MonCommand *Monitor::_get_moncommand(
2891 const string &cmd_prefix,
2892 const vector<MonCommand>& cmds)
2894 for (auto& c : cmds) {
2895 if (c.cmdstring.compare(0, cmd_prefix.size(), cmd_prefix) == 0) {
2902 bool Monitor::_allowed_command(MonSession *s, string &module, string &prefix,
2903 const map<string,cmd_vartype>& cmdmap,
2904 const map<string,string>& param_str_map,
2905 const MonCommand *this_cmd) {
2907 bool cmd_r = this_cmd->requires_perm('r');
2908 bool cmd_w = this_cmd->requires_perm('w');
2909 bool cmd_x = this_cmd->requires_perm('x');
2911 bool capable = s->caps.is_capable(
2913 CEPH_ENTITY_TYPE_MON,
2915 module, prefix, param_str_map,
2916 cmd_r, cmd_w, cmd_x);
2918 dout(10) << __func__ << " " << (capable ? "" : "not ") << "capable" << dendl;
2922 void Monitor::format_command_descriptions(const std::vector<MonCommand> &commands,
2928 f->open_object_section("command_descriptions");
2929 for (const auto &cmd : commands) {
2930 unsigned flags = cmd.flags;
2931 if (hide_mgr_flag) {
2932 flags &= ~MonCommand::FLAG_MGR;
2934 ostringstream secname;
2935 secname << "cmd" << setfill('0') << std::setw(3) << cmdnum;
2936 dump_cmddesc_to_json(f, secname.str(),
2937 cmd.cmdstring, cmd.helpstring, cmd.module,
2938 cmd.req_perms, cmd.availability, flags);
2941 f->close_section(); // command_descriptions
2946 bool Monitor::is_keyring_required()
2948 string auth_cluster_required = g_conf->auth_supported.empty() ?
2949 g_conf->auth_cluster_required : g_conf->auth_supported;
2950 string auth_service_required = g_conf->auth_supported.empty() ?
2951 g_conf->auth_service_required : g_conf->auth_supported;
2953 return auth_service_required == "cephx" ||
2954 auth_cluster_required == "cephx";
2957 struct C_MgrProxyCommand : public Context {
2963 C_MgrProxyCommand(Monitor *mon, MonOpRequestRef op, uint64_t s)
2964 : mon(mon), op(op), size(s) { }
2965 void finish(int r) {
2966 Mutex::Locker l(mon->lock);
2967 mon->mgr_proxy_bytes -= size;
2968 mon->reply_command(op, r, outs, outbl, 0);
// Handle an MMonCommand:
//  - validate the fsid and session, and parse the JSON command;
//  - resolve its prefix against the leader / mgr / local command tables,
//    forwarding to the leader when the local table lacks the command or is
//    incompatible with it;
//  - check the session's caps and write an audit entry;
//  - then proxy the command to the mgr, dispatch it to the PaxosService that
//    owns its module, or execute it inline in the if/else chain below.
// NOTE(review): this copy carries embedded line-number prefixes, and the gaps
// in that numbering show missing lines (braces, returns, declarations such as
// `rs`, `r`, `prefix`, `module`, `format`, `rdata`, `cmd_is_rw`).  It does
// not compile as-is; restore it from upstream before building.
2972 void Monitor::handle_command(MonOpRequestRef op)
2974 assert(op->is_type_command());
2975 MMonCommand *m = static_cast<MMonCommand*>(op->get_req());
// Reject commands addressed to a different cluster.
2976 if (m->fsid != monmap->fsid) {
2977 dout(0) << "handle_command on fsid " << m->fsid << " != " << monmap->fsid << dendl;
2978 reply_command(op, -EPERM, "wrong fsid", 0);
// The MonSession is the connection's private data.  The stray-message path
// below is presumably taken when none is attached (guard not visible here).
2982 MonSession *session = static_cast<MonSession *>(
2983 m->get_connection()->get_priv());
2985 dout(5) << __func__ << " dropping stray message " << *m << dendl;
// NOTE(review): body not visible; presumably drops the session reference
// obtained from get_priv() on every exit path.
2988 BOOST_SCOPE_EXIT_ALL(=) {
// An empty command vector is rejected.
2992 if (m->cmd.empty()) {
2993 string rs = "No command supplied";
2994 reply_command(op, -EINVAL, rs, 0);
2999 vector<string> fullcmd;
3000 map<string, cmd_vartype> cmdmap;
3001 stringstream ss, ds;
3005 rs = "unrecognized command";
// Parse the JSON command into cmdmap.  On failure reply with r/rs, except
// when the sender is another monitor.
3007 if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) {
3008 // ss has reason for failure
3011 if (!m->get_source().is_mon()) // don't reply to mon->mon commands
3012 reply_command(op, r, rs, 0);
3016 // check return value. If no prefix parameter provided,
3017 // return value will be false, then return error info.
3018 if (!cmd_getval(g_ceph_context, cmdmap, "prefix", prefix)) {
3019 reply_command(op, -EINVAL, "command prefix not found", 0);
3023 // check prefix is empty
3024 if (prefix.empty()) {
3025 reply_command(op, -EINVAL, "command prefix must not be empty", 0);
// Command table for clients, dumped as JSON: the mgr's commands (only once
// the quorum has FEATURE_LUMINOUS) followed by the leader's mon commands.
3029 if (prefix == "get_command_descriptions") {
3031 Formatter *f = Formatter::create("json");
3032 // hide mgr commands until luminous upgrade is complete
3033 bool hide_mgr_flag =
3034 osdmon()->osdmap.require_osd_release < CEPH_RELEASE_LUMINOUS;
3036 std::vector<MonCommand> commands;
3038 // only include mgr commands once all mons are upgrade (and we've dropped
3039 // the hard-coded PGMonitor commands)
3040 if (quorum_mon_features.contains_all(ceph::features::mon::FEATURE_LUMINOUS)) {
3041 commands = static_cast<MgrMonitor*>(
3042 paxos_service[PAXOS_MGR])->get_command_descs();
3045 for (auto& c : leader_mon_commands) {
3046 commands.push_back(c);
3049 format_command_descriptions(commands, f, &rdata, hide_mgr_flag);
3051 reply_command(op, 0, "", rdata, 0);
3058 dout(0) << "handle_command " << *m << dendl;
// Output format requested by the client; "plain" when absent.
3061 cmd_getval(g_ceph_context, cmdmap, "format", format, string("plain"));
3062 boost::scoped_ptr<Formatter> f(Formatter::create(format));
// Split the prefix into words; the first word is the module.
3064 get_str_vec(prefix, fullcmd);
3066 // make sure fullcmd is not empty.
3067 // invalid prefix will cause empty vector fullcmd.
3068 // such as, prefix=";,,;"
3069 if (fullcmd.empty()) {
3070 reply_command(op, -EINVAL, "command requires a prefix to be valid", 0);
3074 module = fullcmd[0];
3076 // validate command is in leader map
// Look the prefix up in the leader's mon table.  Presumably the mgr command
// table (when non-empty) is used as a fallback; the guarding condition is
// not visible.  Unknown commands get -EINVAL.
3078 const MonCommand *leader_cmd;
3079 const auto& mgr_cmds = mgrmon()->get_command_descs();
3080 const MonCommand *mgr_cmd = nullptr;
3081 if (!mgr_cmds.empty()) {
3082 mgr_cmd = _get_moncommand(prefix, mgr_cmds);
3084 leader_cmd = _get_moncommand(prefix, leader_mon_commands);
3086 leader_cmd = mgr_cmd;
3088 reply_command(op, -EINVAL, "command not known", 0);
// Look the prefix up in this mon's local table for the quorum's features.
// When it is missing or not compatible with the leader's definition, forward
// to the leader unless the command is flagged noforward.
3092 // validate command is in our map & matches, or forward if it is allowed
3093 const MonCommand *mon_cmd = _get_moncommand(
3095 get_local_commands(quorum_mon_features));
3101 if (leader_cmd->is_noforward()) {
3102 reply_command(op, -EINVAL,
3103 "command not locally supported and not allowed to forward",
3107 dout(10) << "Command not locally supported, forwarding request "
3109 forward_request_leader(op);
3111 } else if (!mon_cmd->is_compat(leader_cmd)) {
3112 if (mon_cmd->is_noforward()) {
3113 reply_command(op, -EINVAL,
3114 "command not compatible with leader and not allowed to forward",
3118 dout(10) << "Command not compatible with leader, forwarding request "
3120 forward_request_leader(op);
// Obsolete commands, and deprecated ones when
// mon_debug_deprecated_as_obsolete is set, are refused with -ENOTSUP.
3125 if (mon_cmd->is_obsolete() ||
3126 (cct->_conf->mon_debug_deprecated_as_obsolete
3127 && mon_cmd->is_deprecated())) {
3128 reply_command(op, -ENOTSUP,
3129 "command is obsolete; please check usage and/or man page",
// A request forwarded from another mon (proxy_con set) must not carry a
// noforward command.
3134 if (session->proxy_con && mon_cmd->is_noforward()) {
3135 dout(10) << "Got forward for noforward command " << m << dendl;
3136 reply_command(op, -EINVAL, "forward for noforward command", rdata, 0);
3140 /* what we perceive as being the service the command falls under */
3141 string service(mon_cmd->module);
3143 dout(25) << __func__ << " prefix='" << prefix
3144 << "' module='" << module
3145 << "' service='" << service << "'" << dendl;
// cmd_is_rw (declaration not visible) picks the audit log level:
// info for commands needing 'w' or 'x', debug otherwise.
3148 (mon_cmd->requires_perm('w') || mon_cmd->requires_perm('x'));
3150 // validate user's permissions for requested command
3151 map<string,string> param_str_map;
3152 _generate_command_map(cmdmap, param_str_map);
3153 if (!_allowed_command(session, service, prefix, cmdmap,
3154 param_str_map, mon_cmd)) {
3155 dout(1) << __func__ << " access denied" << dendl;
3156 (cmd_is_rw ? audit_clog->info() : audit_clog->debug())
3157 << "from='" << session->inst << "' "
3158 << "entity='" << session->entity_name << "' "
3159 << "cmd=" << m->cmd << ": access denied";
3160 reply_command(op, -EACCES, "access denied", 0);
// Permission granted: audit the dispatch.
3164 (cmd_is_rw ? audit_clog->info() : audit_clog->debug())
3165 << "from='" << session->inst << "' "
3166 << "entity='" << session->entity_name << "' "
3167 << "cmd=" << m->cmd << ": dispatch";
// mgr commands (once require_osd_release >= luminous) are proxied to the
// mgr.  They are throttled by total in-flight message bytes:
// mon_client_bytes * mon_mgr_proxy_client_bytes_ratio, else -EAGAIN.
// C_MgrProxyCommand releases the bytes and replies when the mgr answers.
3169 if (mon_cmd->is_mgr() &&
3170 osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
3171 const auto& hdr = m->get_header();
3172 uint64_t size = hdr.front_len + hdr.middle_len + hdr.data_len;
3173 uint64_t max = g_conf->get_val<uint64_t>("mon_client_bytes")
3174 * g_conf->get_val<double>("mon_mgr_proxy_client_bytes_ratio");
3175 if (mgr_proxy_bytes + size > max) {
3176 dout(10) << __func__ << " current mgr proxy bytes " << mgr_proxy_bytes
3177 << " + " << size << " > max " << max << dendl;
3178 reply_command(op, -EAGAIN, "hit limit on proxied mgr commands", rdata, 0);
3181 mgr_proxy_bytes += size;
3182 dout(10) << __func__ << " proxying mgr command (+" << size
3183 << " -> " << mgr_proxy_bytes << ")" << dendl;
3184 C_MgrProxyCommand *fin = new C_MgrProxyCommand(this, op, size);
3185 mgr_client.start_command(m->cmd,
3189 new C_OnFinisher(fin, &finisher));
// Route by module/prefix to the PaxosService that owns the command.
3193 if ((module == "mds" || module == "fs") &&
3194 prefix != "fs authorize") {
3195 mdsmon()->dispatch(op);
3198 if ((module == "osd" || prefix == "pg map") &&
3199 prefix != "osd last-stat-seq") {
3200 osdmon()->dispatch(op);
3204 if (module == "pg") {
3205 pgmon()->dispatch(op);
3208 if (module == "mon" &&
3209 /* Let the Monitor class handle the following commands:
3214 prefix != "mon compact" &&
3215 prefix != "mon scrub" &&
3216 prefix != "mon sync force" &&
3217 prefix != "mon metadata" &&
3218 prefix != "mon versions" &&
3219 prefix != "mon count-metadata") {
3220 monmon()->dispatch(op);
3223 if (module == "auth" || prefix == "fs authorize") {
3224 authmon()->dispatch(op);
3227 if (module == "log") {
3228 logmon()->dispatch(op);
3232 if (module == "config-key") {
3233 config_key_service->dispatch(op);
3237 if (module == "mgr") {
3238 mgrmon()->dispatch(op);
// Everything below is executed by the Monitor itself.
3242 if (prefix == "fsid") {
3244 f->open_object_section("fsid");
3245 f->dump_stream("fsid") << monmap->fsid;
3252 reply_command(op, 0, "", rdata, 0);
// scrub: started on this mon (presumably only when leader; the condition is
// not visible), forwarded from a peon, -EAGAIN without quorum.
3256 if (prefix == "scrub" || prefix == "mon scrub") {
3257 wait_for_paxos_write();
3259 int r = scrub_start();
3260 reply_command(op, r, "", rdata, 0);
3261 } else if (is_peon()) {
3262 forward_request_leader(op);
3264 reply_command(op, -EAGAIN, "no quorum", rdata, 0);
// compact: manual store compaction.  `end` is presumably turned into the
// elapsed time (end - start) on a line not visible here.
3269 if (prefix == "compact" || prefix == "mon compact") {
3270 dout(1) << "triggering manual compaction" << dendl;
3271 utime_t start = ceph_clock_now();
3273 utime_t end = ceph_clock_now();
3275 dout(1) << "finished manual compaction in " << end << " seconds" << dendl;
3277 oss << "compacted " << g_conf->get_val<std::string>("mon_keyvaluedb") << " in " << end << " seconds";
// injectargs: apply the given option string to this mon's config.
3281 else if (prefix == "injectargs") {
3282 vector<string> injected_args;
3283 cmd_getval(g_ceph_context, cmdmap, "injected_args", injected_args);
3284 if (!injected_args.empty()) {
3285 dout(0) << "parsing injected options '" << injected_args << "'" << dendl;
3287 r = g_conf->injectargs(str_join(injected_args, " "), &oss);
3288 ss << "injectargs:" << oss.str();
3292 rs = "must supply options to be parsed in a single string";
// time-sync-status: always json-pretty.  Per-mon clock skew and latency,
// then the current time-check round ("on-going" when the round is odd).
3295 } else if (prefix == "time-sync-status") {
3297 f.reset(Formatter::create("json-pretty"));
3298 f->open_object_section("time_sync");
3299 if (!timecheck_skews.empty()) {
3300 f->open_object_section("time_skew_status");
3301 for (auto& i : timecheck_skews) {
3302 entity_inst_t inst = i.first;
3303 double skew = i.second;
3304 double latency = timecheck_latencies[inst];
3305 string name = monmap->get_name(inst.addr);
3307 health_status_t tcstatus = timecheck_status(tcss, skew, latency);
3308 f->open_object_section(name.c_str());
3309 f->dump_float("skew", skew);
3310 f->dump_float("latency", latency);
3311 f->dump_stream("health") << tcstatus;
3312 if (tcstatus != HEALTH_OK) {
3313 f->dump_stream("details") << tcss.str();
3319 f->open_object_section("timechecks");
3320 f->dump_unsigned("epoch", get_epoch());
3321 f->dump_int("round", timecheck_round);
3322 f->dump_stream("round_status") << ((timecheck_round%2) ?
3323 "on-going" : "finished");
// config set: set an option on this mon and apply the change.
3329 } else if (prefix == "config set") {
3331 cmd_getval(cct, cmdmap, "key", key);
3333 cmd_getval(cct, cmdmap, "value", val);
3334 r = g_conf->set_val(key, val, true, &ss);
3336 g_conf->apply_changes(nullptr);
// status / health / df share the optional "detail" argument.
3340 } else if (prefix == "status" ||
3341 prefix == "health" ||
3344 cmd_getval(g_ceph_context, cmdmap, "detail", detail);
3346 if (prefix == "status") {
3347 // get_cluster_status handles f == NULL
3348 get_cluster_status(ds, f.get());
3355 } else if (prefix == "health") {
3356 if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
3358 get_health_status(detail == "detail", f.get(), f ? nullptr : &plain);
3362 rdata.append(plain);
3365 list<string> health_str;
3366 get_health(health_str, detail == "detail" ? &rdata : NULL, f.get());
// Legacy plain output: the overall status first, then the remaining items
// joined with "; ".
3371 assert(!health_str.empty());
3372 ds << health_str.front();
3373 health_str.pop_front();
3374 if (!health_str.empty()) {
3376 ds << joinify(health_str.begin(), health_str.end(), string("; "));
3381 if (detail == "detail")
3385 } else if (prefix == "df") {
3386 bool verbose = (detail == "detail");
3388 f->open_object_section("stats");
3390 pgservice->dump_fs_stats(&ds, f.get(), verbose);
3393 pgservice->dump_pool_stats(osdmon()->osdmap, &ds, f.get(), verbose);
3401 assert(0 == "We should never get here!");
// report: json-pretty dump of version info, health and every service's
// state.  The status string carries a crc32c of the output.
3407 } else if (prefix == "report") {
3409 // this must be formatted, in its current form
3411 f.reset(Formatter::create("json-pretty"));
3412 f->open_object_section("report");
3413 f->dump_stream("cluster_fingerprint") << fingerprint;
3414 f->dump_string("version", ceph_version_to_str());
3415 f->dump_string("commit", git_version_to_str());
3416 f->dump_stream("timestamp") << ceph_clock_now();
3418 vector<string> tagsvec;
3419 cmd_getval(g_ceph_context, cmdmap, "tags", tagsvec);
3420 string tagstr = str_join(tagsvec, " ");
3421 if (!tagstr.empty())
3422 tagstr = tagstr.substr(0, tagstr.find_last_of(' '));
3423 f->dump_string("tag", tagstr);
3425 if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
3426 get_health_status(true, f.get(), nullptr);
3428 list<string> health_str;
3429 get_health(health_str, nullptr, f.get());
3432 monmon()->dump_info(f.get());
3433 osdmon()->dump_info(f.get());
3434 mdsmon()->dump_info(f.get());
3435 authmon()->dump_info(f.get());
3436 pgservice->dump_info(f.get());
3438 paxos->dump_info(f.get());
3444 ss2 << "report " << rdata.crc32c(CEPH_MON_PORT);
// osd last-stat-seq: last stat sequence the mgr stat service has for an OSD.
3447 } else if (prefix == "osd last-stat-seq") {
3449 cmd_getval(g_ceph_context, cmdmap, "id", osd);
3450 uint64_t seq = mgrstatmon()->get_last_osd_stat_seq(osd);
3452 f->dump_unsigned("seq", seq);
// node ls: hosts of mons/osds/mdss ("all" by default), always json-pretty.
3460 } else if (prefix == "node ls") {
3461 string node_type("all");
3462 cmd_getval(g_ceph_context, cmdmap, "type", node_type);
3464 f.reset(Formatter::create("json-pretty"));
3465 if (node_type == "all") {
3466 f->open_object_section("nodes");
3467 print_nodes(f.get(), ds);
3468 osdmon()->print_nodes(f.get());
3469 mdsmon()->print_nodes(f.get());
3471 } else if (node_type == "mon") {
3472 print_nodes(f.get(), ds);
3473 } else if (node_type == "osd") {
3474 osdmon()->print_nodes(f.get());
3475 } else if (node_type == "mds") {
3476 mdsmon()->print_nodes(f.get());
// features: waits for quorum, is forwarded to the leader (presumably from
// peons), and dumps the combined client feature map.
3482 } else if (prefix == "features") {
3483 if (!is_leader() && !is_peon()) {
3484 dout(10) << " waiting for quorum" << dendl;
3485 waitfor_quorum.push_back(new C_RetryMessage(this, op));
3489 forward_request_leader(op);
3493 f.reset(Formatter::create("json-pretty"));
3495 get_combined_feature_map(&fm);
3496 f->dump_object("features", fm);
// mon metadata: one mon when "id" is given, otherwise all ranks.  For the
// all-ranks listing, -ENOENT/-EINVAL from a single mon are only logged.
3500 } else if (prefix == "mon metadata") {
3502 f.reset(Formatter::create("json-pretty"));
3505 bool all = !cmd_getval(g_ceph_context, cmdmap, "id", name);
3507 // Dump a single mon's metadata
3508 int mon = monmap->get_rank(name);
3510 rs = "requested mon not found";
3514 f->open_object_section("mon_metadata");
3515 r = get_mon_metadata(mon, f.get(), ds);
3518 // Dump all mons' metadata
3520 f->open_array_section("mon_metadata");
3521 for (unsigned int rank = 0; rank < monmap->size(); ++rank) {
3522 std::ostringstream get_err;
3523 f->open_object_section("mon");
3524 f->dump_string("name", monmap->get_name(rank));
3525 r = get_mon_metadata(rank, f.get(), get_err);
3527 if (r == -ENOENT || r == -EINVAL) {
3528 dout(1) << get_err.str() << dendl;
3529 // Drop error, list what metadata we do have
3531 } else if (r != 0) {
3532 derr << "Unexpected error from get_mon_metadata: "
3533 << cpp_strerror(r) << dendl;
3534 ds << get_err.str();
3544 } else if (prefix == "mon versions") {
3546 f.reset(Formatter::create("json-pretty"));
3547 count_metadata("ceph_version", f.get());
3552 } else if (prefix == "mon count-metadata") {
3554 f.reset(Formatter::create("json-pretty"));
3556 cmd_getval(g_ceph_context, cmdmap, "property", field);
3557 count_metadata(field, f.get());
3562 } else if (prefix == "quorum_status") {
3563 // make sure our map is readable and up to date
3564 if (!is_leader() && !is_peon()) {
3565 dout(10) << " waiting for quorum" << dendl;
3566 waitfor_quorum.push_back(new C_RetryMessage(this, op));
3569 _quorum_status(f.get(), ds);
3573 } else if (prefix == "mon_status") {
3574 get_mon_status(f.get(), ds);
// sync force: erases this mon's store and resyncs.  Requires both
// confirmation flags.
3580 } else if (prefix == "sync force" ||
3581 prefix == "mon sync force") {
3582 string validate1, validate2;
3583 cmd_getval(g_ceph_context, cmdmap, "validate1", validate1);
3584 cmd_getval(g_ceph_context, cmdmap, "validate2", validate2);
3585 if (validate1 != "--yes-i-really-mean-it" ||
3586 validate2 != "--i-know-what-i-am-doing") {
3588 rs = "are you SURE? this will mean the monitor store will be "
3589 "erased. pass '--yes-i-really-mean-it "
3590 "--i-know-what-i-am-doing' if you really do.";
3593 sync_force(f.get(), ds);
// heap: tcmalloc heap-profiler commands.
3596 } else if (prefix == "heap") {
3597 if (!ceph_using_tcmalloc())
3598 rs = "tcmalloc not enabled, can't use heap profiler commands\n";
3601 cmd_getval(g_ceph_context, cmdmap, "heapcmd", heapcmd);
3602 // XXX 1-element vector, change at callee or make vector here?
3603 vector<string> heapcmd_vec;
3604 get_str_vec(heapcmd, heapcmd_vec);
3605 ceph_heap_profiler_handle_command(heapcmd_vec, ds);
// quorum exit|enter: stop or start participating in elections.
3610 } else if (prefix == "quorum") {
3612 cmd_getval(g_ceph_context, cmdmap, "quorumcmd", quorumcmd);
3613 if (quorumcmd == "exit") {
3615 elector.stop_participating();
3616 rs = "stopped responding to quorum, initiated new election";
3618 } else if (quorumcmd == "enter") {
3619 elector.start_participating();
3621 rs = "started responding to quorum, initiated new election";
3624 rs = "needs a valid 'quorum' command";
3627 } else if (prefix == "version") {
3629 f->open_object_section("version");
3630 f->dump_string("version", pretty_version_to_str());
3634 ds << pretty_version_to_str();
// versions: ceph_version counts per daemon type (mon, mgr, osd, mds, and
// each service-map service), plus an "overall" total across all of them.
3639 } else if (prefix == "versions") {
3641 f.reset(Formatter::create("json-pretty"));
3642 map<string,int> overall;
3643 f->open_object_section("version");
3644 map<string,int> mon, mgr, osd, mds;
3646 count_metadata("ceph_version", &mon);
3647 f->open_object_section("mon");
3648 for (auto& p : mon) {
3649 f->dump_int(p.first.c_str(), p.second);
3650 overall[p.first] += p.second;
3654 mgrmon()->count_metadata("ceph_version", &mgr);
3655 f->open_object_section("mgr");
3656 for (auto& p : mgr) {
3657 f->dump_int(p.first.c_str(), p.second);
3658 overall[p.first] += p.second;
3662 osdmon()->count_metadata("ceph_version", &osd);
3663 f->open_object_section("osd");
3664 for (auto& p : osd) {
3665 f->dump_int(p.first.c_str(), p.second);
3666 overall[p.first] += p.second;
3670 mdsmon()->count_metadata("ceph_version", &mds);
3671 f->open_object_section("mds");
3672 for (auto& p : mds) {
3673 f->dump_int(p.first.c_str(), p.second);
3674 overall[p.first] += p.second;
3678 for (auto& p : mgrstatmon()->get_service_map().services) {
3679 f->open_object_section(p.first.c_str());
3681 p.second.count_metadata("ceph_version", &m);
3683 f->dump_int(q.first.c_str(), q.second);
3684 overall[q.first] += q.second;
3689 f->open_object_section("overall");
3690 for (auto& p : overall) {
3691 f->dump_int(p.first.c_str(), p.second);
// Final reply for the inline commands; skipped when the sender is a mon.
3701 if (!m->get_source().is_mon()) // don't reply to mon->mon commands
3702 reply_command(op, r, rs, rdata, 0);
// Convenience overload of reply_command() for replies without a data
// payload: forwards to the overload that also takes a bufferlist.
// NOTE(review): the declaration of `rdata` is not present in this excerpt;
// presumably a local, empty bufferlist -- confirm against the full source.
3705 void Monitor::reply_command(MonOpRequestRef op, int rc, const string &rs, version_t version)
3708 reply_command(op, rc, rs, rdata, version);
// Reply to the MMonCommand carried by `op`.
//
// Builds an MMonCommandAck that echoes the original command vector, carries
// the return code `rc`, status string `rs` and `version`, copies the
// request's tid, and attaches `rdata` as the reply's data. Delivery is
// delegated to send_reply(), which also handles requests that reached us
// through another monitor (proxied sessions).
3711 void Monitor::reply_command(MonOpRequestRef op, int rc, const string &rs,
3712 bufferlist& rdata, version_t version)
3714 MMonCommand *m = static_cast<MMonCommand*>(op->get_req());
// Sanity-check that the static_cast above was valid.
3715 assert(m->get_type() == MSG_MON_COMMAND);
3716 MMonCommandAck *reply = new MMonCommandAck(m->cmd, rc, rs, version);
// Copy the request's tid onto the ack (presumably so the sender can pair them).
3717 reply->set_tid(m->get_tid());
3718 reply->set_data(rdata);
3719 send_reply(op, reply);
3723 // ------------------------
3724 // request/reply routing
3726 // A client/mds/osd connects to a random monitor. We need to forward any
3727 // messages requiring state updates to the leader, and then route any replies
3728 // back to the originator via the monitor it is connected to. (The monitor
3729 // will not initiate any connections.)
// Forward the request carried by `op` to the current quorum leader.
//
// The request is wrapped in an MForward tagged with a fresh routed-request
// tid. A RoutedRequest remembering the original client, its connection, the
// connection's features and a full encoding of the request is stored in
// routed_requests (and its tid in the session's routed_request_tids). This
// lets the leader's MRoute reply be delivered back (handle_route) and lets
// the request be re-sent if the leader changes (resend_routed_requests).
//
// Nothing is forwarded when:
//  - the request came from a monitor other than ourselves,
//  - the session is itself proxied (avoid forwarding twice), or
//  - the session is closed.
3731 void Monitor::forward_request_leader(MonOpRequestRef op)
3733 op->mark_event(__func__);
3735 int mon = get_leader();
3736 MonSession *session = op->get_session();
3737 PaxosServiceMessage *req = op->get_req<PaxosServiceMessage>();
3739 if (req->get_source().is_mon() && req->get_source_addr() != messenger->get_myaddr()) {
3740 dout(10) << "forward_request won't forward (non-local) mon request " << *req << dendl;
3741 } else if (session->proxy_con) {
3742 dout(10) << "forward_request won't double fwd request " << *req << dendl;
3743 } else if (!session->closed) {
3744 RoutedRequest *rr = new RoutedRequest;
3745 rr->tid = ++routed_request_tid;
3746 rr->client_inst = req->get_source_inst();
3747 rr->con = req->get_connection();
3748 rr->con_features = rr->con->get_features();
3749 encode_message(req, CEPH_FEATURES_ALL, rr->request_bl); // for my use only; use all features
// The RoutedRequest takes its own reference on the session via get().
3750 rr->session = static_cast<MonSession *>(session->get());
3752 routed_requests[rr->tid] = rr;
3753 session->routed_request_tids.insert(rr->tid);
3755 dout(10) << "forward_request " << rr->tid << " request " << *req
3756 << " features " << rr->con_features << dendl;
3758 MForward *forward = new MForward(rr->tid,
3762 forward->set_priority(req->get_priority());
// Tell the leader whom the request is on behalf of: the authenticated
// entity name if the session has an auth handler, otherwise (if the sender
// is a monitor) just mark the entity type as MON.
3763 if (session->auth_handler) {
3764 forward->entity_name = session->entity_name;
3765 } else if (req->get_source().is_mon()) {
3766 forward->entity_name.set_type(CEPH_ENTITY_TYPE_MON);
3768 messenger->send_message(forward, monmap->get_inst(mon));
3769 op->mark_forwarded();
3770 assert(op->get_req()->get_type() != 0);
3772 dout(10) << "forward_request no session for request " << *req << dendl;
// AnonConnection stands in for the client's real connection when a monitor
// unwraps an MForward (handle_forward). It is constructed with a NULL
// messenger, so it cannot transmit anything: the send paths assert, and
// is_connected() always reports false.
3776 // fake connection attached to forwarded messages
3777 struct AnonConnection : public Connection {
3778 explicit AnonConnection(CephContext *cct) : Connection(cct, NULL) {}
// Replies to forwarded requests must go back through the forwarding monitor
// (as an MRoute over the session's proxy connection), never over this one.
3780 int send_message(Message *m) override {
3781 assert(!"send_message on anonymous connection");
3783 void send_keepalive() override {
3784 assert(!"send_keepalive on anonymous connection");
// NOTE(review): the bodies of mark_down()/mark_disposable() are not shown in
// this excerpt; presumably no-ops -- confirm.
3786 void mark_down() override {
3789 void mark_disposable() override {
3792 bool is_connected() override { return false; }
// Handle an MForward from a peer monitor: unwrap the client's original
// request so it can go through regular dispatch as if the client had sent it
// to us directly.
//
// The forwarding session must hold MON_CAP_X. The unwrapped request is given
// an AnonConnection and a fresh MonSession built from the MForward's
// client_caps and entity_name. The session's proxy_con/proxy_tid remember
// the forwarding monitor's connection and tid, which send_reply()/no_reply()
// use to route the answer back through it as an MRoute.
// NOTE(review): some lines of this function (including, presumably, the
// dispatch of `req`) are not shown in this excerpt.
3795 //extract the original message and put it into the regular dispatch function
3796 void Monitor::handle_forward(MonOpRequestRef op)
3798 MForward *m = static_cast<MForward*>(op->get_req());
3799 dout(10) << "received forwarded message from " << m->client
3800 << " via " << m->get_source_inst() << dendl;
3801 MonSession *session = op->get_session();
3804 if (!session->is_capable("mon", MON_CAP_X)) {
3805 dout(0) << "forward from entity with insufficient caps! "
3806 << session->caps << dendl;
3808 // see PaxosService::dispatch(); we rely on this being anon
3809 // (c->msgr == NULL)
3810 PaxosServiceMessage *req = m->claim_message();
3811 assert(req != NULL);
// Build a stand-in connection and session for the original client,
// populated from what the forwarding monitor sent us.
3813 ConnectionRef c(new AnonConnection(cct));
3814 MonSession *s = new MonSession(req->get_source_inst(),
3815 static_cast<Connection*>(c.get()));
3816 c->set_priv(s->get());
3817 c->set_peer_addr(m->client.addr);
3818 c->set_peer_type(m->client.name.type());
3819 c->set_features(m->con_features);
3821 s->caps = m->client_caps;
3822 dout(10) << " caps are " << s->caps << dendl;
3823 s->entity_name = m->entity_name;
3824 dout(10) << " entity name '" << s->entity_name << "' type "
3825 << s->entity_name.get_type() << dendl;
// Remember the forwarding monitor so replies are routed back through it.
3826 s->proxy_con = m->get_connection();
3827 s->proxy_tid = m->tid;
3829 req->set_connection(c);
3831 // not super accurate, but better than nothing.
3832 req->set_recv_stamp(m->get_recv_stamp());
3835 * note which election epoch this is; we will drop the message if
3836 * there is a future election since our peers will resend routed
3837 * requests in that case.
3839 req->rx_election_epoch = get_epoch();
3841 /* Because this is a special fake connection, we need to break
3842 the ref loop between Connection and MonSession differently
3843 than we normally do. Here, the Message refers to the Connection
3844 which refers to the Session, and nobody else refers to the Connection
3845 or the Session. And due to the special nature of this message,
3846 nobody refers to the Connection via the Session. So, clear out that
3847 half of the ref loop.*/
3850 dout(10) << " mesg " << req << " from " << m->get_source_addr() << dendl;
// Send `m` to `to` directly. Also hand monitors in the monmap an MRoute
// carrying a copy of `m` (encoded with quorum_con_features) addressed to
// `to`, presumably so the message still arrives if the direct path fails.
// NOTE(review): a line between the loop header and the relay send is not
// shown in this excerpt; presumably it skips our own rank -- confirm.
3857 void Monitor::try_send_message(Message *m, const entity_inst_t& to)
3859 dout(10) << "try_send_message " << *m << " to " << to << dendl;
// Encode `m` once up front; the same buffer is reused for every MRoute.
3862 encode_message(m, quorum_con_features, bl);
3864 messenger->send_message(m, to);
// Relay a copy through the monitors in the monmap.
3866 for (int i=0; i<(int)monmap->size(); i++) {
3868 messenger->send_message(new MRoute(bl, to), monmap->get_inst(i));
// Deliver `reply` to the originator of the request carried by `op`.
//
// The reply is dropped, and the op annotated "reply: no connection", in two
// cases: when there is no connection to answer on, or when the session has
// neither a direct connection nor a proxy connection. For sessions created
// by handle_forward() (session->proxy_con set), the reply is wrapped in an
// MRoute carrying the session's proxy tid and sent to the forwarding
// monitor. Otherwise it is sent directly on the session's connection.
// NOTE(review): the condition guarding the first drop path, and any release
// of `reply` on the drop paths, are on lines not shown in this excerpt.
3872 void Monitor::send_reply(MonOpRequestRef op, Message *reply)
3874 op->mark_event(__func__);
3876 MonSession *session = op->get_session();
3878 Message *req = op->get_req();
3879 ConnectionRef con = op->get_connection();
3881 reply->set_cct(g_ceph_context);
3882 dout(2) << __func__ << " " << op << " " << reply << " " << *reply << dendl;
3885 dout(2) << "send_reply no connection, dropping reply " << *reply
3886 << " to " << req << " " << *req << dendl;
3888 op->mark_event("reply: no connection");
3892 if (!session->con && !session->proxy_con) {
3893 dout(2) << "send_reply no connection, dropping reply " << *reply
3894 << " to " << req << " " << *req << dendl;
3896 op->mark_event("reply: no connection");
// Proxied session: route the reply back through the monitor that forwarded
// the request.
3900 if (session->proxy_con) {
3901 dout(15) << "send_reply routing reply to " << con->get_peer_addr()
3902 << " via " << session->proxy_con->get_peer_addr()
3903 << " for request " << *req << dendl;
3904 session->proxy_con->send_message(new MRoute(session->proxy_tid, reply));
3905 op->mark_event("reply: send routed request");
3907 session->con->send_message(reply);
3908 op->mark_event("reply: send");
// Explicitly answer the request carried by `op` with "no reply".
//
// For proxied sessions, the forwarding monitor is still tracking a routed
// request. It is therefore sent an MRoute with the session's proxy tid and a
// NULL message; handle_route() accepts a null message. For direct sessions
// nothing is sent, and the op is only annotated.
3912 void Monitor::no_reply(MonOpRequestRef op)
3914 MonSession *session = op->get_session();
3915 Message *req = op->get_req();
3917 if (session->proxy_con) {
3918 dout(10) << "no_reply to " << req->get_source_inst()
3919 << " via " << session->proxy_con->get_peer_addr()
3920 << " for request " << *req << dendl;
3921 session->proxy_con->send_message(new MRoute(session->proxy_tid, NULL));
3922 op->mark_event("no_reply: send routed request");
3924 dout(10) << "no_reply to " << req->get_source_inst()
3925 << " " << *req << dendl;
3926 op->mark_event("no_reply");
// Handle an MRoute from a peer monitor. It delivers the enclosed reply to
// the client on whose behalf we forwarded a request; the reply may be null,
// as sent by no_reply().
//
// The sender must hold MON_CAP_X. If the MRoute names a routed-request tid
// we still track:
//  - the reply is sent on the client's original connection;
//  - any requested OSDMap incrementals (from send_osdmap_first) are sent to
//    the client's session;
//  - the RoutedRequest bookkeeping is removed from both routed_requests and
//    the session's tid set.
// An unknown tid is only logged. An MRoute without a tid is sent to m->dest
// through the messenger.
// NOTE(review): the drop after a failed cap check, the guards for a null
// message, and the release of the RoutedRequest are on lines not shown in
// this excerpt.
3930 void Monitor::handle_route(MonOpRequestRef op)
3932 MRoute *m = static_cast<MRoute*>(op->get_req());
3933 MonSession *session = op->get_session();
3935 if (!session->is_capable("mon", MON_CAP_X)) {
3936 dout(0) << "MRoute received from entity without appropriate perms! "
3941 dout(10) << "handle_route " << *m->msg << " to " << m->dest << dendl;
3943 dout(10) << "handle_route null to " << m->dest << dendl;
3946 if (m->session_mon_tid) {
3947 if (routed_requests.count(m->session_mon_tid)) {
3948 RoutedRequest *rr = routed_requests[m->session_mon_tid];
3950 // reset payload, in case encoding is dependent on target features
3952 m->msg->clear_payload();
3953 rr->con->send_message(m->msg);
3956 if (m->send_osdmap_first) {
3957 dout(10) << " sending osdmaps from " << m->send_osdmap_first << dendl;
3958 osdmon()->send_incremental(m->send_osdmap_first, rr->session,
3959 true, MonOpRequestRef());
// The tracked request and its session must agree on this tid.
3961 assert(rr->tid == m->session_mon_tid && rr->session->routed_request_tids.count(m->session_mon_tid));
3962 routed_requests.erase(m->session_mon_tid);
3963 rr->session->routed_request_tids.erase(m->session_mon_tid);
3966 dout(10) << " don't have routed request tid " << m->session_mon_tid << dendl;
3969 dout(10) << " not a routed request, trying to send anyway" << dendl;
3971 messenger->send_message(m->msg, m->dest);
// Re-send routed (forwarded) requests after the leader may have changed.
//
// Each tracked RoutedRequest is handled in one of two ways:
//  - If we are now the leader (per a branch condition not shown here;
//    presumably get_leader() == our rank), the original op is queued for
//    local retry via C_RetryMessage and its tid is removed from the session.
//  - Otherwise the stored encoding (request_bl) is decoded, wrapped in a new
//    MForward with the same tid, and sent to the new leader.
// routed_requests is then cleared and the queued retries are run.
// NOTE(review): the loop increment, the branch conditions, and whether the
// final clear/retry is conditional are on lines not shown in this excerpt --
// confirm against the full source.
3977 void Monitor::resend_routed_requests()
3979 dout(10) << "resend_routed_requests" << dendl;
3980 int mon = get_leader();
3981 list<Context*> retry;
3982 for (map<uint64_t, RoutedRequest*>::iterator p = routed_requests.begin();
3983 p != routed_requests.end();
3985 RoutedRequest *rr = p->second;
3988 dout(10) << " requeue for self tid " << rr->tid << dendl;
3989 rr->op->mark_event("retry routed request");
3990 retry.push_back(new C_RetryMessage(this, rr->op));
3992 assert(rr->session->routed_request_tids.count(p->first));
3993 rr->session->routed_request_tids.erase(p->first);
// Rebuild the request from the copy encoded when it was first forwarded.
3997 bufferlist::iterator q = rr->request_bl.begin();
3998 PaxosServiceMessage *req = (PaxosServiceMessage *)decode_message(cct, 0, q);
3999 rr->op->mark_event("resend forwarded message to leader");
4000 dout(10) << " resend to mon." << mon << " tid " << rr->tid << " " << *req << dendl;
4001 MForward *forward = new MForward(rr->tid, req, rr->con_features,
4003 req->put(); // forward takes its own ref; drop ours.
4004 forward->client = rr->client_inst;
4005 forward->set_priority(req->get_priority());
4006 messenger->send_message(forward, monmap->get_inst(mon));
4010 routed_requests.clear();
4011 finish_contexts(g_ceph_context, retry);
// Tear down session `s`.
//
// Every routed request still associated with the session is removed from
// routed_requests; each one must be present. Then:
//  - the session's tid set is cleared;
//  - the session is detached from its connection (set_priv(NULL));
//  - it is removed from session_map;
//  - the session perf counters are updated.
// NOTE(review): assumes the caller holds session_map_lock (remove_all_sessions
// takes it before iterating) -- confirm for other callers. The release of
// each RoutedRequest (presumably `delete rr`) is on a line not shown here.
4015 void Monitor::remove_session(MonSession *s)
4017 dout(10) << "remove_session " << s << " " << s->inst
4018 << " features 0x" << std::hex << s->con_features << std::dec << dendl;
4021 for (set<uint64_t>::iterator p = s->routed_request_tids.begin();
4022 p != s->routed_request_tids.end();
4024 assert(routed_requests.count(*p));
4025 RoutedRequest *rr = routed_requests[*p];
4026 dout(10) << " dropping routed request " << rr->tid << dendl;
4028 routed_requests.erase(*p);
4030 s->routed_request_tids.clear();
4031 s->con->set_priv(NULL);
4032 session_map.remove_session(s);
4033 logger->set(l_mon_num_sessions, session_map.get_size());
4034 logger->inc(l_mon_session_rm);
// While holding session_map_lock, remove every session in session_map,
// incrementing l_mon_session_rm for each, then publish the final session
// count.
// NOTE(review): the actual removal of `s` (presumably remove_session(s)) is
// on a line not shown in this excerpt; the loop relies on it shrinking
// session_map.sessions.
4037 void Monitor::remove_all_sessions()
4039 Mutex::Locker l(session_map_lock);
4040 while (!session_map.sessions.empty()) {
4041 MonSession *s = session_map.sessions.front();
4044 logger->inc(l_mon_session_rm);
4047 logger->set(l_mon_num_sessions, session_map.get_size());
// Send the monitor command `com` to `inst` as an MMonCommand stamped with
// our fsid. Sending goes through try_send_message(), i.e. a direct send
// plus relays through monitors in the monmap.
// NOTE(review): no line in this excerpt assigns `com` to the message; the
// full source presumably sets c->cmd -- confirm.
4050 void Monitor::send_command(const entity_inst_t& inst,
4051 const vector<string>& com)
// Separate the target instance from the command in the log line.
4053 dout(10) << "send_command " << inst << " " << com << dendl;
4054 MMonCommand *c = new MMonCommand(monmap->fsid);
4056 try_send_message(c, inst);
// Either park `op` until we rejoin the quorum, or give up on the client.
//
// A message is waitlisted when both hold:
//  - it was received within the last mon_lease seconds, and
//  - its connection is still connected.
// It is then queued on maybe_wait_for_quorum as a C_RetryMessage, and the op
// is marked as waiting for quorum. Any other message is discarded. For
// non-proxied sessions, session_map_lock is then taken so that the client's
// session can be dropped.
// NOTE(review): the session removal itself is on a line not shown in this
// excerpt.
4059 void Monitor::waitlist_or_zap_client(MonOpRequestRef op)
4062 * Wait list the new session until we're in the quorum, assuming it's
4064 * tick() will periodically send them back through so we can send
4065 * the client elsewhere if we don't think we're getting back in.
4067 * But we whitelist a few sorts of messages:
4068 * 1) Monitors can talk to us at any time, of course.
4069 * 2) auth messages. It's unlikely to go through much faster, but
4070 * it's possible we've just lost our quorum status and we want to take...
4071 * 3) command messages. We want to accept these under all possible
4074 Message *m = op->get_req();
4075 MonSession *s = op->get_session();
4076 ConnectionRef con = op->get_connection();
// Cutoff: messages received before (now - mon_lease) are considered stale.
4077 utime_t too_old = ceph_clock_now();
4078 too_old -= g_ceph_context->_conf->mon_lease;
4079 if (m->get_recv_stamp() > too_old &&
4080 con->is_connected()) {
4081 dout(5) << "waitlisting message " << *m << dendl;
4082 maybe_wait_for_quorum.push_back(new C_RetryMessage(this, op));
4083 op->mark_wait_for_quorum();
4085 dout(5) << "discarding message " << *m << " and sending client elsewhere" << dendl;
4087 // proxied sessions aren't registered and don't have a con; don't remove
4089 if (!s->proxy_con) {
4090 Mutex::Locker l(session_map_lock);
// Entry point for messages delivered to the monitor.
//
// Wraps `m` in a MonOpRequest and makes sure it has a usable MonSession:
//  - Messages arriving after shutdown, or on a closed session, are
//    presumably dropped (the handling lines are not shown).
//  - For monitor peers whose non-anonymous connection (one with a
//    messenger) reports features different from the session's recorded
//    features, a changed connection is marked down and the session is
//    replaced.
//  - New sessions are created under session_map_lock and attached to the
//    connection. A non-monitor sender may only open one with CEPH_MSG_AUTH,
//    CEPH_MSG_MON_GET_MAP or CEPH_MSG_PING; anything else is dropped as
//    stray.
//  - Monitor caps (mon_caps) are installed on the session, presumably for
//    monitor peers only.
// Afterwards the session timeout is refreshed and the entity name is
// re-read from the auth handler. Messages other than pings are handed to
// waitlist_or_zap_client() while we are synchronizing, or after we have
// exited quorum for sessions without a global_id. This is subject to one
// more condition on a line not shown here.
// NOTE(review): many lines of this function (early returns, the "no session
// yet" test, the final dispatch) are not shown in this excerpt.
4097 void Monitor::_ms_dispatch(Message *m)
4099 if (is_shutdown()) {
4104 MonOpRequestRef op = op_tracker.create_request<MonOpRequest>(m);
4105 bool src_is_mon = op->is_src_mon();
4106 op->mark_event("mon:_ms_dispatch");
4107 MonSession *s = op->get_session();
4108 if (s && s->closed) {
4112 if (src_is_mon && s) {
4113 ConnectionRef con = m->get_connection();
4114 if (con->get_messenger() && con->get_features() != s->con_features) {
4115 // only update features if this is a non-anonymous connection
4116 dout(10) << __func__ << " feature change for " << m->get_source_inst()
4117 << " (was " << s->con_features
4118 << ", now " << con->get_features() << ")" << dendl;
4119 // connection features changed - recreate session.
4120 if (s->con && s->con != con) {
4121 dout(10) << __func__ << " connection for " << m->get_source_inst()
4122 << " changed from session; mark down and replace" << dendl;
4123 s->con->mark_down();
4125 if (s->item.is_on_list()) {
4126 // forwarded messages' sessions are not in the sessions map and
4127 // exist only while the op is being handled.
4136 // if the sender is not a monitor, make sure their first message for a
4137 // session is an MAuth. If it is not, assume it's a stray message,
4138 // and considering that we are creating a new session it is safe to
4139 // assume that the sender hasn't authenticated yet, so we have no way
4140 // of assessing whether we should handle it or not.
4141 if (!src_is_mon && (m->get_type() != CEPH_MSG_AUTH &&
4142 m->get_type() != CEPH_MSG_MON_GET_MAP &&
4143 m->get_type() != CEPH_MSG_PING)) {
4144 dout(1) << __func__ << " dropping stray message " << *m
4145 << " from " << m->get_source_inst() << dendl;
4149 ConnectionRef con = m->get_connection();
4151 Mutex::Locker l(session_map_lock);
4152 s = session_map.new_session(m->get_source_inst(), con.get());
4155 con->set_priv(s->get());
4156 dout(10) << __func__ << " new session " << s << " " << *s
4157 << " features 0x" << std::hex
4158 << s->con_features << std::dec << dendl;
4161 logger->set(l_mon_num_sessions, session_map.get_size());
4162 logger->inc(l_mon_session_add);
4165 // give it monitor caps; the peer type has been authenticated
4166 dout(5) << __func__ << " setting monitor caps on this connection" << dendl;
4167 if (!s->caps.is_allow_all()) // but no need to repeatedly copy
4168 s->caps = *mon_caps;
4172 dout(20) << __func__ << " existing session " << s << " for " << s->inst
4178 s->session_timeout = ceph_clock_now();
4179 s->session_timeout += g_conf->mon_session_timeout;
4181 if (s->auth_handler) {
4182 s->entity_name = s->auth_handler->get_entity_name();
4184 dout(20) << " caps " << s->caps.get_str() << dendl;
4186 if ((is_synchronizing() ||
4187 (s->global_id == 0 && !exited_quorum.is_zero())) &&
4189 m->get_type() != CEPH_MSG_PING) {
4190 waitlist_or_zap_client(op);
// Route an incoming op to its handler based on the message type.
//
// Dispatch happens in stages; each stage only sees what earlier stages did
// not handle:
//  1. Messages that need no caps check here: global-id (and, presumably,
//     auth) messages go to the auth service; monmap requests go to
//     handle_mon_get_map(); mon metadata goes to handle_mon_metadata().
//  2. Messages owned by a PaxosService (OSDMAP, MDSMAP, MGR, MGRSTAT, PGMAP,
//     LOG) and monitor commands. Their caps are checked by the receiver.
//  3. Client-visible messages handled by the Monitor itself (get_version,
//     subscribe). These require MON_CAP_R on the session.
//  4. Monitor-to-monitor traffic (sync, log acks, monmap, paxos, election,
//     timecheck, health). This is refused from non-monitor senders. Paxos
//     and election messages additionally require MON_CAP_X.
// The op's type (monitor/service/command/paxos/election) is set along the
// way via set_type_*().
// NOTE(review): many case labels, breaks and early returns are on lines not
// shown in this excerpt.
4197 void Monitor::dispatch_op(MonOpRequestRef op)
4199 op->mark_event("mon:dispatch_op");
4200 MonSession *s = op->get_session();
4203 dout(10) << " session closed, dropping " << op->get_req() << dendl;
4207 /* we will consider the default type as being 'monitor' until proven wrong */
4208 op->set_type_monitor();
4209 /* deal with all messages that do not necessarily need caps */
4210 bool dealt_with = true;
4211 switch (op->get_req()->get_type()) {
4213 case MSG_MON_GLOBAL_ID:
4215 op->set_type_service();
4216 /* no need to check caps here */
4217 paxos_service[PAXOS_AUTH]->dispatch(op);
4224 /* MMonGetMap may be used by clients to obtain a monmap *before*
4225 * authenticating with the monitor. We need to handle these without
4226 * checking caps because, even on a cluster without cephx, we only set
4227 * session caps *after* the auth handshake. A good example of this
4228 * is when a client calls MonClient::get_monmap_privately(), which does
4229 * not authenticate when obtaining a monmap.
4231 case CEPH_MSG_MON_GET_MAP:
4232 handle_mon_get_map(op);
4235 case CEPH_MSG_MON_METADATA:
4236 return handle_mon_metadata(op);
4245 /* well, maybe the op belongs to a service... */
4246 op->set_type_service();
4247 /* deal with all messages which caps should be checked somewhere else */
4249 switch (op->get_req()->get_type()) {
4252 case CEPH_MSG_MON_GET_OSDMAP:
4253 case CEPH_MSG_POOLOP:
4254 case MSG_OSD_BEACON:
4255 case MSG_OSD_MARK_ME_DOWN:
4257 case MSG_OSD_FAILURE:
4260 case MSG_OSD_PGTEMP:
4261 case MSG_OSD_PG_CREATED:
4262 case MSG_REMOVE_SNAPS:
4263 paxos_service[PAXOS_OSDMAP]->dispatch(op);
4267 case MSG_MDS_BEACON:
4268 case MSG_MDS_OFFLOAD_TARGETS:
4269 paxos_service[PAXOS_MDSMAP]->dispatch(op);
4273 case MSG_MGR_BEACON:
4274 paxos_service[PAXOS_MGR]->dispatch(op);
4278 case CEPH_MSG_STATFS:
4279 // this is an ugly hack, sorry! force the version to 1 so that we do
4280 // not run afoul of the is_readable() paxos check. the client is going
4281 // by the pgmonitor version and the MgrStatMonitor version will lag behind
4282 // that until we complete the upgrade. The paxos ordering crap really
4283 // doesn't matter for statfs results, so just kludge around it here.
4284 if (osdmon()->osdmap.require_osd_release < CEPH_RELEASE_LUMINOUS) {
4285 ((MStatfs*)op->get_req())->version = 1;
// NOTE(review): no break is visible here, so CEPH_MSG_STATFS appears to fall
// through to the PAXOS_MGRSTAT dispatch below (apparently intended); consider
// an explicit fallthrough annotation.
4287 case MSG_MON_MGR_REPORT:
4288 case MSG_GETPOOLSTATS:
4289 paxos_service[PAXOS_MGRSTAT]->dispatch(op);
4294 paxos_service[PAXOS_PGMAP]->dispatch(op);
4299 paxos_service[PAXOS_LOG]->dispatch(op);
4302 // handle_command() does its own caps checking
4303 case MSG_MON_COMMAND:
4304 op->set_type_command();
4315 /* nop, looks like it's not a service message; revert back to monitor */
4316 op->set_type_monitor();
4318 /* messages we, the Monitor class, need to deal with
4319 * but may be sent by clients. */
// Everything from here on needs at least read caps on the monitor.
4321 if (!op->get_session()->is_capable("mon", MON_CAP_R)) {
4322 dout(5) << __func__ << " " << op->get_req()->get_source_inst()
4323 << " not enough caps for " << *(op->get_req()) << " -- dropping"
4329 switch (op->get_req()->get_type()) {
4332 case CEPH_MSG_MON_GET_VERSION:
4333 handle_get_version(op);
4336 case CEPH_MSG_MON_SUBSCRIBE:
4337 /* FIXME: check what's being subscribed, filter accordingly */
4338 handle_subscribe(op);
// Only monitors may send the remaining message types.
4348 if (!op->is_src_mon()) {
4349 dout(1) << __func__ << " unexpected monitor message from"
4350 << " non-monitor entity " << op->get_req()->get_source_inst()
4351 << " " << *(op->get_req()) << " -- dropping" << dendl;
4355 /* messages that should only be sent by another monitor */
4357 switch (op->get_req()->get_type()) {
4367 // Sync (i.e., the new slurp, but on steroids)
4375 /* log acks are sent from a monitor we sent the MLog to, and are
4376 never sent by clients to us. */
4378 log_client.handle_log_ack((MLogAck*)op->get_req());
4383 op->set_type_service();
4384 paxos_service[PAXOS_MONMAP]->dispatch(op);
4390 op->set_type_paxos();
4391 MMonPaxos *pm = static_cast<MMonPaxos*>(op->get_req());
4392 if (!op->get_session()->is_capable("mon", MON_CAP_X)) {
4397 if (state == STATE_SYNCHRONIZING) {
4398 // we are synchronizing. These messages would do us no
4399 // good, thus just drop them and ignore them.
4400 dout(10) << __func__ << " ignore paxos msg from "
4401 << pm->get_source_inst() << dendl;
4406 if (pm->epoch > get_epoch()) {
4410 if (pm->epoch != get_epoch()) {
4414 paxos->dispatch(op);
4419 case MSG_MON_ELECTION:
4420 op->set_type_election();
4421 //check privileges here for simplicity
4422 if (!op->get_session()->is_capable("mon", MON_CAP_X)) {
4423 dout(0) << "MMonElection received from entity without enough caps!"
4424 << op->get_session()->caps << dendl;
4427 if (!is_probing() && !is_synchronizing()) {
4428 elector.dispatch(op);
4437 handle_timecheck(op);
4440 case MSG_MON_HEALTH:
4441 health_monitor->dispatch(op);
4444 case MSG_MON_HEALTH_CHECKS:
4445 op->set_type_service();
4446 paxos_service[PAXOS_HEALTH]->dispatch(op);
4454 dout(1) << "dropping unexpected " << *(op->get_req()) << dendl;
// Answer an MPing with an MPing whose payload is an encoded string produced
// by a JSON formatter. The string holds a "pong" object containing:
//  - the cluster health: get_health_status() once require_osd_release is at
//    least luminous, otherwise the legacy get_health();
//  - the monitor status.
// The reply is sent to the ping's source instance.
// NOTE(review): the declarations of `ss`/`payload`, the closing of the JSON
// section, and the flush into `ss` are on lines not shown in this excerpt.
4463 void Monitor::handle_ping(MonOpRequestRef op)
4465 MPing *m = static_cast<MPing*>(op->get_req());
4466 dout(10) << __func__ << " " << *m << dendl;
4467 MPing *reply = new MPing;
4468 entity_inst_t inst = m->get_source_inst();
4470 boost::scoped_ptr<Formatter> f(new JSONFormatter(true));
4471 f->open_object_section("pong");
4473 if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
4474 get_health_status(false, f.get(), nullptr);
4476 list<string> health_str;
4477 get_health(health_str, nullptr, f.get());
4482 get_mon_status(f.get(), ss);
4488 ::encode(ss.str(), payload);
4489 reply->set_payload(payload);
4490 dout(10) << __func__ << " reply payload len " << reply->get_payload().length() << dendl;
4491 messenger->send_message(reply, inst);
// Begin clock-skew checking from a clean slate: drop all previous timecheck
// state, then immediately start a new round.
4494 void Monitor::timecheck_start()
4496 dout(10) << __func__ << dendl;
4497 timecheck_cleanup();
4498 timecheck_start_round();
// Stop clock-skew checking: timecheck_cleanup() discards all timecheck state
// and cancels any scheduled round.
4501 void Monitor::timecheck_finish()
4503 dout(10) << __func__ << dendl;
4504 timecheck_cleanup();
// Leader only: start a new timecheck round.
//
// Round numbers encode state: an odd timecheck_round means a round is in
// progress, an even one means idle. If a round is already in progress:
//  - it is kept going if it started less than 3 * mon_timecheck_interval
//    ago;
//  - otherwise it is cancelled via timecheck_cancel_round(), after which
//    the round number must be even (asserted below).
// The new round's start time is recorded, and the next round is scheduled
// via timecheck_reset_event().
// NOTE(review): the early return, the round increment, and the call that
// sends the pings (presumably timecheck()) are on lines not shown in this
// excerpt.
4507 void Monitor::timecheck_start_round()
4509 dout(10) << __func__ << " curr " << timecheck_round << dendl;
4510 assert(is_leader());
4512 if (monmap->size() == 1) {
4513 assert(0 == "We are alone; this shouldn't have been scheduled!");
4517 if (timecheck_round % 2) {
4518 dout(10) << __func__ << " there's a timecheck going on" << dendl;
4519 utime_t curr_time = ceph_clock_now();
4520 double max = g_conf->mon_timecheck_interval*3;
4521 if (curr_time - timecheck_round_start < max) {
4522 dout(10) << __func__ << " keep current round going" << dendl;
4525 dout(10) << __func__
4526 << " finish current timecheck and start new" << dendl;
4527 timecheck_cancel_round();
4531 assert(timecheck_round % 2 == 0);
4534 timecheck_round_start = ceph_clock_now();
4535 dout(10) << __func__ << " new " << timecheck_round << dendl;
4539 dout(10) << __func__ << " setting up next event" << dendl;
4540 timecheck_reset_event();
// Close the in-progress (odd-numbered) timecheck round.
//
// `success` separates two outcomes:
//  - Every quorum member answered: nobody may still be waiting, the ack
//    count must equal the quorum size, and timecheck_check_skews() evaluates
//    the collected skews.
//  - The round was cancelled: the peers that never answered are logged and
//    the waiting set is discarded.
// handle_timecheck_leader() calls this with no argument, so the declaration
// presumably defaults `success` to true.
// NOTE(review): the round increment and the `if (success)` branch structure
// are on lines not shown in this excerpt.
4543 void Monitor::timecheck_finish_round(bool success)
4545 dout(10) << __func__ << " curr " << timecheck_round << dendl;
4546 assert(timecheck_round % 2);
4548 timecheck_round_start = utime_t();
4551 assert(timecheck_waiting.empty());
4552 assert(timecheck_acks == quorum.size());
4554 timecheck_check_skews();
4558 dout(10) << __func__ << " " << timecheck_waiting.size()
4559 << " peers still waiting:";
4560 for (map<entity_inst_t,utime_t>::iterator p = timecheck_waiting.begin();
4561 p != timecheck_waiting.end(); ++p) {
4562 *_dout << " " << p->first.name;
4565 timecheck_waiting.clear();
4567 dout(10) << __func__ << " finished to " << timecheck_round << dendl;
// Abort the current round early by finishing it as unsuccessful.
4570 void Monitor::timecheck_cancel_round()
4572 timecheck_finish_round(false);
// Reset all timecheck state:
//  - the round counter and the round start time;
//  - any scheduled timer event, which is cancelled;
//  - the waiting/skew/latency maps;
//  - timecheck_rounds_since_clean, the count of consecutive rounds in which
//    skew was found.
4575 void Monitor::timecheck_cleanup()
4577 timecheck_round = 0;
4579 timecheck_round_start = utime_t();
4581 if (timecheck_event) {
4582 timer.cancel_event(timecheck_event);
4583 timecheck_event = NULL;
4585 timecheck_waiting.clear();
4586 timecheck_skews.clear();
4587 timecheck_latencies.clear();
4589 timecheck_rounds_since_clean = 0;
// (Re)schedule the next timecheck round, replacing any pending event. When
// the timer fires it runs timecheck_start_round().
//
// The delay is presumably mon_timecheck_skew_interval * timecheck_rounds_since_clean
// (the assignment's left-hand side is not shown), clamped to
// mon_timecheck_interval whenever it is <= 0 or larger than that interval:
//  - With no recent skew (rounds_since_clean == 0) the regular interval is
//    used.
//  - While skew persists, the delay grows linearly with each skewed round
//    until it hits the regular interval. If mon_timecheck_skew_interval is
//    the smaller setting, this means rounds come sooner than usual.
4592 void Monitor::timecheck_reset_event()
4594 if (timecheck_event) {
4595 timer.cancel_event(timecheck_event);
4596 timecheck_event = NULL;
4600 cct->_conf->mon_timecheck_skew_interval * timecheck_rounds_since_clean;
4602 if (delay <= 0 || delay > cct->_conf->mon_timecheck_interval) {
4603 delay = cct->_conf->mon_timecheck_interval;
4606 dout(10) << __func__ << " delay " << delay
4607 << " rounds_since_clean " << timecheck_rounds_since_clean
4610 timecheck_event = timer.add_event_after(
4612 new C_MonContext(this, [this](int) {
4613 timecheck_start_round();
// Leader only, with no round in progress (even round number): decide whether
// any monitor's clock is skewed, and reschedule the next round accordingly.
//
// Two outcomes reschedule:
//  - timecheck_has_skew() reports skew for some peer: the consecutive-skew
//    counter is incremented, and timecheck_reset_event() reschedules with a
//    delay derived from that counter.
//  - No skew now, but earlier rounds were skewed: one quick confirmation
//    round is scheduled. The counter is set to 1 just for the reschedule
//    (a single mon_timecheck_skew_interval delay), then reset to 0.
// NOTE(review): the abs_skew declaration and the lines that set and test
// found_skew are not shown in this excerpt.
4617 void Monitor::timecheck_check_skews()
4619 dout(10) << __func__ << dendl;
4620 assert(is_leader());
4621 assert((timecheck_round % 2) == 0);
4622 if (monmap->size() == 1) {
4623 assert(0 == "We are alone; we shouldn't have gotten here!");
4626 assert(timecheck_latencies.size() == timecheck_skews.size());
4628 bool found_skew = false;
4629 for (map<entity_inst_t, double>::iterator p = timecheck_skews.begin();
4630 p != timecheck_skews.end(); ++p) {
4633 if (timecheck_has_skew(p->second, &abs_skew)) {
4634 dout(10) << __func__
4635 << " " << p->first << " skew " << abs_skew << dendl;
4641 ++timecheck_rounds_since_clean;
4642 timecheck_reset_event();
4643 } else if (timecheck_rounds_since_clean > 0) {
4645 << " no clock skews found after " << timecheck_rounds_since_clean
4646 << " rounds" << dendl;
4647 // make sure the skews are really gone and not just a transient success
4648 // this will run just once if not in the presence of skews again.
4649 timecheck_rounds_since_clean = 1;
4650 timecheck_reset_event();
4651 timecheck_rounds_since_clean = 0;
// Leader only, with no round in progress: send every other quorum member an
// MTimeCheck OP_REPORT for the current epoch and round. The report carries
// the leader's full view of per-monitor skews and latencies; a peon replaces
// its own maps with these in handle_timecheck_peon(). Per-monitor details
// are logged at level 25, at most once per call (do_output).
// NOTE(review): skipping ourselves and toggling do_output happen on lines
// not shown in this excerpt.
4656 void Monitor::timecheck_report()
4658 dout(10) << __func__ << dendl;
4659 assert(is_leader());
4660 assert((timecheck_round % 2) == 0);
4661 if (monmap->size() == 1) {
4662 assert(0 == "We are alone; we shouldn't have gotten here!");
4666 assert(timecheck_latencies.size() == timecheck_skews.size());
4667 bool do_output = true; // only output report once
4668 for (set<int>::iterator q = quorum.begin(); q != quorum.end(); ++q) {
4669 if (monmap->get_name(*q) == name)
4672 MTimeCheck *m = new MTimeCheck(MTimeCheck::OP_REPORT);
4673 m->epoch = get_epoch();
4674 m->round = timecheck_round;
4676 for (map<entity_inst_t, double>::iterator it = timecheck_skews.begin();
4677 it != timecheck_skews.end(); ++it) {
4678 double skew = it->second;
4679 double latency = timecheck_latencies[it->first];
4681 m->skews[it->first] = skew;
4682 m->latencies[it->first] = latency;
4685 dout(25) << __func__ << " " << it->first
4686 << " latency " << latency
4687 << " skew " << skew << dendl;
4691 entity_inst_t inst = monmap->get_inst(*q);
4692 dout(10) << __func__ << " send report to " << inst << dendl;
4693 messenger->send_message(m, inst);
// Leader only, during an (odd-numbered) round: send an OP_PING to every
// other quorum member.
//
// The leader is the reference clock: it records zero skew and zero latency
// for itself and counts itself as already acked. For each peer, the ping's
// send time is stored in timecheck_waiting. handle_timecheck_leader() uses
// that time to compute latency when the peer's OP_PONG arrives.
4697 void Monitor::timecheck()
4699 dout(10) << __func__ << dendl;
4700 assert(is_leader());
4701 if (monmap->size() == 1) {
4702 assert(0 == "We are alone; we shouldn't have gotten here!");
4705 assert(timecheck_round % 2 != 0);
4707 timecheck_acks = 1; // we ack ourselves
4709 dout(10) << __func__ << " start timecheck epoch " << get_epoch()
4710 << " round " << timecheck_round << dendl;
4712 // we are at the eye of the storm; the point of reference
4713 timecheck_skews[messenger->get_myinst()] = 0.0;
4714 timecheck_latencies[messenger->get_myinst()] = 0.0;
4716 for (set<int>::iterator it = quorum.begin(); it != quorum.end(); ++it) {
4717 if (monmap->get_name(*it) == name)
4720 entity_inst_t inst = monmap->get_inst(*it);
// Remember when this peer was pinged, to measure latency on its pong.
4721 utime_t curr_time = ceph_clock_now();
4722 timecheck_waiting[inst] = curr_time;
4723 MTimeCheck *m = new MTimeCheck(MTimeCheck::OP_PING);
4724 m->epoch = get_epoch();
4725 m->round = timecheck_round;
4726 dout(10) << __func__ << " send " << *m << " to " << inst << dendl;
4727 messenger->send_message(m, inst);
// Classify one peer's measured clock offset for health reporting.
//
// @param ss          receives a human-readable explanation when skewed
// @param skew_bound  signed skew bound computed by handle_timecheck_leader()
// @param latency     measured latency to the peer; must be >= 0
// @return presumably `status`: HEALTH_WARN, with a "clock skew ... > max ..."
//         message appended to `ss`, when timecheck_has_skew() reports skew;
//         HEALTH_OK otherwise.
// NOTE(review): the declaration of abs_skew and the final return are on
// lines not shown in this excerpt.
4731 health_status_t Monitor::timecheck_status(ostringstream &ss,
4732 const double skew_bound,
4733 const double latency)
4735 health_status_t status = HEALTH_OK;
4736 assert(latency >= 0);
4739 if (timecheck_has_skew(skew_bound, &abs_skew)) {
4740 status = HEALTH_WARN;
4741 ss << "clock skew " << abs_skew << "s"
4742 << " > max " << g_conf->mon_clock_drift_allowed << "s";
// Leader side of the timecheck protocol: process an OP_PONG from a peer.
//
// Steps:
//  1. Pongs from an older election epoch or an older round are discarded.
//  2. The time elapsed since the matching ping (from timecheck_waiting) is
//     the latency sample. If our own clock went backwards in the meantime,
//     the round is cancelled.
//  3. The stored per-peer latency is an exponentially weighted average
//     (0.8 old, 0.2 new).
//  4. The peer's signed skew bound is the difference between its timestamp
//     and our clock, less the raw latency sample (see the long comment in
//     the body).
//  5. The skew bound is reported through clog->health() at the severity
//     chosen by timecheck_status(), and recorded in timecheck_skews.
//  6. Once every quorum member has answered, the round is finished.
// NOTE(review): the early returns, the `ss` declaration, the sign/zero
// handling of skew_bound and the ack increment are on lines not shown in
// this excerpt.
4748 void Monitor::handle_timecheck_leader(MonOpRequestRef op)
4750 MTimeCheck *m = static_cast<MTimeCheck*>(op->get_req());
4751 dout(10) << __func__ << " " << *m << dendl;
4752 /* handles PONG's */
4753 assert(m->op == MTimeCheck::OP_PONG);
4755 entity_inst_t other = m->get_source_inst();
4756 if (m->epoch < get_epoch()) {
4757 dout(1) << __func__ << " got old timecheck epoch " << m->epoch
4758 << " from " << other
4759 << " curr " << get_epoch()
4760 << " -- severely lagged? discard" << dendl;
4763 assert(m->epoch == get_epoch());
4765 if (m->round < timecheck_round) {
4766 dout(1) << __func__ << " got old round " << m->round
4767 << " from " << other
4768 << " curr " << timecheck_round << " -- discard" << dendl;
4772 utime_t curr_time = ceph_clock_now();
4774 assert(timecheck_waiting.count(other) > 0);
4775 utime_t timecheck_sent = timecheck_waiting[other];
4776 timecheck_waiting.erase(other);
4777 if (curr_time < timecheck_sent) {
4778 // our clock was readjusted -- drop everything until it all makes sense.
4779 dout(1) << __func__ << " our clock was readjusted --"
4780 << " bump round and drop current check"
4782 timecheck_cancel_round();
4786 /* update peer latencies */
4787 double latency = (double)(curr_time - timecheck_sent);
4789 if (timecheck_latencies.count(other) == 0)
4790 timecheck_latencies[other] = latency;
4792 double avg_latency = ((timecheck_latencies[other]*0.8)+(latency*0.2));
4793 timecheck_latencies[other] = avg_latency;
4799 * some nasty thing goes on if we were to do 'a - b' between two utime_t,
4800 * and 'a' happens to be lower than 'b'; so we use double instead.
4802 * latency is always expected to be >= 0.
4804 * delta, the difference between theirs timestamp and ours, may either be
4805 * lower or higher than 0; will hardly ever be 0.
4807 * The absolute skew is the absolute delta minus the latency, which is
4808 * taken as a whole instead of an rtt given that there is some queueing
4809 * and dispatch times involved and it's hard to assess how long exactly
4810 * it took for the message to travel to the other side and be handled. So
4811 * we call it a bounded skew, the worst case scenario.
4815 * Given that the latency is always positive, we can establish that the
4816 * bounded skew will be:
4818 * 1. positive if the absolute delta is higher than the latency and
4820 * 2. negative if the absolute delta is higher than the latency and
4821 * delta is negative.
4822 * 3. zero if the absolute delta is lower than the latency.
4824 * On 3. we make a judgement call and treat the skew as non-existent.
4825 * This is because that, if the absolute delta is lower than the
4826 * latency, then the apparently existing skew is nothing more than a
4827 * side-effect of the high latency at work.
4829 * This may not be entirely true though, as a severely skewed clock
4830 * may be masked by an even higher latency, but with high latencies
4831 * we probably have worse issues to deal with than just skewed clocks.
4833 assert(latency >= 0);
// Positive delta: the peer's reported timestamp is ahead of our clock.
4835 double delta = ((double) m->timestamp) - ((double) curr_time);
4836 double abs_delta = (delta > 0 ? delta : -delta);
4837 double skew_bound = abs_delta - latency;
4841 skew_bound = -skew_bound;
4844 health_status_t status = timecheck_status(ss, skew_bound, latency);
4845 clog->health(status) << other << " " << ss.str();
4847 dout(10) << __func__ << " from " << other << " ts " << m->timestamp
4848 << " delta " << delta << " skew_bound " << skew_bound
4849 << " latency " << latency << dendl;
4851 timecheck_skews[other] = skew_bound;
4854 if (timecheck_acks == quorum.size()) {
4855 dout(10) << __func__ << " got pongs from everybody ("
4856 << timecheck_acks << " total)" << dendl;
4857 assert(timecheck_skews.size() == timecheck_acks);
4858 assert(timecheck_waiting.empty());
4859 // everyone has acked, so bump the round to finish it.
4860 timecheck_finish_round();
4864 void Monitor::handle_timecheck_peon(MonOpRequestRef op)
4866 MTimeCheck *m = static_cast<MTimeCheck*>(op->get_req());
4867 dout(10) << __func__ << " " << *m << dendl;
4870 assert(m->op == MTimeCheck::OP_PING || m->op == MTimeCheck::OP_REPORT);
4872 if (m->epoch != get_epoch()) {
4873 dout(1) << __func__ << " got wrong epoch "
4874 << "(ours " << get_epoch()
4875 << " theirs: " << m->epoch << ") -- discarding" << dendl;
4879 if (m->round < timecheck_round) {
4880 dout(1) << __func__ << " got old round " << m->round
4881 << " current " << timecheck_round
4882 << " (epoch " << get_epoch() << ") -- discarding" << dendl;
4886 timecheck_round = m->round;
4888 if (m->op == MTimeCheck::OP_REPORT) {
4889 assert((timecheck_round % 2) == 0);
4890 timecheck_latencies.swap(m->latencies);
4891 timecheck_skews.swap(m->skews);
4895 assert((timecheck_round % 2) != 0);
4896 MTimeCheck *reply = new MTimeCheck(MTimeCheck::OP_PONG);
4897 utime_t curr_time = ceph_clock_now();
4898 reply->timestamp = curr_time;
4899 reply->epoch = m->epoch;
4900 reply->round = m->round;
4901 dout(10) << __func__ << " send " << *m
4902 << " to " << m->get_source_inst() << dendl;
4903 m->get_connection()->send_message(reply);
4906 void Monitor::handle_timecheck(MonOpRequestRef op)
4908 MTimeCheck *m = static_cast<MTimeCheck*>(op->get_req());
4909 dout(10) << __func__ << " " << *m << dendl;
4912 if (m->op != MTimeCheck::OP_PONG) {
4913 dout(1) << __func__ << " drop unexpected msg (not pong)" << dendl;
4915 handle_timecheck_leader(op);
4917 } else if (is_peon()) {
4918 if (m->op != MTimeCheck::OP_PING && m->op != MTimeCheck::OP_REPORT) {
4919 dout(1) << __func__ << " drop unexpected msg (not ping or report)" << dendl;
4921 handle_timecheck_peon(op);
4924 dout(1) << __func__ << " drop unexpected msg" << dendl;
4928 void Monitor::handle_subscribe(MonOpRequestRef op)
4930 MMonSubscribe *m = static_cast<MMonSubscribe*>(op->get_req());
4931 dout(10) << "handle_subscribe " << *m << dendl;
4935 MonSession *s = op->get_session();
4938 for (map<string,ceph_mon_subscribe_item>::iterator p = m->what.begin();
4941 // if there are any non-onetime subscriptions, we need to reply to start the resubscribe timer
4942 if ((p->second.flags & CEPH_SUBSCRIBE_ONETIME) == 0)
4945 // remove conflicting subscribes
4946 if (logmon()->sub_name_to_id(p->first) >= 0) {
4947 for (map<string, Subscription*>::iterator it = s->sub_map.begin();
4948 it != s->sub_map.end(); ) {
4949 if (it->first != p->first && logmon()->sub_name_to_id(it->first) >= 0) {
4950 Mutex::Locker l(session_map_lock);
4951 session_map.remove_sub((it++)->second);
4959 Mutex::Locker l(session_map_lock);
4960 session_map.add_update_sub(s, p->first, p->second.start,
4961 p->second.flags & CEPH_SUBSCRIBE_ONETIME,
4962 m->get_connection()->has_feature(CEPH_FEATURE_INCSUBOSDMAP));
4965 if (p->first.compare(0, 6, "mdsmap") == 0 || p->first.compare(0, 5, "fsmap") == 0) {
4966 dout(10) << __func__ << ": MDS sub '" << p->first << "'" << dendl;
4967 if ((int)s->is_capable("mds", MON_CAP_R)) {
4968 Subscription *sub = s->sub_map[p->first];
4969 assert(sub != nullptr);
4970 mdsmon()->check_sub(sub);
4972 } else if (p->first == "osdmap") {
4973 if ((int)s->is_capable("osd", MON_CAP_R)) {
4974 if (s->osd_epoch > p->second.start) {
4975 // client needs earlier osdmaps on purpose, so reset the sent epoch
4978 osdmon()->check_osdmap_sub(s->sub_map["osdmap"]);
4980 } else if (p->first == "osd_pg_creates") {
4981 if ((int)s->is_capable("osd", MON_CAP_W)) {
4982 if (monmap->get_required_features().contains_all(
4983 ceph::features::mon::FEATURE_LUMINOUS)) {
4984 osdmon()->check_pg_creates_sub(s->sub_map["osd_pg_creates"]);
4986 pgmon()->check_sub(s->sub_map["osd_pg_creates"]);
4989 } else if (p->first == "monmap") {
4990 monmon()->check_sub(s->sub_map[p->first]);
4991 } else if (logmon()->sub_name_to_id(p->first) >= 0) {
4992 logmon()->check_sub(s->sub_map[p->first]);
4993 } else if (p->first == "mgrmap" || p->first == "mgrdigest") {
4994 mgrmon()->check_sub(s->sub_map[p->first]);
4995 } else if (p->first == "servicemap") {
4996 mgrstatmon()->check_sub(s->sub_map[p->first]);
5001 // we only need to reply if the client is old enough to think it
5002 // has to send renewals.
5003 ConnectionRef con = m->get_connection();
5004 if (!con->has_feature(CEPH_FEATURE_MON_STATEFUL_SUB))
5005 m->get_connection()->send_message(new MMonSubscribeAck(
5006 monmap->get_fsid(), (int)g_conf->mon_subscribe_interval));
5011 void Monitor::handle_get_version(MonOpRequestRef op)
5013 MMonGetVersion *m = static_cast<MMonGetVersion*>(op->get_req());
5014 dout(10) << "handle_get_version " << *m << dendl;
5015 PaxosService *svc = NULL;
5017 MonSession *s = op->get_session();
5020 if (!is_leader() && !is_peon()) {
5021 dout(10) << " waiting for quorum" << dendl;
5022 waitfor_quorum.push_back(new C_RetryMessage(this, op));
5026 if (m->what == "mdsmap") {
5028 } else if (m->what == "fsmap") {
5030 } else if (m->what == "osdmap") {
5032 } else if (m->what == "monmap") {
5035 derr << "invalid map type " << m->what << dendl;
5039 if (!svc->is_readable()) {
5040 svc->wait_for_readable(op, new C_RetryMessage(this, op));
5044 MMonGetVersionReply *reply = new MMonGetVersionReply();
5045 reply->handle = m->handle;
5046 reply->version = svc->get_last_committed();
5047 reply->oldest_version = svc->get_first_committed();
5048 reply->set_tid(m->get_tid());
5050 m->get_connection()->send_message(reply);
5056 bool Monitor::ms_handle_reset(Connection *con)
5058 dout(10) << "ms_handle_reset " << con << " " << con->get_peer_addr() << dendl;
5060 // ignore lossless monitor sessions
5061 if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON)
5064 MonSession *s = static_cast<MonSession *>(con->get_priv());
5068 // break any con <-> session ref cycle
5069 s->con->set_priv(NULL);
5074 Mutex::Locker l(lock);
5076 dout(10) << "reset/close on session " << s->inst << dendl;
5078 Mutex::Locker l(session_map_lock);
5085 bool Monitor::ms_handle_refused(Connection *con)
5087 // just log for now...
5088 dout(10) << "ms_handle_refused " << con << " " << con->get_peer_addr() << dendl;
5094 void Monitor::send_latest_monmap(Connection *con)
5097 monmap->encode(bl, con->get_features());
5098 con->send_message(new MMonMap(bl));
5101 void Monitor::handle_mon_get_map(MonOpRequestRef op)
5103 MMonGetMap *m = static_cast<MMonGetMap*>(op->get_req());
5104 dout(10) << "handle_mon_get_map" << dendl;
5105 send_latest_monmap(m->get_connection().get());
5108 void Monitor::handle_mon_metadata(MonOpRequestRef op)
5110 MMonMetadata *m = static_cast<MMonMetadata*>(op->get_req());
5112 dout(10) << __func__ << dendl;
5113 update_mon_metadata(m->get_source().num(), std::move(m->data));
5117 void Monitor::update_mon_metadata(int from, Metadata&& m)
5119 // NOTE: this is now for legacy (kraken or jewel) mons only.
5120 pending_metadata[from] = std::move(m);
5122 MonitorDBStore::TransactionRef t = paxos->get_pending_transaction();
5124 ::encode(pending_metadata, bl);
5125 t->put(MONITOR_STORE_PREFIX, "last_metadata", bl);
5126 paxos->trigger_propose();
5129 int Monitor::load_metadata()
5132 int r = store->get(MONITOR_STORE_PREFIX, "last_metadata", bl);
5135 bufferlist::iterator it = bl.begin();
5136 ::decode(mon_metadata, it);
5138 pending_metadata = mon_metadata;
5142 int Monitor::get_mon_metadata(int mon, Formatter *f, ostream& err)
5145 if (!mon_metadata.count(mon)) {
5146 err << "mon." << mon << " not found";
5149 const Metadata& m = mon_metadata[mon];
5150 for (Metadata::const_iterator p = m.begin(); p != m.end(); ++p) {
5151 f->dump_string(p->first.c_str(), p->second);
5156 void Monitor::count_metadata(const string& field, map<string,int> *out)
5158 for (auto& p : mon_metadata) {
5159 auto q = p.second.find(field);
5160 if (q == p.second.end()) {
5161 (*out)["unknown"]++;
5163 (*out)[q->second]++;
5168 void Monitor::count_metadata(const string& field, Formatter *f)
5170 map<string,int> by_val;
5171 count_metadata(field, &by_val);
5172 f->open_object_section(field.c_str());
5173 for (auto& p : by_val) {
5174 f->dump_int(p.first.c_str(), p.second);
5179 int Monitor::print_nodes(Formatter *f, ostream& err)
5181 map<string, list<int> > mons; // hostname => mon
5182 for (map<int, Metadata>::iterator it = mon_metadata.begin();
5183 it != mon_metadata.end(); ++it) {
5184 const Metadata& m = it->second;
5185 Metadata::const_iterator hostname = m.find("hostname");
5186 if (hostname == m.end()) {
5187 // not likely though
5190 mons[hostname->second].push_back(it->first);
5193 dump_services(f, mons, "mon");
5197 // ----------------------------------------------
5200 int Monitor::scrub_start()
5202 dout(10) << __func__ << dendl;
5203 assert(is_leader());
5205 if (!scrub_result.empty()) {
5206 clog->info() << "scrub already in progress";
5210 scrub_event_cancel();
5211 scrub_result.clear();
5212 scrub_state.reset(new ScrubState);
5218 int Monitor::scrub()
5220 assert(is_leader());
5221 assert(scrub_state);
5223 scrub_cancel_timeout();
5224 wait_for_paxos_write();
5225 scrub_version = paxos->get_version();
5228 // scrub all keys if we're the only monitor in the quorum
5230 (quorum.size() == 1 ? -1 : cct->_conf->mon_scrub_max_keys);
5232 for (set<int>::iterator p = quorum.begin();
5237 MMonScrub *r = new MMonScrub(MMonScrub::OP_SCRUB, scrub_version,
5239 r->key = scrub_state->last_key;
5240 messenger->send_message(r, monmap->get_inst(*p));
5244 bool r = _scrub(&scrub_result[rank],
5245 &scrub_state->last_key,
5248 scrub_state->finished = !r;
5250 // only after we got our scrub results do we really care whether the
5251 // other monitors are late on their results. Also, this way we avoid
5252 // triggering the timeout if we end up getting stuck in _scrub() for
5253 // longer than the duration of the timeout.
5254 scrub_reset_timeout();
5256 if (quorum.size() == 1) {
5257 assert(scrub_state->finished == true);
5263 void Monitor::handle_scrub(MonOpRequestRef op)
5265 MMonScrub *m = static_cast<MMonScrub*>(op->get_req());
5266 dout(10) << __func__ << " " << *m << dendl;
5268 case MMonScrub::OP_SCRUB:
5273 wait_for_paxos_write();
5275 if (m->version != paxos->get_version())
5278 MMonScrub *reply = new MMonScrub(MMonScrub::OP_RESULT,
5282 reply->key = m->key;
5283 _scrub(&reply->result, &reply->key, &reply->num_keys);
5284 m->get_connection()->send_message(reply);
5288 case MMonScrub::OP_RESULT:
5292 if (m->version != scrub_version)
5294 // reset the timeout each time we get a result
5295 scrub_reset_timeout();
5297 int from = m->get_source().num();
5298 assert(scrub_result.count(from) == 0);
5299 scrub_result[from] = m->result;
5301 if (scrub_result.size() == quorum.size()) {
5302 scrub_check_results();
5303 scrub_result.clear();
5304 if (scrub_state->finished)
5314 bool Monitor::_scrub(ScrubResult *r,
5315 pair<string,string> *start,
5319 assert(start != NULL);
5320 assert(num_keys != NULL);
5322 set<string> prefixes = get_sync_targets_names();
5323 prefixes.erase("paxos"); // exclude paxos, as this one may have extra states for proposals, etc.
5325 dout(10) << __func__ << " start (" << *start << ")"
5326 << " num_keys " << *num_keys << dendl;
5328 MonitorDBStore::Synchronizer it = store->get_synchronizer(*start, prefixes);
5330 int scrubbed_keys = 0;
5331 pair<string,string> last_key;
5333 while (it->has_next_chunk()) {
5335 if (*num_keys > 0 && scrubbed_keys == *num_keys)
5338 pair<string,string> k = it->get_next_key();
5339 if (prefixes.count(k.first) == 0)
5342 if (cct->_conf->mon_scrub_inject_missing_keys > 0.0 &&
5343 (rand() % 10000 < cct->_conf->mon_scrub_inject_missing_keys*10000.0)) {
5344 dout(10) << __func__ << " inject missing key, skipping (" << k << ")"
5350 int err = store->get(k.first, k.second, bl);
5353 uint32_t key_crc = bl.crc32c(0);
5354 dout(30) << __func__ << " " << k << " bl " << bl.length() << " bytes"
5355 << " crc " << key_crc << dendl;
5356 r->prefix_keys[k.first]++;
5357 if (r->prefix_crc.count(k.first) == 0) {
5358 r->prefix_crc[k.first] = 0;
5360 r->prefix_crc[k.first] = bl.crc32c(r->prefix_crc[k.first]);
5362 if (cct->_conf->mon_scrub_inject_crc_mismatch > 0.0 &&
5363 (rand() % 10000 < cct->_conf->mon_scrub_inject_crc_mismatch*10000.0)) {
5364 dout(10) << __func__ << " inject failure at (" << k << ")" << dendl;
5365 r->prefix_crc[k.first] += 1;
5372 dout(20) << __func__ << " last_key (" << last_key << ")"
5373 << " scrubbed_keys " << scrubbed_keys
5374 << " has_next " << it->has_next_chunk() << dendl;
5377 *num_keys = scrubbed_keys;
5379 return it->has_next_chunk();
5382 void Monitor::scrub_check_results()
5384 dout(10) << __func__ << dendl;
5388 ScrubResult& mine = scrub_result[rank];
5389 for (map<int,ScrubResult>::iterator p = scrub_result.begin();
5390 p != scrub_result.end();
5392 if (p->first == rank)
5394 if (p->second != mine) {
5396 clog->error() << "scrub mismatch";
5397 clog->error() << " mon." << rank << " " << mine;
5398 clog->error() << " mon." << p->first << " " << p->second;
5402 clog->debug() << "scrub ok on " << quorum << ": " << mine;
5405 inline void Monitor::scrub_timeout()
5407 dout(1) << __func__ << " restarting scrub" << dendl;
5412 void Monitor::scrub_finish()
5414 dout(10) << __func__ << dendl;
5416 scrub_event_start();
5419 void Monitor::scrub_reset()
5421 dout(10) << __func__ << dendl;
5422 scrub_cancel_timeout();
5424 scrub_result.clear();
5425 scrub_state.reset();
5428 inline void Monitor::scrub_update_interval(int secs)
5430 // we don't care about changes if we are not the leader.
5431 // changes will be visible if we become the leader.
5435 dout(1) << __func__ << " new interval = " << secs << dendl;
5437 // if scrub already in progress, all changes will already be visible during
5438 // the next round. Nothing to do.
5439 if (scrub_state != NULL)
5442 scrub_event_cancel();
5443 scrub_event_start();
5446 void Monitor::scrub_event_start()
5448 dout(10) << __func__ << dendl;
5451 scrub_event_cancel();
5453 if (cct->_conf->mon_scrub_interval <= 0) {
5454 dout(1) << __func__ << " scrub event is disabled"
5455 << " (mon_scrub_interval = " << cct->_conf->mon_scrub_interval
5460 scrub_event = timer.add_event_after(
5461 cct->_conf->mon_scrub_interval,
5462 new C_MonContext(this, [this](int) {
5467 void Monitor::scrub_event_cancel()
5469 dout(10) << __func__ << dendl;
5471 timer.cancel_event(scrub_event);
5476 inline void Monitor::scrub_cancel_timeout()
5478 if (scrub_timeout_event) {
5479 timer.cancel_event(scrub_timeout_event);
5480 scrub_timeout_event = NULL;
5484 void Monitor::scrub_reset_timeout()
5486 dout(15) << __func__ << " reset timeout event" << dendl;
5487 scrub_cancel_timeout();
5488 scrub_timeout_event = timer.add_event_after(
5489 g_conf->mon_scrub_timeout,
5490 new C_MonContext(this, [this](int) {
5495 /************ TICK ***************/
5496 void Monitor::new_tick()
5498 timer.add_event_after(g_conf->mon_tick_interval, new C_MonContext(this, [this](int) {
5503 void Monitor::tick()
5506 dout(11) << "tick" << dendl;
5507 const utime_t now = ceph_clock_now();
5509 // Check if we need to emit any delayed health check updated messages
5511 const auto min_period = g_conf->get_val<int64_t>(
5512 "mon_health_log_update_period");
5513 for (auto& svc : paxos_service) {
5514 auto health = svc->get_health_checks();
5516 for (const auto &i : health.checks) {
5517 const std::string &code = i.first;
5518 const std::string &summary = i.second.summary;
5519 const health_status_t severity = i.second.severity;
5521 auto status_iter = health_check_log_times.find(code);
5522 if (status_iter == health_check_log_times.end()) {
5526 auto &log_status = status_iter->second;
5527 bool const changed = log_status.last_message != summary
5528 || log_status.severity != severity;
5530 if (changed && now - log_status.updated_at > min_period) {
5531 log_status.last_message = summary;
5532 log_status.updated_at = now;
5533 log_status.severity = severity;
5536 ss << "Health check update: " << summary << " (" << code << ")";
5537 clog->health(severity) << ss.str();
5544 for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p) {
5551 Mutex::Locker l(session_map_lock);
5552 auto p = session_map.sessions.begin();
5554 bool out_for_too_long = (!exited_quorum.is_zero() &&
5555 now > (exited_quorum + 2*g_conf->mon_lease));
5561 // don't trim monitors
5562 if (s->inst.name.is_mon())
5565 if (s->session_timeout < now && s->con) {
5566 // check keepalive, too
5567 s->session_timeout = s->con->get_last_keepalive();
5568 s->session_timeout += g_conf->mon_session_timeout;
5570 if (s->session_timeout < now) {
5571 dout(10) << " trimming session " << s->con << " " << s->inst
5572 << " (timeout " << s->session_timeout
5573 << " < now " << now << ")" << dendl;
5574 } else if (out_for_too_long) {
5575 // boot the client Session because we've taken too long getting back in
5576 dout(10) << " trimming session " << s->con << " " << s->inst
5577 << " because we've been out of quorum too long" << dendl;
5582 s->con->mark_down();
5584 logger->inc(l_mon_session_trim);
5587 sync_trim_providers();
5589 if (!maybe_wait_for_quorum.empty()) {
5590 finish_contexts(g_ceph_context, maybe_wait_for_quorum);
5593 if (is_leader() && paxos->is_active() && fingerprint.is_zero()) {
5594 // this is only necessary on upgraded clusters.
5595 MonitorDBStore::TransactionRef t = paxos->get_pending_transaction();
5596 prepare_new_fingerprint(t);
5597 paxos->trigger_propose();
5603 void Monitor::prepare_new_fingerprint(MonitorDBStore::TransactionRef t)
5606 nf.generate_random();
5607 dout(10) << __func__ << " proposing cluster_fingerprint " << nf << dendl;
5611 t->put(MONITOR_NAME, "cluster_fingerprint", bl);
5614 int Monitor::check_fsid()
5617 int r = store->get(MONITOR_NAME, "cluster_uuid", ebl);
5622 string es(ebl.c_str(), ebl.length());
5624 // only keep the first line
5625 size_t pos = es.find_first_of('\n');
5626 if (pos != string::npos)
5629 dout(10) << "check_fsid cluster_uuid contains '" << es << "'" << dendl;
5631 if (!ondisk.parse(es.c_str())) {
5632 derr << "error: unable to parse uuid" << dendl;
5636 if (monmap->get_fsid() != ondisk) {
5637 derr << "error: cluster_uuid file exists with value " << ondisk
5638 << ", != our uuid " << monmap->get_fsid() << dendl;
5645 int Monitor::write_fsid()
5647 auto t(std::make_shared<MonitorDBStore::Transaction>());
5649 int r = store->apply_transaction(t);
5653 int Monitor::write_fsid(MonitorDBStore::TransactionRef t)
5656 ss << monmap->get_fsid() << "\n";
5657 string us = ss.str();
5662 t->put(MONITOR_NAME, "cluster_uuid", b);
5667 * this is the closest thing to a traditional 'mkfs' for ceph.
5668 * initialize the monitor state machines to their initial values.
5670 int Monitor::mkfs(bufferlist& osdmapbl)
5672 auto t(std::make_shared<MonitorDBStore::Transaction>());
5674 // verify cluster fsid
5675 int r = check_fsid();
5676 if (r < 0 && r != -ENOENT)
5680 magicbl.append(CEPH_MON_ONDISK_MAGIC);
5681 magicbl.append("\n");
5682 t->put(MONITOR_NAME, "magic", magicbl);
5685 features = get_initial_supported_features();
5688 // save monmap, osdmap, keyring.
5689 bufferlist monmapbl;
5690 monmap->encode(monmapbl, CEPH_FEATURES_ALL);
5691 monmap->set_epoch(0); // must be 0 to avoid confusing first MonmapMonitor::update_from_paxos()
5692 t->put("mkfs", "monmap", monmapbl);
5694 if (osdmapbl.length()) {
5695 // make sure it's a valid osdmap
5698 om.decode(osdmapbl);
5700 catch (buffer::error& e) {
5701 derr << "error decoding provided osdmap: " << e.what() << dendl;
5704 t->put("mkfs", "osdmap", osdmapbl);
5707 if (is_keyring_required()) {
5709 string keyring_filename;
5711 r = ceph_resolve_file_search(g_conf->keyring, keyring_filename);
5713 derr << "unable to find a keyring file on " << g_conf->keyring
5714 << ": " << cpp_strerror(r) << dendl;
5715 if (g_conf->key != "") {
5716 string keyring_plaintext = "[mon.]\n\tkey = " + g_conf->key +
5717 "\n\tcaps mon = \"allow *\"\n";
5719 bl.append(keyring_plaintext);
5721 bufferlist::iterator i = bl.begin();
5722 keyring.decode_plaintext(i);
5724 catch (const buffer::error& e) {
5725 derr << "error decoding keyring " << keyring_plaintext
5726 << ": " << e.what() << dendl;
5733 r = keyring.load(g_ceph_context, keyring_filename);
5735 derr << "unable to load initial keyring " << g_conf->keyring << dendl;
5740 // put mon. key in external keyring; seed with everything else.
5741 extract_save_mon_key(keyring);
5743 bufferlist keyringbl;
5744 keyring.encode_plaintext(keyringbl);
5745 t->put("mkfs", "keyring", keyringbl);
5748 store->apply_transaction(t);
5753 int Monitor::write_default_keyring(bufferlist& bl)
5756 os << g_conf->mon_data << "/keyring";
5759 int fd = ::open(os.str().c_str(), O_WRONLY|O_CREAT, 0600);
5762 dout(0) << __func__ << " failed to open " << os.str()
5763 << ": " << cpp_strerror(err) << dendl;
5767 err = bl.write_fd(fd);
5770 VOID_TEMP_FAILURE_RETRY(::close(fd));
5775 void Monitor::extract_save_mon_key(KeyRing& keyring)
5777 EntityName mon_name;
5778 mon_name.set_type(CEPH_ENTITY_TYPE_MON);
5780 if (keyring.get_auth(mon_name, mon_key)) {
5781 dout(10) << "extract_save_mon_key moving mon. key to separate keyring" << dendl;
5783 pkey.add(mon_name, mon_key);
5785 pkey.encode_plaintext(bl);
5786 write_default_keyring(bl);
5787 keyring.remove(mon_name);
5791 bool Monitor::ms_get_authorizer(int service_id, AuthAuthorizer **authorizer,
5794 dout(10) << "ms_get_authorizer for " << ceph_entity_type_name(service_id)
5800 // we only connect to other monitors and mgr; every else connects to us.
5801 if (service_id != CEPH_ENTITY_TYPE_MON &&
5802 service_id != CEPH_ENTITY_TYPE_MGR)
5805 if (!auth_cluster_required.is_supported_auth(CEPH_AUTH_CEPHX)) {
5807 dout(20) << __func__ << " building auth_none authorizer" << dendl;
5808 AuthNoneClientHandler handler(g_ceph_context, nullptr);
5809 handler.set_global_id(0);
5810 *authorizer = handler.build_authorizer(service_id);
5814 CephXServiceTicketInfo auth_ticket_info;
5815 CephXSessionAuthInfo info;
5819 name.set_type(CEPH_ENTITY_TYPE_MON);
5820 auth_ticket_info.ticket.name = name;
5821 auth_ticket_info.ticket.global_id = 0;
5823 if (service_id == CEPH_ENTITY_TYPE_MON) {
5824 // mon to mon authentication uses the private monitor shared key and not the
5827 if (!keyring.get_secret(name, secret) &&
5828 !key_server.get_secret(name, secret)) {
5829 dout(0) << " couldn't get secret for mon service from keyring or keyserver"
5831 stringstream ss, ds;
5832 int err = key_server.list_secrets(ds);
5834 ss << "no installed auth entries!";
5836 ss << "installed auth entries:";
5837 dout(0) << ss.str() << "\n" << ds.str() << dendl;
5841 ret = key_server.build_session_auth_info(service_id, auth_ticket_info, info,
5842 secret, (uint64_t)-1);
5844 dout(0) << __func__ << " failed to build mon session_auth_info "
5845 << cpp_strerror(ret) << dendl;
5848 } else if (service_id == CEPH_ENTITY_TYPE_MGR) {
5850 ret = key_server.build_session_auth_info(service_id, auth_ticket_info, info);
5852 derr << __func__ << " failed to build mgr service session_auth_info "
5853 << cpp_strerror(ret) << dendl;
5857 ceph_abort(); // see check at top of fn
5860 CephXTicketBlob blob;
5861 if (!cephx_build_service_ticket_blob(cct, info, blob)) {
5862 dout(0) << "ms_get_authorizer failed to build service ticket" << dendl;
5865 bufferlist ticket_data;
5866 ::encode(blob, ticket_data);
5868 bufferlist::iterator iter = ticket_data.begin();
5869 CephXTicketHandler handler(g_ceph_context, service_id);
5870 ::decode(handler.ticket, iter);
5872 handler.session_key = info.session_key;
5874 *authorizer = handler.build_authorizer(0);
5879 bool Monitor::ms_verify_authorizer(Connection *con, int peer_type,
5880 int protocol, bufferlist& authorizer_data,
5881 bufferlist& authorizer_reply,
5882 bool& isvalid, CryptoKey& session_key)
5884 dout(10) << "ms_verify_authorizer " << con->get_peer_addr()
5885 << " " << ceph_entity_type_name(peer_type)
5886 << " protocol " << protocol << dendl;
5891 if (peer_type == CEPH_ENTITY_TYPE_MON &&
5892 auth_cluster_required.is_supported_auth(CEPH_AUTH_CEPHX)) {
5893 // monitor, and cephx is enabled
5895 if (protocol == CEPH_AUTH_CEPHX) {
5896 bufferlist::iterator iter = authorizer_data.begin();
5897 CephXServiceTicketInfo auth_ticket_info;
5899 if (authorizer_data.length()) {
5900 bool ret = cephx_verify_authorizer(g_ceph_context, &keyring, iter,
5901 auth_ticket_info, authorizer_reply);
5903 session_key = auth_ticket_info.session_key;
5906 dout(0) << "ms_verify_authorizer bad authorizer from mon " << con->get_peer_addr() << dendl;
5910 dout(0) << "ms_verify_authorizer cephx enabled, but no authorizer (required for mon)" << dendl;