X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fmgr%2FMgr.cc;fp=src%2Fceph%2Fsrc%2Fmgr%2FMgr.cc;h=c17d0418786454d883437e690bd01b14895a7037;hb=812ff6ca9fcd3e629e49d4328905f33eee8ca3f5;hp=0000000000000000000000000000000000000000;hpb=15280273faafb77777eab341909a3f495cf248d9;p=stor4nfv.git diff --git a/src/ceph/src/mgr/Mgr.cc b/src/ceph/src/mgr/Mgr.cc new file mode 100644 index 0000000..c17d041 --- /dev/null +++ b/src/ceph/src/mgr/Mgr.cc @@ -0,0 +1,657 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2016 John Spray + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + */ + +#include + +#include "osdc/Objecter.h" +#include "client/Client.h" +#include "common/errno.h" +#include "mon/MonClient.h" +#include "include/stringify.h" +#include "global/global_context.h" +#include "global/signal_handler.h" + +#include "mgr/MgrContext.h" +#include "mgr/mgr_commands.h" + +//#include "MgrPyModule.h" +#include "DaemonServer.h" +#include "messages/MMgrBeacon.h" +#include "messages/MMgrDigest.h" +#include "messages/MCommand.h" +#include "messages/MCommandReply.h" +#include "messages/MLog.h" +#include "messages/MServiceMap.h" + +#include "Mgr.h" + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_mgr +#undef dout_prefix +#define dout_prefix *_dout << "mgr " << __func__ << " " + + +Mgr::Mgr(MonClient *monc_, const MgrMap& mgrmap, + PyModuleRegistry *py_module_registry_, + Messenger *clientm_, Objecter *objecter_, + Client* client_, LogChannelRef clog_, LogChannelRef audit_clog_) : + monc(monc_), + objecter(objecter_), + client(client_), + client_messenger(clientm_), + lock("Mgr::lock"), + timer(g_ceph_context, lock), + finisher(g_ceph_context, "Mgr", "mgr-fin"), + digest_received(false), + py_module_registry(py_module_registry_), + cluster_state(monc, nullptr, mgrmap), + server(monc, finisher, daemon_state, cluster_state, *py_module_registry, + clog_, audit_clog_), + clog(clog_), + audit_clog(audit_clog_), + initialized(false), + initializing(false) +{ + cluster_state.set_objecter(objecter); +} + + +Mgr::~Mgr() +{ +} + + +/** + * Context for completion of metadata mon commands: take + * the result and stash it in DaemonStateIndex + */ +class MetadataUpdate : public Context +{ + DaemonStateIndex &daemon_state; + DaemonKey key; + + std::map defaults; + +public: + bufferlist outbl; + std::string outs; + + MetadataUpdate(DaemonStateIndex &daemon_state_, const DaemonKey &key_) + : daemon_state(daemon_state_), key(key_) {} + + void set_default(const std::string &k, const std::string &v) + { + defaults[k] = v; + } + + void finish(int r) override + { + daemon_state.clear_updating(key); + if (r == 0) { + if (key.first == "mds") { + json_spirit::mValue json_result; + bool read_ok = json_spirit::read( + outbl.to_str(), json_result); + if (!read_ok) { + dout(1) << "mon returned invalid JSON for " + << key.first << "." << key.second << dendl; + return; + } + + json_spirit::mObject daemon_meta = json_result.get_obj(); + + // Apply any defaults + for (const auto &i : defaults) { + if (daemon_meta.find(i.first) == daemon_meta.end()) { + daemon_meta[i.first] = i.second; + } + } + + DaemonStatePtr state; + if (daemon_state.exists(key)) { + state = daemon_state.get(key); + Mutex::Locker l(state->lock); + daemon_meta.erase("name"); + daemon_meta.erase("hostname"); + state->metadata.clear(); + for (const auto &i : daemon_meta) { + state->metadata[i.first] = i.second.get_str(); + } + } else { + state = std::make_shared(daemon_state.types); + state->key = key; + state->hostname = daemon_meta.at("hostname").get_str(); + + for (const auto &i : daemon_meta) { + state->metadata[i.first] = i.second.get_str(); + } + + daemon_state.insert(state); + } + } else if (key.first == "osd") { + } else { + ceph_abort(); + } + } else { + dout(1) << "mon failed to return metadata for " + << key.first << "." << key.second << ": " + << cpp_strerror(r) << dendl; + } + } +}; + + +void Mgr::background_init(Context *completion) +{ + Mutex::Locker l(lock); + assert(!initializing); + assert(!initialized); + initializing = true; + + finisher.start(); + + finisher.queue(new FunctionContext([this, completion](int r){ + init(); + completion->complete(0); + })); +} + +void Mgr::init() +{ + Mutex::Locker l(lock); + assert(initializing); + assert(!initialized); + + // Start communicating with daemons to learn statistics etc + int r = server.init(monc->get_global_id(), client_messenger->get_myaddr()); + if (r < 0) { + derr << "Initialize server fail"<< dendl; + return; + } + dout(4) << "Initialized server at " << server.get_myaddr() << dendl; + + // Preload all daemon metadata (will subsequently keep this + // up to date by watching maps, so do the initial load before + // we subscribe to any maps) + dout(4) << "Loading daemon metadata..." << dendl; + load_all_metadata(); + + // subscribe to all the maps + monc->sub_want("log-info", 0, 0); + monc->sub_want("mgrdigest", 0, 0); + monc->sub_want("fsmap", 0, 0); + monc->sub_want("servicemap", 0, 0); + + dout(4) << "waiting for OSDMap..." << dendl; + // Subscribe to OSDMap update to pass on to ClusterState + objecter->maybe_request_map(); + + // reset the mon session. we get these maps through subscriptions which + // are stateful with the connection, so even if *we* don't have them a + // previous incarnation sharing the same MonClient may have. + monc->reopen_session(); + + // Start Objecter and wait for OSD map + lock.Unlock(); // Drop lock because OSDMap dispatch calls into my ms_dispatch + objecter->wait_for_osd_map(); + lock.Lock(); + + // Populate PGs in ClusterState + objecter->with_osdmap([this](const OSDMap &osd_map) { + cluster_state.notify_osdmap(osd_map); + }); + + // Wait for FSMap + dout(4) << "waiting for FSMap..." << dendl; + while (!cluster_state.have_fsmap()) { + fs_map_cond.Wait(lock); + } + + dout(4) << "waiting for config-keys..." << dendl; + + // Preload config keys (`get` for plugins is to be a fast local + // operation, we we don't have to synchronize these later because + // all sets will come via mgr) + auto loaded_config = load_config(); + + // Wait for MgrDigest... + dout(4) << "waiting for MgrDigest..." << dendl; + while (!digest_received) { + digest_cond.Wait(lock); + } + + // assume finisher already initialized in background_init + dout(4) << "starting python modules..." << dendl; + py_module_registry->active_start(loaded_config, daemon_state, cluster_state, *monc, + clog, *objecter, *client, finisher); + + dout(4) << "Complete." << dendl; + initializing = false; + initialized = true; +} + +void Mgr::load_all_metadata() +{ + assert(lock.is_locked_by_me()); + + JSONCommand mds_cmd; + mds_cmd.run(monc, "{\"prefix\": \"mds metadata\"}"); + JSONCommand osd_cmd; + osd_cmd.run(monc, "{\"prefix\": \"osd metadata\"}"); + JSONCommand mon_cmd; + mon_cmd.run(monc, "{\"prefix\": \"mon metadata\"}"); + + lock.Unlock(); + mds_cmd.wait(); + osd_cmd.wait(); + mon_cmd.wait(); + lock.Lock(); + + assert(mds_cmd.r == 0); + assert(mon_cmd.r == 0); + assert(osd_cmd.r == 0); + + for (auto &metadata_val : mds_cmd.json_result.get_array()) { + json_spirit::mObject daemon_meta = metadata_val.get_obj(); + if (daemon_meta.count("hostname") == 0) { + dout(1) << "Skipping incomplete metadata entry" << dendl; + continue; + } + + DaemonStatePtr dm = std::make_shared(daemon_state.types); + dm->key = DaemonKey("mds", + daemon_meta.at("name").get_str()); + dm->hostname = daemon_meta.at("hostname").get_str(); + + daemon_meta.erase("name"); + daemon_meta.erase("hostname"); + + for (const auto &i : daemon_meta) { + dm->metadata[i.first] = i.second.get_str(); + } + + daemon_state.insert(dm); + } + + for (auto &metadata_val : mon_cmd.json_result.get_array()) { + json_spirit::mObject daemon_meta = metadata_val.get_obj(); + if (daemon_meta.count("hostname") == 0) { + dout(1) << "Skipping incomplete metadata entry" << dendl; + continue; + } + + DaemonStatePtr dm = std::make_shared(daemon_state.types); + dm->key = DaemonKey("mon", + daemon_meta.at("name").get_str()); + dm->hostname = daemon_meta.at("hostname").get_str(); + + daemon_meta.erase("name"); + daemon_meta.erase("hostname"); + + for (const auto &i : daemon_meta) { + dm->metadata[i.first] = i.second.get_str(); + } + + daemon_state.insert(dm); + } + + for (auto &osd_metadata_val : osd_cmd.json_result.get_array()) { + json_spirit::mObject osd_metadata = osd_metadata_val.get_obj(); + if (osd_metadata.count("hostname") == 0) { + dout(1) << "Skipping incomplete metadata entry" << dendl; + continue; + } + dout(4) << osd_metadata.at("hostname").get_str() << dendl; + + DaemonStatePtr dm = std::make_shared(daemon_state.types); + dm->key = DaemonKey("osd", + stringify(osd_metadata.at("id").get_int())); + dm->hostname = osd_metadata.at("hostname").get_str(); + + osd_metadata.erase("id"); + osd_metadata.erase("hostname"); + + for (const auto &i : osd_metadata) { + dm->metadata[i.first] = i.second.get_str(); + } + + daemon_state.insert(dm); + } +} + +std::map Mgr::load_config() +{ + assert(lock.is_locked_by_me()); + + dout(10) << "listing keys" << dendl; + JSONCommand cmd; + cmd.run(monc, "{\"prefix\": \"config-key ls\"}"); + lock.Unlock(); + cmd.wait(); + lock.Lock(); + assert(cmd.r == 0); + + std::map loaded; + + for (auto &key_str : cmd.json_result.get_array()) { + std::string const key = key_str.get_str(); + dout(20) << "saw key '" << key << "'" << dendl; + + const std::string config_prefix = PyModuleRegistry::config_prefix; + + if (key.substr(0, config_prefix.size()) == config_prefix) { + dout(20) << "fetching '" << key << "'" << dendl; + Command get_cmd; + std::ostringstream cmd_json; + cmd_json << "{\"prefix\": \"config-key get\", \"key\": \"" << key << "\"}"; + get_cmd.run(monc, cmd_json.str()); + lock.Unlock(); + get_cmd.wait(); + lock.Lock(); + assert(get_cmd.r == 0); + loaded[key] = get_cmd.outbl.to_str(); + } + } + + return loaded; +} + +void Mgr::shutdown() +{ + finisher.queue(new FunctionContext([&](int) { + { + Mutex::Locker l(lock); + monc->sub_unwant("log-info"); + monc->sub_unwant("mgrdigest"); + monc->sub_unwant("fsmap"); + // First stop the server so that we're not taking any more incoming + // requests + server.shutdown(); + } + // after the messenger is stopped, signal modules to shutdown via finisher + py_module_registry->active_shutdown(); + })); + + // Then stop the finisher to ensure its enqueued contexts aren't going + // to touch references to the things we're about to tear down + finisher.wait_for_empty(); + finisher.stop(); +} + +void Mgr::handle_osd_map() +{ + assert(lock.is_locked_by_me()); + + std::set names_exist; + + /** + * When we see a new OSD map, inspect the entity addrs to + * see if they have changed (service restart), and if so + * reload the metadata. + */ + objecter->with_osdmap([this, &names_exist](const OSDMap &osd_map) { + for (unsigned int osd_id = 0; osd_id < osd_map.get_num_osds(); ++osd_id) { + if (!osd_map.exists(osd_id)) { + continue; + } + + // Remember which OSDs exist so that we can cull any that don't + names_exist.insert(stringify(osd_id)); + + // Consider whether to update the daemon metadata (new/restarted daemon) + bool update_meta = false; + const auto k = DaemonKey("osd", stringify(osd_id)); + if (daemon_state.is_updating(k)) { + continue; + } + + if (daemon_state.exists(k)) { + auto metadata = daemon_state.get(k); + Mutex::Locker l(metadata->lock); + auto addr_iter = metadata->metadata.find("front_addr"); + if (addr_iter != metadata->metadata.end()) { + const std::string &metadata_addr = addr_iter->second; + const auto &map_addr = osd_map.get_addr(osd_id); + + if (metadata_addr != stringify(map_addr)) { + dout(4) << "OSD[" << osd_id << "] addr change " << metadata_addr + << " != " << stringify(map_addr) << dendl; + update_meta = true; + } else { + dout(20) << "OSD[" << osd_id << "] addr unchanged: " + << metadata_addr << dendl; + } + } else { + // Awkward case where daemon went into DaemonState because it + // sent us a report but its metadata didn't get loaded yet + update_meta = true; + } + } else { + update_meta = true; + } + + if (update_meta) { + daemon_state.notify_updating(k); + auto c = new MetadataUpdate(daemon_state, k); + std::ostringstream cmd; + cmd << "{\"prefix\": \"osd metadata\", \"id\": " + << osd_id << "}"; + monc->start_mon_command( + {cmd.str()}, + {}, &c->outbl, &c->outs, c); + } + } + + cluster_state.notify_osdmap(osd_map); + }); + + // TODO: same culling for MonMap + daemon_state.cull("osd", names_exist); +} + +void Mgr::handle_log(MLog *m) +{ + for (const auto &e : m->entries) { + py_module_registry->notify_all(e); + } + + m->put(); +} + +void Mgr::handle_service_map(MServiceMap *m) +{ + dout(10) << "e" << m->service_map.epoch << dendl; + cluster_state.set_service_map(m->service_map); + server.got_service_map(); +} + +bool Mgr::ms_dispatch(Message *m) +{ + dout(4) << *m << dendl; + Mutex::Locker l(lock); + + switch (m->get_type()) { + case MSG_MGR_DIGEST: + handle_mgr_digest(static_cast(m)); + break; + case CEPH_MSG_MON_MAP: + py_module_registry->notify_all("mon_map", ""); + m->put(); + break; + case CEPH_MSG_FS_MAP: + py_module_registry->notify_all("fs_map", ""); + handle_fs_map((MFSMap*)m); + return false; // I shall let this pass through for Client + break; + case CEPH_MSG_OSD_MAP: + handle_osd_map(); + + py_module_registry->notify_all("osd_map", ""); + + // Continuous subscribe, so that we can generate notifications + // for our MgrPyModules + objecter->maybe_request_map(); + m->put(); + break; + case MSG_SERVICE_MAP: + handle_service_map((MServiceMap*)m); + py_module_registry->notify_all("service_map", ""); + m->put(); + break; + case MSG_LOG: + handle_log(static_cast(m)); + break; + + default: + return false; + } + return true; +} + + +void Mgr::handle_fs_map(MFSMap* m) +{ + assert(lock.is_locked_by_me()); + + std::set names_exist; + + const FSMap &new_fsmap = m->get_fsmap(); + + fs_map_cond.Signal(); + + // TODO: callers (e.g. from python land) are potentially going to see + // the new fsmap before we've bothered populating all the resulting + // daemon_state. Maybe we should block python land while we're making + // this kind of update? + + cluster_state.set_fsmap(new_fsmap); + + auto mds_info = new_fsmap.get_mds_info(); + for (const auto &i : mds_info) { + const auto &info = i.second; + + if (!new_fsmap.gid_exists(i.first)){ + continue; + } + + // Remember which MDS exists so that we can cull any that don't + names_exist.insert(info.name); + + const auto k = DaemonKey("mds", info.name); + if (daemon_state.is_updating(k)) { + continue; + } + + bool update = false; + if (daemon_state.exists(k)) { + auto metadata = daemon_state.get(k); + Mutex::Locker l(metadata->lock); + if (metadata->metadata.empty() || + metadata->metadata.count("addr") == 0) { + update = true; + } else { + auto metadata_addr = metadata->metadata.at("addr"); + const auto map_addr = info.addr; + update = metadata_addr != stringify(map_addr); + if (update) { + dout(4) << "MDS[" << info.name << "] addr change " << metadata_addr + << " != " << stringify(map_addr) << dendl; + } + } + } else { + update = true; + } + + if (update) { + daemon_state.notify_updating(k); + auto c = new MetadataUpdate(daemon_state, k); + + // Older MDS daemons don't have addr in the metadata, so + // fake it if the returned metadata doesn't have the field. + c->set_default("addr", stringify(info.addr)); + + std::ostringstream cmd; + cmd << "{\"prefix\": \"mds metadata\", \"who\": \"" + << info.name << "\"}"; + monc->start_mon_command( + {cmd.str()}, + {}, &c->outbl, &c->outs, c); + } + } + daemon_state.cull("mds", names_exist); +} + +bool Mgr::got_mgr_map(const MgrMap& m) +{ + Mutex::Locker l(lock); + dout(10) << m << dendl; + + set old_modules; + cluster_state.with_mgrmap([&](const MgrMap& m) { + old_modules = m.modules; + }); + if (m.modules != old_modules) { + derr << "mgrmap module list changed to (" << m.modules << "), respawn" + << dendl; + return true; + } + + cluster_state.set_mgr_map(m); + + return false; +} + +void Mgr::handle_mgr_digest(MMgrDigest* m) +{ + dout(10) << m->mon_status_json.length() << dendl; + dout(10) << m->health_json.length() << dendl; + cluster_state.load_digest(m); + py_module_registry->notify_all("mon_status", ""); + py_module_registry->notify_all("health", ""); + + // Hack: use this as a tick/opportunity to prompt python-land that + // the pgmap might have changed since last time we were here. + py_module_registry->notify_all("pg_summary", ""); + dout(10) << "done." << dendl; + + m->put(); + + if (!digest_received) { + digest_received = true; + digest_cond.Signal(); + } +} + +void Mgr::tick() +{ + dout(10) << dendl; + server.send_report(); +} + +std::vector Mgr::get_command_set() const +{ + Mutex::Locker l(lock); + + std::vector commands = mgr_commands; + std::vector py_commands = py_module_registry->get_commands(); + commands.insert(commands.end(), py_commands.begin(), py_commands.end()); + return commands; +} + +std::map Mgr::get_services() const +{ + Mutex::Locker l(lock); + + return py_module_registry->get_services(); +} +