Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / mgr / ActivePyModules.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4  * Ceph - scalable distributed file system
5  *
6  * Copyright (C) 2014 John Spray <john.spray@inktank.com>
7  *
8  * This is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License version 2.1, as published by the Free Software
11  * Foundation.  See file COPYING.
12  */
13
14 // Include this first to get python headers earlier
15 #include "BaseMgrModule.h"
16 #include "Gil.h"
17
18 #include "common/errno.h"
19 #include "include/stringify.h"
20
21 #include "PyFormatter.h"
22
23 #include "osd/OSDMap.h"
24 #include "mon/MonMap.h"
25
26 #include "mgr/MgrContext.h"
27
28 // For ::config_prefix
29 #include "PyModuleRegistry.h"
30
31 #include "ActivePyModules.h"
32
33 #define dout_context g_ceph_context
34 #define dout_subsys ceph_subsys_mgr
35 #undef dout_prefix
36 #define dout_prefix *_dout << "mgr " << __func__ << " "
37
38
39 ActivePyModules::ActivePyModules(PyModuleConfig const &config_,
40           DaemonStateIndex &ds, ClusterState &cs,
41           MonClient &mc, LogChannelRef clog_, Objecter &objecter_,
42           Client &client_, Finisher &f)
43   : config_cache(config_), daemon_state(ds), cluster_state(cs),
44     monc(mc), clog(clog_), objecter(objecter_), client(client_), finisher(f),
45     lock("ActivePyModules")
46 {}
47
48 ActivePyModules::~ActivePyModules() = default;
49
50 void ActivePyModules::dump_server(const std::string &hostname,
51                       const DaemonStateCollection &dmc,
52                       Formatter *f)
53 {
54   f->dump_string("hostname", hostname);
55   f->open_array_section("services");
56   std::string ceph_version;
57
58   for (const auto &i : dmc) {
59     Mutex::Locker l(i.second->lock);
60     const auto &key = i.first;
61     const std::string &str_type = key.first;
62     const std::string &svc_name = key.second;
63
64     // TODO: pick the highest version, and make sure that
65     // somewhere else (during health reporting?) we are
66     // indicating to the user if we see mixed versions
67     auto ver_iter = i.second->metadata.find("ceph_version");
68     if (ver_iter != i.second->metadata.end()) {
69       ceph_version = i.second->metadata.at("ceph_version");
70     }
71
72     f->open_object_section("service");
73     f->dump_string("type", str_type);
74     f->dump_string("id", svc_name);
75     f->close_section();
76   }
77   f->close_section();
78
79   f->dump_string("ceph_version", ceph_version);
80 }
81
82
83
84 PyObject *ActivePyModules::get_server_python(const std::string &hostname)
85 {
86   PyThreadState *tstate = PyEval_SaveThread();
87   Mutex::Locker l(lock);
88   PyEval_RestoreThread(tstate);
89   dout(10) << " (" << hostname << ")" << dendl;
90
91   auto dmc = daemon_state.get_by_server(hostname);
92
93   PyFormatter f;
94   dump_server(hostname, dmc, &f);
95   return f.get();
96 }
97
98
99 PyObject *ActivePyModules::list_servers_python()
100 {
101   PyThreadState *tstate = PyEval_SaveThread();
102   Mutex::Locker l(lock);
103   PyEval_RestoreThread(tstate);
104   dout(10) << " >" << dendl;
105
106   PyFormatter f(false, true);
107   daemon_state.with_daemons_by_server([this, &f]
108       (const std::map<std::string, DaemonStateCollection> &all) {
109     for (const auto &i : all) {
110       const auto &hostname = i.first;
111
112       f.open_object_section("server");
113       dump_server(hostname, i.second, &f);
114       f.close_section();
115     }
116   });
117
118   return f.get();
119 }
120
121 PyObject *ActivePyModules::get_metadata_python(
122   const std::string &svc_type,
123   const std::string &svc_id)
124 {
125   auto metadata = daemon_state.get(DaemonKey(svc_type, svc_id));
126   if (metadata == nullptr) {
127     derr << "Requested missing service " << svc_type << "." << svc_id << dendl;
128     Py_RETURN_NONE;
129   }
130
131   Mutex::Locker l(metadata->lock);
132   PyFormatter f;
133   f.dump_string("hostname", metadata->hostname);
134   for (const auto &i : metadata->metadata) {
135     f.dump_string(i.first.c_str(), i.second);
136   }
137
138   return f.get();
139 }
140
141 PyObject *ActivePyModules::get_daemon_status_python(
142   const std::string &svc_type,
143   const std::string &svc_id)
144 {
145   auto metadata = daemon_state.get(DaemonKey(svc_type, svc_id));
146   if (metadata == nullptr) {
147     derr << "Requested missing service " << svc_type << "." << svc_id << dendl;
148     Py_RETURN_NONE;
149   }
150
151   Mutex::Locker l(metadata->lock);
152   PyFormatter f;
153   for (const auto &i : metadata->service_status) {
154     f.dump_string(i.first.c_str(), i.second);
155   }
156   return f.get();
157 }
158
159 PyObject *ActivePyModules::get_python(const std::string &what)
160 {
161   PyThreadState *tstate = PyEval_SaveThread();
162   Mutex::Locker l(lock);
163   PyEval_RestoreThread(tstate);
164
165   if (what == "fs_map") {
166     PyFormatter f;
167     cluster_state.with_fsmap([&f](const FSMap &fsmap) {
168       fsmap.dump(&f);
169     });
170     return f.get();
171   } else if (what == "osdmap_crush_map_text") {
172     bufferlist rdata;
173     cluster_state.with_osdmap([&rdata](const OSDMap &osd_map){
174         osd_map.crush->encode(rdata, CEPH_FEATURES_SUPPORTED_DEFAULT);
175     });
176     std::string crush_text = rdata.to_str();
177     return PyString_FromString(crush_text.c_str());
178   } else if (what.substr(0, 7) == "osd_map") {
179     PyFormatter f;
180     cluster_state.with_osdmap([&f, &what](const OSDMap &osd_map){
181       if (what == "osd_map") {
182         osd_map.dump(&f);
183       } else if (what == "osd_map_tree") {
184         osd_map.print_tree(&f, nullptr);
185       } else if (what == "osd_map_crush") {
186         osd_map.crush->dump(&f);
187       }
188     });
189     return f.get();
190   } else if (what == "config") {
191     PyFormatter f;
192     g_conf->show_config(&f);
193     return f.get();
194   } else if (what == "mon_map") {
195     PyFormatter f;
196     cluster_state.with_monmap(
197       [&f](const MonMap &monmap) {
198         monmap.dump(&f);
199       }
200     );
201     return f.get();
202   } else if (what == "service_map") {
203     PyFormatter f;
204     cluster_state.with_servicemap(
205       [&f](const ServiceMap &service_map) {
206         service_map.dump(&f);
207       }
208     );
209     return f.get();
210   } else if (what == "osd_metadata") {
211     PyFormatter f;
212     auto dmc = daemon_state.get_by_service("osd");
213     for (const auto &i : dmc) {
214       Mutex::Locker l(i.second->lock);
215       f.open_object_section(i.first.second.c_str());
216       f.dump_string("hostname", i.second->hostname);
217       for (const auto &j : i.second->metadata) {
218         f.dump_string(j.first.c_str(), j.second);
219       }
220       f.close_section();
221     }
222     return f.get();
223   } else if (what == "pg_summary") {
224     PyFormatter f;
225     cluster_state.with_pgmap(
226         [&f](const PGMap &pg_map) {
227           std::map<std::string, std::map<std::string, uint32_t> > osds;
228           std::map<std::string, std::map<std::string, uint32_t> > pools;
229           std::map<std::string, uint32_t> all;
230           for (const auto &i : pg_map.pg_stat) {
231             const auto pool = i.first.m_pool;
232             const std::string state = pg_state_string(i.second.state);
233             // Insert to per-pool map
234             pools[stringify(pool)][state]++;
235             for (const auto &osd_id : i.second.acting) {
236               osds[stringify(osd_id)][state]++;
237             }
238             all[state]++;
239           }
240           f.open_object_section("by_osd");
241           for (const auto &i : osds) {
242             f.open_object_section(i.first.c_str());
243             for (const auto &j : i.second) {
244               f.dump_int(j.first.c_str(), j.second);
245             }
246             f.close_section();
247           }
248           f.close_section();
249           f.open_object_section("by_pool");
250           for (const auto &i : pools) {
251             f.open_object_section(i.first.c_str());
252             for (const auto &j : i.second) {
253               f.dump_int(j.first.c_str(), j.second);
254             }
255             f.close_section();
256           }
257           f.close_section();
258           f.open_object_section("all");
259           for (const auto &i : all) {
260             f.dump_int(i.first.c_str(), i.second);
261           }
262           f.close_section();
263         }
264     );
265     return f.get();
266   } else if (what == "pg_status") {
267     PyFormatter f;
268     cluster_state.with_pgmap(
269         [&f](const PGMap &pg_map) {
270           pg_map.print_summary(&f, nullptr);
271         }
272     );
273     return f.get();
274   } else if (what == "pg_dump") {
275     PyFormatter f;
276         cluster_state.with_pgmap(
277         [&f](const PGMap &pg_map) {
278           pg_map.dump(&f);
279         }
280     );
281     return f.get();
282   } else if (what == "df") {
283     PyFormatter f;
284
285     cluster_state.with_osdmap([this, &f](const OSDMap &osd_map){
286       cluster_state.with_pgmap(
287           [&osd_map, &f](const PGMap &pg_map) {
288         pg_map.dump_fs_stats(nullptr, &f, true);
289         pg_map.dump_pool_stats_full(osd_map, nullptr, &f, true);
290       });
291     });
292     return f.get();
293   } else if (what == "osd_stats") {
294     PyFormatter f;
295     cluster_state.with_pgmap(
296         [&f](const PGMap &pg_map) {
297       pg_map.dump_osd_stats(&f);
298     });
299     return f.get();
300   } else if (what == "health" || what == "mon_status") {
301     PyFormatter f;
302     bufferlist json;
303     if (what == "health") {
304       json = cluster_state.get_health();
305     } else if (what == "mon_status") {
306       json = cluster_state.get_mon_status();
307     } else {
308       assert(false);
309     }
310     f.dump_string("json", json.to_str());
311     return f.get();
312   } else if (what == "mgr_map") {
313     PyFormatter f;
314     cluster_state.with_mgrmap([&f](const MgrMap &mgr_map) {
315       mgr_map.dump(&f);
316     });
317     return f.get();
318   } else {
319     derr << "Python module requested unknown data '" << what << "'" << dendl;
320     Py_RETURN_NONE;
321   }
322 }
323
324 int ActivePyModules::start_one(std::string const &module_name,
325     PyObject *pClass, const SafeThreadState &pMyThreadState)
326 {
327   Mutex::Locker l(lock);
328
329   assert(modules.count(module_name) == 0);
330
331   modules[module_name].reset(new ActivePyModule(
332       module_name, pClass,
333       pMyThreadState));
334
335   int r = modules[module_name]->load(this);
336   if (r != 0) {
337     return r;
338   } else {
339     dout(4) << "Starting thread for " << module_name << dendl;
340     // Giving Thread the module's module_name member as its
341     // char* thread name: thread must not outlive module class lifetime.
342     modules[module_name]->thread.create(
343         modules[module_name]->get_name().c_str());
344
345     return 0;
346   }
347 }
348
349 void ActivePyModules::shutdown()
350 {
351   Mutex::Locker locker(lock);
352
353   // Signal modules to drop out of serve() and/or tear down resources
354   for (auto &i : modules) {
355     auto module = i.second.get();
356     const auto& name = i.first;
357
358     lock.Unlock();
359     dout(10) << "calling module " << name << " shutdown()" << dendl;
360     module->shutdown();
361     dout(10) << "module " << name << " shutdown() returned" << dendl;
362     lock.Lock();
363   }
364
365   // For modules implementing serve(), finish the threads where we
366   // were running that.
367   for (auto &i : modules) {
368     lock.Unlock();
369     dout(10) << "joining module " << i.first << dendl;
370     i.second->thread.join();
371     dout(10) << "joined module " << i.first << dendl;
372     lock.Lock();
373   }
374
375   modules.clear();
376 }
377
378 void ActivePyModules::notify_all(const std::string &notify_type,
379                      const std::string &notify_id)
380 {
381   Mutex::Locker l(lock);
382
383   dout(10) << __func__ << ": notify_all " << notify_type << dendl;
384   for (auto& i : modules) {
385     auto module = i.second.get();
386     // Send all python calls down a Finisher to avoid blocking
387     // C++ code, and avoid any potential lock cycles.
388     finisher.queue(new FunctionContext([module, notify_type, notify_id](int r){
389       module->notify(notify_type, notify_id);
390     }));
391   }
392 }
393
394 void ActivePyModules::notify_all(const LogEntry &log_entry)
395 {
396   Mutex::Locker l(lock);
397
398   dout(10) << __func__ << ": notify_all (clog)" << dendl;
399   for (auto& i : modules) {
400     auto module = i.second.get();
401     // Send all python calls down a Finisher to avoid blocking
402     // C++ code, and avoid any potential lock cycles.
403     //
404     // Note intentional use of non-reference lambda binding on
405     // log_entry: we take a copy because caller's instance is
406     // probably ephemeral.
407     finisher.queue(new FunctionContext([module, log_entry](int r){
408       module->notify_clog(log_entry);
409     }));
410   }
411 }
412
413 bool ActivePyModules::get_config(const std::string &module_name,
414     const std::string &key, std::string *val) const
415 {
416   PyThreadState *tstate = PyEval_SaveThread();
417   Mutex::Locker l(lock);
418   PyEval_RestoreThread(tstate);
419
420   const std::string global_key = PyModuleRegistry::config_prefix
421     + module_name + "/" + key;
422
423   dout(4) << __func__ << "key: " << global_key << dendl;
424
425   if (config_cache.count(global_key)) {
426     *val = config_cache.at(global_key);
427     return true;
428   } else {
429     return false;
430   }
431 }
432
433 PyObject *ActivePyModules::get_config_prefix(const std::string &module_name,
434     const std::string &prefix) const
435 {
436   PyThreadState *tstate = PyEval_SaveThread();
437   Mutex::Locker l(lock);
438   PyEval_RestoreThread(tstate);
439
440   const std::string base_prefix = PyModuleRegistry::config_prefix
441                                     + module_name + "/";
442   const std::string global_prefix = base_prefix + prefix;
443   dout(4) << __func__ << "prefix: " << global_prefix << dendl;
444
445   PyFormatter f;
446   for (auto p = config_cache.lower_bound(global_prefix);
447        p != config_cache.end() && p->first.find(global_prefix) == 0;
448        ++p) {
449     f.dump_string(p->first.c_str() + base_prefix.size(), p->second);
450   }
451   return f.get();
452 }
453
454 void ActivePyModules::set_config(const std::string &module_name,
455     const std::string &key, const boost::optional<std::string>& val)
456 {
457   const std::string global_key = PyModuleRegistry::config_prefix
458                                    + module_name + "/" + key;
459
460   Command set_cmd;
461   {
462     PyThreadState *tstate = PyEval_SaveThread();
463     Mutex::Locker l(lock);
464     PyEval_RestoreThread(tstate);
465     if (val) {
466       config_cache[global_key] = *val;
467     } else {
468       config_cache.erase(global_key);
469     }
470
471     std::ostringstream cmd_json;
472     JSONFormatter jf;
473     jf.open_object_section("cmd");
474     if (val) {
475       jf.dump_string("prefix", "config-key set");
476       jf.dump_string("key", global_key);
477       jf.dump_string("val", *val);
478     } else {
479       jf.dump_string("prefix", "config-key del");
480       jf.dump_string("key", global_key);
481     }
482     jf.close_section();
483     jf.flush(cmd_json);
484     set_cmd.run(&monc, cmd_json.str());
485   }
486   set_cmd.wait();
487
488   if (set_cmd.r != 0) {
489     // config-key set will fail if mgr's auth key has insufficient
490     // permission to set config keys
491     // FIXME: should this somehow raise an exception back into Python land?
492     dout(0) << "`config-key set " << global_key << " " << val << "` failed: "
493       << cpp_strerror(set_cmd.r) << dendl;
494     dout(0) << "mon returned " << set_cmd.r << ": " << set_cmd.outs << dendl;
495   }
496 }
497
498 std::vector<ModuleCommand> ActivePyModules::get_py_commands() const
499 {
500   Mutex::Locker l(lock);
501
502   std::vector<ModuleCommand> result;
503   for (const auto& i : modules) {
504     auto module = i.second.get();
505     auto mod_commands = module->get_commands();
506     for (auto j : mod_commands) {
507       result.push_back(j);
508     }
509   }
510
511   return result;
512 }
513
514 std::vector<MonCommand> ActivePyModules::get_commands() const
515 {
516   std::vector<ModuleCommand> commands = get_py_commands();
517   std::vector<MonCommand> result;
518   for (auto &pyc: commands) {
519     result.push_back({pyc.cmdstring, pyc.helpstring, "mgr",
520                         pyc.perm, "cli", MonCommand::FLAG_MGR});
521   }
522   return result;
523 }
524
525
526 std::map<std::string, std::string> ActivePyModules::get_services() const
527 {
528   std::map<std::string, std::string> result;
529   Mutex::Locker l(lock);
530   for (const auto& i : modules) {
531     const auto &module = i.second.get();
532     std::string svc_str = module->get_uri();
533     if (!svc_str.empty()) {
534       result[module->get_name()] = svc_str;
535     }
536   }
537
538   return result;
539 }
540
541 PyObject* ActivePyModules::get_counter_python(
542     const std::string &svc_name,
543     const std::string &svc_id,
544     const std::string &path)
545 {
546   PyThreadState *tstate = PyEval_SaveThread();
547   Mutex::Locker l(lock);
548   PyEval_RestoreThread(tstate);
549
550   PyFormatter f;
551   f.open_array_section(path.c_str());
552
553   auto metadata = daemon_state.get(DaemonKey(svc_name, svc_id));
554   if (metadata) {
555     Mutex::Locker l2(metadata->lock);
556     if (metadata->perf_counters.instances.count(path)) {
557       auto counter_instance = metadata->perf_counters.instances.at(path);
558       const auto &data = counter_instance.get_data();
559       for (const auto &datapoint : data) {
560         f.open_array_section("datapoint");
561         f.dump_unsigned("t", datapoint.t.sec());
562         f.dump_unsigned("v", datapoint.v);
563         f.close_section();
564
565       }
566     } else {
567       dout(4) << "Missing counter: '" << path << "' ("
568               << svc_name << "." << svc_id << ")" << dendl;
569       dout(20) << "Paths are:" << dendl;
570       for (const auto &i : metadata->perf_counters.instances) {
571         dout(20) << i.first << dendl;
572       }
573     }
574   } else {
575     dout(4) << "No daemon state for "
576               << svc_name << "." << svc_id << ")" << dendl;
577   }
578   f.close_section();
579   return f.get();
580 }
581
582 PyObject* ActivePyModules::get_perf_schema_python(
583     const std::string svc_type,
584     const std::string &svc_id)
585 {
586   PyThreadState *tstate = PyEval_SaveThread();
587   Mutex::Locker l(lock);
588   PyEval_RestoreThread(tstate);
589
590   DaemonStateCollection daemons;
591
592   if (svc_type == "") {
593     daemons = std::move(daemon_state.get_all());
594   } else if (svc_id.empty()) {
595     daemons = std::move(daemon_state.get_by_service(svc_type));
596   } else {
597     auto key = DaemonKey(svc_type, svc_id);
598     // so that the below can be a loop in all cases
599     auto got = daemon_state.get(key);
600     if (got != nullptr) {
601       daemons[key] = got;
602     }
603   }
604
605   PyFormatter f;
606   if (!daemons.empty()) {
607     for (auto statepair : daemons) {
608       auto key = statepair.first;
609       auto state = statepair.second;
610
611       std::ostringstream daemon_name;
612       daemon_name << key.first << "." << key.second;
613       f.open_object_section(daemon_name.str().c_str());
614
615       Mutex::Locker l(state->lock);
616       for (auto ctr_inst_iter : state->perf_counters.instances) {
617         const auto &counter_name = ctr_inst_iter.first;
618         f.open_object_section(counter_name.c_str());
619         auto type = state->perf_counters.types[counter_name];
620         f.dump_string("description", type.description);
621         if (!type.nick.empty()) {
622           f.dump_string("nick", type.nick);
623         }
624         f.dump_unsigned("type", type.type);
625         f.dump_unsigned("priority", type.priority);
626         f.close_section();
627       }
628       f.close_section();
629     }
630   } else {
631     dout(4) << __func__ << ": No daemon state found for "
632               << svc_type << "." << svc_id << ")" << dendl;
633   }
634   return f.get();
635 }
636
637 PyObject *ActivePyModules::get_context()
638 {
639   PyThreadState *tstate = PyEval_SaveThread();
640   Mutex::Locker l(lock);
641   PyEval_RestoreThread(tstate);
642
643   // Construct a capsule containing ceph context.
644   // Not incrementing/decrementing ref count on the context because
645   // it's the global one and it has process lifetime.
646   auto capsule = PyCapsule_New(g_ceph_context, nullptr, nullptr);
647   return capsule;
648 }
649
650 /**
651  * Helper for our wrapped types that take a capsule in their constructor.
652  */
653 PyObject *construct_with_capsule(
654     const std::string &module_name,
655     const std::string &clsname,
656     void *wrapped)
657 {
658   // Look up the OSDMap type which we will construct
659   PyObject *module = PyImport_ImportModule(module_name.c_str());
660   if (!module) {
661     derr << "Failed to import python module:" << dendl;
662     derr << handle_pyerror() << dendl;
663   }
664   assert(module);
665
666   PyObject *wrapper_type = PyObject_GetAttrString(
667       module, (const char*)clsname.c_str());
668   if (!wrapper_type) {
669     derr << "Failed to get python type:" << dendl;
670     derr << handle_pyerror() << dendl;
671   }
672   assert(wrapper_type);
673
674   // Construct a capsule containing an OSDMap.
675   auto wrapped_capsule = PyCapsule_New(wrapped, nullptr, nullptr);
676   assert(wrapped_capsule);
677
678   // Construct the python OSDMap
679   auto pArgs = PyTuple_Pack(1, wrapped_capsule);
680   auto wrapper_instance = PyObject_CallObject(wrapper_type, pArgs);
681   if (wrapper_instance == nullptr) {
682     derr << "Failed to construct python OSDMap:" << dendl;
683     derr << handle_pyerror() << dendl;
684   }
685   assert(wrapper_instance != nullptr);
686   Py_DECREF(pArgs);
687   Py_DECREF(wrapped_capsule);
688
689   Py_DECREF(wrapper_type);
690   Py_DECREF(module);
691
692   return wrapper_instance;
693 }
694
695 PyObject *ActivePyModules::get_osdmap()
696 {
697   PyThreadState *tstate = PyEval_SaveThread();
698   Mutex::Locker l(lock);
699   PyEval_RestoreThread(tstate);
700
701   OSDMap *newmap = new OSDMap;
702
703   cluster_state.with_osdmap([&](const OSDMap& o) {
704       newmap->deepish_copy_from(o);
705     });
706
707   return construct_with_capsule("mgr_module", "OSDMap", (void*)newmap);
708 }
709
710 void ActivePyModules::set_health_checks(const std::string& module_name,
711                                   health_check_map_t&& checks)
712 {
713   Mutex::Locker l(lock);
714   auto p = modules.find(module_name);
715   if (p != modules.end()) {
716     p->second->set_health_checks(std::move(checks));
717   }
718 }
719
720 void ActivePyModules::get_health_checks(health_check_map_t *checks)
721 {
722   Mutex::Locker l(lock);
723   for (auto& p : modules) {
724     p.second->get_health_checks(checks);
725   }
726 }
727
728 void ActivePyModules::set_uri(const std::string& module_name,
729                         const std::string &uri)
730 {
731   Mutex::Locker l(lock);
732
733   dout(4) << " module " << module_name << " set URI '" << uri << "'" << dendl;
734
735   modules[module_name]->set_uri(uri);
736 }
737