X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fmgr%2FDaemonServer.cc;fp=src%2Fceph%2Fsrc%2Fmgr%2FDaemonServer.cc;h=0000000000000000000000000000000000000000;hb=7da45d65be36d36b880cc55c5036e96c24b53f00;hp=321a38ad5349496154d1b4c6338e98fef52f0338;hpb=691462d09d0987b47e112d6ee8740375df3c51b2;p=stor4nfv.git diff --git a/src/ceph/src/mgr/DaemonServer.cc b/src/ceph/src/mgr/DaemonServer.cc deleted file mode 100644 index 321a38a..0000000 --- a/src/ceph/src/mgr/DaemonServer.cc +++ /dev/null @@ -1,1495 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2016 John Spray - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - */ - -#include "DaemonServer.h" - -#include "include/str_list.h" -#include "auth/RotatingKeyRing.h" -#include "json_spirit/json_spirit_writer.h" - -#include "mgr/mgr_commands.h" -#include "mon/MonCommand.h" - -#include "messages/MMgrOpen.h" -#include "messages/MMgrConfigure.h" -#include "messages/MMonMgrReport.h" -#include "messages/MCommand.h" -#include "messages/MCommandReply.h" -#include "messages/MPGStats.h" -#include "messages/MOSDScrub.h" -#include "messages/MOSDForceRecovery.h" -#include "common/errno.h" - -#define dout_context g_ceph_context -#define dout_subsys ceph_subsys_mgr -#undef dout_prefix -#define dout_prefix *_dout << "mgr.server " << __func__ << " " - - - -DaemonServer::DaemonServer(MonClient *monc_, - Finisher &finisher_, - DaemonStateIndex &daemon_state_, - ClusterState &cluster_state_, - PyModuleRegistry &py_modules_, - LogChannelRef clog_, - LogChannelRef audit_clog_) - : Dispatcher(g_ceph_context), - client_byte_throttler(new Throttle(g_ceph_context, "mgr_client_bytes", - g_conf->get_val("mgr_client_bytes"))), - client_msg_throttler(new Throttle(g_ceph_context, "mgr_client_messages", - g_conf->get_val("mgr_client_messages"))), - osd_byte_throttler(new Throttle(g_ceph_context, "mgr_osd_bytes", - g_conf->get_val("mgr_osd_bytes"))), - osd_msg_throttler(new Throttle(g_ceph_context, "mgr_osd_messsages", - g_conf->get_val("mgr_osd_messages"))), - mds_byte_throttler(new Throttle(g_ceph_context, "mgr_mds_bytes", - g_conf->get_val("mgr_mds_bytes"))), - mds_msg_throttler(new Throttle(g_ceph_context, "mgr_mds_messsages", - g_conf->get_val("mgr_mds_messages"))), - mon_byte_throttler(new Throttle(g_ceph_context, "mgr_mon_bytes", - g_conf->get_val("mgr_mon_bytes"))), - mon_msg_throttler(new Throttle(g_ceph_context, "mgr_mon_messsages", - g_conf->get_val("mgr_mon_messages"))), - msgr(nullptr), - monc(monc_), - finisher(finisher_), - daemon_state(daemon_state_), - cluster_state(cluster_state_), - py_modules(py_modules_), - clog(clog_), - audit_clog(audit_clog_), - auth_registry(g_ceph_context, - g_conf->auth_supported.empty() ? - g_conf->auth_cluster_required : - g_conf->auth_supported), - lock("DaemonServer"), - pgmap_ready(false) -{ - g_conf->add_observer(this); -} - -DaemonServer::~DaemonServer() { - delete msgr; - g_conf->remove_observer(this); -} - -int DaemonServer::init(uint64_t gid, entity_addr_t client_addr) -{ - // Initialize Messenger - std::string public_msgr_type = g_conf->ms_public_type.empty() ? 
- g_conf->get_val("ms_type") : g_conf->ms_public_type; - msgr = Messenger::create(g_ceph_context, public_msgr_type, - entity_name_t::MGR(gid), - "mgr", - getpid(), 0); - msgr->set_default_policy(Messenger::Policy::stateless_server(0)); - - // throttle clients - msgr->set_policy_throttlers(entity_name_t::TYPE_CLIENT, - client_byte_throttler.get(), - client_msg_throttler.get()); - - // servers - msgr->set_policy_throttlers(entity_name_t::TYPE_OSD, - osd_byte_throttler.get(), - osd_msg_throttler.get()); - msgr->set_policy_throttlers(entity_name_t::TYPE_MDS, - mds_byte_throttler.get(), - mds_msg_throttler.get()); - msgr->set_policy_throttlers(entity_name_t::TYPE_MON, - mon_byte_throttler.get(), - mon_msg_throttler.get()); - - int r = msgr->bind(g_conf->public_addr); - if (r < 0) { - derr << "unable to bind mgr to " << g_conf->public_addr << dendl; - return r; - } - - msgr->set_myname(entity_name_t::MGR(gid)); - msgr->set_addr_unknowns(client_addr); - - msgr->start(); - msgr->add_dispatcher_tail(this); - - started_at = ceph_clock_now(); - - return 0; -} - -entity_addr_t DaemonServer::get_myaddr() const -{ - return msgr->get_myaddr(); -} - - -bool DaemonServer::ms_verify_authorizer(Connection *con, - int peer_type, - int protocol, - ceph::bufferlist& authorizer_data, - ceph::bufferlist& authorizer_reply, - bool& is_valid, - CryptoKey& session_key) -{ - auto handler = auth_registry.get_handler(protocol); - if (!handler) { - dout(0) << "No AuthAuthorizeHandler found for protocol " << protocol << dendl; - is_valid = false; - return true; - } - - MgrSessionRef s(new MgrSession(cct)); - s->inst.addr = con->get_peer_addr(); - AuthCapsInfo caps_info; - - RotatingKeyRing *keys = monc->rotating_secrets.get(); - if (keys) { - is_valid = handler->verify_authorizer( - cct, keys, - authorizer_data, - authorizer_reply, s->entity_name, - s->global_id, caps_info, - session_key); - } else { - dout(10) << __func__ << " no rotating_keys (yet), denied" << dendl; - is_valid = false; - } - - if (is_valid) { - if (caps_info.allow_all) { - dout(10) << " session " << s << " " << s->entity_name - << " allow_all" << dendl; - s->caps.set_allow_all(); - } - if (caps_info.caps.length() > 0) { - bufferlist::iterator p = caps_info.caps.begin(); - string str; - try { - ::decode(str, p); - } - catch (buffer::error& e) { - } - bool success = s->caps.parse(str); - if (success) { - dout(10) << " session " << s << " " << s->entity_name - << " has caps " << s->caps << " '" << str << "'" << dendl; - } else { - dout(10) << " session " << s << " " << s->entity_name - << " failed to parse caps '" << str << "'" << dendl; - is_valid = false; - } - } - con->set_priv(s->get()); - - if (peer_type == CEPH_ENTITY_TYPE_OSD) { - Mutex::Locker l(lock); - s->osd_id = atoi(s->entity_name.get_id().c_str()); - dout(10) << "registering osd." 
<< s->osd_id << " session " - << s << " con " << con << dendl; - osd_cons[s->osd_id].insert(con); - } - } - - return true; -} - - -bool DaemonServer::ms_get_authorizer(int dest_type, - AuthAuthorizer **authorizer, bool force_new) -{ - dout(10) << "type=" << ceph_entity_type_name(dest_type) << dendl; - - if (dest_type == CEPH_ENTITY_TYPE_MON) { - return true; - } - - if (force_new) { - if (monc->wait_auth_rotating(10) < 0) - return false; - } - - *authorizer = monc->build_authorizer(dest_type); - dout(20) << "got authorizer " << *authorizer << dendl; - return *authorizer != NULL; -} - -bool DaemonServer::ms_handle_reset(Connection *con) -{ - if (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD) { - MgrSessionRef session(static_cast(con->get_priv())); - if (!session) { - return false; - } - session->put(); // SessionRef takes a ref - Mutex::Locker l(lock); - dout(10) << "unregistering osd." << session->osd_id - << " session " << session << " con " << con << dendl; - osd_cons[session->osd_id].erase(con); - - auto iter = daemon_connections.find(con); - if (iter != daemon_connections.end()) { - daemon_connections.erase(iter); - } - } - return false; -} - -bool DaemonServer::ms_handle_refused(Connection *con) -{ - // do nothing for now - return false; -} - -bool DaemonServer::ms_dispatch(Message *m) -{ - // Note that we do *not* take ::lock here, in order to avoid - // serializing all message handling. It's up to each handler - // to take whatever locks it needs. - switch (m->get_type()) { - case MSG_PGSTATS: - cluster_state.ingest_pgstats(static_cast(m)); - maybe_ready(m->get_source().num()); - m->put(); - return true; - case MSG_MGR_REPORT: - return handle_report(static_cast(m)); - case MSG_MGR_OPEN: - return handle_open(static_cast(m)); - case MSG_COMMAND: - return handle_command(static_cast(m)); - default: - dout(1) << "Unhandled message type " << m->get_type() << dendl; - return false; - }; -} - -void DaemonServer::maybe_ready(int32_t osd_id) -{ - if (pgmap_ready.load()) { - // Fast path: we don't need to take lock because pgmap_ready - // is already set - } else { - Mutex::Locker l(lock); - - if (reported_osds.find(osd_id) == reported_osds.end()) { - dout(4) << "initial report from osd " << osd_id << dendl; - reported_osds.insert(osd_id); - std::set up_osds; - - cluster_state.with_osdmap([&](const OSDMap& osdmap) { - osdmap.get_up_osds(up_osds); - }); - - std::set unreported_osds; - std::set_difference(up_osds.begin(), up_osds.end(), - reported_osds.begin(), reported_osds.end(), - std::inserter(unreported_osds, unreported_osds.begin())); - - if (unreported_osds.size() == 0) { - dout(4) << "all osds have reported, sending PG state to mon" << dendl; - pgmap_ready = true; - reported_osds.clear(); - // Avoid waiting for next tick - send_report(); - } else { - dout(4) << "still waiting for " << unreported_osds.size() << " osds" - " to report in before PGMap is ready" << dendl; - } - } - } -} - -void DaemonServer::shutdown() -{ - dout(10) << "begin" << dendl; - msgr->shutdown(); - msgr->wait(); - dout(10) << "done" << dendl; -} - - - -bool DaemonServer::handle_open(MMgrOpen *m) -{ - Mutex::Locker l(lock); - - DaemonKey key; - if (!m->service_name.empty()) { - key.first = m->service_name; - } else { - key.first = ceph_entity_type_name(m->get_connection()->get_peer_type()); - } - key.second = m->daemon_name; - - dout(4) << "from " << m->get_connection() << " " << key << dendl; - - _send_configure(m->get_connection()); - - DaemonStatePtr daemon; - if (daemon_state.exists(key)) { - daemon = 
daemon_state.get(key); - } - if (daemon) { - dout(20) << "updating existing DaemonState for " << m->daemon_name << dendl; - Mutex::Locker l(daemon->lock); - daemon->perf_counters.clear(); - } - - if (m->service_daemon) { - if (!daemon) { - dout(4) << "constructing new DaemonState for " << key << dendl; - daemon = std::make_shared(daemon_state.types); - daemon->key = key; - if (m->daemon_metadata.count("hostname")) { - daemon->hostname = m->daemon_metadata["hostname"]; - } - daemon_state.insert(daemon); - } - Mutex::Locker l(daemon->lock); - daemon->service_daemon = true; - daemon->metadata = m->daemon_metadata; - daemon->service_status = m->daemon_status; - - utime_t now = ceph_clock_now(); - auto d = pending_service_map.get_daemon(m->service_name, - m->daemon_name); - if (d->gid != (uint64_t)m->get_source().num()) { - dout(10) << "registering " << key << " in pending_service_map" << dendl; - d->gid = m->get_source().num(); - d->addr = m->get_source_addr(); - d->start_epoch = pending_service_map.epoch; - d->start_stamp = now; - d->metadata = m->daemon_metadata; - pending_service_map_dirty = pending_service_map.epoch; - } - } - - if (m->get_connection()->get_peer_type() != entity_name_t::TYPE_CLIENT && - m->service_name.empty()) - { - // Store in set of the daemon/service connections, i.e. those - // connections that require an update in the event of stats - // configuration changes. - daemon_connections.insert(m->get_connection()); - } - - m->put(); - return true; -} - -bool DaemonServer::handle_report(MMgrReport *m) -{ - DaemonKey key; - if (!m->service_name.empty()) { - key.first = m->service_name; - } else { - key.first = ceph_entity_type_name(m->get_connection()->get_peer_type()); - } - key.second = m->daemon_name; - - dout(4) << "from " << m->get_connection() << " " << key << dendl; - - if (m->get_connection()->get_peer_type() == entity_name_t::TYPE_CLIENT && - m->service_name.empty()) { - // Clients should not be sending us stats unless they are declaring - // themselves to be a daemon for some service. - dout(4) << "rejecting report from non-daemon client " << m->daemon_name - << dendl; - m->put(); - return true; - } - - // Look up the DaemonState - DaemonStatePtr daemon; - if (daemon_state.exists(key)) { - dout(20) << "updating existing DaemonState for " << key << dendl; - daemon = daemon_state.get(key); - } else { - dout(4) << "constructing new DaemonState for " << key << dendl; - daemon = std::make_shared(daemon_state.types); - // FIXME: crap, we don't know the hostname at this stage. - daemon->key = key; - daemon_state.insert(daemon); - // FIXME: we should avoid this case by rejecting MMgrReport from - // daemons without sessions, and ensuring that session open - // always contains metadata. - } - - // Update the DaemonState - assert(daemon != nullptr); - { - Mutex::Locker l(daemon->lock); - auto &daemon_counters = daemon->perf_counters; - daemon_counters.update(m); - - if (daemon->service_daemon) { - utime_t now = ceph_clock_now(); - if (m->daemon_status) { - daemon->service_status = *m->daemon_status; - daemon->service_status_stamp = now; - } - daemon->last_service_beacon = now; - } else if (m->daemon_status) { - derr << "got status from non-daemon " << key << dendl; - } - } - - // if there are any schema updates, notify the python modules - if (!m->declare_types.empty() || !m->undeclare_types.empty()) { - ostringstream oss; - oss << key.first << '.' 
<< key.second; - py_modules.notify_all("perf_schema_update", oss.str()); - } - - m->put(); - return true; -} - - -void DaemonServer::_generate_command_map( - map& cmdmap, - map ¶m_str_map) -{ - for (map::const_iterator p = cmdmap.begin(); - p != cmdmap.end(); ++p) { - if (p->first == "prefix") - continue; - if (p->first == "caps") { - vector cv; - if (cmd_getval(g_ceph_context, cmdmap, "caps", cv) && - cv.size() % 2 == 0) { - for (unsigned i = 0; i < cv.size(); i += 2) { - string k = string("caps_") + cv[i]; - param_str_map[k] = cv[i + 1]; - } - continue; - } - } - param_str_map[p->first] = cmd_vartype_stringify(p->second); - } -} - -const MonCommand *DaemonServer::_get_mgrcommand( - const string &cmd_prefix, - const std::vector &cmds) -{ - const MonCommand *this_cmd = nullptr; - for (const auto &cmd : cmds) { - if (cmd.cmdstring.compare(0, cmd_prefix.size(), cmd_prefix) == 0) { - this_cmd = &cmd; - break; - } - } - return this_cmd; -} - -bool DaemonServer::_allowed_command( - MgrSession *s, - const string &module, - const string &prefix, - const map& cmdmap, - const map& param_str_map, - const MonCommand *this_cmd) { - - if (s->entity_name.is_mon()) { - // mon is all-powerful. even when it is forwarding commands on behalf of - // old clients; we expect the mon is validating commands before proxying! - return true; - } - - bool cmd_r = this_cmd->requires_perm('r'); - bool cmd_w = this_cmd->requires_perm('w'); - bool cmd_x = this_cmd->requires_perm('x'); - - bool capable = s->caps.is_capable( - g_ceph_context, - CEPH_ENTITY_TYPE_MGR, - s->entity_name, - module, prefix, param_str_map, - cmd_r, cmd_w, cmd_x); - - dout(10) << " " << s->entity_name << " " - << (capable ? "" : "not ") << "capable" << dendl; - return capable; -} - -bool DaemonServer::handle_command(MCommand *m) -{ - Mutex::Locker l(lock); - int r = 0; - std::stringstream ss; - std::string prefix; - - assert(lock.is_locked_by_me()); - - /** - * The working data for processing an MCommand. This lives in - * a class to enable passing it into other threads for processing - * outside of the thread/locks that called handle_command. 
- */ - class CommandContext - { - public: - MCommand *m; - bufferlist odata; - cmdmap_t cmdmap; - - CommandContext(MCommand *m_) - : m(m_) - { - } - - ~CommandContext() - { - m->put(); - } - - void reply(int r, const std::stringstream &ss) - { - reply(r, ss.str()); - } - - void reply(int r, const std::string &rs) - { - // Let the connection drop as soon as we've sent our response - ConnectionRef con = m->get_connection(); - if (con) { - con->mark_disposable(); - } - - dout(1) << "handle_command " << cpp_strerror(r) << " " << rs << dendl; - if (con) { - MCommandReply *reply = new MCommandReply(r, rs); - reply->set_tid(m->get_tid()); - reply->set_data(odata); - con->send_message(reply); - } - } - }; - - /** - * A context for receiving a bufferlist/error string from a background - * function and then calling back to a CommandContext when it's done - */ - class ReplyOnFinish : public Context { - std::shared_ptr cmdctx; - - public: - bufferlist from_mon; - string outs; - - ReplyOnFinish(std::shared_ptr cmdctx_) - : cmdctx(cmdctx_) - {} - void finish(int r) override { - cmdctx->odata.claim_append(from_mon); - cmdctx->reply(r, outs); - } - }; - - std::shared_ptr cmdctx = std::make_shared(m); - - MgrSessionRef session(static_cast(m->get_connection()->get_priv())); - if (!session) { - return true; - } - session->put(); // SessionRef takes a ref - if (session->inst.name == entity_name_t()) - session->inst.name = m->get_source(); - - std::string format; - boost::scoped_ptr f; - map param_str_map; - - if (!cmdmap_from_json(m->cmd, &(cmdctx->cmdmap), ss)) { - cmdctx->reply(-EINVAL, ss); - return true; - } - - { - cmd_getval(g_ceph_context, cmdctx->cmdmap, "format", format, string("plain")); - f.reset(Formatter::create(format)); - } - - cmd_getval(cct, cmdctx->cmdmap, "prefix", prefix); - - dout(4) << "decoded " << cmdctx->cmdmap.size() << dendl; - dout(4) << "prefix=" << prefix << dendl; - - if (prefix == "get_command_descriptions") { - dout(10) << "reading commands from python modules" << dendl; - const auto py_commands = py_modules.get_commands(); - - int cmdnum = 0; - JSONFormatter f; - f.open_object_section("command_descriptions"); - - auto dump_cmd = [&cmdnum, &f](const MonCommand &mc){ - ostringstream secname; - secname << "cmd" << setfill('0') << std::setw(3) << cmdnum; - dump_cmddesc_to_json(&f, secname.str(), mc.cmdstring, mc.helpstring, - mc.module, mc.req_perms, mc.availability, 0); - cmdnum++; - }; - - for (const auto &pyc : py_commands) { - dump_cmd(pyc); - } - - for (const auto &mgr_cmd : mgr_commands) { - dump_cmd(mgr_cmd); - } - - f.close_section(); // command_descriptions - f.flush(cmdctx->odata); - cmdctx->reply(0, ss); - return true; - } - - // lookup command - const MonCommand *mgr_cmd = _get_mgrcommand(prefix, mgr_commands); - _generate_command_map(cmdctx->cmdmap, param_str_map); - if (!mgr_cmd) { - MonCommand py_command = {"", "", "py", "rw", "cli"}; - if (!_allowed_command(session.get(), py_command.module, prefix, cmdctx->cmdmap, - param_str_map, &py_command)) { - dout(1) << " access denied" << dendl; - ss << "access denied; does your client key have mgr caps?" 
- " See http://docs.ceph.com/docs/master/mgr/administrator/#client-authentication"; - cmdctx->reply(-EACCES, ss); - return true; - } - } else { - // validate user's permissions for requested command - if (!_allowed_command(session.get(), mgr_cmd->module, prefix, cmdctx->cmdmap, - param_str_map, mgr_cmd)) { - dout(1) << " access denied" << dendl; - audit_clog->info() << "from='" << session->inst << "' " - << "entity='" << session->entity_name << "' " - << "cmd=" << m->cmd << ": access denied"; - ss << "access denied' does your client key have mgr caps?" - " See http://docs.ceph.com/docs/master/mgr/administrator/#client-authentication"; - cmdctx->reply(-EACCES, ss); - return true; - } - } - - audit_clog->debug() - << "from='" << session->inst << "' " - << "entity='" << session->entity_name << "' " - << "cmd=" << m->cmd << ": dispatch"; - - // ---------------- - // service map commands - if (prefix == "service dump") { - if (!f) - f.reset(Formatter::create("json-pretty")); - cluster_state.with_servicemap([&](const ServiceMap &service_map) { - f->dump_object("service_map", service_map); - }); - f->flush(cmdctx->odata); - cmdctx->reply(0, ss); - return true; - } - if (prefix == "service status") { - if (!f) - f.reset(Formatter::create("json-pretty")); - // only include state from services that are in the persisted service map - f->open_object_section("service_status"); - ServiceMap s; - cluster_state.with_servicemap([&](const ServiceMap& service_map) { - s = service_map; - }); - for (auto& p : s.services) { - f->open_object_section(p.first.c_str()); - for (auto& q : p.second.daemons) { - f->open_object_section(q.first.c_str()); - DaemonKey key(p.first, q.first); - assert(daemon_state.exists(key)); - auto daemon = daemon_state.get(key); - Mutex::Locker l(daemon->lock); - f->dump_stream("status_stamp") << daemon->service_status_stamp; - f->dump_stream("last_beacon") << daemon->last_service_beacon; - f->open_object_section("status"); - for (auto& r : daemon->service_status) { - f->dump_string(r.first.c_str(), r.second); - } - f->close_section(); - f->close_section(); - } - f->close_section(); - } - f->close_section(); - f->flush(cmdctx->odata); - cmdctx->reply(0, ss); - return true; - } - - if (prefix == "config set") { - std::string key; - std::string val; - cmd_getval(cct, cmdctx->cmdmap, "key", key); - cmd_getval(cct, cmdctx->cmdmap, "value", val); - r = cct->_conf->set_val(key, val, true, &ss); - if (r == 0) { - cct->_conf->apply_changes(nullptr); - } - cmdctx->reply(0, ss); - return true; - } - - // ----------- - // PG commands - - if (prefix == "pg scrub" || - prefix == "pg repair" || - prefix == "pg deep-scrub") { - string scrubop = prefix.substr(3, string::npos); - pg_t pgid; - string pgidstr; - cmd_getval(g_ceph_context, cmdctx->cmdmap, "pgid", pgidstr); - if (!pgid.parse(pgidstr.c_str())) { - ss << "invalid pgid '" << pgidstr << "'"; - cmdctx->reply(-EINVAL, ss); - return true; - } - bool pg_exists = false; - cluster_state.with_osdmap([&](const OSDMap& osdmap) { - pg_exists = osdmap.pg_exists(pgid); - }); - if (!pg_exists) { - ss << "pg " << pgid << " dne"; - cmdctx->reply(-ENOENT, ss); - return true; - } - int acting_primary = -1; - cluster_state.with_osdmap([&](const OSDMap& osdmap) { - acting_primary = osdmap.get_pg_acting_primary(pgid); - }); - if (acting_primary == -1) { - ss << "pg " << pgid << " has no primary osd"; - cmdctx->reply(-EAGAIN, ss); - return true; - } - auto p = osd_cons.find(acting_primary); - if (p == osd_cons.end()) { - ss << "pg " << pgid << " primary osd." 
<< acting_primary - << " is not currently connected"; - cmdctx->reply(-EAGAIN, ss); - } - vector pgs = { pgid }; - for (auto& con : p->second) { - con->send_message(new MOSDScrub(monc->get_fsid(), - pgs, - scrubop == "repair", - scrubop == "deep-scrub")); - } - ss << "instructing pg " << pgid << " on osd." << acting_primary - << " to " << scrubop; - cmdctx->reply(0, ss); - return true; - } else if (prefix == "osd scrub" || - prefix == "osd deep-scrub" || - prefix == "osd repair") { - string whostr; - cmd_getval(g_ceph_context, cmdctx->cmdmap, "who", whostr); - vector pvec; - get_str_vec(prefix, pvec); - - set osds; - if (whostr == "*" || whostr == "all" || whostr == "any") { - cluster_state.with_osdmap([&](const OSDMap& osdmap) { - for (int i = 0; i < osdmap.get_max_osd(); i++) - if (osdmap.is_up(i)) { - osds.insert(i); - } - }); - } else { - long osd = parse_osd_id(whostr.c_str(), &ss); - if (osd < 0) { - ss << "invalid osd '" << whostr << "'"; - cmdctx->reply(-EINVAL, ss); - return true; - } - cluster_state.with_osdmap([&](const OSDMap& osdmap) { - if (osdmap.is_up(osd)) { - osds.insert(osd); - } - }); - if (osds.empty()) { - ss << "osd." << osd << " is not up"; - cmdctx->reply(-EAGAIN, ss); - return true; - } - } - set sent_osds, failed_osds; - for (auto osd : osds) { - auto p = osd_cons.find(osd); - if (p == osd_cons.end()) { - failed_osds.insert(osd); - } else { - sent_osds.insert(osd); - for (auto& con : p->second) { - con->send_message(new MOSDScrub(monc->get_fsid(), - pvec.back() == "repair", - pvec.back() == "deep-scrub")); - } - } - } - if (failed_osds.size() == osds.size()) { - ss << "failed to instruct osd(s) " << osds << " to " << pvec.back() - << " (not connected)"; - r = -EAGAIN; - } else { - ss << "instructed osd(s) " << sent_osds << " to " << pvec.back(); - if (!failed_osds.empty()) { - ss << "; osd(s) " << failed_osds << " were not connected"; - } - r = 0; - } - cmdctx->reply(0, ss); - return true; - } else if (prefix == "osd reweight-by-pg" || - prefix == "osd reweight-by-utilization" || - prefix == "osd test-reweight-by-pg" || - prefix == "osd test-reweight-by-utilization") { - bool by_pg = - prefix == "osd reweight-by-pg" || prefix == "osd test-reweight-by-pg"; - bool dry_run = - prefix == "osd test-reweight-by-pg" || - prefix == "osd test-reweight-by-utilization"; - int64_t oload; - cmd_getval(g_ceph_context, cmdctx->cmdmap, "oload", oload, int64_t(120)); - set pools; - vector poolnames; - cmd_getval(g_ceph_context, cmdctx->cmdmap, "pools", poolnames); - cluster_state.with_osdmap([&](const OSDMap& osdmap) { - for (const auto& poolname : poolnames) { - int64_t pool = osdmap.lookup_pg_pool_name(poolname); - if (pool < 0) { - ss << "pool '" << poolname << "' does not exist"; - r = -ENOENT; - } - pools.insert(pool); - } - }); - if (r) { - cmdctx->reply(r, ss); - return true; - } - double max_change = g_conf->mon_reweight_max_change; - cmd_getval(g_ceph_context, cmdctx->cmdmap, "max_change", max_change); - if (max_change <= 0.0) { - ss << "max_change " << max_change << " must be positive"; - cmdctx->reply(-EINVAL, ss); - return true; - } - int64_t max_osds = g_conf->mon_reweight_max_osds; - cmd_getval(g_ceph_context, cmdctx->cmdmap, "max_osds", max_osds); - if (max_osds <= 0) { - ss << "max_osds " << max_osds << " must be positive"; - cmdctx->reply(-EINVAL, ss); - return true; - } - string no_increasing; - cmd_getval(g_ceph_context, cmdctx->cmdmap, "no_increasing", no_increasing); - string out_str; - mempool::osdmap::map new_weights; - r = 
cluster_state.with_pgmap([&](const PGMap& pgmap) { - return cluster_state.with_osdmap([&](const OSDMap& osdmap) { - return reweight::by_utilization(osdmap, pgmap, - oload, - max_change, - max_osds, - by_pg, - pools.empty() ? NULL : &pools, - no_increasing == "--no-increasing", - &new_weights, - &ss, &out_str, f.get()); - }); - }); - if (r >= 0) { - dout(10) << "reweight::by_utilization: finished with " << out_str << dendl; - } - if (f) { - f->flush(cmdctx->odata); - } else { - cmdctx->odata.append(out_str); - } - if (r < 0) { - ss << "FAILED reweight-by-pg"; - cmdctx->reply(r, ss); - return true; - } else if (r == 0 || dry_run) { - ss << "no change"; - cmdctx->reply(r, ss); - return true; - } else { - json_spirit::Object json_object; - for (const auto& osd_weight : new_weights) { - json_spirit::Config::add(json_object, - std::to_string(osd_weight.first), - std::to_string(osd_weight.second)); - } - string s = json_spirit::write(json_object); - std::replace(begin(s), end(s), '\"', '\''); - const string cmd = - "{" - "\"prefix\": \"osd reweightn\", " - "\"weights\": \"" + s + "\"" - "}"; - auto on_finish = new ReplyOnFinish(cmdctx); - monc->start_mon_command({cmd}, {}, - &on_finish->from_mon, &on_finish->outs, on_finish); - return true; - } - } else if (prefix == "osd df") { - string method; - cmd_getval(g_ceph_context, cmdctx->cmdmap, "output_method", method); - r = cluster_state.with_pgservice([&](const PGMapStatService& pgservice) { - return cluster_state.with_osdmap([&](const OSDMap& osdmap) { - print_osd_utilization(osdmap, &pgservice, ss, - f.get(), method == "tree"); - - cmdctx->odata.append(ss); - return 0; - }); - }); - cmdctx->reply(r, ""); - return true; - } else if (prefix == "osd safe-to-destroy") { - vector ids; - cmd_getval(g_ceph_context, cmdctx->cmdmap, "ids", ids); - set osds; - int r; - cluster_state.with_osdmap([&](const OSDMap& osdmap) { - r = osdmap.parse_osd_id_list(ids, &osds, &ss); - }); - if (!r && osds.empty()) { - ss << "must specify one or more OSDs"; - r = -EINVAL; - } - if (r < 0) { - cmdctx->reply(r, ss); - return true; - } - set active_osds, missing_stats, stored_pgs; - int affected_pgs = 0; - cluster_state.with_pgmap([&](const PGMap& pg_map) { - if (pg_map.num_pg_unknown > 0) { - ss << pg_map.num_pg_unknown << " pgs have unknown state; cannot draw" - << " any conclusions"; - r = -EAGAIN; - return; - } - int num_active_clean = 0; - for (auto& p : pg_map.num_pg_by_state) { - unsigned want = PG_STATE_ACTIVE|PG_STATE_CLEAN; - if ((p.first & want) == want) { - num_active_clean += p.second; - } - } - cluster_state.with_osdmap([&](const OSDMap& osdmap) { - for (auto osd : osds) { - if (!osdmap.exists(osd)) { - continue; // clearly safe to destroy - } - auto q = pg_map.num_pg_by_osd.find(osd); - if (q != pg_map.num_pg_by_osd.end()) { - if (q->second.acting > 0 || q->second.up > 0) { - active_osds.insert(osd); - affected_pgs += q->second.acting + q->second.up; - continue; - } - } - if (num_active_clean < pg_map.num_pg) { - // all pgs aren't active+clean; we need to be careful. 
- auto p = pg_map.osd_stat.find(osd); - if (p == pg_map.osd_stat.end()) { - missing_stats.insert(osd); - } - if (p->second.num_pgs > 0) { - stored_pgs.insert(osd); - } - } - } - }); - }); - if (!r && !active_osds.empty()) { - ss << "OSD(s) " << active_osds << " have " << affected_pgs - << " pgs currently mapped to them"; - r = -EBUSY; - } else if (!missing_stats.empty()) { - ss << "OSD(s) " << missing_stats << " have no reported stats, and not all" - << " PGs are active+clean; we cannot draw any conclusions"; - r = -EAGAIN; - } else if (!stored_pgs.empty()) { - ss << "OSD(s) " << stored_pgs << " last reported they still store some PG" - << " data, and not all PGs are active+clean; we cannot be sure they" - << " aren't still needed."; - r = -EBUSY; - } - if (r) { - cmdctx->reply(r, ss); - return true; - } - ss << "OSD(s) " << osds << " are safe to destroy without reducing data" - << " durability."; - cmdctx->reply(0, ss); - return true; - } else if (prefix == "osd ok-to-stop") { - vector ids; - cmd_getval(g_ceph_context, cmdctx->cmdmap, "ids", ids); - set osds; - int r; - cluster_state.with_osdmap([&](const OSDMap& osdmap) { - r = osdmap.parse_osd_id_list(ids, &osds, &ss); - }); - if (!r && osds.empty()) { - ss << "must specify one or more OSDs"; - r = -EINVAL; - } - if (r < 0) { - cmdctx->reply(r, ss); - return true; - } - map pg_delta; // pgid -> net acting set size change - int dangerous_pgs = 0; - cluster_state.with_pgmap([&](const PGMap& pg_map) { - return cluster_state.with_osdmap([&](const OSDMap& osdmap) { - if (pg_map.num_pg_unknown > 0) { - ss << pg_map.num_pg_unknown << " pgs have unknown state; " - << "cannot draw any conclusions"; - r = -EAGAIN; - return; - } - for (auto osd : osds) { - auto p = pg_map.pg_by_osd.find(osd); - if (p != pg_map.pg_by_osd.end()) { - for (auto& pgid : p->second) { - --pg_delta[pgid]; - } - } - } - for (auto& p : pg_delta) { - auto q = pg_map.pg_stat.find(p.first); - if (q == pg_map.pg_stat.end()) { - ss << "missing information about " << p.first << "; cannot draw" - << " any conclusions"; - r = -EAGAIN; - return; - } - if (!(q->second.state & PG_STATE_ACTIVE) || - (q->second.state & PG_STATE_DEGRADED)) { - // we don't currently have a good way to tell *how* degraded - // a degraded PG is, so we have to assume we cannot remove - // any more replicas/shards. - ++dangerous_pgs; - continue; - } - const pg_pool_t *pi = osdmap.get_pg_pool(p.first.pool()); - if (!pi) { - ++dangerous_pgs; // pool is creating or deleting - } else { - if (q->second.acting.size() + p.second < pi->min_size) { - ++dangerous_pgs; - } - } - } - }); - }); - if (r) { - cmdctx->reply(r, ss); - return true; - } - if (dangerous_pgs) { - ss << dangerous_pgs << " PGs are already degraded or might become " - << "unavailable"; - cmdctx->reply(-EBUSY, ss); - return true; - } - ss << "OSD(s) " << osds << " are ok to stop without reducing" - << " availability, provided there are no other concurrent failures" - << " or interventions. 
" << pg_delta.size() << " PGs are likely to be" - << " degraded (but remain available) as a result."; - cmdctx->reply(0, ss); - return true; - } else if (prefix == "pg force-recovery" || - prefix == "pg force-backfill" || - prefix == "pg cancel-force-recovery" || - prefix == "pg cancel-force-backfill") { - string forceop = prefix.substr(3, string::npos); - list parsed_pgs; - map > osdpgs; - - // figure out actual op just once - int actual_op = 0; - if (forceop == "force-recovery") { - actual_op = OFR_RECOVERY; - } else if (forceop == "force-backfill") { - actual_op = OFR_BACKFILL; - } else if (forceop == "cancel-force-backfill") { - actual_op = OFR_BACKFILL | OFR_CANCEL; - } else if (forceop == "cancel-force-recovery") { - actual_op = OFR_RECOVERY | OFR_CANCEL; - } - - // covnert pg names to pgs, discard any invalid ones while at it - { - // we don't want to keep pgidstr and pgidstr_nodup forever - vector pgidstr; - // get pgids to process and prune duplicates - cmd_getval(g_ceph_context, cmdctx->cmdmap, "pgid", pgidstr); - set pgidstr_nodup(pgidstr.begin(), pgidstr.end()); - if (pgidstr.size() != pgidstr_nodup.size()) { - // move elements only when there were duplicates, as this - // reorders them - pgidstr.resize(pgidstr_nodup.size()); - auto it = pgidstr_nodup.begin(); - for (size_t i = 0 ; i < pgidstr_nodup.size(); i++) { - pgidstr[i] = std::move(*it++); - } - } - - cluster_state.with_pgmap([&](const PGMap& pg_map) { - for (auto& pstr : pgidstr) { - pg_t parsed_pg; - if (!parsed_pg.parse(pstr.c_str())) { - ss << "invalid pgid '" << pstr << "'; "; - r = -EINVAL; - } else { - auto workit = pg_map.pg_stat.find(parsed_pg); - if (workit == pg_map.pg_stat.end()) { - ss << "pg " << pstr << " not exists; "; - r = -ENOENT; - } else { - pg_stat_t workpg = workit->second; - - // discard pgs for which user requests are pointless - switch (actual_op) - { - case OFR_RECOVERY: - if ((workpg.state & (PG_STATE_DEGRADED | PG_STATE_RECOVERY_WAIT | PG_STATE_RECOVERING)) == 0) { - // don't return error, user script may be racing with cluster. not fatal. 
- ss << "pg " << pstr << " doesn't require recovery; "; - continue; - } else if (workpg.state & PG_STATE_FORCED_RECOVERY) { - ss << "pg " << pstr << " recovery already forced; "; - // return error, as it may be a bug in user script - r = -EINVAL; - continue; - } - break; - case OFR_BACKFILL: - if ((workpg.state & (PG_STATE_DEGRADED | PG_STATE_BACKFILL_WAIT | PG_STATE_BACKFILLING)) == 0) { - ss << "pg " << pstr << " doesn't require backfilling; "; - continue; - } else if (workpg.state & PG_STATE_FORCED_BACKFILL) { - ss << "pg " << pstr << " backfill already forced; "; - r = -EINVAL; - continue; - } - break; - case OFR_BACKFILL | OFR_CANCEL: - if ((workpg.state & PG_STATE_FORCED_BACKFILL) == 0) { - ss << "pg " << pstr << " backfill not forced; "; - continue; - } - break; - case OFR_RECOVERY | OFR_CANCEL: - if ((workpg.state & PG_STATE_FORCED_RECOVERY) == 0) { - ss << "pg " << pstr << " recovery not forced; "; - continue; - } - break; - default: - assert(0 == "actual_op value is not supported"); - } - - parsed_pgs.push_back(std::move(parsed_pg)); - } - } - } - - // group pgs to process by osd - for (auto& pgid : parsed_pgs) { - auto workit = pg_map.pg_stat.find(pgid); - if (workit != pg_map.pg_stat.end()) { - pg_stat_t workpg = workit->second; - set osds(workpg.up.begin(), workpg.up.end()); - osds.insert(workpg.acting.begin(), workpg.acting.end()); - for (auto i : osds) { - osdpgs[i].push_back(pgid); - } - } - } - - }); - } - - // respond with error only when no pgs are correct - // yes, in case of mixed errors, only the last one will be emitted, - // but the message presented will be fine - if (parsed_pgs.size() != 0) { - // clear error to not confuse users/scripts - r = 0; - } - - // optimize the command -> messages conversion, use only one message per distinct OSD - cluster_state.with_osdmap([&](const OSDMap& osdmap) { - for (auto& i : osdpgs) { - if (osdmap.is_up(i.first)) { - vector pgvec(make_move_iterator(i.second.begin()), make_move_iterator(i.second.end())); - auto p = osd_cons.find(i.first); - if (p == osd_cons.end()) { - ss << "osd." << i.first << " is not currently connected"; - r = -EAGAIN; - continue; - } - for (auto& con : p->second) { - con->send_message(new MOSDForceRecovery(monc->get_fsid(), pgvec, actual_op)); - } - ss << "instructing pg(s) " << i.second << " on osd." << i.first << " to " << forceop << "; "; - } - } - }); - ss << std::endl; - cmdctx->reply(r, ss); - return true; - } else { - r = cluster_state.with_pgmap([&](const PGMap& pg_map) { - return cluster_state.with_osdmap([&](const OSDMap& osdmap) { - return process_pg_map_command(prefix, cmdctx->cmdmap, pg_map, osdmap, - f.get(), &ss, &cmdctx->odata); - }); - }); - - if (r != -EOPNOTSUPP) { - cmdctx->reply(r, ss); - return true; - } - } - - // None of the special native commands, - ActivePyModule *handler = nullptr; - auto py_commands = py_modules.get_py_commands(); - for (const auto &pyc : py_commands) { - auto pyc_prefix = cmddesc_get_prefix(pyc.cmdstring); - dout(1) << "pyc_prefix: '" << pyc_prefix << "'" << dendl; - if (pyc_prefix == prefix) { - handler = pyc.handler; - break; - } - } - - if (handler == nullptr) { - ss << "No handler found for '" << prefix << "'"; - dout(4) << "No handler found for '" << prefix << "'" << dendl; - cmdctx->reply(-EINVAL, ss); - return true; - } else { - // Okay, now we have a handler to call, but we must not call it - // in this thread, because the python handlers can do anything, - // including blocking, and including calling back into mgr. 
- dout(4) << "passing through " << cmdctx->cmdmap.size() << dendl; - finisher.queue(new FunctionContext([cmdctx, handler](int r_) { - std::stringstream ds; - std::stringstream ss; - int r = handler->handle_command(cmdctx->cmdmap, &ds, &ss); - cmdctx->odata.append(ds); - cmdctx->reply(r, ss); - })); - return true; - } -} - -void DaemonServer::_prune_pending_service_map() -{ - utime_t cutoff = ceph_clock_now(); - cutoff -= g_conf->get_val("mgr_service_beacon_grace"); - auto p = pending_service_map.services.begin(); - while (p != pending_service_map.services.end()) { - auto q = p->second.daemons.begin(); - while (q != p->second.daemons.end()) { - DaemonKey key(p->first, q->first); - if (!daemon_state.exists(key)) { - derr << "missing key " << key << dendl; - ++q; - continue; - } - auto daemon = daemon_state.get(key); - Mutex::Locker l(daemon->lock); - if (daemon->last_service_beacon == utime_t()) { - // we must have just restarted; assume they are alive now. - daemon->last_service_beacon = ceph_clock_now(); - ++q; - continue; - } - if (daemon->last_service_beacon < cutoff) { - dout(10) << "pruning stale " << p->first << "." << q->first - << " last_beacon " << daemon->last_service_beacon << dendl; - q = p->second.daemons.erase(q); - pending_service_map_dirty = pending_service_map.epoch; - } else { - ++q; - } - } - if (p->second.daemons.empty()) { - p = pending_service_map.services.erase(p); - pending_service_map_dirty = pending_service_map.epoch; - } else { - ++p; - } - } -} - -void DaemonServer::send_report() -{ - if (!pgmap_ready) { - if (ceph_clock_now() - started_at > g_conf->get_val("mgr_stats_period") * 4.0) { - pgmap_ready = true; - reported_osds.clear(); - dout(1) << "Giving up on OSDs that haven't reported yet, sending " - << "potentially incomplete PG state to mon" << dendl; - } else { - dout(1) << "Not sending PG status to monitor yet, waiting for OSDs" - << dendl; - return; - } - } - - auto m = new MMonMgrReport(); - py_modules.get_health_checks(&m->health_checks); - - cluster_state.with_pgmap([&](const PGMap& pg_map) { - cluster_state.update_delta_stats(); - - if (pending_service_map.epoch) { - _prune_pending_service_map(); - if (pending_service_map_dirty >= pending_service_map.epoch) { - pending_service_map.modified = ceph_clock_now(); - ::encode(pending_service_map, m->service_map_bl, CEPH_FEATURES_ALL); - dout(10) << "sending service_map e" << pending_service_map.epoch - << dendl; - pending_service_map.epoch++; - } - } - - cluster_state.with_osdmap([&](const OSDMap& osdmap) { - // FIXME: no easy way to get mon features here. this will do for - // now, though, as long as we don't make a backward-incompat change. - pg_map.encode_digest(osdmap, m->get_data(), CEPH_FEATURES_ALL); - dout(10) << pg_map << dendl; - - pg_map.get_health_checks(g_ceph_context, osdmap, - &m->health_checks); - - dout(10) << m->health_checks.checks.size() << " health checks" - << dendl; - dout(20) << "health checks:\n"; - JSONFormatter jf(true); - jf.dump_object("health_checks", m->health_checks); - jf.flush(*_dout); - *_dout << dendl; - }); - }); - // TODO? We currently do not notify the PyModules - // TODO: respect needs_send, so we send the report only if we are asked to do - // so, or the state is updated. 
- monc->send_mon_message(m); -} - -void DaemonServer::got_service_map() -{ - Mutex::Locker l(lock); - - cluster_state.with_servicemap([&](const ServiceMap& service_map) { - if (pending_service_map.epoch == 0) { - // we just started up - dout(10) << "got initial map e" << service_map.epoch << dendl; - pending_service_map = service_map; - } else { - // we we already active and therefore must have persisted it, - // which means ours is the same or newer. - dout(10) << "got updated map e" << service_map.epoch << dendl; - } - pending_service_map.epoch = service_map.epoch + 1; - }); - - // cull missing daemons, populate new ones - for (auto& p : pending_service_map.services) { - std::set names; - for (auto& q : p.second.daemons) { - names.insert(q.first); - DaemonKey key(p.first, q.first); - if (!daemon_state.exists(key)) { - auto daemon = std::make_shared(daemon_state.types); - daemon->key = key; - daemon->metadata = q.second.metadata; - if (q.second.metadata.count("hostname")) { - daemon->hostname = q.second.metadata["hostname"]; - } - daemon->service_daemon = true; - daemon_state.insert(daemon); - dout(10) << "added missing " << key << dendl; - } - } - daemon_state.cull(p.first, names); - } -} - - -const char** DaemonServer::get_tracked_conf_keys() const -{ - static const char *KEYS[] = { - "mgr_stats_threshold", - "mgr_stats_period", - nullptr - }; - - return KEYS; -} - -void DaemonServer::handle_conf_change(const struct md_config_t *conf, - const std::set &changed) -{ - dout(4) << "ohai" << dendl; - // We may be called within lock (via MCommand `config set`) or outwith the - // lock (via admin socket `config set`), so handle either case. - const bool initially_locked = lock.is_locked_by_me(); - if (!initially_locked) { - lock.Lock(); - } - - if (changed.count("mgr_stats_threshold") || changed.count("mgr_stats_period")) { - dout(4) << "Updating stats threshold/period on " - << daemon_connections.size() << " clients" << dendl; - // Send a fresh MMgrConfigure to all clients, so that they can follow - // the new policy for transmitting stats - for (auto &c : daemon_connections) { - _send_configure(c); - } - } -} - -void DaemonServer::_send_configure(ConnectionRef c) -{ - assert(lock.is_locked_by_me()); - - auto configure = new MMgrConfigure(); - configure->stats_period = g_conf->get_val("mgr_stats_period"); - configure->stats_threshold = g_conf->get_val("mgr_stats_threshold"); - c->send_message(configure); -} -
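
The removed DaemonServer::maybe_ready() above illustrates a pattern worth calling out: an atomic pgmap_ready flag is read without taking DaemonServer::lock on the per-message fast path, and only while the flag is still false does the code take the mutex, record which OSDs have sent their first stats, diff that set against the OSDMap's up set, and flip the flag (triggering the first report to the mon) once every up OSD has checked in. The sketch below shows that pattern in isolation; the class name, types, and method signature are illustrative assumptions for this note, not the Ceph API.

// Minimal sketch of the "wait until every up OSD has reported once" pattern
// seen in DaemonServer::maybe_ready() above. Names and types are illustrative
// only; this is not the removed Ceph code.
#include <algorithm>
#include <atomic>
#include <iterator>
#include <mutex>
#include <set>

class ReadinessTracker {
  std::atomic<bool> ready{false};   // fast-path flag, readable without the lock
  std::mutex lock;                  // protects reported_osds on the slow path
  std::set<int> reported_osds;

public:
  // Returns true once every OSD in `up_osds` has reported at least once.
  bool maybe_ready(int osd_id, const std::set<int>& up_osds) {
    if (ready.load()) {
      // Fast path: once the flag is set, no locking is needed.
      return true;
    }
    std::lock_guard<std::mutex> l(lock);
    reported_osds.insert(osd_id);

    // Which up OSDs have not reported yet?
    std::set<int> unreported;
    std::set_difference(up_osds.begin(), up_osds.end(),
                        reported_osds.begin(), reported_osds.end(),
                        std::inserter(unreported, unreported.begin()));
    if (unreported.empty()) {
      ready = true;            // subsequent callers take the lock-free fast path
      reported_osds.clear();   // bookkeeping is no longer needed once ready
    }
    return ready.load();
  }
};

Keeping the flag atomic matters here because ms_dispatch() in the deleted file deliberately avoids serializing all message handling under DaemonServer::lock, so after startup the per-message path stays lock-free.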