X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fmgr%2FMgr.cc;fp=src%2Fceph%2Fsrc%2Fmgr%2FMgr.cc;h=0000000000000000000000000000000000000000;hb=7da45d65be36d36b880cc55c5036e96c24b53f00;hp=c17d0418786454d883437e690bd01b14895a7037;hpb=691462d09d0987b47e112d6ee8740375df3c51b2;p=stor4nfv.git diff --git a/src/ceph/src/mgr/Mgr.cc b/src/ceph/src/mgr/Mgr.cc deleted file mode 100644 index c17d041..0000000 --- a/src/ceph/src/mgr/Mgr.cc +++ /dev/null @@ -1,657 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2016 John Spray - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - */ - -#include - -#include "osdc/Objecter.h" -#include "client/Client.h" -#include "common/errno.h" -#include "mon/MonClient.h" -#include "include/stringify.h" -#include "global/global_context.h" -#include "global/signal_handler.h" - -#include "mgr/MgrContext.h" -#include "mgr/mgr_commands.h" - -//#include "MgrPyModule.h" -#include "DaemonServer.h" -#include "messages/MMgrBeacon.h" -#include "messages/MMgrDigest.h" -#include "messages/MCommand.h" -#include "messages/MCommandReply.h" -#include "messages/MLog.h" -#include "messages/MServiceMap.h" - -#include "Mgr.h" - -#define dout_context g_ceph_context -#define dout_subsys ceph_subsys_mgr -#undef dout_prefix -#define dout_prefix *_dout << "mgr " << __func__ << " " - - -Mgr::Mgr(MonClient *monc_, const MgrMap& mgrmap, - PyModuleRegistry *py_module_registry_, - Messenger *clientm_, Objecter *objecter_, - Client* client_, LogChannelRef clog_, LogChannelRef audit_clog_) : - monc(monc_), - objecter(objecter_), - client(client_), - client_messenger(clientm_), - lock("Mgr::lock"), - timer(g_ceph_context, lock), - finisher(g_ceph_context, "Mgr", "mgr-fin"), - digest_received(false), - py_module_registry(py_module_registry_), - cluster_state(monc, nullptr, mgrmap), - server(monc, finisher, daemon_state, cluster_state, *py_module_registry, - clog_, audit_clog_), - clog(clog_), - audit_clog(audit_clog_), - initialized(false), - initializing(false) -{ - cluster_state.set_objecter(objecter); -} - - -Mgr::~Mgr() -{ -} - - -/** - * Context for completion of metadata mon commands: take - * the result and stash it in DaemonStateIndex - */ -class MetadataUpdate : public Context -{ - DaemonStateIndex &daemon_state; - DaemonKey key; - - std::map defaults; - -public: - bufferlist outbl; - std::string outs; - - MetadataUpdate(DaemonStateIndex &daemon_state_, const DaemonKey &key_) - : daemon_state(daemon_state_), key(key_) {} - - void set_default(const std::string &k, const std::string &v) - { - defaults[k] = v; - } - - void finish(int r) override - { - daemon_state.clear_updating(key); - if (r == 0) { - if (key.first == "mds") { - json_spirit::mValue json_result; - bool read_ok = json_spirit::read( - outbl.to_str(), json_result); - if (!read_ok) { - dout(1) << "mon returned invalid JSON for " - << key.first << "." << key.second << dendl; - return; - } - - json_spirit::mObject daemon_meta = json_result.get_obj(); - - // Apply any defaults - for (const auto &i : defaults) { - if (daemon_meta.find(i.first) == daemon_meta.end()) { - daemon_meta[i.first] = i.second; - } - } - - DaemonStatePtr state; - if (daemon_state.exists(key)) { - state = daemon_state.get(key); - Mutex::Locker l(state->lock); - daemon_meta.erase("name"); - daemon_meta.erase("hostname"); - state->metadata.clear(); - for (const auto &i : daemon_meta) { - state->metadata[i.first] = i.second.get_str(); - } - } else { - state = std::make_shared(daemon_state.types); - state->key = key; - state->hostname = daemon_meta.at("hostname").get_str(); - - for (const auto &i : daemon_meta) { - state->metadata[i.first] = i.second.get_str(); - } - - daemon_state.insert(state); - } - } else if (key.first == "osd") { - } else { - ceph_abort(); - } - } else { - dout(1) << "mon failed to return metadata for " - << key.first << "." << key.second << ": " - << cpp_strerror(r) << dendl; - } - } -}; - - -void Mgr::background_init(Context *completion) -{ - Mutex::Locker l(lock); - assert(!initializing); - assert(!initialized); - initializing = true; - - finisher.start(); - - finisher.queue(new FunctionContext([this, completion](int r){ - init(); - completion->complete(0); - })); -} - -void Mgr::init() -{ - Mutex::Locker l(lock); - assert(initializing); - assert(!initialized); - - // Start communicating with daemons to learn statistics etc - int r = server.init(monc->get_global_id(), client_messenger->get_myaddr()); - if (r < 0) { - derr << "Initialize server fail"<< dendl; - return; - } - dout(4) << "Initialized server at " << server.get_myaddr() << dendl; - - // Preload all daemon metadata (will subsequently keep this - // up to date by watching maps, so do the initial load before - // we subscribe to any maps) - dout(4) << "Loading daemon metadata..." << dendl; - load_all_metadata(); - - // subscribe to all the maps - monc->sub_want("log-info", 0, 0); - monc->sub_want("mgrdigest", 0, 0); - monc->sub_want("fsmap", 0, 0); - monc->sub_want("servicemap", 0, 0); - - dout(4) << "waiting for OSDMap..." << dendl; - // Subscribe to OSDMap update to pass on to ClusterState - objecter->maybe_request_map(); - - // reset the mon session. we get these maps through subscriptions which - // are stateful with the connection, so even if *we* don't have them a - // previous incarnation sharing the same MonClient may have. - monc->reopen_session(); - - // Start Objecter and wait for OSD map - lock.Unlock(); // Drop lock because OSDMap dispatch calls into my ms_dispatch - objecter->wait_for_osd_map(); - lock.Lock(); - - // Populate PGs in ClusterState - objecter->with_osdmap([this](const OSDMap &osd_map) { - cluster_state.notify_osdmap(osd_map); - }); - - // Wait for FSMap - dout(4) << "waiting for FSMap..." << dendl; - while (!cluster_state.have_fsmap()) { - fs_map_cond.Wait(lock); - } - - dout(4) << "waiting for config-keys..." << dendl; - - // Preload config keys (`get` for plugins is to be a fast local - // operation, we we don't have to synchronize these later because - // all sets will come via mgr) - auto loaded_config = load_config(); - - // Wait for MgrDigest... - dout(4) << "waiting for MgrDigest..." << dendl; - while (!digest_received) { - digest_cond.Wait(lock); - } - - // assume finisher already initialized in background_init - dout(4) << "starting python modules..." << dendl; - py_module_registry->active_start(loaded_config, daemon_state, cluster_state, *monc, - clog, *objecter, *client, finisher); - - dout(4) << "Complete." << dendl; - initializing = false; - initialized = true; -} - -void Mgr::load_all_metadata() -{ - assert(lock.is_locked_by_me()); - - JSONCommand mds_cmd; - mds_cmd.run(monc, "{\"prefix\": \"mds metadata\"}"); - JSONCommand osd_cmd; - osd_cmd.run(monc, "{\"prefix\": \"osd metadata\"}"); - JSONCommand mon_cmd; - mon_cmd.run(monc, "{\"prefix\": \"mon metadata\"}"); - - lock.Unlock(); - mds_cmd.wait(); - osd_cmd.wait(); - mon_cmd.wait(); - lock.Lock(); - - assert(mds_cmd.r == 0); - assert(mon_cmd.r == 0); - assert(osd_cmd.r == 0); - - for (auto &metadata_val : mds_cmd.json_result.get_array()) { - json_spirit::mObject daemon_meta = metadata_val.get_obj(); - if (daemon_meta.count("hostname") == 0) { - dout(1) << "Skipping incomplete metadata entry" << dendl; - continue; - } - - DaemonStatePtr dm = std::make_shared(daemon_state.types); - dm->key = DaemonKey("mds", - daemon_meta.at("name").get_str()); - dm->hostname = daemon_meta.at("hostname").get_str(); - - daemon_meta.erase("name"); - daemon_meta.erase("hostname"); - - for (const auto &i : daemon_meta) { - dm->metadata[i.first] = i.second.get_str(); - } - - daemon_state.insert(dm); - } - - for (auto &metadata_val : mon_cmd.json_result.get_array()) { - json_spirit::mObject daemon_meta = metadata_val.get_obj(); - if (daemon_meta.count("hostname") == 0) { - dout(1) << "Skipping incomplete metadata entry" << dendl; - continue; - } - - DaemonStatePtr dm = std::make_shared(daemon_state.types); - dm->key = DaemonKey("mon", - daemon_meta.at("name").get_str()); - dm->hostname = daemon_meta.at("hostname").get_str(); - - daemon_meta.erase("name"); - daemon_meta.erase("hostname"); - - for (const auto &i : daemon_meta) { - dm->metadata[i.first] = i.second.get_str(); - } - - daemon_state.insert(dm); - } - - for (auto &osd_metadata_val : osd_cmd.json_result.get_array()) { - json_spirit::mObject osd_metadata = osd_metadata_val.get_obj(); - if (osd_metadata.count("hostname") == 0) { - dout(1) << "Skipping incomplete metadata entry" << dendl; - continue; - } - dout(4) << osd_metadata.at("hostname").get_str() << dendl; - - DaemonStatePtr dm = std::make_shared(daemon_state.types); - dm->key = DaemonKey("osd", - stringify(osd_metadata.at("id").get_int())); - dm->hostname = osd_metadata.at("hostname").get_str(); - - osd_metadata.erase("id"); - osd_metadata.erase("hostname"); - - for (const auto &i : osd_metadata) { - dm->metadata[i.first] = i.second.get_str(); - } - - daemon_state.insert(dm); - } -} - -std::map Mgr::load_config() -{ - assert(lock.is_locked_by_me()); - - dout(10) << "listing keys" << dendl; - JSONCommand cmd; - cmd.run(monc, "{\"prefix\": \"config-key ls\"}"); - lock.Unlock(); - cmd.wait(); - lock.Lock(); - assert(cmd.r == 0); - - std::map loaded; - - for (auto &key_str : cmd.json_result.get_array()) { - std::string const key = key_str.get_str(); - dout(20) << "saw key '" << key << "'" << dendl; - - const std::string config_prefix = PyModuleRegistry::config_prefix; - - if (key.substr(0, config_prefix.size()) == config_prefix) { - dout(20) << "fetching '" << key << "'" << dendl; - Command get_cmd; - std::ostringstream cmd_json; - cmd_json << "{\"prefix\": \"config-key get\", \"key\": \"" << key << "\"}"; - get_cmd.run(monc, cmd_json.str()); - lock.Unlock(); - get_cmd.wait(); - lock.Lock(); - assert(get_cmd.r == 0); - loaded[key] = get_cmd.outbl.to_str(); - } - } - - return loaded; -} - -void Mgr::shutdown() -{ - finisher.queue(new FunctionContext([&](int) { - { - Mutex::Locker l(lock); - monc->sub_unwant("log-info"); - monc->sub_unwant("mgrdigest"); - monc->sub_unwant("fsmap"); - // First stop the server so that we're not taking any more incoming - // requests - server.shutdown(); - } - // after the messenger is stopped, signal modules to shutdown via finisher - py_module_registry->active_shutdown(); - })); - - // Then stop the finisher to ensure its enqueued contexts aren't going - // to touch references to the things we're about to tear down - finisher.wait_for_empty(); - finisher.stop(); -} - -void Mgr::handle_osd_map() -{ - assert(lock.is_locked_by_me()); - - std::set names_exist; - - /** - * When we see a new OSD map, inspect the entity addrs to - * see if they have changed (service restart), and if so - * reload the metadata. - */ - objecter->with_osdmap([this, &names_exist](const OSDMap &osd_map) { - for (unsigned int osd_id = 0; osd_id < osd_map.get_num_osds(); ++osd_id) { - if (!osd_map.exists(osd_id)) { - continue; - } - - // Remember which OSDs exist so that we can cull any that don't - names_exist.insert(stringify(osd_id)); - - // Consider whether to update the daemon metadata (new/restarted daemon) - bool update_meta = false; - const auto k = DaemonKey("osd", stringify(osd_id)); - if (daemon_state.is_updating(k)) { - continue; - } - - if (daemon_state.exists(k)) { - auto metadata = daemon_state.get(k); - Mutex::Locker l(metadata->lock); - auto addr_iter = metadata->metadata.find("front_addr"); - if (addr_iter != metadata->metadata.end()) { - const std::string &metadata_addr = addr_iter->second; - const auto &map_addr = osd_map.get_addr(osd_id); - - if (metadata_addr != stringify(map_addr)) { - dout(4) << "OSD[" << osd_id << "] addr change " << metadata_addr - << " != " << stringify(map_addr) << dendl; - update_meta = true; - } else { - dout(20) << "OSD[" << osd_id << "] addr unchanged: " - << metadata_addr << dendl; - } - } else { - // Awkward case where daemon went into DaemonState because it - // sent us a report but its metadata didn't get loaded yet - update_meta = true; - } - } else { - update_meta = true; - } - - if (update_meta) { - daemon_state.notify_updating(k); - auto c = new MetadataUpdate(daemon_state, k); - std::ostringstream cmd; - cmd << "{\"prefix\": \"osd metadata\", \"id\": " - << osd_id << "}"; - monc->start_mon_command( - {cmd.str()}, - {}, &c->outbl, &c->outs, c); - } - } - - cluster_state.notify_osdmap(osd_map); - }); - - // TODO: same culling for MonMap - daemon_state.cull("osd", names_exist); -} - -void Mgr::handle_log(MLog *m) -{ - for (const auto &e : m->entries) { - py_module_registry->notify_all(e); - } - - m->put(); -} - -void Mgr::handle_service_map(MServiceMap *m) -{ - dout(10) << "e" << m->service_map.epoch << dendl; - cluster_state.set_service_map(m->service_map); - server.got_service_map(); -} - -bool Mgr::ms_dispatch(Message *m) -{ - dout(4) << *m << dendl; - Mutex::Locker l(lock); - - switch (m->get_type()) { - case MSG_MGR_DIGEST: - handle_mgr_digest(static_cast(m)); - break; - case CEPH_MSG_MON_MAP: - py_module_registry->notify_all("mon_map", ""); - m->put(); - break; - case CEPH_MSG_FS_MAP: - py_module_registry->notify_all("fs_map", ""); - handle_fs_map((MFSMap*)m); - return false; // I shall let this pass through for Client - break; - case CEPH_MSG_OSD_MAP: - handle_osd_map(); - - py_module_registry->notify_all("osd_map", ""); - - // Continuous subscribe, so that we can generate notifications - // for our MgrPyModules - objecter->maybe_request_map(); - m->put(); - break; - case MSG_SERVICE_MAP: - handle_service_map((MServiceMap*)m); - py_module_registry->notify_all("service_map", ""); - m->put(); - break; - case MSG_LOG: - handle_log(static_cast(m)); - break; - - default: - return false; - } - return true; -} - - -void Mgr::handle_fs_map(MFSMap* m) -{ - assert(lock.is_locked_by_me()); - - std::set names_exist; - - const FSMap &new_fsmap = m->get_fsmap(); - - fs_map_cond.Signal(); - - // TODO: callers (e.g. from python land) are potentially going to see - // the new fsmap before we've bothered populating all the resulting - // daemon_state. Maybe we should block python land while we're making - // this kind of update? - - cluster_state.set_fsmap(new_fsmap); - - auto mds_info = new_fsmap.get_mds_info(); - for (const auto &i : mds_info) { - const auto &info = i.second; - - if (!new_fsmap.gid_exists(i.first)){ - continue; - } - - // Remember which MDS exists so that we can cull any that don't - names_exist.insert(info.name); - - const auto k = DaemonKey("mds", info.name); - if (daemon_state.is_updating(k)) { - continue; - } - - bool update = false; - if (daemon_state.exists(k)) { - auto metadata = daemon_state.get(k); - Mutex::Locker l(metadata->lock); - if (metadata->metadata.empty() || - metadata->metadata.count("addr") == 0) { - update = true; - } else { - auto metadata_addr = metadata->metadata.at("addr"); - const auto map_addr = info.addr; - update = metadata_addr != stringify(map_addr); - if (update) { - dout(4) << "MDS[" << info.name << "] addr change " << metadata_addr - << " != " << stringify(map_addr) << dendl; - } - } - } else { - update = true; - } - - if (update) { - daemon_state.notify_updating(k); - auto c = new MetadataUpdate(daemon_state, k); - - // Older MDS daemons don't have addr in the metadata, so - // fake it if the returned metadata doesn't have the field. - c->set_default("addr", stringify(info.addr)); - - std::ostringstream cmd; - cmd << "{\"prefix\": \"mds metadata\", \"who\": \"" - << info.name << "\"}"; - monc->start_mon_command( - {cmd.str()}, - {}, &c->outbl, &c->outs, c); - } - } - daemon_state.cull("mds", names_exist); -} - -bool Mgr::got_mgr_map(const MgrMap& m) -{ - Mutex::Locker l(lock); - dout(10) << m << dendl; - - set old_modules; - cluster_state.with_mgrmap([&](const MgrMap& m) { - old_modules = m.modules; - }); - if (m.modules != old_modules) { - derr << "mgrmap module list changed to (" << m.modules << "), respawn" - << dendl; - return true; - } - - cluster_state.set_mgr_map(m); - - return false; -} - -void Mgr::handle_mgr_digest(MMgrDigest* m) -{ - dout(10) << m->mon_status_json.length() << dendl; - dout(10) << m->health_json.length() << dendl; - cluster_state.load_digest(m); - py_module_registry->notify_all("mon_status", ""); - py_module_registry->notify_all("health", ""); - - // Hack: use this as a tick/opportunity to prompt python-land that - // the pgmap might have changed since last time we were here. - py_module_registry->notify_all("pg_summary", ""); - dout(10) << "done." << dendl; - - m->put(); - - if (!digest_received) { - digest_received = true; - digest_cond.Signal(); - } -} - -void Mgr::tick() -{ - dout(10) << dendl; - server.send_report(); -} - -std::vector Mgr::get_command_set() const -{ - Mutex::Locker l(lock); - - std::vector commands = mgr_commands; - std::vector py_commands = py_module_registry->get_commands(); - commands.insert(commands.end(), py_commands.begin(), py_commands.end()); - return commands; -} - -std::map Mgr::get_services() const -{ - Mutex::Locker l(lock); - - return py_module_registry->get_services(); -} -