Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / pybind / mgr / mgr_module.py
1
2 import ceph_module  # noqa
3 #import ceph_osdmap  #noqa
4 #import ceph_osdmap_incremental  #noqa
5 #import ceph_crushmap  #noqa
6
7 import json
8 import logging
9 import threading
10 from collections import defaultdict
11
12
13 class CPlusPlusHandler(logging.Handler):
14     def __init__(self, module_inst):
15         super(CPlusPlusHandler, self).__init__()
16         self._module = module_inst
17
18     def emit(self, record):
19         if record.levelno <= logging.DEBUG:
20             ceph_level = 20
21         elif record.levelno <= logging.INFO:
22             ceph_level = 4
23         elif record.levelno <= logging.WARNING:
24             ceph_level = 1
25         else:
26             ceph_level = 0
27
28         self._module._ceph_log(ceph_level, self.format(record))
29
30
31 def configure_logger(module_inst, name):
32     logger = logging.getLogger(name)
33
34
35     # Don't filter any logs at the python level, leave it to C++
36     logger.setLevel(logging.DEBUG)
37
38     # FIXME: we should learn the log level from C++ land, and then
39     # avoid calling the C++ level log when we know a message is of
40     # an insufficient level to be ultimately output
41     logger.addHandler(CPlusPlusHandler(module_inst))
42
43     return logger
44
45
46 def unconfigure_logger(module_inst, name):
47     logger = logging.getLogger(name)
48     rm_handlers = [h for h in logger.handlers if isinstance(h, CPlusPlusHandler)]
49     for h in rm_handlers:
50         logger.removeHandler(h)
51
52 class CommandResult(object):
53     """
54     Use with MgrModule.send_command
55     """
56     def __init__(self, tag):
57         self.ev = threading.Event()
58         self.outs = ""
59         self.outb = ""
60         self.r = 0
61
62         # This is just a convenience for notifications from
63         # C++ land, to avoid passing addresses around in messages.
64         self.tag = tag
65
66     def complete(self, r, outb, outs):
67         self.r = r
68         self.outb = outb
69         self.outs = outs
70         self.ev.set()
71
72     def wait(self):
73         self.ev.wait()
74         return self.r, self.outb, self.outs
75
76
77 class OSDMap(ceph_module.BasePyOSDMap):
78     def get_epoch(self):
79         return self._get_epoch()
80
81     def get_crush_version(self):
82         return self._get_crush_version()
83
84     def dump(self):
85         return self._dump()
86
87     def new_incremental(self):
88         return self._new_incremental()
89
90     def apply_incremental(self, inc):
91         return self._apply_incremental(inc)
92
93     def get_crush(self):
94         return self._get_crush()
95
96     def get_pools_by_take(self, take):
97         return self._get_pools_by_take(take).get('pools', [])
98
99     def calc_pg_upmaps(self, inc,
100                        max_deviation=.01, max_iterations=10, pools=[]):
101         return self._calc_pg_upmaps(
102             inc,
103             max_deviation, max_iterations, pools)
104
105     def map_pool_pgs_up(self, poolid):
106         return self._map_pool_pgs_up(poolid)
107
108 class OSDMapIncremental(ceph_module.BasePyOSDMapIncremental):
109     def get_epoch(self):
110         return self._get_epoch()
111
112     def dump(self):
113         return self._dump()
114
115     def set_osd_reweights(self, weightmap):
116         """
117         weightmap is a dict, int to float.  e.g. { 0: .9, 1: 1.0, 3: .997 }
118         """
119         return self._set_osd_reweights(weightmap)
120
121     def set_crush_compat_weight_set_weights(self, weightmap):
122         """
123         weightmap is a dict, int to float.  devices only.  e.g.,
124         { 0: 3.4, 1: 3.3, 2: 3.334 }
125         """
126         return self._set_crush_compat_weight_set_weights(weightmap)
127
128 class CRUSHMap(ceph_module.BasePyCRUSH):
129     def dump(self):
130         return self._dump()
131
132     def get_item_weight(self, item):
133         return self._get_item_weight(item)
134
135     def get_item_name(self, item):
136         return self._get_item_name(item)
137
138     def find_takes(self):
139         return self._find_takes().get('takes', [])
140
141     def get_take_weight_osd_map(self, root):
142         uglymap = self._get_take_weight_osd_map(root)
143         return { int(k): v for k, v in uglymap.get('weights', {}).iteritems() }
144
145 class MgrStandbyModule(ceph_module.BaseMgrStandbyModule):
146     """
147     Standby modules only implement a serve and shutdown method, they
148     are not permitted to implement commands and they do not receive
149     any notifications.
150
151     They only have access to the mgrmap (for acecssing service URI info
152     from their active peer), and to configuration settings (read only).
153     """
154
155     def __init__(self, module_name, capsule):
156         super(MgrStandbyModule, self).__init__(capsule)
157         self.module_name = module_name
158         self._logger = configure_logger(self, module_name)
159
160     def __del__(self):
161         unconfigure_logger(self, self.module_name)
162
163     @property
164     def log(self):
165         return self._logger
166
167     def serve(self):
168         """
169         The serve method is mandatory for standby modules.
170         :return:
171         """
172         raise NotImplementedError()
173
174     def get_mgr_id(self):
175         return self._ceph_get_mgr_id()
176
177     def get_config(self, key):
178         return self._ceph_get_config(key)
179
180     def get_active_uri(self):
181         return self._ceph_get_active_uri()
182
183     def get_localized_config(self, key, default=None):
184         r = self.get_config(self.get_mgr_id() + '/' + key)
185         if r is None:
186             r = self.get_config(key)
187
188         if r is None:
189             r = default
190         return r
191
192 class MgrModule(ceph_module.BaseMgrModule):
193     COMMANDS = []
194
195     # Priority definitions for perf counters
196     PRIO_CRITICAL = 10
197     PRIO_INTERESTING = 8
198     PRIO_USEFUL = 5
199     PRIO_UNINTERESTING = 2
200     PRIO_DEBUGONLY = 0
201
202     # counter value types
203     PERFCOUNTER_TIME = 1
204     PERFCOUNTER_U64 = 2
205
206     # counter types
207     PERFCOUNTER_LONGRUNAVG = 4
208     PERFCOUNTER_COUNTER = 8
209     PERFCOUNTER_HISTOGRAM = 0x10
210     PERFCOUNTER_TYPE_MASK = ~2
211
212     def __init__(self, module_name, py_modules_ptr, this_ptr):
213         self.module_name = module_name
214
215         # If we're taking over from a standby module, let's make sure
216         # its logger was unconfigured before we hook ours up
217         unconfigure_logger(self, self.module_name)
218         self._logger = configure_logger(self, module_name)
219
220         super(MgrModule, self).__init__(py_modules_ptr, this_ptr)
221
222         self._version = self._ceph_get_version()
223
224         self._perf_schema_cache = None
225
226     def __del__(self):
227         unconfigure_logger(self, self.module_name)
228
229     def update_perf_schema(self, daemon_type, daemon_name):
230         """
231         For plugins that use get_all_perf_counters, call this when
232         receiving a notification of type 'perf_schema_update', to
233         prompt MgrModule to update its cache of counter schemas.
234
235         :param daemon_type:
236         :param daemon_name:
237         :return:
238         """
239
240     @property
241     def log(self):
242         return self._logger
243
244     @property
245     def version(self):
246         return self._version
247
248     def get_context(self):
249         """
250         :return: a Python capsule containing a C++ CephContext pointer
251         """
252         return self._ceph_get_context()
253
254     def notify(self, notify_type, notify_id):
255         """
256         Called by the ceph-mgr service to notify the Python plugin
257         that new state is available.
258         """
259         pass
260
261     def serve(self):
262         """
263         Called by the ceph-mgr service to start any server that
264         is provided by this Python plugin.  The implementation
265         of this function should block until ``shutdown`` is called.
266
267         You *must* implement ``shutdown`` if you implement ``serve``
268         """
269         pass
270
271     def shutdown(self):
272         """
273         Called by the ceph-mgr service to request that this
274         module drop out of its serve() function.  You do not
275         need to implement this if you do not implement serve()
276
277         :return: None
278         """
279         pass
280
281     def get(self, data_name):
282         """
283         Called by the plugin to load some cluster state from ceph-mgr
284         """
285         return self._ceph_get(data_name)
286
287     def get_server(self, hostname):
288         """
289         Called by the plugin to load information about a particular
290         node from ceph-mgr.
291
292         :param hostname: a hostame
293         """
294         return self._ceph_get_server(hostname)
295
296     def get_perf_schema(self, svc_type, svc_name):
297         """
298         Called by the plugin to fetch perf counter schema info.
299         svc_name can be nullptr, as can svc_type, in which case
300         they are wildcards
301
302         :param svc_type:
303         :param svc_name:
304         :return: list of dicts describing the counters requested
305         """
306         return self._ceph_get_perf_schema(svc_type, svc_name)
307
308     def get_counter(self, svc_type, svc_name, path):
309         """
310         Called by the plugin to fetch data for a particular perf counter
311         on a particular service.
312
313         :param svc_type:
314         :param svc_name:
315         :param path:
316         :return: A list of two-element lists containing time and value
317         """
318         return self._ceph_get_counter(svc_type, svc_name, path)
319
320     def list_servers(self):
321         """
322         Like ``get_server``, but instead of returning information
323         about just one node, return all the nodes in an array.
324         """
325         return self._ceph_get_server(None)
326
327     def get_metadata(self, svc_type, svc_id):
328         """
329         Fetch the metadata for a particular service.
330
331         :param svc_type: string (e.g., 'mds', 'osd', 'mon')
332         :param svc_id: string
333         :return: dict
334         """
335         return self._ceph_get_metadata(svc_type, svc_id)
336
337     def get_daemon_status(self, svc_type, svc_id):
338         """
339         Fetch the latest status for a particular service daemon.
340
341         :param svc_type: string (e.g., 'rgw')
342         :param svc_id: string
343         :return: dict
344         """
345         return self._ceph_get_daemon_status(svc_type, svc_id)
346
347     def send_command(self, *args, **kwargs):
348         """
349         Called by the plugin to send a command to the mon
350         cluster.
351         """
352         self._ceph_send_command(*args, **kwargs)
353
354     def set_health_checks(self, checks):
355         """
356         Set module's health checks
357
358         Set the module's current map of health checks.  Argument is a
359         dict of check names to info, in this form:
360
361            {
362              'CHECK_FOO': {
363                'severity': 'warning',           # or 'error'
364                'summary': 'summary string',
365                'detail': [ 'list', 'of', 'detail', 'strings' ],
366               },
367              'CHECK_BAR': {
368                'severity': 'error',
369                'summary': 'bars are bad',
370                'detail': [ 'too hard' ],
371              },
372            }
373
374         :param list: dict of health check dicts
375         """
376         self._ceph_set_health_checks(checks)
377
378     def handle_command(self, cmd):
379         """
380         Called by ceph-mgr to request the plugin to handle one
381         of the commands that it declared in self.COMMANDS
382
383         Return a status code, an output buffer, and an
384         output string.  The output buffer is for data results,
385         the output string is for informative text.
386
387         :param cmd: dict, from Ceph's cmdmap_t
388
389         :return: 3-tuple of (int, str, str)
390         """
391
392         # Should never get called if they didn't declare
393         # any ``COMMANDS``
394         raise NotImplementedError()
395
396     def get_mgr_id(self):
397         """
398         Retrieve the mgr id.
399
400         :return: str
401         """
402         return self._ceph_get_mgr_id()
403
404     def get_config(self, key, default=None):
405         """
406         Retrieve the value of a persistent configuration setting
407
408         :param key: str
409         :return: str
410         """
411         r = self._ceph_get_config(key)
412         if r is None:
413             return default
414         else:
415             return r
416
417     def get_config_prefix(self, key_prefix):
418         """
419         Retrieve a dict of config values with the given prefix
420
421         :param key_prefix: str
422         :return: str
423         """
424         return self._ceph_get_config_prefix(key_prefix)
425
426     def get_localized_config(self, key, default=None):
427         """
428         Retrieve localized configuration for this ceph-mgr instance
429         :param key: str
430         :param default: str
431         :return: str
432         """
433         r = self.get_config(self.get_mgr_id() + '/' + key)
434         if r is None:
435             r = self.get_config(key)
436
437         if r is None:
438             r = default
439         return r
440
441     def set_config(self, key, val):
442         """
443         Set the value of a persistent configuration setting
444
445         :param key: str
446         :param val: str
447         """
448         self._ceph_set_config(key, val)
449
450     def set_localized_config(self, key, val):
451         """
452         Set localized configuration for this ceph-mgr instance
453         :param key: str
454         :param default: str
455         :return: str
456         """
457         return self._ceph_set_config(self.get_mgr_id() + '/' + key, val)
458
459     def set_config_json(self, key, val):
460         """
461         Helper for setting json-serialized-config
462
463         :param key: str
464         :param val: json-serializable object
465         """
466         self._ceph_set_config(key, json.dumps(val))
467
468     def get_config_json(self, key):
469         """
470         Helper for getting json-serialized config
471
472         :param key: str
473         :return: object
474         """
475         raw = self.get_config(key)
476         if raw is None:
477             return None
478         else:
479             return json.loads(raw)
480
481     def self_test(self):
482         """
483         Run a self-test on the module. Override this function and implement
484         a best as possible self-test for (automated) testing of the module
485         :return: bool
486         """
487         pass
488
489     def get_osdmap(self):
490         """
491         Get a handle to an OSDMap.  If epoch==0, get a handle for the latest
492         OSDMap.
493         :return: OSDMap
494         """
495         return self._ceph_get_osdmap()
496
497     def get_all_perf_counters(self, prio_limit=PRIO_USEFUL):
498         """
499         Return the perf counters currently known to this ceph-mgr
500         instance, filtered by priority equal to or greater than `prio_limit`.
501
502         The result us a map of string to dict, associating services
503         (like "osd.123") with their counters.  The counter
504         dict for each service maps counter paths to a counter
505         info structure, which is the information from
506         the schema, plus an additional "value" member with the latest
507         value.
508         """
509
510         result = defaultdict(dict)
511
512         # TODO: improve C++->Python interface to return just
513         # the latest if that's all we want.
514         def get_latest(daemon_type, daemon_name, counter):
515             data = self.get_counter(daemon_type, daemon_name, counter)[counter]
516             if data:
517                 return data[-1][1]
518             else:
519                 return 0
520
521         for server in self.list_servers():
522             for service in server['services']:
523                 if service['type'] not in ("mds", "osd", "mon"):
524                     continue
525
526                 schema = self.get_perf_schema(service['type'], service['id'])
527                 if not schema:
528                     self.log.warn("No perf counter schema for {0}.{1}".format(
529                         service['type'], service['id']
530                     ))
531                     continue
532
533                 # Value is returned in a potentially-multi-service format,
534                 # get just the service we're asking about
535                 svc_full_name = "{0}.{1}".format(service['type'], service['id'])
536                 schema = schema[svc_full_name]
537
538                 # Populate latest values
539                 for counter_path, counter_schema in schema.items():
540                     # self.log.debug("{0}: {1}".format(
541                     #     counter_path, json.dumps(counter_schema)
542                     # ))
543                     if counter_schema['priority'] < prio_limit:
544                         continue
545
546                     counter_info = counter_schema
547                     counter_info['value'] = get_latest(service['type'], service['id'], counter_path)
548                     result[svc_full_name][counter_path] = counter_info
549
550         self.log.debug("returning {0} counter".format(len(result)))
551
552         return result
553
554     def set_uri(self, uri):
555         """
556         If the module exposes a service, then call this to publish the
557         address once it is available.
558
559         :return: a string
560         """
561         return self._ceph_set_uri(uri)