+++ /dev/null
-
-import ceph_module # noqa
-#import ceph_osdmap #noqa
-#import ceph_osdmap_incremental #noqa
-#import ceph_crushmap #noqa
-
-import json
-import logging
-import threading
-from collections import defaultdict
-
-
-class CPlusPlusHandler(logging.Handler):
- def __init__(self, module_inst):
- super(CPlusPlusHandler, self).__init__()
- self._module = module_inst
-
- def emit(self, record):
- if record.levelno <= logging.DEBUG:
- ceph_level = 20
- elif record.levelno <= logging.INFO:
- ceph_level = 4
- elif record.levelno <= logging.WARNING:
- ceph_level = 1
- else:
- ceph_level = 0
-
- self._module._ceph_log(ceph_level, self.format(record))
-
-
-def configure_logger(module_inst, name):
- logger = logging.getLogger(name)
-
-
- # Don't filter any logs at the python level, leave it to C++
- logger.setLevel(logging.DEBUG)
-
- # FIXME: we should learn the log level from C++ land, and then
- # avoid calling the C++ level log when we know a message is of
- # an insufficient level to be ultimately output
- logger.addHandler(CPlusPlusHandler(module_inst))
-
- return logger
-
-
-def unconfigure_logger(module_inst, name):
- logger = logging.getLogger(name)
- rm_handlers = [h for h in logger.handlers if isinstance(h, CPlusPlusHandler)]
- for h in rm_handlers:
- logger.removeHandler(h)
-
-class CommandResult(object):
- """
- Use with MgrModule.send_command
- """
- def __init__(self, tag):
- self.ev = threading.Event()
- self.outs = ""
- self.outb = ""
- self.r = 0
-
- # This is just a convenience for notifications from
- # C++ land, to avoid passing addresses around in messages.
- self.tag = tag
-
- def complete(self, r, outb, outs):
- self.r = r
- self.outb = outb
- self.outs = outs
- self.ev.set()
-
- def wait(self):
- self.ev.wait()
- return self.r, self.outb, self.outs
-
-
-class OSDMap(ceph_module.BasePyOSDMap):
- def get_epoch(self):
- return self._get_epoch()
-
- def get_crush_version(self):
- return self._get_crush_version()
-
- def dump(self):
- return self._dump()
-
- def new_incremental(self):
- return self._new_incremental()
-
- def apply_incremental(self, inc):
- return self._apply_incremental(inc)
-
- def get_crush(self):
- return self._get_crush()
-
- def get_pools_by_take(self, take):
- return self._get_pools_by_take(take).get('pools', [])
-
- def calc_pg_upmaps(self, inc,
- max_deviation=.01, max_iterations=10, pools=[]):
- return self._calc_pg_upmaps(
- inc,
- max_deviation, max_iterations, pools)
-
- def map_pool_pgs_up(self, poolid):
- return self._map_pool_pgs_up(poolid)
-
-class OSDMapIncremental(ceph_module.BasePyOSDMapIncremental):
- def get_epoch(self):
- return self._get_epoch()
-
- def dump(self):
- return self._dump()
-
- def set_osd_reweights(self, weightmap):
- """
- weightmap is a dict, int to float. e.g. { 0: .9, 1: 1.0, 3: .997 }
- """
- return self._set_osd_reweights(weightmap)
-
- def set_crush_compat_weight_set_weights(self, weightmap):
- """
- weightmap is a dict, int to float. devices only. e.g.,
- { 0: 3.4, 1: 3.3, 2: 3.334 }
- """
- return self._set_crush_compat_weight_set_weights(weightmap)
-
-class CRUSHMap(ceph_module.BasePyCRUSH):
- def dump(self):
- return self._dump()
-
- def get_item_weight(self, item):
- return self._get_item_weight(item)
-
- def get_item_name(self, item):
- return self._get_item_name(item)
-
- def find_takes(self):
- return self._find_takes().get('takes', [])
-
- def get_take_weight_osd_map(self, root):
- uglymap = self._get_take_weight_osd_map(root)
- return { int(k): v for k, v in uglymap.get('weights', {}).iteritems() }
-
-class MgrStandbyModule(ceph_module.BaseMgrStandbyModule):
- """
- Standby modules only implement a serve and shutdown method, they
- are not permitted to implement commands and they do not receive
- any notifications.
-
- They only have access to the mgrmap (for acecssing service URI info
- from their active peer), and to configuration settings (read only).
- """
-
- def __init__(self, module_name, capsule):
- super(MgrStandbyModule, self).__init__(capsule)
- self.module_name = module_name
- self._logger = configure_logger(self, module_name)
-
- def __del__(self):
- unconfigure_logger(self, self.module_name)
-
- @property
- def log(self):
- return self._logger
-
- def serve(self):
- """
- The serve method is mandatory for standby modules.
- :return:
- """
- raise NotImplementedError()
-
- def get_mgr_id(self):
- return self._ceph_get_mgr_id()
-
- def get_config(self, key):
- return self._ceph_get_config(key)
-
- def get_active_uri(self):
- return self._ceph_get_active_uri()
-
- def get_localized_config(self, key, default=None):
- r = self.get_config(self.get_mgr_id() + '/' + key)
- if r is None:
- r = self.get_config(key)
-
- if r is None:
- r = default
- return r
-
-class MgrModule(ceph_module.BaseMgrModule):
- COMMANDS = []
-
- # Priority definitions for perf counters
- PRIO_CRITICAL = 10
- PRIO_INTERESTING = 8
- PRIO_USEFUL = 5
- PRIO_UNINTERESTING = 2
- PRIO_DEBUGONLY = 0
-
- # counter value types
- PERFCOUNTER_TIME = 1
- PERFCOUNTER_U64 = 2
-
- # counter types
- PERFCOUNTER_LONGRUNAVG = 4
- PERFCOUNTER_COUNTER = 8
- PERFCOUNTER_HISTOGRAM = 0x10
- PERFCOUNTER_TYPE_MASK = ~2
-
- def __init__(self, module_name, py_modules_ptr, this_ptr):
- self.module_name = module_name
-
- # If we're taking over from a standby module, let's make sure
- # its logger was unconfigured before we hook ours up
- unconfigure_logger(self, self.module_name)
- self._logger = configure_logger(self, module_name)
-
- super(MgrModule, self).__init__(py_modules_ptr, this_ptr)
-
- self._version = self._ceph_get_version()
-
- self._perf_schema_cache = None
-
- def __del__(self):
- unconfigure_logger(self, self.module_name)
-
- def update_perf_schema(self, daemon_type, daemon_name):
- """
- For plugins that use get_all_perf_counters, call this when
- receiving a notification of type 'perf_schema_update', to
- prompt MgrModule to update its cache of counter schemas.
-
- :param daemon_type:
- :param daemon_name:
- :return:
- """
-
- @property
- def log(self):
- return self._logger
-
- @property
- def version(self):
- return self._version
-
- def get_context(self):
- """
- :return: a Python capsule containing a C++ CephContext pointer
- """
- return self._ceph_get_context()
-
- def notify(self, notify_type, notify_id):
- """
- Called by the ceph-mgr service to notify the Python plugin
- that new state is available.
- """
- pass
-
- def serve(self):
- """
- Called by the ceph-mgr service to start any server that
- is provided by this Python plugin. The implementation
- of this function should block until ``shutdown`` is called.
-
- You *must* implement ``shutdown`` if you implement ``serve``
- """
- pass
-
- def shutdown(self):
- """
- Called by the ceph-mgr service to request that this
- module drop out of its serve() function. You do not
- need to implement this if you do not implement serve()
-
- :return: None
- """
- pass
-
- def get(self, data_name):
- """
- Called by the plugin to load some cluster state from ceph-mgr
- """
- return self._ceph_get(data_name)
-
- def get_server(self, hostname):
- """
- Called by the plugin to load information about a particular
- node from ceph-mgr.
-
- :param hostname: a hostame
- """
- return self._ceph_get_server(hostname)
-
- def get_perf_schema(self, svc_type, svc_name):
- """
- Called by the plugin to fetch perf counter schema info.
- svc_name can be nullptr, as can svc_type, in which case
- they are wildcards
-
- :param svc_type:
- :param svc_name:
- :return: list of dicts describing the counters requested
- """
- return self._ceph_get_perf_schema(svc_type, svc_name)
-
- def get_counter(self, svc_type, svc_name, path):
- """
- Called by the plugin to fetch data for a particular perf counter
- on a particular service.
-
- :param svc_type:
- :param svc_name:
- :param path:
- :return: A list of two-element lists containing time and value
- """
- return self._ceph_get_counter(svc_type, svc_name, path)
-
- def list_servers(self):
- """
- Like ``get_server``, but instead of returning information
- about just one node, return all the nodes in an array.
- """
- return self._ceph_get_server(None)
-
- def get_metadata(self, svc_type, svc_id):
- """
- Fetch the metadata for a particular service.
-
- :param svc_type: string (e.g., 'mds', 'osd', 'mon')
- :param svc_id: string
- :return: dict
- """
- return self._ceph_get_metadata(svc_type, svc_id)
-
- def get_daemon_status(self, svc_type, svc_id):
- """
- Fetch the latest status for a particular service daemon.
-
- :param svc_type: string (e.g., 'rgw')
- :param svc_id: string
- :return: dict
- """
- return self._ceph_get_daemon_status(svc_type, svc_id)
-
- def send_command(self, *args, **kwargs):
- """
- Called by the plugin to send a command to the mon
- cluster.
- """
- self._ceph_send_command(*args, **kwargs)
-
- def set_health_checks(self, checks):
- """
- Set module's health checks
-
- Set the module's current map of health checks. Argument is a
- dict of check names to info, in this form:
-
- {
- 'CHECK_FOO': {
- 'severity': 'warning', # or 'error'
- 'summary': 'summary string',
- 'detail': [ 'list', 'of', 'detail', 'strings' ],
- },
- 'CHECK_BAR': {
- 'severity': 'error',
- 'summary': 'bars are bad',
- 'detail': [ 'too hard' ],
- },
- }
-
- :param list: dict of health check dicts
- """
- self._ceph_set_health_checks(checks)
-
- def handle_command(self, cmd):
- """
- Called by ceph-mgr to request the plugin to handle one
- of the commands that it declared in self.COMMANDS
-
- Return a status code, an output buffer, and an
- output string. The output buffer is for data results,
- the output string is for informative text.
-
- :param cmd: dict, from Ceph's cmdmap_t
-
- :return: 3-tuple of (int, str, str)
- """
-
- # Should never get called if they didn't declare
- # any ``COMMANDS``
- raise NotImplementedError()
-
- def get_mgr_id(self):
- """
- Retrieve the mgr id.
-
- :return: str
- """
- return self._ceph_get_mgr_id()
-
- def get_config(self, key, default=None):
- """
- Retrieve the value of a persistent configuration setting
-
- :param key: str
- :return: str
- """
- r = self._ceph_get_config(key)
- if r is None:
- return default
- else:
- return r
-
- def get_config_prefix(self, key_prefix):
- """
- Retrieve a dict of config values with the given prefix
-
- :param key_prefix: str
- :return: str
- """
- return self._ceph_get_config_prefix(key_prefix)
-
- def get_localized_config(self, key, default=None):
- """
- Retrieve localized configuration for this ceph-mgr instance
- :param key: str
- :param default: str
- :return: str
- """
- r = self.get_config(self.get_mgr_id() + '/' + key)
- if r is None:
- r = self.get_config(key)
-
- if r is None:
- r = default
- return r
-
- def set_config(self, key, val):
- """
- Set the value of a persistent configuration setting
-
- :param key: str
- :param val: str
- """
- self._ceph_set_config(key, val)
-
- def set_localized_config(self, key, val):
- """
- Set localized configuration for this ceph-mgr instance
- :param key: str
- :param default: str
- :return: str
- """
- return self._ceph_set_config(self.get_mgr_id() + '/' + key, val)
-
- def set_config_json(self, key, val):
- """
- Helper for setting json-serialized-config
-
- :param key: str
- :param val: json-serializable object
- """
- self._ceph_set_config(key, json.dumps(val))
-
- def get_config_json(self, key):
- """
- Helper for getting json-serialized config
-
- :param key: str
- :return: object
- """
- raw = self.get_config(key)
- if raw is None:
- return None
- else:
- return json.loads(raw)
-
- def self_test(self):
- """
- Run a self-test on the module. Override this function and implement
- a best as possible self-test for (automated) testing of the module
- :return: bool
- """
- pass
-
- def get_osdmap(self):
- """
- Get a handle to an OSDMap. If epoch==0, get a handle for the latest
- OSDMap.
- :return: OSDMap
- """
- return self._ceph_get_osdmap()
-
- def get_all_perf_counters(self, prio_limit=PRIO_USEFUL):
- """
- Return the perf counters currently known to this ceph-mgr
- instance, filtered by priority equal to or greater than `prio_limit`.
-
- The result us a map of string to dict, associating services
- (like "osd.123") with their counters. The counter
- dict for each service maps counter paths to a counter
- info structure, which is the information from
- the schema, plus an additional "value" member with the latest
- value.
- """
-
- result = defaultdict(dict)
-
- # TODO: improve C++->Python interface to return just
- # the latest if that's all we want.
- def get_latest(daemon_type, daemon_name, counter):
- data = self.get_counter(daemon_type, daemon_name, counter)[counter]
- if data:
- return data[-1][1]
- else:
- return 0
-
- for server in self.list_servers():
- for service in server['services']:
- if service['type'] not in ("mds", "osd", "mon"):
- continue
-
- schema = self.get_perf_schema(service['type'], service['id'])
- if not schema:
- self.log.warn("No perf counter schema for {0}.{1}".format(
- service['type'], service['id']
- ))
- continue
-
- # Value is returned in a potentially-multi-service format,
- # get just the service we're asking about
- svc_full_name = "{0}.{1}".format(service['type'], service['id'])
- schema = schema[svc_full_name]
-
- # Populate latest values
- for counter_path, counter_schema in schema.items():
- # self.log.debug("{0}: {1}".format(
- # counter_path, json.dumps(counter_schema)
- # ))
- if counter_schema['priority'] < prio_limit:
- continue
-
- counter_info = counter_schema
- counter_info['value'] = get_latest(service['type'], service['id'], counter_path)
- result[svc_full_name][counter_path] = counter_info
-
- self.log.debug("returning {0} counter".format(len(result)))
-
- return result
-
- def set_uri(self, uri):
- """
- If the module exposes a service, then call this to publish the
- address once it is available.
-
- :return: a string
- """
- return self._ceph_set_uri(uri)