X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fmgr%2FDaemonServer.cc;fp=src%2Fceph%2Fsrc%2Fmgr%2FDaemonServer.cc;h=321a38ad5349496154d1b4c6338e98fef52f0338;hb=812ff6ca9fcd3e629e49d4328905f33eee8ca3f5;hp=0000000000000000000000000000000000000000;hpb=15280273faafb77777eab341909a3f495cf248d9;p=stor4nfv.git diff --git a/src/ceph/src/mgr/DaemonServer.cc b/src/ceph/src/mgr/DaemonServer.cc new file mode 100644 index 0000000..321a38a --- /dev/null +++ b/src/ceph/src/mgr/DaemonServer.cc @@ -0,0 +1,1495 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2016 John Spray + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + */ + +#include "DaemonServer.h" + +#include "include/str_list.h" +#include "auth/RotatingKeyRing.h" +#include "json_spirit/json_spirit_writer.h" + +#include "mgr/mgr_commands.h" +#include "mon/MonCommand.h" + +#include "messages/MMgrOpen.h" +#include "messages/MMgrConfigure.h" +#include "messages/MMonMgrReport.h" +#include "messages/MCommand.h" +#include "messages/MCommandReply.h" +#include "messages/MPGStats.h" +#include "messages/MOSDScrub.h" +#include "messages/MOSDForceRecovery.h" +#include "common/errno.h" + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_mgr +#undef dout_prefix +#define dout_prefix *_dout << "mgr.server " << __func__ << " " + + + +DaemonServer::DaemonServer(MonClient *monc_, + Finisher &finisher_, + DaemonStateIndex &daemon_state_, + ClusterState &cluster_state_, + PyModuleRegistry &py_modules_, + LogChannelRef clog_, + LogChannelRef audit_clog_) + : Dispatcher(g_ceph_context), + client_byte_throttler(new Throttle(g_ceph_context, "mgr_client_bytes", + g_conf->get_val("mgr_client_bytes"))), + client_msg_throttler(new Throttle(g_ceph_context, "mgr_client_messages", + g_conf->get_val("mgr_client_messages"))), + osd_byte_throttler(new Throttle(g_ceph_context, "mgr_osd_bytes", + g_conf->get_val("mgr_osd_bytes"))), + osd_msg_throttler(new Throttle(g_ceph_context, "mgr_osd_messsages", + g_conf->get_val("mgr_osd_messages"))), + mds_byte_throttler(new Throttle(g_ceph_context, "mgr_mds_bytes", + g_conf->get_val("mgr_mds_bytes"))), + mds_msg_throttler(new Throttle(g_ceph_context, "mgr_mds_messsages", + g_conf->get_val("mgr_mds_messages"))), + mon_byte_throttler(new Throttle(g_ceph_context, "mgr_mon_bytes", + g_conf->get_val("mgr_mon_bytes"))), + mon_msg_throttler(new Throttle(g_ceph_context, "mgr_mon_messsages", + g_conf->get_val("mgr_mon_messages"))), + msgr(nullptr), + monc(monc_), + finisher(finisher_), + daemon_state(daemon_state_), + cluster_state(cluster_state_), + py_modules(py_modules_), + clog(clog_), + audit_clog(audit_clog_), + auth_registry(g_ceph_context, + g_conf->auth_supported.empty() ? + g_conf->auth_cluster_required : + g_conf->auth_supported), + lock("DaemonServer"), + pgmap_ready(false) +{ + g_conf->add_observer(this); +} + +DaemonServer::~DaemonServer() { + delete msgr; + g_conf->remove_observer(this); +} + +int DaemonServer::init(uint64_t gid, entity_addr_t client_addr) +{ + // Initialize Messenger + std::string public_msgr_type = g_conf->ms_public_type.empty() ? + g_conf->get_val("ms_type") : g_conf->ms_public_type; + msgr = Messenger::create(g_ceph_context, public_msgr_type, + entity_name_t::MGR(gid), + "mgr", + getpid(), 0); + msgr->set_default_policy(Messenger::Policy::stateless_server(0)); + + // throttle clients + msgr->set_policy_throttlers(entity_name_t::TYPE_CLIENT, + client_byte_throttler.get(), + client_msg_throttler.get()); + + // servers + msgr->set_policy_throttlers(entity_name_t::TYPE_OSD, + osd_byte_throttler.get(), + osd_msg_throttler.get()); + msgr->set_policy_throttlers(entity_name_t::TYPE_MDS, + mds_byte_throttler.get(), + mds_msg_throttler.get()); + msgr->set_policy_throttlers(entity_name_t::TYPE_MON, + mon_byte_throttler.get(), + mon_msg_throttler.get()); + + int r = msgr->bind(g_conf->public_addr); + if (r < 0) { + derr << "unable to bind mgr to " << g_conf->public_addr << dendl; + return r; + } + + msgr->set_myname(entity_name_t::MGR(gid)); + msgr->set_addr_unknowns(client_addr); + + msgr->start(); + msgr->add_dispatcher_tail(this); + + started_at = ceph_clock_now(); + + return 0; +} + +entity_addr_t DaemonServer::get_myaddr() const +{ + return msgr->get_myaddr(); +} + + +bool DaemonServer::ms_verify_authorizer(Connection *con, + int peer_type, + int protocol, + ceph::bufferlist& authorizer_data, + ceph::bufferlist& authorizer_reply, + bool& is_valid, + CryptoKey& session_key) +{ + auto handler = auth_registry.get_handler(protocol); + if (!handler) { + dout(0) << "No AuthAuthorizeHandler found for protocol " << protocol << dendl; + is_valid = false; + return true; + } + + MgrSessionRef s(new MgrSession(cct)); + s->inst.addr = con->get_peer_addr(); + AuthCapsInfo caps_info; + + RotatingKeyRing *keys = monc->rotating_secrets.get(); + if (keys) { + is_valid = handler->verify_authorizer( + cct, keys, + authorizer_data, + authorizer_reply, s->entity_name, + s->global_id, caps_info, + session_key); + } else { + dout(10) << __func__ << " no rotating_keys (yet), denied" << dendl; + is_valid = false; + } + + if (is_valid) { + if (caps_info.allow_all) { + dout(10) << " session " << s << " " << s->entity_name + << " allow_all" << dendl; + s->caps.set_allow_all(); + } + if (caps_info.caps.length() > 0) { + bufferlist::iterator p = caps_info.caps.begin(); + string str; + try { + ::decode(str, p); + } + catch (buffer::error& e) { + } + bool success = s->caps.parse(str); + if (success) { + dout(10) << " session " << s << " " << s->entity_name + << " has caps " << s->caps << " '" << str << "'" << dendl; + } else { + dout(10) << " session " << s << " " << s->entity_name + << " failed to parse caps '" << str << "'" << dendl; + is_valid = false; + } + } + con->set_priv(s->get()); + + if (peer_type == CEPH_ENTITY_TYPE_OSD) { + Mutex::Locker l(lock); + s->osd_id = atoi(s->entity_name.get_id().c_str()); + dout(10) << "registering osd." << s->osd_id << " session " + << s << " con " << con << dendl; + osd_cons[s->osd_id].insert(con); + } + } + + return true; +} + + +bool DaemonServer::ms_get_authorizer(int dest_type, + AuthAuthorizer **authorizer, bool force_new) +{ + dout(10) << "type=" << ceph_entity_type_name(dest_type) << dendl; + + if (dest_type == CEPH_ENTITY_TYPE_MON) { + return true; + } + + if (force_new) { + if (monc->wait_auth_rotating(10) < 0) + return false; + } + + *authorizer = monc->build_authorizer(dest_type); + dout(20) << "got authorizer " << *authorizer << dendl; + return *authorizer != NULL; +} + +bool DaemonServer::ms_handle_reset(Connection *con) +{ + if (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD) { + MgrSessionRef session(static_cast(con->get_priv())); + if (!session) { + return false; + } + session->put(); // SessionRef takes a ref + Mutex::Locker l(lock); + dout(10) << "unregistering osd." << session->osd_id + << " session " << session << " con " << con << dendl; + osd_cons[session->osd_id].erase(con); + + auto iter = daemon_connections.find(con); + if (iter != daemon_connections.end()) { + daemon_connections.erase(iter); + } + } + return false; +} + +bool DaemonServer::ms_handle_refused(Connection *con) +{ + // do nothing for now + return false; +} + +bool DaemonServer::ms_dispatch(Message *m) +{ + // Note that we do *not* take ::lock here, in order to avoid + // serializing all message handling. It's up to each handler + // to take whatever locks it needs. + switch (m->get_type()) { + case MSG_PGSTATS: + cluster_state.ingest_pgstats(static_cast(m)); + maybe_ready(m->get_source().num()); + m->put(); + return true; + case MSG_MGR_REPORT: + return handle_report(static_cast(m)); + case MSG_MGR_OPEN: + return handle_open(static_cast(m)); + case MSG_COMMAND: + return handle_command(static_cast(m)); + default: + dout(1) << "Unhandled message type " << m->get_type() << dendl; + return false; + }; +} + +void DaemonServer::maybe_ready(int32_t osd_id) +{ + if (pgmap_ready.load()) { + // Fast path: we don't need to take lock because pgmap_ready + // is already set + } else { + Mutex::Locker l(lock); + + if (reported_osds.find(osd_id) == reported_osds.end()) { + dout(4) << "initial report from osd " << osd_id << dendl; + reported_osds.insert(osd_id); + std::set up_osds; + + cluster_state.with_osdmap([&](const OSDMap& osdmap) { + osdmap.get_up_osds(up_osds); + }); + + std::set unreported_osds; + std::set_difference(up_osds.begin(), up_osds.end(), + reported_osds.begin(), reported_osds.end(), + std::inserter(unreported_osds, unreported_osds.begin())); + + if (unreported_osds.size() == 0) { + dout(4) << "all osds have reported, sending PG state to mon" << dendl; + pgmap_ready = true; + reported_osds.clear(); + // Avoid waiting for next tick + send_report(); + } else { + dout(4) << "still waiting for " << unreported_osds.size() << " osds" + " to report in before PGMap is ready" << dendl; + } + } + } +} + +void DaemonServer::shutdown() +{ + dout(10) << "begin" << dendl; + msgr->shutdown(); + msgr->wait(); + dout(10) << "done" << dendl; +} + + + +bool DaemonServer::handle_open(MMgrOpen *m) +{ + Mutex::Locker l(lock); + + DaemonKey key; + if (!m->service_name.empty()) { + key.first = m->service_name; + } else { + key.first = ceph_entity_type_name(m->get_connection()->get_peer_type()); + } + key.second = m->daemon_name; + + dout(4) << "from " << m->get_connection() << " " << key << dendl; + + _send_configure(m->get_connection()); + + DaemonStatePtr daemon; + if (daemon_state.exists(key)) { + daemon = daemon_state.get(key); + } + if (daemon) { + dout(20) << "updating existing DaemonState for " << m->daemon_name << dendl; + Mutex::Locker l(daemon->lock); + daemon->perf_counters.clear(); + } + + if (m->service_daemon) { + if (!daemon) { + dout(4) << "constructing new DaemonState for " << key << dendl; + daemon = std::make_shared(daemon_state.types); + daemon->key = key; + if (m->daemon_metadata.count("hostname")) { + daemon->hostname = m->daemon_metadata["hostname"]; + } + daemon_state.insert(daemon); + } + Mutex::Locker l(daemon->lock); + daemon->service_daemon = true; + daemon->metadata = m->daemon_metadata; + daemon->service_status = m->daemon_status; + + utime_t now = ceph_clock_now(); + auto d = pending_service_map.get_daemon(m->service_name, + m->daemon_name); + if (d->gid != (uint64_t)m->get_source().num()) { + dout(10) << "registering " << key << " in pending_service_map" << dendl; + d->gid = m->get_source().num(); + d->addr = m->get_source_addr(); + d->start_epoch = pending_service_map.epoch; + d->start_stamp = now; + d->metadata = m->daemon_metadata; + pending_service_map_dirty = pending_service_map.epoch; + } + } + + if (m->get_connection()->get_peer_type() != entity_name_t::TYPE_CLIENT && + m->service_name.empty()) + { + // Store in set of the daemon/service connections, i.e. those + // connections that require an update in the event of stats + // configuration changes. + daemon_connections.insert(m->get_connection()); + } + + m->put(); + return true; +} + +bool DaemonServer::handle_report(MMgrReport *m) +{ + DaemonKey key; + if (!m->service_name.empty()) { + key.first = m->service_name; + } else { + key.first = ceph_entity_type_name(m->get_connection()->get_peer_type()); + } + key.second = m->daemon_name; + + dout(4) << "from " << m->get_connection() << " " << key << dendl; + + if (m->get_connection()->get_peer_type() == entity_name_t::TYPE_CLIENT && + m->service_name.empty()) { + // Clients should not be sending us stats unless they are declaring + // themselves to be a daemon for some service. + dout(4) << "rejecting report from non-daemon client " << m->daemon_name + << dendl; + m->put(); + return true; + } + + // Look up the DaemonState + DaemonStatePtr daemon; + if (daemon_state.exists(key)) { + dout(20) << "updating existing DaemonState for " << key << dendl; + daemon = daemon_state.get(key); + } else { + dout(4) << "constructing new DaemonState for " << key << dendl; + daemon = std::make_shared(daemon_state.types); + // FIXME: crap, we don't know the hostname at this stage. + daemon->key = key; + daemon_state.insert(daemon); + // FIXME: we should avoid this case by rejecting MMgrReport from + // daemons without sessions, and ensuring that session open + // always contains metadata. + } + + // Update the DaemonState + assert(daemon != nullptr); + { + Mutex::Locker l(daemon->lock); + auto &daemon_counters = daemon->perf_counters; + daemon_counters.update(m); + + if (daemon->service_daemon) { + utime_t now = ceph_clock_now(); + if (m->daemon_status) { + daemon->service_status = *m->daemon_status; + daemon->service_status_stamp = now; + } + daemon->last_service_beacon = now; + } else if (m->daemon_status) { + derr << "got status from non-daemon " << key << dendl; + } + } + + // if there are any schema updates, notify the python modules + if (!m->declare_types.empty() || !m->undeclare_types.empty()) { + ostringstream oss; + oss << key.first << '.' << key.second; + py_modules.notify_all("perf_schema_update", oss.str()); + } + + m->put(); + return true; +} + + +void DaemonServer::_generate_command_map( + map& cmdmap, + map ¶m_str_map) +{ + for (map::const_iterator p = cmdmap.begin(); + p != cmdmap.end(); ++p) { + if (p->first == "prefix") + continue; + if (p->first == "caps") { + vector cv; + if (cmd_getval(g_ceph_context, cmdmap, "caps", cv) && + cv.size() % 2 == 0) { + for (unsigned i = 0; i < cv.size(); i += 2) { + string k = string("caps_") + cv[i]; + param_str_map[k] = cv[i + 1]; + } + continue; + } + } + param_str_map[p->first] = cmd_vartype_stringify(p->second); + } +} + +const MonCommand *DaemonServer::_get_mgrcommand( + const string &cmd_prefix, + const std::vector &cmds) +{ + const MonCommand *this_cmd = nullptr; + for (const auto &cmd : cmds) { + if (cmd.cmdstring.compare(0, cmd_prefix.size(), cmd_prefix) == 0) { + this_cmd = &cmd; + break; + } + } + return this_cmd; +} + +bool DaemonServer::_allowed_command( + MgrSession *s, + const string &module, + const string &prefix, + const map& cmdmap, + const map& param_str_map, + const MonCommand *this_cmd) { + + if (s->entity_name.is_mon()) { + // mon is all-powerful. even when it is forwarding commands on behalf of + // old clients; we expect the mon is validating commands before proxying! + return true; + } + + bool cmd_r = this_cmd->requires_perm('r'); + bool cmd_w = this_cmd->requires_perm('w'); + bool cmd_x = this_cmd->requires_perm('x'); + + bool capable = s->caps.is_capable( + g_ceph_context, + CEPH_ENTITY_TYPE_MGR, + s->entity_name, + module, prefix, param_str_map, + cmd_r, cmd_w, cmd_x); + + dout(10) << " " << s->entity_name << " " + << (capable ? "" : "not ") << "capable" << dendl; + return capable; +} + +bool DaemonServer::handle_command(MCommand *m) +{ + Mutex::Locker l(lock); + int r = 0; + std::stringstream ss; + std::string prefix; + + assert(lock.is_locked_by_me()); + + /** + * The working data for processing an MCommand. This lives in + * a class to enable passing it into other threads for processing + * outside of the thread/locks that called handle_command. + */ + class CommandContext + { + public: + MCommand *m; + bufferlist odata; + cmdmap_t cmdmap; + + CommandContext(MCommand *m_) + : m(m_) + { + } + + ~CommandContext() + { + m->put(); + } + + void reply(int r, const std::stringstream &ss) + { + reply(r, ss.str()); + } + + void reply(int r, const std::string &rs) + { + // Let the connection drop as soon as we've sent our response + ConnectionRef con = m->get_connection(); + if (con) { + con->mark_disposable(); + } + + dout(1) << "handle_command " << cpp_strerror(r) << " " << rs << dendl; + if (con) { + MCommandReply *reply = new MCommandReply(r, rs); + reply->set_tid(m->get_tid()); + reply->set_data(odata); + con->send_message(reply); + } + } + }; + + /** + * A context for receiving a bufferlist/error string from a background + * function and then calling back to a CommandContext when it's done + */ + class ReplyOnFinish : public Context { + std::shared_ptr cmdctx; + + public: + bufferlist from_mon; + string outs; + + ReplyOnFinish(std::shared_ptr cmdctx_) + : cmdctx(cmdctx_) + {} + void finish(int r) override { + cmdctx->odata.claim_append(from_mon); + cmdctx->reply(r, outs); + } + }; + + std::shared_ptr cmdctx = std::make_shared(m); + + MgrSessionRef session(static_cast(m->get_connection()->get_priv())); + if (!session) { + return true; + } + session->put(); // SessionRef takes a ref + if (session->inst.name == entity_name_t()) + session->inst.name = m->get_source(); + + std::string format; + boost::scoped_ptr f; + map param_str_map; + + if (!cmdmap_from_json(m->cmd, &(cmdctx->cmdmap), ss)) { + cmdctx->reply(-EINVAL, ss); + return true; + } + + { + cmd_getval(g_ceph_context, cmdctx->cmdmap, "format", format, string("plain")); + f.reset(Formatter::create(format)); + } + + cmd_getval(cct, cmdctx->cmdmap, "prefix", prefix); + + dout(4) << "decoded " << cmdctx->cmdmap.size() << dendl; + dout(4) << "prefix=" << prefix << dendl; + + if (prefix == "get_command_descriptions") { + dout(10) << "reading commands from python modules" << dendl; + const auto py_commands = py_modules.get_commands(); + + int cmdnum = 0; + JSONFormatter f; + f.open_object_section("command_descriptions"); + + auto dump_cmd = [&cmdnum, &f](const MonCommand &mc){ + ostringstream secname; + secname << "cmd" << setfill('0') << std::setw(3) << cmdnum; + dump_cmddesc_to_json(&f, secname.str(), mc.cmdstring, mc.helpstring, + mc.module, mc.req_perms, mc.availability, 0); + cmdnum++; + }; + + for (const auto &pyc : py_commands) { + dump_cmd(pyc); + } + + for (const auto &mgr_cmd : mgr_commands) { + dump_cmd(mgr_cmd); + } + + f.close_section(); // command_descriptions + f.flush(cmdctx->odata); + cmdctx->reply(0, ss); + return true; + } + + // lookup command + const MonCommand *mgr_cmd = _get_mgrcommand(prefix, mgr_commands); + _generate_command_map(cmdctx->cmdmap, param_str_map); + if (!mgr_cmd) { + MonCommand py_command = {"", "", "py", "rw", "cli"}; + if (!_allowed_command(session.get(), py_command.module, prefix, cmdctx->cmdmap, + param_str_map, &py_command)) { + dout(1) << " access denied" << dendl; + ss << "access denied; does your client key have mgr caps?" + " See http://docs.ceph.com/docs/master/mgr/administrator/#client-authentication"; + cmdctx->reply(-EACCES, ss); + return true; + } + } else { + // validate user's permissions for requested command + if (!_allowed_command(session.get(), mgr_cmd->module, prefix, cmdctx->cmdmap, + param_str_map, mgr_cmd)) { + dout(1) << " access denied" << dendl; + audit_clog->info() << "from='" << session->inst << "' " + << "entity='" << session->entity_name << "' " + << "cmd=" << m->cmd << ": access denied"; + ss << "access denied' does your client key have mgr caps?" + " See http://docs.ceph.com/docs/master/mgr/administrator/#client-authentication"; + cmdctx->reply(-EACCES, ss); + return true; + } + } + + audit_clog->debug() + << "from='" << session->inst << "' " + << "entity='" << session->entity_name << "' " + << "cmd=" << m->cmd << ": dispatch"; + + // ---------------- + // service map commands + if (prefix == "service dump") { + if (!f) + f.reset(Formatter::create("json-pretty")); + cluster_state.with_servicemap([&](const ServiceMap &service_map) { + f->dump_object("service_map", service_map); + }); + f->flush(cmdctx->odata); + cmdctx->reply(0, ss); + return true; + } + if (prefix == "service status") { + if (!f) + f.reset(Formatter::create("json-pretty")); + // only include state from services that are in the persisted service map + f->open_object_section("service_status"); + ServiceMap s; + cluster_state.with_servicemap([&](const ServiceMap& service_map) { + s = service_map; + }); + for (auto& p : s.services) { + f->open_object_section(p.first.c_str()); + for (auto& q : p.second.daemons) { + f->open_object_section(q.first.c_str()); + DaemonKey key(p.first, q.first); + assert(daemon_state.exists(key)); + auto daemon = daemon_state.get(key); + Mutex::Locker l(daemon->lock); + f->dump_stream("status_stamp") << daemon->service_status_stamp; + f->dump_stream("last_beacon") << daemon->last_service_beacon; + f->open_object_section("status"); + for (auto& r : daemon->service_status) { + f->dump_string(r.first.c_str(), r.second); + } + f->close_section(); + f->close_section(); + } + f->close_section(); + } + f->close_section(); + f->flush(cmdctx->odata); + cmdctx->reply(0, ss); + return true; + } + + if (prefix == "config set") { + std::string key; + std::string val; + cmd_getval(cct, cmdctx->cmdmap, "key", key); + cmd_getval(cct, cmdctx->cmdmap, "value", val); + r = cct->_conf->set_val(key, val, true, &ss); + if (r == 0) { + cct->_conf->apply_changes(nullptr); + } + cmdctx->reply(0, ss); + return true; + } + + // ----------- + // PG commands + + if (prefix == "pg scrub" || + prefix == "pg repair" || + prefix == "pg deep-scrub") { + string scrubop = prefix.substr(3, string::npos); + pg_t pgid; + string pgidstr; + cmd_getval(g_ceph_context, cmdctx->cmdmap, "pgid", pgidstr); + if (!pgid.parse(pgidstr.c_str())) { + ss << "invalid pgid '" << pgidstr << "'"; + cmdctx->reply(-EINVAL, ss); + return true; + } + bool pg_exists = false; + cluster_state.with_osdmap([&](const OSDMap& osdmap) { + pg_exists = osdmap.pg_exists(pgid); + }); + if (!pg_exists) { + ss << "pg " << pgid << " dne"; + cmdctx->reply(-ENOENT, ss); + return true; + } + int acting_primary = -1; + cluster_state.with_osdmap([&](const OSDMap& osdmap) { + acting_primary = osdmap.get_pg_acting_primary(pgid); + }); + if (acting_primary == -1) { + ss << "pg " << pgid << " has no primary osd"; + cmdctx->reply(-EAGAIN, ss); + return true; + } + auto p = osd_cons.find(acting_primary); + if (p == osd_cons.end()) { + ss << "pg " << pgid << " primary osd." << acting_primary + << " is not currently connected"; + cmdctx->reply(-EAGAIN, ss); + } + vector pgs = { pgid }; + for (auto& con : p->second) { + con->send_message(new MOSDScrub(monc->get_fsid(), + pgs, + scrubop == "repair", + scrubop == "deep-scrub")); + } + ss << "instructing pg " << pgid << " on osd." << acting_primary + << " to " << scrubop; + cmdctx->reply(0, ss); + return true; + } else if (prefix == "osd scrub" || + prefix == "osd deep-scrub" || + prefix == "osd repair") { + string whostr; + cmd_getval(g_ceph_context, cmdctx->cmdmap, "who", whostr); + vector pvec; + get_str_vec(prefix, pvec); + + set osds; + if (whostr == "*" || whostr == "all" || whostr == "any") { + cluster_state.with_osdmap([&](const OSDMap& osdmap) { + for (int i = 0; i < osdmap.get_max_osd(); i++) + if (osdmap.is_up(i)) { + osds.insert(i); + } + }); + } else { + long osd = parse_osd_id(whostr.c_str(), &ss); + if (osd < 0) { + ss << "invalid osd '" << whostr << "'"; + cmdctx->reply(-EINVAL, ss); + return true; + } + cluster_state.with_osdmap([&](const OSDMap& osdmap) { + if (osdmap.is_up(osd)) { + osds.insert(osd); + } + }); + if (osds.empty()) { + ss << "osd." << osd << " is not up"; + cmdctx->reply(-EAGAIN, ss); + return true; + } + } + set sent_osds, failed_osds; + for (auto osd : osds) { + auto p = osd_cons.find(osd); + if (p == osd_cons.end()) { + failed_osds.insert(osd); + } else { + sent_osds.insert(osd); + for (auto& con : p->second) { + con->send_message(new MOSDScrub(monc->get_fsid(), + pvec.back() == "repair", + pvec.back() == "deep-scrub")); + } + } + } + if (failed_osds.size() == osds.size()) { + ss << "failed to instruct osd(s) " << osds << " to " << pvec.back() + << " (not connected)"; + r = -EAGAIN; + } else { + ss << "instructed osd(s) " << sent_osds << " to " << pvec.back(); + if (!failed_osds.empty()) { + ss << "; osd(s) " << failed_osds << " were not connected"; + } + r = 0; + } + cmdctx->reply(0, ss); + return true; + } else if (prefix == "osd reweight-by-pg" || + prefix == "osd reweight-by-utilization" || + prefix == "osd test-reweight-by-pg" || + prefix == "osd test-reweight-by-utilization") { + bool by_pg = + prefix == "osd reweight-by-pg" || prefix == "osd test-reweight-by-pg"; + bool dry_run = + prefix == "osd test-reweight-by-pg" || + prefix == "osd test-reweight-by-utilization"; + int64_t oload; + cmd_getval(g_ceph_context, cmdctx->cmdmap, "oload", oload, int64_t(120)); + set pools; + vector poolnames; + cmd_getval(g_ceph_context, cmdctx->cmdmap, "pools", poolnames); + cluster_state.with_osdmap([&](const OSDMap& osdmap) { + for (const auto& poolname : poolnames) { + int64_t pool = osdmap.lookup_pg_pool_name(poolname); + if (pool < 0) { + ss << "pool '" << poolname << "' does not exist"; + r = -ENOENT; + } + pools.insert(pool); + } + }); + if (r) { + cmdctx->reply(r, ss); + return true; + } + double max_change = g_conf->mon_reweight_max_change; + cmd_getval(g_ceph_context, cmdctx->cmdmap, "max_change", max_change); + if (max_change <= 0.0) { + ss << "max_change " << max_change << " must be positive"; + cmdctx->reply(-EINVAL, ss); + return true; + } + int64_t max_osds = g_conf->mon_reweight_max_osds; + cmd_getval(g_ceph_context, cmdctx->cmdmap, "max_osds", max_osds); + if (max_osds <= 0) { + ss << "max_osds " << max_osds << " must be positive"; + cmdctx->reply(-EINVAL, ss); + return true; + } + string no_increasing; + cmd_getval(g_ceph_context, cmdctx->cmdmap, "no_increasing", no_increasing); + string out_str; + mempool::osdmap::map new_weights; + r = cluster_state.with_pgmap([&](const PGMap& pgmap) { + return cluster_state.with_osdmap([&](const OSDMap& osdmap) { + return reweight::by_utilization(osdmap, pgmap, + oload, + max_change, + max_osds, + by_pg, + pools.empty() ? NULL : &pools, + no_increasing == "--no-increasing", + &new_weights, + &ss, &out_str, f.get()); + }); + }); + if (r >= 0) { + dout(10) << "reweight::by_utilization: finished with " << out_str << dendl; + } + if (f) { + f->flush(cmdctx->odata); + } else { + cmdctx->odata.append(out_str); + } + if (r < 0) { + ss << "FAILED reweight-by-pg"; + cmdctx->reply(r, ss); + return true; + } else if (r == 0 || dry_run) { + ss << "no change"; + cmdctx->reply(r, ss); + return true; + } else { + json_spirit::Object json_object; + for (const auto& osd_weight : new_weights) { + json_spirit::Config::add(json_object, + std::to_string(osd_weight.first), + std::to_string(osd_weight.second)); + } + string s = json_spirit::write(json_object); + std::replace(begin(s), end(s), '\"', '\''); + const string cmd = + "{" + "\"prefix\": \"osd reweightn\", " + "\"weights\": \"" + s + "\"" + "}"; + auto on_finish = new ReplyOnFinish(cmdctx); + monc->start_mon_command({cmd}, {}, + &on_finish->from_mon, &on_finish->outs, on_finish); + return true; + } + } else if (prefix == "osd df") { + string method; + cmd_getval(g_ceph_context, cmdctx->cmdmap, "output_method", method); + r = cluster_state.with_pgservice([&](const PGMapStatService& pgservice) { + return cluster_state.with_osdmap([&](const OSDMap& osdmap) { + print_osd_utilization(osdmap, &pgservice, ss, + f.get(), method == "tree"); + + cmdctx->odata.append(ss); + return 0; + }); + }); + cmdctx->reply(r, ""); + return true; + } else if (prefix == "osd safe-to-destroy") { + vector ids; + cmd_getval(g_ceph_context, cmdctx->cmdmap, "ids", ids); + set osds; + int r; + cluster_state.with_osdmap([&](const OSDMap& osdmap) { + r = osdmap.parse_osd_id_list(ids, &osds, &ss); + }); + if (!r && osds.empty()) { + ss << "must specify one or more OSDs"; + r = -EINVAL; + } + if (r < 0) { + cmdctx->reply(r, ss); + return true; + } + set active_osds, missing_stats, stored_pgs; + int affected_pgs = 0; + cluster_state.with_pgmap([&](const PGMap& pg_map) { + if (pg_map.num_pg_unknown > 0) { + ss << pg_map.num_pg_unknown << " pgs have unknown state; cannot draw" + << " any conclusions"; + r = -EAGAIN; + return; + } + int num_active_clean = 0; + for (auto& p : pg_map.num_pg_by_state) { + unsigned want = PG_STATE_ACTIVE|PG_STATE_CLEAN; + if ((p.first & want) == want) { + num_active_clean += p.second; + } + } + cluster_state.with_osdmap([&](const OSDMap& osdmap) { + for (auto osd : osds) { + if (!osdmap.exists(osd)) { + continue; // clearly safe to destroy + } + auto q = pg_map.num_pg_by_osd.find(osd); + if (q != pg_map.num_pg_by_osd.end()) { + if (q->second.acting > 0 || q->second.up > 0) { + active_osds.insert(osd); + affected_pgs += q->second.acting + q->second.up; + continue; + } + } + if (num_active_clean < pg_map.num_pg) { + // all pgs aren't active+clean; we need to be careful. + auto p = pg_map.osd_stat.find(osd); + if (p == pg_map.osd_stat.end()) { + missing_stats.insert(osd); + } + if (p->second.num_pgs > 0) { + stored_pgs.insert(osd); + } + } + } + }); + }); + if (!r && !active_osds.empty()) { + ss << "OSD(s) " << active_osds << " have " << affected_pgs + << " pgs currently mapped to them"; + r = -EBUSY; + } else if (!missing_stats.empty()) { + ss << "OSD(s) " << missing_stats << " have no reported stats, and not all" + << " PGs are active+clean; we cannot draw any conclusions"; + r = -EAGAIN; + } else if (!stored_pgs.empty()) { + ss << "OSD(s) " << stored_pgs << " last reported they still store some PG" + << " data, and not all PGs are active+clean; we cannot be sure they" + << " aren't still needed."; + r = -EBUSY; + } + if (r) { + cmdctx->reply(r, ss); + return true; + } + ss << "OSD(s) " << osds << " are safe to destroy without reducing data" + << " durability."; + cmdctx->reply(0, ss); + return true; + } else if (prefix == "osd ok-to-stop") { + vector ids; + cmd_getval(g_ceph_context, cmdctx->cmdmap, "ids", ids); + set osds; + int r; + cluster_state.with_osdmap([&](const OSDMap& osdmap) { + r = osdmap.parse_osd_id_list(ids, &osds, &ss); + }); + if (!r && osds.empty()) { + ss << "must specify one or more OSDs"; + r = -EINVAL; + } + if (r < 0) { + cmdctx->reply(r, ss); + return true; + } + map pg_delta; // pgid -> net acting set size change + int dangerous_pgs = 0; + cluster_state.with_pgmap([&](const PGMap& pg_map) { + return cluster_state.with_osdmap([&](const OSDMap& osdmap) { + if (pg_map.num_pg_unknown > 0) { + ss << pg_map.num_pg_unknown << " pgs have unknown state; " + << "cannot draw any conclusions"; + r = -EAGAIN; + return; + } + for (auto osd : osds) { + auto p = pg_map.pg_by_osd.find(osd); + if (p != pg_map.pg_by_osd.end()) { + for (auto& pgid : p->second) { + --pg_delta[pgid]; + } + } + } + for (auto& p : pg_delta) { + auto q = pg_map.pg_stat.find(p.first); + if (q == pg_map.pg_stat.end()) { + ss << "missing information about " << p.first << "; cannot draw" + << " any conclusions"; + r = -EAGAIN; + return; + } + if (!(q->second.state & PG_STATE_ACTIVE) || + (q->second.state & PG_STATE_DEGRADED)) { + // we don't currently have a good way to tell *how* degraded + // a degraded PG is, so we have to assume we cannot remove + // any more replicas/shards. + ++dangerous_pgs; + continue; + } + const pg_pool_t *pi = osdmap.get_pg_pool(p.first.pool()); + if (!pi) { + ++dangerous_pgs; // pool is creating or deleting + } else { + if (q->second.acting.size() + p.second < pi->min_size) { + ++dangerous_pgs; + } + } + } + }); + }); + if (r) { + cmdctx->reply(r, ss); + return true; + } + if (dangerous_pgs) { + ss << dangerous_pgs << " PGs are already degraded or might become " + << "unavailable"; + cmdctx->reply(-EBUSY, ss); + return true; + } + ss << "OSD(s) " << osds << " are ok to stop without reducing" + << " availability, provided there are no other concurrent failures" + << " or interventions. " << pg_delta.size() << " PGs are likely to be" + << " degraded (but remain available) as a result."; + cmdctx->reply(0, ss); + return true; + } else if (prefix == "pg force-recovery" || + prefix == "pg force-backfill" || + prefix == "pg cancel-force-recovery" || + prefix == "pg cancel-force-backfill") { + string forceop = prefix.substr(3, string::npos); + list parsed_pgs; + map > osdpgs; + + // figure out actual op just once + int actual_op = 0; + if (forceop == "force-recovery") { + actual_op = OFR_RECOVERY; + } else if (forceop == "force-backfill") { + actual_op = OFR_BACKFILL; + } else if (forceop == "cancel-force-backfill") { + actual_op = OFR_BACKFILL | OFR_CANCEL; + } else if (forceop == "cancel-force-recovery") { + actual_op = OFR_RECOVERY | OFR_CANCEL; + } + + // covnert pg names to pgs, discard any invalid ones while at it + { + // we don't want to keep pgidstr and pgidstr_nodup forever + vector pgidstr; + // get pgids to process and prune duplicates + cmd_getval(g_ceph_context, cmdctx->cmdmap, "pgid", pgidstr); + set pgidstr_nodup(pgidstr.begin(), pgidstr.end()); + if (pgidstr.size() != pgidstr_nodup.size()) { + // move elements only when there were duplicates, as this + // reorders them + pgidstr.resize(pgidstr_nodup.size()); + auto it = pgidstr_nodup.begin(); + for (size_t i = 0 ; i < pgidstr_nodup.size(); i++) { + pgidstr[i] = std::move(*it++); + } + } + + cluster_state.with_pgmap([&](const PGMap& pg_map) { + for (auto& pstr : pgidstr) { + pg_t parsed_pg; + if (!parsed_pg.parse(pstr.c_str())) { + ss << "invalid pgid '" << pstr << "'; "; + r = -EINVAL; + } else { + auto workit = pg_map.pg_stat.find(parsed_pg); + if (workit == pg_map.pg_stat.end()) { + ss << "pg " << pstr << " not exists; "; + r = -ENOENT; + } else { + pg_stat_t workpg = workit->second; + + // discard pgs for which user requests are pointless + switch (actual_op) + { + case OFR_RECOVERY: + if ((workpg.state & (PG_STATE_DEGRADED | PG_STATE_RECOVERY_WAIT | PG_STATE_RECOVERING)) == 0) { + // don't return error, user script may be racing with cluster. not fatal. + ss << "pg " << pstr << " doesn't require recovery; "; + continue; + } else if (workpg.state & PG_STATE_FORCED_RECOVERY) { + ss << "pg " << pstr << " recovery already forced; "; + // return error, as it may be a bug in user script + r = -EINVAL; + continue; + } + break; + case OFR_BACKFILL: + if ((workpg.state & (PG_STATE_DEGRADED | PG_STATE_BACKFILL_WAIT | PG_STATE_BACKFILLING)) == 0) { + ss << "pg " << pstr << " doesn't require backfilling; "; + continue; + } else if (workpg.state & PG_STATE_FORCED_BACKFILL) { + ss << "pg " << pstr << " backfill already forced; "; + r = -EINVAL; + continue; + } + break; + case OFR_BACKFILL | OFR_CANCEL: + if ((workpg.state & PG_STATE_FORCED_BACKFILL) == 0) { + ss << "pg " << pstr << " backfill not forced; "; + continue; + } + break; + case OFR_RECOVERY | OFR_CANCEL: + if ((workpg.state & PG_STATE_FORCED_RECOVERY) == 0) { + ss << "pg " << pstr << " recovery not forced; "; + continue; + } + break; + default: + assert(0 == "actual_op value is not supported"); + } + + parsed_pgs.push_back(std::move(parsed_pg)); + } + } + } + + // group pgs to process by osd + for (auto& pgid : parsed_pgs) { + auto workit = pg_map.pg_stat.find(pgid); + if (workit != pg_map.pg_stat.end()) { + pg_stat_t workpg = workit->second; + set osds(workpg.up.begin(), workpg.up.end()); + osds.insert(workpg.acting.begin(), workpg.acting.end()); + for (auto i : osds) { + osdpgs[i].push_back(pgid); + } + } + } + + }); + } + + // respond with error only when no pgs are correct + // yes, in case of mixed errors, only the last one will be emitted, + // but the message presented will be fine + if (parsed_pgs.size() != 0) { + // clear error to not confuse users/scripts + r = 0; + } + + // optimize the command -> messages conversion, use only one message per distinct OSD + cluster_state.with_osdmap([&](const OSDMap& osdmap) { + for (auto& i : osdpgs) { + if (osdmap.is_up(i.first)) { + vector pgvec(make_move_iterator(i.second.begin()), make_move_iterator(i.second.end())); + auto p = osd_cons.find(i.first); + if (p == osd_cons.end()) { + ss << "osd." << i.first << " is not currently connected"; + r = -EAGAIN; + continue; + } + for (auto& con : p->second) { + con->send_message(new MOSDForceRecovery(monc->get_fsid(), pgvec, actual_op)); + } + ss << "instructing pg(s) " << i.second << " on osd." << i.first << " to " << forceop << "; "; + } + } + }); + ss << std::endl; + cmdctx->reply(r, ss); + return true; + } else { + r = cluster_state.with_pgmap([&](const PGMap& pg_map) { + return cluster_state.with_osdmap([&](const OSDMap& osdmap) { + return process_pg_map_command(prefix, cmdctx->cmdmap, pg_map, osdmap, + f.get(), &ss, &cmdctx->odata); + }); + }); + + if (r != -EOPNOTSUPP) { + cmdctx->reply(r, ss); + return true; + } + } + + // None of the special native commands, + ActivePyModule *handler = nullptr; + auto py_commands = py_modules.get_py_commands(); + for (const auto &pyc : py_commands) { + auto pyc_prefix = cmddesc_get_prefix(pyc.cmdstring); + dout(1) << "pyc_prefix: '" << pyc_prefix << "'" << dendl; + if (pyc_prefix == prefix) { + handler = pyc.handler; + break; + } + } + + if (handler == nullptr) { + ss << "No handler found for '" << prefix << "'"; + dout(4) << "No handler found for '" << prefix << "'" << dendl; + cmdctx->reply(-EINVAL, ss); + return true; + } else { + // Okay, now we have a handler to call, but we must not call it + // in this thread, because the python handlers can do anything, + // including blocking, and including calling back into mgr. + dout(4) << "passing through " << cmdctx->cmdmap.size() << dendl; + finisher.queue(new FunctionContext([cmdctx, handler](int r_) { + std::stringstream ds; + std::stringstream ss; + int r = handler->handle_command(cmdctx->cmdmap, &ds, &ss); + cmdctx->odata.append(ds); + cmdctx->reply(r, ss); + })); + return true; + } +} + +void DaemonServer::_prune_pending_service_map() +{ + utime_t cutoff = ceph_clock_now(); + cutoff -= g_conf->get_val("mgr_service_beacon_grace"); + auto p = pending_service_map.services.begin(); + while (p != pending_service_map.services.end()) { + auto q = p->second.daemons.begin(); + while (q != p->second.daemons.end()) { + DaemonKey key(p->first, q->first); + if (!daemon_state.exists(key)) { + derr << "missing key " << key << dendl; + ++q; + continue; + } + auto daemon = daemon_state.get(key); + Mutex::Locker l(daemon->lock); + if (daemon->last_service_beacon == utime_t()) { + // we must have just restarted; assume they are alive now. + daemon->last_service_beacon = ceph_clock_now(); + ++q; + continue; + } + if (daemon->last_service_beacon < cutoff) { + dout(10) << "pruning stale " << p->first << "." << q->first + << " last_beacon " << daemon->last_service_beacon << dendl; + q = p->second.daemons.erase(q); + pending_service_map_dirty = pending_service_map.epoch; + } else { + ++q; + } + } + if (p->second.daemons.empty()) { + p = pending_service_map.services.erase(p); + pending_service_map_dirty = pending_service_map.epoch; + } else { + ++p; + } + } +} + +void DaemonServer::send_report() +{ + if (!pgmap_ready) { + if (ceph_clock_now() - started_at > g_conf->get_val("mgr_stats_period") * 4.0) { + pgmap_ready = true; + reported_osds.clear(); + dout(1) << "Giving up on OSDs that haven't reported yet, sending " + << "potentially incomplete PG state to mon" << dendl; + } else { + dout(1) << "Not sending PG status to monitor yet, waiting for OSDs" + << dendl; + return; + } + } + + auto m = new MMonMgrReport(); + py_modules.get_health_checks(&m->health_checks); + + cluster_state.with_pgmap([&](const PGMap& pg_map) { + cluster_state.update_delta_stats(); + + if (pending_service_map.epoch) { + _prune_pending_service_map(); + if (pending_service_map_dirty >= pending_service_map.epoch) { + pending_service_map.modified = ceph_clock_now(); + ::encode(pending_service_map, m->service_map_bl, CEPH_FEATURES_ALL); + dout(10) << "sending service_map e" << pending_service_map.epoch + << dendl; + pending_service_map.epoch++; + } + } + + cluster_state.with_osdmap([&](const OSDMap& osdmap) { + // FIXME: no easy way to get mon features here. this will do for + // now, though, as long as we don't make a backward-incompat change. + pg_map.encode_digest(osdmap, m->get_data(), CEPH_FEATURES_ALL); + dout(10) << pg_map << dendl; + + pg_map.get_health_checks(g_ceph_context, osdmap, + &m->health_checks); + + dout(10) << m->health_checks.checks.size() << " health checks" + << dendl; + dout(20) << "health checks:\n"; + JSONFormatter jf(true); + jf.dump_object("health_checks", m->health_checks); + jf.flush(*_dout); + *_dout << dendl; + }); + }); + // TODO? We currently do not notify the PyModules + // TODO: respect needs_send, so we send the report only if we are asked to do + // so, or the state is updated. + monc->send_mon_message(m); +} + +void DaemonServer::got_service_map() +{ + Mutex::Locker l(lock); + + cluster_state.with_servicemap([&](const ServiceMap& service_map) { + if (pending_service_map.epoch == 0) { + // we just started up + dout(10) << "got initial map e" << service_map.epoch << dendl; + pending_service_map = service_map; + } else { + // we we already active and therefore must have persisted it, + // which means ours is the same or newer. + dout(10) << "got updated map e" << service_map.epoch << dendl; + } + pending_service_map.epoch = service_map.epoch + 1; + }); + + // cull missing daemons, populate new ones + for (auto& p : pending_service_map.services) { + std::set names; + for (auto& q : p.second.daemons) { + names.insert(q.first); + DaemonKey key(p.first, q.first); + if (!daemon_state.exists(key)) { + auto daemon = std::make_shared(daemon_state.types); + daemon->key = key; + daemon->metadata = q.second.metadata; + if (q.second.metadata.count("hostname")) { + daemon->hostname = q.second.metadata["hostname"]; + } + daemon->service_daemon = true; + daemon_state.insert(daemon); + dout(10) << "added missing " << key << dendl; + } + } + daemon_state.cull(p.first, names); + } +} + + +const char** DaemonServer::get_tracked_conf_keys() const +{ + static const char *KEYS[] = { + "mgr_stats_threshold", + "mgr_stats_period", + nullptr + }; + + return KEYS; +} + +void DaemonServer::handle_conf_change(const struct md_config_t *conf, + const std::set &changed) +{ + dout(4) << "ohai" << dendl; + // We may be called within lock (via MCommand `config set`) or outwith the + // lock (via admin socket `config set`), so handle either case. + const bool initially_locked = lock.is_locked_by_me(); + if (!initially_locked) { + lock.Lock(); + } + + if (changed.count("mgr_stats_threshold") || changed.count("mgr_stats_period")) { + dout(4) << "Updating stats threshold/period on " + << daemon_connections.size() << " clients" << dendl; + // Send a fresh MMgrConfigure to all clients, so that they can follow + // the new policy for transmitting stats + for (auto &c : daemon_connections) { + _send_configure(c); + } + } +} + +void DaemonServer::_send_configure(ConnectionRef c) +{ + assert(lock.is_locked_by_me()); + + auto configure = new MMgrConfigure(); + configure->stats_period = g_conf->get_val("mgr_stats_period"); + configure->stats_threshold = g_conf->get_val("mgr_stats_threshold"); + c->send_message(configure); +} +