// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab /* * Ceph - scalable distributed file system * * Copyright (C) 2004-2006 Sage Weil * * This is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License version 2.1, as published by the Free Software * Foundation. See file COPYING. * */ #include #include #include #include #include #include #include #include "Monitor.h" #include "common/version.h" #include "osd/OSDMap.h" #include "MonitorDBStore.h" #include "messages/PaxosServiceMessage.h" #include "messages/MMonMap.h" #include "messages/MMonGetMap.h" #include "messages/MMonGetVersion.h" #include "messages/MMonGetVersionReply.h" #include "messages/MGenericMessage.h" #include "messages/MMonCommand.h" #include "messages/MMonCommandAck.h" #include "messages/MMonHealth.h" #include "messages/MMonMetadata.h" #include "messages/MMonSync.h" #include "messages/MMonScrub.h" #include "messages/MMonProbe.h" #include "messages/MMonJoin.h" #include "messages/MMonPaxos.h" #include "messages/MRoute.h" #include "messages/MForward.h" #include "messages/MStatfs.h" #include "messages/MMonSubscribe.h" #include "messages/MMonSubscribeAck.h" #include "messages/MAuthReply.h" #include "messages/MTimeCheck.h" #include "messages/MPing.h" #include "common/strtol.h" #include "common/ceph_argparse.h" #include "common/Timer.h" #include "common/Clock.h" #include "common/errno.h" #include "common/perf_counters.h" #include "common/admin_socket.h" #include "global/signal_handler.h" #include "common/Formatter.h" #include "include/stringify.h" #include "include/color.h" #include "include/ceph_fs.h" #include "include/str_list.h" #include "OSDMonitor.h" #include "MDSMonitor.h" #include "MonmapMonitor.h" #include "PGMonitor.h" #include "LogMonitor.h" #include "AuthMonitor.h" #include "MgrMonitor.h" #include "MgrStatMonitor.h" #include "mon/QuorumService.h" #include "mon/OldHealthMonitor.h" #include 
"mon/HealthMonitor.h" #include "mon/ConfigKeyService.h" #include "common/config.h" #include "common/cmdparse.h" #include "include/assert.h" #include "include/compat.h" #include "perfglue/heap_profiler.h" #include "auth/none/AuthNoneClientHandler.h" #define dout_subsys ceph_subsys_mon #undef dout_prefix #define dout_prefix _prefix(_dout, this) static ostream& _prefix(std::ostream *_dout, const Monitor *mon) { return *_dout << "mon." << mon->name << "@" << mon->rank << "(" << mon->get_state_name() << ") e" << mon->monmap->get_epoch() << " "; } const string Monitor::MONITOR_NAME = "monitor"; const string Monitor::MONITOR_STORE_PREFIX = "monitor_store"; #undef FLAG #undef COMMAND #undef COMMAND_WITH_FLAG #define FLAG(f) (MonCommand::FLAG_##f) #define COMMAND(parsesig, helptext, modulename, req_perms, avail) \ {parsesig, helptext, modulename, req_perms, avail, FLAG(NONE)}, #define COMMAND_WITH_FLAG(parsesig, helptext, modulename, req_perms, avail, flags) \ {parsesig, helptext, modulename, req_perms, avail, flags}, MonCommand mon_commands[] = { #include }; MonCommand pgmonitor_commands[] = { #include }; #undef COMMAND #undef COMMAND_WITH_FLAG void C_MonContext::finish(int r) { if (mon->is_shutdown()) return; FunctionContext::finish(r); } Monitor::Monitor(CephContext* cct_, string nm, MonitorDBStore *s, Messenger *m, Messenger *mgr_m, MonMap *map) : Dispatcher(cct_), name(nm), rank(-1), messenger(m), con_self(m ? m->get_loopback_connection() : NULL), lock("Monitor::lock"), timer(cct_, lock), finisher(cct_, "mon_finisher", "fin"), cpu_tp(cct, "Monitor::cpu_tp", "cpu_tp", g_conf->mon_cpu_threads), has_ever_joined(false), logger(NULL), cluster_logger(NULL), cluster_logger_registered(false), monmap(map), log_client(cct_, messenger, monmap, LogClient::FLAG_MON), key_server(cct, &keyring), auth_cluster_required(cct, cct->_conf->auth_supported.empty() ? 
cct->_conf->auth_cluster_required : cct->_conf->auth_supported), auth_service_required(cct, cct->_conf->auth_supported.empty() ? cct->_conf->auth_service_required : cct->_conf->auth_supported ), mgr_messenger(mgr_m), mgr_client(cct_, mgr_m), pgservice(nullptr), store(s), state(STATE_PROBING), elector(this), required_features(0), leader(0), quorum_con_features(0), // scrub scrub_version(0), scrub_event(NULL), scrub_timeout_event(NULL), // sync state sync_provider_count(0), sync_cookie(0), sync_full(false), sync_start_version(0), sync_timeout_event(NULL), sync_last_committed_floor(0), timecheck_round(0), timecheck_acks(0), timecheck_rounds_since_clean(0), timecheck_event(NULL), paxos_service(PAXOS_NUM), admin_hook(NULL), routed_request_tid(0), op_tracker(cct, true, 1) { clog = log_client.create_channel(CLOG_CHANNEL_CLUSTER); audit_clog = log_client.create_channel(CLOG_CHANNEL_AUDIT); update_log_clients(); paxos = new Paxos(this, "paxos"); paxos_service[PAXOS_MDSMAP] = new MDSMonitor(this, paxos, "mdsmap"); paxos_service[PAXOS_MONMAP] = new MonmapMonitor(this, paxos, "monmap"); paxos_service[PAXOS_OSDMAP] = new OSDMonitor(cct, this, paxos, "osdmap"); paxos_service[PAXOS_PGMAP] = new PGMonitor(this, paxos, "pgmap"); paxos_service[PAXOS_LOG] = new LogMonitor(this, paxos, "logm"); paxos_service[PAXOS_AUTH] = new AuthMonitor(this, paxos, "auth"); paxos_service[PAXOS_MGR] = new MgrMonitor(this, paxos, "mgr"); paxos_service[PAXOS_MGRSTAT] = new MgrStatMonitor(this, paxos, "mgrstat"); paxos_service[PAXOS_HEALTH] = new HealthMonitor(this, paxos, "health"); health_monitor = new OldHealthMonitor(this); config_key_service = new ConfigKeyService(this, paxos); mon_caps = new MonCap(); bool r = mon_caps->parse("allow *", NULL); assert(r); exited_quorum = ceph_clock_now(); // prepare local commands local_mon_commands.resize(ARRAY_SIZE(mon_commands)); for (unsigned i = 0; i < ARRAY_SIZE(mon_commands); ++i) { local_mon_commands[i] = mon_commands[i]; } 
MonCommand::encode_vector(local_mon_commands, local_mon_commands_bl); local_upgrading_mon_commands = local_mon_commands; for (unsigned i = 0; i < ARRAY_SIZE(pgmonitor_commands); ++i) { local_upgrading_mon_commands.push_back(pgmonitor_commands[i]); } MonCommand::encode_vector(local_upgrading_mon_commands, local_upgrading_mon_commands_bl); // assume our commands until we have an election. this only means // we won't reply with EINVAL before the election; any command that // actually matters will wait until we have quorum etc and then // retry (and revalidate). leader_mon_commands = local_mon_commands; // note: OSDMonitor may update this based on the luminous flag. pgservice = mgrstatmon()->get_pg_stat_service(); } Monitor::~Monitor() { for (vector::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p) delete *p; delete health_monitor; delete config_key_service; delete paxos; assert(session_map.sessions.empty()); delete mon_caps; } class AdminHook : public AdminSocketHook { Monitor *mon; public: explicit AdminHook(Monitor *m) : mon(m) {} bool call(std::string command, cmdmap_t& cmdmap, std::string format, bufferlist& out) override { stringstream ss; mon->do_admin_command(command, cmdmap, format, ss); out.append(ss); return true; } }; void Monitor::do_admin_command(string command, cmdmap_t& cmdmap, string format, ostream& ss) { Mutex::Locker l(lock); boost::scoped_ptr f(Formatter::create(format)); string args; for (cmdmap_t::iterator p = cmdmap.begin(); p != cmdmap.end(); ++p) { if (p->first == "prefix") continue; if (!args.empty()) args += ", "; args += cmd_vartype_stringify(p->second); } args = "[" + args + "]"; bool read_only = (command == "mon_status" || command == "mon metadata" || command == "quorum_status" || command == "ops" || command == "sessions"); (read_only ? 
audit_clog->debug() : audit_clog->info()) << "from='admin socket' entity='admin socket' " << "cmd='" << command << "' args=" << args << ": dispatch"; if (command == "mon_status") { get_mon_status(f.get(), ss); if (f) f->flush(ss); } else if (command == "quorum_status") { _quorum_status(f.get(), ss); } else if (command == "sync_force") { string validate; if ((!cmd_getval(g_ceph_context, cmdmap, "validate", validate)) || (validate != "--yes-i-really-mean-it")) { ss << "are you SURE? this will mean the monitor store will be erased " "the next time the monitor is restarted. pass " "'--yes-i-really-mean-it' if you really do."; goto abort; } sync_force(f.get(), ss); } else if (command.compare(0, 23, "add_bootstrap_peer_hint") == 0) { if (!_add_bootstrap_peer_hint(command, cmdmap, ss)) goto abort; } else if (command == "quorum enter") { elector.start_participating(); start_election(); ss << "started responding to quorum, initiated new election"; } else if (command == "quorum exit") { start_election(); elector.stop_participating(); ss << "stopped responding to quorum, initiated new election"; } else if (command == "ops") { (void)op_tracker.dump_ops_in_flight(f.get()); if (f) { f->flush(ss); } } else if (command == "sessions") { if (f) { f->open_array_section("sessions"); for (auto p : session_map.sessions) { f->dump_stream("session") << *p; } f->close_section(); f->flush(ss); } } else { assert(0 == "bad AdminSocket command binding"); } (read_only ? audit_clog->debug() : audit_clog->info()) << "from='admin socket' " << "entity='admin socket' " << "cmd=" << command << " " << "args=" << args << ": finished"; return; abort: (read_only ? 
audit_clog->debug() : audit_clog->info()) << "from='admin socket' " << "entity='admin socket' " << "cmd=" << command << " " << "args=" << args << ": aborted"; } void Monitor::handle_signal(int signum) { assert(signum == SIGINT || signum == SIGTERM); derr << "*** Got Signal " << sig_str(signum) << " ***" << dendl; shutdown(); } CompatSet Monitor::get_initial_supported_features() { CompatSet::FeatureSet ceph_mon_feature_compat; CompatSet::FeatureSet ceph_mon_feature_ro_compat; CompatSet::FeatureSet ceph_mon_feature_incompat; ceph_mon_feature_incompat.insert(CEPH_MON_FEATURE_INCOMPAT_BASE); ceph_mon_feature_incompat.insert(CEPH_MON_FEATURE_INCOMPAT_SINGLE_PAXOS); return CompatSet(ceph_mon_feature_compat, ceph_mon_feature_ro_compat, ceph_mon_feature_incompat); } CompatSet Monitor::get_supported_features() { CompatSet compat = get_initial_supported_features(); compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES); compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC); compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2); compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3); compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_KRAKEN); compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_LUMINOUS); return compat; } CompatSet Monitor::get_legacy_features() { CompatSet::FeatureSet ceph_mon_feature_compat; CompatSet::FeatureSet ceph_mon_feature_ro_compat; CompatSet::FeatureSet ceph_mon_feature_incompat; ceph_mon_feature_incompat.insert(CEPH_MON_FEATURE_INCOMPAT_BASE); return CompatSet(ceph_mon_feature_compat, ceph_mon_feature_ro_compat, ceph_mon_feature_incompat); } int Monitor::check_features(MonitorDBStore *store) { CompatSet required = get_supported_features(); CompatSet ondisk; read_features_off_disk(store, &ondisk); if (!required.writeable(ondisk)) { CompatSet diff = required.unsupported(ondisk); generic_derr << "ERROR: on disk data includes unsupported features: " << diff << dendl; return -EPERM; } return 0; } 
void Monitor::read_features_off_disk(MonitorDBStore *store, CompatSet *features) { bufferlist featuresbl; store->get(MONITOR_NAME, COMPAT_SET_LOC, featuresbl); if (featuresbl.length() == 0) { generic_dout(0) << "WARNING: mon fs missing feature list.\n" << "Assuming it is old-style and introducing one." << dendl; //we only want the baseline ~v.18 features assumed to be on disk. //If new features are introduced this code needs to disappear or //be made smarter. *features = get_legacy_features(); features->encode(featuresbl); auto t(std::make_shared()); t->put(MONITOR_NAME, COMPAT_SET_LOC, featuresbl); store->apply_transaction(t); } else { bufferlist::iterator it = featuresbl.begin(); features->decode(it); } } void Monitor::read_features() { read_features_off_disk(store, &features); dout(10) << "features " << features << dendl; calc_quorum_requirements(); dout(10) << "required_features " << required_features << dendl; } void Monitor::write_features(MonitorDBStore::TransactionRef t) { bufferlist bl; features.encode(bl); t->put(MONITOR_NAME, COMPAT_SET_LOC, bl); } const char** Monitor::get_tracked_conf_keys() const { static const char* KEYS[] = { "crushtool", // helpful for testing "mon_election_timeout", "mon_lease", "mon_lease_renew_interval_factor", "mon_lease_ack_timeout_factor", "mon_accept_timeout_factor", // clog & admin clog "clog_to_monitors", "clog_to_syslog", "clog_to_syslog_facility", "clog_to_syslog_level", "clog_to_graylog", "clog_to_graylog_host", "clog_to_graylog_port", "host", "fsid", // periodic health to clog "mon_health_to_clog", "mon_health_to_clog_interval", "mon_health_to_clog_tick_interval", // scrub interval "mon_scrub_interval", NULL }; return KEYS; } void Monitor::handle_conf_change(const struct md_config_t *conf, const std::set &changed) { sanitize_options(); dout(10) << __func__ << " " << changed << dendl; if (changed.count("clog_to_monitors") || changed.count("clog_to_syslog") || changed.count("clog_to_syslog_level") || 
changed.count("clog_to_syslog_facility") || changed.count("clog_to_graylog") || changed.count("clog_to_graylog_host") || changed.count("clog_to_graylog_port") || changed.count("host") || changed.count("fsid")) { update_log_clients(); } if (changed.count("mon_health_to_clog") || changed.count("mon_health_to_clog_interval") || changed.count("mon_health_to_clog_tick_interval")) { health_to_clog_update_conf(changed); } if (changed.count("mon_scrub_interval")) { scrub_update_interval(conf->mon_scrub_interval); } } void Monitor::update_log_clients() { map log_to_monitors; map log_to_syslog; map log_channel; map log_prio; map log_to_graylog; map log_to_graylog_host; map log_to_graylog_port; uuid_d fsid; string host; if (parse_log_client_options(g_ceph_context, log_to_monitors, log_to_syslog, log_channel, log_prio, log_to_graylog, log_to_graylog_host, log_to_graylog_port, fsid, host)) return; clog->update_config(log_to_monitors, log_to_syslog, log_channel, log_prio, log_to_graylog, log_to_graylog_host, log_to_graylog_port, fsid, host); audit_clog->update_config(log_to_monitors, log_to_syslog, log_channel, log_prio, log_to_graylog, log_to_graylog_host, log_to_graylog_port, fsid, host); } int Monitor::sanitize_options() { int r = 0; // mon_lease must be greater than mon_lease_renewal; otherwise we // may incur in leases expiring before they are renewed. if (g_conf->mon_lease_renew_interval_factor >= 1.0) { clog->error() << "mon_lease_renew_interval_factor (" << g_conf->mon_lease_renew_interval_factor << ") must be less than 1.0"; r = -EINVAL; } // mon_lease_ack_timeout must be greater than mon_lease to make sure we've // got time to renew the lease and get an ack for it. Having both options // with the same value, for a given small vale, could mean timing out if // the monitors happened to be overloaded -- or even under normal load for // a small enough value. 
if (g_conf->mon_lease_ack_timeout_factor <= 1.0) { clog->error() << "mon_lease_ack_timeout_factor (" << g_conf->mon_lease_ack_timeout_factor << ") must be greater than 1.0"; r = -EINVAL; } return r; } int Monitor::preinit() { lock.Lock(); dout(1) << "preinit fsid " << monmap->fsid << dendl; int r = sanitize_options(); if (r < 0) { derr << "option sanitization failed!" << dendl; lock.Unlock(); return r; } assert(!logger); { PerfCountersBuilder pcb(g_ceph_context, "mon", l_mon_first, l_mon_last); pcb.add_u64(l_mon_num_sessions, "num_sessions", "Open sessions", "sess", PerfCountersBuilder::PRIO_USEFUL); pcb.add_u64_counter(l_mon_session_add, "session_add", "Created sessions", "sadd", PerfCountersBuilder::PRIO_INTERESTING); pcb.add_u64_counter(l_mon_session_rm, "session_rm", "Removed sessions", "srm", PerfCountersBuilder::PRIO_INTERESTING); pcb.add_u64_counter(l_mon_session_trim, "session_trim", "Trimmed sessions", "strm", PerfCountersBuilder::PRIO_USEFUL); pcb.add_u64_counter(l_mon_num_elections, "num_elections", "Elections participated in", "ecnt", PerfCountersBuilder::PRIO_USEFUL); pcb.add_u64_counter(l_mon_election_call, "election_call", "Elections started", "estt", PerfCountersBuilder::PRIO_INTERESTING); pcb.add_u64_counter(l_mon_election_win, "election_win", "Elections won", "ewon", PerfCountersBuilder::PRIO_INTERESTING); pcb.add_u64_counter(l_mon_election_lose, "election_lose", "Elections lost", "elst", PerfCountersBuilder::PRIO_INTERESTING); logger = pcb.create_perf_counters(); cct->get_perfcounters_collection()->add(logger); } assert(!cluster_logger); { PerfCountersBuilder pcb(g_ceph_context, "cluster", l_cluster_first, l_cluster_last); pcb.add_u64(l_cluster_num_mon, "num_mon", "Monitors"); pcb.add_u64(l_cluster_num_mon_quorum, "num_mon_quorum", "Monitors in quorum"); pcb.add_u64(l_cluster_num_osd, "num_osd", "OSDs"); pcb.add_u64(l_cluster_num_osd_up, "num_osd_up", "OSDs that are up"); pcb.add_u64(l_cluster_num_osd_in, "num_osd_in", "OSD in state \"in\" (they 
are in cluster)"); pcb.add_u64(l_cluster_osd_epoch, "osd_epoch", "Current epoch of OSD map"); pcb.add_u64(l_cluster_osd_bytes, "osd_bytes", "Total capacity of cluster"); pcb.add_u64(l_cluster_osd_bytes_used, "osd_bytes_used", "Used space"); pcb.add_u64(l_cluster_osd_bytes_avail, "osd_bytes_avail", "Available space"); pcb.add_u64(l_cluster_num_pool, "num_pool", "Pools"); pcb.add_u64(l_cluster_num_pg, "num_pg", "Placement groups"); pcb.add_u64(l_cluster_num_pg_active_clean, "num_pg_active_clean", "Placement groups in active+clean state"); pcb.add_u64(l_cluster_num_pg_active, "num_pg_active", "Placement groups in active state"); pcb.add_u64(l_cluster_num_pg_peering, "num_pg_peering", "Placement groups in peering state"); pcb.add_u64(l_cluster_num_object, "num_object", "Objects"); pcb.add_u64(l_cluster_num_object_degraded, "num_object_degraded", "Degraded (missing replicas) objects"); pcb.add_u64(l_cluster_num_object_misplaced, "num_object_misplaced", "Misplaced (wrong location in the cluster) objects"); pcb.add_u64(l_cluster_num_object_unfound, "num_object_unfound", "Unfound objects"); pcb.add_u64(l_cluster_num_bytes, "num_bytes", "Size of all objects"); pcb.add_u64(l_cluster_num_mds_up, "num_mds_up", "MDSs that are up"); pcb.add_u64(l_cluster_num_mds_in, "num_mds_in", "MDS in state \"in\" (they are in cluster)"); pcb.add_u64(l_cluster_num_mds_failed, "num_mds_failed", "Failed MDS"); pcb.add_u64(l_cluster_mds_epoch, "mds_epoch", "Current epoch of MDS map"); cluster_logger = pcb.create_perf_counters(); } paxos->init_logger(); // verify cluster_uuid { int r = check_fsid(); if (r == -ENOENT) r = write_fsid(); if (r < 0) { lock.Unlock(); return r; } } // open compatset read_features(); // have we ever joined a quorum? has_ever_joined = (store->get(MONITOR_NAME, "joined") != 0); dout(10) << "has_ever_joined = " << (int)has_ever_joined << dendl; if (!has_ever_joined) { // impose initial quorum restrictions? 
list initial_members; get_str_list(g_conf->mon_initial_members, initial_members); if (!initial_members.empty()) { dout(1) << " initial_members " << initial_members << ", filtering seed monmap" << dendl; monmap->set_initial_members(g_ceph_context, initial_members, name, messenger->get_myaddr(), &extra_probe_peers); dout(10) << " monmap is " << *monmap << dendl; dout(10) << " extra probe peers " << extra_probe_peers << dendl; } } else if (!monmap->contains(name)) { derr << "not in monmap and have been in a quorum before; " << "must have been removed" << dendl; if (g_conf->mon_force_quorum_join) { dout(0) << "we should have died but " << "'mon_force_quorum_join' is set -- allowing boot" << dendl; } else { derr << "commit suicide!" << dendl; lock.Unlock(); return -ENOENT; } } { // We have a potentially inconsistent store state in hands. Get rid of it // and start fresh. bool clear_store = false; if (store->exists("mon_sync", "in_sync")) { dout(1) << __func__ << " clean up potentially inconsistent store state" << dendl; clear_store = true; } if (store->get("mon_sync", "force_sync") > 0) { dout(1) << __func__ << " force sync by clearing store state" << dendl; clear_store = true; } if (clear_store) { set sync_prefixes = get_sync_targets_names(); store->clear(sync_prefixes); } } sync_last_committed_floor = store->get("mon_sync", "last_committed_floor"); dout(10) << "sync_last_committed_floor " << sync_last_committed_floor << dendl; init_paxos(); health_monitor->init(); if (is_keyring_required()) { // we need to bootstrap authentication keys so we can form an // initial quorum. if (authmon()->get_last_committed() == 0) { dout(10) << "loading initial keyring to bootstrap authentication for mkfs" << dendl; bufferlist bl; int err = store->get("mkfs", "keyring", bl); if (err == 0 && bl.length() > 0) { // Attempt to decode and extract keyring only if it is found. 
KeyRing keyring; bufferlist::iterator p = bl.begin(); ::decode(keyring, p); extract_save_mon_key(keyring); } } string keyring_loc = g_conf->mon_data + "/keyring"; r = keyring.load(cct, keyring_loc); if (r < 0) { EntityName mon_name; mon_name.set_type(CEPH_ENTITY_TYPE_MON); EntityAuth mon_key; if (key_server.get_auth(mon_name, mon_key)) { dout(1) << "copying mon. key from old db to external keyring" << dendl; keyring.add(mon_name, mon_key); bufferlist bl; keyring.encode_plaintext(bl); write_default_keyring(bl); } else { derr << "unable to load initial keyring " << g_conf->keyring << dendl; lock.Unlock(); return r; } } } admin_hook = new AdminHook(this); AdminSocket* admin_socket = cct->get_admin_socket(); // unlock while registering to avoid mon_lock -> admin socket lock dependency. lock.Unlock(); r = admin_socket->register_command("mon_status", "mon_status", admin_hook, "show current monitor status"); assert(r == 0); r = admin_socket->register_command("quorum_status", "quorum_status", admin_hook, "show current quorum status"); assert(r == 0); r = admin_socket->register_command("sync_force", "sync_force name=validate," "type=CephChoices," "strings=--yes-i-really-mean-it", admin_hook, "force sync of and clear monitor store"); assert(r == 0); r = admin_socket->register_command("add_bootstrap_peer_hint", "add_bootstrap_peer_hint name=addr," "type=CephIPAddr", admin_hook, "add peer address as potential bootstrap" " peer for cluster bringup"); assert(r == 0); r = admin_socket->register_command("quorum enter", "quorum enter", admin_hook, "force monitor back into quorum"); assert(r == 0); r = admin_socket->register_command("quorum exit", "quorum exit", admin_hook, "force monitor out of the quorum"); assert(r == 0); r = admin_socket->register_command("ops", "ops", admin_hook, "show the ops currently in flight"); assert(r == 0); r = admin_socket->register_command("sessions", "sessions", admin_hook, "list existing sessions"); assert(r == 0); lock.Lock(); // add ourselves as a 
conf observer g_conf->add_observer(this); lock.Unlock(); return 0; } int Monitor::init() { dout(2) << "init" << dendl; Mutex::Locker l(lock); finisher.start(); // start ticker timer.init(); new_tick(); cpu_tp.start(); // i'm ready! messenger->add_dispatcher_tail(this); mgr_client.init(); mgr_messenger->add_dispatcher_tail(&mgr_client); mgr_messenger->add_dispatcher_tail(this); // for auth ms_* calls bootstrap(); // add features of myself into feature_map session_map.feature_map.add_mon(con_self->get_features()); return 0; } void Monitor::init_paxos() { dout(10) << __func__ << dendl; paxos->init(); // init services for (int i = 0; i < PAXOS_NUM; ++i) { paxos_service[i]->init(); } refresh_from_paxos(NULL); } void Monitor::refresh_from_paxos(bool *need_bootstrap) { dout(10) << __func__ << dendl; bufferlist bl; int r = store->get(MONITOR_NAME, "cluster_fingerprint", bl); if (r >= 0) { try { bufferlist::iterator p = bl.begin(); ::decode(fingerprint, p); } catch (buffer::error& e) { dout(10) << __func__ << " failed to decode cluster_fingerprint" << dendl; } } else { dout(10) << __func__ << " no cluster_fingerprint" << dendl; } for (int i = 0; i < PAXOS_NUM; ++i) { paxos_service[i]->refresh(need_bootstrap); } for (int i = 0; i < PAXOS_NUM; ++i) { paxos_service[i]->post_refresh(); } load_metadata(); } void Monitor::register_cluster_logger() { if (!cluster_logger_registered) { dout(10) << "register_cluster_logger" << dendl; cluster_logger_registered = true; cct->get_perfcounters_collection()->add(cluster_logger); } else { dout(10) << "register_cluster_logger - already registered" << dendl; } } void Monitor::unregister_cluster_logger() { if (cluster_logger_registered) { dout(10) << "unregister_cluster_logger" << dendl; cluster_logger_registered = false; cct->get_perfcounters_collection()->remove(cluster_logger); } else { dout(10) << "unregister_cluster_logger - not registered" << dendl; } } void Monitor::update_logger() { cluster_logger->set(l_cluster_num_mon, 
monmap->size()); cluster_logger->set(l_cluster_num_mon_quorum, quorum.size()); } void Monitor::shutdown() { dout(1) << "shutdown" << dendl; lock.Lock(); wait_for_paxos_write(); state = STATE_SHUTDOWN; g_conf->remove_observer(this); if (admin_hook) { AdminSocket* admin_socket = cct->get_admin_socket(); admin_socket->unregister_command("mon_status"); admin_socket->unregister_command("quorum_status"); admin_socket->unregister_command("sync_force"); admin_socket->unregister_command("add_bootstrap_peer_hint"); admin_socket->unregister_command("quorum enter"); admin_socket->unregister_command("quorum exit"); admin_socket->unregister_command("ops"); admin_socket->unregister_command("sessions"); delete admin_hook; admin_hook = NULL; } elector.shutdown(); mgr_client.shutdown(); lock.Unlock(); finisher.wait_for_empty(); finisher.stop(); lock.Lock(); // clean up paxos->shutdown(); for (vector::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p) (*p)->shutdown(); health_monitor->shutdown(); finish_contexts(g_ceph_context, waitfor_quorum, -ECANCELED); finish_contexts(g_ceph_context, maybe_wait_for_quorum, -ECANCELED); timer.shutdown(); cpu_tp.stop(); remove_all_sessions(); if (logger) { cct->get_perfcounters_collection()->remove(logger); delete logger; logger = NULL; } if (cluster_logger) { if (cluster_logger_registered) cct->get_perfcounters_collection()->remove(cluster_logger); delete cluster_logger; cluster_logger = NULL; } log_client.shutdown(); // unlock before msgr shutdown... lock.Unlock(); messenger->shutdown(); // last thing! ceph_mon.cc will delete mon. 
mgr_messenger->shutdown(); } void Monitor::wait_for_paxos_write() { if (paxos->is_writing() || paxos->is_writing_previous()) { dout(10) << __func__ << " flushing pending write" << dendl; lock.Unlock(); store->flush(); lock.Lock(); dout(10) << __func__ << " flushed pending write" << dendl; } } void Monitor::bootstrap() { dout(10) << "bootstrap" << dendl; wait_for_paxos_write(); sync_reset_requester(); unregister_cluster_logger(); cancel_probe_timeout(); // note my rank int newrank = monmap->get_rank(messenger->get_myaddr()); if (newrank < 0 && rank >= 0) { // was i ever part of the quorum? if (has_ever_joined) { dout(0) << " removed from monmap, suicide." << dendl; exit(0); } } if (newrank != rank) { dout(0) << " my rank is now " << newrank << " (was " << rank << ")" << dendl; messenger->set_myname(entity_name_t::MON(newrank)); rank = newrank; // reset all connections, or else our peers will think we are someone else. messenger->mark_down_all(); } // reset state = STATE_PROBING; _reset(); // sync store if (g_conf->mon_compact_on_bootstrap) { dout(10) << "bootstrap -- triggering compaction" << dendl; store->compact(); dout(10) << "bootstrap -- finished compaction" << dendl; } // singleton monitor? 
if (monmap->size() == 1 && rank == 0) { win_standalone_election(); return; } reset_probe_timeout(); // i'm outside the quorum if (monmap->contains(name)) outside_quorum.insert(name); // probe monitors dout(10) << "probing other monitors" << dendl; for (unsigned i = 0; i < monmap->size(); i++) { if ((int)i != rank) messenger->send_message(new MMonProbe(monmap->fsid, MMonProbe::OP_PROBE, name, has_ever_joined), monmap->get_inst(i)); } for (set::iterator p = extra_probe_peers.begin(); p != extra_probe_peers.end(); ++p) { if (*p != messenger->get_myaddr()) { entity_inst_t i; i.name = entity_name_t::MON(-1); i.addr = *p; messenger->send_message(new MMonProbe(monmap->fsid, MMonProbe::OP_PROBE, name, has_ever_joined), i); } } } bool Monitor::_add_bootstrap_peer_hint(string cmd, cmdmap_t& cmdmap, ostream& ss) { string addrstr; if (!cmd_getval(g_ceph_context, cmdmap, "addr", addrstr)) { ss << "unable to parse address string value '" << cmd_vartype_stringify(cmdmap["addr"]) << "'"; return false; } dout(10) << "_add_bootstrap_peer_hint '" << cmd << "' '" << addrstr << "'" << dendl; entity_addr_t addr; const char *end = 0; if (!addr.parse(addrstr.c_str(), &end)) { ss << "failed to parse addr '" << addrstr << "'; syntax is 'add_bootstrap_peer_hint ip[:port]'"; return false; } if (is_leader() || is_peon()) { ss << "mon already active; ignoring bootstrap hint"; return true; } if (addr.get_port() == 0) addr.set_port(CEPH_MON_PORT); extra_probe_peers.insert(addr); ss << "adding peer " << addr << " to list: " << extra_probe_peers; return true; } // called by bootstrap(), or on leader|peon -> electing void Monitor::_reset() { dout(10) << __func__ << dendl; cancel_probe_timeout(); timecheck_finish(); health_events_cleanup(); health_check_log_times.clear(); scrub_event_cancel(); leader_since = utime_t(); if (!quorum.empty()) { exited_quorum = ceph_clock_now(); } quorum.clear(); outside_quorum.clear(); quorum_feature_map.clear(); scrub_reset(); paxos->restart(); for (vector::iterator p 
= paxos_service.begin(); p != paxos_service.end(); ++p) (*p)->restart(); health_monitor->finish(); } // ----------------------------------------------------------- // sync set Monitor::get_sync_targets_names() { set targets; targets.insert(paxos->get_name()); for (int i = 0; i < PAXOS_NUM; ++i) paxos_service[i]->get_store_prefixes(targets); ConfigKeyService *config_key_service_ptr = dynamic_cast(config_key_service); assert(config_key_service_ptr); config_key_service_ptr->get_store_prefixes(targets); return targets; } void Monitor::sync_timeout() { dout(10) << __func__ << dendl; assert(state == STATE_SYNCHRONIZING); bootstrap(); } void Monitor::sync_obtain_latest_monmap(bufferlist &bl) { dout(1) << __func__ << dendl; MonMap latest_monmap; // Grab latest monmap from MonmapMonitor bufferlist monmon_bl; int err = monmon()->get_monmap(monmon_bl); if (err < 0) { if (err != -ENOENT) { derr << __func__ << " something wrong happened while reading the store: " << cpp_strerror(err) << dendl; assert(0 == "error reading the store"); } } else { latest_monmap.decode(monmon_bl); } // Grab last backed up monmap (if any) and compare epochs if (store->exists("mon_sync", "latest_monmap")) { bufferlist backup_bl; int err = store->get("mon_sync", "latest_monmap", backup_bl); if (err < 0) { derr << __func__ << " something wrong happened while reading the store: " << cpp_strerror(err) << dendl; assert(0 == "error reading the store"); } assert(backup_bl.length() > 0); MonMap backup_monmap; backup_monmap.decode(backup_bl); if (backup_monmap.epoch > latest_monmap.epoch) latest_monmap = backup_monmap; } // Check if our current monmap's epoch is greater than the one we've // got so far. 
if (monmap->epoch > latest_monmap.epoch) latest_monmap = *monmap; dout(1) << __func__ << " obtained monmap e" << latest_monmap.epoch << dendl; latest_monmap.encode(bl, CEPH_FEATURES_ALL); } void Monitor::sync_reset_requester() { dout(10) << __func__ << dendl; if (sync_timeout_event) { timer.cancel_event(sync_timeout_event); sync_timeout_event = NULL; } sync_provider = entity_inst_t(); sync_cookie = 0; sync_full = false; sync_start_version = 0; } void Monitor::sync_reset_provider() { dout(10) << __func__ << dendl; sync_providers.clear(); } void Monitor::sync_start(entity_inst_t &other, bool full) { dout(10) << __func__ << " " << other << (full ? " full" : " recent") << dendl; assert(state == STATE_PROBING || state == STATE_SYNCHRONIZING); state = STATE_SYNCHRONIZING; // make sure are not a provider for anyone! sync_reset_provider(); sync_full = full; if (sync_full) { // stash key state, and mark that we are syncing auto t(std::make_shared()); sync_stash_critical_state(t); t->put("mon_sync", "in_sync", 1); sync_last_committed_floor = MAX(sync_last_committed_floor, paxos->get_version()); dout(10) << __func__ << " marking sync in progress, storing sync_last_committed_floor " << sync_last_committed_floor << dendl; t->put("mon_sync", "last_committed_floor", sync_last_committed_floor); store->apply_transaction(t); assert(g_conf->mon_sync_requester_kill_at != 1); // clear the underlying store set targets = get_sync_targets_names(); dout(10) << __func__ << " clearing prefixes " << targets << dendl; store->clear(targets); // make sure paxos knows it has been reset. this prevents a // bootstrap and then different probe reply order from possibly // deciding a partial or no sync is needed. paxos->init(); assert(g_conf->mon_sync_requester_kill_at != 2); } // assume 'other' as the leader. We will update the leader once we receive // a reply to the sync start. sync_provider = other; sync_reset_timeout(); MMonSync *m = new MMonSync(sync_full ? 
MMonSync::OP_GET_COOKIE_FULL :
                             MMonSync::OP_GET_COOKIE_RECENT);
  // a "recent" sync tells the provider which version we already have
  if (!sync_full)
    m->last_committed = paxos->get_version();
  messenger->send_message(m, sync_provider);
}

/**
 * Queue a backup of the newest known monmap into @p t, under
 * ("mon_sync", "latest_monmap").
 *
 * @param t  transaction the put is added to; it is not applied here
 */
void Monitor::sync_stash_critical_state(MonitorDBStore::TransactionRef t)
{
  dout(10) << __func__ << dendl;
  bufferlist backup_monmap;
  sync_obtain_latest_monmap(backup_monmap);
  assert(backup_monmap.length() > 0);
  t->put("mon_sync", "latest_monmap", backup_monmap);
}

/**
 * (Re)arm the sync timeout: cancel a pending event, if any, and schedule
 * sync_timeout() to run after mon_sync_timeout.
 */
void Monitor::sync_reset_timeout()
{
  dout(10) << __func__ << dendl;
  if (sync_timeout_event)
    timer.cancel_event(sync_timeout_event);
  sync_timeout_event = timer.add_event_after(
    g_conf->mon_sync_timeout,
    new C_MonContext(this, [this](int) {
        sync_timeout();
      }));
}

/**
 * Wrap up a sync as requester.
 *
 * For a full sync the paxos transactions from sync_start_version through
 * @p last_committed are prepared and applied together with the new
 * "last_committed" value.  In every case the "mon_sync" markers are then
 * erased, paxos is re-initialized and we bootstrap again.
 *
 * @param last_committed  last paxos version covered by the sync
 */
void Monitor::sync_finish(version_t last_committed)
{
  dout(10) << __func__ << " lc " << last_committed << " from " << sync_provider << dendl;

  assert(g_conf->mon_sync_requester_kill_at != 7);

  if (sync_full) {
    // finalize the paxos commits
    auto tx(std::make_shared());
    paxos->read_and_prepare_transactions(tx, sync_start_version,
                                         last_committed);
    tx->put(paxos->get_name(), "last_committed", last_committed);

    // the JSON dump of tx is written to *_dout, i.e. into the log
    // entry opened by dout(30) and closed by dendl
    dout(30) << __func__ << " final tx dump:\n";
    JSONFormatter f(true);
    tx->dump(&f);
    f.flush(*_dout);
    *_dout << dendl;

    store->apply_transaction(tx);
  }

  assert(g_conf->mon_sync_requester_kill_at != 8);

  auto t(std::make_shared());
  t->erase("mon_sync", "in_sync");
  t->erase("mon_sync", "force_sync");
  t->erase("mon_sync", "last_committed_floor");
  store->apply_transaction(t);

  assert(g_conf->mon_sync_requester_kill_at != 9);

  init_paxos();

  assert(g_conf->mon_sync_requester_kill_at != 10);

  bootstrap();
}

/**
 * Dispatch an incoming MMonSync message to the handler for its op.
 * An op not listed in the switch trips an assert.
 */
void Monitor::handle_sync(MonOpRequestRef op)
{
  MMonSync *m = static_cast(op->get_req());
  dout(10) << __func__ << " " << *m << dendl;
  switch (m->op) {

    // provider ---------

  case MMonSync::OP_GET_COOKIE_FULL:
  case MMonSync::OP_GET_COOKIE_RECENT:
    handle_sync_get_cookie(op);
    break;
  case MMonSync::OP_GET_CHUNK:
    handle_sync_get_chunk(op);
    break;

    // client -----------

  case MMonSync::OP_COOKIE:
handle_sync_cookie(op);
    break;
  case MMonSync::OP_CHUNK:
  case MMonSync::OP_LAST_CHUNK:
    handle_sync_chunk(op);
    break;
  case MMonSync::OP_NO_COOKIE:
    handle_sync_no_cookie(op);
    break;

  default:
    dout(0) << __func__ << " unknown op " << m->op << dendl;
    assert(0 == "unknown op");
  }
}

// leader

/**
 * Answer the request in @p op with OP_NO_COOKIE, echoing the cookie the
 * requester sent.
 */
void Monitor::_sync_reply_no_cookie(MonOpRequestRef op)
{
  MMonSync *m = static_cast(op->get_req());
  MMonSync *reply = new MMonSync(MMonSync::OP_NO_COOKIE, m->cookie);
  m->get_connection()->send_message(reply);
}

/**
 * Provider side: hand out a sync cookie to a requesting monitor.
 *
 * Refuses with OP_NO_COOKIE while we are synchronizing ourselves, and
 * silently ignores peers that lack any of our required features.
 */
void Monitor::handle_sync_get_cookie(MonOpRequestRef op)
{
  MMonSync *m = static_cast(op->get_req());
  if (is_synchronizing()) {
    _sync_reply_no_cookie(op);
    return;
  }

  assert(g_conf->mon_sync_provider_kill_at != 1);

  // make sure they can understand us.
  // (a ^ b) & a leaves exactly the required bits the peer does not have
  if ((required_features ^ m->get_connection()->get_features()) &
      required_features) {
    dout(5) << " ignoring peer mon." << m->get_source().num()
            << " has features " << std::hex
            << m->get_connection()->get_features()
            << " but we require " << required_features << std::dec << dendl;
    return;
  }

  // make up a unique cookie.  include election epoch (which persists
  // across restarts for the whole cluster) and a counter for this
  // process instance.  there is no need to be unique *across*
  // monitors, though.
uint64_t cookie = ((unsigned long long)elector.get_epoch() << 24) + ++sync_provider_count;
  assert(sync_providers.count(cookie) == 0);

  dout(10) << __func__ << " cookie " << cookie << " for " << m->get_source_inst() << dendl;

  // operator[] creates the provider entry for this cookie
  SyncProvider& sp = sync_providers[cookie];
  sp.cookie = cookie;
  sp.entity = m->get_source_inst();
  sp.reset_timeout(g_ceph_context, g_conf->mon_sync_timeout * 2);

  set sync_targets;
  if (m->op == MMonSync::OP_GET_COOKIE_FULL) {
    // full scan
    sync_targets = get_sync_targets_names();
    sp.last_committed = paxos->get_version();
    sp.synchronizer = store->get_synchronizer(sp.last_key, sync_targets);
    sp.full = true;
    dout(10) << __func__ << " will sync prefixes " << sync_targets << dendl;
  } else {
    // just catch up paxos
    sp.last_committed = m->last_committed;
  }
  dout(10) << __func__ << " will sync from version " << sp.last_committed << dendl;

  MMonSync *reply = new MMonSync(MMonSync::OP_COOKIE, sp.cookie);
  reply->last_committed = sp.last_committed;
  m->get_connection()->send_message(reply);
}

/**
 * Provider side: send the next chunk of data for the session named by
 * the request's cookie.
 *
 * Replies OP_NO_COOKIE when the cookie is unknown or when the requester's
 * last committed version is older than our first committed one.
 * Otherwise paxos versions are added while payload budget (@c left)
 * remains, then store keys for a full sync; the final chunk is sent as
 * OP_LAST_CHUNK and the session is erased.
 */
void Monitor::handle_sync_get_chunk(MonOpRequestRef op)
{
  MMonSync *m = static_cast(op->get_req());
  dout(10) << __func__ << " " << *m << dendl;

  if (sync_providers.count(m->cookie) == 0) {
    dout(10) << __func__ << " no cookie " << m->cookie << dendl;
    _sync_reply_no_cookie(op);
    return;
  }

  assert(g_conf->mon_sync_provider_kill_at != 2);

  SyncProvider& sp = sync_providers[m->cookie];
  sp.reset_timeout(g_ceph_context, g_conf->mon_sync_timeout * 2);

  if (sp.last_committed < paxos->get_first_committed() &&
      paxos->get_first_committed() > 1) {
    dout(10) << __func__ << " sync requester fell behind paxos, their lc " << sp.last_committed
             << " < our fc " << paxos->get_first_committed() << dendl;
    sync_providers.erase(m->cookie);
    _sync_reply_no_cookie(op);
    return;
  }

  MMonSync *reply = new MMonSync(MMonSync::OP_CHUNK, sp.cookie);
  auto tx(std::make_shared());

  // remaining payload budget for this chunk
  int left = g_conf->mon_sync_max_payload_size;
  while (sp.last_committed < paxos->get_version() && left > 0) {
    bufferlist bl;
sp.last_committed++;

    int err = store->get(paxos->get_name(), sp.last_committed, bl);
    assert(err == 0);

    tx->put(paxos->get_name(), sp.last_committed, bl);
    left -= bl.length();
    dout(20) << __func__ << " including paxos state " << sp.last_committed
             << dendl;
  }
  reply->last_committed = sp.last_committed;

  // a full sync also ships store keys while budget remains
  if (sp.full && left > 0) {
    sp.synchronizer->get_chunk_tx(tx, left);
    sp.last_key = sp.synchronizer->get_last_key();
    reply->last_key = sp.last_key;
  }

  if ((sp.full && sp.synchronizer->has_next_chunk()) ||
      sp.last_committed < paxos->get_version()) {
    dout(10) << __func__ << " chunk, through version " << sp.last_committed
             << " key " << sp.last_key << dendl;
  } else {
    dout(10) << __func__ << " last chunk, through version " << sp.last_committed
             << " key " << sp.last_key << dendl;
    reply->op = MMonSync::OP_LAST_CHUNK;

    assert(g_conf->mon_sync_provider_kill_at != 3);

    // clean up our local state
    // ('sp' refers to the erased entry from here on and is not used again)
    sync_providers.erase(sp.cookie);
  }

  ::encode(*tx, reply->chunk_bl);

  m->get_connection()->send_message(reply);
}

// requester

/**
 * Requester side: accept the cookie handed out by our sync provider and
 * request the first chunk.
 *
 * The message is ignored if we already hold a cookie or if it does not
 * come from sync_provider.
 */
void Monitor::handle_sync_cookie(MonOpRequestRef op)
{
  MMonSync *m = static_cast(op->get_req());
  dout(10) << __func__ << " " << *m << dendl;
  if (sync_cookie) {
    dout(10) << __func__ << " already have a cookie, ignoring" << dendl;
    return;
  }
  if (m->get_source_inst() != sync_provider) {
    dout(10) << __func__ << " source does not match, discarding" << dendl;
    return;
  }
  sync_cookie = m->cookie;
  sync_start_version = m->last_committed;

  sync_reset_timeout();
  sync_get_next_chunk();

  assert(g_conf->mon_sync_requester_kill_at != 3);
}

/**
 * Ask sync_provider for the next chunk of the session identified by
 * sync_cookie.  Sleeps first when mon_inject_sync_get_chunk_delay is
 * positive (the value is multiplied by 1e6 and passed to usleep()).
 */
void Monitor::sync_get_next_chunk()
{
  dout(20) << __func__ << " cookie " << sync_cookie << " provider " << sync_provider << dendl;
  if (g_conf->mon_inject_sync_get_chunk_delay > 0) {
    dout(20) << __func__ << " injecting delay of " << g_conf->mon_inject_sync_get_chunk_delay << dendl;
    usleep((long long)(g_conf->mon_inject_sync_get_chunk_delay * 1000000.0));
  }
  MMonSync *r = new MMonSync(MMonSync::OP_GET_CHUNK, sync_cookie);
  messenger->send_message(r,
sync_provider);

  assert(g_conf->mon_sync_requester_kill_at != 4);
}

/**
 * Requester side: apply a received chunk to the local store.
 *
 * Chunks with a foreign cookie or from a source other than sync_provider
 * are discarded.  During a "recent" (non-full) sync the paxos
 * transactions carried so far are applied immediately.  OP_CHUNK requests
 * the next chunk; OP_LAST_CHUNK finishes the sync.
 */
void Monitor::handle_sync_chunk(MonOpRequestRef op)
{
  MMonSync *m = static_cast(op->get_req());
  dout(10) << __func__ << " " << *m << dendl;

  if (m->cookie != sync_cookie) {
    dout(10) << __func__ << " cookie does not match, discarding" << dendl;
    return;
  }
  if (m->get_source_inst() != sync_provider) {
    dout(10) << __func__ << " source does not match, discarding" << dendl;
    return;
  }

  assert(state == STATE_SYNCHRONIZING);
  assert(g_conf->mon_sync_requester_kill_at != 5);

  auto tx(std::make_shared());
  tx->append_from_encoded(m->chunk_bl);

  // the JSON dump of tx goes into the log entry opened by dout(30)
  dout(30) << __func__ << " tx dump:\n";
  JSONFormatter f(true);
  tx->dump(&f);
  f.flush(*_dout);
  *_dout << dendl;

  store->apply_transaction(tx);

  assert(g_conf->mon_sync_requester_kill_at != 6);

  if (!sync_full) {
    dout(10) << __func__ << " applying recent paxos transactions as we go" << dendl;
    // note: this 'tx' (and 'f' below) shadow the outer ones
    auto tx(std::make_shared());
    paxos->read_and_prepare_transactions(tx, paxos->get_version() + 1,
                                         m->last_committed);
    tx->put(paxos->get_name(), "last_committed", m->last_committed);

    dout(30) << __func__ << " tx dump:\n";
    JSONFormatter f(true);
    tx->dump(&f);
    f.flush(*_dout);
    *_dout << dendl;

    store->apply_transaction(tx);
    paxos->init();  // to refresh what we just wrote
  }

  if (m->op == MMonSync::OP_CHUNK) {
    sync_reset_timeout();
    sync_get_next_chunk();
  } else if (m->op == MMonSync::OP_LAST_CHUNK) {
    sync_finish(m->last_committed);
  }
}

/**
 * Requester side: the provider answered OP_NO_COOKIE; start over with
 * bootstrap().
 */
void Monitor::handle_sync_no_cookie(MonOpRequestRef op)
{
  dout(10) << __func__ << dendl;
  bootstrap();
}

/**
 * Erase every provider session whose timeout lies before the current
 * time.
 */
void Monitor::sync_trim_providers()
{
  dout(20) << __func__ << dendl;

  utime_t now = ceph_clock_now();
  map::iterator p = sync_providers.begin();
  while (p != sync_providers.end()) {
    if (now > p->second.timeout) {
      dout(10) << __func__ << " expiring cookie " << p->second.cookie << " for " << p->second.entity << dendl;
      // post-increment: advance before the erased iterator is invalidated
      sync_providers.erase(p++);
    } else {
      ++p;
    }
  }
}

// ---------------------------------------------------
// probe

/**
 * Cancel the pending probe timeout, if one is scheduled.
 */
void Monitor::cancel_probe_timeout()
{
  if
(probe_timeout_event) { dout(10) << "cancel_probe_timeout " << probe_timeout_event << dendl; timer.cancel_event(probe_timeout_event); probe_timeout_event = NULL; } else { dout(10) << "cancel_probe_timeout (none scheduled)" << dendl; } } void Monitor::reset_probe_timeout() { cancel_probe_timeout(); probe_timeout_event = new C_MonContext(this, [this](int r) { probe_timeout(r); }); double t = g_conf->mon_probe_timeout; if (timer.add_event_after(t, probe_timeout_event)) { dout(10) << "reset_probe_timeout " << probe_timeout_event << " after " << t << " seconds" << dendl; } else { probe_timeout_event = nullptr; } } void Monitor::probe_timeout(int r) { dout(4) << "probe_timeout " << probe_timeout_event << dendl; assert(is_probing() || is_synchronizing()); assert(probe_timeout_event); probe_timeout_event = NULL; bootstrap(); } void Monitor::handle_probe(MonOpRequestRef op) { MMonProbe *m = static_cast(op->get_req()); dout(10) << "handle_probe " << *m << dendl; if (m->fsid != monmap->fsid) { dout(0) << "handle_probe ignoring fsid " << m->fsid << " != " << monmap->fsid << dendl; return; } switch (m->op) { case MMonProbe::OP_PROBE: handle_probe_probe(op); break; case MMonProbe::OP_REPLY: handle_probe_reply(op); break; case MMonProbe::OP_MISSING_FEATURES: derr << __func__ << " missing features, have " << CEPH_FEATURES_ALL << ", required " << m->required_features << ", missing " << (m->required_features & ~CEPH_FEATURES_ALL) << dendl; break; } } void Monitor::handle_probe_probe(MonOpRequestRef op) { MMonProbe *m = static_cast(op->get_req()); dout(10) << "handle_probe_probe " << m->get_source_inst() << *m << " features " << m->get_connection()->get_features() << dendl; uint64_t missing = required_features & ~m->get_connection()->get_features(); if (missing) { dout(1) << " peer " << m->get_source_addr() << " missing features " << missing << dendl; if (m->get_connection()->has_feature(CEPH_FEATURE_OSD_PRIMARY_AFFINITY)) { MMonProbe *r = new MMonProbe(monmap->fsid, 
MMonProbe::OP_MISSING_FEATURES, name, has_ever_joined); m->required_features = required_features; m->get_connection()->send_message(r); } goto out; } if (!is_probing() && !is_synchronizing()) { // If the probing mon is way ahead of us, we need to re-bootstrap. // Normally we capture this case when we initially bootstrap, but // it is possible we pass those checks (we overlap with // quorum-to-be) but fail to join a quorum before it moves past // us. We need to be kicked back to bootstrap so we can // synchonize, not keep calling elections. if (paxos->get_version() + 1 < m->paxos_first_version) { dout(1) << " peer " << m->get_source_addr() << " has first_committed " << "ahead of us, re-bootstrapping" << dendl; bootstrap(); goto out; } } MMonProbe *r; r = new MMonProbe(monmap->fsid, MMonProbe::OP_REPLY, name, has_ever_joined); r->name = name; r->quorum = quorum; monmap->encode(r->monmap_bl, m->get_connection()->get_features()); r->paxos_first_version = paxos->get_first_committed(); r->paxos_last_version = paxos->get_version(); m->get_connection()->send_message(r); // did we discover a peer here? if (!monmap->contains(m->get_source_addr())) { dout(1) << " adding peer " << m->get_source_addr() << " to list of hints" << dendl; extra_probe_peers.insert(m->get_source_addr()); } out: return; } void Monitor::handle_probe_reply(MonOpRequestRef op) { MMonProbe *m = static_cast(op->get_req()); dout(10) << "handle_probe_reply " << m->get_source_inst() << *m << dendl; dout(10) << " monmap is " << *monmap << dendl; // discover name and addrs during probing or electing states. if (!is_probing() && !is_electing()) { return; } // newer map, or they've joined a quorum and we haven't? bufferlist mybl; monmap->encode(mybl, m->get_connection()->get_features()); // make sure it's actually different; the checks below err toward // taking the other guy's map, which could cause us to loop. 
if (!mybl.contents_equal(m->monmap_bl)) { MonMap *newmap = new MonMap; newmap->decode(m->monmap_bl); if (m->has_ever_joined && (newmap->get_epoch() > monmap->get_epoch() || !has_ever_joined)) { dout(10) << " got newer/committed monmap epoch " << newmap->get_epoch() << ", mine was " << monmap->get_epoch() << dendl; delete newmap; monmap->decode(m->monmap_bl); bootstrap(); return; } delete newmap; } // rename peer? string peer_name = monmap->get_name(m->get_source_addr()); if (monmap->get_epoch() == 0 && peer_name.compare(0, 7, "noname-") == 0) { dout(10) << " renaming peer " << m->get_source_addr() << " " << peer_name << " -> " << m->name << " in my monmap" << dendl; monmap->rename(peer_name, m->name); if (is_electing()) { bootstrap(); return; } } else { dout(10) << " peer name is " << peer_name << dendl; } // new initial peer? if (monmap->get_epoch() == 0 && monmap->contains(m->name) && monmap->get_addr(m->name).is_blank_ip()) { dout(1) << " learned initial mon " << m->name << " addr " << m->get_source_addr() << dendl; monmap->set_addr(m->name, m->get_source_addr()); bootstrap(); return; } // end discover phase if (!is_probing()) { return; } assert(paxos != NULL); if (is_synchronizing()) { dout(10) << " currently syncing" << dendl; return; } entity_inst_t other = m->get_source_inst(); if (m->paxos_last_version < sync_last_committed_floor) { dout(10) << " peer paxos versions [" << m->paxos_first_version << "," << m->paxos_last_version << "] < my sync_last_committed_floor " << sync_last_committed_floor << ", ignoring" << dendl; } else { if (paxos->get_version() < m->paxos_first_version && m->paxos_first_version > 1) { // no need to sync if we're 0 and they start at 1. 
dout(10) << " peer paxos first versions [" << m->paxos_first_version << "," << m->paxos_last_version << "]" << " vs my version " << paxos->get_version() << " (too far ahead)" << dendl; cancel_probe_timeout(); sync_start(other, true); return; } if (paxos->get_version() + g_conf->paxos_max_join_drift < m->paxos_last_version) { dout(10) << " peer paxos last version " << m->paxos_last_version << " vs my version " << paxos->get_version() << " (too far ahead)" << dendl; cancel_probe_timeout(); sync_start(other, false); return; } } // is there an existing quorum? if (m->quorum.size()) { dout(10) << " existing quorum " << m->quorum << dendl; dout(10) << " peer paxos version " << m->paxos_last_version << " vs my version " << paxos->get_version() << " (ok)" << dendl; if (monmap->contains(name) && !monmap->get_addr(name).is_blank_ip()) { // i'm part of the cluster; just initiate a new election start_election(); } else { dout(10) << " ready to join, but i'm not in the monmap or my addr is blank, trying to join" << dendl; messenger->send_message(new MMonJoin(monmap->fsid, name, messenger->get_myaddr()), monmap->get_inst(*m->quorum.begin())); } } else { if (monmap->contains(m->name)) { dout(10) << " mon." << m->name << " is outside the quorum" << dendl; outside_quorum.insert(m->name); } else { dout(10) << " mostly ignoring mon." 
<< m->name << ", not part of monmap" << dendl; return; } unsigned need = monmap->size() / 2 + 1; dout(10) << " outside_quorum now " << outside_quorum << ", need " << need << dendl; if (outside_quorum.size() >= need) { if (outside_quorum.count(name)) { dout(10) << " that's enough to form a new quorum, calling election" << dendl; start_election(); } else { dout(10) << " that's enough to form a new quorum, but it does not include me; waiting" << dendl; } } else { dout(10) << " that's not yet enough for a new quorum, waiting" << dendl; } } } void Monitor::join_election() { dout(10) << __func__ << dendl; wait_for_paxos_write(); _reset(); state = STATE_ELECTING; logger->inc(l_mon_num_elections); } void Monitor::start_election() { dout(10) << "start_election" << dendl; wait_for_paxos_write(); _reset(); state = STATE_ELECTING; logger->inc(l_mon_num_elections); logger->inc(l_mon_election_call); clog->info() << "mon." << name << " calling new monitor election"; elector.call_election(); } void Monitor::win_standalone_election() { dout(1) << "win_standalone_election" << dendl; // bump election epoch, in case the previous epoch included other // monitors; we need to be able to make the distinction. 
elector.init();
  elector.advance_epoch();

  rank = monmap->get_rank(name);
  assert(rank == 0);
  set q;
  q.insert(rank);

  // metadata[0] creates the entry for rank 0 (ourselves) and fills it in
  map metadata;
  collect_metadata(&metadata[0]);

  win_election(elector.get_epoch(), q,
               CEPH_FEATURES_ALL,
               ceph::features::mon::get_supported(),
               metadata);
}

/**
 * @return the time recorded when we last became leader.  Asserts that we
 *         are currently in STATE_LEADER.
 */
const utime_t& Monitor::get_leader_since() const
{
  assert(state == STATE_LEADER);
  return leader_since;
}

/**
 * @return the elector's current epoch
 */
epoch_t Monitor::get_epoch()
{
  return elector.get_epoch();
}

/**
 * Call election_finished() on the paxos services once we are leader or
 * peon.  As leader, the monmap monitor is skipped here.
 */
void Monitor::_finish_svc_election()
{
  assert(state == STATE_LEADER || state == STATE_PEON);

  for (auto p : paxos_service) {
    // we already called election_finished() on monmon(); avoid calling twice
    if (state == STATE_LEADER && p == monmon())
      continue;
    p->election_finished();
  }
}

/**
 * Become leader for the given election result.
 *
 * Records the quorum and its features, initializes paxos as leader,
 * notifies the services (monmap monitor first), and queues the merged
 * monitor metadata into the pending paxos transaction.
 *
 * @param epoch         election epoch, passed on to the health monitor
 * @param active        quorum membership
 * @param features      connection features of the quorum
 * @param mon_features  monitor features of the quorum
 * @param metadata      metadata stored as pending_metadata and merged
 *                      with older entries from mon_metadata
 */
void Monitor::win_election(epoch_t epoch, set& active, uint64_t features,
                           const mon_feature_t& mon_features,
                           const map& metadata)
{
  dout(10) << __func__ << " epoch " << epoch << " quorum " << active
           << " features " << features
           << " mon_features " << mon_features
           << dendl;
  assert(is_electing());
  state = STATE_LEADER;
  leader_since = ceph_clock_now();
  leader = rank;
  quorum = active;
  quorum_con_features = features;
  quorum_mon_features = mon_features;
  pending_metadata = metadata;
  outside_quorum.clear();

  clog->info() << "mon." << name << "@" << rank
               << " won leader election with quorum " << quorum;

  set_leader_commands(get_local_commands(mon_features));

  paxos->leader_init();
  // NOTE: tell monmap monitor first.  This is important for the
  // bootstrap case to ensure that the very first paxos proposal
  // codifies the monmap.  Otherwise any manner of chaos can ensue
  // when monitors call elections or participate in a paxos
  // round without agreeing on who the participants are.
  monmon()->election_finished();
  _finish_svc_election();
  health_monitor->start(epoch);

  logger->inc(l_mon_election_win);

  // inject new metadata in first transaction.
  {
    // include previous metadata for missing mons (that aren't part of
    // the current quorum).
map m = metadata;
    // note: the loop variable shadows the member 'rank'
    for (unsigned rank = 0; rank < monmap->size(); ++rank) {
      if (m.count(rank) == 0 &&
          mon_metadata.count(rank)) {
        m[rank] = mon_metadata[rank];
      }
    }

    // FIXME: This is a bit sloppy because we aren't guaranteed to submit
    // a new transaction immediately after the election finishes.  We should
    // do that anyway for other reasons, though.
    MonitorDBStore::TransactionRef t = paxos->get_pending_transaction();
    bufferlist bl;
    ::encode(m, bl);
    t->put(MONITOR_STORE_PREFIX, "last_metadata", bl);
  }

  finish_election();
  // these are only started when the monmap has more than one monitor
  // and a non-zero epoch
  if (monmap->size() > 1 &&
      monmap->get_epoch() > 0) {
    timecheck_start();
    health_tick_start();
    do_health_to_clog_interval();
    scrub_event_start();
  }
}

/**
 * Become peon for the given election result.
 *
 * @param epoch         election epoch, passed on to the health monitor
 * @param q             quorum membership
 * @param l             rank of the elected leader
 * @param features      connection features of the quorum
 * @param mon_features  monitor features of the quorum
 */
void Monitor::lose_election(epoch_t epoch, set &q, int l,
                            uint64_t features,
                            const mon_feature_t& mon_features)
{
  state = STATE_PEON;
  leader_since = utime_t();
  leader = l;
  quorum = q;
  outside_quorum.clear();
  quorum_con_features = features;
  quorum_mon_features = mon_features;
  dout(10) << "lose_election, epoch " << epoch << " leader is mon" << leader
           << " quorum is " << quorum << " features are " << quorum_con_features
           << " mon_features are " << quorum_mon_features
           << dendl;

  paxos->peon_init();
  _finish_svc_election();
  health_monitor->start(epoch);

  logger->inc(l_mon_election_lose);

  finish_election();

  if ((quorum_con_features & CEPH_FEATURE_MON_METADATA) &&
      !HAVE_FEATURE(quorum_con_features, SERVER_LUMINOUS)) {
    // for pre-luminous mons only
    Metadata sys_info;
    collect_metadata(&sys_info);
    messenger->send_message(new MMonMetadata(sys_info),
                            monmap->get_inst(get_leader()));
  }
}

/**
 * Fill @p m with system information (via collect_sys_info) plus an
 * "addr" entry holding our messenger address.
 */
void Monitor::collect_metadata(Metadata *m)
{
  collect_sys_info(m, g_ceph_context);
  (*m)["addr"] = stringify(messenger->get_myaddr());
}

/**
 * Common post-election work for leader and peon: update the compatset
 * features, finish the waiters queued on quorum, resend routed requests,
 * refresh the loggers, and ask to be renamed in the monmap if our address
 * is listed under a different name.
 */
void Monitor::finish_election()
{
  apply_quorum_to_compatset_features();
  apply_monmap_to_compatset_features();
  timecheck_finish();
  exited_quorum = utime_t();
  finish_contexts(g_ceph_context, waitfor_quorum);
  finish_contexts(g_ceph_context, maybe_wait_for_quorum);
  resend_routed_requests();
  update_logger();
register_cluster_logger();

  // am i named properly?
  string cur_name = monmap->get_name(messenger->get_myaddr());
  if (cur_name != name) {
    dout(10) << " renaming myself from " << cur_name << " -> " << name << dendl;
    // the join request goes to the first member of the quorum
    messenger->send_message(new MMonJoin(monmap->fsid, name, messenger->get_myaddr()),
                            monmap->get_inst(*quorum.begin()));
  }
}

/**
 * Adopt @p new_features if it differs from the current compatset: store
 * it in memory, persist it through write_features() and recompute the
 * required connection features.
 */
void Monitor::_apply_compatset_features(CompatSet &new_features)
{
  if (new_features.compare(features) != 0) {
    CompatSet diff = features.unsupported(new_features);
    dout(1) << __func__ << " enabling new quorum features: " << diff << dendl;
    features = new_features;

    auto t = std::make_shared();
    write_features(t);
    store->apply_transaction(t);

    calc_quorum_requirements();
  }
}

/**
 * Add to the compatset the incompat features that correspond to
 * connection features present in quorum_con_features.
 */
void Monitor::apply_quorum_to_compatset_features()
{
  CompatSet new_features(features);
  if (quorum_con_features & CEPH_FEATURE_OSD_ERASURE_CODES) {
    new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES);
  }
  if (quorum_con_features & CEPH_FEATURE_OSDMAP_ENC) {
    new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC);
  }
  if (quorum_con_features & CEPH_FEATURE_ERASURE_CODE_PLUGINS_V2) {
    new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2);
  }
  if (quorum_con_features & CEPH_FEATURE_ERASURE_CODE_PLUGINS_V3) {
    new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3);
  }
  dout(5) << __func__ << dendl;
  _apply_compatset_features(new_features);
}

/**
 * Add to the compatset the incompat features that correspond to the
 * monmap's required monitor features (KRAKEN, LUMINOUS).
 */
void Monitor::apply_monmap_to_compatset_features()
{
  CompatSet new_features(features);
  mon_feature_t monmap_features = monmap->get_required_features();

  /* persistent monmap features may go into the compatset.
   * optional monmap features may not - why?
   *   because optional monmap features may be set/unset by the admin,
   *   and possibly by other means that haven't yet been thought out,
   *   so we can't make the monitor enforce them on start - because they
   *   may go away.
* this, of course, does not invalidate setting a compatset feature
   * for an optional feature - as long as you make sure to clean it up
   * once you unset it.
   */
  if (monmap_features.contains_all(ceph::features::mon::FEATURE_KRAKEN)) {
    assert(ceph::features::mon::get_persistent().contains_all(
           ceph::features::mon::FEATURE_KRAKEN));
    // this feature should only ever be set if the quorum supports it.
    assert(HAVE_FEATURE(quorum_con_features, SERVER_KRAKEN));
    new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_KRAKEN);
  }
  if (monmap_features.contains_all(ceph::features::mon::FEATURE_LUMINOUS)) {
    assert(ceph::features::mon::get_persistent().contains_all(
           ceph::features::mon::FEATURE_LUMINOUS));
    // this feature should only ever be set if the quorum supports it.
    assert(HAVE_FEATURE(quorum_con_features, SERVER_LUMINOUS));
    new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_LUMINOUS);
  }

  dout(5) << __func__ << dendl;
  _apply_compatset_features(new_features);
}

/**
 * Recompute required_features from scratch: one connection feature (or
 * feature mask) per incompat feature in the compatset, plus the masks
 * implied by the monmap's required monitor features.
 */
void Monitor::calc_quorum_requirements()
{
  required_features = 0;

  // compatset
  if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES)) {
    required_features |= CEPH_FEATURE_OSD_ERASURE_CODES;
  }
  if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC)) {
    required_features |= CEPH_FEATURE_OSDMAP_ENC;
  }
  if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2)) {
    required_features |= CEPH_FEATURE_ERASURE_CODE_PLUGINS_V2;
  }
  if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3)) {
    required_features |= CEPH_FEATURE_ERASURE_CODE_PLUGINS_V3;
  }
  if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_KRAKEN)) {
    required_features |= CEPH_FEATUREMASK_SERVER_KRAKEN;
  }
  if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_LUMINOUS)) {
    required_features |= CEPH_FEATUREMASK_SERVER_LUMINOUS;
  }

  // monmap
  if (monmap->get_required_features().contains_all(
        ceph::features::mon::FEATURE_KRAKEN)) {
    required_features |= CEPH_FEATUREMASK_SERVER_KRAKEN;
  }
  if
(monmap->get_required_features().contains_all(
        ceph::features::mon::FEATURE_LUMINOUS)) {
    required_features |= CEPH_FEATUREMASK_SERVER_LUMINOUS;
  }
  dout(10) << __func__ << " required_features " << required_features << dendl;
}

/**
 * Add to @p fm the feature map of our own sessions and the recorded
 * feature maps of every other quorum member.
 */
void Monitor::get_combined_feature_map(FeatureMap *fm)
{
  *fm += session_map.feature_map;
  for (auto id : quorum) {
    // our own rank is skipped
    if (id != rank) {
      *fm += quorum_feature_map[id];
    }
  }
}

/**
 * Mark the store so that a sync is forced the next time the monitor
 * starts, and report the outcome.
 *
 * @param f   formatter to use; a JSONFormatter is created (and deleted
 *            before returning) when null
 * @param ss  stream the formatted result is flushed to
 */
void Monitor::sync_force(Formatter *f, ostream& ss)
{
  bool free_formatter = false;

  if (!f) {
    // lousy/lazy hack: default to json if no formatter has been defined
    f = new JSONFormatter();
    free_formatter = true;
  }

  auto tx(std::make_shared());
  sync_stash_critical_state(tx);
  tx->put("mon_sync", "force_sync", 1);
  store->apply_transaction(tx);

  f->open_object_section("sync_force");
  f->dump_int("ret", 0);
  f->dump_stream("msg") << "forcing store sync the next time the monitor starts";
  f->close_section(); // sync_force
  f->flush(ss);
  if (free_formatter)
    delete f;
}

/**
 * Dump the quorum status (election epoch, quorum ranks and names, leader
 * name, monmap) and flush it to @p ss.
 *
 * @param f   formatter to use; a JSONFormatter is created (and deleted
 *            before returning) when null
 * @param ss  stream the formatted result is flushed to
 */
void Monitor::_quorum_status(Formatter *f, ostream& ss)
{
  bool free_formatter = false;

  if (!f) {
    // lousy/lazy hack: default to json if no formatter has been defined
    f = new JSONFormatter();
    free_formatter = true;
  }
  f->open_object_section("quorum_status");
  f->dump_int("election_epoch", get_epoch());

  f->open_array_section("quorum");
  for (set::iterator p = quorum.begin(); p != quorum.end(); ++p)
    f->dump_int("mon", *p);
  f->close_section(); // quorum

  list quorum_names = get_quorum_names();
  f->open_array_section("quorum_names");
  for (list::iterator p = quorum_names.begin(); p != quorum_names.end(); ++p)
    f->dump_string("mon", *p);
  f->close_section(); // quorum_names

  // the leader name is taken from the first (lowest) rank in the quorum;
  // it is an empty string when there is no quorum
  f->dump_string("quorum_leader_name", quorum.empty() ?
string() : monmap->get_name(*quorum.begin()));

  f->open_object_section("monmap");
  monmap->dump(f);
  f->close_section(); // monmap

  f->close_section(); // quorum_status
  f->flush(ss);
  if (free_formatter)
    delete f;
}

/**
 * Dump this monitor's status: identity, state, election epoch, quorum,
 * feature sets, probe and sync state, monmap and feature map.
 *
 * @param f   formatter to use; when null a JSONFormatter is created, and
 *            only in that case is the output flushed to @p ss here
 * @param ss  stream written to when the formatter was created locally
 */
void Monitor::get_mon_status(Formatter *f, ostream& ss)
{
  bool free_formatter = false;

  if (!f) {
    // lousy/lazy hack: default to json if no formatter has been defined
    f = new JSONFormatter();
    free_formatter = true;
  }

  f->open_object_section("mon_status");
  f->dump_string("name", name);
  f->dump_int("rank", rank);
  f->dump_string("state", get_state_name());
  f->dump_int("election_epoch", get_epoch());

  f->open_array_section("quorum");
  for (set::iterator p = quorum.begin(); p != quorum.end(); ++p) {
    f->dump_int("mon", *p);
  }

  f->close_section(); // quorum

  f->open_object_section("features");
  f->dump_stream("required_con") << required_features;
  mon_feature_t req_mon_features = get_required_mon_features();
  req_mon_features.dump(f, "required_mon");
  f->dump_stream("quorum_con") << quorum_con_features;
  quorum_mon_features.dump(f, "quorum_mon");
  f->close_section(); // features

  f->open_array_section("outside_quorum");
  for (set::iterator p = outside_quorum.begin(); p != outside_quorum.end(); ++p)
    f->dump_string("mon", *p);
  f->close_section(); // outside_quorum

  f->open_array_section("extra_probe_peers");
  for (set::iterator p = extra_probe_peers.begin();
       p != extra_probe_peers.end();
       ++p)
    f->dump_stream("peer") << *p;
  f->close_section(); // extra_probe_peers

  // one group of fields per session we are providing a sync for
  f->open_array_section("sync_provider");
  for (map::const_iterator p = sync_providers.begin();
       p != sync_providers.end();
       ++p) {
    f->dump_unsigned("cookie", p->second.cookie);
    f->dump_stream("entity") << p->second.entity;
    f->dump_stream("timeout") << p->second.timeout;
    f->dump_unsigned("last_committed", p->second.last_committed);
    f->dump_stream("last_key") << p->second.last_key;
  }
  f->close_section();

  // requester-side sync state, only while we are synchronizing
  if (is_synchronizing()) {
    f->open_object_section("sync");
    f->dump_stream("sync_provider") << sync_provider;
f->dump_unsigned("sync_cookie", sync_cookie);
    f->dump_unsigned("sync_start_version", sync_start_version);
    f->close_section();
  }

  // the kill_at settings are only reported when set to a positive value
  if (g_conf->mon_sync_provider_kill_at > 0)
    f->dump_int("provider_kill_at", g_conf->mon_sync_provider_kill_at);
  if (g_conf->mon_sync_requester_kill_at > 0)
    f->dump_int("requester_kill_at", g_conf->mon_sync_requester_kill_at);

  f->open_object_section("monmap");
  monmap->dump(f);
  f->close_section();

  f->dump_object("feature_map", session_map.feature_map);
  f->close_section(); // mon_status

  if (free_formatter) {
    // flush formatter to ss and delete it iff we created the formatter
    f->flush(ss);
    delete f;
  }
}


// health status to clog

/**
 * Schedule the next health tick.  After
 * mon_health_to_clog_tick_interval the callback runs do_health_to_clog()
 * and re-arms itself, unless it is invoked with a negative status.
 * Does nothing when health-to-clog is disabled or the tick interval is
 * not positive.
 */
void Monitor::health_tick_start()
{
  if (!cct->_conf->mon_health_to_clog ||
      cct->_conf->mon_health_to_clog_tick_interval <= 0)
    return;

  dout(15) << __func__ << dendl;

  health_tick_stop();
  health_tick_event = timer.add_event_after(
    cct->_conf->mon_health_to_clog_tick_interval,
    new C_MonContext(this, [this](int r) {
        if (r < 0)
          return;
        do_health_to_clog();
        health_tick_start();
      }));
}

/**
 * Cancel the pending health tick event, if any.
 */
void Monitor::health_tick_stop()
{
  dout(15) << __func__ << dendl;

  if (health_tick_event) {
    timer.cancel_event(health_tick_event);
    health_tick_event = NULL;
  }
}

/**
 * Compute the next point in time, on a whole second, that is a multiple
 * of mon_health_to_clog_interval and strictly after the current second.
 *
 * The interval is used as a modulus, so it must be non-zero; the caller
 * in health_interval_start() returns early when it is <= 0.
 *
 * @return the next update time
 */
utime_t Monitor::health_interval_calc_next_update()
{
  utime_t now = ceph_clock_now();

  time_t secs = now.sec();
  int remainder = secs % cct->_conf->mon_health_to_clog_interval;
  // adjustment is in [1, interval], so 'next' is always in the future
  int adjustment = cct->_conf->mon_health_to_clog_interval - remainder;
  utime_t next = utime_t(secs + adjustment, 0);

  dout(20) << __func__
    << " now: " << now << ","
    << " next: " << next << ","
    << " interval: " << cct->_conf->mon_health_to_clog_interval
    << dendl;

  return next;
}

/**
 * Schedule do_health_to_clog_interval() for the next interval boundary.
 * Does nothing when health-to-clog is disabled or the interval is not
 * positive.
 */
void Monitor::health_interval_start()
{
  dout(15) << __func__ << dendl;

  if (!cct->_conf->mon_health_to_clog ||
      cct->_conf->mon_health_to_clog_interval <= 0) {
    return;
  }

  health_interval_stop();
  utime_t next = health_interval_calc_next_update();
  health_interval_event = new C_MonContext(this, [this](int r) {
      if (r < 0)
        return;
do_health_to_clog_interval();
    });
  if (!timer.add_event_at(next, health_interval_event)) {
    // the timer did not take the event; do not keep the pointer around
    health_interval_event = nullptr;
  }
}

/**
 * Cancel the pending health interval event, if any, and clear the
 * pointer.
 */
void Monitor::health_interval_stop()
{
  dout(15) << __func__ << dendl;
  if (health_interval_event) {
    timer.cancel_event(health_interval_event);
  }
  health_interval_event = NULL;
}

/**
 * Stop both health timers and reset the cached health status.
 */
void Monitor::health_events_cleanup()
{
  health_tick_stop();
  health_interval_stop();
  health_status_cache.reset();
}

/**
 * React to changes of the health-to-clog configuration options by
 * starting or stopping the matching timers.
 *
 * @param changed  names of the options that changed
 */
void Monitor::health_to_clog_update_conf(const std::set &changed)
{
  dout(20) << __func__ << dendl;

  if (changed.count("mon_health_to_clog")) {
    if (!cct->_conf->mon_health_to_clog) {
      health_events_cleanup();
    } else {
      // only start the timers that are not already scheduled
      if (!health_tick_event) {
        health_tick_start();
      }
      if (!health_interval_event) {
        health_interval_start();
      }
    }
  }

  if (changed.count("mon_health_to_clog_interval")) {
    if (cct->_conf->mon_health_to_clog_interval <= 0) {
      health_interval_stop();
    } else {
      health_interval_start();
    }
  }

  if (changed.count("mon_health_to_clog_tick_interval")) {
    if (cct->_conf->mon_health_to_clog_tick_interval <= 0) {
      health_tick_stop();
    } else {
      health_tick_start();
    }
  }
}

/**
 * Interval callback: log the health status unconditionally (force) and
 * schedule the next interval.
 */
void Monitor::do_health_to_clog_interval()
{
  // outputting to clog may have been disabled in the conf
  // since we were scheduled.
  if (!cct->_conf->mon_health_to_clog ||
      cct->_conf->mon_health_to_clog_interval <= 0)
    return;

  dout(10) << __func__ << dendl;

  // do we have a cached value for next_clog_update?  if not,
  // do we know when the last update was?

  do_health_to_clog(true);
  health_interval_start();
}

/**
 * Write the current health summary to the cluster log.
 *
 * Unless @p force is set, nothing is logged when the summary and overall
 * status equal the cached values from the previous call.
 *
 * @param force  log even if nothing changed
 */
void Monitor::do_health_to_clog(bool force)
{
  // outputting to clog may have been disabled in the conf
  // since we were scheduled.
  if (!cct->_conf->mon_health_to_clog ||
      cct->_conf->mon_health_to_clog_interval <= 0)
    return;

  dout(10) << __func__ << (force ? " (force)" : "") << dendl;

  if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) {
    string summary;
    health_status_t level = get_health_status(false, nullptr, &summary);
    if (!force &&
        summary == health_status_cache.summary &&
        level == health_status_cache.overall)
      return;
    clog->health(level) << "overall " << summary;
    health_status_cache.summary = summary;
    health_status_cache.overall = level;
  } else {
    // for jewel only
    list status;
    health_status_t overall = get_health(status, NULL, NULL);
    dout(25) << __func__
             << (force ? " (force)" : "")
             << dendl;

    string summary = joinify(status.begin(), status.end(), string("; "));

    if (!force &&
        overall == health_status_cache.overall &&
        !health_status_cache.summary.empty() &&
        health_status_cache.summary == summary) {
      // we got a dup!
      return;
    }

    clog->info() << summary;

    health_status_cache.overall = overall;
    health_status_cache.summary = summary;
  }
}

/**
 * Gather the health checks of all paxos services.
 *
 * With a formatter the result is dumped as a "health" object; without
 * one a one-line text summary is written to @p plain, which must then be
 * non-null because it is dereferenced unconditionally.
 *
 * @param want_detail  also emit per-check detail
 * @param f            formatter, or null for plain-text output
 * @param plain        receives the plain-text output when @p f is null
 * @param sep1         separator placed between the status and the summary
 * @param sep2         separator handed to dump_summary()
 * @return the minimum of HEALTH_OK and the statuses returned by the
 *         services (presumably lower means more severe -- confirm
 *         against the health_status_t definition)
 */
health_status_t Monitor::get_health_status(
  bool want_detail,
  Formatter *f,
  std::string *plain,
  const char *sep1,
  const char *sep2)
{
  health_status_t r = HEALTH_OK;
  bool compat = g_conf->mon_health_preluminous_compat;
  bool compat_warn = g_conf->get_val("mon_health_preluminous_compat_warning");
  if (f) {
    f->open_object_section("health");
    f->open_object_section("checks");
  }

  // the text summary is only collected when there is no formatter
  string summary;
  string *psummary = f ? nullptr : &summary;
  for (auto& svc : paxos_service) {
    r = std::min(r, svc->get_health_checks().dump_summary(
                   f, psummary, sep2, want_detail));
  }

  if (f) {
    f->close_section();
    f->dump_stream("status") << r;
  } else {
    // one-liner: HEALTH_FOO[ thing1[; thing2 ...]]
    *plain = stringify(r);
    if (summary.size()) {
      *plain += sep1;
      *plain += summary;
    }
    *plain += "\n";
  }

  const std::string old_fields_message = "'ceph health' JSON format has "
    "changed in luminous. If you see this your monitoring system is "
    "scraping the wrong fields. Disable this with 'mon health preluminous "
    "compat warning = false'";

  if (f && (compat || compat_warn)) {
    health_status_t cr = compat_warn ?
min(HEALTH_WARN, r) : r; f->open_array_section("summary"); if (compat_warn) { f->open_object_section("item"); f->dump_stream("severity") << HEALTH_WARN; f->dump_string("summary", old_fields_message); f->close_section(); } if (compat) { for (auto& svc : paxos_service) { svc->get_health_checks().dump_summary_compat(f); } } f->close_section(); f->dump_stream("overall_status") << cr; } if (want_detail) { if (f && (compat || compat_warn)) { f->open_array_section("detail"); if (compat_warn) { f->dump_string("item", old_fields_message); } } for (auto& svc : paxos_service) { svc->get_health_checks().dump_detail(f, plain, compat); } if (f && (compat || compat_warn)) { f->close_section(); } } if (f) { f->close_section(); } return r; } void Monitor::log_health( const health_check_map_t& updated, const health_check_map_t& previous, MonitorDBStore::TransactionRef t) { if (!g_conf->mon_health_to_clog) { return; } const utime_t now = ceph_clock_now(); // FIXME: log atomically as part of @t instead of using clog. dout(10) << __func__ << " updated " << updated.checks.size() << " previous " << previous.checks.size() << dendl; const auto min_log_period = g_conf->get_val( "mon_health_log_update_period"); for (auto& p : updated.checks) { auto q = previous.checks.find(p.first); bool logged = false; if (q == previous.checks.end()) { // new ostringstream ss; ss << "Health check failed: " << p.second.summary << " (" << p.first << ")"; clog->health(p.second.severity) << ss.str(); logged = true; } else { if (p.second.summary != q->second.summary || p.second.severity != q->second.severity) { auto status_iter = health_check_log_times.find(p.first); if (status_iter != health_check_log_times.end()) { if (p.second.severity == q->second.severity && now - status_iter->second.updated_at < min_log_period) { // We already logged this recently and the severity is unchanged, // so skip emitting an update of the summary string. // We'll get an update out of tick() later if the check // is still failing. 
continue; } } // summary or severity changed (ignore detail changes at this level) ostringstream ss; ss << "Health check update: " << p.second.summary << " (" << p.first << ")"; clog->health(p.second.severity) << ss.str(); logged = true; } } // Record the time at which we last logged, so that we can check this // when considering whether/when to print update messages. if (logged) { auto iter = health_check_log_times.find(p.first); if (iter == health_check_log_times.end()) { health_check_log_times.emplace(p.first, HealthCheckLogStatus( p.second.severity, p.second.summary, now)); } else { iter->second = HealthCheckLogStatus( p.second.severity, p.second.summary, now); } } } for (auto& p : previous.checks) { if (!updated.checks.count(p.first)) { // cleared ostringstream ss; if (p.first == "DEGRADED_OBJECTS") { clog->info() << "All degraded objects recovered"; } else if (p.first == "OSD_FLAGS") { clog->info() << "OSD flags cleared"; } else { clog->info() << "Health check cleared: " << p.first << " (was: " << p.second.summary << ")"; } if (health_check_log_times.count(p.first)) { health_check_log_times.erase(p.first); } } } if (previous.checks.size() && updated.checks.size() == 0) { // We might be going into a fully healthy state, check // other subsystems bool any_checks = false; for (auto& svc : paxos_service) { if (&(svc->get_health_checks()) == &(previous)) { // Ignore the ones we're clearing right now continue; } if (svc->get_health_checks().checks.size() > 0) { any_checks = true; break; } } if (!any_checks) { clog->info() << "Cluster is now healthy"; } } } health_status_t Monitor::get_health(list& status, bufferlist *detailbl, Formatter *f) { list > summary; list > detail; if (f) f->open_object_section("health"); for (vector::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p) { PaxosService *s = *p; s->get_health(summary, detailbl ? &detail : NULL, cct); } health_monitor->get_health(summary, (detailbl ? 
&detail : NULL)); health_status_t overall = HEALTH_OK; if (!timecheck_skews.empty()) { list warns; for (map::iterator i = timecheck_skews.begin(); i != timecheck_skews.end(); ++i) { entity_inst_t inst = i->first; double skew = i->second; double latency = timecheck_latencies[inst]; string name = monmap->get_name(inst.addr); ostringstream tcss; health_status_t tcstatus = timecheck_status(tcss, skew, latency); if (tcstatus != HEALTH_OK) { if (overall > tcstatus) overall = tcstatus; warns.push_back(name); ostringstream tmp_ss; tmp_ss << "mon." << name << " addr " << inst.addr << " " << tcss.str() << " (latency " << latency << "s)"; detail.push_back(make_pair(tcstatus, tmp_ss.str())); } } if (!warns.empty()) { ostringstream ss; ss << "clock skew detected on"; while (!warns.empty()) { ss << " mon." << warns.front(); warns.pop_front(); if (!warns.empty()) ss << ","; } status.push_back(ss.str()); summary.push_back(make_pair(HEALTH_WARN, "Monitor clock skew detected ")); } } if (f) f->open_array_section("summary"); if (!summary.empty()) { while (!summary.empty()) { if (overall > summary.front().first) overall = summary.front().first; status.push_back(summary.front().second); if (f) { f->open_object_section("item"); f->dump_stream("severity") << summary.front().first; f->dump_string("summary", summary.front().second); f->close_section(); } summary.pop_front(); } } if (f) f->close_section(); stringstream fss; fss << overall; status.push_front(fss.str()); if (f) f->dump_stream("overall_status") << overall; if (f) f->open_array_section("detail"); while (!detail.empty()) { if (f) f->dump_string("item", detail.front().second); else if (detailbl != NULL) { detailbl->append(detail.front().second); detailbl->append('\n'); } detail.pop_front(); } if (f) f->close_section(); if (f) f->close_section(); return overall; } void Monitor::get_cluster_status(stringstream &ss, Formatter *f) { if (f) f->open_object_section("status"); if (f) { f->dump_stream("fsid") << monmap->get_fsid(); if 
(osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) { get_health_status(false, f, nullptr); } else { list health_str; get_health(health_str, nullptr, f); } f->dump_unsigned("election_epoch", get_epoch()); { f->open_array_section("quorum"); for (set::iterator p = quorum.begin(); p != quorum.end(); ++p) f->dump_int("rank", *p); f->close_section(); f->open_array_section("quorum_names"); for (set::iterator p = quorum.begin(); p != quorum.end(); ++p) f->dump_string("id", monmap->get_name(*p)); f->close_section(); } f->open_object_section("monmap"); monmap->dump(f); f->close_section(); f->open_object_section("osdmap"); osdmon()->osdmap.print_summary(f, cout, string(12, ' ')); f->close_section(); f->open_object_section("pgmap"); pgservice->print_summary(f, NULL); f->close_section(); f->open_object_section("fsmap"); mdsmon()->get_fsmap().print_summary(f, NULL); f->close_section(); f->open_object_section("mgrmap"); mgrmon()->get_map().print_summary(f, nullptr); f->close_section(); f->dump_object("servicemap", mgrstatmon()->get_service_map()); f->close_section(); } else { ss << " cluster:\n"; ss << " id: " << monmap->get_fsid() << "\n"; string health; if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) { get_health_status(false, nullptr, &health, "\n ", "\n "); } else { list ls; get_health(ls, NULL, f); health = joinify(ls.begin(), ls.end(), string("\n ")); } ss << " health: " << health << "\n"; ss << "\n \n services:\n"; { size_t maxlen = 3; auto& service_map = mgrstatmon()->get_service_map(); for (auto& p : service_map.services) { maxlen = std::max(maxlen, p.first.size()); } string spacing(maxlen - 3, ' '); const auto quorum_names = get_quorum_names(); const auto mon_count = monmap->mon_info.size(); ss << " mon: " << spacing << mon_count << " daemons, quorum " << quorum_names; if (quorum_names.size() != mon_count) { std::list out_of_q; for (size_t i = 0; i < monmap->ranks.size(); ++i) { if (quorum.count(i) == 0) { 
out_of_q.push_back(monmap->ranks[i]); } } ss << ", out of quorum: " << joinify(out_of_q.begin(), out_of_q.end(), std::string(", ")); } ss << "\n"; if (mgrmon()->in_use()) { ss << " mgr: " << spacing; mgrmon()->get_map().print_summary(nullptr, &ss); ss << "\n"; } if (mdsmon()->get_fsmap().filesystem_count() > 0) { ss << " mds: " << spacing << mdsmon()->get_fsmap() << "\n"; } ss << " osd: " << spacing; osdmon()->osdmap.print_summary(NULL, ss, string(maxlen + 6, ' ')); ss << "\n"; for (auto& p : service_map.services) { ss << " " << p.first << ": " << string(maxlen - p.first.size(), ' ') << p.second.get_summary() << "\n"; } } ss << "\n \n data:\n"; pgservice->print_summary(NULL, &ss); ss << "\n "; } } void Monitor::_generate_command_map(map& cmdmap, map ¶m_str_map) { for (map::const_iterator p = cmdmap.begin(); p != cmdmap.end(); ++p) { if (p->first == "prefix") continue; if (p->first == "caps") { vector cv; if (cmd_getval(g_ceph_context, cmdmap, "caps", cv) && cv.size() % 2 == 0) { for (unsigned i = 0; i < cv.size(); i += 2) { string k = string("caps_") + cv[i]; param_str_map[k] = cv[i + 1]; } continue; } } param_str_map[p->first] = cmd_vartype_stringify(p->second); } } const MonCommand *Monitor::_get_moncommand( const string &cmd_prefix, const vector& cmds) { for (auto& c : cmds) { if (c.cmdstring.compare(0, cmd_prefix.size(), cmd_prefix) == 0) { return &c; } } return nullptr; } bool Monitor::_allowed_command(MonSession *s, string &module, string &prefix, const map& cmdmap, const map& param_str_map, const MonCommand *this_cmd) { bool cmd_r = this_cmd->requires_perm('r'); bool cmd_w = this_cmd->requires_perm('w'); bool cmd_x = this_cmd->requires_perm('x'); bool capable = s->caps.is_capable( g_ceph_context, CEPH_ENTITY_TYPE_MON, s->entity_name, module, prefix, param_str_map, cmd_r, cmd_w, cmd_x); dout(10) << __func__ << " " << (capable ? 
"" : "not ") << "capable" << dendl; return capable; } void Monitor::format_command_descriptions(const std::vector &commands, Formatter *f, bufferlist *rdata, bool hide_mgr_flag) { int cmdnum = 0; f->open_object_section("command_descriptions"); for (const auto &cmd : commands) { unsigned flags = cmd.flags; if (hide_mgr_flag) { flags &= ~MonCommand::FLAG_MGR; } ostringstream secname; secname << "cmd" << setfill('0') << std::setw(3) << cmdnum; dump_cmddesc_to_json(f, secname.str(), cmd.cmdstring, cmd.helpstring, cmd.module, cmd.req_perms, cmd.availability, flags); cmdnum++; } f->close_section(); // command_descriptions f->flush(*rdata); } bool Monitor::is_keyring_required() { string auth_cluster_required = g_conf->auth_supported.empty() ? g_conf->auth_cluster_required : g_conf->auth_supported; string auth_service_required = g_conf->auth_supported.empty() ? g_conf->auth_service_required : g_conf->auth_supported; return auth_service_required == "cephx" || auth_cluster_required == "cephx"; } struct C_MgrProxyCommand : public Context { Monitor *mon; MonOpRequestRef op; uint64_t size; bufferlist outbl; string outs; C_MgrProxyCommand(Monitor *mon, MonOpRequestRef op, uint64_t s) : mon(mon), op(op), size(s) { } void finish(int r) { Mutex::Locker l(mon->lock); mon->mgr_proxy_bytes -= size; mon->reply_command(op, r, outs, outbl, 0); } }; void Monitor::handle_command(MonOpRequestRef op) { assert(op->is_type_command()); MMonCommand *m = static_cast(op->get_req()); if (m->fsid != monmap->fsid) { dout(0) << "handle_command on fsid " << m->fsid << " != " << monmap->fsid << dendl; reply_command(op, -EPERM, "wrong fsid", 0); return; } MonSession *session = static_cast( m->get_connection()->get_priv()); if (!session) { dout(5) << __func__ << " dropping stray message " << *m << dendl; return; } BOOST_SCOPE_EXIT_ALL(=) { session->put(); }; if (m->cmd.empty()) { string rs = "No command supplied"; reply_command(op, -EINVAL, rs, 0); return; } string prefix; vector fullcmd; map cmdmap; 
stringstream ss, ds; bufferlist rdata; string rs; int r = -EINVAL; rs = "unrecognized command"; if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) { // ss has reason for failure r = -EINVAL; rs = ss.str(); if (!m->get_source().is_mon()) // don't reply to mon->mon commands reply_command(op, r, rs, 0); return; } // check return value. If no prefix parameter provided, // return value will be false, then return error info. if (!cmd_getval(g_ceph_context, cmdmap, "prefix", prefix)) { reply_command(op, -EINVAL, "command prefix not found", 0); return; } // check prefix is empty if (prefix.empty()) { reply_command(op, -EINVAL, "command prefix must not be empty", 0); return; } if (prefix == "get_command_descriptions") { bufferlist rdata; Formatter *f = Formatter::create("json"); // hide mgr commands until luminous upgrade is complete bool hide_mgr_flag = osdmon()->osdmap.require_osd_release < CEPH_RELEASE_LUMINOUS; std::vector commands; // only include mgr commands once all mons are upgrade (and we've dropped // the hard-coded PGMonitor commands) if (quorum_mon_features.contains_all(ceph::features::mon::FEATURE_LUMINOUS)) { commands = static_cast( paxos_service[PAXOS_MGR])->get_command_descs(); } for (auto& c : leader_mon_commands) { commands.push_back(c); } format_command_descriptions(commands, f, &rdata, hide_mgr_flag); delete f; reply_command(op, 0, "", rdata, 0); return; } string module; string err; dout(0) << "handle_command " << *m << dendl; string format; cmd_getval(g_ceph_context, cmdmap, "format", format, string("plain")); boost::scoped_ptr f(Formatter::create(format)); get_str_vec(prefix, fullcmd); // make sure fullcmd is not empty. // invalid prefix will cause empty vector fullcmd. 
// such as, prefix=";,,;" if (fullcmd.empty()) { reply_command(op, -EINVAL, "command requires a prefix to be valid", 0); return; } module = fullcmd[0]; // validate command is in leader map const MonCommand *leader_cmd; const auto& mgr_cmds = mgrmon()->get_command_descs(); const MonCommand *mgr_cmd = nullptr; if (!mgr_cmds.empty()) { mgr_cmd = _get_moncommand(prefix, mgr_cmds); } leader_cmd = _get_moncommand(prefix, leader_mon_commands); if (!leader_cmd) { leader_cmd = mgr_cmd; if (!leader_cmd) { reply_command(op, -EINVAL, "command not known", 0); return; } } // validate command is in our map & matches, or forward if it is allowed const MonCommand *mon_cmd = _get_moncommand( prefix, get_local_commands(quorum_mon_features)); if (!mon_cmd) { mon_cmd = mgr_cmd; } if (!is_leader()) { if (!mon_cmd) { if (leader_cmd->is_noforward()) { reply_command(op, -EINVAL, "command not locally supported and not allowed to forward", 0); return; } dout(10) << "Command not locally supported, forwarding request " << m << dendl; forward_request_leader(op); return; } else if (!mon_cmd->is_compat(leader_cmd)) { if (mon_cmd->is_noforward()) { reply_command(op, -EINVAL, "command not compatible with leader and not allowed to forward", 0); return; } dout(10) << "Command not compatible with leader, forwarding request " << m << dendl; forward_request_leader(op); return; } } if (mon_cmd->is_obsolete() || (cct->_conf->mon_debug_deprecated_as_obsolete && mon_cmd->is_deprecated())) { reply_command(op, -ENOTSUP, "command is obsolete; please check usage and/or man page", 0); return; } if (session->proxy_con && mon_cmd->is_noforward()) { dout(10) << "Got forward for noforward command " << m << dendl; reply_command(op, -EINVAL, "forward for noforward command", rdata, 0); return; } /* what we perceive as being the service the command falls under */ string service(mon_cmd->module); dout(25) << __func__ << " prefix='" << prefix << "' module='" << module << "' service='" << service << "'" << dendl; bool 
cmd_is_rw = (mon_cmd->requires_perm('w') || mon_cmd->requires_perm('x')); // validate user's permissions for requested command map param_str_map; _generate_command_map(cmdmap, param_str_map); if (!_allowed_command(session, service, prefix, cmdmap, param_str_map, mon_cmd)) { dout(1) << __func__ << " access denied" << dendl; (cmd_is_rw ? audit_clog->info() : audit_clog->debug()) << "from='" << session->inst << "' " << "entity='" << session->entity_name << "' " << "cmd=" << m->cmd << ": access denied"; reply_command(op, -EACCES, "access denied", 0); return; } (cmd_is_rw ? audit_clog->info() : audit_clog->debug()) << "from='" << session->inst << "' " << "entity='" << session->entity_name << "' " << "cmd=" << m->cmd << ": dispatch"; if (mon_cmd->is_mgr() && osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) { const auto& hdr = m->get_header(); uint64_t size = hdr.front_len + hdr.middle_len + hdr.data_len; uint64_t max = g_conf->get_val("mon_client_bytes") * g_conf->get_val("mon_mgr_proxy_client_bytes_ratio"); if (mgr_proxy_bytes + size > max) { dout(10) << __func__ << " current mgr proxy bytes " << mgr_proxy_bytes << " + " << size << " > max " << max << dendl; reply_command(op, -EAGAIN, "hit limit on proxied mgr commands", rdata, 0); return; } mgr_proxy_bytes += size; dout(10) << __func__ << " proxying mgr command (+" << size << " -> " << mgr_proxy_bytes << ")" << dendl; C_MgrProxyCommand *fin = new C_MgrProxyCommand(this, op, size); mgr_client.start_command(m->cmd, m->get_data(), &fin->outbl, &fin->outs, new C_OnFinisher(fin, &finisher)); return; } if ((module == "mds" || module == "fs") && prefix != "fs authorize") { mdsmon()->dispatch(op); return; } if ((module == "osd" || prefix == "pg map") && prefix != "osd last-stat-seq") { osdmon()->dispatch(op); return; } if (module == "pg") { pgmon()->dispatch(op); return; } if (module == "mon" && /* Let the Monitor class handle the following commands: * 'mon compact' * 'mon scrub' * 'mon sync force' */ prefix != 
"mon compact" && prefix != "mon scrub" && prefix != "mon sync force" && prefix != "mon metadata" && prefix != "mon versions" && prefix != "mon count-metadata") { monmon()->dispatch(op); return; } if (module == "auth" || prefix == "fs authorize") { authmon()->dispatch(op); return; } if (module == "log") { logmon()->dispatch(op); return; } if (module == "config-key") { config_key_service->dispatch(op); return; } if (module == "mgr") { mgrmon()->dispatch(op); return; } if (prefix == "fsid") { if (f) { f->open_object_section("fsid"); f->dump_stream("fsid") << monmap->fsid; f->close_section(); f->flush(rdata); } else { ds << monmap->fsid; rdata.append(ds); } reply_command(op, 0, "", rdata, 0); return; } if (prefix == "scrub" || prefix == "mon scrub") { wait_for_paxos_write(); if (is_leader()) { int r = scrub_start(); reply_command(op, r, "", rdata, 0); } else if (is_peon()) { forward_request_leader(op); } else { reply_command(op, -EAGAIN, "no quorum", rdata, 0); } return; } if (prefix == "compact" || prefix == "mon compact") { dout(1) << "triggering manual compaction" << dendl; utime_t start = ceph_clock_now(); store->compact(); utime_t end = ceph_clock_now(); end -= start; dout(1) << "finished manual compaction in " << end << " seconds" << dendl; ostringstream oss; oss << "compacted " << g_conf->get_val("mon_keyvaluedb") << " in " << end << " seconds"; rs = oss.str(); r = 0; } else if (prefix == "injectargs") { vector injected_args; cmd_getval(g_ceph_context, cmdmap, "injected_args", injected_args); if (!injected_args.empty()) { dout(0) << "parsing injected options '" << injected_args << "'" << dendl; ostringstream oss; r = g_conf->injectargs(str_join(injected_args, " "), &oss); ss << "injectargs:" << oss.str(); rs = ss.str(); goto out; } else { rs = "must supply options to be parsed in a single string"; r = -EINVAL; } } else if (prefix == "time-sync-status") { if (!f) f.reset(Formatter::create("json-pretty")); f->open_object_section("time_sync"); if 
(!timecheck_skews.empty()) { f->open_object_section("time_skew_status"); for (auto& i : timecheck_skews) { entity_inst_t inst = i.first; double skew = i.second; double latency = timecheck_latencies[inst]; string name = monmap->get_name(inst.addr); ostringstream tcss; health_status_t tcstatus = timecheck_status(tcss, skew, latency); f->open_object_section(name.c_str()); f->dump_float("skew", skew); f->dump_float("latency", latency); f->dump_stream("health") << tcstatus; if (tcstatus != HEALTH_OK) { f->dump_stream("details") << tcss.str(); } f->close_section(); } f->close_section(); } f->open_object_section("timechecks"); f->dump_unsigned("epoch", get_epoch()); f->dump_int("round", timecheck_round); f->dump_stream("round_status") << ((timecheck_round%2) ? "on-going" : "finished"); f->close_section(); f->close_section(); f->flush(rdata); r = 0; rs = ""; } else if (prefix == "config set") { std::string key; cmd_getval(cct, cmdmap, "key", key); std::string val; cmd_getval(cct, cmdmap, "value", val); r = g_conf->set_val(key, val, true, &ss); if (r == 0) { g_conf->apply_changes(nullptr); } rs = ss.str(); goto out; } else if (prefix == "status" || prefix == "health" || prefix == "df") { string detail; cmd_getval(g_ceph_context, cmdmap, "detail", detail); if (prefix == "status") { // get_cluster_status handles f == NULL get_cluster_status(ds, f.get()); if (f) { f->flush(ds); ds << '\n'; } rdata.append(ds); } else if (prefix == "health") { if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) { string plain; get_health_status(detail == "detail", f.get(), f ? nullptr : &plain); if (f) { f->flush(rdata); } else { rdata.append(plain); } } else { list health_str; get_health(health_str, detail == "detail" ? 
&rdata : NULL, f.get()); if (f) { f->flush(ds); ds << '\n'; } else { assert(!health_str.empty()); ds << health_str.front(); health_str.pop_front(); if (!health_str.empty()) { ds << ' '; ds << joinify(health_str.begin(), health_str.end(), string("; ")); } } bufferlist comb; comb.append(ds); if (detail == "detail") comb.append(rdata); rdata = comb; } } else if (prefix == "df") { bool verbose = (detail == "detail"); if (f) f->open_object_section("stats"); pgservice->dump_fs_stats(&ds, f.get(), verbose); if (!f) ds << '\n'; pgservice->dump_pool_stats(osdmon()->osdmap, &ds, f.get(), verbose); if (f) { f->close_section(); f->flush(ds); ds << '\n'; } } else { assert(0 == "We should never get here!"); return; } rdata.append(ds); rs = ""; r = 0; } else if (prefix == "report") { // this must be formatted, in its current form if (!f) f.reset(Formatter::create("json-pretty")); f->open_object_section("report"); f->dump_stream("cluster_fingerprint") << fingerprint; f->dump_string("version", ceph_version_to_str()); f->dump_string("commit", git_version_to_str()); f->dump_stream("timestamp") << ceph_clock_now(); vector tagsvec; cmd_getval(g_ceph_context, cmdmap, "tags", tagsvec); string tagstr = str_join(tagsvec, " "); if (!tagstr.empty()) tagstr = tagstr.substr(0, tagstr.find_last_of(' ')); f->dump_string("tag", tagstr); if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) { get_health_status(true, f.get(), nullptr); } else { list health_str; get_health(health_str, nullptr, f.get()); } monmon()->dump_info(f.get()); osdmon()->dump_info(f.get()); mdsmon()->dump_info(f.get()); authmon()->dump_info(f.get()); pgservice->dump_info(f.get()); paxos->dump_info(f.get()); f->close_section(); f->flush(rdata); ostringstream ss2; ss2 << "report " << rdata.crc32c(CEPH_MON_PORT); rs = ss2.str(); r = 0; } else if (prefix == "osd last-stat-seq") { int64_t osd; cmd_getval(g_ceph_context, cmdmap, "id", osd); uint64_t seq = mgrstatmon()->get_last_osd_stat_seq(osd); if (f) { 
f->dump_unsigned("seq", seq); f->flush(ds); } else { ds << seq; rdata.append(ds); } rs = ""; r = 0; } else if (prefix == "node ls") { string node_type("all"); cmd_getval(g_ceph_context, cmdmap, "type", node_type); if (!f) f.reset(Formatter::create("json-pretty")); if (node_type == "all") { f->open_object_section("nodes"); print_nodes(f.get(), ds); osdmon()->print_nodes(f.get()); mdsmon()->print_nodes(f.get()); f->close_section(); } else if (node_type == "mon") { print_nodes(f.get(), ds); } else if (node_type == "osd") { osdmon()->print_nodes(f.get()); } else if (node_type == "mds") { mdsmon()->print_nodes(f.get()); } f->flush(ds); rdata.append(ds); rs = ""; r = 0; } else if (prefix == "features") { if (!is_leader() && !is_peon()) { dout(10) << " waiting for quorum" << dendl; waitfor_quorum.push_back(new C_RetryMessage(this, op)); return; } if (!is_leader()) { forward_request_leader(op); return; } if (!f) f.reset(Formatter::create("json-pretty")); FeatureMap fm; get_combined_feature_map(&fm); f->dump_object("features", fm); f->flush(rdata); rs = ""; r = 0; } else if (prefix == "mon metadata") { if (!f) f.reset(Formatter::create("json-pretty")); string name; bool all = !cmd_getval(g_ceph_context, cmdmap, "id", name); if (!all) { // Dump a single mon's metadata int mon = monmap->get_rank(name); if (mon < 0) { rs = "requested mon not found"; r = -ENOENT; goto out; } f->open_object_section("mon_metadata"); r = get_mon_metadata(mon, f.get(), ds); f->close_section(); } else { // Dump all mons' metadata r = 0; f->open_array_section("mon_metadata"); for (unsigned int rank = 0; rank < monmap->size(); ++rank) { std::ostringstream get_err; f->open_object_section("mon"); f->dump_string("name", monmap->get_name(rank)); r = get_mon_metadata(rank, f.get(), get_err); f->close_section(); if (r == -ENOENT || r == -EINVAL) { dout(1) << get_err.str() << dendl; // Drop error, list what metadata we do have r = 0; } else if (r != 0) { derr << "Unexpected error from get_mon_metadata: " << 
cpp_strerror(r) << dendl; ds << get_err.str(); break; } } f->close_section(); } f->flush(ds); rdata.append(ds); rs = ""; } else if (prefix == "mon versions") { if (!f) f.reset(Formatter::create("json-pretty")); count_metadata("ceph_version", f.get()); f->flush(ds); rdata.append(ds); rs = ""; r = 0; } else if (prefix == "mon count-metadata") { if (!f) f.reset(Formatter::create("json-pretty")); string field; cmd_getval(g_ceph_context, cmdmap, "property", field); count_metadata(field, f.get()); f->flush(ds); rdata.append(ds); rs = ""; r = 0; } else if (prefix == "quorum_status") { // make sure our map is readable and up to date if (!is_leader() && !is_peon()) { dout(10) << " waiting for quorum" << dendl; waitfor_quorum.push_back(new C_RetryMessage(this, op)); return; } _quorum_status(f.get(), ds); rdata.append(ds); rs = ""; r = 0; } else if (prefix == "mon_status") { get_mon_status(f.get(), ds); if (f) f->flush(ds); rdata.append(ds); rs = ""; r = 0; } else if (prefix == "sync force" || prefix == "mon sync force") { string validate1, validate2; cmd_getval(g_ceph_context, cmdmap, "validate1", validate1); cmd_getval(g_ceph_context, cmdmap, "validate2", validate2); if (validate1 != "--yes-i-really-mean-it" || validate2 != "--i-know-what-i-am-doing") { r = -EINVAL; rs = "are you SURE? this will mean the monitor store will be " "erased. pass '--yes-i-really-mean-it " "--i-know-what-i-am-doing' if you really do."; goto out; } sync_force(f.get(), ds); rs = ds.str(); r = 0; } else if (prefix == "heap") { if (!ceph_using_tcmalloc()) rs = "tcmalloc not enabled, can't use heap profiler commands\n"; else { string heapcmd; cmd_getval(g_ceph_context, cmdmap, "heapcmd", heapcmd); // XXX 1-element vector, change at callee or make vector here? 
vector heapcmd_vec; get_str_vec(heapcmd, heapcmd_vec); ceph_heap_profiler_handle_command(heapcmd_vec, ds); rdata.append(ds); rs = ""; r = 0; } } else if (prefix == "quorum") { string quorumcmd; cmd_getval(g_ceph_context, cmdmap, "quorumcmd", quorumcmd); if (quorumcmd == "exit") { start_election(); elector.stop_participating(); rs = "stopped responding to quorum, initiated new election"; r = 0; } else if (quorumcmd == "enter") { elector.start_participating(); start_election(); rs = "started responding to quorum, initiated new election"; r = 0; } else { rs = "needs a valid 'quorum' command"; r = -EINVAL; } } else if (prefix == "version") { if (f) { f->open_object_section("version"); f->dump_string("version", pretty_version_to_str()); f->close_section(); f->flush(ds); } else { ds << pretty_version_to_str(); } rdata.append(ds); rs = ""; r = 0; } else if (prefix == "versions") { if (!f) f.reset(Formatter::create("json-pretty")); map overall; f->open_object_section("version"); map mon, mgr, osd, mds; count_metadata("ceph_version", &mon); f->open_object_section("mon"); for (auto& p : mon) { f->dump_int(p.first.c_str(), p.second); overall[p.first] += p.second; } f->close_section(); mgrmon()->count_metadata("ceph_version", &mgr); f->open_object_section("mgr"); for (auto& p : mgr) { f->dump_int(p.first.c_str(), p.second); overall[p.first] += p.second; } f->close_section(); osdmon()->count_metadata("ceph_version", &osd); f->open_object_section("osd"); for (auto& p : osd) { f->dump_int(p.first.c_str(), p.second); overall[p.first] += p.second; } f->close_section(); mdsmon()->count_metadata("ceph_version", &mds); f->open_object_section("mds"); for (auto& p : mds) { f->dump_int(p.first.c_str(), p.second); overall[p.first] += p.second; } f->close_section(); for (auto& p : mgrstatmon()->get_service_map().services) { f->open_object_section(p.first.c_str()); map m; p.second.count_metadata("ceph_version", &m); for (auto& q : m) { f->dump_int(q.first.c_str(), q.second); 
overall[q.first] += q.second; } f->close_section(); } f->open_object_section("overall"); for (auto& p : overall) { f->dump_int(p.first.c_str(), p.second); } f->close_section(); f->close_section(); f->flush(rdata); rs = ""; r = 0; } out: if (!m->get_source().is_mon()) // don't reply to mon->mon commands reply_command(op, r, rs, rdata, 0); } void Monitor::reply_command(MonOpRequestRef op, int rc, const string &rs, version_t version) { bufferlist rdata; reply_command(op, rc, rs, rdata, version); } void Monitor::reply_command(MonOpRequestRef op, int rc, const string &rs, bufferlist& rdata, version_t version) { MMonCommand *m = static_cast(op->get_req()); assert(m->get_type() == MSG_MON_COMMAND); MMonCommandAck *reply = new MMonCommandAck(m->cmd, rc, rs, version); reply->set_tid(m->get_tid()); reply->set_data(rdata); send_reply(op, reply); } // ------------------------ // request/reply routing // // a client/mds/osd will connect to a random monitor. we need to forward any // messages requiring state updates to the leader, and then route any replies // back via the correct monitor and back to them. (the monitor will not // initiate any connections.) 
void Monitor::forward_request_leader(MonOpRequestRef op) { op->mark_event(__func__); int mon = get_leader(); MonSession *session = op->get_session(); PaxosServiceMessage *req = op->get_req(); if (req->get_source().is_mon() && req->get_source_addr() != messenger->get_myaddr()) { dout(10) << "forward_request won't forward (non-local) mon request " << *req << dendl; } else if (session->proxy_con) { dout(10) << "forward_request won't double fwd request " << *req << dendl; } else if (!session->closed) { RoutedRequest *rr = new RoutedRequest; rr->tid = ++routed_request_tid; rr->client_inst = req->get_source_inst(); rr->con = req->get_connection(); rr->con_features = rr->con->get_features(); encode_message(req, CEPH_FEATURES_ALL, rr->request_bl); // for my use only; use all features rr->session = static_cast(session->get()); rr->op = op; routed_requests[rr->tid] = rr; session->routed_request_tids.insert(rr->tid); dout(10) << "forward_request " << rr->tid << " request " << *req << " features " << rr->con_features << dendl; MForward *forward = new MForward(rr->tid, req, rr->con_features, rr->session->caps); forward->set_priority(req->get_priority()); if (session->auth_handler) { forward->entity_name = session->entity_name; } else if (req->get_source().is_mon()) { forward->entity_name.set_type(CEPH_ENTITY_TYPE_MON); } messenger->send_message(forward, monmap->get_inst(mon)); op->mark_forwarded(); assert(op->get_req()->get_type() != 0); } else { dout(10) << "forward_request no session for request " << *req << dendl; } } // fake connection attached to forwarded messages struct AnonConnection : public Connection { explicit AnonConnection(CephContext *cct) : Connection(cct, NULL) {} int send_message(Message *m) override { assert(!"send_message on anonymous connection"); } void send_keepalive() override { assert(!"send_keepalive on anonymous connection"); } void mark_down() override { // silently ignore } void mark_disposable() override { // silengtly ignore } bool is_connected() 
override { return false; } }; //extract the original message and put it into the regular dispatch function void Monitor::handle_forward(MonOpRequestRef op) { MForward *m = static_cast(op->get_req()); dout(10) << "received forwarded message from " << m->client << " via " << m->get_source_inst() << dendl; MonSession *session = op->get_session(); assert(session); if (!session->is_capable("mon", MON_CAP_X)) { dout(0) << "forward from entity with insufficient caps! " << session->caps << dendl; } else { // see PaxosService::dispatch(); we rely on this being anon // (c->msgr == NULL) PaxosServiceMessage *req = m->claim_message(); assert(req != NULL); ConnectionRef c(new AnonConnection(cct)); MonSession *s = new MonSession(req->get_source_inst(), static_cast(c.get())); c->set_priv(s->get()); c->set_peer_addr(m->client.addr); c->set_peer_type(m->client.name.type()); c->set_features(m->con_features); s->caps = m->client_caps; dout(10) << " caps are " << s->caps << dendl; s->entity_name = m->entity_name; dout(10) << " entity name '" << s->entity_name << "' type " << s->entity_name.get_type() << dendl; s->proxy_con = m->get_connection(); s->proxy_tid = m->tid; req->set_connection(c); // not super accurate, but better than nothing. req->set_recv_stamp(m->get_recv_stamp()); /* * note which election epoch this is; we will drop the message if * there is a future election since our peers will resend routed * requests in that case. */ req->rx_election_epoch = get_epoch(); /* Because this is a special fake connection, we need to break the ref loop between Connection and MonSession differently than we normally do. Here, the Message refers to the Connection which refers to the Session, and nobody else refers to the Connection or the Session. And due to the special nature of this message, nobody refers to the Connection via the Session. 
So, clear out that half of the ref loop.*/ s->con.reset(NULL); dout(10) << " mesg " << req << " from " << m->get_source_addr() << dendl; _ms_dispatch(req); s->put(); } } void Monitor::try_send_message(Message *m, const entity_inst_t& to) { dout(10) << "try_send_message " << *m << " to " << to << dendl; bufferlist bl; encode_message(m, quorum_con_features, bl); messenger->send_message(m, to); for (int i=0; i<(int)monmap->size(); i++) { if (i != rank) messenger->send_message(new MRoute(bl, to), monmap->get_inst(i)); } } void Monitor::send_reply(MonOpRequestRef op, Message *reply) { op->mark_event(__func__); MonSession *session = op->get_session(); assert(session); Message *req = op->get_req(); ConnectionRef con = op->get_connection(); reply->set_cct(g_ceph_context); dout(2) << __func__ << " " << op << " " << reply << " " << *reply << dendl; if (!con) { dout(2) << "send_reply no connection, dropping reply " << *reply << " to " << req << " " << *req << dendl; reply->put(); op->mark_event("reply: no connection"); return; } if (!session->con && !session->proxy_con) { dout(2) << "send_reply no connection, dropping reply " << *reply << " to " << req << " " << *req << dendl; reply->put(); op->mark_event("reply: no connection"); return; } if (session->proxy_con) { dout(15) << "send_reply routing reply to " << con->get_peer_addr() << " via " << session->proxy_con->get_peer_addr() << " for request " << *req << dendl; session->proxy_con->send_message(new MRoute(session->proxy_tid, reply)); op->mark_event("reply: send routed request"); } else { session->con->send_message(reply); op->mark_event("reply: send"); } } void Monitor::no_reply(MonOpRequestRef op) { MonSession *session = op->get_session(); Message *req = op->get_req(); if (session->proxy_con) { dout(10) << "no_reply to " << req->get_source_inst() << " via " << session->proxy_con->get_peer_addr() << " for request " << *req << dendl; session->proxy_con->send_message(new MRoute(session->proxy_tid, NULL)); 
op->mark_event("no_reply: send routed request"); } else { dout(10) << "no_reply to " << req->get_source_inst() << " " << *req << dendl; op->mark_event("no_reply"); } } void Monitor::handle_route(MonOpRequestRef op) { MRoute *m = static_cast(op->get_req()); MonSession *session = op->get_session(); //check privileges if (!session->is_capable("mon", MON_CAP_X)) { dout(0) << "MRoute received from entity without appropriate perms! " << dendl; return; } if (m->msg) dout(10) << "handle_route " << *m->msg << " to " << m->dest << dendl; else dout(10) << "handle_route null to " << m->dest << dendl; // look it up if (m->session_mon_tid) { if (routed_requests.count(m->session_mon_tid)) { RoutedRequest *rr = routed_requests[m->session_mon_tid]; // reset payload, in case encoding is dependent on target features if (m->msg) { m->msg->clear_payload(); rr->con->send_message(m->msg); m->msg = NULL; } if (m->send_osdmap_first) { dout(10) << " sending osdmaps from " << m->send_osdmap_first << dendl; osdmon()->send_incremental(m->send_osdmap_first, rr->session, true, MonOpRequestRef()); } assert(rr->tid == m->session_mon_tid && rr->session->routed_request_tids.count(m->session_mon_tid)); routed_requests.erase(m->session_mon_tid); rr->session->routed_request_tids.erase(m->session_mon_tid); delete rr; } else { dout(10) << " don't have routed request tid " << m->session_mon_tid << dendl; } } else { dout(10) << " not a routed request, trying to send anyway" << dendl; if (m->msg) { messenger->send_message(m->msg, m->dest); m->msg = NULL; } } } void Monitor::resend_routed_requests() { dout(10) << "resend_routed_requests" << dendl; int mon = get_leader(); list retry; for (map::iterator p = routed_requests.begin(); p != routed_requests.end(); ++p) { RoutedRequest *rr = p->second; if (mon == rank) { dout(10) << " requeue for self tid " << rr->tid << dendl; rr->op->mark_event("retry routed request"); retry.push_back(new C_RetryMessage(this, rr->op)); if (rr->session) { 
assert(rr->session->routed_request_tids.count(p->first)); rr->session->routed_request_tids.erase(p->first); } delete rr; } else { bufferlist::iterator q = rr->request_bl.begin(); PaxosServiceMessage *req = (PaxosServiceMessage *)decode_message(cct, 0, q); rr->op->mark_event("resend forwarded message to leader"); dout(10) << " resend to mon." << mon << " tid " << rr->tid << " " << *req << dendl; MForward *forward = new MForward(rr->tid, req, rr->con_features, rr->session->caps); req->put(); // forward takes its own ref; drop ours. forward->client = rr->client_inst; forward->set_priority(req->get_priority()); messenger->send_message(forward, monmap->get_inst(mon)); } } if (mon == rank) { routed_requests.clear(); finish_contexts(g_ceph_context, retry); } } void Monitor::remove_session(MonSession *s) { dout(10) << "remove_session " << s << " " << s->inst << " features 0x" << std::hex << s->con_features << std::dec << dendl; assert(s->con); assert(!s->closed); for (set::iterator p = s->routed_request_tids.begin(); p != s->routed_request_tids.end(); ++p) { assert(routed_requests.count(*p)); RoutedRequest *rr = routed_requests[*p]; dout(10) << " dropping routed request " << rr->tid << dendl; delete rr; routed_requests.erase(*p); } s->routed_request_tids.clear(); s->con->set_priv(NULL); session_map.remove_session(s); logger->set(l_mon_num_sessions, session_map.get_size()); logger->inc(l_mon_session_rm); } void Monitor::remove_all_sessions() { Mutex::Locker l(session_map_lock); while (!session_map.sessions.empty()) { MonSession *s = session_map.sessions.front(); remove_session(s); if (logger) logger->inc(l_mon_session_rm); } if (logger) logger->set(l_mon_num_sessions, session_map.get_size()); } void Monitor::send_command(const entity_inst_t& inst, const vector& com) { dout(10) << "send_command " << inst << "" << com << dendl; MMonCommand *c = new MMonCommand(monmap->fsid); c->cmd = com; try_send_message(c, inst); } void Monitor::waitlist_or_zap_client(MonOpRequestRef op) { 
/**
 * Entry point for one incoming message.
 *
 * Wraps the message in a tracked op, finds or creates the MonSession for its
 * connection, refreshes the session timeout and entity name, and then either
 * waitlists/zaps the client (waitlist_or_zap_client) or hands the op to
 * dispatch_op().
 *
 * @param m  the incoming message; released with put() here if the monitor
 *           is shutting down
 */
void Monitor::_ms_dispatch(Message *m)
{
  if (is_shutdown()) {
    m->put();
    return;
  }

  // NOTE(review): create_request looks like it lost an explicit template
  // argument in this copy of the file -- confirm against the original.
  MonOpRequestRef op = op_tracker.create_request(m);
  bool src_is_mon = op->is_src_mon();
  op->mark_event("mon:_ms_dispatch");
  MonSession *s = op->get_session();
  // messages arriving on an already-closed session are silently ignored
  if (s && s->closed) {
    return;
  }

  if (src_is_mon && s) {
    ConnectionRef con = m->get_connection();
    if (con->get_messenger() && con->get_features() != s->con_features) {
      // only update features if this is a non-anonymous connection
      dout(10) << __func__ << " feature change for " << m->get_source_inst()
               << " (was " << s->con_features
               << ", now " << con->get_features() << ")" << dendl;
      // connection features changed - recreate session.
      if (s->con && s->con != con) {
        dout(10) << __func__ << " connection for " << m->get_source_inst()
                 << " changed from session; mark down and replace" << dendl;
        s->con->mark_down();
      }
      if (s->item.is_on_list()) {
        // forwarded messages' sessions are not in the sessions map and
        // exist only while the op is being handled.
        remove_session(s);
      }
      s->put();
      // forget the old session so a fresh one is created below
      s = nullptr;
    }
  }

  if (!s) {
    // if the sender is not a monitor, make sure their first message for a
    // session is an MAuth.  If it is not, assume it's a stray message,
    // and considering that we are creating a new session it is safe to
    // assume that the sender hasn't authenticated yet, so we have no way
    // of assessing whether we should handle it or not.
    if (!src_is_mon && (m->get_type() != CEPH_MSG_AUTH &&
                        m->get_type() != CEPH_MSG_MON_GET_MAP &&
                        m->get_type() != CEPH_MSG_PING)) {
      dout(1) << __func__ << " dropping stray message " << *m
              << " from " << m->get_source_inst() << dendl;
      return;
    }

    ConnectionRef con = m->get_connection();
    {
      // the session map is only touched while holding session_map_lock
      Mutex::Locker l(session_map_lock);
      s = session_map.new_session(m->get_source_inst(), con.get());
    }
    assert(s);
    // attach the session to the connection (passing an extra reference)
    con->set_priv(s->get());
    dout(10) << __func__ << " new session " << s << " " << *s
             << " features 0x" << std::hex
             << s->con_features << std::dec << dendl;
    op->set_session(s);

    logger->set(l_mon_num_sessions, session_map.get_size());
    logger->inc(l_mon_session_add);

    if (src_is_mon) {
      // give it monitor caps; the peer type has been authenticated
      dout(5) << __func__ << " setting monitor caps on this connection" << dendl;
      if (!s->caps.is_allow_all()) // but no need to repeatedly copy
        s->caps = *mon_caps;
    }
    // NOTE(review): presumably drops the reference returned by
    // new_session(); s is still used below -- confirm who keeps it alive.
    s->put();
  } else {
    dout(20) << __func__ << " existing session " << s << " for " << s->inst
             << dendl;
  }

  assert(s);

  // push the session expiry out by mon_session_timeout from now
  s->session_timeout = ceph_clock_now();
  s->session_timeout += g_conf->mon_session_timeout;

  if (s->auth_handler) {
    s->entity_name = s->auth_handler->get_entity_name();
  }
  dout(20) << " caps " << s->caps.get_str() << dendl;

  // Non-monitor, non-ping messages are waitlisted (or the client zapped)
  // while we are synchronizing, or when the session has no global_id and
  // exited_quorum is set; everything else is dispatched right away.
  if ((is_synchronizing() ||
       (s->global_id == 0 && !exited_quorum.is_zero())) &&
      !src_is_mon &&
      m->get_type() != CEPH_MSG_PING) {
    waitlist_or_zap_client(op);
  } else {
    dispatch_op(op);
  }
  return;
}
*/ op->set_type_service(); /* deal with all messages which caps should be checked somewhere else */ dealt_with = true; switch (op->get_req()->get_type()) { // OSDs case CEPH_MSG_MON_GET_OSDMAP: case CEPH_MSG_POOLOP: case MSG_OSD_BEACON: case MSG_OSD_MARK_ME_DOWN: case MSG_OSD_FULL: case MSG_OSD_FAILURE: case MSG_OSD_BOOT: case MSG_OSD_ALIVE: case MSG_OSD_PGTEMP: case MSG_OSD_PG_CREATED: case MSG_REMOVE_SNAPS: paxos_service[PAXOS_OSDMAP]->dispatch(op); break; // MDSs case MSG_MDS_BEACON: case MSG_MDS_OFFLOAD_TARGETS: paxos_service[PAXOS_MDSMAP]->dispatch(op); break; // Mgrs case MSG_MGR_BEACON: paxos_service[PAXOS_MGR]->dispatch(op); break; // MgrStat case CEPH_MSG_STATFS: // this is an ugly hack, sorry! force the version to 1 so that we do // not run afoul of the is_readable() paxos check. the client is going // by the pgmonitor version and the MgrStatMonitor version will lag behind // that until we complete the upgrade. The paxos ordering crap really // doesn't matter for statfs results, so just kludge around it here. if (osdmon()->osdmap.require_osd_release < CEPH_RELEASE_LUMINOUS) { ((MStatfs*)op->get_req())->version = 1; } case MSG_MON_MGR_REPORT: case MSG_GETPOOLSTATS: paxos_service[PAXOS_MGRSTAT]->dispatch(op); break; // pg case MSG_PGSTATS: paxos_service[PAXOS_PGMAP]->dispatch(op); break; // log case MSG_LOG: paxos_service[PAXOS_LOG]->dispatch(op); break; // handle_command() does its own caps checking case MSG_MON_COMMAND: op->set_type_command(); handle_command(op); break; default: dealt_with = false; break; } if (dealt_with) return; /* nop, looks like it's not a service message; revert back to monitor */ op->set_type_monitor(); /* messages we, the Monitor class, need to deal with * but may be sent by clients. 
*/ if (!op->get_session()->is_capable("mon", MON_CAP_R)) { dout(5) << __func__ << " " << op->get_req()->get_source_inst() << " not enough caps for " << *(op->get_req()) << " -- dropping" << dendl; goto drop; } dealt_with = true; switch (op->get_req()->get_type()) { // misc case CEPH_MSG_MON_GET_VERSION: handle_get_version(op); break; case CEPH_MSG_MON_SUBSCRIBE: /* FIXME: check what's being subscribed, filter accordingly */ handle_subscribe(op); break; default: dealt_with = false; break; } if (dealt_with) return; if (!op->is_src_mon()) { dout(1) << __func__ << " unexpected monitor message from" << " non-monitor entity " << op->get_req()->get_source_inst() << " " << *(op->get_req()) << " -- dropping" << dendl; goto drop; } /* messages that should only be sent by another monitor */ dealt_with = true; switch (op->get_req()->get_type()) { case MSG_ROUTE: handle_route(op); break; case MSG_MON_PROBE: handle_probe(op); break; // Sync (i.e., the new slurp, but on steroids) case MSG_MON_SYNC: handle_sync(op); break; case MSG_MON_SCRUB: handle_scrub(op); break; /* log acks are sent from a monitor we sent the MLog to, and are never sent by clients to us. */ case MSG_LOGACK: log_client.handle_log_ack((MLogAck*)op->get_req()); break; // monmap case MSG_MON_JOIN: op->set_type_service(); paxos_service[PAXOS_MONMAP]->dispatch(op); break; // paxos case MSG_MON_PAXOS: { op->set_type_paxos(); MMonPaxos *pm = static_cast(op->get_req()); if (!op->get_session()->is_capable("mon", MON_CAP_X)) { //can't send these! break; } if (state == STATE_SYNCHRONIZING) { // we are synchronizing. These messages would do us no // good, thus just drop them and ignore them. 
dout(10) << __func__ << " ignore paxos msg from " << pm->get_source_inst() << dendl; break; } // sanitize if (pm->epoch > get_epoch()) { bootstrap(); break; } if (pm->epoch != get_epoch()) { break; } paxos->dispatch(op); } break; // elector messages case MSG_MON_ELECTION: op->set_type_election(); //check privileges here for simplicity if (!op->get_session()->is_capable("mon", MON_CAP_X)) { dout(0) << "MMonElection received from entity without enough caps!" << op->get_session()->caps << dendl; break; } if (!is_probing() && !is_synchronizing()) { elector.dispatch(op); } break; case MSG_FORWARD: handle_forward(op); break; case MSG_TIMECHECK: handle_timecheck(op); break; case MSG_MON_HEALTH: health_monitor->dispatch(op); break; case MSG_MON_HEALTH_CHECKS: op->set_type_service(); paxos_service[PAXOS_HEALTH]->dispatch(op); break; default: dealt_with = false; break; } if (!dealt_with) { dout(1) << "dropping unexpected " << *(op->get_req()) << dendl; goto drop; } return; drop: return; } void Monitor::handle_ping(MonOpRequestRef op) { MPing *m = static_cast(op->get_req()); dout(10) << __func__ << " " << *m << dendl; MPing *reply = new MPing; entity_inst_t inst = m->get_source_inst(); bufferlist payload; boost::scoped_ptr f(new JSONFormatter(true)); f->open_object_section("pong"); if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) { get_health_status(false, f.get(), nullptr); } else { list health_str; get_health(health_str, nullptr, f.get()); } { stringstream ss; get_mon_status(f.get(), ss); } f->close_section(); stringstream ss; f->flush(ss); ::encode(ss.str(), payload); reply->set_payload(payload); dout(10) << __func__ << " reply payload len " << reply->get_payload().length() << dendl; messenger->send_message(reply, inst); } void Monitor::timecheck_start() { dout(10) << __func__ << dendl; timecheck_cleanup(); timecheck_start_round(); } void Monitor::timecheck_finish() { dout(10) << __func__ << dendl; timecheck_cleanup(); } void 
Monitor::timecheck_start_round() { dout(10) << __func__ << " curr " << timecheck_round << dendl; assert(is_leader()); if (monmap->size() == 1) { assert(0 == "We are alone; this shouldn't have been scheduled!"); return; } if (timecheck_round % 2) { dout(10) << __func__ << " there's a timecheck going on" << dendl; utime_t curr_time = ceph_clock_now(); double max = g_conf->mon_timecheck_interval*3; if (curr_time - timecheck_round_start < max) { dout(10) << __func__ << " keep current round going" << dendl; goto out; } else { dout(10) << __func__ << " finish current timecheck and start new" << dendl; timecheck_cancel_round(); } } assert(timecheck_round % 2 == 0); timecheck_acks = 0; timecheck_round ++; timecheck_round_start = ceph_clock_now(); dout(10) << __func__ << " new " << timecheck_round << dendl; timecheck(); out: dout(10) << __func__ << " setting up next event" << dendl; timecheck_reset_event(); } void Monitor::timecheck_finish_round(bool success) { dout(10) << __func__ << " curr " << timecheck_round << dendl; assert(timecheck_round % 2); timecheck_round ++; timecheck_round_start = utime_t(); if (success) { assert(timecheck_waiting.empty()); assert(timecheck_acks == quorum.size()); timecheck_report(); timecheck_check_skews(); return; } dout(10) << __func__ << " " << timecheck_waiting.size() << " peers still waiting:"; for (map::iterator p = timecheck_waiting.begin(); p != timecheck_waiting.end(); ++p) { *_dout << " " << p->first.name; } *_dout << dendl; timecheck_waiting.clear(); dout(10) << __func__ << " finished to " << timecheck_round << dendl; } void Monitor::timecheck_cancel_round() { timecheck_finish_round(false); } void Monitor::timecheck_cleanup() { timecheck_round = 0; timecheck_acks = 0; timecheck_round_start = utime_t(); if (timecheck_event) { timer.cancel_event(timecheck_event); timecheck_event = NULL; } timecheck_waiting.clear(); timecheck_skews.clear(); timecheck_latencies.clear(); timecheck_rounds_since_clean = 0; } void 
Monitor::timecheck_reset_event() { if (timecheck_event) { timer.cancel_event(timecheck_event); timecheck_event = NULL; } double delay = cct->_conf->mon_timecheck_skew_interval * timecheck_rounds_since_clean; if (delay <= 0 || delay > cct->_conf->mon_timecheck_interval) { delay = cct->_conf->mon_timecheck_interval; } dout(10) << __func__ << " delay " << delay << " rounds_since_clean " << timecheck_rounds_since_clean << dendl; timecheck_event = timer.add_event_after( delay, new C_MonContext(this, [this](int) { timecheck_start_round(); })); } void Monitor::timecheck_check_skews() { dout(10) << __func__ << dendl; assert(is_leader()); assert((timecheck_round % 2) == 0); if (monmap->size() == 1) { assert(0 == "We are alone; we shouldn't have gotten here!"); return; } assert(timecheck_latencies.size() == timecheck_skews.size()); bool found_skew = false; for (map::iterator p = timecheck_skews.begin(); p != timecheck_skews.end(); ++p) { double abs_skew; if (timecheck_has_skew(p->second, &abs_skew)) { dout(10) << __func__ << " " << p->first << " skew " << abs_skew << dendl; found_skew = true; } } if (found_skew) { ++timecheck_rounds_since_clean; timecheck_reset_event(); } else if (timecheck_rounds_since_clean > 0) { dout(1) << __func__ << " no clock skews found after " << timecheck_rounds_since_clean << " rounds" << dendl; // make sure the skews are really gone and not just a transient success // this will run just once if not in the presence of skews again. 
timecheck_rounds_since_clean = 1; timecheck_reset_event(); timecheck_rounds_since_clean = 0; } } void Monitor::timecheck_report() { dout(10) << __func__ << dendl; assert(is_leader()); assert((timecheck_round % 2) == 0); if (monmap->size() == 1) { assert(0 == "We are alone; we shouldn't have gotten here!"); return; } assert(timecheck_latencies.size() == timecheck_skews.size()); bool do_output = true; // only output report once for (set::iterator q = quorum.begin(); q != quorum.end(); ++q) { if (monmap->get_name(*q) == name) continue; MTimeCheck *m = new MTimeCheck(MTimeCheck::OP_REPORT); m->epoch = get_epoch(); m->round = timecheck_round; for (map::iterator it = timecheck_skews.begin(); it != timecheck_skews.end(); ++it) { double skew = it->second; double latency = timecheck_latencies[it->first]; m->skews[it->first] = skew; m->latencies[it->first] = latency; if (do_output) { dout(25) << __func__ << " " << it->first << " latency " << latency << " skew " << skew << dendl; } } do_output = false; entity_inst_t inst = monmap->get_inst(*q); dout(10) << __func__ << " send report to " << inst << dendl; messenger->send_message(m, inst); } } void Monitor::timecheck() { dout(10) << __func__ << dendl; assert(is_leader()); if (monmap->size() == 1) { assert(0 == "We are alone; we shouldn't have gotten here!"); return; } assert(timecheck_round % 2 != 0); timecheck_acks = 1; // we ack ourselves dout(10) << __func__ << " start timecheck epoch " << get_epoch() << " round " << timecheck_round << dendl; // we are at the eye of the storm; the point of reference timecheck_skews[messenger->get_myinst()] = 0.0; timecheck_latencies[messenger->get_myinst()] = 0.0; for (set::iterator it = quorum.begin(); it != quorum.end(); ++it) { if (monmap->get_name(*it) == name) continue; entity_inst_t inst = monmap->get_inst(*it); utime_t curr_time = ceph_clock_now(); timecheck_waiting[inst] = curr_time; MTimeCheck *m = new MTimeCheck(MTimeCheck::OP_PING); m->epoch = get_epoch(); m->round = 
/**
 * Classify a peer's bounded clock skew as a health status.
 *
 * @param ss          receives a human-readable description when a skew is
 *                    reported; left untouched otherwise
 * @param skew_bound  the bounded skew to evaluate, passed to
 *                    timecheck_has_skew()
 * @param latency     latency measured for the peer; must be non-negative
 *                    (asserted) and is not otherwise used here
 * @return HEALTH_WARN when timecheck_has_skew() reports a skew,
 *         HEALTH_OK otherwise
 */
health_status_t Monitor::timecheck_status(ostringstream &ss,
                                          const double skew_bound,
                                          const double latency)
{
  assert(latency >= 0);

  double skew_magnitude;
  if (!timecheck_has_skew(skew_bound, &skew_magnitude)) {
    return HEALTH_OK;
  }

  ss << "clock skew " << skew_magnitude << "s"
     << " > max " << g_conf->mon_clock_drift_allowed << "s";
  return HEALTH_WARN;
}
dout(1) << __func__ << " our clock was readjusted --" << " bump round and drop current check" << dendl; timecheck_cancel_round(); return; } /* update peer latencies */ double latency = (double)(curr_time - timecheck_sent); if (timecheck_latencies.count(other) == 0) timecheck_latencies[other] = latency; else { double avg_latency = ((timecheck_latencies[other]*0.8)+(latency*0.2)); timecheck_latencies[other] = avg_latency; } /* * update skews * * some nasty thing goes on if we were to do 'a - b' between two utime_t, * and 'a' happens to be lower than 'b'; so we use double instead. * * latency is always expected to be >= 0. * * delta, the difference between theirs timestamp and ours, may either be * lower or higher than 0; will hardly ever be 0. * * The absolute skew is the absolute delta minus the latency, which is * taken as a whole instead of an rtt given that there is some queueing * and dispatch times involved and it's hard to assess how long exactly * it took for the message to travel to the other side and be handled. So * we call it a bounded skew, the worst case scenario. * * Now, to math! * * Given that the latency is always positive, we can establish that the * bounded skew will be: * * 1. positive if the absolute delta is higher than the latency and * delta is positive * 2. negative if the absolute delta is higher than the latency and * delta is negative. * 3. zero if the absolute delta is lower than the latency. * * On 3. we make a judgement call and treat the skew as non-existent. * This is because that, if the absolute delta is lower than the * latency, then the apparently existing skew is nothing more than a * side-effect of the high latency at work. * * This may not be entirely true though, as a severely skewed clock * may be masked by an even higher latency, but with high latencies * we probably have worse issues to deal with than just skewed clocks. 
*/ assert(latency >= 0); double delta = ((double) m->timestamp) - ((double) curr_time); double abs_delta = (delta > 0 ? delta : -delta); double skew_bound = abs_delta - latency; if (skew_bound < 0) skew_bound = 0; else if (delta < 0) skew_bound = -skew_bound; ostringstream ss; health_status_t status = timecheck_status(ss, skew_bound, latency); clog->health(status) << other << " " << ss.str(); dout(10) << __func__ << " from " << other << " ts " << m->timestamp << " delta " << delta << " skew_bound " << skew_bound << " latency " << latency << dendl; timecheck_skews[other] = skew_bound; timecheck_acks++; if (timecheck_acks == quorum.size()) { dout(10) << __func__ << " got pongs from everybody (" << timecheck_acks << " total)" << dendl; assert(timecheck_skews.size() == timecheck_acks); assert(timecheck_waiting.empty()); // everyone has acked, so bump the round to finish it. timecheck_finish_round(); } } void Monitor::handle_timecheck_peon(MonOpRequestRef op) { MTimeCheck *m = static_cast(op->get_req()); dout(10) << __func__ << " " << *m << dendl; assert(is_peon()); assert(m->op == MTimeCheck::OP_PING || m->op == MTimeCheck::OP_REPORT); if (m->epoch != get_epoch()) { dout(1) << __func__ << " got wrong epoch " << "(ours " << get_epoch() << " theirs: " << m->epoch << ") -- discarding" << dendl; return; } if (m->round < timecheck_round) { dout(1) << __func__ << " got old round " << m->round << " current " << timecheck_round << " (epoch " << get_epoch() << ") -- discarding" << dendl; return; } timecheck_round = m->round; if (m->op == MTimeCheck::OP_REPORT) { assert((timecheck_round % 2) == 0); timecheck_latencies.swap(m->latencies); timecheck_skews.swap(m->skews); return; } assert((timecheck_round % 2) != 0); MTimeCheck *reply = new MTimeCheck(MTimeCheck::OP_PONG); utime_t curr_time = ceph_clock_now(); reply->timestamp = curr_time; reply->epoch = m->epoch; reply->round = m->round; dout(10) << __func__ << " send " << *m << " to " << m->get_source_inst() << dendl; 
m->get_connection()->send_message(reply); } void Monitor::handle_timecheck(MonOpRequestRef op) { MTimeCheck *m = static_cast(op->get_req()); dout(10) << __func__ << " " << *m << dendl; if (is_leader()) { if (m->op != MTimeCheck::OP_PONG) { dout(1) << __func__ << " drop unexpected msg (not pong)" << dendl; } else { handle_timecheck_leader(op); } } else if (is_peon()) { if (m->op != MTimeCheck::OP_PING && m->op != MTimeCheck::OP_REPORT) { dout(1) << __func__ << " drop unexpected msg (not ping or report)" << dendl; } else { handle_timecheck_peon(op); } } else { dout(1) << __func__ << " drop unexpected msg" << dendl; } } void Monitor::handle_subscribe(MonOpRequestRef op) { MMonSubscribe *m = static_cast(op->get_req()); dout(10) << "handle_subscribe " << *m << dendl; bool reply = false; MonSession *s = op->get_session(); assert(s); for (map::iterator p = m->what.begin(); p != m->what.end(); ++p) { // if there are any non-onetime subscriptions, we need to reply to start the resubscribe timer if ((p->second.flags & CEPH_SUBSCRIBE_ONETIME) == 0) reply = true; // remove conflicting subscribes if (logmon()->sub_name_to_id(p->first) >= 0) { for (map::iterator it = s->sub_map.begin(); it != s->sub_map.end(); ) { if (it->first != p->first && logmon()->sub_name_to_id(it->first) >= 0) { Mutex::Locker l(session_map_lock); session_map.remove_sub((it++)->second); } else { ++it; } } } { Mutex::Locker l(session_map_lock); session_map.add_update_sub(s, p->first, p->second.start, p->second.flags & CEPH_SUBSCRIBE_ONETIME, m->get_connection()->has_feature(CEPH_FEATURE_INCSUBOSDMAP)); } if (p->first.compare(0, 6, "mdsmap") == 0 || p->first.compare(0, 5, "fsmap") == 0) { dout(10) << __func__ << ": MDS sub '" << p->first << "'" << dendl; if ((int)s->is_capable("mds", MON_CAP_R)) { Subscription *sub = s->sub_map[p->first]; assert(sub != nullptr); mdsmon()->check_sub(sub); } } else if (p->first == "osdmap") { if ((int)s->is_capable("osd", MON_CAP_R)) { if (s->osd_epoch > p->second.start) { // 
client needs earlier osdmaps on purpose, so reset the sent epoch s->osd_epoch = 0; } osdmon()->check_osdmap_sub(s->sub_map["osdmap"]); } } else if (p->first == "osd_pg_creates") { if ((int)s->is_capable("osd", MON_CAP_W)) { if (monmap->get_required_features().contains_all( ceph::features::mon::FEATURE_LUMINOUS)) { osdmon()->check_pg_creates_sub(s->sub_map["osd_pg_creates"]); } else { pgmon()->check_sub(s->sub_map["osd_pg_creates"]); } } } else if (p->first == "monmap") { monmon()->check_sub(s->sub_map[p->first]); } else if (logmon()->sub_name_to_id(p->first) >= 0) { logmon()->check_sub(s->sub_map[p->first]); } else if (p->first == "mgrmap" || p->first == "mgrdigest") { mgrmon()->check_sub(s->sub_map[p->first]); } else if (p->first == "servicemap") { mgrstatmon()->check_sub(s->sub_map[p->first]); } } if (reply) { // we only need to reply if the client is old enough to think it // has to send renewals. ConnectionRef con = m->get_connection(); if (!con->has_feature(CEPH_FEATURE_MON_STATEFUL_SUB)) m->get_connection()->send_message(new MMonSubscribeAck( monmap->get_fsid(), (int)g_conf->mon_subscribe_interval)); } } void Monitor::handle_get_version(MonOpRequestRef op) { MMonGetVersion *m = static_cast(op->get_req()); dout(10) << "handle_get_version " << *m << dendl; PaxosService *svc = NULL; MonSession *s = op->get_session(); assert(s); if (!is_leader() && !is_peon()) { dout(10) << " waiting for quorum" << dendl; waitfor_quorum.push_back(new C_RetryMessage(this, op)); goto out; } if (m->what == "mdsmap") { svc = mdsmon(); } else if (m->what == "fsmap") { svc = mdsmon(); } else if (m->what == "osdmap") { svc = osdmon(); } else if (m->what == "monmap") { svc = monmon(); } else { derr << "invalid map type " << m->what << dendl; } if (svc) { if (!svc->is_readable()) { svc->wait_for_readable(op, new C_RetryMessage(this, op)); goto out; } MMonGetVersionReply *reply = new MMonGetVersionReply(); reply->handle = m->handle; reply->version = svc->get_last_committed(); 
reply->oldest_version = svc->get_first_committed(); reply->set_tid(m->get_tid()); m->get_connection()->send_message(reply); } out: return; } bool Monitor::ms_handle_reset(Connection *con) { dout(10) << "ms_handle_reset " << con << " " << con->get_peer_addr() << dendl; // ignore lossless monitor sessions if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) return false; MonSession *s = static_cast(con->get_priv()); if (!s) return false; // break any con <-> session ref cycle s->con->set_priv(NULL); if (is_shutdown()) return false; Mutex::Locker l(lock); dout(10) << "reset/close on session " << s->inst << dendl; if (!s->closed) { Mutex::Locker l(session_map_lock); remove_session(s); } s->put(); return true; } bool Monitor::ms_handle_refused(Connection *con) { // just log for now... dout(10) << "ms_handle_refused " << con << " " << con->get_peer_addr() << dendl; return false; } // ----- void Monitor::send_latest_monmap(Connection *con) { bufferlist bl; monmap->encode(bl, con->get_features()); con->send_message(new MMonMap(bl)); } void Monitor::handle_mon_get_map(MonOpRequestRef op) { MMonGetMap *m = static_cast(op->get_req()); dout(10) << "handle_mon_get_map" << dendl; send_latest_monmap(m->get_connection().get()); } void Monitor::handle_mon_metadata(MonOpRequestRef op) { MMonMetadata *m = static_cast(op->get_req()); if (is_leader()) { dout(10) << __func__ << dendl; update_mon_metadata(m->get_source().num(), std::move(m->data)); } } void Monitor::update_mon_metadata(int from, Metadata&& m) { // NOTE: this is now for legacy (kraken or jewel) mons only. 
pending_metadata[from] = std::move(m); MonitorDBStore::TransactionRef t = paxos->get_pending_transaction(); bufferlist bl; ::encode(pending_metadata, bl); t->put(MONITOR_STORE_PREFIX, "last_metadata", bl); paxos->trigger_propose(); } int Monitor::load_metadata() { bufferlist bl; int r = store->get(MONITOR_STORE_PREFIX, "last_metadata", bl); if (r) return r; bufferlist::iterator it = bl.begin(); ::decode(mon_metadata, it); pending_metadata = mon_metadata; return 0; } int Monitor::get_mon_metadata(int mon, Formatter *f, ostream& err) { assert(f); if (!mon_metadata.count(mon)) { err << "mon." << mon << " not found"; return -EINVAL; } const Metadata& m = mon_metadata[mon]; for (Metadata::const_iterator p = m.begin(); p != m.end(); ++p) { f->dump_string(p->first.c_str(), p->second); } return 0; } void Monitor::count_metadata(const string& field, map *out) { for (auto& p : mon_metadata) { auto q = p.second.find(field); if (q == p.second.end()) { (*out)["unknown"]++; } else { (*out)[q->second]++; } } } void Monitor::count_metadata(const string& field, Formatter *f) { map by_val; count_metadata(field, &by_val); f->open_object_section(field.c_str()); for (auto& p : by_val) { f->dump_int(p.first.c_str(), p.second); } f->close_section(); } int Monitor::print_nodes(Formatter *f, ostream& err) { map > mons; // hostname => mon for (map::iterator it = mon_metadata.begin(); it != mon_metadata.end(); ++it) { const Metadata& m = it->second; Metadata::const_iterator hostname = m.find("hostname"); if (hostname == m.end()) { // not likely though continue; } mons[hostname->second].push_back(it->first); } dump_services(f, mons, "mon"); return 0; } // ---------------------------------------------- // scrub int Monitor::scrub_start() { dout(10) << __func__ << dendl; assert(is_leader()); if (!scrub_result.empty()) { clog->info() << "scrub already in progress"; return -EBUSY; } scrub_event_cancel(); scrub_result.clear(); scrub_state.reset(new ScrubState); scrub(); return 0; } int 
Monitor::scrub() { assert(is_leader()); assert(scrub_state); scrub_cancel_timeout(); wait_for_paxos_write(); scrub_version = paxos->get_version(); // scrub all keys if we're the only monitor in the quorum int32_t num_keys = (quorum.size() == 1 ? -1 : cct->_conf->mon_scrub_max_keys); for (set::iterator p = quorum.begin(); p != quorum.end(); ++p) { if (*p == rank) continue; MMonScrub *r = new MMonScrub(MMonScrub::OP_SCRUB, scrub_version, num_keys); r->key = scrub_state->last_key; messenger->send_message(r, monmap->get_inst(*p)); } // scrub my keys bool r = _scrub(&scrub_result[rank], &scrub_state->last_key, &num_keys); scrub_state->finished = !r; // only after we got our scrub results do we really care whether the // other monitors are late on their results. Also, this way we avoid // triggering the timeout if we end up getting stuck in _scrub() for // longer than the duration of the timeout. scrub_reset_timeout(); if (quorum.size() == 1) { assert(scrub_state->finished == true); scrub_finish(); } return 0; } void Monitor::handle_scrub(MonOpRequestRef op) { MMonScrub *m = static_cast(op->get_req()); dout(10) << __func__ << " " << *m << dendl; switch (m->op) { case MMonScrub::OP_SCRUB: { if (!is_peon()) break; wait_for_paxos_write(); if (m->version != paxos->get_version()) break; MMonScrub *reply = new MMonScrub(MMonScrub::OP_RESULT, m->version, m->num_keys); reply->key = m->key; _scrub(&reply->result, &reply->key, &reply->num_keys); m->get_connection()->send_message(reply); } break; case MMonScrub::OP_RESULT: { if (!is_leader()) break; if (m->version != scrub_version) break; // reset the timeout each time we get a result scrub_reset_timeout(); int from = m->get_source().num(); assert(scrub_result.count(from) == 0); scrub_result[from] = m->result; if (scrub_result.size() == quorum.size()) { scrub_check_results(); scrub_result.clear(); if (scrub_state->finished) scrub_finish(); else scrub(); } } break; } } bool Monitor::_scrub(ScrubResult *r, pair *start, int *num_keys) 
{ assert(r != NULL); assert(start != NULL); assert(num_keys != NULL); set prefixes = get_sync_targets_names(); prefixes.erase("paxos"); // exclude paxos, as this one may have extra states for proposals, etc. dout(10) << __func__ << " start (" << *start << ")" << " num_keys " << *num_keys << dendl; MonitorDBStore::Synchronizer it = store->get_synchronizer(*start, prefixes); int scrubbed_keys = 0; pair last_key; while (it->has_next_chunk()) { if (*num_keys > 0 && scrubbed_keys == *num_keys) break; pair k = it->get_next_key(); if (prefixes.count(k.first) == 0) continue; if (cct->_conf->mon_scrub_inject_missing_keys > 0.0 && (rand() % 10000 < cct->_conf->mon_scrub_inject_missing_keys*10000.0)) { dout(10) << __func__ << " inject missing key, skipping (" << k << ")" << dendl; continue; } bufferlist bl; int err = store->get(k.first, k.second, bl); assert(err == 0); uint32_t key_crc = bl.crc32c(0); dout(30) << __func__ << " " << k << " bl " << bl.length() << " bytes" << " crc " << key_crc << dendl; r->prefix_keys[k.first]++; if (r->prefix_crc.count(k.first) == 0) { r->prefix_crc[k.first] = 0; } r->prefix_crc[k.first] = bl.crc32c(r->prefix_crc[k.first]); if (cct->_conf->mon_scrub_inject_crc_mismatch > 0.0 && (rand() % 10000 < cct->_conf->mon_scrub_inject_crc_mismatch*10000.0)) { dout(10) << __func__ << " inject failure at (" << k << ")" << dendl; r->prefix_crc[k.first] += 1; } ++scrubbed_keys; last_key = k; } dout(20) << __func__ << " last_key (" << last_key << ")" << " scrubbed_keys " << scrubbed_keys << " has_next " << it->has_next_chunk() << dendl; *start = last_key; *num_keys = scrubbed_keys; return it->has_next_chunk(); } void Monitor::scrub_check_results() { dout(10) << __func__ << dendl; // compare int errors = 0; ScrubResult& mine = scrub_result[rank]; for (map::iterator p = scrub_result.begin(); p != scrub_result.end(); ++p) { if (p->first == rank) continue; if (p->second != mine) { ++errors; clog->error() << "scrub mismatch"; clog->error() << " mon." 
<< rank << " " << mine; clog->error() << " mon." << p->first << " " << p->second; } } if (!errors) clog->debug() << "scrub ok on " << quorum << ": " << mine; } inline void Monitor::scrub_timeout() { dout(1) << __func__ << " restarting scrub" << dendl; scrub_reset(); scrub_start(); } void Monitor::scrub_finish() { dout(10) << __func__ << dendl; scrub_reset(); scrub_event_start(); } void Monitor::scrub_reset() { dout(10) << __func__ << dendl; scrub_cancel_timeout(); scrub_version = 0; scrub_result.clear(); scrub_state.reset(); } inline void Monitor::scrub_update_interval(int secs) { // we don't care about changes if we are not the leader. // changes will be visible if we become the leader. if (!is_leader()) return; dout(1) << __func__ << " new interval = " << secs << dendl; // if scrub already in progress, all changes will already be visible during // the next round. Nothing to do. if (scrub_state != NULL) return; scrub_event_cancel(); scrub_event_start(); } void Monitor::scrub_event_start() { dout(10) << __func__ << dendl; if (scrub_event) scrub_event_cancel(); if (cct->_conf->mon_scrub_interval <= 0) { dout(1) << __func__ << " scrub event is disabled" << " (mon_scrub_interval = " << cct->_conf->mon_scrub_interval << ")" << dendl; return; } scrub_event = timer.add_event_after( cct->_conf->mon_scrub_interval, new C_MonContext(this, [this](int) { scrub_start(); })); } void Monitor::scrub_event_cancel() { dout(10) << __func__ << dendl; if (scrub_event) { timer.cancel_event(scrub_event); scrub_event = NULL; } } inline void Monitor::scrub_cancel_timeout() { if (scrub_timeout_event) { timer.cancel_event(scrub_timeout_event); scrub_timeout_event = NULL; } } void Monitor::scrub_reset_timeout() { dout(15) << __func__ << " reset timeout event" << dendl; scrub_cancel_timeout(); scrub_timeout_event = timer.add_event_after( g_conf->mon_scrub_timeout, new C_MonContext(this, [this](int) { scrub_timeout(); })); } /************ TICK ***************/ void Monitor::new_tick() { 
timer.add_event_after(g_conf->mon_tick_interval, new C_MonContext(this, [this](int) { tick(); })); } void Monitor::tick() { // ok go. dout(11) << "tick" << dendl; const utime_t now = ceph_clock_now(); // Check if we need to emit any delayed health check updated messages if (is_leader()) { const auto min_period = g_conf->get_val( "mon_health_log_update_period"); for (auto& svc : paxos_service) { auto health = svc->get_health_checks(); for (const auto &i : health.checks) { const std::string &code = i.first; const std::string &summary = i.second.summary; const health_status_t severity = i.second.severity; auto status_iter = health_check_log_times.find(code); if (status_iter == health_check_log_times.end()) { continue; } auto &log_status = status_iter->second; bool const changed = log_status.last_message != summary || log_status.severity != severity; if (changed && now - log_status.updated_at > min_period) { log_status.last_message = summary; log_status.updated_at = now; log_status.severity = severity; ostringstream ss; ss << "Health check update: " << summary << " (" << code << ")"; clog->health(severity) << ss.str(); } } } } for (vector::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p) { (*p)->tick(); (*p)->maybe_trim(); } // trim sessions { Mutex::Locker l(session_map_lock); auto p = session_map.sessions.begin(); bool out_for_too_long = (!exited_quorum.is_zero() && now > (exited_quorum + 2*g_conf->mon_lease)); while (!p.end()) { MonSession *s = *p; ++p; // don't trim monitors if (s->inst.name.is_mon()) continue; if (s->session_timeout < now && s->con) { // check keepalive, too s->session_timeout = s->con->get_last_keepalive(); s->session_timeout += g_conf->mon_session_timeout; } if (s->session_timeout < now) { dout(10) << " trimming session " << s->con << " " << s->inst << " (timeout " << s->session_timeout << " < now " << now << ")" << dendl; } else if (out_for_too_long) { // boot the client Session because we've taken too long getting back in 
dout(10) << " trimming session " << s->con << " " << s->inst << " because we've been out of quorum too long" << dendl; } else { continue; } s->con->mark_down(); remove_session(s); logger->inc(l_mon_session_trim); } } sync_trim_providers(); if (!maybe_wait_for_quorum.empty()) { finish_contexts(g_ceph_context, maybe_wait_for_quorum); } if (is_leader() && paxos->is_active() && fingerprint.is_zero()) { // this is only necessary on upgraded clusters. MonitorDBStore::TransactionRef t = paxos->get_pending_transaction(); prepare_new_fingerprint(t); paxos->trigger_propose(); } new_tick(); } void Monitor::prepare_new_fingerprint(MonitorDBStore::TransactionRef t) { uuid_d nf; nf.generate_random(); dout(10) << __func__ << " proposing cluster_fingerprint " << nf << dendl; bufferlist bl; ::encode(nf, bl); t->put(MONITOR_NAME, "cluster_fingerprint", bl); } int Monitor::check_fsid() { bufferlist ebl; int r = store->get(MONITOR_NAME, "cluster_uuid", ebl); if (r == -ENOENT) return r; assert(r == 0); string es(ebl.c_str(), ebl.length()); // only keep the first line size_t pos = es.find_first_of('\n'); if (pos != string::npos) es.resize(pos); dout(10) << "check_fsid cluster_uuid contains '" << es << "'" << dendl; uuid_d ondisk; if (!ondisk.parse(es.c_str())) { derr << "error: unable to parse uuid" << dendl; return -EINVAL; } if (monmap->get_fsid() != ondisk) { derr << "error: cluster_uuid file exists with value " << ondisk << ", != our uuid " << monmap->get_fsid() << dendl; return -EEXIST; } return 0; } int Monitor::write_fsid() { auto t(std::make_shared()); write_fsid(t); int r = store->apply_transaction(t); return r; } int Monitor::write_fsid(MonitorDBStore::TransactionRef t) { ostringstream ss; ss << monmap->get_fsid() << "\n"; string us = ss.str(); bufferlist b; b.append(us); t->put(MONITOR_NAME, "cluster_uuid", b); return 0; } /* * this is the closest thing to a traditional 'mkfs' for ceph. * initialize the monitor state machines to their initial values. 
*/ int Monitor::mkfs(bufferlist& osdmapbl) { auto t(std::make_shared()); // verify cluster fsid int r = check_fsid(); if (r < 0 && r != -ENOENT) return r; bufferlist magicbl; magicbl.append(CEPH_MON_ONDISK_MAGIC); magicbl.append("\n"); t->put(MONITOR_NAME, "magic", magicbl); features = get_initial_supported_features(); write_features(t); // save monmap, osdmap, keyring. bufferlist monmapbl; monmap->encode(monmapbl, CEPH_FEATURES_ALL); monmap->set_epoch(0); // must be 0 to avoid confusing first MonmapMonitor::update_from_paxos() t->put("mkfs", "monmap", monmapbl); if (osdmapbl.length()) { // make sure it's a valid osdmap try { OSDMap om; om.decode(osdmapbl); } catch (buffer::error& e) { derr << "error decoding provided osdmap: " << e.what() << dendl; return -EINVAL; } t->put("mkfs", "osdmap", osdmapbl); } if (is_keyring_required()) { KeyRing keyring; string keyring_filename; r = ceph_resolve_file_search(g_conf->keyring, keyring_filename); if (r) { derr << "unable to find a keyring file on " << g_conf->keyring << ": " << cpp_strerror(r) << dendl; if (g_conf->key != "") { string keyring_plaintext = "[mon.]\n\tkey = " + g_conf->key + "\n\tcaps mon = \"allow *\"\n"; bufferlist bl; bl.append(keyring_plaintext); try { bufferlist::iterator i = bl.begin(); keyring.decode_plaintext(i); } catch (const buffer::error& e) { derr << "error decoding keyring " << keyring_plaintext << ": " << e.what() << dendl; return -EINVAL; } } else { return -ENOENT; } } else { r = keyring.load(g_ceph_context, keyring_filename); if (r < 0) { derr << "unable to load initial keyring " << g_conf->keyring << dendl; return r; } } // put mon. key in external keyring; seed with everything else. 
extract_save_mon_key(keyring); bufferlist keyringbl; keyring.encode_plaintext(keyringbl); t->put("mkfs", "keyring", keyringbl); } write_fsid(t); store->apply_transaction(t); return 0; } int Monitor::write_default_keyring(bufferlist& bl) { ostringstream os; os << g_conf->mon_data << "/keyring"; int err = 0; int fd = ::open(os.str().c_str(), O_WRONLY|O_CREAT, 0600); if (fd < 0) { err = -errno; dout(0) << __func__ << " failed to open " << os.str() << ": " << cpp_strerror(err) << dendl; return err; } err = bl.write_fd(fd); if (!err) ::fsync(fd); VOID_TEMP_FAILURE_RETRY(::close(fd)); return err; } void Monitor::extract_save_mon_key(KeyRing& keyring) { EntityName mon_name; mon_name.set_type(CEPH_ENTITY_TYPE_MON); EntityAuth mon_key; if (keyring.get_auth(mon_name, mon_key)) { dout(10) << "extract_save_mon_key moving mon. key to separate keyring" << dendl; KeyRing pkey; pkey.add(mon_name, mon_key); bufferlist bl; pkey.encode_plaintext(bl); write_default_keyring(bl); keyring.remove(mon_name); } } bool Monitor::ms_get_authorizer(int service_id, AuthAuthorizer **authorizer, bool force_new) { dout(10) << "ms_get_authorizer for " << ceph_entity_type_name(service_id) << dendl; if (is_shutdown()) return false; // we only connect to other monitors and mgr; every else connects to us. 
if (service_id != CEPH_ENTITY_TYPE_MON && service_id != CEPH_ENTITY_TYPE_MGR) return false; if (!auth_cluster_required.is_supported_auth(CEPH_AUTH_CEPHX)) { // auth_none dout(20) << __func__ << " building auth_none authorizer" << dendl; AuthNoneClientHandler handler(g_ceph_context, nullptr); handler.set_global_id(0); *authorizer = handler.build_authorizer(service_id); return true; } CephXServiceTicketInfo auth_ticket_info; CephXSessionAuthInfo info; int ret; EntityName name; name.set_type(CEPH_ENTITY_TYPE_MON); auth_ticket_info.ticket.name = name; auth_ticket_info.ticket.global_id = 0; if (service_id == CEPH_ENTITY_TYPE_MON) { // mon to mon authentication uses the private monitor shared key and not the // rotating key CryptoKey secret; if (!keyring.get_secret(name, secret) && !key_server.get_secret(name, secret)) { dout(0) << " couldn't get secret for mon service from keyring or keyserver" << dendl; stringstream ss, ds; int err = key_server.list_secrets(ds); if (err < 0) ss << "no installed auth entries!"; else ss << "installed auth entries:"; dout(0) << ss.str() << "\n" << ds.str() << dendl; return false; } ret = key_server.build_session_auth_info(service_id, auth_ticket_info, info, secret, (uint64_t)-1); if (ret < 0) { dout(0) << __func__ << " failed to build mon session_auth_info " << cpp_strerror(ret) << dendl; return false; } } else if (service_id == CEPH_ENTITY_TYPE_MGR) { // mgr ret = key_server.build_session_auth_info(service_id, auth_ticket_info, info); if (ret < 0) { derr << __func__ << " failed to build mgr service session_auth_info " << cpp_strerror(ret) << dendl; return false; } } else { ceph_abort(); // see check at top of fn } CephXTicketBlob blob; if (!cephx_build_service_ticket_blob(cct, info, blob)) { dout(0) << "ms_get_authorizer failed to build service ticket" << dendl; return false; } bufferlist ticket_data; ::encode(blob, ticket_data); bufferlist::iterator iter = ticket_data.begin(); CephXTicketHandler handler(g_ceph_context, service_id); 
::decode(handler.ticket, iter); handler.session_key = info.session_key; *authorizer = handler.build_authorizer(0); return true; } bool Monitor::ms_verify_authorizer(Connection *con, int peer_type, int protocol, bufferlist& authorizer_data, bufferlist& authorizer_reply, bool& isvalid, CryptoKey& session_key) { dout(10) << "ms_verify_authorizer " << con->get_peer_addr() << " " << ceph_entity_type_name(peer_type) << " protocol " << protocol << dendl; if (is_shutdown()) return false; if (peer_type == CEPH_ENTITY_TYPE_MON && auth_cluster_required.is_supported_auth(CEPH_AUTH_CEPHX)) { // monitor, and cephx is enabled isvalid = false; if (protocol == CEPH_AUTH_CEPHX) { bufferlist::iterator iter = authorizer_data.begin(); CephXServiceTicketInfo auth_ticket_info; if (authorizer_data.length()) { bool ret = cephx_verify_authorizer(g_ceph_context, &keyring, iter, auth_ticket_info, authorizer_reply); if (ret) { session_key = auth_ticket_info.session_key; isvalid = true; } else { dout(0) << "ms_verify_authorizer bad authorizer from mon " << con->get_peer_addr() << dendl; } } } else { dout(0) << "ms_verify_authorizer cephx enabled, but no authorizer (required for mon)" << dendl; } } else { // who cares. isvalid = true; } return true; }