X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fmon%2FMonitor.cc;fp=src%2Fceph%2Fsrc%2Fmon%2FMonitor.cc;h=0000000000000000000000000000000000000000;hb=7da45d65be36d36b880cc55c5036e96c24b53f00;hp=ce2ab39167d96732479677feaf975a58e58bfd09;hpb=691462d09d0987b47e112d6ee8740375df3c51b2;p=stor4nfv.git diff --git a/src/ceph/src/mon/Monitor.cc b/src/ceph/src/mon/Monitor.cc deleted file mode 100644 index ce2ab39..0000000 --- a/src/ceph/src/mon/Monitor.cc +++ /dev/null @@ -1,5917 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2004-2006 Sage Weil - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - - -#include -#include -#include -#include -#include -#include -#include - -#include "Monitor.h" -#include "common/version.h" - -#include "osd/OSDMap.h" - -#include "MonitorDBStore.h" - -#include "messages/PaxosServiceMessage.h" -#include "messages/MMonMap.h" -#include "messages/MMonGetMap.h" -#include "messages/MMonGetVersion.h" -#include "messages/MMonGetVersionReply.h" -#include "messages/MGenericMessage.h" -#include "messages/MMonCommand.h" -#include "messages/MMonCommandAck.h" -#include "messages/MMonHealth.h" -#include "messages/MMonMetadata.h" -#include "messages/MMonSync.h" -#include "messages/MMonScrub.h" -#include "messages/MMonProbe.h" -#include "messages/MMonJoin.h" -#include "messages/MMonPaxos.h" -#include "messages/MRoute.h" -#include "messages/MForward.h" -#include "messages/MStatfs.h" - -#include "messages/MMonSubscribe.h" -#include "messages/MMonSubscribeAck.h" - -#include "messages/MAuthReply.h" - -#include "messages/MTimeCheck.h" -#include "messages/MPing.h" - -#include "common/strtol.h" -#include "common/ceph_argparse.h" -#include "common/Timer.h" -#include "common/Clock.h" -#include "common/errno.h" -#include "common/perf_counters.h" -#include "common/admin_socket.h" -#include "global/signal_handler.h" -#include "common/Formatter.h" -#include "include/stringify.h" -#include "include/color.h" -#include "include/ceph_fs.h" -#include "include/str_list.h" - -#include "OSDMonitor.h" -#include "MDSMonitor.h" -#include "MonmapMonitor.h" -#include "PGMonitor.h" -#include "LogMonitor.h" -#include "AuthMonitor.h" -#include "MgrMonitor.h" -#include "MgrStatMonitor.h" -#include "mon/QuorumService.h" -#include "mon/OldHealthMonitor.h" -#include "mon/HealthMonitor.h" -#include "mon/ConfigKeyService.h" -#include "common/config.h" -#include "common/cmdparse.h" -#include "include/assert.h" -#include "include/compat.h" -#include "perfglue/heap_profiler.h" - -#include "auth/none/AuthNoneClientHandler.h" - -#define dout_subsys ceph_subsys_mon -#undef dout_prefix -#define dout_prefix _prefix(_dout, this) -static ostream& _prefix(std::ostream *_dout, const Monitor *mon) { - return *_dout << "mon." << mon->name << "@" << mon->rank - << "(" << mon->get_state_name() << ") e" << mon->monmap->get_epoch() << " "; -} - -const string Monitor::MONITOR_NAME = "monitor"; -const string Monitor::MONITOR_STORE_PREFIX = "monitor_store"; - - -#undef FLAG -#undef COMMAND -#undef COMMAND_WITH_FLAG -#define FLAG(f) (MonCommand::FLAG_##f) -#define COMMAND(parsesig, helptext, modulename, req_perms, avail) \ - {parsesig, helptext, modulename, req_perms, avail, FLAG(NONE)}, -#define COMMAND_WITH_FLAG(parsesig, helptext, modulename, req_perms, avail, flags) \ - {parsesig, helptext, modulename, req_perms, avail, flags}, -MonCommand mon_commands[] = { -#include -}; -MonCommand pgmonitor_commands[] = { -#include -}; -#undef COMMAND -#undef COMMAND_WITH_FLAG - - -void C_MonContext::finish(int r) { - if (mon->is_shutdown()) - return; - FunctionContext::finish(r); -} - -Monitor::Monitor(CephContext* cct_, string nm, MonitorDBStore *s, - Messenger *m, Messenger *mgr_m, MonMap *map) : - Dispatcher(cct_), - name(nm), - rank(-1), - messenger(m), - con_self(m ? m->get_loopback_connection() : NULL), - lock("Monitor::lock"), - timer(cct_, lock), - finisher(cct_, "mon_finisher", "fin"), - cpu_tp(cct, "Monitor::cpu_tp", "cpu_tp", g_conf->mon_cpu_threads), - has_ever_joined(false), - logger(NULL), cluster_logger(NULL), cluster_logger_registered(false), - monmap(map), - log_client(cct_, messenger, monmap, LogClient::FLAG_MON), - key_server(cct, &keyring), - auth_cluster_required(cct, - cct->_conf->auth_supported.empty() ? - cct->_conf->auth_cluster_required : cct->_conf->auth_supported), - auth_service_required(cct, - cct->_conf->auth_supported.empty() ? - cct->_conf->auth_service_required : cct->_conf->auth_supported ), - mgr_messenger(mgr_m), - mgr_client(cct_, mgr_m), - pgservice(nullptr), - store(s), - - state(STATE_PROBING), - - elector(this), - required_features(0), - leader(0), - quorum_con_features(0), - // scrub - scrub_version(0), - scrub_event(NULL), - scrub_timeout_event(NULL), - - // sync state - sync_provider_count(0), - sync_cookie(0), - sync_full(false), - sync_start_version(0), - sync_timeout_event(NULL), - sync_last_committed_floor(0), - - timecheck_round(0), - timecheck_acks(0), - timecheck_rounds_since_clean(0), - timecheck_event(NULL), - - paxos_service(PAXOS_NUM), - admin_hook(NULL), - routed_request_tid(0), - op_tracker(cct, true, 1) -{ - clog = log_client.create_channel(CLOG_CHANNEL_CLUSTER); - audit_clog = log_client.create_channel(CLOG_CHANNEL_AUDIT); - - update_log_clients(); - - paxos = new Paxos(this, "paxos"); - - paxos_service[PAXOS_MDSMAP] = new MDSMonitor(this, paxos, "mdsmap"); - paxos_service[PAXOS_MONMAP] = new MonmapMonitor(this, paxos, "monmap"); - paxos_service[PAXOS_OSDMAP] = new OSDMonitor(cct, this, paxos, "osdmap"); - paxos_service[PAXOS_PGMAP] = new PGMonitor(this, paxos, "pgmap"); - paxos_service[PAXOS_LOG] = new LogMonitor(this, paxos, "logm"); - paxos_service[PAXOS_AUTH] = new AuthMonitor(this, paxos, "auth"); - paxos_service[PAXOS_MGR] = new MgrMonitor(this, paxos, "mgr"); - paxos_service[PAXOS_MGRSTAT] = new MgrStatMonitor(this, paxos, "mgrstat"); - paxos_service[PAXOS_HEALTH] = new HealthMonitor(this, paxos, "health"); - - health_monitor = new OldHealthMonitor(this); - config_key_service = new ConfigKeyService(this, paxos); - - mon_caps = new MonCap(); - bool r = mon_caps->parse("allow *", NULL); - assert(r); - - exited_quorum = ceph_clock_now(); - - // prepare local commands - local_mon_commands.resize(ARRAY_SIZE(mon_commands)); - for (unsigned i = 0; i < ARRAY_SIZE(mon_commands); ++i) { - local_mon_commands[i] = mon_commands[i]; - } - MonCommand::encode_vector(local_mon_commands, local_mon_commands_bl); - - local_upgrading_mon_commands = local_mon_commands; - for (unsigned i = 0; i < ARRAY_SIZE(pgmonitor_commands); ++i) { - local_upgrading_mon_commands.push_back(pgmonitor_commands[i]); - } - MonCommand::encode_vector(local_upgrading_mon_commands, - local_upgrading_mon_commands_bl); - - // assume our commands until we have an election. this only means - // we won't reply with EINVAL before the election; any command that - // actually matters will wait until we have quorum etc and then - // retry (and revalidate). - leader_mon_commands = local_mon_commands; - - // note: OSDMonitor may update this based on the luminous flag. - pgservice = mgrstatmon()->get_pg_stat_service(); -} - -Monitor::~Monitor() -{ - for (vector::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p) - delete *p; - delete health_monitor; - delete config_key_service; - delete paxos; - assert(session_map.sessions.empty()); - delete mon_caps; -} - - -class AdminHook : public AdminSocketHook { - Monitor *mon; -public: - explicit AdminHook(Monitor *m) : mon(m) {} - bool call(std::string command, cmdmap_t& cmdmap, std::string format, - bufferlist& out) override { - stringstream ss; - mon->do_admin_command(command, cmdmap, format, ss); - out.append(ss); - return true; - } -}; - -void Monitor::do_admin_command(string command, cmdmap_t& cmdmap, string format, - ostream& ss) -{ - Mutex::Locker l(lock); - - boost::scoped_ptr f(Formatter::create(format)); - - string args; - for (cmdmap_t::iterator p = cmdmap.begin(); - p != cmdmap.end(); ++p) { - if (p->first == "prefix") - continue; - if (!args.empty()) - args += ", "; - args += cmd_vartype_stringify(p->second); - } - args = "[" + args + "]"; - - bool read_only = (command == "mon_status" || - command == "mon metadata" || - command == "quorum_status" || - command == "ops" || - command == "sessions"); - - (read_only ? audit_clog->debug() : audit_clog->info()) - << "from='admin socket' entity='admin socket' " - << "cmd='" << command << "' args=" << args << ": dispatch"; - - if (command == "mon_status") { - get_mon_status(f.get(), ss); - if (f) - f->flush(ss); - } else if (command == "quorum_status") { - _quorum_status(f.get(), ss); - } else if (command == "sync_force") { - string validate; - if ((!cmd_getval(g_ceph_context, cmdmap, "validate", validate)) || - (validate != "--yes-i-really-mean-it")) { - ss << "are you SURE? this will mean the monitor store will be erased " - "the next time the monitor is restarted. pass " - "'--yes-i-really-mean-it' if you really do."; - goto abort; - } - sync_force(f.get(), ss); - } else if (command.compare(0, 23, "add_bootstrap_peer_hint") == 0) { - if (!_add_bootstrap_peer_hint(command, cmdmap, ss)) - goto abort; - } else if (command == "quorum enter") { - elector.start_participating(); - start_election(); - ss << "started responding to quorum, initiated new election"; - } else if (command == "quorum exit") { - start_election(); - elector.stop_participating(); - ss << "stopped responding to quorum, initiated new election"; - } else if (command == "ops") { - (void)op_tracker.dump_ops_in_flight(f.get()); - if (f) { - f->flush(ss); - } - } else if (command == "sessions") { - - if (f) { - f->open_array_section("sessions"); - for (auto p : session_map.sessions) { - f->dump_stream("session") << *p; - } - f->close_section(); - f->flush(ss); - } - - } else { - assert(0 == "bad AdminSocket command binding"); - } - (read_only ? audit_clog->debug() : audit_clog->info()) - << "from='admin socket' " - << "entity='admin socket' " - << "cmd=" << command << " " - << "args=" << args << ": finished"; - return; - -abort: - (read_only ? audit_clog->debug() : audit_clog->info()) - << "from='admin socket' " - << "entity='admin socket' " - << "cmd=" << command << " " - << "args=" << args << ": aborted"; -} - -void Monitor::handle_signal(int signum) -{ - assert(signum == SIGINT || signum == SIGTERM); - derr << "*** Got Signal " << sig_str(signum) << " ***" << dendl; - shutdown(); -} - -CompatSet Monitor::get_initial_supported_features() -{ - CompatSet::FeatureSet ceph_mon_feature_compat; - CompatSet::FeatureSet ceph_mon_feature_ro_compat; - CompatSet::FeatureSet ceph_mon_feature_incompat; - ceph_mon_feature_incompat.insert(CEPH_MON_FEATURE_INCOMPAT_BASE); - ceph_mon_feature_incompat.insert(CEPH_MON_FEATURE_INCOMPAT_SINGLE_PAXOS); - return CompatSet(ceph_mon_feature_compat, ceph_mon_feature_ro_compat, - ceph_mon_feature_incompat); -} - -CompatSet Monitor::get_supported_features() -{ - CompatSet compat = get_initial_supported_features(); - compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES); - compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC); - compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2); - compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3); - compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_KRAKEN); - compat.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_LUMINOUS); - return compat; -} - -CompatSet Monitor::get_legacy_features() -{ - CompatSet::FeatureSet ceph_mon_feature_compat; - CompatSet::FeatureSet ceph_mon_feature_ro_compat; - CompatSet::FeatureSet ceph_mon_feature_incompat; - ceph_mon_feature_incompat.insert(CEPH_MON_FEATURE_INCOMPAT_BASE); - return CompatSet(ceph_mon_feature_compat, ceph_mon_feature_ro_compat, - ceph_mon_feature_incompat); -} - -int Monitor::check_features(MonitorDBStore *store) -{ - CompatSet required = get_supported_features(); - CompatSet ondisk; - - read_features_off_disk(store, &ondisk); - - if (!required.writeable(ondisk)) { - CompatSet diff = required.unsupported(ondisk); - generic_derr << "ERROR: on disk data includes unsupported features: " << diff << dendl; - return -EPERM; - } - - return 0; -} - -void Monitor::read_features_off_disk(MonitorDBStore *store, CompatSet *features) -{ - bufferlist featuresbl; - store->get(MONITOR_NAME, COMPAT_SET_LOC, featuresbl); - if (featuresbl.length() == 0) { - generic_dout(0) << "WARNING: mon fs missing feature list.\n" - << "Assuming it is old-style and introducing one." << dendl; - //we only want the baseline ~v.18 features assumed to be on disk. - //If new features are introduced this code needs to disappear or - //be made smarter. - *features = get_legacy_features(); - - features->encode(featuresbl); - auto t(std::make_shared()); - t->put(MONITOR_NAME, COMPAT_SET_LOC, featuresbl); - store->apply_transaction(t); - } else { - bufferlist::iterator it = featuresbl.begin(); - features->decode(it); - } -} - -void Monitor::read_features() -{ - read_features_off_disk(store, &features); - dout(10) << "features " << features << dendl; - - calc_quorum_requirements(); - dout(10) << "required_features " << required_features << dendl; -} - -void Monitor::write_features(MonitorDBStore::TransactionRef t) -{ - bufferlist bl; - features.encode(bl); - t->put(MONITOR_NAME, COMPAT_SET_LOC, bl); -} - -const char** Monitor::get_tracked_conf_keys() const -{ - static const char* KEYS[] = { - "crushtool", // helpful for testing - "mon_election_timeout", - "mon_lease", - "mon_lease_renew_interval_factor", - "mon_lease_ack_timeout_factor", - "mon_accept_timeout_factor", - // clog & admin clog - "clog_to_monitors", - "clog_to_syslog", - "clog_to_syslog_facility", - "clog_to_syslog_level", - "clog_to_graylog", - "clog_to_graylog_host", - "clog_to_graylog_port", - "host", - "fsid", - // periodic health to clog - "mon_health_to_clog", - "mon_health_to_clog_interval", - "mon_health_to_clog_tick_interval", - // scrub interval - "mon_scrub_interval", - NULL - }; - return KEYS; -} - -void Monitor::handle_conf_change(const struct md_config_t *conf, - const std::set &changed) -{ - sanitize_options(); - - dout(10) << __func__ << " " << changed << dendl; - - if (changed.count("clog_to_monitors") || - changed.count("clog_to_syslog") || - changed.count("clog_to_syslog_level") || - changed.count("clog_to_syslog_facility") || - changed.count("clog_to_graylog") || - changed.count("clog_to_graylog_host") || - changed.count("clog_to_graylog_port") || - changed.count("host") || - changed.count("fsid")) { - update_log_clients(); - } - - if (changed.count("mon_health_to_clog") || - changed.count("mon_health_to_clog_interval") || - changed.count("mon_health_to_clog_tick_interval")) { - health_to_clog_update_conf(changed); - } - - if (changed.count("mon_scrub_interval")) { - scrub_update_interval(conf->mon_scrub_interval); - } -} - -void Monitor::update_log_clients() -{ - map log_to_monitors; - map log_to_syslog; - map log_channel; - map log_prio; - map log_to_graylog; - map log_to_graylog_host; - map log_to_graylog_port; - uuid_d fsid; - string host; - - if (parse_log_client_options(g_ceph_context, log_to_monitors, log_to_syslog, - log_channel, log_prio, log_to_graylog, - log_to_graylog_host, log_to_graylog_port, - fsid, host)) - return; - - clog->update_config(log_to_monitors, log_to_syslog, - log_channel, log_prio, log_to_graylog, - log_to_graylog_host, log_to_graylog_port, - fsid, host); - - audit_clog->update_config(log_to_monitors, log_to_syslog, - log_channel, log_prio, log_to_graylog, - log_to_graylog_host, log_to_graylog_port, - fsid, host); -} - -int Monitor::sanitize_options() -{ - int r = 0; - - // mon_lease must be greater than mon_lease_renewal; otherwise we - // may incur in leases expiring before they are renewed. - if (g_conf->mon_lease_renew_interval_factor >= 1.0) { - clog->error() << "mon_lease_renew_interval_factor (" - << g_conf->mon_lease_renew_interval_factor - << ") must be less than 1.0"; - r = -EINVAL; - } - - // mon_lease_ack_timeout must be greater than mon_lease to make sure we've - // got time to renew the lease and get an ack for it. Having both options - // with the same value, for a given small vale, could mean timing out if - // the monitors happened to be overloaded -- or even under normal load for - // a small enough value. - if (g_conf->mon_lease_ack_timeout_factor <= 1.0) { - clog->error() << "mon_lease_ack_timeout_factor (" - << g_conf->mon_lease_ack_timeout_factor - << ") must be greater than 1.0"; - r = -EINVAL; - } - - return r; -} - -int Monitor::preinit() -{ - lock.Lock(); - - dout(1) << "preinit fsid " << monmap->fsid << dendl; - - int r = sanitize_options(); - if (r < 0) { - derr << "option sanitization failed!" << dendl; - lock.Unlock(); - return r; - } - - assert(!logger); - { - PerfCountersBuilder pcb(g_ceph_context, "mon", l_mon_first, l_mon_last); - pcb.add_u64(l_mon_num_sessions, "num_sessions", "Open sessions", "sess", - PerfCountersBuilder::PRIO_USEFUL); - pcb.add_u64_counter(l_mon_session_add, "session_add", "Created sessions", - "sadd", PerfCountersBuilder::PRIO_INTERESTING); - pcb.add_u64_counter(l_mon_session_rm, "session_rm", "Removed sessions", - "srm", PerfCountersBuilder::PRIO_INTERESTING); - pcb.add_u64_counter(l_mon_session_trim, "session_trim", "Trimmed sessions", - "strm", PerfCountersBuilder::PRIO_USEFUL); - pcb.add_u64_counter(l_mon_num_elections, "num_elections", "Elections participated in", - "ecnt", PerfCountersBuilder::PRIO_USEFUL); - pcb.add_u64_counter(l_mon_election_call, "election_call", "Elections started", - "estt", PerfCountersBuilder::PRIO_INTERESTING); - pcb.add_u64_counter(l_mon_election_win, "election_win", "Elections won", - "ewon", PerfCountersBuilder::PRIO_INTERESTING); - pcb.add_u64_counter(l_mon_election_lose, "election_lose", "Elections lost", - "elst", PerfCountersBuilder::PRIO_INTERESTING); - logger = pcb.create_perf_counters(); - cct->get_perfcounters_collection()->add(logger); - } - - assert(!cluster_logger); - { - PerfCountersBuilder pcb(g_ceph_context, "cluster", l_cluster_first, l_cluster_last); - pcb.add_u64(l_cluster_num_mon, "num_mon", "Monitors"); - pcb.add_u64(l_cluster_num_mon_quorum, "num_mon_quorum", "Monitors in quorum"); - pcb.add_u64(l_cluster_num_osd, "num_osd", "OSDs"); - pcb.add_u64(l_cluster_num_osd_up, "num_osd_up", "OSDs that are up"); - pcb.add_u64(l_cluster_num_osd_in, "num_osd_in", "OSD in state \"in\" (they are in cluster)"); - pcb.add_u64(l_cluster_osd_epoch, "osd_epoch", "Current epoch of OSD map"); - pcb.add_u64(l_cluster_osd_bytes, "osd_bytes", "Total capacity of cluster"); - pcb.add_u64(l_cluster_osd_bytes_used, "osd_bytes_used", "Used space"); - pcb.add_u64(l_cluster_osd_bytes_avail, "osd_bytes_avail", "Available space"); - pcb.add_u64(l_cluster_num_pool, "num_pool", "Pools"); - pcb.add_u64(l_cluster_num_pg, "num_pg", "Placement groups"); - pcb.add_u64(l_cluster_num_pg_active_clean, "num_pg_active_clean", "Placement groups in active+clean state"); - pcb.add_u64(l_cluster_num_pg_active, "num_pg_active", "Placement groups in active state"); - pcb.add_u64(l_cluster_num_pg_peering, "num_pg_peering", "Placement groups in peering state"); - pcb.add_u64(l_cluster_num_object, "num_object", "Objects"); - pcb.add_u64(l_cluster_num_object_degraded, "num_object_degraded", "Degraded (missing replicas) objects"); - pcb.add_u64(l_cluster_num_object_misplaced, "num_object_misplaced", "Misplaced (wrong location in the cluster) objects"); - pcb.add_u64(l_cluster_num_object_unfound, "num_object_unfound", "Unfound objects"); - pcb.add_u64(l_cluster_num_bytes, "num_bytes", "Size of all objects"); - pcb.add_u64(l_cluster_num_mds_up, "num_mds_up", "MDSs that are up"); - pcb.add_u64(l_cluster_num_mds_in, "num_mds_in", "MDS in state \"in\" (they are in cluster)"); - pcb.add_u64(l_cluster_num_mds_failed, "num_mds_failed", "Failed MDS"); - pcb.add_u64(l_cluster_mds_epoch, "mds_epoch", "Current epoch of MDS map"); - cluster_logger = pcb.create_perf_counters(); - } - - paxos->init_logger(); - - // verify cluster_uuid - { - int r = check_fsid(); - if (r == -ENOENT) - r = write_fsid(); - if (r < 0) { - lock.Unlock(); - return r; - } - } - - // open compatset - read_features(); - - // have we ever joined a quorum? - has_ever_joined = (store->get(MONITOR_NAME, "joined") != 0); - dout(10) << "has_ever_joined = " << (int)has_ever_joined << dendl; - - if (!has_ever_joined) { - // impose initial quorum restrictions? - list initial_members; - get_str_list(g_conf->mon_initial_members, initial_members); - - if (!initial_members.empty()) { - dout(1) << " initial_members " << initial_members << ", filtering seed monmap" << dendl; - - monmap->set_initial_members(g_ceph_context, initial_members, name, messenger->get_myaddr(), - &extra_probe_peers); - - dout(10) << " monmap is " << *monmap << dendl; - dout(10) << " extra probe peers " << extra_probe_peers << dendl; - } - } else if (!monmap->contains(name)) { - derr << "not in monmap and have been in a quorum before; " - << "must have been removed" << dendl; - if (g_conf->mon_force_quorum_join) { - dout(0) << "we should have died but " - << "'mon_force_quorum_join' is set -- allowing boot" << dendl; - } else { - derr << "commit suicide!" << dendl; - lock.Unlock(); - return -ENOENT; - } - } - - { - // We have a potentially inconsistent store state in hands. Get rid of it - // and start fresh. - bool clear_store = false; - if (store->exists("mon_sync", "in_sync")) { - dout(1) << __func__ << " clean up potentially inconsistent store state" - << dendl; - clear_store = true; - } - - if (store->get("mon_sync", "force_sync") > 0) { - dout(1) << __func__ << " force sync by clearing store state" << dendl; - clear_store = true; - } - - if (clear_store) { - set sync_prefixes = get_sync_targets_names(); - store->clear(sync_prefixes); - } - } - - sync_last_committed_floor = store->get("mon_sync", "last_committed_floor"); - dout(10) << "sync_last_committed_floor " << sync_last_committed_floor << dendl; - - init_paxos(); - health_monitor->init(); - - if (is_keyring_required()) { - // we need to bootstrap authentication keys so we can form an - // initial quorum. - if (authmon()->get_last_committed() == 0) { - dout(10) << "loading initial keyring to bootstrap authentication for mkfs" << dendl; - bufferlist bl; - int err = store->get("mkfs", "keyring", bl); - if (err == 0 && bl.length() > 0) { - // Attempt to decode and extract keyring only if it is found. - KeyRing keyring; - bufferlist::iterator p = bl.begin(); - ::decode(keyring, p); - extract_save_mon_key(keyring); - } - } - - string keyring_loc = g_conf->mon_data + "/keyring"; - - r = keyring.load(cct, keyring_loc); - if (r < 0) { - EntityName mon_name; - mon_name.set_type(CEPH_ENTITY_TYPE_MON); - EntityAuth mon_key; - if (key_server.get_auth(mon_name, mon_key)) { - dout(1) << "copying mon. key from old db to external keyring" << dendl; - keyring.add(mon_name, mon_key); - bufferlist bl; - keyring.encode_plaintext(bl); - write_default_keyring(bl); - } else { - derr << "unable to load initial keyring " << g_conf->keyring << dendl; - lock.Unlock(); - return r; - } - } - } - - admin_hook = new AdminHook(this); - AdminSocket* admin_socket = cct->get_admin_socket(); - - // unlock while registering to avoid mon_lock -> admin socket lock dependency. - lock.Unlock(); - r = admin_socket->register_command("mon_status", "mon_status", admin_hook, - "show current monitor status"); - assert(r == 0); - r = admin_socket->register_command("quorum_status", "quorum_status", - admin_hook, "show current quorum status"); - assert(r == 0); - r = admin_socket->register_command("sync_force", - "sync_force name=validate," - "type=CephChoices," - "strings=--yes-i-really-mean-it", - admin_hook, - "force sync of and clear monitor store"); - assert(r == 0); - r = admin_socket->register_command("add_bootstrap_peer_hint", - "add_bootstrap_peer_hint name=addr," - "type=CephIPAddr", - admin_hook, - "add peer address as potential bootstrap" - " peer for cluster bringup"); - assert(r == 0); - r = admin_socket->register_command("quorum enter", "quorum enter", - admin_hook, - "force monitor back into quorum"); - assert(r == 0); - r = admin_socket->register_command("quorum exit", "quorum exit", - admin_hook, - "force monitor out of the quorum"); - assert(r == 0); - r = admin_socket->register_command("ops", - "ops", - admin_hook, - "show the ops currently in flight"); - assert(r == 0); - r = admin_socket->register_command("sessions", - "sessions", - admin_hook, - "list existing sessions"); - assert(r == 0); - - lock.Lock(); - - // add ourselves as a conf observer - g_conf->add_observer(this); - - lock.Unlock(); - return 0; -} - -int Monitor::init() -{ - dout(2) << "init" << dendl; - Mutex::Locker l(lock); - - finisher.start(); - - // start ticker - timer.init(); - new_tick(); - - cpu_tp.start(); - - // i'm ready! - messenger->add_dispatcher_tail(this); - - mgr_client.init(); - mgr_messenger->add_dispatcher_tail(&mgr_client); - mgr_messenger->add_dispatcher_tail(this); // for auth ms_* calls - - bootstrap(); - // add features of myself into feature_map - session_map.feature_map.add_mon(con_self->get_features()); - return 0; -} - -void Monitor::init_paxos() -{ - dout(10) << __func__ << dendl; - paxos->init(); - - // init services - for (int i = 0; i < PAXOS_NUM; ++i) { - paxos_service[i]->init(); - } - - refresh_from_paxos(NULL); -} - -void Monitor::refresh_from_paxos(bool *need_bootstrap) -{ - dout(10) << __func__ << dendl; - - bufferlist bl; - int r = store->get(MONITOR_NAME, "cluster_fingerprint", bl); - if (r >= 0) { - try { - bufferlist::iterator p = bl.begin(); - ::decode(fingerprint, p); - } - catch (buffer::error& e) { - dout(10) << __func__ << " failed to decode cluster_fingerprint" << dendl; - } - } else { - dout(10) << __func__ << " no cluster_fingerprint" << dendl; - } - - for (int i = 0; i < PAXOS_NUM; ++i) { - paxos_service[i]->refresh(need_bootstrap); - } - for (int i = 0; i < PAXOS_NUM; ++i) { - paxos_service[i]->post_refresh(); - } - load_metadata(); -} - -void Monitor::register_cluster_logger() -{ - if (!cluster_logger_registered) { - dout(10) << "register_cluster_logger" << dendl; - cluster_logger_registered = true; - cct->get_perfcounters_collection()->add(cluster_logger); - } else { - dout(10) << "register_cluster_logger - already registered" << dendl; - } -} - -void Monitor::unregister_cluster_logger() -{ - if (cluster_logger_registered) { - dout(10) << "unregister_cluster_logger" << dendl; - cluster_logger_registered = false; - cct->get_perfcounters_collection()->remove(cluster_logger); - } else { - dout(10) << "unregister_cluster_logger - not registered" << dendl; - } -} - -void Monitor::update_logger() -{ - cluster_logger->set(l_cluster_num_mon, monmap->size()); - cluster_logger->set(l_cluster_num_mon_quorum, quorum.size()); -} - -void Monitor::shutdown() -{ - dout(1) << "shutdown" << dendl; - - lock.Lock(); - - wait_for_paxos_write(); - - state = STATE_SHUTDOWN; - - g_conf->remove_observer(this); - - if (admin_hook) { - AdminSocket* admin_socket = cct->get_admin_socket(); - admin_socket->unregister_command("mon_status"); - admin_socket->unregister_command("quorum_status"); - admin_socket->unregister_command("sync_force"); - admin_socket->unregister_command("add_bootstrap_peer_hint"); - admin_socket->unregister_command("quorum enter"); - admin_socket->unregister_command("quorum exit"); - admin_socket->unregister_command("ops"); - admin_socket->unregister_command("sessions"); - delete admin_hook; - admin_hook = NULL; - } - - elector.shutdown(); - - mgr_client.shutdown(); - - lock.Unlock(); - finisher.wait_for_empty(); - finisher.stop(); - lock.Lock(); - - // clean up - paxos->shutdown(); - for (vector::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p) - (*p)->shutdown(); - health_monitor->shutdown(); - - finish_contexts(g_ceph_context, waitfor_quorum, -ECANCELED); - finish_contexts(g_ceph_context, maybe_wait_for_quorum, -ECANCELED); - - timer.shutdown(); - - cpu_tp.stop(); - - remove_all_sessions(); - - if (logger) { - cct->get_perfcounters_collection()->remove(logger); - delete logger; - logger = NULL; - } - if (cluster_logger) { - if (cluster_logger_registered) - cct->get_perfcounters_collection()->remove(cluster_logger); - delete cluster_logger; - cluster_logger = NULL; - } - - log_client.shutdown(); - - // unlock before msgr shutdown... - lock.Unlock(); - - messenger->shutdown(); // last thing! ceph_mon.cc will delete mon. - mgr_messenger->shutdown(); -} - -void Monitor::wait_for_paxos_write() -{ - if (paxos->is_writing() || paxos->is_writing_previous()) { - dout(10) << __func__ << " flushing pending write" << dendl; - lock.Unlock(); - store->flush(); - lock.Lock(); - dout(10) << __func__ << " flushed pending write" << dendl; - } -} - -void Monitor::bootstrap() -{ - dout(10) << "bootstrap" << dendl; - wait_for_paxos_write(); - - sync_reset_requester(); - unregister_cluster_logger(); - cancel_probe_timeout(); - - // note my rank - int newrank = monmap->get_rank(messenger->get_myaddr()); - if (newrank < 0 && rank >= 0) { - // was i ever part of the quorum? - if (has_ever_joined) { - dout(0) << " removed from monmap, suicide." << dendl; - exit(0); - } - } - if (newrank != rank) { - dout(0) << " my rank is now " << newrank << " (was " << rank << ")" << dendl; - messenger->set_myname(entity_name_t::MON(newrank)); - rank = newrank; - - // reset all connections, or else our peers will think we are someone else. - messenger->mark_down_all(); - } - - // reset - state = STATE_PROBING; - - _reset(); - - // sync store - if (g_conf->mon_compact_on_bootstrap) { - dout(10) << "bootstrap -- triggering compaction" << dendl; - store->compact(); - dout(10) << "bootstrap -- finished compaction" << dendl; - } - - // singleton monitor? - if (monmap->size() == 1 && rank == 0) { - win_standalone_election(); - return; - } - - reset_probe_timeout(); - - // i'm outside the quorum - if (monmap->contains(name)) - outside_quorum.insert(name); - - // probe monitors - dout(10) << "probing other monitors" << dendl; - for (unsigned i = 0; i < monmap->size(); i++) { - if ((int)i != rank) - messenger->send_message(new MMonProbe(monmap->fsid, MMonProbe::OP_PROBE, name, has_ever_joined), - monmap->get_inst(i)); - } - for (set::iterator p = extra_probe_peers.begin(); - p != extra_probe_peers.end(); - ++p) { - if (*p != messenger->get_myaddr()) { - entity_inst_t i; - i.name = entity_name_t::MON(-1); - i.addr = *p; - messenger->send_message(new MMonProbe(monmap->fsid, MMonProbe::OP_PROBE, name, has_ever_joined), i); - } - } -} - -bool Monitor::_add_bootstrap_peer_hint(string cmd, cmdmap_t& cmdmap, ostream& ss) -{ - string addrstr; - if (!cmd_getval(g_ceph_context, cmdmap, "addr", addrstr)) { - ss << "unable to parse address string value '" - << cmd_vartype_stringify(cmdmap["addr"]) << "'"; - return false; - } - dout(10) << "_add_bootstrap_peer_hint '" << cmd << "' '" - << addrstr << "'" << dendl; - - entity_addr_t addr; - const char *end = 0; - if (!addr.parse(addrstr.c_str(), &end)) { - ss << "failed to parse addr '" << addrstr << "'; syntax is 'add_bootstrap_peer_hint ip[:port]'"; - return false; - } - - if (is_leader() || is_peon()) { - ss << "mon already active; ignoring bootstrap hint"; - return true; - } - - if (addr.get_port() == 0) - addr.set_port(CEPH_MON_PORT); - - extra_probe_peers.insert(addr); - ss << "adding peer " << addr << " to list: " << extra_probe_peers; - return true; -} - -// called by bootstrap(), or on leader|peon -> electing -void Monitor::_reset() -{ - dout(10) << __func__ << dendl; - - cancel_probe_timeout(); - timecheck_finish(); - health_events_cleanup(); - health_check_log_times.clear(); - scrub_event_cancel(); - - leader_since = utime_t(); - if (!quorum.empty()) { - exited_quorum = ceph_clock_now(); - } - quorum.clear(); - outside_quorum.clear(); - quorum_feature_map.clear(); - - scrub_reset(); - - paxos->restart(); - - for (vector::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p) - (*p)->restart(); - health_monitor->finish(); -} - - -// ----------------------------------------------------------- -// sync - -set Monitor::get_sync_targets_names() -{ - set targets; - targets.insert(paxos->get_name()); - for (int i = 0; i < PAXOS_NUM; ++i) - paxos_service[i]->get_store_prefixes(targets); - ConfigKeyService *config_key_service_ptr = dynamic_cast(config_key_service); - assert(config_key_service_ptr); - config_key_service_ptr->get_store_prefixes(targets); - return targets; -} - - -void Monitor::sync_timeout() -{ - dout(10) << __func__ << dendl; - assert(state == STATE_SYNCHRONIZING); - bootstrap(); -} - -void Monitor::sync_obtain_latest_monmap(bufferlist &bl) -{ - dout(1) << __func__ << dendl; - - MonMap latest_monmap; - - // Grab latest monmap from MonmapMonitor - bufferlist monmon_bl; - int err = monmon()->get_monmap(monmon_bl); - if (err < 0) { - if (err != -ENOENT) { - derr << __func__ - << " something wrong happened while reading the store: " - << cpp_strerror(err) << dendl; - assert(0 == "error reading the store"); - } - } else { - latest_monmap.decode(monmon_bl); - } - - // Grab last backed up monmap (if any) and compare epochs - if (store->exists("mon_sync", "latest_monmap")) { - bufferlist backup_bl; - int err = store->get("mon_sync", "latest_monmap", backup_bl); - if (err < 0) { - derr << __func__ - << " something wrong happened while reading the store: " - << cpp_strerror(err) << dendl; - assert(0 == "error reading the store"); - } - assert(backup_bl.length() > 0); - - MonMap backup_monmap; - backup_monmap.decode(backup_bl); - - if (backup_monmap.epoch > latest_monmap.epoch) - latest_monmap = backup_monmap; - } - - // Check if our current monmap's epoch is greater than the one we've - // got so far. - if (monmap->epoch > latest_monmap.epoch) - latest_monmap = *monmap; - - dout(1) << __func__ << " obtained monmap e" << latest_monmap.epoch << dendl; - - latest_monmap.encode(bl, CEPH_FEATURES_ALL); -} - -void Monitor::sync_reset_requester() -{ - dout(10) << __func__ << dendl; - - if (sync_timeout_event) { - timer.cancel_event(sync_timeout_event); - sync_timeout_event = NULL; - } - - sync_provider = entity_inst_t(); - sync_cookie = 0; - sync_full = false; - sync_start_version = 0; -} - -void Monitor::sync_reset_provider() -{ - dout(10) << __func__ << dendl; - sync_providers.clear(); -} - -void Monitor::sync_start(entity_inst_t &other, bool full) -{ - dout(10) << __func__ << " " << other << (full ? " full" : " recent") << dendl; - - assert(state == STATE_PROBING || - state == STATE_SYNCHRONIZING); - state = STATE_SYNCHRONIZING; - - // make sure are not a provider for anyone! - sync_reset_provider(); - - sync_full = full; - - if (sync_full) { - // stash key state, and mark that we are syncing - auto t(std::make_shared()); - sync_stash_critical_state(t); - t->put("mon_sync", "in_sync", 1); - - sync_last_committed_floor = MAX(sync_last_committed_floor, paxos->get_version()); - dout(10) << __func__ << " marking sync in progress, storing sync_last_committed_floor " - << sync_last_committed_floor << dendl; - t->put("mon_sync", "last_committed_floor", sync_last_committed_floor); - - store->apply_transaction(t); - - assert(g_conf->mon_sync_requester_kill_at != 1); - - // clear the underlying store - set targets = get_sync_targets_names(); - dout(10) << __func__ << " clearing prefixes " << targets << dendl; - store->clear(targets); - - // make sure paxos knows it has been reset. this prevents a - // bootstrap and then different probe reply order from possibly - // deciding a partial or no sync is needed. - paxos->init(); - - assert(g_conf->mon_sync_requester_kill_at != 2); - } - - // assume 'other' as the leader. We will update the leader once we receive - // a reply to the sync start. - sync_provider = other; - - sync_reset_timeout(); - - MMonSync *m = new MMonSync(sync_full ? MMonSync::OP_GET_COOKIE_FULL : MMonSync::OP_GET_COOKIE_RECENT); - if (!sync_full) - m->last_committed = paxos->get_version(); - messenger->send_message(m, sync_provider); -} - -void Monitor::sync_stash_critical_state(MonitorDBStore::TransactionRef t) -{ - dout(10) << __func__ << dendl; - bufferlist backup_monmap; - sync_obtain_latest_monmap(backup_monmap); - assert(backup_monmap.length() > 0); - t->put("mon_sync", "latest_monmap", backup_monmap); -} - -void Monitor::sync_reset_timeout() -{ - dout(10) << __func__ << dendl; - if (sync_timeout_event) - timer.cancel_event(sync_timeout_event); - sync_timeout_event = timer.add_event_after( - g_conf->mon_sync_timeout, - new C_MonContext(this, [this](int) { - sync_timeout(); - })); -} - -void Monitor::sync_finish(version_t last_committed) -{ - dout(10) << __func__ << " lc " << last_committed << " from " << sync_provider << dendl; - - assert(g_conf->mon_sync_requester_kill_at != 7); - - if (sync_full) { - // finalize the paxos commits - auto tx(std::make_shared()); - paxos->read_and_prepare_transactions(tx, sync_start_version, - last_committed); - tx->put(paxos->get_name(), "last_committed", last_committed); - - dout(30) << __func__ << " final tx dump:\n"; - JSONFormatter f(true); - tx->dump(&f); - f.flush(*_dout); - *_dout << dendl; - - store->apply_transaction(tx); - } - - assert(g_conf->mon_sync_requester_kill_at != 8); - - auto t(std::make_shared()); - t->erase("mon_sync", "in_sync"); - t->erase("mon_sync", "force_sync"); - t->erase("mon_sync", "last_committed_floor"); - store->apply_transaction(t); - - assert(g_conf->mon_sync_requester_kill_at != 9); - - init_paxos(); - - assert(g_conf->mon_sync_requester_kill_at != 10); - - bootstrap(); -} - -void Monitor::handle_sync(MonOpRequestRef op) -{ - MMonSync *m = static_cast(op->get_req()); - dout(10) << __func__ << " " << *m << dendl; - switch (m->op) { - - // provider --------- - - case MMonSync::OP_GET_COOKIE_FULL: - case MMonSync::OP_GET_COOKIE_RECENT: - handle_sync_get_cookie(op); - break; - case MMonSync::OP_GET_CHUNK: - handle_sync_get_chunk(op); - break; - - // client ----------- - - case MMonSync::OP_COOKIE: - handle_sync_cookie(op); - break; - - case MMonSync::OP_CHUNK: - case MMonSync::OP_LAST_CHUNK: - handle_sync_chunk(op); - break; - case MMonSync::OP_NO_COOKIE: - handle_sync_no_cookie(op); - break; - - default: - dout(0) << __func__ << " unknown op " << m->op << dendl; - assert(0 == "unknown op"); - } -} - -// leader - -void Monitor::_sync_reply_no_cookie(MonOpRequestRef op) -{ - MMonSync *m = static_cast(op->get_req()); - MMonSync *reply = new MMonSync(MMonSync::OP_NO_COOKIE, m->cookie); - m->get_connection()->send_message(reply); -} - -void Monitor::handle_sync_get_cookie(MonOpRequestRef op) -{ - MMonSync *m = static_cast(op->get_req()); - if (is_synchronizing()) { - _sync_reply_no_cookie(op); - return; - } - - assert(g_conf->mon_sync_provider_kill_at != 1); - - // make sure they can understand us. - if ((required_features ^ m->get_connection()->get_features()) & - required_features) { - dout(5) << " ignoring peer mon." << m->get_source().num() - << " has features " << std::hex - << m->get_connection()->get_features() - << " but we require " << required_features << std::dec << dendl; - return; - } - - // make up a unique cookie. include election epoch (which persists - // across restarts for the whole cluster) and a counter for this - // process instance. there is no need to be unique *across* - // monitors, though. - uint64_t cookie = ((unsigned long long)elector.get_epoch() << 24) + ++sync_provider_count; - assert(sync_providers.count(cookie) == 0); - - dout(10) << __func__ << " cookie " << cookie << " for " << m->get_source_inst() << dendl; - - SyncProvider& sp = sync_providers[cookie]; - sp.cookie = cookie; - sp.entity = m->get_source_inst(); - sp.reset_timeout(g_ceph_context, g_conf->mon_sync_timeout * 2); - - set sync_targets; - if (m->op == MMonSync::OP_GET_COOKIE_FULL) { - // full scan - sync_targets = get_sync_targets_names(); - sp.last_committed = paxos->get_version(); - sp.synchronizer = store->get_synchronizer(sp.last_key, sync_targets); - sp.full = true; - dout(10) << __func__ << " will sync prefixes " << sync_targets << dendl; - } else { - // just catch up paxos - sp.last_committed = m->last_committed; - } - dout(10) << __func__ << " will sync from version " << sp.last_committed << dendl; - - MMonSync *reply = new MMonSync(MMonSync::OP_COOKIE, sp.cookie); - reply->last_committed = sp.last_committed; - m->get_connection()->send_message(reply); -} - -void Monitor::handle_sync_get_chunk(MonOpRequestRef op) -{ - MMonSync *m = static_cast(op->get_req()); - dout(10) << __func__ << " " << *m << dendl; - - if (sync_providers.count(m->cookie) == 0) { - dout(10) << __func__ << " no cookie " << m->cookie << dendl; - _sync_reply_no_cookie(op); - return; - } - - assert(g_conf->mon_sync_provider_kill_at != 2); - - SyncProvider& sp = sync_providers[m->cookie]; - sp.reset_timeout(g_ceph_context, g_conf->mon_sync_timeout * 2); - - if (sp.last_committed < paxos->get_first_committed() && - paxos->get_first_committed() > 1) { - dout(10) << __func__ << " sync requester fell behind paxos, their lc " << sp.last_committed - << " < our fc " << paxos->get_first_committed() << dendl; - sync_providers.erase(m->cookie); - _sync_reply_no_cookie(op); - return; - } - - MMonSync *reply = new MMonSync(MMonSync::OP_CHUNK, sp.cookie); - auto tx(std::make_shared()); - - int left = g_conf->mon_sync_max_payload_size; - while (sp.last_committed < paxos->get_version() && left > 0) { - bufferlist bl; - sp.last_committed++; - - int err = store->get(paxos->get_name(), sp.last_committed, bl); - assert(err == 0); - - tx->put(paxos->get_name(), sp.last_committed, bl); - left -= bl.length(); - dout(20) << __func__ << " including paxos state " << sp.last_committed - << dendl; - } - reply->last_committed = sp.last_committed; - - if (sp.full && left > 0) { - sp.synchronizer->get_chunk_tx(tx, left); - sp.last_key = sp.synchronizer->get_last_key(); - reply->last_key = sp.last_key; - } - - if ((sp.full && sp.synchronizer->has_next_chunk()) || - sp.last_committed < paxos->get_version()) { - dout(10) << __func__ << " chunk, through version " << sp.last_committed - << " key " << sp.last_key << dendl; - } else { - dout(10) << __func__ << " last chunk, through version " << sp.last_committed - << " key " << sp.last_key << dendl; - reply->op = MMonSync::OP_LAST_CHUNK; - - assert(g_conf->mon_sync_provider_kill_at != 3); - - // clean up our local state - sync_providers.erase(sp.cookie); - } - - ::encode(*tx, reply->chunk_bl); - - m->get_connection()->send_message(reply); -} - -// requester - -void Monitor::handle_sync_cookie(MonOpRequestRef op) -{ - MMonSync *m = static_cast(op->get_req()); - dout(10) << __func__ << " " << *m << dendl; - if (sync_cookie) { - dout(10) << __func__ << " already have a cookie, ignoring" << dendl; - return; - } - if (m->get_source_inst() != sync_provider) { - dout(10) << __func__ << " source does not match, discarding" << dendl; - return; - } - sync_cookie = m->cookie; - sync_start_version = m->last_committed; - - sync_reset_timeout(); - sync_get_next_chunk(); - - assert(g_conf->mon_sync_requester_kill_at != 3); -} - -void Monitor::sync_get_next_chunk() -{ - dout(20) << __func__ << " cookie " << sync_cookie << " provider " << sync_provider << dendl; - if (g_conf->mon_inject_sync_get_chunk_delay > 0) { - dout(20) << __func__ << " injecting delay of " << g_conf->mon_inject_sync_get_chunk_delay << dendl; - usleep((long long)(g_conf->mon_inject_sync_get_chunk_delay * 1000000.0)); - } - MMonSync *r = new MMonSync(MMonSync::OP_GET_CHUNK, sync_cookie); - messenger->send_message(r, sync_provider); - - assert(g_conf->mon_sync_requester_kill_at != 4); -} - -void Monitor::handle_sync_chunk(MonOpRequestRef op) -{ - MMonSync *m = static_cast(op->get_req()); - dout(10) << __func__ << " " << *m << dendl; - - if (m->cookie != sync_cookie) { - dout(10) << __func__ << " cookie does not match, discarding" << dendl; - return; - } - if (m->get_source_inst() != sync_provider) { - dout(10) << __func__ << " source does not match, discarding" << dendl; - return; - } - - assert(state == STATE_SYNCHRONIZING); - assert(g_conf->mon_sync_requester_kill_at != 5); - - auto tx(std::make_shared()); - tx->append_from_encoded(m->chunk_bl); - - dout(30) << __func__ << " tx dump:\n"; - JSONFormatter f(true); - tx->dump(&f); - f.flush(*_dout); - *_dout << dendl; - - store->apply_transaction(tx); - - assert(g_conf->mon_sync_requester_kill_at != 6); - - if (!sync_full) { - dout(10) << __func__ << " applying recent paxos transactions as we go" << dendl; - auto tx(std::make_shared()); - paxos->read_and_prepare_transactions(tx, paxos->get_version() + 1, - m->last_committed); - tx->put(paxos->get_name(), "last_committed", m->last_committed); - - dout(30) << __func__ << " tx dump:\n"; - JSONFormatter f(true); - tx->dump(&f); - f.flush(*_dout); - *_dout << dendl; - - store->apply_transaction(tx); - paxos->init(); // to refresh what we just wrote - } - - if (m->op == MMonSync::OP_CHUNK) { - sync_reset_timeout(); - sync_get_next_chunk(); - } else if (m->op == MMonSync::OP_LAST_CHUNK) { - sync_finish(m->last_committed); - } -} - -void Monitor::handle_sync_no_cookie(MonOpRequestRef op) -{ - dout(10) << __func__ << dendl; - bootstrap(); -} - -void Monitor::sync_trim_providers() -{ - dout(20) << __func__ << dendl; - - utime_t now = ceph_clock_now(); - map::iterator p = sync_providers.begin(); - while (p != sync_providers.end()) { - if (now > p->second.timeout) { - dout(10) << __func__ << " expiring cookie " << p->second.cookie << " for " << p->second.entity << dendl; - sync_providers.erase(p++); - } else { - ++p; - } - } -} - -// --------------------------------------------------- -// probe - -void Monitor::cancel_probe_timeout() -{ - if (probe_timeout_event) { - dout(10) << "cancel_probe_timeout " << probe_timeout_event << dendl; - timer.cancel_event(probe_timeout_event); - probe_timeout_event = NULL; - } else { - dout(10) << "cancel_probe_timeout (none scheduled)" << dendl; - } -} - -void Monitor::reset_probe_timeout() -{ - cancel_probe_timeout(); - probe_timeout_event = new C_MonContext(this, [this](int r) { - probe_timeout(r); - }); - double t = g_conf->mon_probe_timeout; - if (timer.add_event_after(t, probe_timeout_event)) { - dout(10) << "reset_probe_timeout " << probe_timeout_event - << " after " << t << " seconds" << dendl; - } else { - probe_timeout_event = nullptr; - } -} - -void Monitor::probe_timeout(int r) -{ - dout(4) << "probe_timeout " << probe_timeout_event << dendl; - assert(is_probing() || is_synchronizing()); - assert(probe_timeout_event); - probe_timeout_event = NULL; - bootstrap(); -} - -void Monitor::handle_probe(MonOpRequestRef op) -{ - MMonProbe *m = static_cast(op->get_req()); - dout(10) << "handle_probe " << *m << dendl; - - if (m->fsid != monmap->fsid) { - dout(0) << "handle_probe ignoring fsid " << m->fsid << " != " << monmap->fsid << dendl; - return; - } - - switch (m->op) { - case MMonProbe::OP_PROBE: - handle_probe_probe(op); - break; - - case MMonProbe::OP_REPLY: - handle_probe_reply(op); - break; - - case MMonProbe::OP_MISSING_FEATURES: - derr << __func__ << " missing features, have " << CEPH_FEATURES_ALL - << ", required " << m->required_features - << ", missing " << (m->required_features & ~CEPH_FEATURES_ALL) - << dendl; - break; - } -} - -void Monitor::handle_probe_probe(MonOpRequestRef op) -{ - MMonProbe *m = static_cast(op->get_req()); - - dout(10) << "handle_probe_probe " << m->get_source_inst() << *m - << " features " << m->get_connection()->get_features() << dendl; - uint64_t missing = required_features & ~m->get_connection()->get_features(); - if (missing) { - dout(1) << " peer " << m->get_source_addr() << " missing features " - << missing << dendl; - if (m->get_connection()->has_feature(CEPH_FEATURE_OSD_PRIMARY_AFFINITY)) { - MMonProbe *r = new MMonProbe(monmap->fsid, MMonProbe::OP_MISSING_FEATURES, - name, has_ever_joined); - m->required_features = required_features; - m->get_connection()->send_message(r); - } - goto out; - } - - if (!is_probing() && !is_synchronizing()) { - // If the probing mon is way ahead of us, we need to re-bootstrap. - // Normally we capture this case when we initially bootstrap, but - // it is possible we pass those checks (we overlap with - // quorum-to-be) but fail to join a quorum before it moves past - // us. We need to be kicked back to bootstrap so we can - // synchonize, not keep calling elections. - if (paxos->get_version() + 1 < m->paxos_first_version) { - dout(1) << " peer " << m->get_source_addr() << " has first_committed " - << "ahead of us, re-bootstrapping" << dendl; - bootstrap(); - goto out; - - } - } - - MMonProbe *r; - r = new MMonProbe(monmap->fsid, MMonProbe::OP_REPLY, name, has_ever_joined); - r->name = name; - r->quorum = quorum; - monmap->encode(r->monmap_bl, m->get_connection()->get_features()); - r->paxos_first_version = paxos->get_first_committed(); - r->paxos_last_version = paxos->get_version(); - m->get_connection()->send_message(r); - - // did we discover a peer here? - if (!monmap->contains(m->get_source_addr())) { - dout(1) << " adding peer " << m->get_source_addr() - << " to list of hints" << dendl; - extra_probe_peers.insert(m->get_source_addr()); - } - - out: - return; -} - -void Monitor::handle_probe_reply(MonOpRequestRef op) -{ - MMonProbe *m = static_cast(op->get_req()); - dout(10) << "handle_probe_reply " << m->get_source_inst() << *m << dendl; - dout(10) << " monmap is " << *monmap << dendl; - - // discover name and addrs during probing or electing states. - if (!is_probing() && !is_electing()) { - return; - } - - // newer map, or they've joined a quorum and we haven't? - bufferlist mybl; - monmap->encode(mybl, m->get_connection()->get_features()); - // make sure it's actually different; the checks below err toward - // taking the other guy's map, which could cause us to loop. - if (!mybl.contents_equal(m->monmap_bl)) { - MonMap *newmap = new MonMap; - newmap->decode(m->monmap_bl); - if (m->has_ever_joined && (newmap->get_epoch() > monmap->get_epoch() || - !has_ever_joined)) { - dout(10) << " got newer/committed monmap epoch " << newmap->get_epoch() - << ", mine was " << monmap->get_epoch() << dendl; - delete newmap; - monmap->decode(m->monmap_bl); - - bootstrap(); - return; - } - delete newmap; - } - - // rename peer? - string peer_name = monmap->get_name(m->get_source_addr()); - if (monmap->get_epoch() == 0 && peer_name.compare(0, 7, "noname-") == 0) { - dout(10) << " renaming peer " << m->get_source_addr() << " " - << peer_name << " -> " << m->name << " in my monmap" - << dendl; - monmap->rename(peer_name, m->name); - - if (is_electing()) { - bootstrap(); - return; - } - } else { - dout(10) << " peer name is " << peer_name << dendl; - } - - // new initial peer? - if (monmap->get_epoch() == 0 && - monmap->contains(m->name) && - monmap->get_addr(m->name).is_blank_ip()) { - dout(1) << " learned initial mon " << m->name << " addr " << m->get_source_addr() << dendl; - monmap->set_addr(m->name, m->get_source_addr()); - - bootstrap(); - return; - } - - // end discover phase - if (!is_probing()) { - return; - } - - assert(paxos != NULL); - - if (is_synchronizing()) { - dout(10) << " currently syncing" << dendl; - return; - } - - entity_inst_t other = m->get_source_inst(); - - if (m->paxos_last_version < sync_last_committed_floor) { - dout(10) << " peer paxos versions [" << m->paxos_first_version - << "," << m->paxos_last_version << "] < my sync_last_committed_floor " - << sync_last_committed_floor << ", ignoring" - << dendl; - } else { - if (paxos->get_version() < m->paxos_first_version && - m->paxos_first_version > 1) { // no need to sync if we're 0 and they start at 1. - dout(10) << " peer paxos first versions [" << m->paxos_first_version - << "," << m->paxos_last_version << "]" - << " vs my version " << paxos->get_version() - << " (too far ahead)" - << dendl; - cancel_probe_timeout(); - sync_start(other, true); - return; - } - if (paxos->get_version() + g_conf->paxos_max_join_drift < m->paxos_last_version) { - dout(10) << " peer paxos last version " << m->paxos_last_version - << " vs my version " << paxos->get_version() - << " (too far ahead)" - << dendl; - cancel_probe_timeout(); - sync_start(other, false); - return; - } - } - - // is there an existing quorum? - if (m->quorum.size()) { - dout(10) << " existing quorum " << m->quorum << dendl; - - dout(10) << " peer paxos version " << m->paxos_last_version - << " vs my version " << paxos->get_version() - << " (ok)" - << dendl; - - if (monmap->contains(name) && - !monmap->get_addr(name).is_blank_ip()) { - // i'm part of the cluster; just initiate a new election - start_election(); - } else { - dout(10) << " ready to join, but i'm not in the monmap or my addr is blank, trying to join" << dendl; - messenger->send_message(new MMonJoin(monmap->fsid, name, messenger->get_myaddr()), - monmap->get_inst(*m->quorum.begin())); - } - } else { - if (monmap->contains(m->name)) { - dout(10) << " mon." << m->name << " is outside the quorum" << dendl; - outside_quorum.insert(m->name); - } else { - dout(10) << " mostly ignoring mon." << m->name << ", not part of monmap" << dendl; - return; - } - - unsigned need = monmap->size() / 2 + 1; - dout(10) << " outside_quorum now " << outside_quorum << ", need " << need << dendl; - if (outside_quorum.size() >= need) { - if (outside_quorum.count(name)) { - dout(10) << " that's enough to form a new quorum, calling election" << dendl; - start_election(); - } else { - dout(10) << " that's enough to form a new quorum, but it does not include me; waiting" << dendl; - } - } else { - dout(10) << " that's not yet enough for a new quorum, waiting" << dendl; - } - } -} - -void Monitor::join_election() -{ - dout(10) << __func__ << dendl; - wait_for_paxos_write(); - _reset(); - state = STATE_ELECTING; - - logger->inc(l_mon_num_elections); -} - -void Monitor::start_election() -{ - dout(10) << "start_election" << dendl; - wait_for_paxos_write(); - _reset(); - state = STATE_ELECTING; - - logger->inc(l_mon_num_elections); - logger->inc(l_mon_election_call); - - clog->info() << "mon." << name << " calling new monitor election"; - elector.call_election(); -} - -void Monitor::win_standalone_election() -{ - dout(1) << "win_standalone_election" << dendl; - - // bump election epoch, in case the previous epoch included other - // monitors; we need to be able to make the distinction. - elector.init(); - elector.advance_epoch(); - - rank = monmap->get_rank(name); - assert(rank == 0); - set q; - q.insert(rank); - - map metadata; - collect_metadata(&metadata[0]); - - win_election(elector.get_epoch(), q, - CEPH_FEATURES_ALL, - ceph::features::mon::get_supported(), - metadata); -} - -const utime_t& Monitor::get_leader_since() const -{ - assert(state == STATE_LEADER); - return leader_since; -} - -epoch_t Monitor::get_epoch() -{ - return elector.get_epoch(); -} - -void Monitor::_finish_svc_election() -{ - assert(state == STATE_LEADER || state == STATE_PEON); - - for (auto p : paxos_service) { - // we already called election_finished() on monmon(); avoid callig twice - if (state == STATE_LEADER && p == monmon()) - continue; - p->election_finished(); - } -} - -void Monitor::win_election(epoch_t epoch, set& active, uint64_t features, - const mon_feature_t& mon_features, - const map& metadata) -{ - dout(10) << __func__ << " epoch " << epoch << " quorum " << active - << " features " << features - << " mon_features " << mon_features - << dendl; - assert(is_electing()); - state = STATE_LEADER; - leader_since = ceph_clock_now(); - leader = rank; - quorum = active; - quorum_con_features = features; - quorum_mon_features = mon_features; - pending_metadata = metadata; - outside_quorum.clear(); - - clog->info() << "mon." << name << "@" << rank - << " won leader election with quorum " << quorum; - - set_leader_commands(get_local_commands(mon_features)); - - paxos->leader_init(); - // NOTE: tell monmap monitor first. This is important for the - // bootstrap case to ensure that the very first paxos proposal - // codifies the monmap. Otherwise any manner of chaos can ensue - // when monitors are call elections or participating in a paxos - // round without agreeing on who the participants are. - monmon()->election_finished(); - _finish_svc_election(); - health_monitor->start(epoch); - - logger->inc(l_mon_election_win); - - // inject new metadata in first transaction. - { - // include previous metadata for missing mons (that aren't part of - // the current quorum). - map m = metadata; - for (unsigned rank = 0; rank < monmap->size(); ++rank) { - if (m.count(rank) == 0 && - mon_metadata.count(rank)) { - m[rank] = mon_metadata[rank]; - } - } - - // FIXME: This is a bit sloppy because we aren't guaranteed to submit - // a new transaction immediately after the election finishes. We should - // do that anyway for other reasons, though. - MonitorDBStore::TransactionRef t = paxos->get_pending_transaction(); - bufferlist bl; - ::encode(m, bl); - t->put(MONITOR_STORE_PREFIX, "last_metadata", bl); - } - - finish_election(); - if (monmap->size() > 1 && - monmap->get_epoch() > 0) { - timecheck_start(); - health_tick_start(); - do_health_to_clog_interval(); - scrub_event_start(); - } -} - -void Monitor::lose_election(epoch_t epoch, set &q, int l, - uint64_t features, - const mon_feature_t& mon_features) -{ - state = STATE_PEON; - leader_since = utime_t(); - leader = l; - quorum = q; - outside_quorum.clear(); - quorum_con_features = features; - quorum_mon_features = mon_features; - dout(10) << "lose_election, epoch " << epoch << " leader is mon" << leader - << " quorum is " << quorum << " features are " << quorum_con_features - << " mon_features are " << quorum_mon_features - << dendl; - - paxos->peon_init(); - _finish_svc_election(); - health_monitor->start(epoch); - - logger->inc(l_mon_election_lose); - - finish_election(); - - if ((quorum_con_features & CEPH_FEATURE_MON_METADATA) && - !HAVE_FEATURE(quorum_con_features, SERVER_LUMINOUS)) { - // for pre-luminous mons only - Metadata sys_info; - collect_metadata(&sys_info); - messenger->send_message(new MMonMetadata(sys_info), - monmap->get_inst(get_leader())); - } -} - -void Monitor::collect_metadata(Metadata *m) -{ - collect_sys_info(m, g_ceph_context); - (*m)["addr"] = stringify(messenger->get_myaddr()); -} - -void Monitor::finish_election() -{ - apply_quorum_to_compatset_features(); - apply_monmap_to_compatset_features(); - timecheck_finish(); - exited_quorum = utime_t(); - finish_contexts(g_ceph_context, waitfor_quorum); - finish_contexts(g_ceph_context, maybe_wait_for_quorum); - resend_routed_requests(); - update_logger(); - register_cluster_logger(); - - // am i named properly? - string cur_name = monmap->get_name(messenger->get_myaddr()); - if (cur_name != name) { - dout(10) << " renaming myself from " << cur_name << " -> " << name << dendl; - messenger->send_message(new MMonJoin(monmap->fsid, name, messenger->get_myaddr()), - monmap->get_inst(*quorum.begin())); - } -} - -void Monitor::_apply_compatset_features(CompatSet &new_features) -{ - if (new_features.compare(features) != 0) { - CompatSet diff = features.unsupported(new_features); - dout(1) << __func__ << " enabling new quorum features: " << diff << dendl; - features = new_features; - - auto t = std::make_shared(); - write_features(t); - store->apply_transaction(t); - - calc_quorum_requirements(); - } -} - -void Monitor::apply_quorum_to_compatset_features() -{ - CompatSet new_features(features); - if (quorum_con_features & CEPH_FEATURE_OSD_ERASURE_CODES) { - new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES); - } - if (quorum_con_features & CEPH_FEATURE_OSDMAP_ENC) { - new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC); - } - if (quorum_con_features & CEPH_FEATURE_ERASURE_CODE_PLUGINS_V2) { - new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2); - } - if (quorum_con_features & CEPH_FEATURE_ERASURE_CODE_PLUGINS_V3) { - new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3); - } - dout(5) << __func__ << dendl; - _apply_compatset_features(new_features); -} - -void Monitor::apply_monmap_to_compatset_features() -{ - CompatSet new_features(features); - mon_feature_t monmap_features = monmap->get_required_features(); - - /* persistent monmap features may go into the compatset. - * optional monmap features may not - why? - * because optional monmap features may be set/unset by the admin, - * and possibly by other means that haven't yet been thought out, - * so we can't make the monitor enforce them on start - because they - * may go away. - * this, of course, does not invalidate setting a compatset feature - * for an optional feature - as long as you make sure to clean it up - * once you unset it. - */ - if (monmap_features.contains_all(ceph::features::mon::FEATURE_KRAKEN)) { - assert(ceph::features::mon::get_persistent().contains_all( - ceph::features::mon::FEATURE_KRAKEN)); - // this feature should only ever be set if the quorum supports it. - assert(HAVE_FEATURE(quorum_con_features, SERVER_KRAKEN)); - new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_KRAKEN); - } - if (monmap_features.contains_all(ceph::features::mon::FEATURE_LUMINOUS)) { - assert(ceph::features::mon::get_persistent().contains_all( - ceph::features::mon::FEATURE_LUMINOUS)); - // this feature should only ever be set if the quorum supports it. - assert(HAVE_FEATURE(quorum_con_features, SERVER_LUMINOUS)); - new_features.incompat.insert(CEPH_MON_FEATURE_INCOMPAT_LUMINOUS); - } - - dout(5) << __func__ << dendl; - _apply_compatset_features(new_features); -} - -void Monitor::calc_quorum_requirements() -{ - required_features = 0; - - // compatset - if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_OSD_ERASURE_CODES)) { - required_features |= CEPH_FEATURE_OSD_ERASURE_CODES; - } - if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_OSDMAP_ENC)) { - required_features |= CEPH_FEATURE_OSDMAP_ENC; - } - if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V2)) { - required_features |= CEPH_FEATURE_ERASURE_CODE_PLUGINS_V2; - } - if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_ERASURE_CODE_PLUGINS_V3)) { - required_features |= CEPH_FEATURE_ERASURE_CODE_PLUGINS_V3; - } - if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_KRAKEN)) { - required_features |= CEPH_FEATUREMASK_SERVER_KRAKEN; - } - if (features.incompat.contains(CEPH_MON_FEATURE_INCOMPAT_LUMINOUS)) { - required_features |= CEPH_FEATUREMASK_SERVER_LUMINOUS; - } - - // monmap - if (monmap->get_required_features().contains_all( - ceph::features::mon::FEATURE_KRAKEN)) { - required_features |= CEPH_FEATUREMASK_SERVER_KRAKEN; - } - if (monmap->get_required_features().contains_all( - ceph::features::mon::FEATURE_LUMINOUS)) { - required_features |= CEPH_FEATUREMASK_SERVER_LUMINOUS; - } - dout(10) << __func__ << " required_features " << required_features << dendl; -} - -void Monitor::get_combined_feature_map(FeatureMap *fm) -{ - *fm += session_map.feature_map; - for (auto id : quorum) { - if (id != rank) { - *fm += quorum_feature_map[id]; - } - } -} - -void Monitor::sync_force(Formatter *f, ostream& ss) -{ - bool free_formatter = false; - - if (!f) { - // louzy/lazy hack: default to json if no formatter has been defined - f = new JSONFormatter(); - free_formatter = true; - } - - auto tx(std::make_shared()); - sync_stash_critical_state(tx); - tx->put("mon_sync", "force_sync", 1); - store->apply_transaction(tx); - - f->open_object_section("sync_force"); - f->dump_int("ret", 0); - f->dump_stream("msg") << "forcing store sync the next time the monitor starts"; - f->close_section(); // sync_force - f->flush(ss); - if (free_formatter) - delete f; -} - -void Monitor::_quorum_status(Formatter *f, ostream& ss) -{ - bool free_formatter = false; - - if (!f) { - // louzy/lazy hack: default to json if no formatter has been defined - f = new JSONFormatter(); - free_formatter = true; - } - f->open_object_section("quorum_status"); - f->dump_int("election_epoch", get_epoch()); - - f->open_array_section("quorum"); - for (set::iterator p = quorum.begin(); p != quorum.end(); ++p) - f->dump_int("mon", *p); - f->close_section(); // quorum - - list quorum_names = get_quorum_names(); - f->open_array_section("quorum_names"); - for (list::iterator p = quorum_names.begin(); p != quorum_names.end(); ++p) - f->dump_string("mon", *p); - f->close_section(); // quorum_names - - f->dump_string("quorum_leader_name", quorum.empty() ? string() : monmap->get_name(*quorum.begin())); - - f->open_object_section("monmap"); - monmap->dump(f); - f->close_section(); // monmap - - f->close_section(); // quorum_status - f->flush(ss); - if (free_formatter) - delete f; -} - -void Monitor::get_mon_status(Formatter *f, ostream& ss) -{ - bool free_formatter = false; - - if (!f) { - // louzy/lazy hack: default to json if no formatter has been defined - f = new JSONFormatter(); - free_formatter = true; - } - - f->open_object_section("mon_status"); - f->dump_string("name", name); - f->dump_int("rank", rank); - f->dump_string("state", get_state_name()); - f->dump_int("election_epoch", get_epoch()); - - f->open_array_section("quorum"); - for (set::iterator p = quorum.begin(); p != quorum.end(); ++p) { - f->dump_int("mon", *p); - } - - f->close_section(); // quorum - - f->open_object_section("features"); - f->dump_stream("required_con") << required_features; - mon_feature_t req_mon_features = get_required_mon_features(); - req_mon_features.dump(f, "required_mon"); - f->dump_stream("quorum_con") << quorum_con_features; - quorum_mon_features.dump(f, "quorum_mon"); - f->close_section(); // features - - f->open_array_section("outside_quorum"); - for (set::iterator p = outside_quorum.begin(); p != outside_quorum.end(); ++p) - f->dump_string("mon", *p); - f->close_section(); // outside_quorum - - f->open_array_section("extra_probe_peers"); - for (set::iterator p = extra_probe_peers.begin(); - p != extra_probe_peers.end(); - ++p) - f->dump_stream("peer") << *p; - f->close_section(); // extra_probe_peers - - f->open_array_section("sync_provider"); - for (map::const_iterator p = sync_providers.begin(); - p != sync_providers.end(); - ++p) { - f->dump_unsigned("cookie", p->second.cookie); - f->dump_stream("entity") << p->second.entity; - f->dump_stream("timeout") << p->second.timeout; - f->dump_unsigned("last_committed", p->second.last_committed); - f->dump_stream("last_key") << p->second.last_key; - } - f->close_section(); - - if (is_synchronizing()) { - f->open_object_section("sync"); - f->dump_stream("sync_provider") << sync_provider; - f->dump_unsigned("sync_cookie", sync_cookie); - f->dump_unsigned("sync_start_version", sync_start_version); - f->close_section(); - } - - if (g_conf->mon_sync_provider_kill_at > 0) - f->dump_int("provider_kill_at", g_conf->mon_sync_provider_kill_at); - if (g_conf->mon_sync_requester_kill_at > 0) - f->dump_int("requester_kill_at", g_conf->mon_sync_requester_kill_at); - - f->open_object_section("monmap"); - monmap->dump(f); - f->close_section(); - - f->dump_object("feature_map", session_map.feature_map); - f->close_section(); // mon_status - - if (free_formatter) { - // flush formatter to ss and delete it iff we created the formatter - f->flush(ss); - delete f; - } -} - - -// health status to clog - -void Monitor::health_tick_start() -{ - if (!cct->_conf->mon_health_to_clog || - cct->_conf->mon_health_to_clog_tick_interval <= 0) - return; - - dout(15) << __func__ << dendl; - - health_tick_stop(); - health_tick_event = timer.add_event_after( - cct->_conf->mon_health_to_clog_tick_interval, - new C_MonContext(this, [this](int r) { - if (r < 0) - return; - do_health_to_clog(); - health_tick_start(); - })); -} - -void Monitor::health_tick_stop() -{ - dout(15) << __func__ << dendl; - - if (health_tick_event) { - timer.cancel_event(health_tick_event); - health_tick_event = NULL; - } -} - -utime_t Monitor::health_interval_calc_next_update() -{ - utime_t now = ceph_clock_now(); - - time_t secs = now.sec(); - int remainder = secs % cct->_conf->mon_health_to_clog_interval; - int adjustment = cct->_conf->mon_health_to_clog_interval - remainder; - utime_t next = utime_t(secs + adjustment, 0); - - dout(20) << __func__ - << " now: " << now << "," - << " next: " << next << "," - << " interval: " << cct->_conf->mon_health_to_clog_interval - << dendl; - - return next; -} - -void Monitor::health_interval_start() -{ - dout(15) << __func__ << dendl; - - if (!cct->_conf->mon_health_to_clog || - cct->_conf->mon_health_to_clog_interval <= 0) { - return; - } - - health_interval_stop(); - utime_t next = health_interval_calc_next_update(); - health_interval_event = new C_MonContext(this, [this](int r) { - if (r < 0) - return; - do_health_to_clog_interval(); - }); - if (!timer.add_event_at(next, health_interval_event)) { - health_interval_event = nullptr; - } -} - -void Monitor::health_interval_stop() -{ - dout(15) << __func__ << dendl; - if (health_interval_event) { - timer.cancel_event(health_interval_event); - } - health_interval_event = NULL; -} - -void Monitor::health_events_cleanup() -{ - health_tick_stop(); - health_interval_stop(); - health_status_cache.reset(); -} - -void Monitor::health_to_clog_update_conf(const std::set &changed) -{ - dout(20) << __func__ << dendl; - - if (changed.count("mon_health_to_clog")) { - if (!cct->_conf->mon_health_to_clog) { - health_events_cleanup(); - } else { - if (!health_tick_event) { - health_tick_start(); - } - if (!health_interval_event) { - health_interval_start(); - } - } - } - - if (changed.count("mon_health_to_clog_interval")) { - if (cct->_conf->mon_health_to_clog_interval <= 0) { - health_interval_stop(); - } else { - health_interval_start(); - } - } - - if (changed.count("mon_health_to_clog_tick_interval")) { - if (cct->_conf->mon_health_to_clog_tick_interval <= 0) { - health_tick_stop(); - } else { - health_tick_start(); - } - } -} - -void Monitor::do_health_to_clog_interval() -{ - // outputting to clog may have been disabled in the conf - // since we were scheduled. - if (!cct->_conf->mon_health_to_clog || - cct->_conf->mon_health_to_clog_interval <= 0) - return; - - dout(10) << __func__ << dendl; - - // do we have a cached value for next_clog_update? if not, - // do we know when the last update was? - - do_health_to_clog(true); - health_interval_start(); -} - -void Monitor::do_health_to_clog(bool force) -{ - // outputting to clog may have been disabled in the conf - // since we were scheduled. - if (!cct->_conf->mon_health_to_clog || - cct->_conf->mon_health_to_clog_interval <= 0) - return; - - dout(10) << __func__ << (force ? " (force)" : "") << dendl; - - if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) { - string summary; - health_status_t level = get_health_status(false, nullptr, &summary); - if (!force && - summary == health_status_cache.summary && - level == health_status_cache.overall) - return; - clog->health(level) << "overall " << summary; - health_status_cache.summary = summary; - health_status_cache.overall = level; - } else { - // for jewel only - list status; - health_status_t overall = get_health(status, NULL, NULL); - dout(25) << __func__ - << (force ? " (force)" : "") - << dendl; - - string summary = joinify(status.begin(), status.end(), string("; ")); - - if (!force && - overall == health_status_cache.overall && - !health_status_cache.summary.empty() && - health_status_cache.summary == summary) { - // we got a dup! - return; - } - - clog->info() << summary; - - health_status_cache.overall = overall; - health_status_cache.summary = summary; - } -} - -health_status_t Monitor::get_health_status( - bool want_detail, - Formatter *f, - std::string *plain, - const char *sep1, - const char *sep2) -{ - health_status_t r = HEALTH_OK; - bool compat = g_conf->mon_health_preluminous_compat; - bool compat_warn = g_conf->get_val("mon_health_preluminous_compat_warning"); - if (f) { - f->open_object_section("health"); - f->open_object_section("checks"); - } - - string summary; - string *psummary = f ? nullptr : &summary; - for (auto& svc : paxos_service) { - r = std::min(r, svc->get_health_checks().dump_summary( - f, psummary, sep2, want_detail)); - } - - if (f) { - f->close_section(); - f->dump_stream("status") << r; - } else { - // one-liner: HEALTH_FOO[ thing1[; thing2 ...]] - *plain = stringify(r); - if (summary.size()) { - *plain += sep1; - *plain += summary; - } - *plain += "\n"; - } - - const std::string old_fields_message = "'ceph health' JSON format has " - "changed in luminous. If you see this your monitoring system is " - "scraping the wrong fields. Disable this with 'mon health preluminous " - "compat warning = false'"; - - if (f && (compat || compat_warn)) { - health_status_t cr = compat_warn ? min(HEALTH_WARN, r) : r; - f->open_array_section("summary"); - if (compat_warn) { - f->open_object_section("item"); - f->dump_stream("severity") << HEALTH_WARN; - f->dump_string("summary", old_fields_message); - f->close_section(); - } - if (compat) { - for (auto& svc : paxos_service) { - svc->get_health_checks().dump_summary_compat(f); - } - } - f->close_section(); - f->dump_stream("overall_status") << cr; - } - - if (want_detail) { - if (f && (compat || compat_warn)) { - f->open_array_section("detail"); - if (compat_warn) { - f->dump_string("item", old_fields_message); - } - } - - for (auto& svc : paxos_service) { - svc->get_health_checks().dump_detail(f, plain, compat); - } - - if (f && (compat || compat_warn)) { - f->close_section(); - } - } - if (f) { - f->close_section(); - } - return r; -} - -void Monitor::log_health( - const health_check_map_t& updated, - const health_check_map_t& previous, - MonitorDBStore::TransactionRef t) -{ - if (!g_conf->mon_health_to_clog) { - return; - } - - const utime_t now = ceph_clock_now(); - - // FIXME: log atomically as part of @t instead of using clog. - dout(10) << __func__ << " updated " << updated.checks.size() - << " previous " << previous.checks.size() - << dendl; - const auto min_log_period = g_conf->get_val( - "mon_health_log_update_period"); - for (auto& p : updated.checks) { - auto q = previous.checks.find(p.first); - bool logged = false; - if (q == previous.checks.end()) { - // new - ostringstream ss; - ss << "Health check failed: " << p.second.summary << " (" - << p.first << ")"; - clog->health(p.second.severity) << ss.str(); - - logged = true; - } else { - if (p.second.summary != q->second.summary || - p.second.severity != q->second.severity) { - - auto status_iter = health_check_log_times.find(p.first); - if (status_iter != health_check_log_times.end()) { - if (p.second.severity == q->second.severity && - now - status_iter->second.updated_at < min_log_period) { - // We already logged this recently and the severity is unchanged, - // so skip emitting an update of the summary string. - // We'll get an update out of tick() later if the check - // is still failing. - continue; - } - } - - // summary or severity changed (ignore detail changes at this level) - ostringstream ss; - ss << "Health check update: " << p.second.summary << " (" << p.first << ")"; - clog->health(p.second.severity) << ss.str(); - - logged = true; - } - } - // Record the time at which we last logged, so that we can check this - // when considering whether/when to print update messages. - if (logged) { - auto iter = health_check_log_times.find(p.first); - if (iter == health_check_log_times.end()) { - health_check_log_times.emplace(p.first, HealthCheckLogStatus( - p.second.severity, p.second.summary, now)); - } else { - iter->second = HealthCheckLogStatus( - p.second.severity, p.second.summary, now); - } - } - } - for (auto& p : previous.checks) { - if (!updated.checks.count(p.first)) { - // cleared - ostringstream ss; - if (p.first == "DEGRADED_OBJECTS") { - clog->info() << "All degraded objects recovered"; - } else if (p.first == "OSD_FLAGS") { - clog->info() << "OSD flags cleared"; - } else { - clog->info() << "Health check cleared: " << p.first << " (was: " - << p.second.summary << ")"; - } - - if (health_check_log_times.count(p.first)) { - health_check_log_times.erase(p.first); - } - } - } - - if (previous.checks.size() && updated.checks.size() == 0) { - // We might be going into a fully healthy state, check - // other subsystems - bool any_checks = false; - for (auto& svc : paxos_service) { - if (&(svc->get_health_checks()) == &(previous)) { - // Ignore the ones we're clearing right now - continue; - } - - if (svc->get_health_checks().checks.size() > 0) { - any_checks = true; - break; - } - } - if (!any_checks) { - clog->info() << "Cluster is now healthy"; - } - } -} - -health_status_t Monitor::get_health(list& status, - bufferlist *detailbl, - Formatter *f) -{ - list > summary; - list > detail; - - if (f) - f->open_object_section("health"); - - for (vector::iterator p = paxos_service.begin(); - p != paxos_service.end(); - ++p) { - PaxosService *s = *p; - s->get_health(summary, detailbl ? &detail : NULL, cct); - } - - health_monitor->get_health(summary, (detailbl ? &detail : NULL)); - - health_status_t overall = HEALTH_OK; - if (!timecheck_skews.empty()) { - list warns; - for (map::iterator i = timecheck_skews.begin(); - i != timecheck_skews.end(); ++i) { - entity_inst_t inst = i->first; - double skew = i->second; - double latency = timecheck_latencies[inst]; - string name = monmap->get_name(inst.addr); - ostringstream tcss; - health_status_t tcstatus = timecheck_status(tcss, skew, latency); - if (tcstatus != HEALTH_OK) { - if (overall > tcstatus) - overall = tcstatus; - warns.push_back(name); - ostringstream tmp_ss; - tmp_ss << "mon." << name - << " addr " << inst.addr << " " << tcss.str() - << " (latency " << latency << "s)"; - detail.push_back(make_pair(tcstatus, tmp_ss.str())); - } - } - if (!warns.empty()) { - ostringstream ss; - ss << "clock skew detected on"; - while (!warns.empty()) { - ss << " mon." << warns.front(); - warns.pop_front(); - if (!warns.empty()) - ss << ","; - } - status.push_back(ss.str()); - summary.push_back(make_pair(HEALTH_WARN, "Monitor clock skew detected ")); - } - } - - if (f) - f->open_array_section("summary"); - if (!summary.empty()) { - while (!summary.empty()) { - if (overall > summary.front().first) - overall = summary.front().first; - status.push_back(summary.front().second); - if (f) { - f->open_object_section("item"); - f->dump_stream("severity") << summary.front().first; - f->dump_string("summary", summary.front().second); - f->close_section(); - } - summary.pop_front(); - } - } - if (f) - f->close_section(); - - stringstream fss; - fss << overall; - status.push_front(fss.str()); - if (f) - f->dump_stream("overall_status") << overall; - - if (f) - f->open_array_section("detail"); - while (!detail.empty()) { - if (f) - f->dump_string("item", detail.front().second); - else if (detailbl != NULL) { - detailbl->append(detail.front().second); - detailbl->append('\n'); - } - detail.pop_front(); - } - if (f) - f->close_section(); - - if (f) - f->close_section(); - - return overall; -} - -void Monitor::get_cluster_status(stringstream &ss, Formatter *f) -{ - if (f) - f->open_object_section("status"); - - if (f) { - f->dump_stream("fsid") << monmap->get_fsid(); - if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) { - get_health_status(false, f, nullptr); - } else { - list health_str; - get_health(health_str, nullptr, f); - } - f->dump_unsigned("election_epoch", get_epoch()); - { - f->open_array_section("quorum"); - for (set::iterator p = quorum.begin(); p != quorum.end(); ++p) - f->dump_int("rank", *p); - f->close_section(); - f->open_array_section("quorum_names"); - for (set::iterator p = quorum.begin(); p != quorum.end(); ++p) - f->dump_string("id", monmap->get_name(*p)); - f->close_section(); - } - f->open_object_section("monmap"); - monmap->dump(f); - f->close_section(); - f->open_object_section("osdmap"); - osdmon()->osdmap.print_summary(f, cout, string(12, ' ')); - f->close_section(); - f->open_object_section("pgmap"); - pgservice->print_summary(f, NULL); - f->close_section(); - f->open_object_section("fsmap"); - mdsmon()->get_fsmap().print_summary(f, NULL); - f->close_section(); - f->open_object_section("mgrmap"); - mgrmon()->get_map().print_summary(f, nullptr); - f->close_section(); - - f->dump_object("servicemap", mgrstatmon()->get_service_map()); - f->close_section(); - } else { - ss << " cluster:\n"; - ss << " id: " << monmap->get_fsid() << "\n"; - - string health; - if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) { - get_health_status(false, nullptr, &health, - "\n ", "\n "); - } else { - list ls; - get_health(ls, NULL, f); - health = joinify(ls.begin(), ls.end(), - string("\n ")); - } - ss << " health: " << health << "\n"; - - ss << "\n \n services:\n"; - { - size_t maxlen = 3; - auto& service_map = mgrstatmon()->get_service_map(); - for (auto& p : service_map.services) { - maxlen = std::max(maxlen, p.first.size()); - } - string spacing(maxlen - 3, ' '); - const auto quorum_names = get_quorum_names(); - const auto mon_count = monmap->mon_info.size(); - ss << " mon: " << spacing << mon_count << " daemons, quorum " - << quorum_names; - if (quorum_names.size() != mon_count) { - std::list out_of_q; - for (size_t i = 0; i < monmap->ranks.size(); ++i) { - if (quorum.count(i) == 0) { - out_of_q.push_back(monmap->ranks[i]); - } - } - ss << ", out of quorum: " << joinify(out_of_q.begin(), - out_of_q.end(), std::string(", ")); - } - ss << "\n"; - if (mgrmon()->in_use()) { - ss << " mgr: " << spacing; - mgrmon()->get_map().print_summary(nullptr, &ss); - ss << "\n"; - } - if (mdsmon()->get_fsmap().filesystem_count() > 0) { - ss << " mds: " << spacing << mdsmon()->get_fsmap() << "\n"; - } - ss << " osd: " << spacing; - osdmon()->osdmap.print_summary(NULL, ss, string(maxlen + 6, ' ')); - ss << "\n"; - for (auto& p : service_map.services) { - ss << " " << p.first << ": " << string(maxlen - p.first.size(), ' ') - << p.second.get_summary() << "\n"; - } - } - - ss << "\n \n data:\n"; - pgservice->print_summary(NULL, &ss); - ss << "\n "; - } -} - -void Monitor::_generate_command_map(map& cmdmap, - map ¶m_str_map) -{ - for (map::const_iterator p = cmdmap.begin(); - p != cmdmap.end(); ++p) { - if (p->first == "prefix") - continue; - if (p->first == "caps") { - vector cv; - if (cmd_getval(g_ceph_context, cmdmap, "caps", cv) && - cv.size() % 2 == 0) { - for (unsigned i = 0; i < cv.size(); i += 2) { - string k = string("caps_") + cv[i]; - param_str_map[k] = cv[i + 1]; - } - continue; - } - } - param_str_map[p->first] = cmd_vartype_stringify(p->second); - } -} - -const MonCommand *Monitor::_get_moncommand( - const string &cmd_prefix, - const vector& cmds) -{ - for (auto& c : cmds) { - if (c.cmdstring.compare(0, cmd_prefix.size(), cmd_prefix) == 0) { - return &c; - } - } - return nullptr; -} - -bool Monitor::_allowed_command(MonSession *s, string &module, string &prefix, - const map& cmdmap, - const map& param_str_map, - const MonCommand *this_cmd) { - - bool cmd_r = this_cmd->requires_perm('r'); - bool cmd_w = this_cmd->requires_perm('w'); - bool cmd_x = this_cmd->requires_perm('x'); - - bool capable = s->caps.is_capable( - g_ceph_context, - CEPH_ENTITY_TYPE_MON, - s->entity_name, - module, prefix, param_str_map, - cmd_r, cmd_w, cmd_x); - - dout(10) << __func__ << " " << (capable ? "" : "not ") << "capable" << dendl; - return capable; -} - -void Monitor::format_command_descriptions(const std::vector &commands, - Formatter *f, - bufferlist *rdata, - bool hide_mgr_flag) -{ - int cmdnum = 0; - f->open_object_section("command_descriptions"); - for (const auto &cmd : commands) { - unsigned flags = cmd.flags; - if (hide_mgr_flag) { - flags &= ~MonCommand::FLAG_MGR; - } - ostringstream secname; - secname << "cmd" << setfill('0') << std::setw(3) << cmdnum; - dump_cmddesc_to_json(f, secname.str(), - cmd.cmdstring, cmd.helpstring, cmd.module, - cmd.req_perms, cmd.availability, flags); - cmdnum++; - } - f->close_section(); // command_descriptions - - f->flush(*rdata); -} - -bool Monitor::is_keyring_required() -{ - string auth_cluster_required = g_conf->auth_supported.empty() ? - g_conf->auth_cluster_required : g_conf->auth_supported; - string auth_service_required = g_conf->auth_supported.empty() ? - g_conf->auth_service_required : g_conf->auth_supported; - - return auth_service_required == "cephx" || - auth_cluster_required == "cephx"; -} - -struct C_MgrProxyCommand : public Context { - Monitor *mon; - MonOpRequestRef op; - uint64_t size; - bufferlist outbl; - string outs; - C_MgrProxyCommand(Monitor *mon, MonOpRequestRef op, uint64_t s) - : mon(mon), op(op), size(s) { } - void finish(int r) { - Mutex::Locker l(mon->lock); - mon->mgr_proxy_bytes -= size; - mon->reply_command(op, r, outs, outbl, 0); - } -}; - -void Monitor::handle_command(MonOpRequestRef op) -{ - assert(op->is_type_command()); - MMonCommand *m = static_cast(op->get_req()); - if (m->fsid != monmap->fsid) { - dout(0) << "handle_command on fsid " << m->fsid << " != " << monmap->fsid << dendl; - reply_command(op, -EPERM, "wrong fsid", 0); - return; - } - - MonSession *session = static_cast( - m->get_connection()->get_priv()); - if (!session) { - dout(5) << __func__ << " dropping stray message " << *m << dendl; - return; - } - BOOST_SCOPE_EXIT_ALL(=) { - session->put(); - }; - - if (m->cmd.empty()) { - string rs = "No command supplied"; - reply_command(op, -EINVAL, rs, 0); - return; - } - - string prefix; - vector fullcmd; - map cmdmap; - stringstream ss, ds; - bufferlist rdata; - string rs; - int r = -EINVAL; - rs = "unrecognized command"; - - if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) { - // ss has reason for failure - r = -EINVAL; - rs = ss.str(); - if (!m->get_source().is_mon()) // don't reply to mon->mon commands - reply_command(op, r, rs, 0); - return; - } - - // check return value. If no prefix parameter provided, - // return value will be false, then return error info. - if (!cmd_getval(g_ceph_context, cmdmap, "prefix", prefix)) { - reply_command(op, -EINVAL, "command prefix not found", 0); - return; - } - - // check prefix is empty - if (prefix.empty()) { - reply_command(op, -EINVAL, "command prefix must not be empty", 0); - return; - } - - if (prefix == "get_command_descriptions") { - bufferlist rdata; - Formatter *f = Formatter::create("json"); - // hide mgr commands until luminous upgrade is complete - bool hide_mgr_flag = - osdmon()->osdmap.require_osd_release < CEPH_RELEASE_LUMINOUS; - - std::vector commands; - - // only include mgr commands once all mons are upgrade (and we've dropped - // the hard-coded PGMonitor commands) - if (quorum_mon_features.contains_all(ceph::features::mon::FEATURE_LUMINOUS)) { - commands = static_cast( - paxos_service[PAXOS_MGR])->get_command_descs(); - } - - for (auto& c : leader_mon_commands) { - commands.push_back(c); - } - - format_command_descriptions(commands, f, &rdata, hide_mgr_flag); - delete f; - reply_command(op, 0, "", rdata, 0); - return; - } - - string module; - string err; - - dout(0) << "handle_command " << *m << dendl; - - string format; - cmd_getval(g_ceph_context, cmdmap, "format", format, string("plain")); - boost::scoped_ptr f(Formatter::create(format)); - - get_str_vec(prefix, fullcmd); - - // make sure fullcmd is not empty. - // invalid prefix will cause empty vector fullcmd. - // such as, prefix=";,,;" - if (fullcmd.empty()) { - reply_command(op, -EINVAL, "command requires a prefix to be valid", 0); - return; - } - - module = fullcmd[0]; - - // validate command is in leader map - - const MonCommand *leader_cmd; - const auto& mgr_cmds = mgrmon()->get_command_descs(); - const MonCommand *mgr_cmd = nullptr; - if (!mgr_cmds.empty()) { - mgr_cmd = _get_moncommand(prefix, mgr_cmds); - } - leader_cmd = _get_moncommand(prefix, leader_mon_commands); - if (!leader_cmd) { - leader_cmd = mgr_cmd; - if (!leader_cmd) { - reply_command(op, -EINVAL, "command not known", 0); - return; - } - } - // validate command is in our map & matches, or forward if it is allowed - const MonCommand *mon_cmd = _get_moncommand( - prefix, - get_local_commands(quorum_mon_features)); - if (!mon_cmd) { - mon_cmd = mgr_cmd; - } - if (!is_leader()) { - if (!mon_cmd) { - if (leader_cmd->is_noforward()) { - reply_command(op, -EINVAL, - "command not locally supported and not allowed to forward", - 0); - return; - } - dout(10) << "Command not locally supported, forwarding request " - << m << dendl; - forward_request_leader(op); - return; - } else if (!mon_cmd->is_compat(leader_cmd)) { - if (mon_cmd->is_noforward()) { - reply_command(op, -EINVAL, - "command not compatible with leader and not allowed to forward", - 0); - return; - } - dout(10) << "Command not compatible with leader, forwarding request " - << m << dendl; - forward_request_leader(op); - return; - } - } - - if (mon_cmd->is_obsolete() || - (cct->_conf->mon_debug_deprecated_as_obsolete - && mon_cmd->is_deprecated())) { - reply_command(op, -ENOTSUP, - "command is obsolete; please check usage and/or man page", - 0); - return; - } - - if (session->proxy_con && mon_cmd->is_noforward()) { - dout(10) << "Got forward for noforward command " << m << dendl; - reply_command(op, -EINVAL, "forward for noforward command", rdata, 0); - return; - } - - /* what we perceive as being the service the command falls under */ - string service(mon_cmd->module); - - dout(25) << __func__ << " prefix='" << prefix - << "' module='" << module - << "' service='" << service << "'" << dendl; - - bool cmd_is_rw = - (mon_cmd->requires_perm('w') || mon_cmd->requires_perm('x')); - - // validate user's permissions for requested command - map param_str_map; - _generate_command_map(cmdmap, param_str_map); - if (!_allowed_command(session, service, prefix, cmdmap, - param_str_map, mon_cmd)) { - dout(1) << __func__ << " access denied" << dendl; - (cmd_is_rw ? audit_clog->info() : audit_clog->debug()) - << "from='" << session->inst << "' " - << "entity='" << session->entity_name << "' " - << "cmd=" << m->cmd << ": access denied"; - reply_command(op, -EACCES, "access denied", 0); - return; - } - - (cmd_is_rw ? audit_clog->info() : audit_clog->debug()) - << "from='" << session->inst << "' " - << "entity='" << session->entity_name << "' " - << "cmd=" << m->cmd << ": dispatch"; - - if (mon_cmd->is_mgr() && - osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) { - const auto& hdr = m->get_header(); - uint64_t size = hdr.front_len + hdr.middle_len + hdr.data_len; - uint64_t max = g_conf->get_val("mon_client_bytes") - * g_conf->get_val("mon_mgr_proxy_client_bytes_ratio"); - if (mgr_proxy_bytes + size > max) { - dout(10) << __func__ << " current mgr proxy bytes " << mgr_proxy_bytes - << " + " << size << " > max " << max << dendl; - reply_command(op, -EAGAIN, "hit limit on proxied mgr commands", rdata, 0); - return; - } - mgr_proxy_bytes += size; - dout(10) << __func__ << " proxying mgr command (+" << size - << " -> " << mgr_proxy_bytes << ")" << dendl; - C_MgrProxyCommand *fin = new C_MgrProxyCommand(this, op, size); - mgr_client.start_command(m->cmd, - m->get_data(), - &fin->outbl, - &fin->outs, - new C_OnFinisher(fin, &finisher)); - return; - } - - if ((module == "mds" || module == "fs") && - prefix != "fs authorize") { - mdsmon()->dispatch(op); - return; - } - if ((module == "osd" || prefix == "pg map") && - prefix != "osd last-stat-seq") { - osdmon()->dispatch(op); - return; - } - - if (module == "pg") { - pgmon()->dispatch(op); - return; - } - if (module == "mon" && - /* Let the Monitor class handle the following commands: - * 'mon compact' - * 'mon scrub' - * 'mon sync force' - */ - prefix != "mon compact" && - prefix != "mon scrub" && - prefix != "mon sync force" && - prefix != "mon metadata" && - prefix != "mon versions" && - prefix != "mon count-metadata") { - monmon()->dispatch(op); - return; - } - if (module == "auth" || prefix == "fs authorize") { - authmon()->dispatch(op); - return; - } - if (module == "log") { - logmon()->dispatch(op); - return; - } - - if (module == "config-key") { - config_key_service->dispatch(op); - return; - } - - if (module == "mgr") { - mgrmon()->dispatch(op); - return; - } - - if (prefix == "fsid") { - if (f) { - f->open_object_section("fsid"); - f->dump_stream("fsid") << monmap->fsid; - f->close_section(); - f->flush(rdata); - } else { - ds << monmap->fsid; - rdata.append(ds); - } - reply_command(op, 0, "", rdata, 0); - return; - } - - if (prefix == "scrub" || prefix == "mon scrub") { - wait_for_paxos_write(); - if (is_leader()) { - int r = scrub_start(); - reply_command(op, r, "", rdata, 0); - } else if (is_peon()) { - forward_request_leader(op); - } else { - reply_command(op, -EAGAIN, "no quorum", rdata, 0); - } - return; - } - - if (prefix == "compact" || prefix == "mon compact") { - dout(1) << "triggering manual compaction" << dendl; - utime_t start = ceph_clock_now(); - store->compact(); - utime_t end = ceph_clock_now(); - end -= start; - dout(1) << "finished manual compaction in " << end << " seconds" << dendl; - ostringstream oss; - oss << "compacted " << g_conf->get_val("mon_keyvaluedb") << " in " << end << " seconds"; - rs = oss.str(); - r = 0; - } - else if (prefix == "injectargs") { - vector injected_args; - cmd_getval(g_ceph_context, cmdmap, "injected_args", injected_args); - if (!injected_args.empty()) { - dout(0) << "parsing injected options '" << injected_args << "'" << dendl; - ostringstream oss; - r = g_conf->injectargs(str_join(injected_args, " "), &oss); - ss << "injectargs:" << oss.str(); - rs = ss.str(); - goto out; - } else { - rs = "must supply options to be parsed in a single string"; - r = -EINVAL; - } - } else if (prefix == "time-sync-status") { - if (!f) - f.reset(Formatter::create("json-pretty")); - f->open_object_section("time_sync"); - if (!timecheck_skews.empty()) { - f->open_object_section("time_skew_status"); - for (auto& i : timecheck_skews) { - entity_inst_t inst = i.first; - double skew = i.second; - double latency = timecheck_latencies[inst]; - string name = monmap->get_name(inst.addr); - ostringstream tcss; - health_status_t tcstatus = timecheck_status(tcss, skew, latency); - f->open_object_section(name.c_str()); - f->dump_float("skew", skew); - f->dump_float("latency", latency); - f->dump_stream("health") << tcstatus; - if (tcstatus != HEALTH_OK) { - f->dump_stream("details") << tcss.str(); - } - f->close_section(); - } - f->close_section(); - } - f->open_object_section("timechecks"); - f->dump_unsigned("epoch", get_epoch()); - f->dump_int("round", timecheck_round); - f->dump_stream("round_status") << ((timecheck_round%2) ? - "on-going" : "finished"); - f->close_section(); - f->close_section(); - f->flush(rdata); - r = 0; - rs = ""; - } else if (prefix == "config set") { - std::string key; - cmd_getval(cct, cmdmap, "key", key); - std::string val; - cmd_getval(cct, cmdmap, "value", val); - r = g_conf->set_val(key, val, true, &ss); - if (r == 0) { - g_conf->apply_changes(nullptr); - } - rs = ss.str(); - goto out; - } else if (prefix == "status" || - prefix == "health" || - prefix == "df") { - string detail; - cmd_getval(g_ceph_context, cmdmap, "detail", detail); - - if (prefix == "status") { - // get_cluster_status handles f == NULL - get_cluster_status(ds, f.get()); - - if (f) { - f->flush(ds); - ds << '\n'; - } - rdata.append(ds); - } else if (prefix == "health") { - if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) { - string plain; - get_health_status(detail == "detail", f.get(), f ? nullptr : &plain); - if (f) { - f->flush(rdata); - } else { - rdata.append(plain); - } - } else { - list health_str; - get_health(health_str, detail == "detail" ? &rdata : NULL, f.get()); - if (f) { - f->flush(ds); - ds << '\n'; - } else { - assert(!health_str.empty()); - ds << health_str.front(); - health_str.pop_front(); - if (!health_str.empty()) { - ds << ' '; - ds << joinify(health_str.begin(), health_str.end(), string("; ")); - } - } - bufferlist comb; - comb.append(ds); - if (detail == "detail") - comb.append(rdata); - rdata = comb; - } - } else if (prefix == "df") { - bool verbose = (detail == "detail"); - if (f) - f->open_object_section("stats"); - - pgservice->dump_fs_stats(&ds, f.get(), verbose); - if (!f) - ds << '\n'; - pgservice->dump_pool_stats(osdmon()->osdmap, &ds, f.get(), verbose); - - if (f) { - f->close_section(); - f->flush(ds); - ds << '\n'; - } - } else { - assert(0 == "We should never get here!"); - return; - } - rdata.append(ds); - rs = ""; - r = 0; - } else if (prefix == "report") { - - // this must be formatted, in its current form - if (!f) - f.reset(Formatter::create("json-pretty")); - f->open_object_section("report"); - f->dump_stream("cluster_fingerprint") << fingerprint; - f->dump_string("version", ceph_version_to_str()); - f->dump_string("commit", git_version_to_str()); - f->dump_stream("timestamp") << ceph_clock_now(); - - vector tagsvec; - cmd_getval(g_ceph_context, cmdmap, "tags", tagsvec); - string tagstr = str_join(tagsvec, " "); - if (!tagstr.empty()) - tagstr = tagstr.substr(0, tagstr.find_last_of(' ')); - f->dump_string("tag", tagstr); - - if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) { - get_health_status(true, f.get(), nullptr); - } else { - list health_str; - get_health(health_str, nullptr, f.get()); - } - - monmon()->dump_info(f.get()); - osdmon()->dump_info(f.get()); - mdsmon()->dump_info(f.get()); - authmon()->dump_info(f.get()); - pgservice->dump_info(f.get()); - - paxos->dump_info(f.get()); - - f->close_section(); - f->flush(rdata); - - ostringstream ss2; - ss2 << "report " << rdata.crc32c(CEPH_MON_PORT); - rs = ss2.str(); - r = 0; - } else if (prefix == "osd last-stat-seq") { - int64_t osd; - cmd_getval(g_ceph_context, cmdmap, "id", osd); - uint64_t seq = mgrstatmon()->get_last_osd_stat_seq(osd); - if (f) { - f->dump_unsigned("seq", seq); - f->flush(ds); - } else { - ds << seq; - rdata.append(ds); - } - rs = ""; - r = 0; - } else if (prefix == "node ls") { - string node_type("all"); - cmd_getval(g_ceph_context, cmdmap, "type", node_type); - if (!f) - f.reset(Formatter::create("json-pretty")); - if (node_type == "all") { - f->open_object_section("nodes"); - print_nodes(f.get(), ds); - osdmon()->print_nodes(f.get()); - mdsmon()->print_nodes(f.get()); - f->close_section(); - } else if (node_type == "mon") { - print_nodes(f.get(), ds); - } else if (node_type == "osd") { - osdmon()->print_nodes(f.get()); - } else if (node_type == "mds") { - mdsmon()->print_nodes(f.get()); - } - f->flush(ds); - rdata.append(ds); - rs = ""; - r = 0; - } else if (prefix == "features") { - if (!is_leader() && !is_peon()) { - dout(10) << " waiting for quorum" << dendl; - waitfor_quorum.push_back(new C_RetryMessage(this, op)); - return; - } - if (!is_leader()) { - forward_request_leader(op); - return; - } - if (!f) - f.reset(Formatter::create("json-pretty")); - FeatureMap fm; - get_combined_feature_map(&fm); - f->dump_object("features", fm); - f->flush(rdata); - rs = ""; - r = 0; - } else if (prefix == "mon metadata") { - if (!f) - f.reset(Formatter::create("json-pretty")); - - string name; - bool all = !cmd_getval(g_ceph_context, cmdmap, "id", name); - if (!all) { - // Dump a single mon's metadata - int mon = monmap->get_rank(name); - if (mon < 0) { - rs = "requested mon not found"; - r = -ENOENT; - goto out; - } - f->open_object_section("mon_metadata"); - r = get_mon_metadata(mon, f.get(), ds); - f->close_section(); - } else { - // Dump all mons' metadata - r = 0; - f->open_array_section("mon_metadata"); - for (unsigned int rank = 0; rank < monmap->size(); ++rank) { - std::ostringstream get_err; - f->open_object_section("mon"); - f->dump_string("name", monmap->get_name(rank)); - r = get_mon_metadata(rank, f.get(), get_err); - f->close_section(); - if (r == -ENOENT || r == -EINVAL) { - dout(1) << get_err.str() << dendl; - // Drop error, list what metadata we do have - r = 0; - } else if (r != 0) { - derr << "Unexpected error from get_mon_metadata: " - << cpp_strerror(r) << dendl; - ds << get_err.str(); - break; - } - } - f->close_section(); - } - - f->flush(ds); - rdata.append(ds); - rs = ""; - } else if (prefix == "mon versions") { - if (!f) - f.reset(Formatter::create("json-pretty")); - count_metadata("ceph_version", f.get()); - f->flush(ds); - rdata.append(ds); - rs = ""; - r = 0; - } else if (prefix == "mon count-metadata") { - if (!f) - f.reset(Formatter::create("json-pretty")); - string field; - cmd_getval(g_ceph_context, cmdmap, "property", field); - count_metadata(field, f.get()); - f->flush(ds); - rdata.append(ds); - rs = ""; - r = 0; - } else if (prefix == "quorum_status") { - // make sure our map is readable and up to date - if (!is_leader() && !is_peon()) { - dout(10) << " waiting for quorum" << dendl; - waitfor_quorum.push_back(new C_RetryMessage(this, op)); - return; - } - _quorum_status(f.get(), ds); - rdata.append(ds); - rs = ""; - r = 0; - } else if (prefix == "mon_status") { - get_mon_status(f.get(), ds); - if (f) - f->flush(ds); - rdata.append(ds); - rs = ""; - r = 0; - } else if (prefix == "sync force" || - prefix == "mon sync force") { - string validate1, validate2; - cmd_getval(g_ceph_context, cmdmap, "validate1", validate1); - cmd_getval(g_ceph_context, cmdmap, "validate2", validate2); - if (validate1 != "--yes-i-really-mean-it" || - validate2 != "--i-know-what-i-am-doing") { - r = -EINVAL; - rs = "are you SURE? this will mean the monitor store will be " - "erased. pass '--yes-i-really-mean-it " - "--i-know-what-i-am-doing' if you really do."; - goto out; - } - sync_force(f.get(), ds); - rs = ds.str(); - r = 0; - } else if (prefix == "heap") { - if (!ceph_using_tcmalloc()) - rs = "tcmalloc not enabled, can't use heap profiler commands\n"; - else { - string heapcmd; - cmd_getval(g_ceph_context, cmdmap, "heapcmd", heapcmd); - // XXX 1-element vector, change at callee or make vector here? - vector heapcmd_vec; - get_str_vec(heapcmd, heapcmd_vec); - ceph_heap_profiler_handle_command(heapcmd_vec, ds); - rdata.append(ds); - rs = ""; - r = 0; - } - } else if (prefix == "quorum") { - string quorumcmd; - cmd_getval(g_ceph_context, cmdmap, "quorumcmd", quorumcmd); - if (quorumcmd == "exit") { - start_election(); - elector.stop_participating(); - rs = "stopped responding to quorum, initiated new election"; - r = 0; - } else if (quorumcmd == "enter") { - elector.start_participating(); - start_election(); - rs = "started responding to quorum, initiated new election"; - r = 0; - } else { - rs = "needs a valid 'quorum' command"; - r = -EINVAL; - } - } else if (prefix == "version") { - if (f) { - f->open_object_section("version"); - f->dump_string("version", pretty_version_to_str()); - f->close_section(); - f->flush(ds); - } else { - ds << pretty_version_to_str(); - } - rdata.append(ds); - rs = ""; - r = 0; - } else if (prefix == "versions") { - if (!f) - f.reset(Formatter::create("json-pretty")); - map overall; - f->open_object_section("version"); - map mon, mgr, osd, mds; - - count_metadata("ceph_version", &mon); - f->open_object_section("mon"); - for (auto& p : mon) { - f->dump_int(p.first.c_str(), p.second); - overall[p.first] += p.second; - } - f->close_section(); - - mgrmon()->count_metadata("ceph_version", &mgr); - f->open_object_section("mgr"); - for (auto& p : mgr) { - f->dump_int(p.first.c_str(), p.second); - overall[p.first] += p.second; - } - f->close_section(); - - osdmon()->count_metadata("ceph_version", &osd); - f->open_object_section("osd"); - for (auto& p : osd) { - f->dump_int(p.first.c_str(), p.second); - overall[p.first] += p.second; - } - f->close_section(); - - mdsmon()->count_metadata("ceph_version", &mds); - f->open_object_section("mds"); - for (auto& p : mds) { - f->dump_int(p.first.c_str(), p.second); - overall[p.first] += p.second; - } - f->close_section(); - - for (auto& p : mgrstatmon()->get_service_map().services) { - f->open_object_section(p.first.c_str()); - map m; - p.second.count_metadata("ceph_version", &m); - for (auto& q : m) { - f->dump_int(q.first.c_str(), q.second); - overall[q.first] += q.second; - } - f->close_section(); - } - - f->open_object_section("overall"); - for (auto& p : overall) { - f->dump_int(p.first.c_str(), p.second); - } - f->close_section(); - f->close_section(); - f->flush(rdata); - rs = ""; - r = 0; - } - - out: - if (!m->get_source().is_mon()) // don't reply to mon->mon commands - reply_command(op, r, rs, rdata, 0); -} - -void Monitor::reply_command(MonOpRequestRef op, int rc, const string &rs, version_t version) -{ - bufferlist rdata; - reply_command(op, rc, rs, rdata, version); -} - -void Monitor::reply_command(MonOpRequestRef op, int rc, const string &rs, - bufferlist& rdata, version_t version) -{ - MMonCommand *m = static_cast(op->get_req()); - assert(m->get_type() == MSG_MON_COMMAND); - MMonCommandAck *reply = new MMonCommandAck(m->cmd, rc, rs, version); - reply->set_tid(m->get_tid()); - reply->set_data(rdata); - send_reply(op, reply); -} - - -// ------------------------ -// request/reply routing -// -// a client/mds/osd will connect to a random monitor. we need to forward any -// messages requiring state updates to the leader, and then route any replies -// back via the correct monitor and back to them. (the monitor will not -// initiate any connections.) - -void Monitor::forward_request_leader(MonOpRequestRef op) -{ - op->mark_event(__func__); - - int mon = get_leader(); - MonSession *session = op->get_session(); - PaxosServiceMessage *req = op->get_req(); - - if (req->get_source().is_mon() && req->get_source_addr() != messenger->get_myaddr()) { - dout(10) << "forward_request won't forward (non-local) mon request " << *req << dendl; - } else if (session->proxy_con) { - dout(10) << "forward_request won't double fwd request " << *req << dendl; - } else if (!session->closed) { - RoutedRequest *rr = new RoutedRequest; - rr->tid = ++routed_request_tid; - rr->client_inst = req->get_source_inst(); - rr->con = req->get_connection(); - rr->con_features = rr->con->get_features(); - encode_message(req, CEPH_FEATURES_ALL, rr->request_bl); // for my use only; use all features - rr->session = static_cast(session->get()); - rr->op = op; - routed_requests[rr->tid] = rr; - session->routed_request_tids.insert(rr->tid); - - dout(10) << "forward_request " << rr->tid << " request " << *req - << " features " << rr->con_features << dendl; - - MForward *forward = new MForward(rr->tid, - req, - rr->con_features, - rr->session->caps); - forward->set_priority(req->get_priority()); - if (session->auth_handler) { - forward->entity_name = session->entity_name; - } else if (req->get_source().is_mon()) { - forward->entity_name.set_type(CEPH_ENTITY_TYPE_MON); - } - messenger->send_message(forward, monmap->get_inst(mon)); - op->mark_forwarded(); - assert(op->get_req()->get_type() != 0); - } else { - dout(10) << "forward_request no session for request " << *req << dendl; - } -} - -// fake connection attached to forwarded messages -struct AnonConnection : public Connection { - explicit AnonConnection(CephContext *cct) : Connection(cct, NULL) {} - - int send_message(Message *m) override { - assert(!"send_message on anonymous connection"); - } - void send_keepalive() override { - assert(!"send_keepalive on anonymous connection"); - } - void mark_down() override { - // silently ignore - } - void mark_disposable() override { - // silengtly ignore - } - bool is_connected() override { return false; } -}; - -//extract the original message and put it into the regular dispatch function -void Monitor::handle_forward(MonOpRequestRef op) -{ - MForward *m = static_cast(op->get_req()); - dout(10) << "received forwarded message from " << m->client - << " via " << m->get_source_inst() << dendl; - MonSession *session = op->get_session(); - assert(session); - - if (!session->is_capable("mon", MON_CAP_X)) { - dout(0) << "forward from entity with insufficient caps! " - << session->caps << dendl; - } else { - // see PaxosService::dispatch(); we rely on this being anon - // (c->msgr == NULL) - PaxosServiceMessage *req = m->claim_message(); - assert(req != NULL); - - ConnectionRef c(new AnonConnection(cct)); - MonSession *s = new MonSession(req->get_source_inst(), - static_cast(c.get())); - c->set_priv(s->get()); - c->set_peer_addr(m->client.addr); - c->set_peer_type(m->client.name.type()); - c->set_features(m->con_features); - - s->caps = m->client_caps; - dout(10) << " caps are " << s->caps << dendl; - s->entity_name = m->entity_name; - dout(10) << " entity name '" << s->entity_name << "' type " - << s->entity_name.get_type() << dendl; - s->proxy_con = m->get_connection(); - s->proxy_tid = m->tid; - - req->set_connection(c); - - // not super accurate, but better than nothing. - req->set_recv_stamp(m->get_recv_stamp()); - - /* - * note which election epoch this is; we will drop the message if - * there is a future election since our peers will resend routed - * requests in that case. - */ - req->rx_election_epoch = get_epoch(); - - /* Because this is a special fake connection, we need to break - the ref loop between Connection and MonSession differently - than we normally do. Here, the Message refers to the Connection - which refers to the Session, and nobody else refers to the Connection - or the Session. And due to the special nature of this message, - nobody refers to the Connection via the Session. So, clear out that - half of the ref loop.*/ - s->con.reset(NULL); - - dout(10) << " mesg " << req << " from " << m->get_source_addr() << dendl; - - _ms_dispatch(req); - s->put(); - } -} - -void Monitor::try_send_message(Message *m, const entity_inst_t& to) -{ - dout(10) << "try_send_message " << *m << " to " << to << dendl; - - bufferlist bl; - encode_message(m, quorum_con_features, bl); - - messenger->send_message(m, to); - - for (int i=0; i<(int)monmap->size(); i++) { - if (i != rank) - messenger->send_message(new MRoute(bl, to), monmap->get_inst(i)); - } -} - -void Monitor::send_reply(MonOpRequestRef op, Message *reply) -{ - op->mark_event(__func__); - - MonSession *session = op->get_session(); - assert(session); - Message *req = op->get_req(); - ConnectionRef con = op->get_connection(); - - reply->set_cct(g_ceph_context); - dout(2) << __func__ << " " << op << " " << reply << " " << *reply << dendl; - - if (!con) { - dout(2) << "send_reply no connection, dropping reply " << *reply - << " to " << req << " " << *req << dendl; - reply->put(); - op->mark_event("reply: no connection"); - return; - } - - if (!session->con && !session->proxy_con) { - dout(2) << "send_reply no connection, dropping reply " << *reply - << " to " << req << " " << *req << dendl; - reply->put(); - op->mark_event("reply: no connection"); - return; - } - - if (session->proxy_con) { - dout(15) << "send_reply routing reply to " << con->get_peer_addr() - << " via " << session->proxy_con->get_peer_addr() - << " for request " << *req << dendl; - session->proxy_con->send_message(new MRoute(session->proxy_tid, reply)); - op->mark_event("reply: send routed request"); - } else { - session->con->send_message(reply); - op->mark_event("reply: send"); - } -} - -void Monitor::no_reply(MonOpRequestRef op) -{ - MonSession *session = op->get_session(); - Message *req = op->get_req(); - - if (session->proxy_con) { - dout(10) << "no_reply to " << req->get_source_inst() - << " via " << session->proxy_con->get_peer_addr() - << " for request " << *req << dendl; - session->proxy_con->send_message(new MRoute(session->proxy_tid, NULL)); - op->mark_event("no_reply: send routed request"); - } else { - dout(10) << "no_reply to " << req->get_source_inst() - << " " << *req << dendl; - op->mark_event("no_reply"); - } -} - -void Monitor::handle_route(MonOpRequestRef op) -{ - MRoute *m = static_cast(op->get_req()); - MonSession *session = op->get_session(); - //check privileges - if (!session->is_capable("mon", MON_CAP_X)) { - dout(0) << "MRoute received from entity without appropriate perms! " - << dendl; - return; - } - if (m->msg) - dout(10) << "handle_route " << *m->msg << " to " << m->dest << dendl; - else - dout(10) << "handle_route null to " << m->dest << dendl; - - // look it up - if (m->session_mon_tid) { - if (routed_requests.count(m->session_mon_tid)) { - RoutedRequest *rr = routed_requests[m->session_mon_tid]; - - // reset payload, in case encoding is dependent on target features - if (m->msg) { - m->msg->clear_payload(); - rr->con->send_message(m->msg); - m->msg = NULL; - } - if (m->send_osdmap_first) { - dout(10) << " sending osdmaps from " << m->send_osdmap_first << dendl; - osdmon()->send_incremental(m->send_osdmap_first, rr->session, - true, MonOpRequestRef()); - } - assert(rr->tid == m->session_mon_tid && rr->session->routed_request_tids.count(m->session_mon_tid)); - routed_requests.erase(m->session_mon_tid); - rr->session->routed_request_tids.erase(m->session_mon_tid); - delete rr; - } else { - dout(10) << " don't have routed request tid " << m->session_mon_tid << dendl; - } - } else { - dout(10) << " not a routed request, trying to send anyway" << dendl; - if (m->msg) { - messenger->send_message(m->msg, m->dest); - m->msg = NULL; - } - } -} - -void Monitor::resend_routed_requests() -{ - dout(10) << "resend_routed_requests" << dendl; - int mon = get_leader(); - list retry; - for (map::iterator p = routed_requests.begin(); - p != routed_requests.end(); - ++p) { - RoutedRequest *rr = p->second; - - if (mon == rank) { - dout(10) << " requeue for self tid " << rr->tid << dendl; - rr->op->mark_event("retry routed request"); - retry.push_back(new C_RetryMessage(this, rr->op)); - if (rr->session) { - assert(rr->session->routed_request_tids.count(p->first)); - rr->session->routed_request_tids.erase(p->first); - } - delete rr; - } else { - bufferlist::iterator q = rr->request_bl.begin(); - PaxosServiceMessage *req = (PaxosServiceMessage *)decode_message(cct, 0, q); - rr->op->mark_event("resend forwarded message to leader"); - dout(10) << " resend to mon." << mon << " tid " << rr->tid << " " << *req << dendl; - MForward *forward = new MForward(rr->tid, req, rr->con_features, - rr->session->caps); - req->put(); // forward takes its own ref; drop ours. - forward->client = rr->client_inst; - forward->set_priority(req->get_priority()); - messenger->send_message(forward, monmap->get_inst(mon)); - } - } - if (mon == rank) { - routed_requests.clear(); - finish_contexts(g_ceph_context, retry); - } -} - -void Monitor::remove_session(MonSession *s) -{ - dout(10) << "remove_session " << s << " " << s->inst - << " features 0x" << std::hex << s->con_features << std::dec << dendl; - assert(s->con); - assert(!s->closed); - for (set::iterator p = s->routed_request_tids.begin(); - p != s->routed_request_tids.end(); - ++p) { - assert(routed_requests.count(*p)); - RoutedRequest *rr = routed_requests[*p]; - dout(10) << " dropping routed request " << rr->tid << dendl; - delete rr; - routed_requests.erase(*p); - } - s->routed_request_tids.clear(); - s->con->set_priv(NULL); - session_map.remove_session(s); - logger->set(l_mon_num_sessions, session_map.get_size()); - logger->inc(l_mon_session_rm); -} - -void Monitor::remove_all_sessions() -{ - Mutex::Locker l(session_map_lock); - while (!session_map.sessions.empty()) { - MonSession *s = session_map.sessions.front(); - remove_session(s); - if (logger) - logger->inc(l_mon_session_rm); - } - if (logger) - logger->set(l_mon_num_sessions, session_map.get_size()); -} - -void Monitor::send_command(const entity_inst_t& inst, - const vector& com) -{ - dout(10) << "send_command " << inst << "" << com << dendl; - MMonCommand *c = new MMonCommand(monmap->fsid); - c->cmd = com; - try_send_message(c, inst); -} - -void Monitor::waitlist_or_zap_client(MonOpRequestRef op) -{ - /** - * Wait list the new session until we're in the quorum, assuming it's - * sufficiently new. - * tick() will periodically send them back through so we can send - * the client elsewhere if we don't think we're getting back in. - * - * But we whitelist a few sorts of messages: - * 1) Monitors can talk to us at any time, of course. - * 2) auth messages. It's unlikely to go through much faster, but - * it's possible we've just lost our quorum status and we want to take... - * 3) command messages. We want to accept these under all possible - * circumstances. - */ - Message *m = op->get_req(); - MonSession *s = op->get_session(); - ConnectionRef con = op->get_connection(); - utime_t too_old = ceph_clock_now(); - too_old -= g_ceph_context->_conf->mon_lease; - if (m->get_recv_stamp() > too_old && - con->is_connected()) { - dout(5) << "waitlisting message " << *m << dendl; - maybe_wait_for_quorum.push_back(new C_RetryMessage(this, op)); - op->mark_wait_for_quorum(); - } else { - dout(5) << "discarding message " << *m << " and sending client elsewhere" << dendl; - con->mark_down(); - // proxied sessions aren't registered and don't have a con; don't remove - // those. - if (!s->proxy_con) { - Mutex::Locker l(session_map_lock); - remove_session(s); - } - op->mark_zap(); - } -} - -void Monitor::_ms_dispatch(Message *m) -{ - if (is_shutdown()) { - m->put(); - return; - } - - MonOpRequestRef op = op_tracker.create_request(m); - bool src_is_mon = op->is_src_mon(); - op->mark_event("mon:_ms_dispatch"); - MonSession *s = op->get_session(); - if (s && s->closed) { - return; - } - - if (src_is_mon && s) { - ConnectionRef con = m->get_connection(); - if (con->get_messenger() && con->get_features() != s->con_features) { - // only update features if this is a non-anonymous connection - dout(10) << __func__ << " feature change for " << m->get_source_inst() - << " (was " << s->con_features - << ", now " << con->get_features() << ")" << dendl; - // connection features changed - recreate session. - if (s->con && s->con != con) { - dout(10) << __func__ << " connection for " << m->get_source_inst() - << " changed from session; mark down and replace" << dendl; - s->con->mark_down(); - } - if (s->item.is_on_list()) { - // forwarded messages' sessions are not in the sessions map and - // exist only while the op is being handled. - remove_session(s); - } - s->put(); - s = nullptr; - } - } - - if (!s) { - // if the sender is not a monitor, make sure their first message for a - // session is an MAuth. If it is not, assume it's a stray message, - // and considering that we are creating a new session it is safe to - // assume that the sender hasn't authenticated yet, so we have no way - // of assessing whether we should handle it or not. - if (!src_is_mon && (m->get_type() != CEPH_MSG_AUTH && - m->get_type() != CEPH_MSG_MON_GET_MAP && - m->get_type() != CEPH_MSG_PING)) { - dout(1) << __func__ << " dropping stray message " << *m - << " from " << m->get_source_inst() << dendl; - return; - } - - ConnectionRef con = m->get_connection(); - { - Mutex::Locker l(session_map_lock); - s = session_map.new_session(m->get_source_inst(), con.get()); - } - assert(s); - con->set_priv(s->get()); - dout(10) << __func__ << " new session " << s << " " << *s - << " features 0x" << std::hex - << s->con_features << std::dec << dendl; - op->set_session(s); - - logger->set(l_mon_num_sessions, session_map.get_size()); - logger->inc(l_mon_session_add); - - if (src_is_mon) { - // give it monitor caps; the peer type has been authenticated - dout(5) << __func__ << " setting monitor caps on this connection" << dendl; - if (!s->caps.is_allow_all()) // but no need to repeatedly copy - s->caps = *mon_caps; - } - s->put(); - } else { - dout(20) << __func__ << " existing session " << s << " for " << s->inst - << dendl; - } - - assert(s); - - s->session_timeout = ceph_clock_now(); - s->session_timeout += g_conf->mon_session_timeout; - - if (s->auth_handler) { - s->entity_name = s->auth_handler->get_entity_name(); - } - dout(20) << " caps " << s->caps.get_str() << dendl; - - if ((is_synchronizing() || - (s->global_id == 0 && !exited_quorum.is_zero())) && - !src_is_mon && - m->get_type() != CEPH_MSG_PING) { - waitlist_or_zap_client(op); - } else { - dispatch_op(op); - } - return; -} - -void Monitor::dispatch_op(MonOpRequestRef op) -{ - op->mark_event("mon:dispatch_op"); - MonSession *s = op->get_session(); - assert(s); - if (s->closed) { - dout(10) << " session closed, dropping " << op->get_req() << dendl; - return; - } - - /* we will consider the default type as being 'monitor' until proven wrong */ - op->set_type_monitor(); - /* deal with all messages that do not necessarily need caps */ - bool dealt_with = true; - switch (op->get_req()->get_type()) { - // auth - case MSG_MON_GLOBAL_ID: - case CEPH_MSG_AUTH: - op->set_type_service(); - /* no need to check caps here */ - paxos_service[PAXOS_AUTH]->dispatch(op); - break; - - case CEPH_MSG_PING: - handle_ping(op); - break; - - /* MMonGetMap may be used by clients to obtain a monmap *before* - * authenticating with the monitor. We need to handle these without - * checking caps because, even on a cluster without cephx, we only set - * session caps *after* the auth handshake. A good example of this - * is when a client calls MonClient::get_monmap_privately(), which does - * not authenticate when obtaining a monmap. - */ - case CEPH_MSG_MON_GET_MAP: - handle_mon_get_map(op); - break; - - case CEPH_MSG_MON_METADATA: - return handle_mon_metadata(op); - - default: - dealt_with = false; - break; - } - if (dealt_with) - return; - - /* well, maybe the op belongs to a service... */ - op->set_type_service(); - /* deal with all messages which caps should be checked somewhere else */ - dealt_with = true; - switch (op->get_req()->get_type()) { - - // OSDs - case CEPH_MSG_MON_GET_OSDMAP: - case CEPH_MSG_POOLOP: - case MSG_OSD_BEACON: - case MSG_OSD_MARK_ME_DOWN: - case MSG_OSD_FULL: - case MSG_OSD_FAILURE: - case MSG_OSD_BOOT: - case MSG_OSD_ALIVE: - case MSG_OSD_PGTEMP: - case MSG_OSD_PG_CREATED: - case MSG_REMOVE_SNAPS: - paxos_service[PAXOS_OSDMAP]->dispatch(op); - break; - - // MDSs - case MSG_MDS_BEACON: - case MSG_MDS_OFFLOAD_TARGETS: - paxos_service[PAXOS_MDSMAP]->dispatch(op); - break; - - // Mgrs - case MSG_MGR_BEACON: - paxos_service[PAXOS_MGR]->dispatch(op); - break; - - // MgrStat - case CEPH_MSG_STATFS: - // this is an ugly hack, sorry! force the version to 1 so that we do - // not run afoul of the is_readable() paxos check. the client is going - // by the pgmonitor version and the MgrStatMonitor version will lag behind - // that until we complete the upgrade. The paxos ordering crap really - // doesn't matter for statfs results, so just kludge around it here. - if (osdmon()->osdmap.require_osd_release < CEPH_RELEASE_LUMINOUS) { - ((MStatfs*)op->get_req())->version = 1; - } - case MSG_MON_MGR_REPORT: - case MSG_GETPOOLSTATS: - paxos_service[PAXOS_MGRSTAT]->dispatch(op); - break; - - // pg - case MSG_PGSTATS: - paxos_service[PAXOS_PGMAP]->dispatch(op); - break; - - // log - case MSG_LOG: - paxos_service[PAXOS_LOG]->dispatch(op); - break; - - // handle_command() does its own caps checking - case MSG_MON_COMMAND: - op->set_type_command(); - handle_command(op); - break; - - default: - dealt_with = false; - break; - } - if (dealt_with) - return; - - /* nop, looks like it's not a service message; revert back to monitor */ - op->set_type_monitor(); - - /* messages we, the Monitor class, need to deal with - * but may be sent by clients. */ - - if (!op->get_session()->is_capable("mon", MON_CAP_R)) { - dout(5) << __func__ << " " << op->get_req()->get_source_inst() - << " not enough caps for " << *(op->get_req()) << " -- dropping" - << dendl; - goto drop; - } - - dealt_with = true; - switch (op->get_req()->get_type()) { - - // misc - case CEPH_MSG_MON_GET_VERSION: - handle_get_version(op); - break; - - case CEPH_MSG_MON_SUBSCRIBE: - /* FIXME: check what's being subscribed, filter accordingly */ - handle_subscribe(op); - break; - - default: - dealt_with = false; - break; - } - if (dealt_with) - return; - - if (!op->is_src_mon()) { - dout(1) << __func__ << " unexpected monitor message from" - << " non-monitor entity " << op->get_req()->get_source_inst() - << " " << *(op->get_req()) << " -- dropping" << dendl; - goto drop; - } - - /* messages that should only be sent by another monitor */ - dealt_with = true; - switch (op->get_req()->get_type()) { - - case MSG_ROUTE: - handle_route(op); - break; - - case MSG_MON_PROBE: - handle_probe(op); - break; - - // Sync (i.e., the new slurp, but on steroids) - case MSG_MON_SYNC: - handle_sync(op); - break; - case MSG_MON_SCRUB: - handle_scrub(op); - break; - - /* log acks are sent from a monitor we sent the MLog to, and are - never sent by clients to us. */ - case MSG_LOGACK: - log_client.handle_log_ack((MLogAck*)op->get_req()); - break; - - // monmap - case MSG_MON_JOIN: - op->set_type_service(); - paxos_service[PAXOS_MONMAP]->dispatch(op); - break; - - // paxos - case MSG_MON_PAXOS: - { - op->set_type_paxos(); - MMonPaxos *pm = static_cast(op->get_req()); - if (!op->get_session()->is_capable("mon", MON_CAP_X)) { - //can't send these! - break; - } - - if (state == STATE_SYNCHRONIZING) { - // we are synchronizing. These messages would do us no - // good, thus just drop them and ignore them. - dout(10) << __func__ << " ignore paxos msg from " - << pm->get_source_inst() << dendl; - break; - } - - // sanitize - if (pm->epoch > get_epoch()) { - bootstrap(); - break; - } - if (pm->epoch != get_epoch()) { - break; - } - - paxos->dispatch(op); - } - break; - - // elector messages - case MSG_MON_ELECTION: - op->set_type_election(); - //check privileges here for simplicity - if (!op->get_session()->is_capable("mon", MON_CAP_X)) { - dout(0) << "MMonElection received from entity without enough caps!" - << op->get_session()->caps << dendl; - break; - } - if (!is_probing() && !is_synchronizing()) { - elector.dispatch(op); - } - break; - - case MSG_FORWARD: - handle_forward(op); - break; - - case MSG_TIMECHECK: - handle_timecheck(op); - break; - - case MSG_MON_HEALTH: - health_monitor->dispatch(op); - break; - - case MSG_MON_HEALTH_CHECKS: - op->set_type_service(); - paxos_service[PAXOS_HEALTH]->dispatch(op); - break; - - default: - dealt_with = false; - break; - } - if (!dealt_with) { - dout(1) << "dropping unexpected " << *(op->get_req()) << dendl; - goto drop; - } - return; - -drop: - return; -} - -void Monitor::handle_ping(MonOpRequestRef op) -{ - MPing *m = static_cast(op->get_req()); - dout(10) << __func__ << " " << *m << dendl; - MPing *reply = new MPing; - entity_inst_t inst = m->get_source_inst(); - bufferlist payload; - boost::scoped_ptr f(new JSONFormatter(true)); - f->open_object_section("pong"); - - if (osdmon()->osdmap.require_osd_release >= CEPH_RELEASE_LUMINOUS) { - get_health_status(false, f.get(), nullptr); - } else { - list health_str; - get_health(health_str, nullptr, f.get()); - } - - { - stringstream ss; - get_mon_status(f.get(), ss); - } - - f->close_section(); - stringstream ss; - f->flush(ss); - ::encode(ss.str(), payload); - reply->set_payload(payload); - dout(10) << __func__ << " reply payload len " << reply->get_payload().length() << dendl; - messenger->send_message(reply, inst); -} - -void Monitor::timecheck_start() -{ - dout(10) << __func__ << dendl; - timecheck_cleanup(); - timecheck_start_round(); -} - -void Monitor::timecheck_finish() -{ - dout(10) << __func__ << dendl; - timecheck_cleanup(); -} - -void Monitor::timecheck_start_round() -{ - dout(10) << __func__ << " curr " << timecheck_round << dendl; - assert(is_leader()); - - if (monmap->size() == 1) { - assert(0 == "We are alone; this shouldn't have been scheduled!"); - return; - } - - if (timecheck_round % 2) { - dout(10) << __func__ << " there's a timecheck going on" << dendl; - utime_t curr_time = ceph_clock_now(); - double max = g_conf->mon_timecheck_interval*3; - if (curr_time - timecheck_round_start < max) { - dout(10) << __func__ << " keep current round going" << dendl; - goto out; - } else { - dout(10) << __func__ - << " finish current timecheck and start new" << dendl; - timecheck_cancel_round(); - } - } - - assert(timecheck_round % 2 == 0); - timecheck_acks = 0; - timecheck_round ++; - timecheck_round_start = ceph_clock_now(); - dout(10) << __func__ << " new " << timecheck_round << dendl; - - timecheck(); -out: - dout(10) << __func__ << " setting up next event" << dendl; - timecheck_reset_event(); -} - -void Monitor::timecheck_finish_round(bool success) -{ - dout(10) << __func__ << " curr " << timecheck_round << dendl; - assert(timecheck_round % 2); - timecheck_round ++; - timecheck_round_start = utime_t(); - - if (success) { - assert(timecheck_waiting.empty()); - assert(timecheck_acks == quorum.size()); - timecheck_report(); - timecheck_check_skews(); - return; - } - - dout(10) << __func__ << " " << timecheck_waiting.size() - << " peers still waiting:"; - for (map::iterator p = timecheck_waiting.begin(); - p != timecheck_waiting.end(); ++p) { - *_dout << " " << p->first.name; - } - *_dout << dendl; - timecheck_waiting.clear(); - - dout(10) << __func__ << " finished to " << timecheck_round << dendl; -} - -void Monitor::timecheck_cancel_round() -{ - timecheck_finish_round(false); -} - -void Monitor::timecheck_cleanup() -{ - timecheck_round = 0; - timecheck_acks = 0; - timecheck_round_start = utime_t(); - - if (timecheck_event) { - timer.cancel_event(timecheck_event); - timecheck_event = NULL; - } - timecheck_waiting.clear(); - timecheck_skews.clear(); - timecheck_latencies.clear(); - - timecheck_rounds_since_clean = 0; -} - -void Monitor::timecheck_reset_event() -{ - if (timecheck_event) { - timer.cancel_event(timecheck_event); - timecheck_event = NULL; - } - - double delay = - cct->_conf->mon_timecheck_skew_interval * timecheck_rounds_since_clean; - - if (delay <= 0 || delay > cct->_conf->mon_timecheck_interval) { - delay = cct->_conf->mon_timecheck_interval; - } - - dout(10) << __func__ << " delay " << delay - << " rounds_since_clean " << timecheck_rounds_since_clean - << dendl; - - timecheck_event = timer.add_event_after( - delay, - new C_MonContext(this, [this](int) { - timecheck_start_round(); - })); -} - -void Monitor::timecheck_check_skews() -{ - dout(10) << __func__ << dendl; - assert(is_leader()); - assert((timecheck_round % 2) == 0); - if (monmap->size() == 1) { - assert(0 == "We are alone; we shouldn't have gotten here!"); - return; - } - assert(timecheck_latencies.size() == timecheck_skews.size()); - - bool found_skew = false; - for (map::iterator p = timecheck_skews.begin(); - p != timecheck_skews.end(); ++p) { - - double abs_skew; - if (timecheck_has_skew(p->second, &abs_skew)) { - dout(10) << __func__ - << " " << p->first << " skew " << abs_skew << dendl; - found_skew = true; - } - } - - if (found_skew) { - ++timecheck_rounds_since_clean; - timecheck_reset_event(); - } else if (timecheck_rounds_since_clean > 0) { - dout(1) << __func__ - << " no clock skews found after " << timecheck_rounds_since_clean - << " rounds" << dendl; - // make sure the skews are really gone and not just a transient success - // this will run just once if not in the presence of skews again. - timecheck_rounds_since_clean = 1; - timecheck_reset_event(); - timecheck_rounds_since_clean = 0; - } - -} - -void Monitor::timecheck_report() -{ - dout(10) << __func__ << dendl; - assert(is_leader()); - assert((timecheck_round % 2) == 0); - if (monmap->size() == 1) { - assert(0 == "We are alone; we shouldn't have gotten here!"); - return; - } - - assert(timecheck_latencies.size() == timecheck_skews.size()); - bool do_output = true; // only output report once - for (set::iterator q = quorum.begin(); q != quorum.end(); ++q) { - if (monmap->get_name(*q) == name) - continue; - - MTimeCheck *m = new MTimeCheck(MTimeCheck::OP_REPORT); - m->epoch = get_epoch(); - m->round = timecheck_round; - - for (map::iterator it = timecheck_skews.begin(); - it != timecheck_skews.end(); ++it) { - double skew = it->second; - double latency = timecheck_latencies[it->first]; - - m->skews[it->first] = skew; - m->latencies[it->first] = latency; - - if (do_output) { - dout(25) << __func__ << " " << it->first - << " latency " << latency - << " skew " << skew << dendl; - } - } - do_output = false; - entity_inst_t inst = monmap->get_inst(*q); - dout(10) << __func__ << " send report to " << inst << dendl; - messenger->send_message(m, inst); - } -} - -void Monitor::timecheck() -{ - dout(10) << __func__ << dendl; - assert(is_leader()); - if (monmap->size() == 1) { - assert(0 == "We are alone; we shouldn't have gotten here!"); - return; - } - assert(timecheck_round % 2 != 0); - - timecheck_acks = 1; // we ack ourselves - - dout(10) << __func__ << " start timecheck epoch " << get_epoch() - << " round " << timecheck_round << dendl; - - // we are at the eye of the storm; the point of reference - timecheck_skews[messenger->get_myinst()] = 0.0; - timecheck_latencies[messenger->get_myinst()] = 0.0; - - for (set::iterator it = quorum.begin(); it != quorum.end(); ++it) { - if (monmap->get_name(*it) == name) - continue; - - entity_inst_t inst = monmap->get_inst(*it); - utime_t curr_time = ceph_clock_now(); - timecheck_waiting[inst] = curr_time; - MTimeCheck *m = new MTimeCheck(MTimeCheck::OP_PING); - m->epoch = get_epoch(); - m->round = timecheck_round; - dout(10) << __func__ << " send " << *m << " to " << inst << dendl; - messenger->send_message(m, inst); - } -} - -health_status_t Monitor::timecheck_status(ostringstream &ss, - const double skew_bound, - const double latency) -{ - health_status_t status = HEALTH_OK; - assert(latency >= 0); - - double abs_skew; - if (timecheck_has_skew(skew_bound, &abs_skew)) { - status = HEALTH_WARN; - ss << "clock skew " << abs_skew << "s" - << " > max " << g_conf->mon_clock_drift_allowed << "s"; - } - - return status; -} - -void Monitor::handle_timecheck_leader(MonOpRequestRef op) -{ - MTimeCheck *m = static_cast(op->get_req()); - dout(10) << __func__ << " " << *m << dendl; - /* handles PONG's */ - assert(m->op == MTimeCheck::OP_PONG); - - entity_inst_t other = m->get_source_inst(); - if (m->epoch < get_epoch()) { - dout(1) << __func__ << " got old timecheck epoch " << m->epoch - << " from " << other - << " curr " << get_epoch() - << " -- severely lagged? discard" << dendl; - return; - } - assert(m->epoch == get_epoch()); - - if (m->round < timecheck_round) { - dout(1) << __func__ << " got old round " << m->round - << " from " << other - << " curr " << timecheck_round << " -- discard" << dendl; - return; - } - - utime_t curr_time = ceph_clock_now(); - - assert(timecheck_waiting.count(other) > 0); - utime_t timecheck_sent = timecheck_waiting[other]; - timecheck_waiting.erase(other); - if (curr_time < timecheck_sent) { - // our clock was readjusted -- drop everything until it all makes sense. - dout(1) << __func__ << " our clock was readjusted --" - << " bump round and drop current check" - << dendl; - timecheck_cancel_round(); - return; - } - - /* update peer latencies */ - double latency = (double)(curr_time - timecheck_sent); - - if (timecheck_latencies.count(other) == 0) - timecheck_latencies[other] = latency; - else { - double avg_latency = ((timecheck_latencies[other]*0.8)+(latency*0.2)); - timecheck_latencies[other] = avg_latency; - } - - /* - * update skews - * - * some nasty thing goes on if we were to do 'a - b' between two utime_t, - * and 'a' happens to be lower than 'b'; so we use double instead. - * - * latency is always expected to be >= 0. - * - * delta, the difference between theirs timestamp and ours, may either be - * lower or higher than 0; will hardly ever be 0. - * - * The absolute skew is the absolute delta minus the latency, which is - * taken as a whole instead of an rtt given that there is some queueing - * and dispatch times involved and it's hard to assess how long exactly - * it took for the message to travel to the other side and be handled. So - * we call it a bounded skew, the worst case scenario. - * - * Now, to math! - * - * Given that the latency is always positive, we can establish that the - * bounded skew will be: - * - * 1. positive if the absolute delta is higher than the latency and - * delta is positive - * 2. negative if the absolute delta is higher than the latency and - * delta is negative. - * 3. zero if the absolute delta is lower than the latency. - * - * On 3. we make a judgement call and treat the skew as non-existent. - * This is because that, if the absolute delta is lower than the - * latency, then the apparently existing skew is nothing more than a - * side-effect of the high latency at work. - * - * This may not be entirely true though, as a severely skewed clock - * may be masked by an even higher latency, but with high latencies - * we probably have worse issues to deal with than just skewed clocks. - */ - assert(latency >= 0); - - double delta = ((double) m->timestamp) - ((double) curr_time); - double abs_delta = (delta > 0 ? delta : -delta); - double skew_bound = abs_delta - latency; - if (skew_bound < 0) - skew_bound = 0; - else if (delta < 0) - skew_bound = -skew_bound; - - ostringstream ss; - health_status_t status = timecheck_status(ss, skew_bound, latency); - clog->health(status) << other << " " << ss.str(); - - dout(10) << __func__ << " from " << other << " ts " << m->timestamp - << " delta " << delta << " skew_bound " << skew_bound - << " latency " << latency << dendl; - - timecheck_skews[other] = skew_bound; - - timecheck_acks++; - if (timecheck_acks == quorum.size()) { - dout(10) << __func__ << " got pongs from everybody (" - << timecheck_acks << " total)" << dendl; - assert(timecheck_skews.size() == timecheck_acks); - assert(timecheck_waiting.empty()); - // everyone has acked, so bump the round to finish it. - timecheck_finish_round(); - } -} - -void Monitor::handle_timecheck_peon(MonOpRequestRef op) -{ - MTimeCheck *m = static_cast(op->get_req()); - dout(10) << __func__ << " " << *m << dendl; - - assert(is_peon()); - assert(m->op == MTimeCheck::OP_PING || m->op == MTimeCheck::OP_REPORT); - - if (m->epoch != get_epoch()) { - dout(1) << __func__ << " got wrong epoch " - << "(ours " << get_epoch() - << " theirs: " << m->epoch << ") -- discarding" << dendl; - return; - } - - if (m->round < timecheck_round) { - dout(1) << __func__ << " got old round " << m->round - << " current " << timecheck_round - << " (epoch " << get_epoch() << ") -- discarding" << dendl; - return; - } - - timecheck_round = m->round; - - if (m->op == MTimeCheck::OP_REPORT) { - assert((timecheck_round % 2) == 0); - timecheck_latencies.swap(m->latencies); - timecheck_skews.swap(m->skews); - return; - } - - assert((timecheck_round % 2) != 0); - MTimeCheck *reply = new MTimeCheck(MTimeCheck::OP_PONG); - utime_t curr_time = ceph_clock_now(); - reply->timestamp = curr_time; - reply->epoch = m->epoch; - reply->round = m->round; - dout(10) << __func__ << " send " << *m - << " to " << m->get_source_inst() << dendl; - m->get_connection()->send_message(reply); -} - -void Monitor::handle_timecheck(MonOpRequestRef op) -{ - MTimeCheck *m = static_cast(op->get_req()); - dout(10) << __func__ << " " << *m << dendl; - - if (is_leader()) { - if (m->op != MTimeCheck::OP_PONG) { - dout(1) << __func__ << " drop unexpected msg (not pong)" << dendl; - } else { - handle_timecheck_leader(op); - } - } else if (is_peon()) { - if (m->op != MTimeCheck::OP_PING && m->op != MTimeCheck::OP_REPORT) { - dout(1) << __func__ << " drop unexpected msg (not ping or report)" << dendl; - } else { - handle_timecheck_peon(op); - } - } else { - dout(1) << __func__ << " drop unexpected msg" << dendl; - } -} - -void Monitor::handle_subscribe(MonOpRequestRef op) -{ - MMonSubscribe *m = static_cast(op->get_req()); - dout(10) << "handle_subscribe " << *m << dendl; - - bool reply = false; - - MonSession *s = op->get_session(); - assert(s); - - for (map::iterator p = m->what.begin(); - p != m->what.end(); - ++p) { - // if there are any non-onetime subscriptions, we need to reply to start the resubscribe timer - if ((p->second.flags & CEPH_SUBSCRIBE_ONETIME) == 0) - reply = true; - - // remove conflicting subscribes - if (logmon()->sub_name_to_id(p->first) >= 0) { - for (map::iterator it = s->sub_map.begin(); - it != s->sub_map.end(); ) { - if (it->first != p->first && logmon()->sub_name_to_id(it->first) >= 0) { - Mutex::Locker l(session_map_lock); - session_map.remove_sub((it++)->second); - } else { - ++it; - } - } - } - - { - Mutex::Locker l(session_map_lock); - session_map.add_update_sub(s, p->first, p->second.start, - p->second.flags & CEPH_SUBSCRIBE_ONETIME, - m->get_connection()->has_feature(CEPH_FEATURE_INCSUBOSDMAP)); - } - - if (p->first.compare(0, 6, "mdsmap") == 0 || p->first.compare(0, 5, "fsmap") == 0) { - dout(10) << __func__ << ": MDS sub '" << p->first << "'" << dendl; - if ((int)s->is_capable("mds", MON_CAP_R)) { - Subscription *sub = s->sub_map[p->first]; - assert(sub != nullptr); - mdsmon()->check_sub(sub); - } - } else if (p->first == "osdmap") { - if ((int)s->is_capable("osd", MON_CAP_R)) { - if (s->osd_epoch > p->second.start) { - // client needs earlier osdmaps on purpose, so reset the sent epoch - s->osd_epoch = 0; - } - osdmon()->check_osdmap_sub(s->sub_map["osdmap"]); - } - } else if (p->first == "osd_pg_creates") { - if ((int)s->is_capable("osd", MON_CAP_W)) { - if (monmap->get_required_features().contains_all( - ceph::features::mon::FEATURE_LUMINOUS)) { - osdmon()->check_pg_creates_sub(s->sub_map["osd_pg_creates"]); - } else { - pgmon()->check_sub(s->sub_map["osd_pg_creates"]); - } - } - } else if (p->first == "monmap") { - monmon()->check_sub(s->sub_map[p->first]); - } else if (logmon()->sub_name_to_id(p->first) >= 0) { - logmon()->check_sub(s->sub_map[p->first]); - } else if (p->first == "mgrmap" || p->first == "mgrdigest") { - mgrmon()->check_sub(s->sub_map[p->first]); - } else if (p->first == "servicemap") { - mgrstatmon()->check_sub(s->sub_map[p->first]); - } - } - - if (reply) { - // we only need to reply if the client is old enough to think it - // has to send renewals. - ConnectionRef con = m->get_connection(); - if (!con->has_feature(CEPH_FEATURE_MON_STATEFUL_SUB)) - m->get_connection()->send_message(new MMonSubscribeAck( - monmap->get_fsid(), (int)g_conf->mon_subscribe_interval)); - } - -} - -void Monitor::handle_get_version(MonOpRequestRef op) -{ - MMonGetVersion *m = static_cast(op->get_req()); - dout(10) << "handle_get_version " << *m << dendl; - PaxosService *svc = NULL; - - MonSession *s = op->get_session(); - assert(s); - - if (!is_leader() && !is_peon()) { - dout(10) << " waiting for quorum" << dendl; - waitfor_quorum.push_back(new C_RetryMessage(this, op)); - goto out; - } - - if (m->what == "mdsmap") { - svc = mdsmon(); - } else if (m->what == "fsmap") { - svc = mdsmon(); - } else if (m->what == "osdmap") { - svc = osdmon(); - } else if (m->what == "monmap") { - svc = monmon(); - } else { - derr << "invalid map type " << m->what << dendl; - } - - if (svc) { - if (!svc->is_readable()) { - svc->wait_for_readable(op, new C_RetryMessage(this, op)); - goto out; - } - - MMonGetVersionReply *reply = new MMonGetVersionReply(); - reply->handle = m->handle; - reply->version = svc->get_last_committed(); - reply->oldest_version = svc->get_first_committed(); - reply->set_tid(m->get_tid()); - - m->get_connection()->send_message(reply); - } - out: - return; -} - -bool Monitor::ms_handle_reset(Connection *con) -{ - dout(10) << "ms_handle_reset " << con << " " << con->get_peer_addr() << dendl; - - // ignore lossless monitor sessions - if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) - return false; - - MonSession *s = static_cast(con->get_priv()); - if (!s) - return false; - - // break any con <-> session ref cycle - s->con->set_priv(NULL); - - if (is_shutdown()) - return false; - - Mutex::Locker l(lock); - - dout(10) << "reset/close on session " << s->inst << dendl; - if (!s->closed) { - Mutex::Locker l(session_map_lock); - remove_session(s); - } - s->put(); - return true; -} - -bool Monitor::ms_handle_refused(Connection *con) -{ - // just log for now... - dout(10) << "ms_handle_refused " << con << " " << con->get_peer_addr() << dendl; - return false; -} - -// ----- - -void Monitor::send_latest_monmap(Connection *con) -{ - bufferlist bl; - monmap->encode(bl, con->get_features()); - con->send_message(new MMonMap(bl)); -} - -void Monitor::handle_mon_get_map(MonOpRequestRef op) -{ - MMonGetMap *m = static_cast(op->get_req()); - dout(10) << "handle_mon_get_map" << dendl; - send_latest_monmap(m->get_connection().get()); -} - -void Monitor::handle_mon_metadata(MonOpRequestRef op) -{ - MMonMetadata *m = static_cast(op->get_req()); - if (is_leader()) { - dout(10) << __func__ << dendl; - update_mon_metadata(m->get_source().num(), std::move(m->data)); - } -} - -void Monitor::update_mon_metadata(int from, Metadata&& m) -{ - // NOTE: this is now for legacy (kraken or jewel) mons only. - pending_metadata[from] = std::move(m); - - MonitorDBStore::TransactionRef t = paxos->get_pending_transaction(); - bufferlist bl; - ::encode(pending_metadata, bl); - t->put(MONITOR_STORE_PREFIX, "last_metadata", bl); - paxos->trigger_propose(); -} - -int Monitor::load_metadata() -{ - bufferlist bl; - int r = store->get(MONITOR_STORE_PREFIX, "last_metadata", bl); - if (r) - return r; - bufferlist::iterator it = bl.begin(); - ::decode(mon_metadata, it); - - pending_metadata = mon_metadata; - return 0; -} - -int Monitor::get_mon_metadata(int mon, Formatter *f, ostream& err) -{ - assert(f); - if (!mon_metadata.count(mon)) { - err << "mon." << mon << " not found"; - return -EINVAL; - } - const Metadata& m = mon_metadata[mon]; - for (Metadata::const_iterator p = m.begin(); p != m.end(); ++p) { - f->dump_string(p->first.c_str(), p->second); - } - return 0; -} - -void Monitor::count_metadata(const string& field, map *out) -{ - for (auto& p : mon_metadata) { - auto q = p.second.find(field); - if (q == p.second.end()) { - (*out)["unknown"]++; - } else { - (*out)[q->second]++; - } - } -} - -void Monitor::count_metadata(const string& field, Formatter *f) -{ - map by_val; - count_metadata(field, &by_val); - f->open_object_section(field.c_str()); - for (auto& p : by_val) { - f->dump_int(p.first.c_str(), p.second); - } - f->close_section(); -} - -int Monitor::print_nodes(Formatter *f, ostream& err) -{ - map > mons; // hostname => mon - for (map::iterator it = mon_metadata.begin(); - it != mon_metadata.end(); ++it) { - const Metadata& m = it->second; - Metadata::const_iterator hostname = m.find("hostname"); - if (hostname == m.end()) { - // not likely though - continue; - } - mons[hostname->second].push_back(it->first); - } - - dump_services(f, mons, "mon"); - return 0; -} - -// ---------------------------------------------- -// scrub - -int Monitor::scrub_start() -{ - dout(10) << __func__ << dendl; - assert(is_leader()); - - if (!scrub_result.empty()) { - clog->info() << "scrub already in progress"; - return -EBUSY; - } - - scrub_event_cancel(); - scrub_result.clear(); - scrub_state.reset(new ScrubState); - - scrub(); - return 0; -} - -int Monitor::scrub() -{ - assert(is_leader()); - assert(scrub_state); - - scrub_cancel_timeout(); - wait_for_paxos_write(); - scrub_version = paxos->get_version(); - - - // scrub all keys if we're the only monitor in the quorum - int32_t num_keys = - (quorum.size() == 1 ? -1 : cct->_conf->mon_scrub_max_keys); - - for (set::iterator p = quorum.begin(); - p != quorum.end(); - ++p) { - if (*p == rank) - continue; - MMonScrub *r = new MMonScrub(MMonScrub::OP_SCRUB, scrub_version, - num_keys); - r->key = scrub_state->last_key; - messenger->send_message(r, monmap->get_inst(*p)); - } - - // scrub my keys - bool r = _scrub(&scrub_result[rank], - &scrub_state->last_key, - &num_keys); - - scrub_state->finished = !r; - - // only after we got our scrub results do we really care whether the - // other monitors are late on their results. Also, this way we avoid - // triggering the timeout if we end up getting stuck in _scrub() for - // longer than the duration of the timeout. - scrub_reset_timeout(); - - if (quorum.size() == 1) { - assert(scrub_state->finished == true); - scrub_finish(); - } - return 0; -} - -void Monitor::handle_scrub(MonOpRequestRef op) -{ - MMonScrub *m = static_cast(op->get_req()); - dout(10) << __func__ << " " << *m << dendl; - switch (m->op) { - case MMonScrub::OP_SCRUB: - { - if (!is_peon()) - break; - - wait_for_paxos_write(); - - if (m->version != paxos->get_version()) - break; - - MMonScrub *reply = new MMonScrub(MMonScrub::OP_RESULT, - m->version, - m->num_keys); - - reply->key = m->key; - _scrub(&reply->result, &reply->key, &reply->num_keys); - m->get_connection()->send_message(reply); - } - break; - - case MMonScrub::OP_RESULT: - { - if (!is_leader()) - break; - if (m->version != scrub_version) - break; - // reset the timeout each time we get a result - scrub_reset_timeout(); - - int from = m->get_source().num(); - assert(scrub_result.count(from) == 0); - scrub_result[from] = m->result; - - if (scrub_result.size() == quorum.size()) { - scrub_check_results(); - scrub_result.clear(); - if (scrub_state->finished) - scrub_finish(); - else - scrub(); - } - } - break; - } -} - -bool Monitor::_scrub(ScrubResult *r, - pair *start, - int *num_keys) -{ - assert(r != NULL); - assert(start != NULL); - assert(num_keys != NULL); - - set prefixes = get_sync_targets_names(); - prefixes.erase("paxos"); // exclude paxos, as this one may have extra states for proposals, etc. - - dout(10) << __func__ << " start (" << *start << ")" - << " num_keys " << *num_keys << dendl; - - MonitorDBStore::Synchronizer it = store->get_synchronizer(*start, prefixes); - - int scrubbed_keys = 0; - pair last_key; - - while (it->has_next_chunk()) { - - if (*num_keys > 0 && scrubbed_keys == *num_keys) - break; - - pair k = it->get_next_key(); - if (prefixes.count(k.first) == 0) - continue; - - if (cct->_conf->mon_scrub_inject_missing_keys > 0.0 && - (rand() % 10000 < cct->_conf->mon_scrub_inject_missing_keys*10000.0)) { - dout(10) << __func__ << " inject missing key, skipping (" << k << ")" - << dendl; - continue; - } - - bufferlist bl; - int err = store->get(k.first, k.second, bl); - assert(err == 0); - - uint32_t key_crc = bl.crc32c(0); - dout(30) << __func__ << " " << k << " bl " << bl.length() << " bytes" - << " crc " << key_crc << dendl; - r->prefix_keys[k.first]++; - if (r->prefix_crc.count(k.first) == 0) { - r->prefix_crc[k.first] = 0; - } - r->prefix_crc[k.first] = bl.crc32c(r->prefix_crc[k.first]); - - if (cct->_conf->mon_scrub_inject_crc_mismatch > 0.0 && - (rand() % 10000 < cct->_conf->mon_scrub_inject_crc_mismatch*10000.0)) { - dout(10) << __func__ << " inject failure at (" << k << ")" << dendl; - r->prefix_crc[k.first] += 1; - } - - ++scrubbed_keys; - last_key = k; - } - - dout(20) << __func__ << " last_key (" << last_key << ")" - << " scrubbed_keys " << scrubbed_keys - << " has_next " << it->has_next_chunk() << dendl; - - *start = last_key; - *num_keys = scrubbed_keys; - - return it->has_next_chunk(); -} - -void Monitor::scrub_check_results() -{ - dout(10) << __func__ << dendl; - - // compare - int errors = 0; - ScrubResult& mine = scrub_result[rank]; - for (map::iterator p = scrub_result.begin(); - p != scrub_result.end(); - ++p) { - if (p->first == rank) - continue; - if (p->second != mine) { - ++errors; - clog->error() << "scrub mismatch"; - clog->error() << " mon." << rank << " " << mine; - clog->error() << " mon." << p->first << " " << p->second; - } - } - if (!errors) - clog->debug() << "scrub ok on " << quorum << ": " << mine; -} - -inline void Monitor::scrub_timeout() -{ - dout(1) << __func__ << " restarting scrub" << dendl; - scrub_reset(); - scrub_start(); -} - -void Monitor::scrub_finish() -{ - dout(10) << __func__ << dendl; - scrub_reset(); - scrub_event_start(); -} - -void Monitor::scrub_reset() -{ - dout(10) << __func__ << dendl; - scrub_cancel_timeout(); - scrub_version = 0; - scrub_result.clear(); - scrub_state.reset(); -} - -inline void Monitor::scrub_update_interval(int secs) -{ - // we don't care about changes if we are not the leader. - // changes will be visible if we become the leader. - if (!is_leader()) - return; - - dout(1) << __func__ << " new interval = " << secs << dendl; - - // if scrub already in progress, all changes will already be visible during - // the next round. Nothing to do. - if (scrub_state != NULL) - return; - - scrub_event_cancel(); - scrub_event_start(); -} - -void Monitor::scrub_event_start() -{ - dout(10) << __func__ << dendl; - - if (scrub_event) - scrub_event_cancel(); - - if (cct->_conf->mon_scrub_interval <= 0) { - dout(1) << __func__ << " scrub event is disabled" - << " (mon_scrub_interval = " << cct->_conf->mon_scrub_interval - << ")" << dendl; - return; - } - - scrub_event = timer.add_event_after( - cct->_conf->mon_scrub_interval, - new C_MonContext(this, [this](int) { - scrub_start(); - })); -} - -void Monitor::scrub_event_cancel() -{ - dout(10) << __func__ << dendl; - if (scrub_event) { - timer.cancel_event(scrub_event); - scrub_event = NULL; - } -} - -inline void Monitor::scrub_cancel_timeout() -{ - if (scrub_timeout_event) { - timer.cancel_event(scrub_timeout_event); - scrub_timeout_event = NULL; - } -} - -void Monitor::scrub_reset_timeout() -{ - dout(15) << __func__ << " reset timeout event" << dendl; - scrub_cancel_timeout(); - scrub_timeout_event = timer.add_event_after( - g_conf->mon_scrub_timeout, - new C_MonContext(this, [this](int) { - scrub_timeout(); - })); -} - -/************ TICK ***************/ -void Monitor::new_tick() -{ - timer.add_event_after(g_conf->mon_tick_interval, new C_MonContext(this, [this](int) { - tick(); - })); -} - -void Monitor::tick() -{ - // ok go. - dout(11) << "tick" << dendl; - const utime_t now = ceph_clock_now(); - - // Check if we need to emit any delayed health check updated messages - if (is_leader()) { - const auto min_period = g_conf->get_val( - "mon_health_log_update_period"); - for (auto& svc : paxos_service) { - auto health = svc->get_health_checks(); - - for (const auto &i : health.checks) { - const std::string &code = i.first; - const std::string &summary = i.second.summary; - const health_status_t severity = i.second.severity; - - auto status_iter = health_check_log_times.find(code); - if (status_iter == health_check_log_times.end()) { - continue; - } - - auto &log_status = status_iter->second; - bool const changed = log_status.last_message != summary - || log_status.severity != severity; - - if (changed && now - log_status.updated_at > min_period) { - log_status.last_message = summary; - log_status.updated_at = now; - log_status.severity = severity; - - ostringstream ss; - ss << "Health check update: " << summary << " (" << code << ")"; - clog->health(severity) << ss.str(); - } - } - } - } - - - for (vector::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p) { - (*p)->tick(); - (*p)->maybe_trim(); - } - - // trim sessions - { - Mutex::Locker l(session_map_lock); - auto p = session_map.sessions.begin(); - - bool out_for_too_long = (!exited_quorum.is_zero() && - now > (exited_quorum + 2*g_conf->mon_lease)); - - while (!p.end()) { - MonSession *s = *p; - ++p; - - // don't trim monitors - if (s->inst.name.is_mon()) - continue; - - if (s->session_timeout < now && s->con) { - // check keepalive, too - s->session_timeout = s->con->get_last_keepalive(); - s->session_timeout += g_conf->mon_session_timeout; - } - if (s->session_timeout < now) { - dout(10) << " trimming session " << s->con << " " << s->inst - << " (timeout " << s->session_timeout - << " < now " << now << ")" << dendl; - } else if (out_for_too_long) { - // boot the client Session because we've taken too long getting back in - dout(10) << " trimming session " << s->con << " " << s->inst - << " because we've been out of quorum too long" << dendl; - } else { - continue; - } - - s->con->mark_down(); - remove_session(s); - logger->inc(l_mon_session_trim); - } - } - sync_trim_providers(); - - if (!maybe_wait_for_quorum.empty()) { - finish_contexts(g_ceph_context, maybe_wait_for_quorum); - } - - if (is_leader() && paxos->is_active() && fingerprint.is_zero()) { - // this is only necessary on upgraded clusters. - MonitorDBStore::TransactionRef t = paxos->get_pending_transaction(); - prepare_new_fingerprint(t); - paxos->trigger_propose(); - } - - new_tick(); -} - -void Monitor::prepare_new_fingerprint(MonitorDBStore::TransactionRef t) -{ - uuid_d nf; - nf.generate_random(); - dout(10) << __func__ << " proposing cluster_fingerprint " << nf << dendl; - - bufferlist bl; - ::encode(nf, bl); - t->put(MONITOR_NAME, "cluster_fingerprint", bl); -} - -int Monitor::check_fsid() -{ - bufferlist ebl; - int r = store->get(MONITOR_NAME, "cluster_uuid", ebl); - if (r == -ENOENT) - return r; - assert(r == 0); - - string es(ebl.c_str(), ebl.length()); - - // only keep the first line - size_t pos = es.find_first_of('\n'); - if (pos != string::npos) - es.resize(pos); - - dout(10) << "check_fsid cluster_uuid contains '" << es << "'" << dendl; - uuid_d ondisk; - if (!ondisk.parse(es.c_str())) { - derr << "error: unable to parse uuid" << dendl; - return -EINVAL; - } - - if (monmap->get_fsid() != ondisk) { - derr << "error: cluster_uuid file exists with value " << ondisk - << ", != our uuid " << monmap->get_fsid() << dendl; - return -EEXIST; - } - - return 0; -} - -int Monitor::write_fsid() -{ - auto t(std::make_shared()); - write_fsid(t); - int r = store->apply_transaction(t); - return r; -} - -int Monitor::write_fsid(MonitorDBStore::TransactionRef t) -{ - ostringstream ss; - ss << monmap->get_fsid() << "\n"; - string us = ss.str(); - - bufferlist b; - b.append(us); - - t->put(MONITOR_NAME, "cluster_uuid", b); - return 0; -} - -/* - * this is the closest thing to a traditional 'mkfs' for ceph. - * initialize the monitor state machines to their initial values. - */ -int Monitor::mkfs(bufferlist& osdmapbl) -{ - auto t(std::make_shared()); - - // verify cluster fsid - int r = check_fsid(); - if (r < 0 && r != -ENOENT) - return r; - - bufferlist magicbl; - magicbl.append(CEPH_MON_ONDISK_MAGIC); - magicbl.append("\n"); - t->put(MONITOR_NAME, "magic", magicbl); - - - features = get_initial_supported_features(); - write_features(t); - - // save monmap, osdmap, keyring. - bufferlist monmapbl; - monmap->encode(monmapbl, CEPH_FEATURES_ALL); - monmap->set_epoch(0); // must be 0 to avoid confusing first MonmapMonitor::update_from_paxos() - t->put("mkfs", "monmap", monmapbl); - - if (osdmapbl.length()) { - // make sure it's a valid osdmap - try { - OSDMap om; - om.decode(osdmapbl); - } - catch (buffer::error& e) { - derr << "error decoding provided osdmap: " << e.what() << dendl; - return -EINVAL; - } - t->put("mkfs", "osdmap", osdmapbl); - } - - if (is_keyring_required()) { - KeyRing keyring; - string keyring_filename; - - r = ceph_resolve_file_search(g_conf->keyring, keyring_filename); - if (r) { - derr << "unable to find a keyring file on " << g_conf->keyring - << ": " << cpp_strerror(r) << dendl; - if (g_conf->key != "") { - string keyring_plaintext = "[mon.]\n\tkey = " + g_conf->key + - "\n\tcaps mon = \"allow *\"\n"; - bufferlist bl; - bl.append(keyring_plaintext); - try { - bufferlist::iterator i = bl.begin(); - keyring.decode_plaintext(i); - } - catch (const buffer::error& e) { - derr << "error decoding keyring " << keyring_plaintext - << ": " << e.what() << dendl; - return -EINVAL; - } - } else { - return -ENOENT; - } - } else { - r = keyring.load(g_ceph_context, keyring_filename); - if (r < 0) { - derr << "unable to load initial keyring " << g_conf->keyring << dendl; - return r; - } - } - - // put mon. key in external keyring; seed with everything else. - extract_save_mon_key(keyring); - - bufferlist keyringbl; - keyring.encode_plaintext(keyringbl); - t->put("mkfs", "keyring", keyringbl); - } - write_fsid(t); - store->apply_transaction(t); - - return 0; -} - -int Monitor::write_default_keyring(bufferlist& bl) -{ - ostringstream os; - os << g_conf->mon_data << "/keyring"; - - int err = 0; - int fd = ::open(os.str().c_str(), O_WRONLY|O_CREAT, 0600); - if (fd < 0) { - err = -errno; - dout(0) << __func__ << " failed to open " << os.str() - << ": " << cpp_strerror(err) << dendl; - return err; - } - - err = bl.write_fd(fd); - if (!err) - ::fsync(fd); - VOID_TEMP_FAILURE_RETRY(::close(fd)); - - return err; -} - -void Monitor::extract_save_mon_key(KeyRing& keyring) -{ - EntityName mon_name; - mon_name.set_type(CEPH_ENTITY_TYPE_MON); - EntityAuth mon_key; - if (keyring.get_auth(mon_name, mon_key)) { - dout(10) << "extract_save_mon_key moving mon. key to separate keyring" << dendl; - KeyRing pkey; - pkey.add(mon_name, mon_key); - bufferlist bl; - pkey.encode_plaintext(bl); - write_default_keyring(bl); - keyring.remove(mon_name); - } -} - -bool Monitor::ms_get_authorizer(int service_id, AuthAuthorizer **authorizer, - bool force_new) -{ - dout(10) << "ms_get_authorizer for " << ceph_entity_type_name(service_id) - << dendl; - - if (is_shutdown()) - return false; - - // we only connect to other monitors and mgr; every else connects to us. - if (service_id != CEPH_ENTITY_TYPE_MON && - service_id != CEPH_ENTITY_TYPE_MGR) - return false; - - if (!auth_cluster_required.is_supported_auth(CEPH_AUTH_CEPHX)) { - // auth_none - dout(20) << __func__ << " building auth_none authorizer" << dendl; - AuthNoneClientHandler handler(g_ceph_context, nullptr); - handler.set_global_id(0); - *authorizer = handler.build_authorizer(service_id); - return true; - } - - CephXServiceTicketInfo auth_ticket_info; - CephXSessionAuthInfo info; - int ret; - - EntityName name; - name.set_type(CEPH_ENTITY_TYPE_MON); - auth_ticket_info.ticket.name = name; - auth_ticket_info.ticket.global_id = 0; - - if (service_id == CEPH_ENTITY_TYPE_MON) { - // mon to mon authentication uses the private monitor shared key and not the - // rotating key - CryptoKey secret; - if (!keyring.get_secret(name, secret) && - !key_server.get_secret(name, secret)) { - dout(0) << " couldn't get secret for mon service from keyring or keyserver" - << dendl; - stringstream ss, ds; - int err = key_server.list_secrets(ds); - if (err < 0) - ss << "no installed auth entries!"; - else - ss << "installed auth entries:"; - dout(0) << ss.str() << "\n" << ds.str() << dendl; - return false; - } - - ret = key_server.build_session_auth_info(service_id, auth_ticket_info, info, - secret, (uint64_t)-1); - if (ret < 0) { - dout(0) << __func__ << " failed to build mon session_auth_info " - << cpp_strerror(ret) << dendl; - return false; - } - } else if (service_id == CEPH_ENTITY_TYPE_MGR) { - // mgr - ret = key_server.build_session_auth_info(service_id, auth_ticket_info, info); - if (ret < 0) { - derr << __func__ << " failed to build mgr service session_auth_info " - << cpp_strerror(ret) << dendl; - return false; - } - } else { - ceph_abort(); // see check at top of fn - } - - CephXTicketBlob blob; - if (!cephx_build_service_ticket_blob(cct, info, blob)) { - dout(0) << "ms_get_authorizer failed to build service ticket" << dendl; - return false; - } - bufferlist ticket_data; - ::encode(blob, ticket_data); - - bufferlist::iterator iter = ticket_data.begin(); - CephXTicketHandler handler(g_ceph_context, service_id); - ::decode(handler.ticket, iter); - - handler.session_key = info.session_key; - - *authorizer = handler.build_authorizer(0); - - return true; -} - -bool Monitor::ms_verify_authorizer(Connection *con, int peer_type, - int protocol, bufferlist& authorizer_data, - bufferlist& authorizer_reply, - bool& isvalid, CryptoKey& session_key) -{ - dout(10) << "ms_verify_authorizer " << con->get_peer_addr() - << " " << ceph_entity_type_name(peer_type) - << " protocol " << protocol << dendl; - - if (is_shutdown()) - return false; - - if (peer_type == CEPH_ENTITY_TYPE_MON && - auth_cluster_required.is_supported_auth(CEPH_AUTH_CEPHX)) { - // monitor, and cephx is enabled - isvalid = false; - if (protocol == CEPH_AUTH_CEPHX) { - bufferlist::iterator iter = authorizer_data.begin(); - CephXServiceTicketInfo auth_ticket_info; - - if (authorizer_data.length()) { - bool ret = cephx_verify_authorizer(g_ceph_context, &keyring, iter, - auth_ticket_info, authorizer_reply); - if (ret) { - session_key = auth_ticket_info.session_key; - isvalid = true; - } else { - dout(0) << "ms_verify_authorizer bad authorizer from mon " << con->get_peer_addr() << dendl; - } - } - } else { - dout(0) << "ms_verify_authorizer cephx enabled, but no authorizer (required for mon)" << dendl; - } - } else { - // who cares. - isvalid = true; - } - return true; -}