1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2014 John Spray <john.spray@inktank.com>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
14 // Include this first to get python headers earlier
15 #include "BaseMgrModule.h"
18 #include "common/errno.h"
19 #include "include/stringify.h"
21 #include "PyFormatter.h"
23 #include "osd/OSDMap.h"
24 #include "mon/MonMap.h"
26 #include "mgr/MgrContext.h"
28 // For ::config_prefix
29 #include "PyModuleRegistry.h"
31 #include "ActivePyModules.h"
33 #define dout_context g_ceph_context
34 #define dout_subsys ceph_subsys_mgr
36 #define dout_prefix *_dout << "mgr " << __func__ << " "
39 ActivePyModules::ActivePyModules(PyModuleConfig const &config_,
40 DaemonStateIndex &ds, ClusterState &cs,
41 MonClient &mc, LogChannelRef clog_, Objecter &objecter_,
42 Client &client_, Finisher &f)
43 : config_cache(config_), daemon_state(ds), cluster_state(cs),
44 monc(mc), clog(clog_), objecter(objecter_), client(client_), finisher(f),
45 lock("ActivePyModules")
// Defaulted out-of-line destructor.
// NOTE(review): presumably defined here (not in the header) so the
// types held by member containers are complete at destruction — the
// usual pattern; confirm against the header.
ActivePyModules::~ActivePyModules() = default;
50 void ActivePyModules::dump_server(const std::string &hostname,
51 const DaemonStateCollection &dmc,
54 f->dump_string("hostname", hostname);
55 f->open_array_section("services");
56 std::string ceph_version;
58 for (const auto &i : dmc) {
59 Mutex::Locker l(i.second->lock);
60 const auto &key = i.first;
61 const std::string &str_type = key.first;
62 const std::string &svc_name = key.second;
64 // TODO: pick the highest version, and make sure that
65 // somewhere else (during health reporting?) we are
66 // indicating to the user if we see mixed versions
67 auto ver_iter = i.second->metadata.find("ceph_version");
68 if (ver_iter != i.second->metadata.end()) {
69 ceph_version = i.second->metadata.at("ceph_version");
72 f->open_object_section("service");
73 f->dump_string("type", str_type);
74 f->dump_string("id", svc_name);
79 f->dump_string("ceph_version", ceph_version);
84 PyObject *ActivePyModules::get_server_python(const std::string &hostname)
86 PyThreadState *tstate = PyEval_SaveThread();
87 Mutex::Locker l(lock);
88 PyEval_RestoreThread(tstate);
89 dout(10) << " (" << hostname << ")" << dendl;
91 auto dmc = daemon_state.get_by_server(hostname);
94 dump_server(hostname, dmc, &f);
99 PyObject *ActivePyModules::list_servers_python()
101 PyThreadState *tstate = PyEval_SaveThread();
102 Mutex::Locker l(lock);
103 PyEval_RestoreThread(tstate);
104 dout(10) << " >" << dendl;
106 PyFormatter f(false, true);
107 daemon_state.with_daemons_by_server([this, &f]
108 (const std::map<std::string, DaemonStateCollection> &all) {
109 for (const auto &i : all) {
110 const auto &hostname = i.first;
112 f.open_object_section("server");
113 dump_server(hostname, i.second, &f);
121 PyObject *ActivePyModules::get_metadata_python(
122 const std::string &svc_type,
123 const std::string &svc_id)
125 auto metadata = daemon_state.get(DaemonKey(svc_type, svc_id));
126 if (metadata == nullptr) {
127 derr << "Requested missing service " << svc_type << "." << svc_id << dendl;
131 Mutex::Locker l(metadata->lock);
133 f.dump_string("hostname", metadata->hostname);
134 for (const auto &i : metadata->metadata) {
135 f.dump_string(i.first.c_str(), i.second);
141 PyObject *ActivePyModules::get_daemon_status_python(
142 const std::string &svc_type,
143 const std::string &svc_id)
145 auto metadata = daemon_state.get(DaemonKey(svc_type, svc_id));
146 if (metadata == nullptr) {
147 derr << "Requested missing service " << svc_type << "." << svc_id << dendl;
151 Mutex::Locker l(metadata->lock);
153 for (const auto &i : metadata->service_status) {
154 f.dump_string(i.first.c_str(), i.second);
// Fetch named cluster state ("what") rendered as a Python object.
// The GIL is dropped (PyEval_SaveThread/PyEval_RestoreThread) while
// blocking on the ActivePyModules lock, so a Python thread holding the
// GIL cannot deadlock against a C++ thread holding `lock`.
// NOTE(review): this copy of the file is truncated — the branches are
// missing their "PyFormatter f;" declarations, several "});"/"}"
// closers, declarations such as "bufferlist rdata;"/"std::string json;"
// and the trailing "return f.get();" lines.  Restore the missing lines
// from the complete file before building; do not edit logic here.
PyObject *ActivePyModules::get_python(const std::string &what)
  PyThreadState *tstate = PyEval_SaveThread();
  Mutex::Locker l(lock);
  PyEval_RestoreThread(tstate);
  if (what == "fs_map") {
    cluster_state.with_fsmap([&f](const FSMap &fsmap) {
  } else if (what == "osdmap_crush_map_text") {
    // CRUSH map is returned as raw encoded text, not a formatter dump.
    cluster_state.with_osdmap([&rdata](const OSDMap &osd_map){
      osd_map.crush->encode(rdata, CEPH_FEATURES_SUPPORTED_DEFAULT);
    std::string crush_text = rdata.to_str();
    return PyString_FromString(crush_text.c_str());
  } else if (what.substr(0, 7) == "osd_map") {
    // Prefix match covers "osd_map", "osd_map_tree", "osd_map_crush".
    cluster_state.with_osdmap([&f, &what](const OSDMap &osd_map){
      if (what == "osd_map") {
      } else if (what == "osd_map_tree") {
        osd_map.print_tree(&f, nullptr);
      } else if (what == "osd_map_crush") {
        osd_map.crush->dump(&f);
  } else if (what == "config") {
    g_conf->show_config(&f);
  } else if (what == "mon_map") {
    cluster_state.with_monmap(
      [&f](const MonMap &monmap) {
  } else if (what == "service_map") {
    cluster_state.with_servicemap(
      [&f](const ServiceMap &service_map) {
        service_map.dump(&f);
  } else if (what == "osd_metadata") {
    // One section per OSD id: hostname plus all daemon metadata.
    auto dmc = daemon_state.get_by_service("osd");
    for (const auto &i : dmc) {
      Mutex::Locker l(i.second->lock);
      f.open_object_section(i.first.second.c_str());
      f.dump_string("hostname", i.second->hostname);
      for (const auto &j : i.second->metadata) {
        f.dump_string(j.first.c_str(), j.second);
  } else if (what == "pg_summary") {
    // Count PG states three ways: per acting OSD, per pool, and "all".
    cluster_state.with_pgmap(
      [&f](const PGMap &pg_map) {
        std::map<std::string, std::map<std::string, uint32_t> > osds;
        std::map<std::string, std::map<std::string, uint32_t> > pools;
        std::map<std::string, uint32_t> all;
        for (const auto &i : pg_map.pg_stat) {
          const auto pool = i.first.m_pool;
          const std::string state = pg_state_string(i.second.state);
          // Insert to per-pool map
          pools[stringify(pool)][state]++;
          for (const auto &osd_id : i.second.acting) {
            osds[stringify(osd_id)][state]++;
        f.open_object_section("by_osd");
        for (const auto &i : osds) {
          f.open_object_section(i.first.c_str());
          for (const auto &j : i.second) {
            f.dump_int(j.first.c_str(), j.second);
        f.open_object_section("by_pool");
        for (const auto &i : pools) {
          f.open_object_section(i.first.c_str());
          for (const auto &j : i.second) {
            f.dump_int(j.first.c_str(), j.second);
        f.open_object_section("all");
        for (const auto &i : all) {
          f.dump_int(i.first.c_str(), i.second);
  } else if (what == "pg_status") {
    cluster_state.with_pgmap(
      [&f](const PGMap &pg_map) {
        pg_map.print_summary(&f, nullptr);
  } else if (what == "pg_dump") {
    cluster_state.with_pgmap(
      [&f](const PGMap &pg_map) {
  } else if (what == "df") {
    // Needs both maps: fs-level stats plus full per-pool stats.
    cluster_state.with_osdmap([this, &f](const OSDMap &osd_map){
      cluster_state.with_pgmap(
        [&osd_map, &f](const PGMap &pg_map) {
          pg_map.dump_fs_stats(nullptr, &f, true);
          pg_map.dump_pool_stats_full(osd_map, nullptr, &f, true);
  } else if (what == "osd_stats") {
    cluster_state.with_pgmap(
      [&f](const PGMap &pg_map) {
        pg_map.dump_osd_stats(&f);
  } else if (what == "health" || what == "mon_status") {
    // These come back as pre-rendered JSON; wrapped as {"json": "..."}.
    if (what == "health") {
      json = cluster_state.get_health();
    } else if (what == "mon_status") {
      json = cluster_state.get_mon_status();
    f.dump_string("json", json.to_str());
  } else if (what == "mgr_map") {
    cluster_state.with_mgrmap([&f](const MgrMap &mgr_map) {
    // Unknown key: logged; (truncated) fall-through presumably returns None.
    derr << "Python module requested unknown data '" << what << "'" << dendl;
324 int ActivePyModules::start_one(std::string const &module_name,
325 PyObject *pClass, const SafeThreadState &pMyThreadState)
327 Mutex::Locker l(lock);
329 assert(modules.count(module_name) == 0);
331 modules[module_name].reset(new ActivePyModule(
335 int r = modules[module_name]->load(this);
339 dout(4) << "Starting thread for " << module_name << dendl;
340 // Giving Thread the module's module_name member as its
341 // char* thread name: thread must not outlive module class lifetime.
342 modules[module_name]->thread.create(
343 modules[module_name]->get_name().c_str());
349 void ActivePyModules::shutdown()
351 Mutex::Locker locker(lock);
353 // Signal modules to drop out of serve() and/or tear down resources
354 for (auto &i : modules) {
355 auto module = i.second.get();
356 const auto& name = i.first;
359 dout(10) << "calling module " << name << " shutdown()" << dendl;
361 dout(10) << "module " << name << " shutdown() returned" << dendl;
365 // For modules implementing serve(), finish the threads where we
366 // were running that.
367 for (auto &i : modules) {
369 dout(10) << "joining module " << i.first << dendl;
370 i.second->thread.join();
371 dout(10) << "joined module " << i.first << dendl;
378 void ActivePyModules::notify_all(const std::string ¬ify_type,
379 const std::string ¬ify_id)
381 Mutex::Locker l(lock);
383 dout(10) << __func__ << ": notify_all " << notify_type << dendl;
384 for (auto& i : modules) {
385 auto module = i.second.get();
386 // Send all python calls down a Finisher to avoid blocking
387 // C++ code, and avoid any potential lock cycles.
388 finisher.queue(new FunctionContext([module, notify_type, notify_id](int r){
389 module->notify(notify_type, notify_id);
394 void ActivePyModules::notify_all(const LogEntry &log_entry)
396 Mutex::Locker l(lock);
398 dout(10) << __func__ << ": notify_all (clog)" << dendl;
399 for (auto& i : modules) {
400 auto module = i.second.get();
401 // Send all python calls down a Finisher to avoid blocking
402 // C++ code, and avoid any potential lock cycles.
404 // Note intentional use of non-reference lambda binding on
405 // log_entry: we take a copy because caller's instance is
406 // probably ephemeral.
407 finisher.queue(new FunctionContext([module, log_entry](int r){
408 module->notify_clog(log_entry);
413 bool ActivePyModules::get_config(const std::string &module_name,
414 const std::string &key, std::string *val) const
416 PyThreadState *tstate = PyEval_SaveThread();
417 Mutex::Locker l(lock);
418 PyEval_RestoreThread(tstate);
420 const std::string global_key = PyModuleRegistry::config_prefix
421 + module_name + "/" + key;
423 dout(4) << __func__ << "key: " << global_key << dendl;
425 if (config_cache.count(global_key)) {
426 *val = config_cache.at(global_key);
433 PyObject *ActivePyModules::get_config_prefix(const std::string &module_name,
434 const std::string &prefix) const
436 PyThreadState *tstate = PyEval_SaveThread();
437 Mutex::Locker l(lock);
438 PyEval_RestoreThread(tstate);
440 const std::string base_prefix = PyModuleRegistry::config_prefix
442 const std::string global_prefix = base_prefix + prefix;
443 dout(4) << __func__ << "prefix: " << global_prefix << dendl;
446 for (auto p = config_cache.lower_bound(global_prefix);
447 p != config_cache.end() && p->first.find(global_prefix) == 0;
449 f.dump_string(p->first.c_str() + base_prefix.size(), p->second);
// Set (val engaged) or delete (val == boost::none) a module config key:
// update the local config_cache, then persist via a "config-key
// set"/"config-key del" mon command.
// NOTE(review): this copy is truncated — the "Command set_cmd;"
// declaration, the "if (val) { ... } else { ... }" guards around both
// the cache update and the JSON construction, "JSONFormatter jf;",
// "jf.close_section()/jf.flush(cmd_json)" and the wait on set_cmd are
// missing between the visible lines.  Restore from the complete file.
void ActivePyModules::set_config(const std::string &module_name,
    const std::string &key, const boost::optional<std::string>& val)
  const std::string global_key = PyModuleRegistry::config_prefix
    + module_name + "/" + key;

  PyThreadState *tstate = PyEval_SaveThread();
  Mutex::Locker l(lock);
  PyEval_RestoreThread(tstate);
  // Cache update: set branch...
  config_cache[global_key] = *val;
  // ...delete branch (guard truncated).
  config_cache.erase(global_key);

  // Build the mon command JSON.
  std::ostringstream cmd_json;
  jf.open_object_section("cmd");
  // set branch:
  jf.dump_string("prefix", "config-key set");
  jf.dump_string("key", global_key);
  jf.dump_string("val", *val);
  // del branch:
  jf.dump_string("prefix", "config-key del");
  jf.dump_string("key", global_key);
  // Fire the command at the monitor.
  set_cmd.run(&monc, cmd_json.str());

  if (set_cmd.r != 0) {
    // config-key set will fail if mgr's auth key has insufficient
    // permission to set config keys
    // FIXME: should this somehow raise an exception back into Python land?
    dout(0) << "`config-key set " << global_key << " " << val << "` failed: "
            << cpp_strerror(set_cmd.r) << dendl;
    dout(0) << "mon returned " << set_cmd.r << ": " << set_cmd.outs << dendl;
498 std::vector<ModuleCommand> ActivePyModules::get_py_commands() const
500 Mutex::Locker l(lock);
502 std::vector<ModuleCommand> result;
503 for (const auto& i : modules) {
504 auto module = i.second.get();
505 auto mod_commands = module->get_commands();
506 for (auto j : mod_commands) {
514 std::vector<MonCommand> ActivePyModules::get_commands() const
516 std::vector<ModuleCommand> commands = get_py_commands();
517 std::vector<MonCommand> result;
518 for (auto &pyc: commands) {
519 result.push_back({pyc.cmdstring, pyc.helpstring, "mgr",
520 pyc.perm, "cli", MonCommand::FLAG_MGR});
526 std::map<std::string, std::string> ActivePyModules::get_services() const
528 std::map<std::string, std::string> result;
529 Mutex::Locker l(lock);
530 for (const auto& i : modules) {
531 const auto &module = i.second.get();
532 std::string svc_str = module->get_uri();
533 if (!svc_str.empty()) {
534 result[module->get_name()] = svc_str;
// Return the stored datapoints of one perf counter ("path") belonging
// to daemon svc_name.svc_id, as an array of [t (sec), v] pairs.  An
// unknown counter or daemon yields an (empty) array plus debug logging.
// NOTE(review): this copy is truncated — the "PyFormatter f;"
// declaration, the "if (metadata) { ... } else { ... }" guard around
// the body, the else-branch braces and the final "return f.get();" are
// missing between the visible lines.  Restore from the complete file.
PyObject* ActivePyModules::get_counter_python(
    const std::string &svc_name,
    const std::string &svc_id,
    const std::string &path)
  PyThreadState *tstate = PyEval_SaveThread();
  Mutex::Locker l(lock);
  PyEval_RestoreThread(tstate);

  f.open_array_section(path.c_str());

  auto metadata = daemon_state.get(DaemonKey(svc_name, svc_id));
  // Per-daemon lock guards perf_counters while we read it.
  Mutex::Locker l2(metadata->lock);
  if (metadata->perf_counters.instances.count(path)) {
    // NOTE(review): this copies the counter instance out of the map; a
    // const reference would avoid the copy — confirm the type is cheap.
    auto counter_instance = metadata->perf_counters.instances.at(path);
    const auto &data = counter_instance.get_data();
    for (const auto &datapoint : data) {
      f.open_array_section("datapoint");
      f.dump_unsigned("t", datapoint.t.sec());
      f.dump_unsigned("v", datapoint.v);
    // (else) counter missing: log it and the available paths.
    dout(4) << "Missing counter: '" << path << "' ("
            << svc_name << "." << svc_id << ")" << dendl;
    dout(20) << "Paths are:" << dendl;
    for (const auto &i : metadata->perf_counters.instances) {
      dout(20) << i.first << dendl;
    // (else) daemon missing entirely.
    dout(4) << "No daemon state for "
            << svc_name << "." << svc_id << ")" << dendl;
// Return perf counter schemas (description, nick, type, priority per
// counter) as a Python dict keyed by "<type>.<id>".  Selection:
//   svc_type == ""   -> all daemons
//   svc_id empty     -> all daemons of svc_type
//   otherwise        -> just daemon svc_type.svc_id
// NOTE(review): this copy is truncated — the "PyFormatter f;"
// declaration, the else-branch braces, the "daemons[key] = got;"
// insertion after the null check, section closers and the final
// "return f.get();" are missing between the visible lines.
// NOTE(review): svc_type is taken by value while svc_id is by const
// reference — looks like an accidental inconsistency; confirm before
// changing the signature (callers/header must agree).
PyObject* ActivePyModules::get_perf_schema_python(
    const std::string svc_type,
    const std::string &svc_id)
  PyThreadState *tstate = PyEval_SaveThread();
  Mutex::Locker l(lock);
  PyEval_RestoreThread(tstate);

  DaemonStateCollection daemons;

  if (svc_type == "") {
    // NOTE(review): std::move on these rvalue returns is redundant.
    daemons = std::move(daemon_state.get_all());
  } else if (svc_id.empty()) {
    daemons = std::move(daemon_state.get_by_service(svc_type));
    // (else) single daemon:
    auto key = DaemonKey(svc_type, svc_id);
    // so that the below can be a loop in all cases
    auto got = daemon_state.get(key);
    if (got != nullptr) {

  if (!daemons.empty()) {
    // NOTE(review): statepair/ctr_inst_iter are iterated by value,
    // copying each entry — by-reference iteration would avoid this.
    for (auto statepair : daemons) {
      auto key = statepair.first;
      auto state = statepair.second;

      std::ostringstream daemon_name;
      daemon_name << key.first << "." << key.second;
      f.open_object_section(daemon_name.str().c_str());

      // Per-daemon lock guards perf_counters while we read it.
      Mutex::Locker l(state->lock);
      for (auto ctr_inst_iter : state->perf_counters.instances) {
        const auto &counter_name = ctr_inst_iter.first;
        f.open_object_section(counter_name.c_str());
        auto type = state->perf_counters.types[counter_name];
        f.dump_string("description", type.description);
        if (!type.nick.empty()) {
          f.dump_string("nick", type.nick);
        f.dump_unsigned("type", type.type);
        f.dump_unsigned("priority", type.priority);
    // (else) nothing matched the selection.
    dout(4) << __func__ << ": No daemon state found for "
            << svc_type << "." << svc_id << ")" << dendl;
637 PyObject *ActivePyModules::get_context()
639 PyThreadState *tstate = PyEval_SaveThread();
640 Mutex::Locker l(lock);
641 PyEval_RestoreThread(tstate);
643 // Construct a capsule containing ceph context.
644 // Not incrementing/decrementing ref count on the context because
645 // it's the global one and it has process lifetime.
646 auto capsule = PyCapsule_New(g_ceph_context, nullptr, nullptr);
/*
 * Helper for our wrapped types that take a capsule in their constructor.
 *
 * Imports `module_name`, looks up class `clsname`, and calls it with one
 * PyCapsule argument wrapping a raw pointer.  Returns a new reference to
 * the constructed instance.
 *
 * NOTE(review): this copy is truncated — the final parameter (the
 * wrapped void* — `wrapped` is referenced below), the braces around the
 * two null-check blocks, "assert(module);", "Py_DECREF(module);" and
 * the closing brace are missing between the visible lines.
 */
PyObject *construct_with_capsule(
  const std::string &module_name,
  const std::string &clsname,
  // Look up the OSDMap type which we will construct
  PyObject *module = PyImport_ImportModule(module_name.c_str());
  // (guard truncated) import failure is logged, then asserted below.
  derr << "Failed to import python module:" << dendl;
  derr << handle_pyerror() << dendl;

  PyObject *wrapper_type = PyObject_GetAttrString(
    module, (const char*)clsname.c_str());
  derr << "Failed to get python type:" << dendl;
  derr << handle_pyerror() << dendl;
  assert(wrapper_type);

  // Construct a capsule containing an OSDMap.
  auto wrapped_capsule = PyCapsule_New(wrapped, nullptr, nullptr);
  assert(wrapped_capsule);

  // Construct the python OSDMap
  auto pArgs = PyTuple_Pack(1, wrapped_capsule);
  auto wrapper_instance = PyObject_CallObject(wrapper_type, pArgs);
  if (wrapper_instance == nullptr) {
    derr << "Failed to construct python OSDMap:" << dendl;
    derr << handle_pyerror() << dendl;
  assert(wrapper_instance != nullptr);

  // NOTE(review): pArgs (a new reference from PyTuple_Pack) is not
  // released in the visible lines — confirm the full file has a
  // Py_DECREF(pArgs), else this leaks one tuple per call.
  Py_DECREF(wrapped_capsule);

  Py_DECREF(wrapper_type);

  return wrapper_instance;
695 PyObject *ActivePyModules::get_osdmap()
697 PyThreadState *tstate = PyEval_SaveThread();
698 Mutex::Locker l(lock);
699 PyEval_RestoreThread(tstate);
701 OSDMap *newmap = new OSDMap;
703 cluster_state.with_osdmap([&](const OSDMap& o) {
704 newmap->deepish_copy_from(o);
707 return construct_with_capsule("mgr_module", "OSDMap", (void*)newmap);
710 void ActivePyModules::set_health_checks(const std::string& module_name,
711 health_check_map_t&& checks)
713 Mutex::Locker l(lock);
714 auto p = modules.find(module_name);
715 if (p != modules.end()) {
716 p->second->set_health_checks(std::move(checks));
720 void ActivePyModules::get_health_checks(health_check_map_t *checks)
722 Mutex::Locker l(lock);
723 for (auto& p : modules) {
724 p.second->get_health_checks(checks);
728 void ActivePyModules::set_uri(const std::string& module_name,
729 const std::string &uri)
731 Mutex::Locker l(lock);
733 dout(4) << " module " << module_name << " set URI '" << uri << "'" << dendl;
735 modules[module_name]->set_uri(uri);