2 import ceph_module # noqa
3 #import ceph_osdmap #noqa
4 #import ceph_osdmap_incremental #noqa
5 #import ceph_crushmap #noqa
10 from collections import defaultdict
13 class CPlusPlusHandler(logging.Handler):
14 def __init__(self, module_inst):
15 super(CPlusPlusHandler, self).__init__()
16 self._module = module_inst
18 def emit(self, record):
19 if record.levelno <= logging.DEBUG:
21 elif record.levelno <= logging.INFO:
23 elif record.levelno <= logging.WARNING:
28 self._module._ceph_log(ceph_level, self.format(record))
31 def configure_logger(module_inst, name):
32 logger = logging.getLogger(name)
35 # Don't filter any logs at the python level, leave it to C++
36 logger.setLevel(logging.DEBUG)
38 # FIXME: we should learn the log level from C++ land, and then
39 # avoid calling the C++ level log when we know a message is of
40 # an insufficient level to be ultimately output
41 logger.addHandler(CPlusPlusHandler(module_inst))
46 def unconfigure_logger(module_inst, name):
47 logger = logging.getLogger(name)
48 rm_handlers = [h for h in logger.handlers if isinstance(h, CPlusPlusHandler)]
50 logger.removeHandler(h)
52 class CommandResult(object):
54 Use with MgrModule.send_command
56 def __init__(self, tag):
57 self.ev = threading.Event()
62 # This is just a convenience for notifications from
63 # C++ land, to avoid passing addresses around in messages.
66 def complete(self, r, outb, outs):
74 return self.r, self.outb, self.outs
77 class OSDMap(ceph_module.BasePyOSDMap):
79 return self._get_epoch()
def get_crush_version(self):
    """
    Return the value reported by the underlying ``_get_crush_version``
    binding, unchanged.
    """
    crush_version = self._get_crush_version()
    return crush_version
def new_incremental(self):
    """
    Return the object produced by the underlying ``_new_incremental``
    binding, unchanged.
    """
    incremental = self._new_incremental()
    return incremental
def apply_incremental(self, inc):
    """
    Hand ``inc`` to the underlying ``_apply_incremental`` binding.

    :param inc: passed through unchanged
    :return: whatever ``_apply_incremental`` returns
    """
    applied = self._apply_incremental(inc)
    return applied
94 return self._get_crush()
def get_pools_by_take(self, take):
    """
    Look up pools for a take via the ``_get_pools_by_take`` binding.

    :param take: passed through unchanged to the binding
    :return: the ``'pools'`` entry of the binding's result, or an empty
             list when that key is absent
    """
    by_take = self._get_pools_by_take(take)
    return by_take.get('pools', [])
99 def calc_pg_upmaps(self, inc,
100 max_deviation=.01, max_iterations=10, pools=[]):
101 return self._calc_pg_upmaps(
103 max_deviation, max_iterations, pools)
def map_pool_pgs_up(self, poolid):
    """
    Forward ``poolid`` to the underlying ``_map_pool_pgs_up`` binding.

    :param poolid: passed through unchanged
    :return: whatever ``_map_pool_pgs_up`` returns
    """
    mapping = self._map_pool_pgs_up(poolid)
    return mapping
108 class OSDMapIncremental(ceph_module.BasePyOSDMapIncremental):
110 return self._get_epoch()
115 def set_osd_reweights(self, weightmap):
117 weightmap is a dict, int to float. e.g. { 0: .9, 1: 1.0, 3: .997 }
119 return self._set_osd_reweights(weightmap)
121 def set_crush_compat_weight_set_weights(self, weightmap):
123 weightmap is a dict, int to float. devices only. e.g.,
124 { 0: 3.4, 1: 3.3, 2: 3.334 }
126 return self._set_crush_compat_weight_set_weights(weightmap)
128 class CRUSHMap(ceph_module.BasePyCRUSH):
def get_item_weight(self, item):
    """
    Forward ``item`` to the underlying ``_get_item_weight`` binding.

    :param item: passed through unchanged
    :return: whatever ``_get_item_weight`` returns
    """
    weight = self._get_item_weight(item)
    return weight
def get_item_name(self, item):
    """
    Forward ``item`` to the underlying ``_get_item_name`` binding.

    :param item: passed through unchanged
    :return: whatever ``_get_item_name`` returns
    """
    item_name = self._get_item_name(item)
    return item_name
def find_takes(self):
    """
    Query the ``_find_takes`` binding.

    :return: the ``'takes'`` entry of the binding's result, or an empty
             list when that key is absent
    """
    found = self._find_takes()
    return found.get('takes', [])
def get_take_weight_osd_map(self, root):
    """
    Return the ``'weights'`` mapping reported by the
    ``_get_take_weight_osd_map`` binding, with every key coerced to int.

    :param root: passed through unchanged to the binding
    :return: dict of ``int(key) -> value``; empty when the binding's
             result has no ``'weights'`` entry
    :raises ValueError: if a key cannot be converted with ``int()``
    """
    uglymap = self._get_take_weight_osd_map(root)
    weights = uglymap.get('weights', {})
    # dict.items() works on Python 2 and 3 alike; the previous
    # dict.iteritems() exists only on Python 2.
    # NOTE(review): keys are presumably OSD ids delivered as strings --
    # confirm against the C++ binding.
    return {int(k): v for k, v in weights.items()}
145 class MgrStandbyModule(ceph_module.BaseMgrStandbyModule):
147 Standby modules only implement a serve and shutdown method, they
148 are not permitted to implement commands and they do not receive
151 They only have access to the mgrmap (for accessing service URI info
152 from their active peer), and to configuration settings (read only).
def __init__(self, module_name, capsule):
    """
    Initialise the standby module and attach its logger.

    :param module_name: stored on ``self.module_name`` and used as the
                        logger name
    :param capsule: handed unchanged to the base class constructor
    """
    super(MgrStandbyModule, self).__init__(capsule)
    self.module_name = module_name
    # configure_logger is given this instance and the module name; its
    # result is kept on self._logger.
    module_logger = configure_logger(self, module_name)
    self._logger = module_logger
161 unconfigure_logger(self, self.module_name)
169 The serve method is mandatory for standby modules.
172 raise NotImplementedError()
def get_mgr_id(self):
    """
    Return the value reported by the underlying ``_ceph_get_mgr_id``
    binding, unchanged.
    """
    mgr_id = self._ceph_get_mgr_id()
    return mgr_id
def get_config(self, key):
    """
    Look up ``key`` through the underlying ``_ceph_get_config`` binding.

    :param key: passed through unchanged
    :return: whatever ``_ceph_get_config`` returns for ``key``
    """
    value = self._ceph_get_config(key)
    return value
def get_active_uri(self):
    """
    Return the value reported by the underlying ``_ceph_get_active_uri``
    binding, unchanged.
    """
    active_uri = self._ceph_get_active_uri()
    return active_uri
183 def get_localized_config(self, key, default=None):
184 r = self.get_config(self.get_mgr_id() + '/' + key)
186 r = self.get_config(key)
192 class MgrModule(ceph_module.BaseMgrModule):
195 # Priority definitions for perf counters
199 PRIO_UNINTERESTING = 2
202 # counter value types
207 PERFCOUNTER_LONGRUNAVG = 4
208 PERFCOUNTER_COUNTER = 8
209 PERFCOUNTER_HISTOGRAM = 0x10
210 PERFCOUNTER_TYPE_MASK = ~2
212 def __init__(self, module_name, py_modules_ptr, this_ptr):
213 self.module_name = module_name
215 # If we're taking over from a standby module, let's make sure
216 # its logger was unconfigured before we hook ours up
217 unconfigure_logger(self, self.module_name)
218 self._logger = configure_logger(self, module_name)
220 super(MgrModule, self).__init__(py_modules_ptr, this_ptr)
222 self._version = self._ceph_get_version()
224 self._perf_schema_cache = None
227 unconfigure_logger(self, self.module_name)
229 def update_perf_schema(self, daemon_type, daemon_name):
231 For plugins that use get_all_perf_counters, call this when
232 receiving a notification of type 'perf_schema_update', to
233 prompt MgrModule to update its cache of counter schemas.
248 def get_context(self):
250 :return: a Python capsule containing a C++ CephContext pointer
252 return self._ceph_get_context()
254 def notify(self, notify_type, notify_id):
256 Called by the ceph-mgr service to notify the Python plugin
257 that new state is available.
263 Called by the ceph-mgr service to start any server that
264 is provided by this Python plugin. The implementation
265 of this function should block until ``shutdown`` is called.
267 You *must* implement ``shutdown`` if you implement ``serve``
273 Called by the ceph-mgr service to request that this
274 module drop out of its serve() function. You do not
275 need to implement this if you do not implement serve()
281 def get(self, data_name):
283 Called by the plugin to load some cluster state from ceph-mgr
285 return self._ceph_get(data_name)
287 def get_server(self, hostname):
289 Called by the plugin to load information about a particular
292 :param hostname: a hostname
294 return self._ceph_get_server(hostname)
296 def get_perf_schema(self, svc_type, svc_name):
298 Called by the plugin to fetch perf counter schema info.
299 svc_name can be None, as can svc_type, in which case
304 :return: list of dicts describing the counters requested
306 return self._ceph_get_perf_schema(svc_type, svc_name)
308 def get_counter(self, svc_type, svc_name, path):
310 Called by the plugin to fetch data for a particular perf counter
311 on a particular service.
316 :return: A list of two-element lists containing time and value
318 return self._ceph_get_counter(svc_type, svc_name, path)
320 def list_servers(self):
322 Like ``get_server``, but instead of returning information
323 about just one node, return all the nodes in an array.
325 return self._ceph_get_server(None)
327 def get_metadata(self, svc_type, svc_id):
329 Fetch the metadata for a particular service.
331 :param svc_type: string (e.g., 'mds', 'osd', 'mon')
332 :param svc_id: string
335 return self._ceph_get_metadata(svc_type, svc_id)
337 def get_daemon_status(self, svc_type, svc_id):
339 Fetch the latest status for a particular service daemon.
341 :param svc_type: string (e.g., 'rgw')
342 :param svc_id: string
345 return self._ceph_get_daemon_status(svc_type, svc_id)
347 def send_command(self, *args, **kwargs):
349 Called by the plugin to send a command to the mon
352 self._ceph_send_command(*args, **kwargs)
354 def set_health_checks(self, checks):
356 Set module's health checks
358 Set the module's current map of health checks. Argument is a
359 dict of check names to info, in this form:
363 'severity': 'warning', # or 'error'
364 'summary': 'summary string',
365 'detail': [ 'list', 'of', 'detail', 'strings' ],
369 'summary': 'bars are bad',
370 'detail': [ 'too hard' ],
374 :param checks: dict of health check dicts
376 self._ceph_set_health_checks(checks)
378 def handle_command(self, cmd):
380 Called by ceph-mgr to request the plugin to handle one
381 of the commands that it declared in self.COMMANDS
383 Return a status code, an output buffer, and an
384 output string. The output buffer is for data results,
385 the output string is for informative text.
387 :param cmd: dict, from Ceph's cmdmap_t
389 :return: 3-tuple of (int, str, str)
392 # Should never get called if they didn't declare
394 raise NotImplementedError()
396 def get_mgr_id(self):
402 return self._ceph_get_mgr_id()
404 def get_config(self, key, default=None):
406 Retrieve the value of a persistent configuration setting
411 r = self._ceph_get_config(key)
417 def get_config_prefix(self, key_prefix):
419 Retrieve a dict of config values with the given prefix
421 :param key_prefix: str
424 return self._ceph_get_config_prefix(key_prefix)
426 def get_localized_config(self, key, default=None):
428 Retrieve localized configuration for this ceph-mgr instance
433 r = self.get_config(self.get_mgr_id() + '/' + key)
435 r = self.get_config(key)
441 def set_config(self, key, val):
443 Set the value of a persistent configuration setting
448 self._ceph_set_config(key, val)
450 def set_localized_config(self, key, val):
452 Set localized configuration for this ceph-mgr instance
457 return self._ceph_set_config(self.get_mgr_id() + '/' + key, val)
459 def set_config_json(self, key, val):
461 Helper for setting json-serialized-config
464 :param val: json-serializable object
466 self._ceph_set_config(key, json.dumps(val))
468 def get_config_json(self, key):
470 Helper for getting json-serialized config
475 raw = self.get_config(key)
479 return json.loads(raw)
483 Run a self-test on the module. Override this function and implement
484 the best possible self-test for (automated) testing of the module
489 def get_osdmap(self):
491 Get a handle to an OSDMap. If epoch==0, get a handle for the latest
495 return self._ceph_get_osdmap()
497 def get_all_perf_counters(self, prio_limit=PRIO_USEFUL):
499 Return the perf counters currently known to this ceph-mgr
500 instance, filtered by priority equal to or greater than `prio_limit`.
502 The result is a map of string to dict, associating services
503 (like "osd.123") with their counters. The counter
504 dict for each service maps counter paths to a counter
505 info structure, which is the information from
506 the schema, plus an additional "value" member with the latest
510 result = defaultdict(dict)
512 # TODO: improve C++->Python interface to return just
513 # the latest if that's all we want.
514 def get_latest(daemon_type, daemon_name, counter):
515 data = self.get_counter(daemon_type, daemon_name, counter)[counter]
521 for server in self.list_servers():
522 for service in server['services']:
523 if service['type'] not in ("mds", "osd", "mon"):
526 schema = self.get_perf_schema(service['type'], service['id'])
528 self.log.warn("No perf counter schema for {0}.{1}".format(
529 service['type'], service['id']
533 # Value is returned in a potentially-multi-service format,
534 # get just the service we're asking about
535 svc_full_name = "{0}.{1}".format(service['type'], service['id'])
536 schema = schema[svc_full_name]
538 # Populate latest values
539 for counter_path, counter_schema in schema.items():
540 # self.log.debug("{0}: {1}".format(
541 # counter_path, json.dumps(counter_schema)
543 if counter_schema['priority'] < prio_limit:
546 counter_info = counter_schema
547 counter_info['value'] = get_latest(service['type'], service['id'], counter_path)
548 result[svc_full_name][counter_path] = counter_info
550 self.log.debug("returning {0} counter".format(len(result)))
554 def set_uri(self, uri):
556 If the module exposes a service, then call this to publish the
557 address once it is available.
561 return self._ceph_set_uri(uri)