--- /dev/null
+
+import ceph_module # noqa
+#import ceph_osdmap #noqa
+#import ceph_osdmap_incremental #noqa
+#import ceph_crushmap #noqa
+
+import json
+import logging
+import threading
+from collections import defaultdict
+
+
+class CPlusPlusHandler(logging.Handler):
+ def __init__(self, module_inst):
+ super(CPlusPlusHandler, self).__init__()
+ self._module = module_inst
+
+ def emit(self, record):
+ if record.levelno <= logging.DEBUG:
+ ceph_level = 20
+ elif record.levelno <= logging.INFO:
+ ceph_level = 4
+ elif record.levelno <= logging.WARNING:
+ ceph_level = 1
+ else:
+ ceph_level = 0
+
+ self._module._ceph_log(ceph_level, self.format(record))
+
+
+def configure_logger(module_inst, name):
+ logger = logging.getLogger(name)
+
+
+ # Don't filter any logs at the python level, leave it to C++
+ logger.setLevel(logging.DEBUG)
+
+ # FIXME: we should learn the log level from C++ land, and then
+ # avoid calling the C++ level log when we know a message is of
+ # an insufficient level to be ultimately output
+ logger.addHandler(CPlusPlusHandler(module_inst))
+
+ return logger
+
+
+def unconfigure_logger(module_inst, name):
+ logger = logging.getLogger(name)
+ rm_handlers = [h for h in logger.handlers if isinstance(h, CPlusPlusHandler)]
+ for h in rm_handlers:
+ logger.removeHandler(h)
+
+class CommandResult(object):
+ """
+ Use with MgrModule.send_command
+ """
+ def __init__(self, tag):
+ self.ev = threading.Event()
+ self.outs = ""
+ self.outb = ""
+ self.r = 0
+
+ # This is just a convenience for notifications from
+ # C++ land, to avoid passing addresses around in messages.
+ self.tag = tag
+
+ def complete(self, r, outb, outs):
+ self.r = r
+ self.outb = outb
+ self.outs = outs
+ self.ev.set()
+
+ def wait(self):
+ self.ev.wait()
+ return self.r, self.outb, self.outs
+
+
+class OSDMap(ceph_module.BasePyOSDMap):
+ def get_epoch(self):
+ return self._get_epoch()
+
+ def get_crush_version(self):
+ return self._get_crush_version()
+
+ def dump(self):
+ return self._dump()
+
+ def new_incremental(self):
+ return self._new_incremental()
+
+ def apply_incremental(self, inc):
+ return self._apply_incremental(inc)
+
+ def get_crush(self):
+ return self._get_crush()
+
+ def get_pools_by_take(self, take):
+ return self._get_pools_by_take(take).get('pools', [])
+
+ def calc_pg_upmaps(self, inc,
+ max_deviation=.01, max_iterations=10, pools=[]):
+ return self._calc_pg_upmaps(
+ inc,
+ max_deviation, max_iterations, pools)
+
+ def map_pool_pgs_up(self, poolid):
+ return self._map_pool_pgs_up(poolid)
+
+class OSDMapIncremental(ceph_module.BasePyOSDMapIncremental):
+ def get_epoch(self):
+ return self._get_epoch()
+
+ def dump(self):
+ return self._dump()
+
+ def set_osd_reweights(self, weightmap):
+ """
+ weightmap is a dict, int to float. e.g. { 0: .9, 1: 1.0, 3: .997 }
+ """
+ return self._set_osd_reweights(weightmap)
+
+ def set_crush_compat_weight_set_weights(self, weightmap):
+ """
+ weightmap is a dict, int to float. devices only. e.g.,
+ { 0: 3.4, 1: 3.3, 2: 3.334 }
+ """
+ return self._set_crush_compat_weight_set_weights(weightmap)
+
+class CRUSHMap(ceph_module.BasePyCRUSH):
+ def dump(self):
+ return self._dump()
+
+ def get_item_weight(self, item):
+ return self._get_item_weight(item)
+
+ def get_item_name(self, item):
+ return self._get_item_name(item)
+
+ def find_takes(self):
+ return self._find_takes().get('takes', [])
+
+ def get_take_weight_osd_map(self, root):
+ uglymap = self._get_take_weight_osd_map(root)
+ return { int(k): v for k, v in uglymap.get('weights', {}).iteritems() }
+
+class MgrStandbyModule(ceph_module.BaseMgrStandbyModule):
+ """
+ Standby modules only implement a serve and shutdown method, they
+ are not permitted to implement commands and they do not receive
+ any notifications.
+
+ They only have access to the mgrmap (for acecssing service URI info
+ from their active peer), and to configuration settings (read only).
+ """
+
+ def __init__(self, module_name, capsule):
+ super(MgrStandbyModule, self).__init__(capsule)
+ self.module_name = module_name
+ self._logger = configure_logger(self, module_name)
+
+ def __del__(self):
+ unconfigure_logger(self, self.module_name)
+
+ @property
+ def log(self):
+ return self._logger
+
+ def serve(self):
+ """
+ The serve method is mandatory for standby modules.
+ :return:
+ """
+ raise NotImplementedError()
+
+ def get_mgr_id(self):
+ return self._ceph_get_mgr_id()
+
+ def get_config(self, key):
+ return self._ceph_get_config(key)
+
+ def get_active_uri(self):
+ return self._ceph_get_active_uri()
+
+ def get_localized_config(self, key, default=None):
+ r = self.get_config(self.get_mgr_id() + '/' + key)
+ if r is None:
+ r = self.get_config(key)
+
+ if r is None:
+ r = default
+ return r
+
+class MgrModule(ceph_module.BaseMgrModule):
+ COMMANDS = []
+
+ # Priority definitions for perf counters
+ PRIO_CRITICAL = 10
+ PRIO_INTERESTING = 8
+ PRIO_USEFUL = 5
+ PRIO_UNINTERESTING = 2
+ PRIO_DEBUGONLY = 0
+
+ # counter value types
+ PERFCOUNTER_TIME = 1
+ PERFCOUNTER_U64 = 2
+
+ # counter types
+ PERFCOUNTER_LONGRUNAVG = 4
+ PERFCOUNTER_COUNTER = 8
+ PERFCOUNTER_HISTOGRAM = 0x10
+ PERFCOUNTER_TYPE_MASK = ~2
+
+ def __init__(self, module_name, py_modules_ptr, this_ptr):
+ self.module_name = module_name
+
+ # If we're taking over from a standby module, let's make sure
+ # its logger was unconfigured before we hook ours up
+ unconfigure_logger(self, self.module_name)
+ self._logger = configure_logger(self, module_name)
+
+ super(MgrModule, self).__init__(py_modules_ptr, this_ptr)
+
+ self._version = self._ceph_get_version()
+
+ self._perf_schema_cache = None
+
+ def __del__(self):
+ unconfigure_logger(self, self.module_name)
+
+ def update_perf_schema(self, daemon_type, daemon_name):
+ """
+ For plugins that use get_all_perf_counters, call this when
+ receiving a notification of type 'perf_schema_update', to
+ prompt MgrModule to update its cache of counter schemas.
+
+ :param daemon_type:
+ :param daemon_name:
+ :return:
+ """
+
+ @property
+ def log(self):
+ return self._logger
+
+ @property
+ def version(self):
+ return self._version
+
+ def get_context(self):
+ """
+ :return: a Python capsule containing a C++ CephContext pointer
+ """
+ return self._ceph_get_context()
+
+ def notify(self, notify_type, notify_id):
+ """
+ Called by the ceph-mgr service to notify the Python plugin
+ that new state is available.
+ """
+ pass
+
+ def serve(self):
+ """
+ Called by the ceph-mgr service to start any server that
+ is provided by this Python plugin. The implementation
+ of this function should block until ``shutdown`` is called.
+
+ You *must* implement ``shutdown`` if you implement ``serve``
+ """
+ pass
+
+ def shutdown(self):
+ """
+ Called by the ceph-mgr service to request that this
+ module drop out of its serve() function. You do not
+ need to implement this if you do not implement serve()
+
+ :return: None
+ """
+ pass
+
+ def get(self, data_name):
+ """
+ Called by the plugin to load some cluster state from ceph-mgr
+ """
+ return self._ceph_get(data_name)
+
+ def get_server(self, hostname):
+ """
+ Called by the plugin to load information about a particular
+ node from ceph-mgr.
+
+ :param hostname: a hostame
+ """
+ return self._ceph_get_server(hostname)
+
+ def get_perf_schema(self, svc_type, svc_name):
+ """
+ Called by the plugin to fetch perf counter schema info.
+ svc_name can be nullptr, as can svc_type, in which case
+ they are wildcards
+
+ :param svc_type:
+ :param svc_name:
+ :return: list of dicts describing the counters requested
+ """
+ return self._ceph_get_perf_schema(svc_type, svc_name)
+
+ def get_counter(self, svc_type, svc_name, path):
+ """
+ Called by the plugin to fetch data for a particular perf counter
+ on a particular service.
+
+ :param svc_type:
+ :param svc_name:
+ :param path:
+ :return: A list of two-element lists containing time and value
+ """
+ return self._ceph_get_counter(svc_type, svc_name, path)
+
+ def list_servers(self):
+ """
+ Like ``get_server``, but instead of returning information
+ about just one node, return all the nodes in an array.
+ """
+ return self._ceph_get_server(None)
+
+ def get_metadata(self, svc_type, svc_id):
+ """
+ Fetch the metadata for a particular service.
+
+ :param svc_type: string (e.g., 'mds', 'osd', 'mon')
+ :param svc_id: string
+ :return: dict
+ """
+ return self._ceph_get_metadata(svc_type, svc_id)
+
+ def get_daemon_status(self, svc_type, svc_id):
+ """
+ Fetch the latest status for a particular service daemon.
+
+ :param svc_type: string (e.g., 'rgw')
+ :param svc_id: string
+ :return: dict
+ """
+ return self._ceph_get_daemon_status(svc_type, svc_id)
+
+ def send_command(self, *args, **kwargs):
+ """
+ Called by the plugin to send a command to the mon
+ cluster.
+ """
+ self._ceph_send_command(*args, **kwargs)
+
+ def set_health_checks(self, checks):
+ """
+ Set module's health checks
+
+ Set the module's current map of health checks. Argument is a
+ dict of check names to info, in this form:
+
+ {
+ 'CHECK_FOO': {
+ 'severity': 'warning', # or 'error'
+ 'summary': 'summary string',
+ 'detail': [ 'list', 'of', 'detail', 'strings' ],
+ },
+ 'CHECK_BAR': {
+ 'severity': 'error',
+ 'summary': 'bars are bad',
+ 'detail': [ 'too hard' ],
+ },
+ }
+
+ :param list: dict of health check dicts
+ """
+ self._ceph_set_health_checks(checks)
+
+ def handle_command(self, cmd):
+ """
+ Called by ceph-mgr to request the plugin to handle one
+ of the commands that it declared in self.COMMANDS
+
+ Return a status code, an output buffer, and an
+ output string. The output buffer is for data results,
+ the output string is for informative text.
+
+ :param cmd: dict, from Ceph's cmdmap_t
+
+ :return: 3-tuple of (int, str, str)
+ """
+
+ # Should never get called if they didn't declare
+ # any ``COMMANDS``
+ raise NotImplementedError()
+
+ def get_mgr_id(self):
+ """
+ Retrieve the mgr id.
+
+ :return: str
+ """
+ return self._ceph_get_mgr_id()
+
+ def get_config(self, key, default=None):
+ """
+ Retrieve the value of a persistent configuration setting
+
+ :param key: str
+ :return: str
+ """
+ r = self._ceph_get_config(key)
+ if r is None:
+ return default
+ else:
+ return r
+
+ def get_config_prefix(self, key_prefix):
+ """
+ Retrieve a dict of config values with the given prefix
+
+ :param key_prefix: str
+ :return: str
+ """
+ return self._ceph_get_config_prefix(key_prefix)
+
+ def get_localized_config(self, key, default=None):
+ """
+ Retrieve localized configuration for this ceph-mgr instance
+ :param key: str
+ :param default: str
+ :return: str
+ """
+ r = self.get_config(self.get_mgr_id() + '/' + key)
+ if r is None:
+ r = self.get_config(key)
+
+ if r is None:
+ r = default
+ return r
+
+ def set_config(self, key, val):
+ """
+ Set the value of a persistent configuration setting
+
+ :param key: str
+ :param val: str
+ """
+ self._ceph_set_config(key, val)
+
+ def set_localized_config(self, key, val):
+ """
+ Set localized configuration for this ceph-mgr instance
+ :param key: str
+ :param default: str
+ :return: str
+ """
+ return self._ceph_set_config(self.get_mgr_id() + '/' + key, val)
+
+ def set_config_json(self, key, val):
+ """
+ Helper for setting json-serialized-config
+
+ :param key: str
+ :param val: json-serializable object
+ """
+ self._ceph_set_config(key, json.dumps(val))
+
+ def get_config_json(self, key):
+ """
+ Helper for getting json-serialized config
+
+ :param key: str
+ :return: object
+ """
+ raw = self.get_config(key)
+ if raw is None:
+ return None
+ else:
+ return json.loads(raw)
+
+ def self_test(self):
+ """
+ Run a self-test on the module. Override this function and implement
+ a best as possible self-test for (automated) testing of the module
+ :return: bool
+ """
+ pass
+
+ def get_osdmap(self):
+ """
+ Get a handle to an OSDMap. If epoch==0, get a handle for the latest
+ OSDMap.
+ :return: OSDMap
+ """
+ return self._ceph_get_osdmap()
+
+ def get_all_perf_counters(self, prio_limit=PRIO_USEFUL):
+ """
+ Return the perf counters currently known to this ceph-mgr
+ instance, filtered by priority equal to or greater than `prio_limit`.
+
+ The result us a map of string to dict, associating services
+ (like "osd.123") with their counters. The counter
+ dict for each service maps counter paths to a counter
+ info structure, which is the information from
+ the schema, plus an additional "value" member with the latest
+ value.
+ """
+
+ result = defaultdict(dict)
+
+ # TODO: improve C++->Python interface to return just
+ # the latest if that's all we want.
+ def get_latest(daemon_type, daemon_name, counter):
+ data = self.get_counter(daemon_type, daemon_name, counter)[counter]
+ if data:
+ return data[-1][1]
+ else:
+ return 0
+
+ for server in self.list_servers():
+ for service in server['services']:
+ if service['type'] not in ("mds", "osd", "mon"):
+ continue
+
+ schema = self.get_perf_schema(service['type'], service['id'])
+ if not schema:
+ self.log.warn("No perf counter schema for {0}.{1}".format(
+ service['type'], service['id']
+ ))
+ continue
+
+ # Value is returned in a potentially-multi-service format,
+ # get just the service we're asking about
+ svc_full_name = "{0}.{1}".format(service['type'], service['id'])
+ schema = schema[svc_full_name]
+
+ # Populate latest values
+ for counter_path, counter_schema in schema.items():
+ # self.log.debug("{0}: {1}".format(
+ # counter_path, json.dumps(counter_schema)
+ # ))
+ if counter_schema['priority'] < prio_limit:
+ continue
+
+ counter_info = counter_schema
+ counter_info['value'] = get_latest(service['type'], service['id'], counter_path)
+ result[svc_full_name][counter_path] = counter_info
+
+ self.log.debug("returning {0} counter".format(len(result)))
+
+ return result
+
+ def set_uri(self, uri):
+ """
+ If the module exposes a service, then call this to publish the
+ address once it is available.
+
+ :return: a string
+ """
+ return self._ceph_set_uri(uri)