X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fmon%2FMonitor.cc;fp=src%2Fceph%2Fsrc%2Fmon%2FMonitor.cc;h=ce2ab39167d96732479677feaf975a58e58bfd09;hb=812ff6ca9fcd3e629e49d4328905f33eee8ca3f5;hp=0000000000000000000000000000000000000000;hpb=15280273faafb77777eab341909a3f495cf248d9;p=stor4nfv.git diff --git a/src/ceph/src/mon/Monitor.cc b/src/ceph/src/mon/Monitor.cc new file mode 100644 index 0000000..ce2ab39 --- /dev/null +++ b/src/ceph/src/mon/Monitor.cc @@ -0,0 +1,5917 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + + +#include +#include +#include +#include +#include +#include +#include + +#include "Monitor.h" +#include "common/version.h" + +#include "osd/OSDMap.h" + +#include "MonitorDBStore.h" + +#include "messages/PaxosServiceMessage.h" +#include "messages/MMonMap.h" +#include "messages/MMonGetMap.h" +#include "messages/MMonGetVersion.h" +#include "messages/MMonGetVersionReply.h" +#include "messages/MGenericMessage.h" +#include "messages/MMonCommand.h" +#include "messages/MMonCommandAck.h" +#include "messages/MMonHealth.h" +#include "messages/MMonMetadata.h" +#include "messages/MMonSync.h" +#include "messages/MMonScrub.h" +#include "messages/MMonProbe.h" +#include "messages/MMonJoin.h" +#include "messages/MMonPaxos.h" +#include "messages/MRoute.h" +#include "messages/MForward.h" +#include "messages/MStatfs.h" + +#include "messages/MMonSubscribe.h" +#include "messages/MMonSubscribeAck.h" + +#include "messages/MAuthReply.h" + +#include "messages/MTimeCheck.h" +#include "messages/MPing.h" + +#include "common/strtol.h" +#include "common/ceph_argparse.h" +#include "common/Timer.h" +#include "common/Clock.h" +#include "common/errno.h" +#include "common/perf_counters.h" +#include "common/admin_socket.h" +#include "global/signal_handler.h" +#include "common/Formatter.h" +#include "include/stringify.h" +#include "include/color.h" +#include "include/ceph_fs.h" +#include "include/str_list.h" + +#include "OSDMonitor.h" +#include "MDSMonitor.h" +#include "MonmapMonitor.h" +#include "PGMonitor.h" +#include "LogMonitor.h" +#include "AuthMonitor.h" +#include "MgrMonitor.h" +#include "MgrStatMonitor.h" +#include "mon/QuorumService.h" +#include "mon/OldHealthMonitor.h" +#include "mon/HealthMonitor.h" +#include "mon/ConfigKeyService.h" +#include "common/config.h" +#include "common/cmdparse.h" +#include "include/assert.h" +#include "include/compat.h" +#include "perfglue/heap_profiler.h" + +#include "auth/none/AuthNoneClientHandler.h" + +#define dout_subsys ceph_subsys_mon +#undef dout_prefix +#define dout_prefix _prefix(_dout, this) +static ostream& _prefix(std::ostream *_dout, const Monitor *mon) { + return *_dout << "mon." << mon->name << "@" << mon->rank + << "(" << mon->get_state_name() << ") e" << mon->monmap->get_epoch() << " "; +} + +const string Monitor::MONITOR_NAME = "monitor"; +const string Monitor::MONITOR_STORE_PREFIX = "monitor_store"; + + +#undef FLAG +#undef COMMAND +#undef COMMAND_WITH_FLAG +#define FLAG(f) (MonCommand::FLAG_##f) +#define COMMAND(parsesig, helptext, modulename, req_perms, avail) \ + {parsesig, helptext, modulename, req_perms, avail, FLAG(NONE)}, +#define COMMAND_WITH_FLAG(parsesig, helptext, modulename, req_perms, avail, flags) \ + {parsesig, helptext, modulename, req_perms, avail, flags}, +MonCommand mon_commands[] = { +#include +}; +MonCommand pgmonitor_commands[] = { +#include +}; +#undef COMMAND +#undef COMMAND_WITH_FLAG + + +void C_MonContext::finish(int r) { + if (mon->is_shutdown()) + return; + FunctionContext::finish(r); +} + +Monitor::Monitor(CephContext* cct_, string nm, MonitorDBStore *s, + Messenger *m, Messenger *mgr_m, MonMap *map) : + Dispatcher(cct_), + name(nm), + rank(-1), + messenger(m), + con_self(m ? m->get_loopback_connection() : NULL), + lock("Monitor::lock"), + timer(cct_, lock), + finisher(cct_, "mon_finisher", "fin"), + cpu_tp(cct, "Monitor::cpu_tp", "cpu_tp", g_conf->mon_cpu_threads), + has_ever_joined(false), + logger(NULL), cluster_logger(NULL), cluster_logger_registered(false), + monmap(map), + log_client(cct_, messenger, monmap, LogClient::FLAG_MON), + key_server(cct, &keyring), + auth_cluster_required(cct, + cct->_conf->auth_supported.empty() ? + cct->_conf->auth_cluster_required : cct->_conf->auth_supported), + auth_service_required(cct, + cct->_conf->auth_supported.empty() ? + cct->_conf->auth_service_required : cct->_conf->auth_supported ), + mgr_messenger(mgr_m), + mgr_client(cct_, mgr_m), + pgservice(nullptr), + store(s), + + state(STATE_PROBING), + + elector(this), + required_features(0), + leader(0), + quorum_con_features(0), + // scrub + scrub_version(0), + scrub_event(NULL), + scrub_timeout_event(NULL), + + // sync state + sync_provider_count(0), + sync_cookie(0), + sync_full(false), + sync_start_version(0), + sync_timeout_event(NULL), + sync_last_committed_floor(0), + + timecheck_round(0), + timecheck_acks(0), + timecheck_rounds_since_clean(0), + timecheck_event(NULL), + + paxos_service(PAXOS_NUM), + admin_hook(NULL), + routed_request_tid(0), + op_tracker(cct, true, 1) +{ + clog = log_client.create_channel(CLOG_CHANNEL_CLUSTER); + audit_clog = log_client.create_channel(CLOG_CHANNEL_AUDIT); + + update_log_clients(); + + paxos = new Paxos(this, "paxos"); + + paxos_service[PAXOS_MDSMAP] = new MDSMonitor(this, paxos, "mdsmap"); + paxos_service[PAXOS_MONMAP] = new MonmapMonitor(this, paxos, "monmap"); + paxos_service[PAXOS_OSDMAP] = new OSDMonitor(cct, this, paxos, "osdmap"); + paxos_service[PAXOS_PGMAP] = new PGMonitor(this, paxos, "pgmap"); + paxos_service[PAXOS_LOG] = new LogMonitor(this, paxos, "logm"); + paxos_service[PAXOS_AUTH] = new AuthMonitor(this, paxos, "auth"); + paxos_service[PAXOS_MGR] = new MgrMonitor(this, paxos, "mgr"); + paxos_service[PAXOS_MGRSTAT] = new MgrStatMonitor(this, paxos, "mgrstat"); + paxos_service[PAXOS_HEALTH] = new HealthMonitor(this, paxos, "health"); + + health_monitor = new OldHealthMonitor(this); + config_key_service = new ConfigKeyService(this, paxos); + + mon_caps = new MonCap(); + bool r = mon_caps->parse("allow *", NULL); + assert(r); + + exited_quorum = ceph_clock_now(); + + // prepare local commands + local_mon_commands.resize(ARRAY_SIZE(mon_commands)); + for (unsigned i = 0; i < ARRAY_SIZE(mon_commands); ++i) { + local_mon_commands[i] = mon_commands[i]; + } + MonCommand::encode_vector(local_mon_commands, local_mon_commands_bl); + + local_upgrading_mon_commands = local_mon_commands; + for (unsigned i = 0; i < ARRAY_SIZE(pgmonitor_commands); ++i) { + local_upgrading_mon_commands.push_back(pgmonitor_commands[i]); + } + MonCommand::encode_vector(local_upgrading_mon_commands, + local_upgrading_mon_commands_bl); + + // assume our commands until we have an election. this only means + // we won't reply with EINVAL before the election; any command that + // actually matters will wait until we have quorum etc and then + // retry (and revalidate). + leader_mon_commands = local_mon_commands; + + // note: OSDMonitor may update this based on the luminous flag. + pgservice = mgrstatmon()->get_pg_stat_service(); +} + +Monitor::~Monitor() +{ + for (vector::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p) + delete *p; + delete health_monitor; + delete config_key_service; + delete paxos; + assert(session_map.sessions.empty()); + delete mon_caps; +} + + +class AdminHook : public AdminSocketHook { + Monitor *mon; +public: + explicit AdminHook(Monitor *m) : mon(m) {} + bool call(std::string command, cmdmap_t& cmdmap, std::string format, + bufferlist& out) override { + stringstream ss; + mon->do_admin_command(command, cmdmap, format, ss); + out.append(ss); + return true; + } +}; + +void Monitor::do_admin_command(string command, cmdmap_t& cmdmap, string format, + ostream& ss) +{ + Mutex::Locker l(lock); + + boost::scoped_ptr f(Formatter::create(format)); + + string args; + for (cmdmap_t::iterator p = cmdmap.begin(); + p != cmdmap.end(); ++p) { + if (p->first == "prefix") + continue; + if (!args.empty()) + args += ", "; + args += cmd_vartype_stringify(p->second); + } + args = "[" + args + "]"; + + bool read_only = (command == "mon_status" || + command == "mon metadata" || + command == "quorum_status" || + command == "ops" || + command == "sessions"); + + (read_only ? audit_clog->debug() : audit_clog->info()) + << "from='admin socket' entity='admin socket' " + << "cmd='" << command << "' args=" << args << ": dispatch"; + + if (command == "mon_status") { + get_mon_status(f.get(), ss); + if (f) + f->flush(ss); + } else if (command == "quorum_status") { + _quorum_status(f.get(), ss); + } else if (command == "sync_force") { + string validate; + if ((!cmd_getval(g_ceph_context, cmdmap, "validate", validate)) || + (validate != "--yes-i-really-mean-it")) { + ss << "are you SURE? this will mean the monitor store will be erased " + "the next time the monitor is restarted. pass " + "'--yes-i-really-mean-it' if you really do."; + goto abort; + } + sync_force(f.get(), ss); + } else if (command.compare(0, 23, "add_bootstrap_peer_hint") == 0) { + if (!_add_bootstrap_peer_hint(command, cmdmap, ss)) + goto abort; + } else if (command == "quorum enter") { + elector.start_participating(); + start_election(); + ss << "started responding to quorum, initiated new election"; + } else if (command == "quorum exit") { + start_election(); + elector.stop_participating(); + ss << "stopped responding to quorum, initiated new election"; + } else if (command == "ops") { + (void)op_tracker.dump_ops_in_flight(f.get()); + if (f) { + f->flush(ss); + } + } else if (command == "sessions") { + + if (f) { + f->open_array_section("sessions"); + for (auto p : session_map.sessions) { + f->dump_stream("session") << *p; + } + f->close_section(); + f->flush(ss); + } + + } else { + assert(0 == "bad AdminSocket command binding"); + } + (read_only ? audit_clog->debug() : audit_clog->info()) + << "from='admin socket' " + << "entity='admin socket' " + << "cmd=" << command << " " + << "args=" << args << ": finished"; + return; + +abort: + (read_only ? audit_clog->debug() : audit_clog->info()) + << "from='admin socket' " + << "entity='admin socket' " + << "cmd=" << command << " " + << "args=" << args << ": aborted"; +} + +void Monitor::handle_signal(int signum) +{ + assert(signum == SIGINT || signum == SIGTERM); + derr << "*** Got Signal " << sig_str(signum) << " ***" << dendl; + shutdown(); +} + +CompatSet Monitor::get_initial_supported_features() +{ + CompatSet::FeatureSet ceph_mon_feature_compat; + CompatSet::FeatureSet ceph_mon_feature_ro_compat; + CompatSet::FeatureSet ceph_mon_feature_incompat; + ceph_mon_feature_incompat.insert(CEPH_MON_FEATURE_INCOMPAT_BASE); + ceph_mon_feature_incompat.insert(CEPH_MON_FEATURE_INCOMPAT_SINGLE_PAXOS); + return CompatSet(ceph_mon_feature_compat, ceph_mon_feature_ro_compat, + ceph_mon_feature_incompat); +} + +CompatSet Monitor::get_supported_features() +{ + CompatSet compat = get_initial_supported_features(); + compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES); + compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC); + compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2); + compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3); + compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_KRAKEN); + compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_LUMINOUS); + return compat; +} + +CompatSet Monitor::get_legacy_features() +{ + CompatSet::FeatureSet ceph_mon_feature_compat; + CompatSet::FeatureSet ceph_mon_feature_ro_compat; + CompatSet::FeatureSet ceph_mon_feature_incompat; + ceph_mon_feature_incompat.insert(CEPH_MON_FEATURE_INCOMPAT_BASE); + return CompatSet(ceph_mon_feature_compat, ceph_mon_feature_ro_compat, + ceph_mon_feature_incompat); +} + +int Monitor::check_features(MonitorDBStore *store) +{ + CompatSet required = get_supported_features(); + CompatSet ondisk; + + read_features_off_disk(store, &ondisk); + + if (!required.writeable(ondisk)) { + CompatSet diff = required.unsupported(ondisk); + generic_derr << "ERROR: on disk data includes unsupported features: " << diff << dendl; + return -EPERM; + } + + return 0; +} + +void Monitor::read_features_off_disk(MonitorDBStore *store, CompatSet *features) +{ + bufferlist featuresbl; + store->get(MONITOR_NAME, COMPAT_SET_LOC, featuresbl); + if (featuresbl.length() == 0) { + generic_dout(0) << "WARNING: mon fs missing feature list.\n" + << "Assuming it is old-style and introducing one." << dendl; + //we only want the baseline ~v.18 features assumed to be on disk. + //If new features are introduced this code needs to disappear or + //be made smarter. + *features = get_legacy_features(); + + features->encode(featuresbl); + auto t(std::make_shared()); + t->put(MONITOR_NAME, COMPAT_SET_LOC, featuresbl); + store->apply_transaction(t); + } else { + bufferlist::iterator it = featuresbl.begin(); + features->decode(it); + } +} + +void Monitor::read_features() +{ + read_features_off_disk(store, &features); + dout(10) << "features " << features << dendl; + + calc_quorum_requirements(); + dout(10) << "required_features " << required_features << dendl; +} + +void Monitor::write_features(MonitorDBStore::TransactionRef t) +{ + bufferlist bl; + features.encode(bl); + t->put(MONITOR_NAME, COMPAT_SET_LOC, bl); +} + +const char** Monitor::get_tracked_conf_keys() const +{ + static const char* KEYS[] = { + "crushtool", // helpful for testing + "mon_election_timeout", + "mon_lease", + "mon_lease_renew_interval_factor", + "mon_lease_ack_timeout_factor", + "mon_accept_timeout_factor", + // clog & admin clog + "clog_to_monitors", + "clog_to_syslog", + "clog_to_syslog_facility", + "clog_to_syslog_level", + "clog_to_graylog", + "clog_to_graylog_host", + "clog_to_graylog_port", + "host", + "fsid", + // periodic health to clog + "mon_health_to_clog", + "mon_health_to_clog_interval", + "mon_health_to_clog_tick_interval", + // scrub interval + "mon_scrub_interval", + NULL + }; + return KEYS; +} + +void Monitor::handle_conf_change(const struct md_config_t *conf, + const std::set &changed) +{ + sanitize_options(); + + dout(10) << __func__ << " " << changed << dendl; + + if (changed.count("clog_to_monitors") || + changed.count("clog_to_syslog") || + changed.count("clog_to_syslog_level") || + changed.count("clog_to_syslog_facility") || + changed.count("clog_to_graylog") || + changed.count("clog_to_graylog_host") || + changed.count("clog_to_graylog_port") || + changed.count("host") || + changed.count("fsid")) { + update_log_clients(); + } + + if (changed.count("mon_health_to_clog") || + changed.count("mon_health_to_clog_interval") || + changed.count("mon_health_to_clog_tick_interval")) { + health_to_clog_update_conf(changed); + } + + if (changed.count("mon_scrub_interval")) { + scrub_update_interval(conf->mon_scrub_interval); + } +} + +void Monitor::update_log_clients() +{ + map log_to_monitors; + map log_to_syslog; + map log_channel; + map log_prio; + map log_to_graylog; + map log_to_graylog_host; + map log_to_graylog_port; + uuid_d fsid; + string host; + + if (parse_log_client_options(g_ceph_context, log_to_monitors, log_to_syslog, + log_channel, log_prio, log_to_graylog, + log_to_graylog_host, log_to_graylog_port, + fsid, host)) + return; + + clog->update_config(log_to_monitors, log_to_syslog, + log_channel, log_prio, log_to_graylog, + log_to_graylog_host, log_to_graylog_port, + fsid, host); + + audit_clog->update_config(log_to_monitors, log_to_syslog, + log_channel, log_prio, log_to_graylog, + log_to_graylog_host, log_to_graylog_port, + fsid, host); +} + +int Monitor::sanitize_options() +{ + int r = 0; + + // mon_lease must be greater than mon_lease_renewal; otherwise we + // may incur in leases expiring before they are renewed. + if (g_conf->mon_lease_renew_interval_factor >= 1.0) { + clog->error() << "mon_lease_renew_interval_factor (" + << g_conf->mon_lease_renew_interval_factor + << ") must be less than 1.0"; + r = -EINVAL; + } + + // mon_lease_ack_timeout must be greater than mon_lease to make sure we've + // got time to renew the lease and get an ack for it. Having both options + // with the same value, for a given small vale, could mean timing out if + // the monitors happened to be overloaded -- or even under normal load for + // a small enough value. + if (g_conf->mon_lease_ack_timeout_factor <= 1.0) { + clog->error() << "mon_lease_ack_timeout_factor (" + << g_conf->mon_lease_ack_timeout_factor + << ") must be greater than 1.0"; + r = -EINVAL; + } + + return r; +} + +int Monitor::preinit() +{ + lock.Lock(); + + dout(1) << "preinit fsid " << monmap->fsid << dendl; + + int r = sanitize_options(); + if (r < 0) { + derr << "option sanitization failed!" << dendl; + lock.Unlock(); + return r; + } + + assert(!logger); + { + PerfCountersBuilder pcb(g_ceph_context, "mon", l_mon_first, l_mon_last); + pcb.add_u64(l_mon_num_sessions, "num_sessions", "Open sessions", "sess", + PerfCountersBuilder::PRIO_USEFUL); + pcb.add_u64_counter(l_mon_session_add, "session_add", "Created sessions", + "sadd", PerfCountersBuilder::PRIO_INTERESTING); + pcb.add_u64_counter(l_mon_session_rm, "session_rm", "Removed sessions", + "srm", PerfCountersBuilder::PRIO_INTERESTING); + pcb.add_u64_counter(l_mon_session_trim, "session_trim", "Trimmed sessions", + "strm", PerfCountersBuilder::PRIO_USEFUL); + pcb.add_u64_counter(l_mon_num_elections, "num_elections", "Elections participated in", + "ecnt", PerfCountersBuilder::PRIO_USEFUL); + pcb.add_u64_counter(l_mon_election_call, "election_call", "Elections started", + "estt", PerfCountersBuilder::PRIO_INTERESTING); + pcb.add_u64_counter(l_mon_election_win, "election_win", "Elections won", + "ewon", PerfCountersBuilder::PRIO_INTERESTING); + pcb.add_u64_counter(l_mon_election_lose, "election_lose", "Elections lost", + "elst", PerfCountersBuilder::PRIO_INTERESTING); + logger = pcb.create_perf_counters(); + cct->get_perfcounters_collection()->add(logger); + } + + assert(!cluster_logger); + { + PerfCountersBuilder pcb(g_ceph_context, "cluster", l_cluster_first, l_cluster_last); + pcb.add_u64(l_cluster_num_mon, "num_mon", "Monitors"); + pcb.add_u64(l_cluster_num_mon_quorum, "num_mon_quorum", "Monitors in quorum"); + pcb.add_u64(l_cluster_num_osd, "num_osd", "OSDs"); + pcb.add_u64(l_cluster_num_osd_up, "num_osd_up", "OSDs that are up"); + pcb.add_u64(l_cluster_num_osd_in, "num_osd_in", "OSD in state \"in\" (they are in cluster)"); + pcb.add_u64(l_cluster_osd_epoch, "osd_epoch", "Current epoch of OSD map"); + pcb.add_u64(l_cluster_osd_bytes, "osd_bytes", "Total capacity of cluster"); + pcb.add_u64(l_cluster_osd_bytes_used, "osd_bytes_used", "Used space"); + pcb.add_u64(l_cluster_osd_bytes_avail, "osd_bytes_avail", "Available space"); + pcb.add_u64(l_cluster_num_pool, "num_pool", "Pools"); + pcb.add_u64(l_cluster_num_pg, "num_pg", "Placement groups"); + pcb.add_u64(l_cluster_num_pg_active_clean, "num_pg_active_clean", "Placement groups in active+clean state"); + pcb.add_u64(l_cluster_num_pg_active, "num_pg_active", "Placement groups in active state"); + pcb.add_u64(l_cluster_num_pg_peering, "num_pg_peering", "Placement groups in peering state"); + pcb.add_u64(l_cluster_num_object, "num_object", "Objects"); + pcb.add_u64(l_cluster_num_object_degraded, "num_object_degraded", "Degraded (missing replicas) objects"); + pcb.add_u64(l_cluster_num_object_misplaced, "num_object_misplaced", "Misplaced (wrong location in the cluster) objects"); + pcb.add_u64(l_cluster_num_object_unfound, "num_object_unfound", "Unfound objects"); + pcb.add_u64(l_cluster_num_bytes, "num_bytes", "Size of all objects"); + pcb.add_u64(l_cluster_num_mds_up, "num_mds_up", "MDSs that are up"); + pcb.add_u64(l_cluster_num_mds_in, "num_mds_in", "MDS in state \"in\" (they are in cluster)"); + pcb.add_u64(l_cluster_num_mds_failed, "num_mds_failed", "Failed MDS"); + pcb.add_u64(l_cluster_mds_epoch, "mds_epoch", "Current epoch of MDS map"); + cluster_logger = pcb.create_perf_counters(); + } + + paxos->init_logger(); + + // verify cluster_uuid + { + int r = check_fsid(); + if (r == -ENOENT) + r = write_fsid(); + if (r < 0) { + lock.Unlock(); + return r; + } + } + + // open compatset + read_features(); + + // have we ever joined a quorum? + has_ever_joined = (store->get(MONITOR_NAME, "joined") != 0); + dout(10) << "has_ever_joined = " << (int)has_ever_joined << dendl; + + if (!has_ever_joined) { + // impose initial quorum restrictions? + list initial_members; + get_str_list(g_conf->mon_initial_members, initial_members); + + if (!initial_members.empty()) { + dout(1) << " initial_members " << initial_members << ", filtering seed monmap" << dendl; + + monmap->set_initial_members(g_ceph_context, initial_members, name, messenger->get_myaddr(), + &extra_probe_peers); + + dout(10) << " monmap is " << *monmap << dendl; + dout(10) << " extra probe peers " << extra_probe_peers << dendl; + } + } else if (!monmap->contains(name)) { + derr << "not in monmap and have been in a quorum before; " + << "must have been removed" << dendl; + if (g_conf->mon_force_quorum_join) { + dout(0) << "we should have died but " + << "'mon_force_quorum_join' is set -- allowing boot" << dendl; + } else { + derr << "commit suicide!" << dendl; + lock.Unlock(); + return -ENOENT; + } + } + + { + // We have a potentially inconsistent store state in hands. Get rid of it + // and start fresh. + bool clear_store = false; + if (store->exists("mon_sync", "in_sync")) { + dout(1) << __func__ << " clean up potentially inconsistent store state" + << dendl; + clear_store = true; + } + + if (store->get("mon_sync", "force_sync") > 0) { + dout(1) << __func__ << " force sync by clearing store state" << dendl; + clear_store = true; + } + + if (clear_store) { + set sync_prefixes = get_sync_targets_names(); + store->clear(sync_prefixes); + } + } + + sync_last_committed_floor = store->get("mon_sync", "last_committed_floor"); + dout(10) << "sync_last_committed_floor " << sync_last_committed_floor << dendl; + + init_paxos(); + health_monitor->init(); + + if (is_keyring_required()) { + // we need to bootstrap authentication keys so we can form an + // initial quorum. + if (authmon()->get_last_committed() == 0) { + dout(10) << "loading initial keyring to bootstrap authentication for mkfs" << dendl; + bufferlist bl; + int err = store->get("mkfs", "keyring", bl); + if (err == 0 && bl.length() > 0) { + // Attempt to decode and extract keyring only if it is found. + KeyRing keyring; + bufferlist::iterator p = bl.begin(); + ::decode(keyring, p); + extract_save_mon_key(keyring); + } + } + + string keyring_loc = g_conf->mon_data + "/keyring"; + + r = keyring.load(cct, keyring_loc); + if (r < 0) { + EntityName mon_name; + mon_name.set_type(CEPH_ENTITY_TYPE_MON); + EntityAuth mon_key; + if (key_server.get_auth(mon_name, mon_key)) { + dout(1) << "copying mon. key from old db to external keyring" << dendl; + keyring.add(mon_name, mon_key); + bufferlist bl; + keyring.encode_plaintext(bl); + write_default_keyring(bl); + } else { + derr << "unable to load initial keyring " << g_conf->keyring << dendl; + lock.Unlock(); + return r; + } + } + } + + admin_hook = new AdminHook(this); + AdminSocket* admin_socket = cct->get_admin_socket(); + + // unlock while registering to avoid mon_lock -> admin socket lock dependency. + lock.Unlock(); + r = admin_socket->register_command("mon_status", "mon_status", admin_hook, + "show current monitor status"); + assert(r == 0); + r = admin_socket->register_command("quorum_status", "quorum_status", + admin_hook, "show current quorum status"); + assert(r == 0); + r = admin_socket->register_command("sync_force", + "sync_force name=validate," + "type=CephChoices," + "strings=--yes-i-really-mean-it", + admin_hook, + "force sync of and clear monitor store"); + assert(r == 0); + r = admin_socket->register_command("add_bootstrap_peer_hint", + "add_bootstrap_peer_hint name=addr," + "type=CephIPAddr", + admin_hook, + "add peer address as potential bootstrap" + " peer for cluster bringup"); + assert(r == 0); + r = admin_socket->register_command("quorum enter", "quorum enter", + admin_hook, + "force monitor back into quorum"); + assert(r == 0); + r = admin_socket->register_command("quorum exit", "quorum exit", + admin_hook, + "force monitor out of the quorum"); + assert(r == 0); + r = admin_socket->register_command("ops", + "ops", + admin_hook, + "show the ops currently in flight"); + assert(r == 0); + r = admin_socket->register_command("sessions", + "sessions", + admin_hook, + "list existing sessions"); + assert(r == 0); + + lock.Lock(); + + // add ourselves as a conf observer + g_conf->add_observer(this); + + lock.Unlock(); + return 0; +} + +int Monitor::init() +{ + dout(2) << "init" << dendl; + Mutex::Locker l(lock); + + finisher.start(); + + // start ticker + timer.init(); + new_tick(); + + cpu_tp.start(); + + // i'm ready! + messenger->add_dispatcher_tail(this); + + mgr_client.init(); + mgr_messenger->add_dispatcher_tail(&mgr_client); + mgr_messenger->add_dispatcher_tail(this); // for auth ms_* calls + + bootstrap(); + // add features of myself into feature_map + session_map.feature_map.add_mon(con_self->get_features()); + return 0; +} + +void Monitor::init_paxos() +{ + dout(10) << __func__ << dendl; + paxos->init(); + + // init services + for (int i = 0; i < PAXOS_NUM; ++i) { + paxos_service[i]->init(); + } + + refresh_from_paxos(NULL); +} + +void Monitor::refresh_from_paxos(bool *need_bootstrap) +{ + dout(10) << __func__ << dendl; + + bufferlist bl; + int r = store->get(MONITOR_NAME, "cluster_fingerprint", bl); + if (r >= 0) { + try { + bufferlist::iterator p = bl.begin(); + ::decode(fingerprint, p); + } + catch (buffer::error& e) { + dout(10) << __func__ << " failed to decode cluster_fingerprint" << dendl; + } + } else { + dout(10) << __func__ << " no cluster_fingerprint" << dendl; + } + + for (int i = 0; i < PAXOS_NUM; ++i) { + paxos_service[i]->refresh(need_bootstrap); + } + for (int i = 0; i < PAXOS_NUM; ++i) { + paxos_service[i]->post_refresh(); + } + load_metadata(); +} + +void Monitor::register_cluster_logger() +{ + if (!cluster_logger_registered) { + dout(10) << "register_cluster_logger" << dendl; + cluster_logger_registered = true; + cct->get_perfcounters_collection()->add(cluster_logger); + } else { + dout(10) << "register_cluster_logger - already registered" << dendl; + } +} + +void Monitor::unregister_cluster_logger() +{ + if (cluster_logger_registered) { + dout(10) << "unregister_cluster_logger" << dendl; + cluster_logger_registered = false; + cct->get_perfcounters_collection()->remove(cluster_logger); + } else { + dout(10) << "unregister_cluster_logger - not registered" << dendl; + } +} + +void Monitor::update_logger() +{ + cluster_logger->set(l_cluster_num_mon, monmap->size()); + cluster_logger->set(l_cluster_num_mon_quorum, quorum.size()); +} + +void Monitor::shutdown() +{ + dout(1) << "shutdown" << dendl; + + lock.Lock(); + + wait_for_paxos_write(); + + state = STATE_SHUTDOWN; + + g_conf->remove_observer(this); + + if (admin_hook) { + AdminSocket* admin_socket = cct->get_admin_socket(); + admin_socket->unregister_command("mon_status"); + admin_socket->unregister_command("quorum_status"); + admin_socket->unregister_command("sync_force"); + admin_socket->unregister_command("add_bootstrap_peer_hint"); + admin_socket->unregister_command("quorum enter"); + admin_socket->unregister_command("quorum exit"); + admin_socket->unregister_command("ops"); + admin_socket->unregister_command("sessions"); + delete admin_hook; + admin_hook = NULL; + } + + elector.shutdown(); + + mgr_client.shutdown(); + + lock.Unlock(); + finisher.wait_for_empty(); + finisher.stop(); + lock.Lock(); + + // clean up + paxos->shutdown(); + for (vector::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p) + (*p)->shutdown(); + health_monitor->shutdown(); + + finish_contexts(g_ceph_context, waitfor_quorum, -ECANCELED); + finish_contexts(g_ceph_context, maybe_wait_for_quorum, -ECANCELED); + + timer.shutdown(); + + cpu_tp.stop(); + + remove_all_sessions(); + + if (logger) { + cct->get_perfcounters_collection()->remove(logger); + delete logger; + logger = NULL; + } + if (cluster_logger) { + if (cluster_logger_registered) + cct->get_perfcounters_collection()->remove(cluster_logger); + delete cluster_logger; + cluster_logger = NULL; + } + + log_client.shutdown(); + + // unlock before msgr shutdown... + lock.Unlock(); + + messenger->shutdown(); // last thing! ceph_mon.cc will delete mon. + mgr_messenger->shutdown(); +} + +void Monitor::wait_for_paxos_write() +{ + if (paxos->is_writing() || paxos->is_writing_previous()) { + dout(10) << __func__ << " flushing pending write" << dendl; + lock.Unlock(); + store->flush(); + lock.Lock(); + dout(10) << __func__ << " flushed pending write" << dendl; + } +} + +void Monitor::bootstrap() +{ + dout(10) << "bootstrap" << dendl; + wait_for_paxos_write(); + + sync_reset_requester(); + unregister_cluster_logger(); + cancel_probe_timeout(); + + // note my rank + int newrank = monmap->get_rank(messenger->get_myaddr()); + if (newrank < 0 && rank >= 0) { + // was i ever part of the quorum? + if (has_ever_joined) { + dout(0) << " removed from monmap, suicide." << dendl; + exit(0); + } + } + if (newrank != rank) { + dout(0) << " my rank is now " << newrank << " (was " << rank << ")" << dendl; + messenger->set_myname(entity_name_t::MON(newrank)); + rank = newrank; + + // reset all connections, or else our peers will think we are someone else. + messenger->mark_down_all(); + } + + // reset + state = STATE_PROBING; + + _reset(); + + // sync store + if (g_conf->mon_compact_on_bootstrap) { + dout(10) << "bootstrap -- triggering compaction" << dendl; + store->compact(); + dout(10) << "bootstrap -- finished compaction" << dendl; + } + + // singleton monitor? + if (monmap->size() == 1 && rank == 0) { + win_standalone_election(); + return; + } + + reset_probe_timeout(); + + // i'm outside the quorum + if (monmap->contains(name)) + outside_quorum.insert(name); + + // probe monitors + dout(10) << "probing other monitors" << dendl; + for (unsigned i = 0; i < monmap->size(); i++) { + if ((int)i != rank) + messenger->send_message(new MMonProbe(monmap->fsid, MMonProbe::OP_PROBE, name, has_ever_joined), + monmap->get_inst(i)); + } + for (set::iterator p = extra_probe_peers.begin(); + p != extra_probe_peers.end(); + ++p) { + if (*p != messenger->get_myaddr()) { + entity_inst_t i; + i.name = entity_name_t::MON(-1); + i.addr = *p; + messenger->send_message(new MMonProbe(monmap->fsid, MMonProbe::OP_PROBE, name, has_ever_joined), i); + } + } +} + +bool Monitor::_add_bootstrap_peer_hint(string cmd, cmdmap_t& cmdmap, ostream& ss) +{ + string addrstr; + if (!cmd_getval(g_ceph_context, cmdmap, "addr", addrstr)) { + ss << "unable to parse address string value '" + << cmd_vartype_stringify(cmdmap["addr"]) << "'"; + return false; + } + dout(10) << "_add_bootstrap_peer_hint '" << cmd << "' '" + << addrstr << "'" << dendl; + + entity_addr_t addr; + const char *end = 0; + if (!addr.parse(addrstr.c_str(), &end)) { + ss << "failed to parse addr '" << addrstr << "'; syntax is 'add_bootstrap_peer_hint ip[:port]'"; + return false; + } + + if (is_leader() || is_peon()) { + ss << "mon already active; ignoring bootstrap hint"; + return true; + } + + if (addr.get_port() == 0) + addr.set_port(CEPH_MON_PORT); + + extra_probe_peers.insert(addr); + ss << "adding peer " << addr << " to list: " << extra_probe_peers; + return true; +} + +// called by bootstrap(), or on leader|peon -> electing +void Monitor::_reset() +{ + dout(10) << __func__ << dendl; + + cancel_probe_timeout(); + timecheck_finish(); + health_events_cleanup(); + health_check_log_times.clear(); + scrub_event_cancel(); + + leader_since = utime_t(); + if (!quorum.empty()) { + exited_quorum = ceph_clock_now(); + } + quorum.clear(); + outside_quorum.clear(); + quorum_feature_map.clear(); + + scrub_reset(); + + paxos->restart(); + + for (vector::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p) + (*p)->restart(); + health_monitor->finish(); +} + + +// ----------------------------------------------------------- +// sync + +set Monitor::get_sync_targets_names() +{ + set targets; + targets.insert(paxos->get_name()); + for (int i = 0; i < PAXOS_NUM; ++i) + paxos_service[i]->get_store_prefixes(targets); + ConfigKeyService *config_key_service_ptr = dynamic_cast(config_key_service); + assert(config_key_service_ptr); + config_key_service_ptr->get_store_prefixes(targets); + return targets; +} + + +void Monitor::sync_timeout() +{ + dout(10) << __func__ << dendl; + assert(state == STATE_SYNCHRONIZING); + bootstrap(); +} + +void Monitor::sync_obtain_latest_monmap(bufferlist &bl) +{ + dout(1) << __func__ << dendl; + + MonMap latest_monmap; + + // Grab latest monmap from MonmapMonitor + bufferlist monmon_bl; + int err = monmon()->get_monmap(monmon_bl); + if (err < 0) { + if (err != -ENOENT) { + derr << __func__ + << " something wrong happened while reading the store: " + << cpp_strerror(err) << dendl; + assert(0 == "error reading the store"); + } + } else { + latest_monmap.decode(monmon_bl); + } + + // Grab last backed up monmap (if any) and compare epochs + if (store->exists("mon_sync", "latest_monmap")) { + bufferlist backup_bl; + int err = store->get("mon_sync", "latest_monmap", backup_bl); + if (err < 0) { + derr << __func__ + << " something wrong happened while reading the store: " + << cpp_strerror(err) << dendl; + assert(0 == "error reading the store"); + } + assert(backup_bl.length() > 0); + + MonMap backup_monmap; + backup_monmap.decode(backup_bl); + + if (backup_monmap.epoch > latest_monmap.epoch) + latest_monmap = backup_monmap; + } + + // Check if our current monmap's epoch is greater than the one we've + // got so far. + if (monmap->epoch > latest_monmap.epoch) + latest_monmap = *monmap; + + dout(1) << __func__ << " obtained monmap e" << latest_monmap.epoch << dendl; + + latest_monmap.encode(bl, CEPH_FEATURES_ALL); +} + +void Monitor::sync_reset_requester() +{ + dout(10) << __func__ << dendl; + + if (sync_timeout_event) { + timer.cancel_event(sync_timeout_event); + sync_timeout_event = NULL; + } + + sync_provider = entity_inst_t(); + sync_cookie = 0; + sync_full = false; + sync_start_version = 0; +} + +void Monitor::sync_reset_provider() +{ + dout(10) << __func__ << dendl; + sync_providers.clear(); +} + +void Monitor::sync_start(entity_inst_t &other, bool full) +{ + dout(10) << __func__ << " " << other << (full ? " full" : " recent") << dendl; + + assert(state == STATE_PROBING || + state == STATE_SYNCHRONIZING); + state = STATE_SYNCHRONIZING; + + // make sure are not a provider for anyone! + sync_reset_provider(); + + sync_full = full; + + if (sync_full) { + // stash key state, and mark that we are syncing + auto t(std::make_shared()); + sync_stash_critical_state(t); + t->put("mon_sync", "in_sync", 1); + + sync_last_committed_floor = MAX(sync_last_committed_floor, paxos->get_version()); + dout(10) << __func__ << " marking sync in progress, storing sync_last_committed_floor " + << sync_last_committed_floor << dendl; + t->put("mon_sync", "last_committed_floor", sync_last_committed_floor); + + store->apply_transaction(t); + + assert(g_conf->mon_sync_requester_kill_at != 1); + + // clear the underlying store + set targets = get_sync_targets_names(); + dout(10) << __func__ << " clearing prefixes " << targets << dendl; + store->clear(targets); + + // make sure paxos knows it has been reset. this prevents a + // bootstrap and then different probe reply order from possibly + // deciding a partial or no sync is needed. + paxos->init(); + + assert(g_conf->mon_sync_requester_kill_at != 2); + } + + // assume 'other' as the leader. We will update the leader once we receive + // a reply to the sync start. + sync_provider = other; + + sync_reset_timeout(); + + MMonSync *m = new MMonSync(sync_full ? MMonSync::OP_GET_COOKIE_FULL : MMonSync::OP_GET_COOKIE_RECENT); + if (!sync_full) + m->last_committed = paxos->get_version(); + messenger->send_message(m, sync_provider); +} + +void Monitor::sync_stash_critical_state(MonitorDBStore::TransactionRef t) +{ + dout(10) << __func__ << dendl; + bufferlist backup_monmap; + sync_obtain_latest_monmap(backup_monmap); + assert(backup_monmap.length() > 0); + t->put("mon_sync", "latest_monmap", backup_monmap); +} + +void Monitor::sync_reset_timeout() +{ + dout(10) << __func__ << dendl; + if (sync_timeout_event) + timer.cancel_event(sync_timeout_event); + sync_timeout_event = timer.add_event_after( + g_conf->mon_sync_timeout, + new C_MonContext(this, [this](int) { + sync_timeout(); + })); +} + +void Monitor::sync_finish(version_t last_committed) +{ + dout(10) << __func__ << " lc " << last_committed << " from " << sync_provider << dendl; + + assert(g_conf->mon_sync_requester_kill_at != 7); + + if (sync_full) { + // finalize the paxos commits + auto tx(std::make_shared()); + paxos->read_and_prepare_transactions(tx, sync_start_version, + last_committed); + tx->put(paxos->get_name(), "last_committed", last_committed); + + dout(30) << __func__ << " final tx dump:\n"; + JSONFormatter f(true); + tx->dump(&f); + f.flush(*_dout); + *_dout << dendl; + + store->apply_transaction(tx); + } + + assert(g_conf->mon_sync_requester_kill_at != 8); + + auto t(std::make_shared()); + t->erase("mon_sync", "in_sync"); + t->erase("mon_sync", "force_sync"); + t->erase("mon_sync", "last_committed_floor"); + store->apply_transaction(t); + + assert(g_conf->mon_sync_requester_kill_at != 9); + + init_paxos(); + + assert(g_conf->mon_sync_requester_kill_at != 10); + + bootstrap(); +} + +void Monitor::handle_sync(MonOpRequestRef op) +{ + MMonSync *m = static_cast(op->get_req()); + dout(10) << __func__ << " " << *m << dendl; + switch (m->op) { + + // provider --------- + + case MMonSync::OP_GET_COOKIE_FULL: + case MMonSync::OP_GET_COOKIE_RECENT: + handle_sync_get_cookie(op); + break; + case MMonSync::OP_GET_CHUNK: + handle_sync_get_chunk(op); + break; + + // client ----------- + + case MMonSync::OP_COOKIE: + handle_sync_cookie(op); + break; + + case MMonSync::OP_CHUNK: + case MMonSync::OP_LAST_CHUNK: + handle_sync_chunk(op); + break; + case MMonSync::OP_NO_COOKIE: + handle_sync_no_cookie(op); + break; + + default: + dout(0) << __func__ << " unknown op " << m->op << dendl; + assert(0 == "unknown op"); + } +} + +// leader + +void Monitor::_sync_reply_no_cookie(MonOpRequestRef op) +{ + MMonSync *m = static_cast(op->get_req()); + MMonSync *reply = new MMonSync(MMonSync::OP_NO_COOKIE, m->cookie); + m->get_connection()->send_message(reply); +} + +void Monitor::handle_sync_get_cookie(MonOpRequestRef op) +{ + MMonSync *m = static_cast(op->get_req()); + if (is_synchronizing()) { + _sync_reply_no_cookie(op); + return; + } + + assert(g_conf->mon_sync_provider_kill_at != 1); + + // make sure they can understand us. + if ((required_features ^ m->get_connection()->get_features()) & + required_features) { + dout(5) << " ignoring peer mon." << m->get_source().num() + << " has features " << std::hex + << m->get_connection()->get_features() + << " but we require " << required_features << std::dec << dendl; + return; + } + + // make up a unique cookie. include election epoch (which persists + // across restarts for the whole cluster) and a counter for this + // process instance. there is no need to be unique *across* + // monitors, though. + uint64_t cookie = ((unsigned long long)elector.get_epoch() << 24) + ++sync_provider_count; + assert(sync_providers.count(cookie) == 0); + + dout(10) << __func__ << " cookie " << cookie << " for " << m->get_source_inst() << dendl; + + SyncProvider& sp = sync_providers[cookie]; + sp.cookie = cookie; + sp.entity = m->get_source_inst(); + sp.reset_timeout(g_ceph_context, g_conf->mon_sync_timeout * 2); + + set sync_targets; + if (m->op == MMonSync::OP_GET_COOKIE_FULL) { + // full scan + sync_targets = get_sync_targets_names(); + sp.last_committed = paxos->get_version(); + sp.synchronizer = store->get_synchronizer(sp.last_key, sync_targets); + sp.full = true; + dout(10) << __func__ << " will sync prefixes " << sync_targets << dendl; + } else { + // just catch up paxos + sp.last_committed = m->last_committed; + } + dout(10) << __func__ << " will sync from version " << sp.last_committed << dendl; + + MMonSync *reply = new MMonSync(MMonSync::OP_COOKIE, sp.cookie); + reply->last_committed = sp.last_committed; + m->get_connection()->send_message(reply); +} + +void Monitor::handle_sync_get_chunk(MonOpRequestRef op) +{ + MMonSync *m = static_cast(op->get_req()); + dout(10) << __func__ << " " << *m << dendl; + + if (sync_providers.count(m->cookie) == 0) { + dout(10) << __func__ << " no cookie " << m->cookie << dendl; + _sync_reply_no_cookie(op); + return; + } + + assert(g_conf->mon_sync_provider_kill_at != 2); + + SyncProvider& sp = sync_providers[m->cookie]; + sp.reset_timeout(g_ceph_context, g_conf->mon_sync_timeout * 2); + + if (sp.last_committed < paxos->get_first_committed() && + paxos->get_first_committed() > 1) { + dout(10) << __func__ << " sync requester fell behind paxos, their lc " << sp.last_committed + << " < our fc " << paxos->get_first_committed() << dendl; + sync_providers.erase(m->cookie); + _sync_reply_no_cookie(op); + return; + } + + MMonSync *reply = new MMonSync(MMonSync::OP_CHUNK, sp.cookie); + auto tx(std::make_shared()); + + int left = g_conf->mon_sync_max_payload_size; + while (sp.last_committed < paxos->get_version() && left > 0) { + bufferlist bl; + sp.last_committed++; + + int err = store->get(paxos->get_name(), sp.last_committed, bl); + assert(err == 0); + + tx->put(paxos->get_name(), sp.last_committed, bl); + left -= bl.length(); + dout(20) << __func__ << " including paxos state " << sp.last_committed + << dendl; + } + reply->last_committed = sp.last_committed; + + if (sp.full && left > 0) { + sp.synchronizer->get_chunk_tx(tx, left); + sp.last_key = sp.synchronizer->get_last_key(); + reply->last_key = sp.last_key; + } + + if ((sp.full && sp.synchronizer->has_next_chunk()) || + sp.last_committed < paxos->get_version()) { + dout(10) << __func__ << " chunk, through version " << sp.last_committed + << " key " << sp.last_key << dendl; + } else { + dout(10) << __func__ << " last chunk, through version " << sp.last_committed + << " key " << sp.last_key << dendl; + reply->op = MMonSync::OP_LAST_CHUNK; + + assert(g_conf->mon_sync_provider_kill_at != 3); + + // clean up our local state + sync_providers.erase(sp.cookie); + } + + ::encode(*tx, reply->chunk_bl); + + m->get_connection()->send_message(reply); +} + +// requester + +void Monitor::handle_sync_cookie(MonOpRequestRef op) +{ + MMonSync *m = static_cast(op->get_req()); + dout(10) << __func__ << " " << *m << dendl; + if (sync_cookie) { + dout(10) << __func__ << " already have a cookie, ignoring" << dendl; + return; + } + if (m->get_source_inst() != sync_provider) { + dout(10) << __func__ << " source does not match, discarding" << dendl; + return; + } + sync_cookie = m->cookie; + sync_start_version = m->last_committed; + + sync_reset_timeout(); + sync_get_next_chunk(); + + assert(g_conf->mon_sync_requester_kill_at != 3); +} + +void Monitor::sync_get_next_chunk() +{ + dout(20) << __func__ << " cookie " << sync_cookie << " provider " << sync_provider << dendl; + if (g_conf->mon_inject_sync_get_chunk_delay > 0) { + dout(20) << __func__ << " injecting delay of " << g_conf->mon_inject_sync_get_chunk_delay << dendl; + usleep((long long)(g_conf->mon_inject_sync_get_chunk_delay * 1000000.0)); + } + MMonSync *r = new MMonSync(MMonSync::OP_GET_CHUNK, sync_cookie); + messenger->send_message(r, sync_provider); + + assert(g_conf->mon_sync_requester_kill_at != 4); +} + +void Monitor::handle_sync_chunk(MonOpRequestRef op) +{ + MMonSync *m = static_cast(op->get_req()); + dout(10) << __func__ << " " << *m << dendl; + + if (m->cookie != sync_cookie) { + dout(10) << __func__ << " cookie does not match, discarding" << dendl; + return; + } + if (m->get_source_inst() != sync_provider) { + dout(10) << __func__ << " source does not match, discarding" << dendl; + return; + } + + assert(state == STATE_SYNCHRONIZING); + assert(g_conf->mon_sync_requester_kill_at != 5); + + auto tx(std::make_shared()); + tx->append_from_encoded(m->chunk_bl); + + dout(30) << __func__ << " tx dump:\n"; + JSONFormatter f(true); + tx->dump(&f); + f.flush(*_dout); + *_dout << dendl; + + store->apply_transaction(tx); + + assert(g_conf->mon_sync_requester_kill_at != 6); + + if (!sync_full) { + dout(10) << __func__ << " applying recent paxos transactions as we go" << dendl; + auto tx(std::make_shared()); + paxos->read_and_prepare_transactions(tx, paxos->get_version() + 1, + m->last_committed); + tx->put(paxos->get_name(), "last_committed", m->last_committed); + + dout(30) << __func__ << " tx dump:\n"; + JSONFormatter f(true); + tx->dump(&f); + f.flush(*_dout); + *_dout << dendl; + + store->apply_transaction(tx); + paxos->init(); // to refresh what we just wrote + } + + if (m->op == MMonSync::OP_CHUNK) { + sync_reset_timeout(); + sync_get_next_chunk(); + } else if (m->op == MMonSync::OP_LAST_CHUNK) { + sync_finish(m->last_committed); + } +} + +void Monitor::handle_sync_no_cookie(MonOpRequestRef op) +{ + dout(10) << __func__ << dendl; + bootstrap(); +} + +void Monitor::sync_trim_providers() +{ + dout(20) << __func__ << dendl; + + utime_t now = ceph_clock_now(); + map::iterator p = sync_providers.begin(); + while (p != sync_providers.end()) { + if (now > p->second.timeout) { + dout(10) << __func__ << " expiring cookie " << p->second.cookie << " for " << p->second.entity << dendl; + sync_providers.erase(p++); + } else { + ++p; + } + } +} + +// --------------------------------------------------- +// probe + +void Monitor::cancel_probe_timeout() +{ + if (probe_timeout_event) { + dout(10) << "cancel_probe_timeout " << probe_timeout_event << dendl; + timer.cancel_event(probe_timeout_event); + probe_timeout_event = NULL; + } else { + dout(10) << "cancel_probe_timeout (none scheduled)" << dendl; + } +} + +void Monitor::reset_probe_timeout() +{ + cancel_probe_timeout(); + probe_timeout_event = new C_MonContext(this, [this](int r) { + probe_timeout(r); + }); + double t = g_conf->mon_probe_timeout; + if (timer.add_event_after(t, probe_timeout_event)) { + dout(10) << "reset_probe_timeout " << probe_timeout_event + << " after " << t << " seconds" << dendl; + } else { + probe_timeout_event = nullptr; + } +} + +void Monitor::probe_timeout(int r) +{ + dout(4) << "probe_timeout " << probe_timeout_event << dendl; + assert(is_probing() || is_synchronizing()); + assert(probe_timeout_event); + probe_timeout_event = NULL; + bootstrap(); +} + +void Monitor::handle_probe(MonOpRequestRef op) +{ + MMonProbe *m = static_cast(op->get_req()); + dout(10) << "handle_probe " << *m << dendl; + + if (m->fsid != monmap->fsid) { + dout(0) << "handle_probe ignoring fsid " << m->fsid << " != " << monmap->fsid << dendl; + return; + } + + switch (m->op) { + case MMonProbe::OP_PROBE: + handle_probe_probe(op); + break; + + case MMonProbe::OP_REPLY: + handle_probe_reply(op); + break; + + case MMonProbe::OP_MISSING_FEATURES: + derr << __func__ << " missing features, have " << CEPH_FEATURES_ALL + << ", required " << m->required_features + << ", missing " << (m->required_features & ~CEPH_FEATURES_ALL) + << dendl; + break; + } +} + +void Monitor::handle_probe_probe(MonOpRequestRef op) +{ + MMonProbe *m = static_cast(op->get_req()); + + dout(10) << "handle_probe_probe " << m->get_source_inst() << *m + << " features " << m->get_connection()->get_features() << dendl; + uint64_t missing = required_features & ~m->get_connection()->get_features(); + if (missing) { + dout(1) << " peer " << m->get_source_addr() << " missing features " + << missing << dendl; + if (m->get_connection()->has_feature(CEPH_FEATURE_OSD_PRIMARY_AFFINITY)) { + MMonProbe *r = new MMonProbe(monmap->fsid, MMonProbe::OP_MISSING_FEATURES, + name, has_ever_joined); + m->required_features = required_features; + m->get_connection()->send_message(r); + } + goto out; + } + + if (!is_probing() && !is_synchronizing()) { + // If the probing mon is way ahead of us, we need to re-bootstrap. + // Normally we capture this case when we initially bootstrap, but + // it is possible we pass those checks (we overlap with + // quorum-to-be) but fail to join a quorum before it moves past + // us. We need to be kicked back to bootstrap so we can + // synchonize, not keep calling elections. + if (paxos->get_version() + 1 < m->paxos_first_version) { + dout(1) << " peer " << m->get_source_addr() << " has first_committed " + << "ahead of us, re-bootstrapping" << dendl; + bootstrap(); + goto out; + + } + } + + MMonProbe *r; + r = new MMonProbe(monmap->fsid, MMonProbe::OP_REPLY, name, has_ever_joined); + r->name = name; + r->quorum = quorum; + monmap->encode(r->monmap_bl, m->get_connection()->get_features()); + r->paxos_first_version = paxos->get_first_committed(); + r->paxos_last_version = paxos->get_version(); + m->get_connection()->send_message(r); + + // did we discover a peer here? + if (!monmap->contains(m->get_source_addr())) { + dout(1) << " adding peer " << m->get_source_addr() + << " to list of hints" << dendl; + extra_probe_peers.insert(m->get_source_addr()); + } + + out: + return; +} + +void Monitor::handle_probe_reply(MonOpRequestRef op) +{ + MMonProbe *m = static_cast(op->get_req()); + dout(10) << "handle_probe_reply " << m->get_source_inst() << *m << dendl; + dout(10) << " monmap is " << *monmap << dendl; + + // discover name and addrs during probing or electing states. + if (!is_probing() && !is_electing()) { + return; + } + + // newer map, or they've joined a quorum and we haven't? + bufferlist mybl; + monmap->encode(mybl, m->get_connection()->get_features()); + // make sure it's actually different; the checks below err toward + // taking the other guy's map, which could cause us to loop. + if (!mybl.contents_equal(m->monmap_bl)) { + MonMap *newmap = new MonMap; + newmap->decode(m->monmap_bl); + if (m->has_ever_joined && (newmap->get_epoch() > monmap->get_epoch() || + !has_ever_joined)) { + dout(10) << " got newer/committed monmap epoch " << newmap->get_epoch() + << ", mine was " << monmap->get_epoch() << dendl; + delete newmap; + monmap->decode(m->monmap_bl); + + bootstrap(); + return; + } + delete newmap; + } + + // rename peer? + string peer_name = monmap->get_name(m->get_source_addr()); + if (monmap->get_epoch() == 0 && peer_name.compare(0, 7, "noname-") == 0) { + dout(10) << " renaming peer " << m->get_source_addr() << " " + << peer_name << " -> " << m->name << " in my monmap" + << dendl; + monmap->rename(peer_name, m->name); + + if (is_electing()) { + bootstrap(); + return; + } + } else { + dout(10) << " peer name is " << peer_name << dendl; + } + + // new initial peer? + if (monmap->get_epoch() == 0 && + monmap->contains(m->name) && + monmap->get_addr(m->name).is_blank_ip()) { + dout(1) << " learned initial mon " << m->name << " addr " << m->get_source_addr() << dendl; + monmap->set_addr(m->name, m->get_source_addr()); + + bootstrap(); + return; + } + + // end discover phase + if (!is_probing()) { + return; + } + + assert(paxos != NULL); + + if (is_synchronizing()) { + dout(10) << " currently syncing" << dendl; + return; + } + + entity_inst_t other = m->get_source_inst(); + + if (m->paxos_last_version < sync_last_committed_floor) { + dout(10) << " peer paxos versions [" << m->paxos_first_version + << "," << m->paxos_last_version << "] < my sync_last_committed_floor " + << sync_last_committed_floor << ", ignoring" + << dendl; + } else { + if (paxos->get_version() < m->paxos_first_version && + m->paxos_first_version > 1) { // no need to sync if we're 0 and they start at 1. + dout(10) << " peer paxos first versions [" << m->paxos_first_version + << "," << m->paxos_last_version << "]" + << " vs my version " << paxos->get_version() + << " (too far ahead)" + << dendl; + cancel_probe_timeout(); + sync_start(other, true); + return; + } + if (paxos->get_version() + g_conf->paxos_max_join_drift < m->paxos_last_version) { + dout(10) << " peer paxos last version " << m->paxos_last_version + << " vs my version " << paxos->get_version() + << " (too far ahead)" + << dendl; + cancel_probe_timeout(); + sync_start(other, false); + return; + } + } + + // is there an existing quorum? + if (m->quorum.size()) { + dout(10) << " existing quorum " << m->quorum << dendl; + + dout(10) << " peer paxos version " << m->paxos_last_version + << " vs my version " << paxos->get_version() + << " (ok)" + << dendl; + + if (monmap->contains(name) && + !monmap->get_addr(name).is_blank_ip()) { + // i'm part of the cluster; just initiate a new election + start_election(); + } else { + dout(10) << " ready to join, but i'm not in the monmap or my addr is blank, trying to join" << dendl; + messenger->send_message(new MMonJoin(monmap->fsid, name, messenger->get_myaddr()), + monmap->get_inst(*m->quorum.begin())); + } + } else { + if (monmap->contains(m->name)) { + dout(10) << " mon." << m->name << " is outside the quorum" << dendl; + outside_quorum.insert(m->name); + } else { + dout(10) << " mostly ignoring mon." << m->name << ", not part of monmap" << dendl; + return; + } + + unsigned need = monmap->size() / 2 + 1; + dout(10) << " outside_quorum now " << outside_quorum << ", need " << need << dendl; + if (outside_quorum.size() >= need) { + if (outside_quorum.count(name)) { + dout(10) << " that's enough to form a new quorum, calling election" << dendl; + start_election(); + } else { + dout(10) << " that's enough to form a new quorum, but it does not include me; waiting" << dendl; + } + } else { + dout(10) << " that's not yet enough for a new quorum, waiting" << dendl; + } + } +} + +void Monitor::join_election() +{ + dout(10) << __func__ << dendl; + wait_for_paxos_write(); + _reset(); + state = STATE_ELECTING; + + logger->inc(l_mon_num_elections); +} + +void Monitor::start_election() +{ + dout(10) << "start_election" << dendl; + wait_for_paxos_write(); + _reset(); + state = STATE_ELECTING; + + logger->inc(l_mon_num_elections); + logger->inc(l_mon_election_call); + + clog->info() << "mon." << name << " calling new monitor election"; + elector.call_election(); +} + +void Monitor::win_standalone_election() +{ + dout(1) << "win_standalone_election" << dendl; + + // bump election epoch, in case the previous epoch included other + // monitors; we need to be able to make the distinction. + elector.init(); + elector.advance_epoch(); + + rank = monmap->get_rank(name); + assert(rank == 0); + set q; + q.insert(rank); + + map metadata; + collect_metadata(&metadata[0]); + + win_election(elector.get_epoch(), q, + CEPH_FEATURES_ALL, + ceph::features::mon::get_supported(), + metadata); +} + +const utime_t& Monitor::get_leader_since() const +{ + assert(state == STATE_LEADER); + return leader_since; +} + +epoch_t Monitor::get_epoch() +{ + return elector.get_epoch(); +} + +void Monitor::_finish_svc_election() +{ + assert(state == STATE_LEADER || state == STATE_PEON); + + for (auto p : paxos_service) { + // we already called election_finished() on monmon(); avoid callig twice + if (state == STATE_LEADER && p == monmon()) + continue; + p->election_finished(); + } +} + +void Monitor::win_election(epoch_t epoch, set& active, uint64_t features, + const mon_feature_t& mon_features, + const map& metadata) +{ + dout(10) << __func__ << " epoch " << epoch << " quorum " << active + << " features " << features + << " mon_features " << mon_features + << dendl; + assert(is_electing()); + state = STATE_LEADER; + leader_since = ceph_clock_now(); + leader = rank; + quorum = active; + quorum_con_features = features; + quorum_mon_features = mon_features; + pending_metadata = metadata; + outside_quorum.clear(); + + clog->info() << "mon." << name << "@" << rank + << " won leader election with quorum " << quorum; + + set_leader_commands(get_local_commands(mon_features)); + + paxos->leader_init(); + // NOTE: tell monmap monitor first. This is important for the + // bootstrap case to ensure that the very first paxos proposal + // codifies the monmap. Otherwise any manner of chaos can ensue + // when monitors are call elections or participating in a paxos + // round without agreeing on who the participants are. + monmon()->election_finished(); + _finish_svc_election(); + health_monitor->start(epoch); + + logger->inc(l_mon_election_win); + + // inject new metadata in first transaction. + { + // include previous metadata for missing mons (that aren't part of + // the current quorum). + map m = metadata; + for (unsigned rank = 0; rank < monmap->size(); ++rank) { + if (m.count(rank) == 0 && + mon_metadata.count(rank)) { + m[rank] = mon_metadata[rank]; + } + } + + // FIXME: This is a bit sloppy because we aren't guaranteed to submit + // a new transaction immediately after the election finishes. We should + // do that anyway for other reasons, though. + MonitorDBStore::TransactionRef t = paxos->get_pending_transaction(); + bufferlist bl; + ::encode(m, bl); + t->put(MONITOR_STORE_PREFIX, "last_metadata", bl); + } + + finish_election(); + if (monmap->size() > 1 && + monmap->get_epoch() > 0) { + timecheck_start(); + health_tick_start(); + do_health_to_clog_interval(); + scrub_event_start(); + } +} + +void Monitor::lose_election(epoch_t epoch, set &q, int l, + uint64_t features, + const mon_feature_t& mon_features) +{ + state = STATE_PEON; + leader_since = utime_t(); + leader = l; + quorum = q; + outside_quorum.clear(); + quorum_con_features = features; + quorum_mon_features = mon_features; + dout(10) << "lose_election, epoch " << epoch << " leader is mon" << leader + << " quorum is " << quorum << " features are " << quorum_con_features + << " mon_features are " << quorum_mon_features + << dendl; + + paxos->peon_init(); + _finish_svc_election(); + health_monitor->start(epoch); + + logger->inc(l_mon_election_lose); + + finish_election(); + + if ((quorum_con_features & CEPH_FEATURE_MON_METADATA) && + !HAVE_FEATURE(quorum_con_features, SERVER_LUMINOUS)) { + // for pre-luminous mons only + Metadata sys_info; + collect_metadata(&sys_info); + messenger->send_message(new MMonMetadata(sys_info), + monmap->get_inst(get_leader())); + } +} + +void Monitor::collect_metadata(Metadata *m) +{ + collect_sys_info(m, g_ceph_context); + (*m)["addr"] = stringify(messenger->get_myaddr()); +} + +void Monitor::finish_election() +{ + apply_quorum_to_compatset_features(); + apply_monmap_to_compatset_features(); + timecheck_finish(); + exited_quorum = utime_t(); + finish_contexts(g_ceph_context, waitfor_quorum); + finish_contexts(g_ceph_context, maybe_wait_for_quorum); + resend_routed_requests(); + update_logger(); + register_cluster_logger(); + + // am i named properly? + string cur_name = monmap->get_name(messenger->get_myaddr()); + if (cur_name != name) { + dout(10) << " renaming myself from " << cur_name << " -> " << name << dendl; + messenger->send_message(new MMonJoin(monmap->fsid, name, messenger->get_myaddr()), + monmap->get_inst(*quorum.begin())); + } +} + +void Monitor::_apply_compatset_features(CompatSet &new_features) +{ + if (new_features.compare(features) != 0) { + CompatSet diff = features.unsupported(new_features); + dout(1) << __func__ << " enabling new quorum features: " << diff << dendl; + features = new_features; + + auto t = std::make_shared(); + write_features(t); + store->apply_transaction(t); + + calc_quorum_requirements(); + } +} + +void Monitor::apply_quorum_to_compatset_features() +{ + CompatSet new_features(features); + if (quorum_con_features & CEPH_FEATURE_OSD_ERASURE_CODES) { + new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES); + } + if (quorum_con_features & CEPH_FEATURE_OSDMAP_ENC) { + new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC); + } + if (quorum_con_features & CEPH_FEATURE_ERASURE_CODE_PLUGINS_V2) { + new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2); + } + if (quorum_con_features & CEPH_FEATURE_ERASURE_CODE_PLUGINS_V3) { + new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3); + } + dout(5) << __func__ << dendl; + _apply_compatset_features(new_features); +} + +void Monitor::apply_monmap_to_compatset_features() +{ + CompatSet new_features(features); + mon_feature_t monmap_features = monmap->get_required_features(); + + /* persistent monmap features may go into the compatset. + * optional monmap features may not - why? + * because optional monmap features may be set/unset by the admin, + * and possibly by other means that haven't yet been thought out, + * so we can't make the monitor enforce them on start - because they + * may go away. + * this, of course, does not invalidate setting a compatset feature + * for an optional feature - as long as you make sure to clean it up + * once you unset it. + */ + if (monmap_features.contains_all(ceph::features::mon::FEATURE_KRAKEN)) { + assert(ceph::features::mon::get_persistent().contains_all( + ceph::features::mon::FEATURE_KRAKEN)); + // this feature should only ever be set if the quorum supports it. + assert(HAVE_FEATURE(quorum_con_features, SERVER_KRAKEN)); + new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_KRAKEN); + } + if (monmap_features.contains_all(ceph::features::mon::FEATURE_LUMINOUS)) { + assert(ceph::features::mon::get_persistent().contains_all( + ceph::features::mon::FEATURE_LUMINOUS)); + // this feature should only ever be set if the quorum supports it. + assert(HAVE_FEATURE(quorum_con_features, SERVER_LUMINOUS)); + new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_LUMINOUS); + } + + dout(5) << __func__ << dendl; + _apply_compatset_features(new_features); +} + +void Monitor::calc_quorum_requirements() +{ + required_features = 0; + + // compatset + if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES)) { + required_features |= CEPH_FEATURE_OSD_ERASURE_CODES; + } + if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC)) { + required_features |= CEPH_FEATURE_OSDMAP_ENC; + } + if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2)) { + required_features |= CEPH_FEATURE_ERASURE_CODE_PLUGINS_V2; + } + if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3)) { + required_features |= CEPH_FEATURE_ERASURE_CODE_PLUGINS_V3; + } + if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_KRAKEN)) { + required_features |= CEPH_FEATUREMASK_SERVER_KRAKEN; + } + if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_LUMINOUS)) { + required_features |= CEPH_FEATUREMASK_SERVER_LUMINOUS; + } + + // monmap + if (monmap->get_required_features().contains_all( + ceph::features::mon::FEATURE_KRAKEN)) { + required_features |= CEPH_FEATUREMASK_SERVER_KRAKEN; + } + if (monmap->get_required_features().contains_all( + ceph::features::mon::FEATURE_LUMINOUS)) { + required_features |= CEPH_FEATUREMASK_SERVER_LUMINOUS; + } + dout(10) << __func__ << " required_features " << required_features << dendl; +} + +void Monitor::get_combined_feature_map(FeatureMap *fm) +{ + *fm += session_map.feature_map; + for (auto id : quorum) { + if (id != rank) { + *fm += quorum_feature_map[id]; + } + } +} + +void Monitor::sync_force(Formatter *f, ostream& ss) +{ + bool free_formatter = false; + + if (!f) { + // louzy/lazy hack: default to json if no formatter has been defined + f = new JSONFormatter(); + free_formatter = true; + } + + auto tx(std::make_shared()); + sync_stash_critical_state(tx); + tx->put("mon_sync", "force_sync", 1); + store->apply_transaction(tx); + + f->open_object_section("sync_force"); + f->dump_int("ret", 0); + f->dump_stream("msg") << "forcing store sync the next time the monitor starts"; + f->close_section(); // sync_force + f->flush(ss); + if (free_formatter) + delete f; +} + +void Monitor::_quorum_status(Formatter *f, ostream& ss) +{ + bool free_formatter = false; + + if (!f) { + // louzy/lazy hack: default to json if no formatter has been defined + f = new JSONFormatter(); + free_formatter = true; + } + f->open_object_section("quorum_status"); + f->dump_int("election_epoch", get_epoch()); + + f->open_array_section("quorum"); + for (set::iterator p = quorum.begin(); p != quorum.end(); ++p) + f->dump_int("mon", *p); + f->close_section(); // quorum + + list quorum_names = get_quorum_names(); + f->open_array_section("quorum_names"); + for (list::iterator p = quorum_names.begin(); p != quorum_names.end(); ++p) + f->dump_string("mon", *p); + f->close_section(); // quorum_names + + f->dump_string("quorum_leader_name", quorum.empty() ? string() : monmap->get_name(*quorum.begin())); + + f->open_object_section("monmap"); + monmap->dump(f); + f->close_section(); // monmap + + f->close_section(); // quorum_status + f->flush(ss); + if (free_formatter) + delete f; +} + +void Monitor::get_mon_status(Formatter *f, ostream& ss) +{ + bool free_formatter = false; + + if (!f) { + // louzy/lazy hack: default to json if no formatter has been defined + f = new JSONFormatter(); + free_formatter = true; + } + + f->open_object_section("mon_status"); + f->dump_string("name", name); + f->dump_int("rank", rank); + f->dump_string("state", get_state_name()); + f->dump_int("election_epoch", get_epoch()); + + f->open_array_section("quorum"); + for (set::iterator p = quorum.begin(); p != quorum.end(); ++p) { + f->dump_int("mon", *p); + } + + f->close_section(); // quorum + + f->open_object_section("features"); + f->dump_stream("required_con") << required_features; + mon_feature_t req_mon_features = get_required_mon_features(); + req_mon_features.dump(f, "required_mon"); + f->dump_stream("quorum_con") << quorum_con_features; + quorum_mon_features.dump(f, "quorum_mon"); + f->close_section(); // features + + f->open_array_section("outside_quorum"); + for (set::iterator p = outside_quorum.begin(); p != outside_quorum.end(); ++p) + f->dump_string("mon", *p); + f->close_section(); // outside_quorum + + f->open_array_section("extra_probe_peers"); + for (set::iterator p = extra_probe_peers.begin(); + p != extra_probe_peers.end(); + ++p) + f->dump_stream("peer") << *p; + f->close_section(); // extra_probe_peers + + f->open_array_section("sync_provider"); + for (map::const_iterator p = sync_providers.begin(); + p != sync_providers.end(); + ++p) { + f->dump_unsigned("cookie", p->second.cookie); + f->dump_stream("entity") << p->second.entity; + f->dump_stream("timeout") << p->second.timeout; + f->dump_unsigned("last_committed", p->second.last_committed); + f->dump_stream("last_key") << p->second.last_key; + } + f->close_section(); + + if (is_synchronizing()) { + f->open_object_section("sync"); + f->dump_stream("sync_provider") << sync_provider; + f->dump_unsigned("sync_cookie", sync_cookie); + f->dump_unsigned("sync_start_version", sync_start_version); + f->close_section(); + } + + if (g_conf->mon_sync_provider_kill_at > 0) + f->dump_int("provider_kill_at", g_conf->mon_sync_provider_kill_at); + if (g_conf->mon_sync_requester_kill_at > 0) + f->dump_int("requester_kill_at", g_conf->mon_sync_requester_kill_at); + + f->open_object_section("monmap"); + monmap->dump(f); + f->close_section(); + + f->dump_object("feature_map", session_map.feature_map); + f->close_section(); // mon_status + + if (free_formatter) { + // flush formatter to ss and delete it iff we created the formatter + f->flush(ss); + delete f; + } +} + + +// health status to clog + +void Monitor::health_tick_start() +{ + if (!cct->_conf->mon_health_to_clog || + cct->_conf->mon_health_to_clog_tick_interval <= 0) + return; + + dout(15) << __func__ << dendl; + + health_tick_stop(); + health_tick_event = timer.add_event_after( + cct->_conf->mon_health_to_clog_tick_interval, + new C_MonContext(this, [this](int r) { + if (r < 0) + return; + do_health_to_clog(); + health_tick_start(); + })); +} + +void Monitor::health_tick_stop() +{ + dout(15) << __func__ << dendl; + + if (health_tick_event) { + timer.cancel_event(health_tick_event); + health_tick_event = NULL; + } +} + +utime_t Monitor::health_interval_calc_next_update() +{ + utime_t now = ceph_clock_now(); + + time_t secs = now.sec(); + int remainder = secs % cct->_conf->mon_health_to_clog_interval; + int adjustment = cct->_conf->mon_health_to_clog_interval - remainder; + utime_t next = utime_t(secs + adjustment, 0); + + dout(20) << __func__ + << " now: " << now << "," + << " next: " << next << "," + << " interval: " << cct->_conf->mon_health_to_clog_interval + << dendl; + + return next; +} + +void Monitor::health_interval_start() +{ + dout(15) << __func__ << dendl; + + if (!cct->_conf->mon_health_to_clog || + cct->_conf->mon_health_to_clog_interval <= 0) { + return; + } + + health_interval_stop(); + utime_t next = health_interval_calc_next_update(); + health_interval_event = new C_MonContext(this, [this](int r) { + if (r < 0) + return; + do_health_to_clog_interval(); + }); + if (!timer.add_event_at(next, health_interval_event)) { + health_interval_event = nullptr; + } +} + +void Monitor::health_interval_stop() +{ + dout(15) << __func__ << dendl; + if (health_interval_event) { + timer.cancel_event(health_interval_event); + } + health_interval_event = NULL; +} + +void Monitor::health_events_cleanup() +{ + health_tick_stop(); + health_interval_stop(); + health_status_cache.reset(); +} + +void Monitor::health_to_clog_update_conf(const std::set &changed) +{ + dout(20) << __func__ << dendl; + + if (changed.count("mon_health_to_clog")) { + if (!cct->_conf->mon_health_to_clog) { + health_events_cleanup(); + } else { + if (!health_tick_event) { + health_tick_start(); + } + if (!health_interval_event) { + health_interval_start(); + } + } + } + + if (changed.count("mon_health_to_clog_interval")) { + if (cct->_conf->mon_health_to_clog_interval <= 0) { + health_interval_stop(); + } else { + health_interval_start(); + } + } + + if (changed.count("mon_health_to_clog_tick_interval")) { + if (cct->_conf->mon_health_to_clog_tick_interval <= 0) { + health_tick_stop(); + } else { + health_tick_start(); + } + } +} + +void Monitor::do_health_to_clog_interval() +{ + // outputting to clog may have been disabled in the conf + // since we were scheduled. + if (!cct->_conf->mon_health_to_clog || + cct->_conf->mon_health_to_clog_interval <= 0) + return; + + dout(10) << __func__ << dendl; + + // do we have a cached value for next_clog_update? if not, + // do we know when the last update was? + + do_health_to_clog(true); + health_interval_start(); +} + +void Monitor::do_health_to_clog(bool force) +{ + // outputting to clog may have been disabled in the conf + // since we were scheduled. + if (!cct->_conf->mon_health_to_clog || + cct->_conf->mon_health_to_clog_interval <= 0) + return; + + dout(10) << __func__ << (force ? " (force)" : "") << dendl; + + if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) { + string summary; + health_status_t level = get_health_status(false, nullptr, &summary); + if (!force && + summary == health_status_cache.summary && + level == health_status_cache.overall) + return; + clog->health(level) << "overall " << summary; + health_status_cache.summary = summary; + health_status_cache.overall = level; + } else { + // for jewel only + list status; + health_status_t overall = get_health(status, NULL, NULL); + dout(25) << __func__ + << (force ? " (force)" : "") + << dendl; + + string summary = joinify(status.begin(), status.end(), string("; ")); + + if (!force && + overall == health_status_cache.overall && + !health_status_cache.summary.empty() && + health_status_cache.summary == summary) { + // we got a dup! + return; + } + + clog->info() << summary; + + health_status_cache.overall = overall; + health_status_cache.summary = summary; + } +} + +health_status_t Monitor::get_health_status( + bool want_detail, + Formatter *f, + std::string *plain, + const char *sep1, + const char *sep2) +{ + health_status_t r = HEALTH_OK; + bool compat = g_conf->mon_health_preluminous_compat; + bool compat_warn = g_conf->get_val("mon_health_preluminous_compat_warning"); + if (f) { + f->open_object_section("health"); + f->open_object_section("checks"); + } + + string summary; + string *psummary = f ? nullptr : &summary; + for (auto& svc : paxos_service) { + r = std::min(r, svc->get_health_checks().dump_summary( + f, psummary, sep2, want_detail)); + } + + if (f) { + f->close_section(); + f->dump_stream("status") << r; + } else { + // one-liner: HEALTH_FOO[ thing1[; thing2 ...]] + *plain = stringify(r); + if (summary.size()) { + *plain += sep1; + *plain += summary; + } + *plain += "\n"; + } + + const std::string old_fields_message = "'ceph health' JSON format has " + "changed in luminous. If you see this your monitoring system is " + "scraping the wrong fields. Disable this with 'mon health preluminous " + "compat warning = false'"; + + if (f && (compat || compat_warn)) { + health_status_t cr = compat_warn ? min(HEALTH_WARN, r) : r; + f->open_array_section("summary"); + if (compat_warn) { + f->open_object_section("item"); + f->dump_stream("severity") << HEALTH_WARN; + f->dump_string("summary", old_fields_message); + f->close_section(); + } + if (compat) { + for (auto& svc : paxos_service) { + svc->get_health_checks().dump_summary_compat(f); + } + } + f->close_section(); + f->dump_stream("overall_status") << cr; + } + + if (want_detail) { + if (f && (compat || compat_warn)) { + f->open_array_section("detail"); + if (compat_warn) { + f->dump_string("item", old_fields_message); + } + } + + for (auto& svc : paxos_service) { + svc->get_health_checks().dump_detail(f, plain, compat); + } + + if (f && (compat || compat_warn)) { + f->close_section(); + } + } + if (f) { + f->close_section(); + } + return r; +} + +void Monitor::log_health( + const health_check_map_t& updated, + const health_check_map_t& previous, + MonitorDBStore::TransactionRef t) +{ + if (!g_conf->mon_health_to_clog) { + return; + } + + const utime_t now = ceph_clock_now(); + + // FIXME: log atomically as part of @t instead of using clog. + dout(10) << __func__ << " updated " << updated.checks.size() + << " previous " << previous.checks.size() + << dendl; + const auto min_log_period = g_conf->get_val( + "mon_health_log_update_period"); + for (auto& p : updated.checks) { + auto q = previous.checks.find(p.first); + bool logged = false; + if (q == previous.checks.end()) { + // new + ostringstream ss; + ss << "Health check failed: " << p.second.summary << " (" + << p.first << ")"; + clog->health(p.second.severity) << ss.str(); + + logged = true; + } else { + if (p.second.summary != q->second.summary || + p.second.severity != q->second.severity) { + + auto status_iter = health_check_log_times.find(p.first); + if (status_iter != health_check_log_times.end()) { + if (p.second.severity == q->second.severity && + now - status_iter->second.updated_at < min_log_period) { + // We already logged this recently and the severity is unchanged, + // so skip emitting an update of the summary string. + // We'll get an update out of tick() later if the check + // is still failing. + continue; + } + } + + // summary or severity changed (ignore detail changes at this level) + ostringstream ss; + ss << "Health check update: " << p.second.summary << " (" << p.first << ")"; + clog->health(p.second.severity) << ss.str(); + + logged = true; + } + } + // Record the time at which we last logged, so that we can check this + // when considering whether/when to print update messages. + if (logged) { + auto iter = health_check_log_times.find(p.first); + if (iter == health_check_log_times.end()) { + health_check_log_times.emplace(p.first, HealthCheckLogStatus( + p.second.severity, p.second.summary, now)); + } else { + iter->second = HealthCheckLogStatus( + p.second.severity, p.second.summary, now); + } + } + } + for (auto& p : previous.checks) { + if (!updated.checks.count(p.first)) { + // cleared + ostringstream ss; + if (p.first == "DEGRADED_OBJECTS") { + clog->info() << "All degraded objects recovered"; + } else if (p.first == "OSD_FLAGS") { + clog->info() << "OSD flags cleared"; + } else { + clog->info() << "Health check cleared: " << p.first << " (was: " + << p.second.summary << ")"; + } + + if (health_check_log_times.count(p.first)) { + health_check_log_times.erase(p.first); + } + } + } + + if (previous.checks.size() && updated.checks.size() == 0) { + // We might be going into a fully healthy state, check + // other subsystems + bool any_checks = false; + for (auto& svc : paxos_service) { + if (&(svc->get_health_checks()) == &(previous)) { + // Ignore the ones we're clearing right now + continue; + } + + if (svc->get_health_checks().checks.size() > 0) { + any_checks = true; + break; + } + } + if (!any_checks) { + clog->info() << "Cluster is now healthy"; + } + } +} + +health_status_t Monitor::get_health(list& status, + bufferlist *detailbl, + Formatter *f) +{ + list > summary; + list > detail; + + if (f) + f->open_object_section("health"); + + for (vector::iterator p = paxos_service.begin(); + p != paxos_service.end(); + ++p) { + PaxosService *s = *p; + s->get_health(summary, detailbl ? &detail : NULL, cct); + } + + health_monitor->get_health(summary, (detailbl ? &detail : NULL)); + + health_status_t overall = HEALTH_OK; + if (!timecheck_skews.empty()) { + list warns; + for (map::iterator i = timecheck_skews.begin(); + i != timecheck_skews.end(); ++i) { + entity_inst_t inst = i->first; + double skew = i->second; + double latency = timecheck_latencies[inst]; + string name = monmap->get_name(inst.addr); + ostringstream tcss; + health_status_t tcstatus = timecheck_status(tcss, skew, latency); + if (tcstatus != HEALTH_OK) { + if (overall > tcstatus) + overall = tcstatus; + warns.push_back(name); + ostringstream tmp_ss; + tmp_ss << "mon." << name + << " addr " << inst.addr << " " << tcss.str() + << " (latency " << latency << "s)"; + detail.push_back(make_pair(tcstatus, tmp_ss.str())); + } + } + if (!warns.empty()) { + ostringstream ss; + ss << "clock skew detected on"; + while (!warns.empty()) { + ss << " mon." << warns.front(); + warns.pop_front(); + if (!warns.empty()) + ss << ","; + } + status.push_back(ss.str()); + summary.push_back(make_pair(HEALTH_WARN, "Monitor clock skew detected ")); + } + } + + if (f) + f->open_array_section("summary"); + if (!summary.empty()) { + while (!summary.empty()) { + if (overall > summary.front().first) + overall = summary.front().first; + status.push_back(summary.front().second); + if (f) { + f->open_object_section("item"); + f->dump_stream("severity") << summary.front().first; + f->dump_string("summary", summary.front().second); + f->close_section(); + } + summary.pop_front(); + } + } + if (f) + f->close_section(); + + stringstream fss; + fss << overall; + status.push_front(fss.str()); + if (f) + f->dump_stream("overall_status") << overall; + + if (f) + f->open_array_section("detail"); + while (!detail.empty()) { + if (f) + f->dump_string("item", detail.front().second); + else if (detailbl != NULL) { + detailbl->append(detail.front().second); + detailbl->append('\n'); + } + detail.pop_front(); + } + if (f) + f->close_section(); + + if (f) + f->close_section(); + + return overall; +} + +void Monitor::get_cluster_status(stringstream &ss, Formatter *f) +{ + if (f) + f->open_object_section("status"); + + if (f) { + f->dump_stream("fsid") << monmap->get_fsid(); + if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) { + get_health_status(false, f, nullptr); + } else { + list health_str; + get_health(health_str, nullptr, f); + } + f->dump_unsigned("election_epoch", get_epoch()); + { + f->open_array_section("quorum"); + for (set::iterator p = quorum.begin(); p != quorum.end(); ++p) + f->dump_int("rank", *p); + f->close_section(); + f->open_array_section("quorum_names"); + for (set::iterator p = quorum.begin(); p != quorum.end(); ++p) + f->dump_string("id", monmap->get_name(*p)); + f->close_section(); + } + f->open_object_section("monmap"); + monmap->dump(f); + f->close_section(); + f->open_object_section("osdmap"); + osdmon()->osdmap.print_summary(f, cout, string(12, ' ')); + f->close_section(); + f->open_object_section("pgmap"); + pgservice->print_summary(f, NULL); + f->close_section(); + f->open_object_section("fsmap"); + mdsmon()->get_fsmap().print_summary(f, NULL); + f->close_section(); + f->open_object_section("mgrmap"); + mgrmon()->get_map().print_summary(f, nullptr); + f->close_section(); + + f->dump_object("servicemap", mgrstatmon()->get_service_map()); + f->close_section(); + } else { + ss << " cluster:\n"; + ss << " id: " << monmap->get_fsid() << "\n"; + + string health; + if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) { + get_health_status(false, nullptr, &health, + "\n ", "\n "); + } else { + list ls; + get_health(ls, NULL, f); + health = joinify(ls.begin(), ls.end(), + string("\n ")); + } + ss << " health: " << health << "\n"; + + ss << "\n \n services:\n"; + { + size_t maxlen = 3; + auto& service_map = mgrstatmon()->get_service_map(); + for (auto& p : service_map.services) { + maxlen = std::max(maxlen, p.first.size()); + } + string spacing(maxlen - 3, ' '); + const auto quorum_names = get_quorum_names(); + const auto mon_count = monmap->mon_info.size(); + ss << " mon: " << spacing << mon_count << " daemons, quorum " + << quorum_names; + if (quorum_names.size() != mon_count) { + std::list out_of_q; + for (size_t i = 0; i < monmap->ranks.size(); ++i) { + if (quorum.count(i) == 0) { + out_of_q.push_back(monmap->ranks[i]); + } + } + ss << ", out of quorum: " << joinify(out_of_q.begin(), + out_of_q.end(), std::string(", ")); + } + ss << "\n"; + if (mgrmon()->in_use()) { + ss << " mgr: " << spacing; + mgrmon()->get_map().print_summary(nullptr, &ss); + ss << "\n"; + } + if (mdsmon()->get_fsmap().filesystem_count() > 0) { + ss << " mds: " << spacing << mdsmon()->get_fsmap() << "\n"; + } + ss << " osd: " << spacing; + osdmon()->osdmap.print_summary(NULL, ss, string(maxlen + 6, ' ')); + ss << "\n"; + for (auto& p : service_map.services) { + ss << " " << p.first << ": " << string(maxlen - p.first.size(), ' ') + << p.second.get_summary() << "\n"; + } + } + + ss << "\n \n data:\n"; + pgservice->print_summary(NULL, &ss); + ss << "\n "; + } +} + +void Monitor::_generate_command_map(map& cmdmap, + map ¶m_str_map) +{ + for (map::const_iterator p = cmdmap.begin(); + p != cmdmap.end(); ++p) { + if (p->first == "prefix") + continue; + if (p->first == "caps") { + vector cv; + if (cmd_getval(g_ceph_context, cmdmap, "caps", cv) && + cv.size() % 2 == 0) { + for (unsigned i = 0; i < cv.size(); i += 2) { + string k = string("caps_") + cv[i]; + param_str_map[k] = cv[i + 1]; + } + continue; + } + } + param_str_map[p->first] = cmd_vartype_stringify(p->second); + } +} + +const MonCommand *Monitor::_get_moncommand( + const string &cmd_prefix, + const vector& cmds) +{ + for (auto& c : cmds) { + if (c.cmdstring.compare(0, cmd_prefix.size(), cmd_prefix) == 0) { + return &c; + } + } + return nullptr; +} + +bool Monitor::_allowed_command(MonSession *s, string &module, string &prefix, + const map& cmdmap, + const map& param_str_map, + const MonCommand *this_cmd) { + + bool cmd_r = this_cmd->requires_perm('r'); + bool cmd_w = this_cmd->requires_perm('w'); + bool cmd_x = this_cmd->requires_perm('x'); + + bool capable = s->caps.is_capable( + g_ceph_context, + CEPH_ENTITY_TYPE_MON, + s->entity_name, + module, prefix, param_str_map, + cmd_r, cmd_w, cmd_x); + + dout(10) << __func__ << " " << (capable ? "" : "not ") << "capable" << dendl; + return capable; +} + +void Monitor::format_command_descriptions(const std::vector &commands, + Formatter *f, + bufferlist *rdata, + bool hide_mgr_flag) +{ + int cmdnum = 0; + f->open_object_section("command_descriptions"); + for (const auto &cmd : commands) { + unsigned flags = cmd.flags; + if (hide_mgr_flag) { + flags &= ~MonCommand::FLAG_MGR; + } + ostringstream secname; + secname << "cmd" << setfill('0') << std::setw(3) << cmdnum; + dump_cmddesc_to_json(f, secname.str(), + cmd.cmdstring, cmd.helpstring, cmd.module, + cmd.req_perms, cmd.availability, flags); + cmdnum++; + } + f->close_section(); // command_descriptions + + f->flush(*rdata); +} + +bool Monitor::is_keyring_required() +{ + string auth_cluster_required = g_conf->auth_supported.empty() ? + g_conf->auth_cluster_required : g_conf->auth_supported; + string auth_service_required = g_conf->auth_supported.empty() ? + g_conf->auth_service_required : g_conf->auth_supported; + + return auth_service_required == "cephx" || + auth_cluster_required == "cephx"; +} + +struct C_MgrProxyCommand : public Context { + Monitor *mon; + MonOpRequestRef op; + uint64_t size; + bufferlist outbl; + string outs; + C_MgrProxyCommand(Monitor *mon, MonOpRequestRef op, uint64_t s) + : mon(mon), op(op), size(s) { } + void finish(int r) { + Mutex::Locker l(mon->lock); + mon->mgr_proxy_bytes -= size; + mon->reply_command(op, r, outs, outbl, 0); + } +}; + +void Monitor::handle_command(MonOpRequestRef op) +{ + assert(op->is_type_command()); + MMonCommand *m = static_cast(op->get_req()); + if (m->fsid != monmap->fsid) { + dout(0) << "handle_command on fsid " << m->fsid << " != " << monmap->fsid << dendl; + reply_command(op, -EPERM, "wrong fsid", 0); + return; + } + + MonSession *session = static_cast( + m->get_connection()->get_priv()); + if (!session) { + dout(5) << __func__ << " dropping stray message " << *m << dendl; + return; + } + BOOST_SCOPE_EXIT_ALL(=) { + session->put(); + }; + + if (m->cmd.empty()) { + string rs = "No command supplied"; + reply_command(op, -EINVAL, rs, 0); + return; + } + + string prefix; + vector fullcmd; + map cmdmap; + stringstream ss, ds; + bufferlist rdata; + string rs; + int r = -EINVAL; + rs = "unrecognized command"; + + if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) { + // ss has reason for failure + r = -EINVAL; + rs = ss.str(); + if (!m->get_source().is_mon()) // don't reply to mon->mon commands + reply_command(op, r, rs, 0); + return; + } + + // check return value. If no prefix parameter provided, + // return value will be false, then return error info. + if (!cmd_getval(g_ceph_context, cmdmap, "prefix", prefix)) { + reply_command(op, -EINVAL, "command prefix not found", 0); + return; + } + + // check prefix is empty + if (prefix.empty()) { + reply_command(op, -EINVAL, "command prefix must not be empty", 0); + return; + } + + if (prefix == "get_command_descriptions") { + bufferlist rdata; + Formatter *f = Formatter::create("json"); + // hide mgr commands until luminous upgrade is complete + bool hide_mgr_flag = + osdmon()->osdmap.require_osd_release < CEPH_RELEASE_LUMINOUS; + + std::vector commands; + + // only include mgr commands once all mons are upgrade (and we've dropped + // the hard-coded PGMonitor commands) + if (quorum_mon_features.contains_all(ceph::features::mon::FEATURE_LUMINOUS)) { + commands = static_cast( + paxos_service[PAXOS_MGR])->get_command_descs(); + } + + for (auto& c : leader_mon_commands) { + commands.push_back(c); + } + + format_command_descriptions(commands, f, &rdata, hide_mgr_flag); + delete f; + reply_command(op, 0, "", rdata, 0); + return; + } + + string module; + string err; + + dout(0) << "handle_command " << *m << dendl; + + string format; + cmd_getval(g_ceph_context, cmdmap, "format", format, string("plain")); + boost::scoped_ptr f(Formatter::create(format)); + + get_str_vec(prefix, fullcmd); + + // make sure fullcmd is not empty. + // invalid prefix will cause empty vector fullcmd. + // such as, prefix=";,,;" + if (fullcmd.empty()) { + reply_command(op, -EINVAL, "command requires a prefix to be valid", 0); + return; + } + + module = fullcmd[0]; + + // validate command is in leader map + + const MonCommand *leader_cmd; + const auto& mgr_cmds = mgrmon()->get_command_descs(); + const MonCommand *mgr_cmd = nullptr; + if (!mgr_cmds.empty()) { + mgr_cmd = _get_moncommand(prefix, mgr_cmds); + } + leader_cmd = _get_moncommand(prefix, leader_mon_commands); + if (!leader_cmd) { + leader_cmd = mgr_cmd; + if (!leader_cmd) { + reply_command(op, -EINVAL, "command not known", 0); + return; + } + } + // validate command is in our map & matches, or forward if it is allowed + const MonCommand *mon_cmd = _get_moncommand( + prefix, + get_local_commands(quorum_mon_features)); + if (!mon_cmd) { + mon_cmd = mgr_cmd; + } + if (!is_leader()) { + if (!mon_cmd) { + if (leader_cmd->is_noforward()) { + reply_command(op, -EINVAL, + "command not locally supported and not allowed to forward", + 0); + return; + } + dout(10) << "Command not locally supported, forwarding request " + << m << dendl; + forward_request_leader(op); + return; + } else if (!mon_cmd->is_compat(leader_cmd)) { + if (mon_cmd->is_noforward()) { + reply_command(op, -EINVAL, + "command not compatible with leader and not allowed to forward", + 0); + return; + } + dout(10) << "Command not compatible with leader, forwarding request " + << m << dendl; + forward_request_leader(op); + return; + } + } + + if (mon_cmd->is_obsolete() || + (cct->_conf->mon_debug_deprecated_as_obsolete + && mon_cmd->is_deprecated())) { + reply_command(op, -ENOTSUP, + "command is obsolete; please check usage and/or man page", + 0); + return; + } + + if (session->proxy_con && mon_cmd->is_noforward()) { + dout(10) << "Got forward for noforward command " << m << dendl; + reply_command(op, -EINVAL, "forward for noforward command", rdata, 0); + return; + } + + /* what we perceive as being the service the command falls under */ + string service(mon_cmd->module); + + dout(25) << __func__ << " prefix='" << prefix + << "' module='" << module + << "' service='" << service << "'" << dendl; + + bool cmd_is_rw = + (mon_cmd->requires_perm('w') || mon_cmd->requires_perm('x')); + + // validate user's permissions for requested command + map param_str_map; + _generate_command_map(cmdmap, param_str_map); + if (!_allowed_command(session, service, prefix, cmdmap, + param_str_map, mon_cmd)) { + dout(1) << __func__ << " access denied" << dendl; + (cmd_is_rw ? audit_clog->info() : audit_clog->debug()) + << "from='" << session->inst << "' " + << "entity='" << session->entity_name << "' " + << "cmd=" << m->cmd << ": access denied"; + reply_command(op, -EACCES, "access denied", 0); + return; + } + + (cmd_is_rw ? audit_clog->info() : audit_clog->debug()) + << "from='" << session->inst << "' " + << "entity='" << session->entity_name << "' " + << "cmd=" << m->cmd << ": dispatch"; + + if (mon_cmd->is_mgr() && + osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) { + const auto& hdr = m->get_header(); + uint64_t size = hdr.front_len + hdr.middle_len + hdr.data_len; + uint64_t max = g_conf->get_val("mon_client_bytes") + * g_conf->get_val("mon_mgr_proxy_client_bytes_ratio"); + if (mgr_proxy_bytes + size > max) { + dout(10) << __func__ << " current mgr proxy bytes " << mgr_proxy_bytes + << " + " << size << " > max " << max << dendl; + reply_command(op, -EAGAIN, "hit limit on proxied mgr commands", rdata, 0); + return; + } + mgr_proxy_bytes += size; + dout(10) << __func__ << " proxying mgr command (+" << size + << " -> " << mgr_proxy_bytes << ")" << dendl; + C_MgrProxyCommand *fin = new C_MgrProxyCommand(this, op, size); + mgr_client.start_command(m->cmd, + m->get_data(), + &fin->outbl, + &fin->outs, + new C_OnFinisher(fin, &finisher)); + return; + } + + if ((module == "mds" || module == "fs") && + prefix != "fs authorize") { + mdsmon()->dispatch(op); + return; + } + if ((module == "osd" || prefix == "pg map") && + prefix != "osd last-stat-seq") { + osdmon()->dispatch(op); + return; + } + + if (module == "pg") { + pgmon()->dispatch(op); + return; + } + if (module == "mon" && + /* Let the Monitor class handle the following commands: + * 'mon compact' + * 'mon scrub' + * 'mon sync force' + */ + prefix != "mon compact" && + prefix != "mon scrub" && + prefix != "mon sync force" && + prefix != "mon metadata" && + prefix != "mon versions" && + prefix != "mon count-metadata") { + monmon()->dispatch(op); + return; + } + if (module == "auth" || prefix == "fs authorize") { + authmon()->dispatch(op); + return; + } + if (module == "log") { + logmon()->dispatch(op); + return; + } + + if (module == "config-key") { + config_key_service->dispatch(op); + return; + } + + if (module == "mgr") { + mgrmon()->dispatch(op); + return; + } + + if (prefix == "fsid") { + if (f) { + f->open_object_section("fsid"); + f->dump_stream("fsid") << monmap->fsid; + f->close_section(); + f->flush(rdata); + } else { + ds << monmap->fsid; + rdata.append(ds); + } + reply_command(op, 0, "", rdata, 0); + return; + } + + if (prefix == "scrub" || prefix == "mon scrub") { + wait_for_paxos_write(); + if (is_leader()) { + int r = scrub_start(); + reply_command(op, r, "", rdata, 0); + } else if (is_peon()) { + forward_request_leader(op); + } else { + reply_command(op, -EAGAIN, "no quorum", rdata, 0); + } + return; + } + + if (prefix == "compact" || prefix == "mon compact") { + dout(1) << "triggering manual compaction" << dendl; + utime_t start = ceph_clock_now(); + store->compact(); + utime_t end = ceph_clock_now(); + end -= start; + dout(1) << "finished manual compaction in " << end << " seconds" << dendl; + ostringstream oss; + oss << "compacted " << g_conf->get_val("mon_keyvaluedb") << " in " << end << " seconds"; + rs = oss.str(); + r = 0; + } + else if (prefix == "injectargs") { + vector injected_args; + cmd_getval(g_ceph_context, cmdmap, "injected_args", injected_args); + if (!injected_args.empty()) { + dout(0) << "parsing injected options '" << injected_args << "'" << dendl; + ostringstream oss; + r = g_conf->injectargs(str_join(injected_args, " "), &oss); + ss << "injectargs:" << oss.str(); + rs = ss.str(); + goto out; + } else { + rs = "must supply options to be parsed in a single string"; + r = -EINVAL; + } + } else if (prefix == "time-sync-status") { + if (!f) + f.reset(Formatter::create("json-pretty")); + f->open_object_section("time_sync"); + if (!timecheck_skews.empty()) { + f->open_object_section("time_skew_status"); + for (auto& i : timecheck_skews) { + entity_inst_t inst = i.first; + double skew = i.second; + double latency = timecheck_latencies[inst]; + string name = monmap->get_name(inst.addr); + ostringstream tcss; + health_status_t tcstatus = timecheck_status(tcss, skew, latency); + f->open_object_section(name.c_str()); + f->dump_float("skew", skew); + f->dump_float("latency", latency); + f->dump_stream("health") << tcstatus; + if (tcstatus != HEALTH_OK) { + f->dump_stream("details") << tcss.str(); + } + f->close_section(); + } + f->close_section(); + } + f->open_object_section("timechecks"); + f->dump_unsigned("epoch", get_epoch()); + f->dump_int("round", timecheck_round); + f->dump_stream("round_status") << ((timecheck_round%2) ? + "on-going" : "finished"); + f->close_section(); + f->close_section(); + f->flush(rdata); + r = 0; + rs = ""; + } else if (prefix == "config set") { + std::string key; + cmd_getval(cct, cmdmap, "key", key); + std::string val; + cmd_getval(cct, cmdmap, "value", val); + r = g_conf->set_val(key, val, true, &ss); + if (r == 0) { + g_conf->apply_changes(nullptr); + } + rs = ss.str(); + goto out; + } else if (prefix == "status" || + prefix == "health" || + prefix == "df") { + string detail; + cmd_getval(g_ceph_context, cmdmap, "detail", detail); + + if (prefix == "status") { + // get_cluster_status handles f == NULL + get_cluster_status(ds, f.get()); + + if (f) { + f->flush(ds); + ds << '\n'; + } + rdata.append(ds); + } else if (prefix == "health") { + if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) { + string plain; + get_health_status(detail == "detail", f.get(), f ? nullptr : &plain); + if (f) { + f->flush(rdata); + } else { + rdata.append(plain); + } + } else { + list health_str; + get_health(health_str, detail == "detail" ? &rdata : NULL, f.get()); + if (f) { + f->flush(ds); + ds << '\n'; + } else { + assert(!health_str.empty()); + ds << health_str.front(); + health_str.pop_front(); + if (!health_str.empty()) { + ds << ' '; + ds << joinify(health_str.begin(), health_str.end(), string("; ")); + } + } + bufferlist comb; + comb.append(ds); + if (detail == "detail") + comb.append(rdata); + rdata = comb; + } + } else if (prefix == "df") { + bool verbose = (detail == "detail"); + if (f) + f->open_object_section("stats"); + + pgservice->dump_fs_stats(&ds, f.get(), verbose); + if (!f) + ds << '\n'; + pgservice->dump_pool_stats(osdmon()->osdmap, &ds, f.get(), verbose); + + if (f) { + f->close_section(); + f->flush(ds); + ds << '\n'; + } + } else { + assert(0 == "We should never get here!"); + return; + } + rdata.append(ds); + rs = ""; + r = 0; + } else if (prefix == "report") { + + // this must be formatted, in its current form + if (!f) + f.reset(Formatter::create("json-pretty")); + f->open_object_section("report"); + f->dump_stream("cluster_fingerprint") << fingerprint; + f->dump_string("version", ceph_version_to_str()); + f->dump_string("commit", git_version_to_str()); + f->dump_stream("timestamp") << ceph_clock_now(); + + vector tagsvec; + cmd_getval(g_ceph_context, cmdmap, "tags", tagsvec); + string tagstr = str_join(tagsvec, " "); + if (!tagstr.empty()) + tagstr = tagstr.substr(0, tagstr.find_last_of(' ')); + f->dump_string("tag", tagstr); + + if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) { + get_health_status(true, f.get(), nullptr); + } else { + list health_str; + get_health(health_str, nullptr, f.get()); + } + + monmon()->dump_info(f.get()); + osdmon()->dump_info(f.get()); + mdsmon()->dump_info(f.get()); + authmon()->dump_info(f.get()); + pgservice->dump_info(f.get()); + + paxos->dump_info(f.get()); + + f->close_section(); + f->flush(rdata); + + ostringstream ss2; + ss2 << "report " << rdata.crc32c(CEPH_MON_PORT); + rs = ss2.str(); + r = 0; + } else if (prefix == "osd last-stat-seq") { + int64_t osd; + cmd_getval(g_ceph_context, cmdmap, "id", osd); + uint64_t seq = mgrstatmon()->get_last_osd_stat_seq(osd); + if (f) { + f->dump_unsigned("seq", seq); + f->flush(ds); + } else { + ds << seq; + rdata.append(ds); + } + rs = ""; + r = 0; + } else if (prefix == "node ls") { + string node_type("all"); + cmd_getval(g_ceph_context, cmdmap, "type", node_type); + if (!f) + f.reset(Formatter::create("json-pretty")); + if (node_type == "all") { + f->open_object_section("nodes"); + print_nodes(f.get(), ds); + osdmon()->print_nodes(f.get()); + mdsmon()->print_nodes(f.get()); + f->close_section(); + } else if (node_type == "mon") { + print_nodes(f.get(), ds); + } else if (node_type == "osd") { + osdmon()->print_nodes(f.get()); + } else if (node_type == "mds") { + mdsmon()->print_nodes(f.get()); + } + f->flush(ds); + rdata.append(ds); + rs = ""; + r = 0; + } else if (prefix == "features") { + if (!is_leader() && !is_peon()) { + dout(10) << " waiting for quorum" << dendl; + waitfor_quorum.push_back(new C_RetryMessage(this, op)); + return; + } + if (!is_leader()) { + forward_request_leader(op); + return; + } + if (!f) + f.reset(Formatter::create("json-pretty")); + FeatureMap fm; + get_combined_feature_map(&fm); + f->dump_object("features", fm); + f->flush(rdata); + rs = ""; + r = 0; + } else if (prefix == "mon metadata") { + if (!f) + f.reset(Formatter::create("json-pretty")); + + string name; + bool all = !cmd_getval(g_ceph_context, cmdmap, "id", name); + if (!all) { + // Dump a single mon's metadata + int mon = monmap->get_rank(name); + if (mon < 0) { + rs = "requested mon not found"; + r = -ENOENT; + goto out; + } + f->open_object_section("mon_metadata"); + r = get_mon_metadata(mon, f.get(), ds); + f->close_section(); + } else { + // Dump all mons' metadata + r = 0; + f->open_array_section("mon_metadata"); + for (unsigned int rank = 0; rank < monmap->size(); ++rank) { + std::ostringstream get_err; + f->open_object_section("mon"); + f->dump_string("name", monmap->get_name(rank)); + r = get_mon_metadata(rank, f.get(), get_err); + f->close_section(); + if (r == -ENOENT || r == -EINVAL) { + dout(1) << get_err.str() << dendl; + // Drop error, list what metadata we do have + r = 0; + } else if (r != 0) { + derr << "Unexpected error from get_mon_metadata: " + << cpp_strerror(r) << dendl; + ds << get_err.str(); + break; + } + } + f->close_section(); + } + + f->flush(ds); + rdata.append(ds); + rs = ""; + } else if (prefix == "mon versions") { + if (!f) + f.reset(Formatter::create("json-pretty")); + count_metadata("ceph_version", f.get()); + f->flush(ds); + rdata.append(ds); + rs = ""; + r = 0; + } else if (prefix == "mon count-metadata") { + if (!f) + f.reset(Formatter::create("json-pretty")); + string field; + cmd_getval(g_ceph_context, cmdmap, "property", field); + count_metadata(field, f.get()); + f->flush(ds); + rdata.append(ds); + rs = ""; + r = 0; + } else if (prefix == "quorum_status") { + // make sure our map is readable and up to date + if (!is_leader() && !is_peon()) { + dout(10) << " waiting for quorum" << dendl; + waitfor_quorum.push_back(new C_RetryMessage(this, op)); + return; + } + _quorum_status(f.get(), ds); + rdata.append(ds); + rs = ""; + r = 0; + } else if (prefix == "mon_status") { + get_mon_status(f.get(), ds); + if (f) + f->flush(ds); + rdata.append(ds); + rs = ""; + r = 0; + } else if (prefix == "sync force" || + prefix == "mon sync force") { + string validate1, validate2; + cmd_getval(g_ceph_context, cmdmap, "validate1", validate1); + cmd_getval(g_ceph_context, cmdmap, "validate2", validate2); + if (validate1 != "--yes-i-really-mean-it" || + validate2 != "--i-know-what-i-am-doing") { + r = -EINVAL; + rs = "are you SURE? this will mean the monitor store will be " + "erased. pass '--yes-i-really-mean-it " + "--i-know-what-i-am-doing' if you really do."; + goto out; + } + sync_force(f.get(), ds); + rs = ds.str(); + r = 0; + } else if (prefix == "heap") { + if (!ceph_using_tcmalloc()) + rs = "tcmalloc not enabled, can't use heap profiler commands\n"; + else { + string heapcmd; + cmd_getval(g_ceph_context, cmdmap, "heapcmd", heapcmd); + // XXX 1-element vector, change at callee or make vector here? + vector heapcmd_vec; + get_str_vec(heapcmd, heapcmd_vec); + ceph_heap_profiler_handle_command(heapcmd_vec, ds); + rdata.append(ds); + rs = ""; + r = 0; + } + } else if (prefix == "quorum") { + string quorumcmd; + cmd_getval(g_ceph_context, cmdmap, "quorumcmd", quorumcmd); + if (quorumcmd == "exit") { + start_election(); + elector.stop_participating(); + rs = "stopped responding to quorum, initiated new election"; + r = 0; + } else if (quorumcmd == "enter") { + elector.start_participating(); + start_election(); + rs = "started responding to quorum, initiated new election"; + r = 0; + } else { + rs = "needs a valid 'quorum' command"; + r = -EINVAL; + } + } else if (prefix == "version") { + if (f) { + f->open_object_section("version"); + f->dump_string("version", pretty_version_to_str()); + f->close_section(); + f->flush(ds); + } else { + ds << pretty_version_to_str(); + } + rdata.append(ds); + rs = ""; + r = 0; + } else if (prefix == "versions") { + if (!f) + f.reset(Formatter::create("json-pretty")); + map overall; + f->open_object_section("version"); + map mon, mgr, osd, mds; + + count_metadata("ceph_version", &mon); + f->open_object_section("mon"); + for (auto& p : mon) { + f->dump_int(p.first.c_str(), p.second); + overall[p.first] += p.second; + } + f->close_section(); + + mgrmon()->count_metadata("ceph_version", &mgr); + f->open_object_section("mgr"); + for (auto& p : mgr) { + f->dump_int(p.first.c_str(), p.second); + overall[p.first] += p.second; + } + f->close_section(); + + osdmon()->count_metadata("ceph_version", &osd); + f->open_object_section("osd"); + for (auto& p : osd) { + f->dump_int(p.first.c_str(), p.second); + overall[p.first] += p.second; + } + f->close_section(); + + mdsmon()->count_metadata("ceph_version", &mds); + f->open_object_section("mds"); + for (auto& p : mds) { + f->dump_int(p.first.c_str(), p.second); + overall[p.first] += p.second; + } + f->close_section(); + + for (auto& p : mgrstatmon()->get_service_map().services) { + f->open_object_section(p.first.c_str()); + map m; + p.second.count_metadata("ceph_version", &m); + for (auto& q : m) { + f->dump_int(q.first.c_str(), q.second); + overall[q.first] += q.second; + } + f->close_section(); + } + + f->open_object_section("overall"); + for (auto& p : overall) { + f->dump_int(p.first.c_str(), p.second); + } + f->close_section(); + f->close_section(); + f->flush(rdata); + rs = ""; + r = 0; + } + + out: + if (!m->get_source().is_mon()) // don't reply to mon->mon commands + reply_command(op, r, rs, rdata, 0); +} + +void Monitor::reply_command(MonOpRequestRef op, int rc, const string &rs, version_t version) +{ + bufferlist rdata; + reply_command(op, rc, rs, rdata, version); +} + +void Monitor::reply_command(MonOpRequestRef op, int rc, const string &rs, + bufferlist& rdata, version_t version) +{ + MMonCommand *m = static_cast(op->get_req()); + assert(m->get_type() == MSG_MON_COMMAND); + MMonCommandAck *reply = new MMonCommandAck(m->cmd, rc, rs, version); + reply->set_tid(m->get_tid()); + reply->set_data(rdata); + send_reply(op, reply); +} + + +// ------------------------ +// request/reply routing +// +// a client/mds/osd will connect to a random monitor. we need to forward any +// messages requiring state updates to the leader, and then route any replies +// back via the correct monitor and back to them. (the monitor will not +// initiate any connections.) + +void Monitor::forward_request_leader(MonOpRequestRef op) +{ + op->mark_event(__func__); + + int mon = get_leader(); + MonSession *session = op->get_session(); + PaxosServiceMessage *req = op->get_req(); + + if (req->get_source().is_mon() && req->get_source_addr() != messenger->get_myaddr()) { + dout(10) << "forward_request won't forward (non-local) mon request " << *req << dendl; + } else if (session->proxy_con) { + dout(10) << "forward_request won't double fwd request " << *req << dendl; + } else if (!session->closed) { + RoutedRequest *rr = new RoutedRequest; + rr->tid = ++routed_request_tid; + rr->client_inst = req->get_source_inst(); + rr->con = req->get_connection(); + rr->con_features = rr->con->get_features(); + encode_message(req, CEPH_FEATURES_ALL, rr->request_bl); // for my use only; use all features + rr->session = static_cast(session->get()); + rr->op = op; + routed_requests[rr->tid] = rr; + session->routed_request_tids.insert(rr->tid); + + dout(10) << "forward_request " << rr->tid << " request " << *req + << " features " << rr->con_features << dendl; + + MForward *forward = new MForward(rr->tid, + req, + rr->con_features, + rr->session->caps); + forward->set_priority(req->get_priority()); + if (session->auth_handler) { + forward->entity_name = session->entity_name; + } else if (req->get_source().is_mon()) { + forward->entity_name.set_type(CEPH_ENTITY_TYPE_MON); + } + messenger->send_message(forward, monmap->get_inst(mon)); + op->mark_forwarded(); + assert(op->get_req()->get_type() != 0); + } else { + dout(10) << "forward_request no session for request " << *req << dendl; + } +} + +// fake connection attached to forwarded messages +struct AnonConnection : public Connection { + explicit AnonConnection(CephContext *cct) : Connection(cct, NULL) {} + + int send_message(Message *m) override { + assert(!"send_message on anonymous connection"); + } + void send_keepalive() override { + assert(!"send_keepalive on anonymous connection"); + } + void mark_down() override { + // silently ignore + } + void mark_disposable() override { + // silengtly ignore + } + bool is_connected() override { return false; } +}; + +//extract the original message and put it into the regular dispatch function +void Monitor::handle_forward(MonOpRequestRef op) +{ + MForward *m = static_cast(op->get_req()); + dout(10) << "received forwarded message from " << m->client + << " via " << m->get_source_inst() << dendl; + MonSession *session = op->get_session(); + assert(session); + + if (!session->is_capable("mon", MON_CAP_X)) { + dout(0) << "forward from entity with insufficient caps! " + << session->caps << dendl; + } else { + // see PaxosService::dispatch(); we rely on this being anon + // (c->msgr == NULL) + PaxosServiceMessage *req = m->claim_message(); + assert(req != NULL); + + ConnectionRef c(new AnonConnection(cct)); + MonSession *s = new MonSession(req->get_source_inst(), + static_cast(c.get())); + c->set_priv(s->get()); + c->set_peer_addr(m->client.addr); + c->set_peer_type(m->client.name.type()); + c->set_features(m->con_features); + + s->caps = m->client_caps; + dout(10) << " caps are " << s->caps << dendl; + s->entity_name = m->entity_name; + dout(10) << " entity name '" << s->entity_name << "' type " + << s->entity_name.get_type() << dendl; + s->proxy_con = m->get_connection(); + s->proxy_tid = m->tid; + + req->set_connection(c); + + // not super accurate, but better than nothing. + req->set_recv_stamp(m->get_recv_stamp()); + + /* + * note which election epoch this is; we will drop the message if + * there is a future election since our peers will resend routed + * requests in that case. + */ + req->rx_election_epoch = get_epoch(); + + /* Because this is a special fake connection, we need to break + the ref loop between Connection and MonSession differently + than we normally do. Here, the Message refers to the Connection + which refers to the Session, and nobody else refers to the Connection + or the Session. And due to the special nature of this message, + nobody refers to the Connection via the Session. So, clear out that + half of the ref loop.*/ + s->con.reset(NULL); + + dout(10) << " mesg " << req << " from " << m->get_source_addr() << dendl; + + _ms_dispatch(req); + s->put(); + } +} + +void Monitor::try_send_message(Message *m, const entity_inst_t& to) +{ + dout(10) << "try_send_message " << *m << " to " << to << dendl; + + bufferlist bl; + encode_message(m, quorum_con_features, bl); + + messenger->send_message(m, to); + + for (int i=0; i<(int)monmap->size(); i++) { + if (i != rank) + messenger->send_message(new MRoute(bl, to), monmap->get_inst(i)); + } +} + +void Monitor::send_reply(MonOpRequestRef op, Message *reply) +{ + op->mark_event(__func__); + + MonSession *session = op->get_session(); + assert(session); + Message *req = op->get_req(); + ConnectionRef con = op->get_connection(); + + reply->set_cct(g_ceph_context); + dout(2) << __func__ << " " << op << " " << reply << " " << *reply << dendl; + + if (!con) { + dout(2) << "send_reply no connection, dropping reply " << *reply + << " to " << req << " " << *req << dendl; + reply->put(); + op->mark_event("reply: no connection"); + return; + } + + if (!session->con && !session->proxy_con) { + dout(2) << "send_reply no connection, dropping reply " << *reply + << " to " << req << " " << *req << dendl; + reply->put(); + op->mark_event("reply: no connection"); + return; + } + + if (session->proxy_con) { + dout(15) << "send_reply routing reply to " << con->get_peer_addr() + << " via " << session->proxy_con->get_peer_addr() + << " for request " << *req << dendl; + session->proxy_con->send_message(new MRoute(session->proxy_tid, reply)); + op->mark_event("reply: send routed request"); + } else { + session->con->send_message(reply); + op->mark_event("reply: send"); + } +} + +void Monitor::no_reply(MonOpRequestRef op) +{ + MonSession *session = op->get_session(); + Message *req = op->get_req(); + + if (session->proxy_con) { + dout(10) << "no_reply to " << req->get_source_inst() + << " via " << session->proxy_con->get_peer_addr() + << " for request " << *req << dendl; + session->proxy_con->send_message(new MRoute(session->proxy_tid, NULL)); + op->mark_event("no_reply: send routed request"); + } else { + dout(10) << "no_reply to " << req->get_source_inst() + << " " << *req << dendl; + op->mark_event("no_reply"); + } +} + +void Monitor::handle_route(MonOpRequestRef op) +{ + MRoute *m = static_cast(op->get_req()); + MonSession *session = op->get_session(); + //check privileges + if (!session->is_capable("mon", MON_CAP_X)) { + dout(0) << "MRoute received from entity without appropriate perms! " + << dendl; + return; + } + if (m->msg) + dout(10) << "handle_route " << *m->msg << " to " << m->dest << dendl; + else + dout(10) << "handle_route null to " << m->dest << dendl; + + // look it up + if (m->session_mon_tid) { + if (routed_requests.count(m->session_mon_tid)) { + RoutedRequest *rr = routed_requests[m->session_mon_tid]; + + // reset payload, in case encoding is dependent on target features + if (m->msg) { + m->msg->clear_payload(); + rr->con->send_message(m->msg); + m->msg = NULL; + } + if (m->send_osdmap_first) { + dout(10) << " sending osdmaps from " << m->send_osdmap_first << dendl; + osdmon()->send_incremental(m->send_osdmap_first, rr->session, + true, MonOpRequestRef()); + } + assert(rr->tid == m->session_mon_tid && rr->session->routed_request_tids.count(m->session_mon_tid)); + routed_requests.erase(m->session_mon_tid); + rr->session->routed_request_tids.erase(m->session_mon_tid); + delete rr; + } else { + dout(10) << " don't have routed request tid " << m->session_mon_tid << dendl; + } + } else { + dout(10) << " not a routed request, trying to send anyway" << dendl; + if (m->msg) { + messenger->send_message(m->msg, m->dest); + m->msg = NULL; + } + } +} + +void Monitor::resend_routed_requests() +{ + dout(10) << "resend_routed_requests" << dendl; + int mon = get_leader(); + list retry; + for (map::iterator p = routed_requests.begin(); + p != routed_requests.end(); + ++p) { + RoutedRequest *rr = p->second; + + if (mon == rank) { + dout(10) << " requeue for self tid " << rr->tid << dendl; + rr->op->mark_event("retry routed request"); + retry.push_back(new C_RetryMessage(this, rr->op)); + if (rr->session) { + assert(rr->session->routed_request_tids.count(p->first)); + rr->session->routed_request_tids.erase(p->first); + } + delete rr; + } else { + bufferlist::iterator q = rr->request_bl.begin(); + PaxosServiceMessage *req = (PaxosServiceMessage *)decode_message(cct, 0, q); + rr->op->mark_event("resend forwarded message to leader"); + dout(10) << " resend to mon." << mon << " tid " << rr->tid << " " << *req << dendl; + MForward *forward = new MForward(rr->tid, req, rr->con_features, + rr->session->caps); + req->put(); // forward takes its own ref; drop ours. + forward->client = rr->client_inst; + forward->set_priority(req->get_priority()); + messenger->send_message(forward, monmap->get_inst(mon)); + } + } + if (mon == rank) { + routed_requests.clear(); + finish_contexts(g_ceph_context, retry); + } +} + +void Monitor::remove_session(MonSession *s) +{ + dout(10) << "remove_session " << s << " " << s->inst + << " features 0x" << std::hex << s->con_features << std::dec << dendl; + assert(s->con); + assert(!s->closed); + for (set::iterator p = s->routed_request_tids.begin(); + p != s->routed_request_tids.end(); + ++p) { + assert(routed_requests.count(*p)); + RoutedRequest *rr = routed_requests[*p]; + dout(10) << " dropping routed request " << rr->tid << dendl; + delete rr; + routed_requests.erase(*p); + } + s->routed_request_tids.clear(); + s->con->set_priv(NULL); + session_map.remove_session(s); + logger->set(l_mon_num_sessions, session_map.get_size()); + logger->inc(l_mon_session_rm); +} + +void Monitor::remove_all_sessions() +{ + Mutex::Locker l(session_map_lock); + while (!session_map.sessions.empty()) { + MonSession *s = session_map.sessions.front(); + remove_session(s); + if (logger) + logger->inc(l_mon_session_rm); + } + if (logger) + logger->set(l_mon_num_sessions, session_map.get_size()); +} + +void Monitor::send_command(const entity_inst_t& inst, + const vector& com) +{ + dout(10) << "send_command " << inst << "" << com << dendl; + MMonCommand *c = new MMonCommand(monmap->fsid); + c->cmd = com; + try_send_message(c, inst); +} + +void Monitor::waitlist_or_zap_client(MonOpRequestRef op) +{ + /** + * Wait list the new session until we're in the quorum, assuming it's + * sufficiently new. + * tick() will periodically send them back through so we can send + * the client elsewhere if we don't think we're getting back in. + * + * But we whitelist a few sorts of messages: + * 1) Monitors can talk to us at any time, of course. + * 2) auth messages. It's unlikely to go through much faster, but + * it's possible we've just lost our quorum status and we want to take... + * 3) command messages. We want to accept these under all possible + * circumstances. + */ + Message *m = op->get_req(); + MonSession *s = op->get_session(); + ConnectionRef con = op->get_connection(); + utime_t too_old = ceph_clock_now(); + too_old -= g_ceph_context->_conf->mon_lease; + if (m->get_recv_stamp() > too_old && + con->is_connected()) { + dout(5) << "waitlisting message " << *m << dendl; + maybe_wait_for_quorum.push_back(new C_RetryMessage(this, op)); + op->mark_wait_for_quorum(); + } else { + dout(5) << "discarding message " << *m << " and sending client elsewhere" << dendl; + con->mark_down(); + // proxied sessions aren't registered and don't have a con; don't remove + // those. + if (!s->proxy_con) { + Mutex::Locker l(session_map_lock); + remove_session(s); + } + op->mark_zap(); + } +} + +void Monitor::_ms_dispatch(Message *m) +{ + if (is_shutdown()) { + m->put(); + return; + } + + MonOpRequestRef op = op_tracker.create_request(m); + bool src_is_mon = op->is_src_mon(); + op->mark_event("mon:_ms_dispatch"); + MonSession *s = op->get_session(); + if (s && s->closed) { + return; + } + + if (src_is_mon && s) { + ConnectionRef con = m->get_connection(); + if (con->get_messenger() && con->get_features() != s->con_features) { + // only update features if this is a non-anonymous connection + dout(10) << __func__ << " feature change for " << m->get_source_inst() + << " (was " << s->con_features + << ", now " << con->get_features() << ")" << dendl; + // connection features changed - recreate session. + if (s->con && s->con != con) { + dout(10) << __func__ << " connection for " << m->get_source_inst() + << " changed from session; mark down and replace" << dendl; + s->con->mark_down(); + } + if (s->item.is_on_list()) { + // forwarded messages' sessions are not in the sessions map and + // exist only while the op is being handled. + remove_session(s); + } + s->put(); + s = nullptr; + } + } + + if (!s) { + // if the sender is not a monitor, make sure their first message for a + // session is an MAuth. If it is not, assume it's a stray message, + // and considering that we are creating a new session it is safe to + // assume that the sender hasn't authenticated yet, so we have no way + // of assessing whether we should handle it or not. + if (!src_is_mon && (m->get_type() != CEPH_MSG_AUTH && + m->get_type() != CEPH_MSG_MON_GET_MAP && + m->get_type() != CEPH_MSG_PING)) { + dout(1) << __func__ << " dropping stray message " << *m + << " from " << m->get_source_inst() << dendl; + return; + } + + ConnectionRef con = m->get_connection(); + { + Mutex::Locker l(session_map_lock); + s = session_map.new_session(m->get_source_inst(), con.get()); + } + assert(s); + con->set_priv(s->get()); + dout(10) << __func__ << " new session " << s << " " << *s + << " features 0x" << std::hex + << s->con_features << std::dec << dendl; + op->set_session(s); + + logger->set(l_mon_num_sessions, session_map.get_size()); + logger->inc(l_mon_session_add); + + if (src_is_mon) { + // give it monitor caps; the peer type has been authenticated + dout(5) << __func__ << " setting monitor caps on this connection" << dendl; + if (!s->caps.is_allow_all()) // but no need to repeatedly copy + s->caps = *mon_caps; + } + s->put(); + } else { + dout(20) << __func__ << " existing session " << s << " for " << s->inst + << dendl; + } + + assert(s); + + s->session_timeout = ceph_clock_now(); + s->session_timeout += g_conf->mon_session_timeout; + + if (s->auth_handler) { + s->entity_name = s->auth_handler->get_entity_name(); + } + dout(20) << " caps " << s->caps.get_str() << dendl; + + if ((is_synchronizing() || + (s->global_id == 0 && !exited_quorum.is_zero())) && + !src_is_mon && + m->get_type() != CEPH_MSG_PING) { + waitlist_or_zap_client(op); + } else { + dispatch_op(op); + } + return; +} + +void Monitor::dispatch_op(MonOpRequestRef op) +{ + op->mark_event("mon:dispatch_op"); + MonSession *s = op->get_session(); + assert(s); + if (s->closed) { + dout(10) << " session closed, dropping " << op->get_req() << dendl; + return; + } + + /* we will consider the default type as being 'monitor' until proven wrong */ + op->set_type_monitor(); + /* deal with all messages that do not necessarily need caps */ + bool dealt_with = true; + switch (op->get_req()->get_type()) { + // auth + case MSG_MON_GLOBAL_ID: + case CEPH_MSG_AUTH: + op->set_type_service(); + /* no need to check caps here */ + paxos_service[PAXOS_AUTH]->dispatch(op); + break; + + case CEPH_MSG_PING: + handle_ping(op); + break; + + /* MMonGetMap may be used by clients to obtain a monmap *before* + * authenticating with the monitor. We need to handle these without + * checking caps because, even on a cluster without cephx, we only set + * session caps *after* the auth handshake. A good example of this + * is when a client calls MonClient::get_monmap_privately(), which does + * not authenticate when obtaining a monmap. + */ + case CEPH_MSG_MON_GET_MAP: + handle_mon_get_map(op); + break; + + case CEPH_MSG_MON_METADATA: + return handle_mon_metadata(op); + + default: + dealt_with = false; + break; + } + if (dealt_with) + return; + + /* well, maybe the op belongs to a service... */ + op->set_type_service(); + /* deal with all messages which caps should be checked somewhere else */ + dealt_with = true; + switch (op->get_req()->get_type()) { + + // OSDs + case CEPH_MSG_MON_GET_OSDMAP: + case CEPH_MSG_POOLOP: + case MSG_OSD_BEACON: + case MSG_OSD_MARK_ME_DOWN: + case MSG_OSD_FULL: + case MSG_OSD_FAILURE: + case MSG_OSD_BOOT: + case MSG_OSD_ALIVE: + case MSG_OSD_PGTEMP: + case MSG_OSD_PG_CREATED: + case MSG_REMOVE_SNAPS: + paxos_service[PAXOS_OSDMAP]->dispatch(op); + break; + + // MDSs + case MSG_MDS_BEACON: + case MSG_MDS_OFFLOAD_TARGETS: + paxos_service[PAXOS_MDSMAP]->dispatch(op); + break; + + // Mgrs + case MSG_MGR_BEACON: + paxos_service[PAXOS_MGR]->dispatch(op); + break; + + // MgrStat + case CEPH_MSG_STATFS: + // this is an ugly hack, sorry! force the version to 1 so that we do + // not run afoul of the is_readable() paxos check. the client is going + // by the pgmonitor version and the MgrStatMonitor version will lag behind + // that until we complete the upgrade. The paxos ordering crap really + // doesn't matter for statfs results, so just kludge around it here. + if (osdmon()->osdmap.require_osd_release < CEPH_RELEASE_LUMINOUS) { + ((MStatfs*)op->get_req())->version = 1; + } + case MSG_MON_MGR_REPORT: + case MSG_GETPOOLSTATS: + paxos_service[PAXOS_MGRSTAT]->dispatch(op); + break; + + // pg + case MSG_PGSTATS: + paxos_service[PAXOS_PGMAP]->dispatch(op); + break; + + // log + case MSG_LOG: + paxos_service[PAXOS_LOG]->dispatch(op); + break; + + // handle_command() does its own caps checking + case MSG_MON_COMMAND: + op->set_type_command(); + handle_command(op); + break; + + default: + dealt_with = false; + break; + } + if (dealt_with) + return; + + /* nop, looks like it's not a service message; revert back to monitor */ + op->set_type_monitor(); + + /* messages we, the Monitor class, need to deal with + * but may be sent by clients. */ + + if (!op->get_session()->is_capable("mon", MON_CAP_R)) { + dout(5) << __func__ << " " << op->get_req()->get_source_inst() + << " not enough caps for " << *(op->get_req()) << " -- dropping" + << dendl; + goto drop; + } + + dealt_with = true; + switch (op->get_req()->get_type()) { + + // misc + case CEPH_MSG_MON_GET_VERSION: + handle_get_version(op); + break; + + case CEPH_MSG_MON_SUBSCRIBE: + /* FIXME: check what's being subscribed, filter accordingly */ + handle_subscribe(op); + break; + + default: + dealt_with = false; + break; + } + if (dealt_with) + return; + + if (!op->is_src_mon()) { + dout(1) << __func__ << " unexpected monitor message from" + << " non-monitor entity " << op->get_req()->get_source_inst() + << " " << *(op->get_req()) << " -- dropping" << dendl; + goto drop; + } + + /* messages that should only be sent by another monitor */ + dealt_with = true; + switch (op->get_req()->get_type()) { + + case MSG_ROUTE: + handle_route(op); + break; + + case MSG_MON_PROBE: + handle_probe(op); + break; + + // Sync (i.e., the new slurp, but on steroids) + case MSG_MON_SYNC: + handle_sync(op); + break; + case MSG_MON_SCRUB: + handle_scrub(op); + break; + + /* log acks are sent from a monitor we sent the MLog to, and are + never sent by clients to us. */ + case MSG_LOGACK: + log_client.handle_log_ack((MLogAck*)op->get_req()); + break; + + // monmap + case MSG_MON_JOIN: + op->set_type_service(); + paxos_service[PAXOS_MONMAP]->dispatch(op); + break; + + // paxos + case MSG_MON_PAXOS: + { + op->set_type_paxos(); + MMonPaxos *pm = static_cast(op->get_req()); + if (!op->get_session()->is_capable("mon", MON_CAP_X)) { + //can't send these! + break; + } + + if (state == STATE_SYNCHRONIZING) { + // we are synchronizing. These messages would do us no + // good, thus just drop them and ignore them. + dout(10) << __func__ << " ignore paxos msg from " + << pm->get_source_inst() << dendl; + break; + } + + // sanitize + if (pm->epoch > get_epoch()) { + bootstrap(); + break; + } + if (pm->epoch != get_epoch()) { + break; + } + + paxos->dispatch(op); + } + break; + + // elector messages + case MSG_MON_ELECTION: + op->set_type_election(); + //check privileges here for simplicity + if (!op->get_session()->is_capable("mon", MON_CAP_X)) { + dout(0) << "MMonElection received from entity without enough caps!" + << op->get_session()->caps << dendl; + break; + } + if (!is_probing() && !is_synchronizing()) { + elector.dispatch(op); + } + break; + + case MSG_FORWARD: + handle_forward(op); + break; + + case MSG_TIMECHECK: + handle_timecheck(op); + break; + + case MSG_MON_HEALTH: + health_monitor->dispatch(op); + break; + + case MSG_MON_HEALTH_CHECKS: + op->set_type_service(); + paxos_service[PAXOS_HEALTH]->dispatch(op); + break; + + default: + dealt_with = false; + break; + } + if (!dealt_with) { + dout(1) << "dropping unexpected " << *(op->get_req()) << dendl; + goto drop; + } + return; + +drop: + return; +} + +void Monitor::handle_ping(MonOpRequestRef op) +{ + MPing *m = static_cast(op->get_req()); + dout(10) << __func__ << " " << *m << dendl; + MPing *reply = new MPing; + entity_inst_t inst = m->get_source_inst(); + bufferlist payload; + boost::scoped_ptr f(new JSONFormatter(true)); + f->open_object_section("pong"); + + if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) { + get_health_status(false, f.get(), nullptr); + } else { + list health_str; + get_health(health_str, nullptr, f.get()); + } + + { + stringstream ss; + get_mon_status(f.get(), ss); + } + + f->close_section(); + stringstream ss; + f->flush(ss); + ::encode(ss.str(), payload); + reply->set_payload(payload); + dout(10) << __func__ << " reply payload len " << reply->get_payload().length() << dendl; + messenger->send_message(reply, inst); +} + +void Monitor::timecheck_start() +{ + dout(10) << __func__ << dendl; + timecheck_cleanup(); + timecheck_start_round(); +} + +void Monitor::timecheck_finish() +{ + dout(10) << __func__ << dendl; + timecheck_cleanup(); +} + +void Monitor::timecheck_start_round() +{ + dout(10) << __func__ << " curr " << timecheck_round << dendl; + assert(is_leader()); + + if (monmap->size() == 1) { + assert(0 == "We are alone; this shouldn't have been scheduled!"); + return; + } + + if (timecheck_round % 2) { + dout(10) << __func__ << " there's a timecheck going on" << dendl; + utime_t curr_time = ceph_clock_now(); + double max = g_conf->mon_timecheck_interval*3; + if (curr_time - timecheck_round_start < max) { + dout(10) << __func__ << " keep current round going" << dendl; + goto out; + } else { + dout(10) << __func__ + << " finish current timecheck and start new" << dendl; + timecheck_cancel_round(); + } + } + + assert(timecheck_round % 2 == 0); + timecheck_acks = 0; + timecheck_round ++; + timecheck_round_start = ceph_clock_now(); + dout(10) << __func__ << " new " << timecheck_round << dendl; + + timecheck(); +out: + dout(10) << __func__ << " setting up next event" << dendl; + timecheck_reset_event(); +} + +void Monitor::timecheck_finish_round(bool success) +{ + dout(10) << __func__ << " curr " << timecheck_round << dendl; + assert(timecheck_round % 2); + timecheck_round ++; + timecheck_round_start = utime_t(); + + if (success) { + assert(timecheck_waiting.empty()); + assert(timecheck_acks == quorum.size()); + timecheck_report(); + timecheck_check_skews(); + return; + } + + dout(10) << __func__ << " " << timecheck_waiting.size() + << " peers still waiting:"; + for (map::iterator p = timecheck_waiting.begin(); + p != timecheck_waiting.end(); ++p) { + *_dout << " " << p->first.name; + } + *_dout << dendl; + timecheck_waiting.clear(); + + dout(10) << __func__ << " finished to " << timecheck_round << dendl; +} + +void Monitor::timecheck_cancel_round() +{ + timecheck_finish_round(false); +} + +void Monitor::timecheck_cleanup() +{ + timecheck_round = 0; + timecheck_acks = 0; + timecheck_round_start = utime_t(); + + if (timecheck_event) { + timer.cancel_event(timecheck_event); + timecheck_event = NULL; + } + timecheck_waiting.clear(); + timecheck_skews.clear(); + timecheck_latencies.clear(); + + timecheck_rounds_since_clean = 0; +} + +void Monitor::timecheck_reset_event() +{ + if (timecheck_event) { + timer.cancel_event(timecheck_event); + timecheck_event = NULL; + } + + double delay = + cct->_conf->mon_timecheck_skew_interval * timecheck_rounds_since_clean; + + if (delay <= 0 || delay > cct->_conf->mon_timecheck_interval) { + delay = cct->_conf->mon_timecheck_interval; + } + + dout(10) << __func__ << " delay " << delay + << " rounds_since_clean " << timecheck_rounds_since_clean + << dendl; + + timecheck_event = timer.add_event_after( + delay, + new C_MonContext(this, [this](int) { + timecheck_start_round(); + })); +} + +void Monitor::timecheck_check_skews() +{ + dout(10) << __func__ << dendl; + assert(is_leader()); + assert((timecheck_round % 2) == 0); + if (monmap->size() == 1) { + assert(0 == "We are alone; we shouldn't have gotten here!"); + return; + } + assert(timecheck_latencies.size() == timecheck_skews.size()); + + bool found_skew = false; + for (map::iterator p = timecheck_skews.begin(); + p != timecheck_skews.end(); ++p) { + + double abs_skew; + if (timecheck_has_skew(p->second, &abs_skew)) { + dout(10) << __func__ + << " " << p->first << " skew " << abs_skew << dendl; + found_skew = true; + } + } + + if (found_skew) { + ++timecheck_rounds_since_clean; + timecheck_reset_event(); + } else if (timecheck_rounds_since_clean > 0) { + dout(1) << __func__ + << " no clock skews found after " << timecheck_rounds_since_clean + << " rounds" << dendl; + // make sure the skews are really gone and not just a transient success + // this will run just once if not in the presence of skews again. + timecheck_rounds_since_clean = 1; + timecheck_reset_event(); + timecheck_rounds_since_clean = 0; + } + +} + +void Monitor::timecheck_report() +{ + dout(10) << __func__ << dendl; + assert(is_leader()); + assert((timecheck_round % 2) == 0); + if (monmap->size() == 1) { + assert(0 == "We are alone; we shouldn't have gotten here!"); + return; + } + + assert(timecheck_latencies.size() == timecheck_skews.size()); + bool do_output = true; // only output report once + for (set::iterator q = quorum.begin(); q != quorum.end(); ++q) { + if (monmap->get_name(*q) == name) + continue; + + MTimeCheck *m = new MTimeCheck(MTimeCheck::OP_REPORT); + m->epoch = get_epoch(); + m->round = timecheck_round; + + for (map::iterator it = timecheck_skews.begin(); + it != timecheck_skews.end(); ++it) { + double skew = it->second; + double latency = timecheck_latencies[it->first]; + + m->skews[it->first] = skew; + m->latencies[it->first] = latency; + + if (do_output) { + dout(25) << __func__ << " " << it->first + << " latency " << latency + << " skew " << skew << dendl; + } + } + do_output = false; + entity_inst_t inst = monmap->get_inst(*q); + dout(10) << __func__ << " send report to " << inst << dendl; + messenger->send_message(m, inst); + } +} + +void Monitor::timecheck() +{ + dout(10) << __func__ << dendl; + assert(is_leader()); + if (monmap->size() == 1) { + assert(0 == "We are alone; we shouldn't have gotten here!"); + return; + } + assert(timecheck_round % 2 != 0); + + timecheck_acks = 1; // we ack ourselves + + dout(10) << __func__ << " start timecheck epoch " << get_epoch() + << " round " << timecheck_round << dendl; + + // we are at the eye of the storm; the point of reference + timecheck_skews[messenger->get_myinst()] = 0.0; + timecheck_latencies[messenger->get_myinst()] = 0.0; + + for (set::iterator it = quorum.begin(); it != quorum.end(); ++it) { + if (monmap->get_name(*it) == name) + continue; + + entity_inst_t inst = monmap->get_inst(*it); + utime_t curr_time = ceph_clock_now(); + timecheck_waiting[inst] = curr_time; + MTimeCheck *m = new MTimeCheck(MTimeCheck::OP_PING); + m->epoch = get_epoch(); + m->round = timecheck_round; + dout(10) << __func__ << " send " << *m << " to " << inst << dendl; + messenger->send_message(m, inst); + } +} + +health_status_t Monitor::timecheck_status(ostringstream &ss, + const double skew_bound, + const double latency) +{ + health_status_t status = HEALTH_OK; + assert(latency >= 0); + + double abs_skew; + if (timecheck_has_skew(skew_bound, &abs_skew)) { + status = HEALTH_WARN; + ss << "clock skew " << abs_skew << "s" + << " > max " << g_conf->mon_clock_drift_allowed << "s"; + } + + return status; +} + +void Monitor::handle_timecheck_leader(MonOpRequestRef op) +{ + MTimeCheck *m = static_cast(op->get_req()); + dout(10) << __func__ << " " << *m << dendl; + /* handles PONG's */ + assert(m->op == MTimeCheck::OP_PONG); + + entity_inst_t other = m->get_source_inst(); + if (m->epoch < get_epoch()) { + dout(1) << __func__ << " got old timecheck epoch " << m->epoch + << " from " << other + << " curr " << get_epoch() + << " -- severely lagged? discard" << dendl; + return; + } + assert(m->epoch == get_epoch()); + + if (m->round < timecheck_round) { + dout(1) << __func__ << " got old round " << m->round + << " from " << other + << " curr " << timecheck_round << " -- discard" << dendl; + return; + } + + utime_t curr_time = ceph_clock_now(); + + assert(timecheck_waiting.count(other) > 0); + utime_t timecheck_sent = timecheck_waiting[other]; + timecheck_waiting.erase(other); + if (curr_time < timecheck_sent) { + // our clock was readjusted -- drop everything until it all makes sense. + dout(1) << __func__ << " our clock was readjusted --" + << " bump round and drop current check" + << dendl; + timecheck_cancel_round(); + return; + } + + /* update peer latencies */ + double latency = (double)(curr_time - timecheck_sent); + + if (timecheck_latencies.count(other) == 0) + timecheck_latencies[other] = latency; + else { + double avg_latency = ((timecheck_latencies[other]*0.8)+(latency*0.2)); + timecheck_latencies[other] = avg_latency; + } + + /* + * update skews + * + * some nasty thing goes on if we were to do 'a - b' between two utime_t, + * and 'a' happens to be lower than 'b'; so we use double instead. + * + * latency is always expected to be >= 0. + * + * delta, the difference between theirs timestamp and ours, may either be + * lower or higher than 0; will hardly ever be 0. + * + * The absolute skew is the absolute delta minus the latency, which is + * taken as a whole instead of an rtt given that there is some queueing + * and dispatch times involved and it's hard to assess how long exactly + * it took for the message to travel to the other side and be handled. So + * we call it a bounded skew, the worst case scenario. + * + * Now, to math! + * + * Given that the latency is always positive, we can establish that the + * bounded skew will be: + * + * 1. positive if the absolute delta is higher than the latency and + * delta is positive + * 2. negative if the absolute delta is higher than the latency and + * delta is negative. + * 3. zero if the absolute delta is lower than the latency. + * + * On 3. we make a judgement call and treat the skew as non-existent. + * This is because that, if the absolute delta is lower than the + * latency, then the apparently existing skew is nothing more than a + * side-effect of the high latency at work. + * + * This may not be entirely true though, as a severely skewed clock + * may be masked by an even higher latency, but with high latencies + * we probably have worse issues to deal with than just skewed clocks. + */ + assert(latency >= 0); + + double delta = ((double) m->timestamp) - ((double) curr_time); + double abs_delta = (delta > 0 ? delta : -delta); + double skew_bound = abs_delta - latency; + if (skew_bound < 0) + skew_bound = 0; + else if (delta < 0) + skew_bound = -skew_bound; + + ostringstream ss; + health_status_t status = timecheck_status(ss, skew_bound, latency); + clog->health(status) << other << " " << ss.str(); + + dout(10) << __func__ << " from " << other << " ts " << m->timestamp + << " delta " << delta << " skew_bound " << skew_bound + << " latency " << latency << dendl; + + timecheck_skews[other] = skew_bound; + + timecheck_acks++; + if (timecheck_acks == quorum.size()) { + dout(10) << __func__ << " got pongs from everybody (" + << timecheck_acks << " total)" << dendl; + assert(timecheck_skews.size() == timecheck_acks); + assert(timecheck_waiting.empty()); + // everyone has acked, so bump the round to finish it. + timecheck_finish_round(); + } +} + +void Monitor::handle_timecheck_peon(MonOpRequestRef op) +{ + MTimeCheck *m = static_cast(op->get_req()); + dout(10) << __func__ << " " << *m << dendl; + + assert(is_peon()); + assert(m->op == MTimeCheck::OP_PING || m->op == MTimeCheck::OP_REPORT); + + if (m->epoch != get_epoch()) { + dout(1) << __func__ << " got wrong epoch " + << "(ours " << get_epoch() + << " theirs: " << m->epoch << ") -- discarding" << dendl; + return; + } + + if (m->round < timecheck_round) { + dout(1) << __func__ << " got old round " << m->round + << " current " << timecheck_round + << " (epoch " << get_epoch() << ") -- discarding" << dendl; + return; + } + + timecheck_round = m->round; + + if (m->op == MTimeCheck::OP_REPORT) { + assert((timecheck_round % 2) == 0); + timecheck_latencies.swap(m->latencies); + timecheck_skews.swap(m->skews); + return; + } + + assert((timecheck_round % 2) != 0); + MTimeCheck *reply = new MTimeCheck(MTimeCheck::OP_PONG); + utime_t curr_time = ceph_clock_now(); + reply->timestamp = curr_time; + reply->epoch = m->epoch; + reply->round = m->round; + dout(10) << __func__ << " send " << *m + << " to " << m->get_source_inst() << dendl; + m->get_connection()->send_message(reply); +} + +void Monitor::handle_timecheck(MonOpRequestRef op) +{ + MTimeCheck *m = static_cast(op->get_req()); + dout(10) << __func__ << " " << *m << dendl; + + if (is_leader()) { + if (m->op != MTimeCheck::OP_PONG) { + dout(1) << __func__ << " drop unexpected msg (not pong)" << dendl; + } else { + handle_timecheck_leader(op); + } + } else if (is_peon()) { + if (m->op != MTimeCheck::OP_PING && m->op != MTimeCheck::OP_REPORT) { + dout(1) << __func__ << " drop unexpected msg (not ping or report)" << dendl; + } else { + handle_timecheck_peon(op); + } + } else { + dout(1) << __func__ << " drop unexpected msg" << dendl; + } +} + +void Monitor::handle_subscribe(MonOpRequestRef op) +{ + MMonSubscribe *m = static_cast(op->get_req()); + dout(10) << "handle_subscribe " << *m << dendl; + + bool reply = false; + + MonSession *s = op->get_session(); + assert(s); + + for (map::iterator p = m->what.begin(); + p != m->what.end(); + ++p) { + // if there are any non-onetime subscriptions, we need to reply to start the resubscribe timer + if ((p->second.flags & CEPH_SUBSCRIBE_ONETIME) == 0) + reply = true; + + // remove conflicting subscribes + if (logmon()->sub_name_to_id(p->first) >= 0) { + for (map::iterator it = s->sub_map.begin(); + it != s->sub_map.end(); ) { + if (it->first != p->first && logmon()->sub_name_to_id(it->first) >= 0) { + Mutex::Locker l(session_map_lock); + session_map.remove_sub((it++)->second); + } else { + ++it; + } + } + } + + { + Mutex::Locker l(session_map_lock); + session_map.add_update_sub(s, p->first, p->second.start, + p->second.flags & CEPH_SUBSCRIBE_ONETIME, + m->get_connection()->has_feature(CEPH_FEATURE_INCSUBOSDMAP)); + } + + if (p->first.compare(0, 6, "mdsmap") == 0 || p->first.compare(0, 5, "fsmap") == 0) { + dout(10) << __func__ << ": MDS sub '" << p->first << "'" << dendl; + if ((int)s->is_capable("mds", MON_CAP_R)) { + Subscription *sub = s->sub_map[p->first]; + assert(sub != nullptr); + mdsmon()->check_sub(sub); + } + } else if (p->first == "osdmap") { + if ((int)s->is_capable("osd", MON_CAP_R)) { + if (s->osd_epoch > p->second.start) { + // client needs earlier osdmaps on purpose, so reset the sent epoch + s->osd_epoch = 0; + } + osdmon()->check_osdmap_sub(s->sub_map["osdmap"]); + } + } else if (p->first == "osd_pg_creates") { + if ((int)s->is_capable("osd", MON_CAP_W)) { + if (monmap->get_required_features().contains_all( + ceph::features::mon::FEATURE_LUMINOUS)) { + osdmon()->check_pg_creates_sub(s->sub_map["osd_pg_creates"]); + } else { + pgmon()->check_sub(s->sub_map["osd_pg_creates"]); + } + } + } else if (p->first == "monmap") { + monmon()->check_sub(s->sub_map[p->first]); + } else if (logmon()->sub_name_to_id(p->first) >= 0) { + logmon()->check_sub(s->sub_map[p->first]); + } else if (p->first == "mgrmap" || p->first == "mgrdigest") { + mgrmon()->check_sub(s->sub_map[p->first]); + } else if (p->first == "servicemap") { + mgrstatmon()->check_sub(s->sub_map[p->first]); + } + } + + if (reply) { + // we only need to reply if the client is old enough to think it + // has to send renewals. + ConnectionRef con = m->get_connection(); + if (!con->has_feature(CEPH_FEATURE_MON_STATEFUL_SUB)) + m->get_connection()->send_message(new MMonSubscribeAck( + monmap->get_fsid(), (int)g_conf->mon_subscribe_interval)); + } + +} + +void Monitor::handle_get_version(MonOpRequestRef op) +{ + MMonGetVersion *m = static_cast(op->get_req()); + dout(10) << "handle_get_version " << *m << dendl; + PaxosService *svc = NULL; + + MonSession *s = op->get_session(); + assert(s); + + if (!is_leader() && !is_peon()) { + dout(10) << " waiting for quorum" << dendl; + waitfor_quorum.push_back(new C_RetryMessage(this, op)); + goto out; + } + + if (m->what == "mdsmap") { + svc = mdsmon(); + } else if (m->what == "fsmap") { + svc = mdsmon(); + } else if (m->what == "osdmap") { + svc = osdmon(); + } else if (m->what == "monmap") { + svc = monmon(); + } else { + derr << "invalid map type " << m->what << dendl; + } + + if (svc) { + if (!svc->is_readable()) { + svc->wait_for_readable(op, new C_RetryMessage(this, op)); + goto out; + } + + MMonGetVersionReply *reply = new MMonGetVersionReply(); + reply->handle = m->handle; + reply->version = svc->get_last_committed(); + reply->oldest_version = svc->get_first_committed(); + reply->set_tid(m->get_tid()); + + m->get_connection()->send_message(reply); + } + out: + return; +} + +bool Monitor::ms_handle_reset(Connection *con) +{ + dout(10) << "ms_handle_reset " << con << " " << con->get_peer_addr() << dendl; + + // ignore lossless monitor sessions + if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) + return false; + + MonSession *s = static_cast(con->get_priv()); + if (!s) + return false; + + // break any con <-> session ref cycle + s->con->set_priv(NULL); + + if (is_shutdown()) + return false; + + Mutex::Locker l(lock); + + dout(10) << "reset/close on session " << s->inst << dendl; + if (!s->closed) { + Mutex::Locker l(session_map_lock); + remove_session(s); + } + s->put(); + return true; +} + +bool Monitor::ms_handle_refused(Connection *con) +{ + // just log for now... + dout(10) << "ms_handle_refused " << con << " " << con->get_peer_addr() << dendl; + return false; +} + +// ----- + +void Monitor::send_latest_monmap(Connection *con) +{ + bufferlist bl; + monmap->encode(bl, con->get_features()); + con->send_message(new MMonMap(bl)); +} + +void Monitor::handle_mon_get_map(MonOpRequestRef op) +{ + MMonGetMap *m = static_cast(op->get_req()); + dout(10) << "handle_mon_get_map" << dendl; + send_latest_monmap(m->get_connection().get()); +} + +void Monitor::handle_mon_metadata(MonOpRequestRef op) +{ + MMonMetadata *m = static_cast(op->get_req()); + if (is_leader()) { + dout(10) << __func__ << dendl; + update_mon_metadata(m->get_source().num(), std::move(m->data)); + } +} + +void Monitor::update_mon_metadata(int from, Metadata&& m) +{ + // NOTE: this is now for legacy (kraken or jewel) mons only. + pending_metadata[from] = std::move(m); + + MonitorDBStore::TransactionRef t = paxos->get_pending_transaction(); + bufferlist bl; + ::encode(pending_metadata, bl); + t->put(MONITOR_STORE_PREFIX, "last_metadata", bl); + paxos->trigger_propose(); +} + +int Monitor::load_metadata() +{ + bufferlist bl; + int r = store->get(MONITOR_STORE_PREFIX, "last_metadata", bl); + if (r) + return r; + bufferlist::iterator it = bl.begin(); + ::decode(mon_metadata, it); + + pending_metadata = mon_metadata; + return 0; +} + +int Monitor::get_mon_metadata(int mon, Formatter *f, ostream& err) +{ + assert(f); + if (!mon_metadata.count(mon)) { + err << "mon." << mon << " not found"; + return -EINVAL; + } + const Metadata& m = mon_metadata[mon]; + for (Metadata::const_iterator p = m.begin(); p != m.end(); ++p) { + f->dump_string(p->first.c_str(), p->second); + } + return 0; +} + +void Monitor::count_metadata(const string& field, map *out) +{ + for (auto& p : mon_metadata) { + auto q = p.second.find(field); + if (q == p.second.end()) { + (*out)["unknown"]++; + } else { + (*out)[q->second]++; + } + } +} + +void Monitor::count_metadata(const string& field, Formatter *f) +{ + map by_val; + count_metadata(field, &by_val); + f->open_object_section(field.c_str()); + for (auto& p : by_val) { + f->dump_int(p.first.c_str(), p.second); + } + f->close_section(); +} + +int Monitor::print_nodes(Formatter *f, ostream& err) +{ + map > mons; // hostname => mon + for (map::iterator it = mon_metadata.begin(); + it != mon_metadata.end(); ++it) { + const Metadata& m = it->second; + Metadata::const_iterator hostname = m.find("hostname"); + if (hostname == m.end()) { + // not likely though + continue; + } + mons[hostname->second].push_back(it->first); + } + + dump_services(f, mons, "mon"); + return 0; +} + +// ---------------------------------------------- +// scrub + +int Monitor::scrub_start() +{ + dout(10) << __func__ << dendl; + assert(is_leader()); + + if (!scrub_result.empty()) { + clog->info() << "scrub already in progress"; + return -EBUSY; + } + + scrub_event_cancel(); + scrub_result.clear(); + scrub_state.reset(new ScrubState); + + scrub(); + return 0; +} + +int Monitor::scrub() +{ + assert(is_leader()); + assert(scrub_state); + + scrub_cancel_timeout(); + wait_for_paxos_write(); + scrub_version = paxos->get_version(); + + + // scrub all keys if we're the only monitor in the quorum + int32_t num_keys = + (quorum.size() == 1 ? -1 : cct->_conf->mon_scrub_max_keys); + + for (set::iterator p = quorum.begin(); + p != quorum.end(); + ++p) { + if (*p == rank) + continue; + MMonScrub *r = new MMonScrub(MMonScrub::OP_SCRUB, scrub_version, + num_keys); + r->key = scrub_state->last_key; + messenger->send_message(r, monmap->get_inst(*p)); + } + + // scrub my keys + bool r = _scrub(&scrub_result[rank], + &scrub_state->last_key, + &num_keys); + + scrub_state->finished = !r; + + // only after we got our scrub results do we really care whether the + // other monitors are late on their results. Also, this way we avoid + // triggering the timeout if we end up getting stuck in _scrub() for + // longer than the duration of the timeout. + scrub_reset_timeout(); + + if (quorum.size() == 1) { + assert(scrub_state->finished == true); + scrub_finish(); + } + return 0; +} + +void Monitor::handle_scrub(MonOpRequestRef op) +{ + MMonScrub *m = static_cast(op->get_req()); + dout(10) << __func__ << " " << *m << dendl; + switch (m->op) { + case MMonScrub::OP_SCRUB: + { + if (!is_peon()) + break; + + wait_for_paxos_write(); + + if (m->version != paxos->get_version()) + break; + + MMonScrub *reply = new MMonScrub(MMonScrub::OP_RESULT, + m->version, + m->num_keys); + + reply->key = m->key; + _scrub(&reply->result, &reply->key, &reply->num_keys); + m->get_connection()->send_message(reply); + } + break; + + case MMonScrub::OP_RESULT: + { + if (!is_leader()) + break; + if (m->version != scrub_version) + break; + // reset the timeout each time we get a result + scrub_reset_timeout(); + + int from = m->get_source().num(); + assert(scrub_result.count(from) == 0); + scrub_result[from] = m->result; + + if (scrub_result.size() == quorum.size()) { + scrub_check_results(); + scrub_result.clear(); + if (scrub_state->finished) + scrub_finish(); + else + scrub(); + } + } + break; + } +} + +bool Monitor::_scrub(ScrubResult *r, + pair *start, + int *num_keys) +{ + assert(r != NULL); + assert(start != NULL); + assert(num_keys != NULL); + + set prefixes = get_sync_targets_names(); + prefixes.erase("paxos"); // exclude paxos, as this one may have extra states for proposals, etc. + + dout(10) << __func__ << " start (" << *start << ")" + << " num_keys " << *num_keys << dendl; + + MonitorDBStore::Synchronizer it = store->get_synchronizer(*start, prefixes); + + int scrubbed_keys = 0; + pair last_key; + + while (it->has_next_chunk()) { + + if (*num_keys > 0 && scrubbed_keys == *num_keys) + break; + + pair k = it->get_next_key(); + if (prefixes.count(k.first) == 0) + continue; + + if (cct->_conf->mon_scrub_inject_missing_keys > 0.0 && + (rand() % 10000 < cct->_conf->mon_scrub_inject_missing_keys*10000.0)) { + dout(10) << __func__ << " inject missing key, skipping (" << k << ")" + << dendl; + continue; + } + + bufferlist bl; + int err = store->get(k.first, k.second, bl); + assert(err == 0); + + uint32_t key_crc = bl.crc32c(0); + dout(30) << __func__ << " " << k << " bl " << bl.length() << " bytes" + << " crc " << key_crc << dendl; + r->prefix_keys[k.first]++; + if (r->prefix_crc.count(k.first) == 0) { + r->prefix_crc[k.first] = 0; + } + r->prefix_crc[k.first] = bl.crc32c(r->prefix_crc[k.first]); + + if (cct->_conf->mon_scrub_inject_crc_mismatch > 0.0 && + (rand() % 10000 < cct->_conf->mon_scrub_inject_crc_mismatch*10000.0)) { + dout(10) << __func__ << " inject failure at (" << k << ")" << dendl; + r->prefix_crc[k.first] += 1; + } + + ++scrubbed_keys; + last_key = k; + } + + dout(20) << __func__ << " last_key (" << last_key << ")" + << " scrubbed_keys " << scrubbed_keys + << " has_next " << it->has_next_chunk() << dendl; + + *start = last_key; + *num_keys = scrubbed_keys; + + return it->has_next_chunk(); +} + +void Monitor::scrub_check_results() +{ + dout(10) << __func__ << dendl; + + // compare + int errors = 0; + ScrubResult& mine = scrub_result[rank]; + for (map::iterator p = scrub_result.begin(); + p != scrub_result.end(); + ++p) { + if (p->first == rank) + continue; + if (p->second != mine) { + ++errors; + clog->error() << "scrub mismatch"; + clog->error() << " mon." << rank << " " << mine; + clog->error() << " mon." << p->first << " " << p->second; + } + } + if (!errors) + clog->debug() << "scrub ok on " << quorum << ": " << mine; +} + +inline void Monitor::scrub_timeout() +{ + dout(1) << __func__ << " restarting scrub" << dendl; + scrub_reset(); + scrub_start(); +} + +void Monitor::scrub_finish() +{ + dout(10) << __func__ << dendl; + scrub_reset(); + scrub_event_start(); +} + +void Monitor::scrub_reset() +{ + dout(10) << __func__ << dendl; + scrub_cancel_timeout(); + scrub_version = 0; + scrub_result.clear(); + scrub_state.reset(); +} + +inline void Monitor::scrub_update_interval(int secs) +{ + // we don't care about changes if we are not the leader. + // changes will be visible if we become the leader. + if (!is_leader()) + return; + + dout(1) << __func__ << " new interval = " << secs << dendl; + + // if scrub already in progress, all changes will already be visible during + // the next round. Nothing to do. + if (scrub_state != NULL) + return; + + scrub_event_cancel(); + scrub_event_start(); +} + +void Monitor::scrub_event_start() +{ + dout(10) << __func__ << dendl; + + if (scrub_event) + scrub_event_cancel(); + + if (cct->_conf->mon_scrub_interval <= 0) { + dout(1) << __func__ << " scrub event is disabled" + << " (mon_scrub_interval = " << cct->_conf->mon_scrub_interval + << ")" << dendl; + return; + } + + scrub_event = timer.add_event_after( + cct->_conf->mon_scrub_interval, + new C_MonContext(this, [this](int) { + scrub_start(); + })); +} + +void Monitor::scrub_event_cancel() +{ + dout(10) << __func__ << dendl; + if (scrub_event) { + timer.cancel_event(scrub_event); + scrub_event = NULL; + } +} + +inline void Monitor::scrub_cancel_timeout() +{ + if (scrub_timeout_event) { + timer.cancel_event(scrub_timeout_event); + scrub_timeout_event = NULL; + } +} + +void Monitor::scrub_reset_timeout() +{ + dout(15) << __func__ << " reset timeout event" << dendl; + scrub_cancel_timeout(); + scrub_timeout_event = timer.add_event_after( + g_conf->mon_scrub_timeout, + new C_MonContext(this, [this](int) { + scrub_timeout(); + })); +} + +/************ TICK ***************/ +void Monitor::new_tick() +{ + timer.add_event_after(g_conf->mon_tick_interval, new C_MonContext(this, [this](int) { + tick(); + })); +} + +void Monitor::tick() +{ + // ok go. + dout(11) << "tick" << dendl; + const utime_t now = ceph_clock_now(); + + // Check if we need to emit any delayed health check updated messages + if (is_leader()) { + const auto min_period = g_conf->get_val( + "mon_health_log_update_period"); + for (auto& svc : paxos_service) { + auto health = svc->get_health_checks(); + + for (const auto &i : health.checks) { + const std::string &code = i.first; + const std::string &summary = i.second.summary; + const health_status_t severity = i.second.severity; + + auto status_iter = health_check_log_times.find(code); + if (status_iter == health_check_log_times.end()) { + continue; + } + + auto &log_status = status_iter->second; + bool const changed = log_status.last_message != summary + || log_status.severity != severity; + + if (changed && now - log_status.updated_at > min_period) { + log_status.last_message = summary; + log_status.updated_at = now; + log_status.severity = severity; + + ostringstream ss; + ss << "Health check update: " << summary << " (" << code << ")"; + clog->health(severity) << ss.str(); + } + } + } + } + + + for (vector::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p) { + (*p)->tick(); + (*p)->maybe_trim(); + } + + // trim sessions + { + Mutex::Locker l(session_map_lock); + auto p = session_map.sessions.begin(); + + bool out_for_too_long = (!exited_quorum.is_zero() && + now > (exited_quorum + 2*g_conf->mon_lease)); + + while (!p.end()) { + MonSession *s = *p; + ++p; + + // don't trim monitors + if (s->inst.name.is_mon()) + continue; + + if (s->session_timeout < now && s->con) { + // check keepalive, too + s->session_timeout = s->con->get_last_keepalive(); + s->session_timeout += g_conf->mon_session_timeout; + } + if (s->session_timeout < now) { + dout(10) << " trimming session " << s->con << " " << s->inst + << " (timeout " << s->session_timeout + << " < now " << now << ")" << dendl; + } else if (out_for_too_long) { + // boot the client Session because we've taken too long getting back in + dout(10) << " trimming session " << s->con << " " << s->inst + << " because we've been out of quorum too long" << dendl; + } else { + continue; + } + + s->con->mark_down(); + remove_session(s); + logger->inc(l_mon_session_trim); + } + } + sync_trim_providers(); + + if (!maybe_wait_for_quorum.empty()) { + finish_contexts(g_ceph_context, maybe_wait_for_quorum); + } + + if (is_leader() && paxos->is_active() && fingerprint.is_zero()) { + // this is only necessary on upgraded clusters. + MonitorDBStore::TransactionRef t = paxos->get_pending_transaction(); + prepare_new_fingerprint(t); + paxos->trigger_propose(); + } + + new_tick(); +} + +void Monitor::prepare_new_fingerprint(MonitorDBStore::TransactionRef t) +{ + uuid_d nf; + nf.generate_random(); + dout(10) << __func__ << " proposing cluster_fingerprint " << nf << dendl; + + bufferlist bl; + ::encode(nf, bl); + t->put(MONITOR_NAME, "cluster_fingerprint", bl); +} + +int Monitor::check_fsid() +{ + bufferlist ebl; + int r = store->get(MONITOR_NAME, "cluster_uuid", ebl); + if (r == -ENOENT) + return r; + assert(r == 0); + + string es(ebl.c_str(), ebl.length()); + + // only keep the first line + size_t pos = es.find_first_of('\n'); + if (pos != string::npos) + es.resize(pos); + + dout(10) << "check_fsid cluster_uuid contains '" << es << "'" << dendl; + uuid_d ondisk; + if (!ondisk.parse(es.c_str())) { + derr << "error: unable to parse uuid" << dendl; + return -EINVAL; + } + + if (monmap->get_fsid() != ondisk) { + derr << "error: cluster_uuid file exists with value " << ondisk + << ", != our uuid " << monmap->get_fsid() << dendl; + return -EEXIST; + } + + return 0; +} + +int Monitor::write_fsid() +{ + auto t(std::make_shared()); + write_fsid(t); + int r = store->apply_transaction(t); + return r; +} + +int Monitor::write_fsid(MonitorDBStore::TransactionRef t) +{ + ostringstream ss; + ss << monmap->get_fsid() << "\n"; + string us = ss.str(); + + bufferlist b; + b.append(us); + + t->put(MONITOR_NAME, "cluster_uuid", b); + return 0; +} + +/* + * this is the closest thing to a traditional 'mkfs' for ceph. + * initialize the monitor state machines to their initial values. + */ +int Monitor::mkfs(bufferlist& osdmapbl) +{ + auto t(std::make_shared()); + + // verify cluster fsid + int r = check_fsid(); + if (r < 0 && r != -ENOENT) + return r; + + bufferlist magicbl; + magicbl.append(CEPH_MON_ONDISK_MAGIC); + magicbl.append("\n"); + t->put(MONITOR_NAME, "magic", magicbl); + + + features = get_initial_supported_features(); + write_features(t); + + // save monmap, osdmap, keyring. + bufferlist monmapbl; + monmap->encode(monmapbl, CEPH_FEATURES_ALL); + monmap->set_epoch(0); // must be 0 to avoid confusing first MonmapMonitor::update_from_paxos() + t->put("mkfs", "monmap", monmapbl); + + if (osdmapbl.length()) { + // make sure it's a valid osdmap + try { + OSDMap om; + om.decode(osdmapbl); + } + catch (buffer::error& e) { + derr << "error decoding provided osdmap: " << e.what() << dendl; + return -EINVAL; + } + t->put("mkfs", "osdmap", osdmapbl); + } + + if (is_keyring_required()) { + KeyRing keyring; + string keyring_filename; + + r = ceph_resolve_file_search(g_conf->keyring, keyring_filename); + if (r) { + derr << "unable to find a keyring file on " << g_conf->keyring + << ": " << cpp_strerror(r) << dendl; + if (g_conf->key != "") { + string keyring_plaintext = "[mon.]\n\tkey = " + g_conf->key + + "\n\tcaps mon = \"allow *\"\n"; + bufferlist bl; + bl.append(keyring_plaintext); + try { + bufferlist::iterator i = bl.begin(); + keyring.decode_plaintext(i); + } + catch (const buffer::error& e) { + derr << "error decoding keyring " << keyring_plaintext + << ": " << e.what() << dendl; + return -EINVAL; + } + } else { + return -ENOENT; + } + } else { + r = keyring.load(g_ceph_context, keyring_filename); + if (r < 0) { + derr << "unable to load initial keyring " << g_conf->keyring << dendl; + return r; + } + } + + // put mon. key in external keyring; seed with everything else. + extract_save_mon_key(keyring); + + bufferlist keyringbl; + keyring.encode_plaintext(keyringbl); + t->put("mkfs", "keyring", keyringbl); + } + write_fsid(t); + store->apply_transaction(t); + + return 0; +} + +int Monitor::write_default_keyring(bufferlist& bl) +{ + ostringstream os; + os << g_conf->mon_data << "/keyring"; + + int err = 0; + int fd = ::open(os.str().c_str(), O_WRONLY|O_CREAT, 0600); + if (fd < 0) { + err = -errno; + dout(0) << __func__ << " failed to open " << os.str() + << ": " << cpp_strerror(err) << dendl; + return err; + } + + err = bl.write_fd(fd); + if (!err) + ::fsync(fd); + VOID_TEMP_FAILURE_RETRY(::close(fd)); + + return err; +} + +void Monitor::extract_save_mon_key(KeyRing& keyring) +{ + EntityName mon_name; + mon_name.set_type(CEPH_ENTITY_TYPE_MON); + EntityAuth mon_key; + if (keyring.get_auth(mon_name, mon_key)) { + dout(10) << "extract_save_mon_key moving mon. key to separate keyring" << dendl; + KeyRing pkey; + pkey.add(mon_name, mon_key); + bufferlist bl; + pkey.encode_plaintext(bl); + write_default_keyring(bl); + keyring.remove(mon_name); + } +} + +bool Monitor::ms_get_authorizer(int service_id, AuthAuthorizer **authorizer, + bool force_new) +{ + dout(10) << "ms_get_authorizer for " << ceph_entity_type_name(service_id) + << dendl; + + if (is_shutdown()) + return false; + + // we only connect to other monitors and mgr; every else connects to us. + if (service_id != CEPH_ENTITY_TYPE_MON && + service_id != CEPH_ENTITY_TYPE_MGR) + return false; + + if (!auth_cluster_required.is_supported_auth(CEPH_AUTH_CEPHX)) { + // auth_none + dout(20) << __func__ << " building auth_none authorizer" << dendl; + AuthNoneClientHandler handler(g_ceph_context, nullptr); + handler.set_global_id(0); + *authorizer = handler.build_authorizer(service_id); + return true; + } + + CephXServiceTicketInfo auth_ticket_info; + CephXSessionAuthInfo info; + int ret; + + EntityName name; + name.set_type(CEPH_ENTITY_TYPE_MON); + auth_ticket_info.ticket.name = name; + auth_ticket_info.ticket.global_id = 0; + + if (service_id == CEPH_ENTITY_TYPE_MON) { + // mon to mon authentication uses the private monitor shared key and not the + // rotating key + CryptoKey secret; + if (!keyring.get_secret(name, secret) && + !key_server.get_secret(name, secret)) { + dout(0) << " couldn't get secret for mon service from keyring or keyserver" + << dendl; + stringstream ss, ds; + int err = key_server.list_secrets(ds); + if (err < 0) + ss << "no installed auth entries!"; + else + ss << "installed auth entries:"; + dout(0) << ss.str() << "\n" << ds.str() << dendl; + return false; + } + + ret = key_server.build_session_auth_info(service_id, auth_ticket_info, info, + secret, (uint64_t)-1); + if (ret < 0) { + dout(0) << __func__ << " failed to build mon session_auth_info " + << cpp_strerror(ret) << dendl; + return false; + } + } else if (service_id == CEPH_ENTITY_TYPE_MGR) { + // mgr + ret = key_server.build_session_auth_info(service_id, auth_ticket_info, info); + if (ret < 0) { + derr << __func__ << " failed to build mgr service session_auth_info " + << cpp_strerror(ret) << dendl; + return false; + } + } else { + ceph_abort(); // see check at top of fn + } + + CephXTicketBlob blob; + if (!cephx_build_service_ticket_blob(cct, info, blob)) { + dout(0) << "ms_get_authorizer failed to build service ticket" << dendl; + return false; + } + bufferlist ticket_data; + ::encode(blob, ticket_data); + + bufferlist::iterator iter = ticket_data.begin(); + CephXTicketHandler handler(g_ceph_context, service_id); + ::decode(handler.ticket, iter); + + handler.session_key = info.session_key; + + *authorizer = handler.build_authorizer(0); + + return true; +} + +bool Monitor::ms_verify_authorizer(Connection *con, int peer_type, + int protocol, bufferlist& authorizer_data, + bufferlist& authorizer_reply, + bool& isvalid, CryptoKey& session_key) +{ + dout(10) << "ms_verify_authorizer " << con->get_peer_addr() + << " " << ceph_entity_type_name(peer_type) + << " protocol " << protocol << dendl; + + if (is_shutdown()) + return false; + + if (peer_type == CEPH_ENTITY_TYPE_MON && + auth_cluster_required.is_supported_auth(CEPH_AUTH_CEPHX)) { + // monitor, and cephx is enabled + isvalid = false; + if (protocol == CEPH_AUTH_CEPHX) { + bufferlist::iterator iter = authorizer_data.begin(); + CephXServiceTicketInfo auth_ticket_info; + + if (authorizer_data.length()) { + bool ret = cephx_verify_authorizer(g_ceph_context, &keyring, iter, + auth_ticket_info, authorizer_reply); + if (ret) { + session_key = auth_ticket_info.session_key; + isvalid = true; + } else { + dout(0) << "ms_verify_authorizer bad authorizer from mon " << con->get_peer_addr() << dendl; + } + } + } else { + dout(0) << "ms_verify_authorizer cephx enabled, but no authorizer (required for mon)" << dendl; + } + } else { + // who cares. + isvalid = true; + } + return true; +}