initial code repo
[stor4nfv.git] / src / ceph / src / pybind / mgr / mgr_module.py
diff --git a/src/ceph/src/pybind/mgr/mgr_module.py b/src/ceph/src/pybind/mgr/mgr_module.py
new file mode 100644 (file)
index 0000000..1abbcc5
--- /dev/null
@@ -0,0 +1,561 @@
+
+import ceph_module  # noqa
+#import ceph_osdmap  #noqa
+#import ceph_osdmap_incremental  #noqa
+#import ceph_crushmap  #noqa
+
+import json
+import logging
+import threading
+from collections import defaultdict
+
+
+class CPlusPlusHandler(logging.Handler):
+    """
+    Logging handler that hands every formatted record to the
+    ``_ceph_log`` method of the module instance it was created with.
+    """
+
+    def __init__(self, module_inst):
+        super(CPlusPlusHandler, self).__init__()
+        self._module = module_inst
+
+    def emit(self, record):
+        """
+        Map the record's Python log level onto a ceph level and forward
+        the formatted message to the module.
+        """
+        # (highest Python level covered, ceph level), tried in order.
+        # Records above WARNING match nothing and keep ceph level 0.
+        thresholds = (
+            (logging.DEBUG, 20),
+            (logging.INFO, 4),
+            (logging.WARNING, 1),
+        )
+
+        ceph_level = 0
+        for py_level, mapped_level in thresholds:
+            if record.levelno <= py_level:
+                ceph_level = mapped_level
+                break
+
+        self._module._ceph_log(ceph_level, self.format(record))
+
+
+def configure_logger(module_inst, name):
+    """
+    Hook a CPlusPlusHandler bound to `module_inst` onto the logger
+    called `name`, and return that logger.
+    """
+    handler = CPlusPlusHandler(module_inst)
+    logger = logging.getLogger(name)
+
+    # Don't filter any logs at the python level, leave it to C++
+    logger.setLevel(logging.DEBUG)
+
+    # FIXME: we should learn the log level from C++ land, and then
+    # avoid calling the C++ level log when we know a message is of
+    # an insufficient level to be ultimately output
+    logger.addHandler(handler)
+
+    return logger
+
+
+def unconfigure_logger(module_inst, name):
+    """
+    Detach every CPlusPlusHandler from the logger called `name`.
+
+    `module_inst` is accepted but not consulted.
+    """
+    logger = logging.getLogger(name)
+    # Walk a snapshot: removeHandler() mutates logger.handlers
+    for handler in list(logger.handlers):
+        if isinstance(handler, CPlusPlusHandler):
+            logger.removeHandler(handler)
+
+class CommandResult(object):
+    """
+    Completion object for use with MgrModule.send_command: one side
+    calls complete(), the other blocks in wait() until it has.
+    """
+    def __init__(self, tag):
+        # Result fields, overwritten by complete()
+        self.r = 0
+        self.outb = ""
+        self.outs = ""
+
+        # Set by complete(), awaited by wait()
+        self.ev = threading.Event()
+
+        # This is just a convenience for notifications from
+        # C++ land, to avoid passing addresses around in messages.
+        self.tag = tag
+
+    def complete(self, r, outb, outs):
+        """Store the outcome and release anyone blocked in wait()."""
+        self.r, self.outb, self.outs = r, outb, outs
+        self.ev.set()
+
+    def wait(self):
+        """Block until complete() has run, then return (r, outb, outs)."""
+        self.ev.wait()
+        outcome = (self.r, self.outb, self.outs)
+        return outcome
+
+
+class OSDMap(ceph_module.BasePyOSDMap):
+    """
+    Python-facing wrapper around ceph_module.BasePyOSDMap.  Each public
+    method forwards to the underscore-prefixed method of the same name.
+    """
+
+    def get_epoch(self):
+        return self._get_epoch()
+
+    def get_crush_version(self):
+        return self._get_crush_version()
+
+    def dump(self):
+        return self._dump()
+
+    def new_incremental(self):
+        return self._new_incremental()
+
+    def apply_incremental(self, inc):
+        return self._apply_incremental(inc)
+
+    def get_crush(self):
+        return self._get_crush()
+
+    def get_pools_by_take(self, take):
+        """
+        :return: the 'pools' entry of the underlying result, or an
+                 empty list when that entry is absent
+        """
+        return self._get_pools_by_take(take).get('pools', [])
+
+    def calc_pg_upmaps(self, inc,
+                       max_deviation=.01, max_iterations=10, pools=None):
+        """
+        Forward to ``_calc_pg_upmaps``.
+
+        :param inc: incremental passed through unchanged
+        :param max_deviation: passed through unchanged
+        :param max_iterations: passed through unchanged
+        :param pools: list passed through unchanged; omitted or None
+                      means an empty list
+        """
+        # Use a None sentinel rather than a shared mutable [] default,
+        # so one call can never observe a list altered by another.
+        if pools is None:
+            pools = []
+        return self._calc_pg_upmaps(
+            inc,
+            max_deviation, max_iterations, pools)
+
+    def map_pool_pgs_up(self, poolid):
+        return self._map_pool_pgs_up(poolid)
+
+class OSDMapIncremental(ceph_module.BasePyOSDMapIncremental):
+    """
+    Python-facing wrapper around ceph_module.BasePyOSDMapIncremental.
+    Each public method forwards to the underscore-prefixed method of
+    the same name and returns whatever that returns.
+    """
+
+    def get_epoch(self):
+        epoch = self._get_epoch()
+        return epoch
+
+    def dump(self):
+        dumped = self._dump()
+        return dumped
+
+    def set_osd_reweights(self, weightmap):
+        """
+        :param weightmap: dict of int to float,
+                          e.g. { 0: .9, 1: 1.0, 3: .997 }
+        """
+        outcome = self._set_osd_reweights(weightmap)
+        return outcome
+
+    def set_crush_compat_weight_set_weights(self, weightmap):
+        """
+        :param weightmap: dict of int to float, devices only,
+                          e.g. { 0: 3.4, 1: 3.3, 2: 3.334 }
+        """
+        outcome = self._set_crush_compat_weight_set_weights(weightmap)
+        return outcome
+
+class CRUSHMap(ceph_module.BasePyCRUSH):
+    """
+    Python-facing wrapper around ceph_module.BasePyCRUSH.  Each public
+    method forwards to the underscore-prefixed method of the same name.
+    """
+
+    def dump(self):
+        return self._dump()
+
+    def get_item_weight(self, item):
+        return self._get_item_weight(item)
+
+    def get_item_name(self, item):
+        return self._get_item_name(item)
+
+    def find_takes(self):
+        """
+        :return: the 'takes' entry of the underlying result, or an
+                 empty list when that entry is absent
+        """
+        return self._find_takes().get('takes', [])
+
+    def get_take_weight_osd_map(self, root):
+        """
+        :return: dict built from the 'weights' entry of the underlying
+                 result, with every key converted to int; empty when
+                 that entry is absent
+        """
+        uglymap = self._get_take_weight_osd_map(root)
+        weights = uglymap.get('weights', {})
+        # dict.items() works on Python 2 and 3 alike, whereas
+        # dict.iteritems() only exists on Python 2.
+        return {int(k): v for k, v in weights.items()}
+
+class MgrStandbyModule(ceph_module.BaseMgrStandbyModule):
+    """
+    Standby modules only implement a serve and shutdown method, they
+    are not permitted to implement commands and they do not receive
+    any notifications.
+
+    They only have access to the mgrmap (for accessing service URI info
+    from their active peer), and to configuration settings (read only).
+    """
+
+    def __init__(self, module_name, capsule):
+        super(MgrStandbyModule, self).__init__(capsule)
+        self.module_name = module_name
+        self._logger = configure_logger(self, module_name)
+
+    def __del__(self):
+        # Drop the handler installed by __init__ so it does not outlive us
+        unconfigure_logger(self, self.module_name)
+
+    @property
+    def log(self):
+        """Logger configured for this module in __init__."""
+        return self._logger
+
+    def serve(self):
+        """
+        The serve method is mandatory for standby modules.
+        :return:
+        """
+        raise NotImplementedError()
+
+    def get_mgr_id(self):
+        return self._ceph_get_mgr_id()
+
+    def get_config(self, key, default=None):
+        """
+        Retrieve the value of a configuration setting
+
+        :param key: str
+        :param default: value returned when the setting is None
+        :return: the setting, or `default`
+        """
+        r = self._ceph_get_config(key)
+        if r is None:
+            return default
+        return r
+
+    def get_active_uri(self):
+        return self._ceph_get_active_uri()
+
+    def get_localized_config(self, key, default=None):
+        """
+        Retrieve configuration for this ceph-mgr instance: try the key
+        prefixed with ``<mgr id>/`` first, then the bare key, then
+        fall back to `default`.
+
+        :param key: str
+        :param default: value returned when neither lookup finds anything
+        """
+        r = self.get_config(self.get_mgr_id() + '/' + key)
+        if r is None:
+            r = self.get_config(key)
+
+        if r is None:
+            r = default
+        return r
+
+class MgrModule(ceph_module.BaseMgrModule):
+    """
+    Base class for ceph-mgr Python modules.
+
+    Provides thin wrappers over the ``_ceph_*`` calls available on this
+    class, plus the hooks (``notify``, ``serve``, ``shutdown``,
+    ``handle_command``) that subclasses override.
+    """
+    COMMANDS = []
+
+    # Priority definitions for perf counters
+    PRIO_CRITICAL = 10
+    PRIO_INTERESTING = 8
+    PRIO_USEFUL = 5
+    PRIO_UNINTERESTING = 2
+    PRIO_DEBUGONLY = 0
+
+    # counter value types
+    PERFCOUNTER_TIME = 1
+    PERFCOUNTER_U64 = 2
+
+    # counter types
+    PERFCOUNTER_LONGRUNAVG = 4
+    PERFCOUNTER_COUNTER = 8
+    PERFCOUNTER_HISTOGRAM = 0x10
+    # Clears both value-type bits (PERFCOUNTER_TIME | PERFCOUNTER_U64 == 3)
+    # so that only the counter-type bits remain after masking.
+    PERFCOUNTER_TYPE_MASK = ~3
+
+    def __init__(self, module_name, py_modules_ptr, this_ptr):
+        self.module_name = module_name
+
+        # If we're taking over from a standby module, let's make sure
+        # its logger was unconfigured before we hook ours up
+        unconfigure_logger(self, self.module_name)
+        self._logger = configure_logger(self, module_name)
+
+        super(MgrModule, self).__init__(py_modules_ptr, this_ptr)
+
+        self._version = self._ceph_get_version()
+
+        self._perf_schema_cache = None
+
+    def __del__(self):
+        # Drop the handler installed by __init__ so it does not outlive us
+        unconfigure_logger(self, self.module_name)
+
+    def update_perf_schema(self, daemon_type, daemon_name):
+        """
+        For plugins that use get_all_perf_counters, call this when
+        receiving a notification of type 'perf_schema_update', to
+        prompt MgrModule to update its cache of counter schemas.
+
+        NOTE: this method currently has no body beyond this docstring.
+
+        :param daemon_type:
+        :param daemon_name:
+        :return:
+        """
+
+    @property
+    def log(self):
+        """Logger configured for this module in __init__."""
+        return self._logger
+
+    @property
+    def version(self):
+        """Value of ``_ceph_get_version()`` captured in __init__."""
+        return self._version
+
+    def get_context(self):
+        """
+        :return: a Python capsule containing a C++ CephContext pointer
+        """
+        return self._ceph_get_context()
+
+    def notify(self, notify_type, notify_id):
+        """
+        Called by the ceph-mgr service to notify the Python plugin
+        that new state is available.
+        """
+        pass
+
+    def serve(self):
+        """
+        Called by the ceph-mgr service to start any server that
+        is provided by this Python plugin.  The implementation
+        of this function should block until ``shutdown`` is called.
+
+        You *must* implement ``shutdown`` if you implement ``serve``
+        """
+        pass
+
+    def shutdown(self):
+        """
+        Called by the ceph-mgr service to request that this
+        module drop out of its serve() function.  You do not
+        need to implement this if you do not implement serve()
+
+        :return: None
+        """
+        pass
+
+    def get(self, data_name):
+        """
+        Called by the plugin to load some cluster state from ceph-mgr
+        """
+        return self._ceph_get(data_name)
+
+    def get_server(self, hostname):
+        """
+        Called by the plugin to load information about a particular
+        node from ceph-mgr.
+
+        :param hostname: a hostname
+        """
+        return self._ceph_get_server(hostname)
+
+    def get_perf_schema(self, svc_type, svc_name):
+        """
+        Called by the plugin to fetch perf counter schema info.
+        svc_name can be nullptr, as can svc_type, in which case
+        they are wildcards
+
+        :param svc_type:
+        :param svc_name:
+        :return: list of dicts describing the counters requested
+        """
+        return self._ceph_get_perf_schema(svc_type, svc_name)
+
+    def get_counter(self, svc_type, svc_name, path):
+        """
+        Called by the plugin to fetch data for a particular perf counter
+        on a particular service.
+
+        :param svc_type:
+        :param svc_name:
+        :param path:
+        :return: A list of two-element lists containing time and value
+        """
+        return self._ceph_get_counter(svc_type, svc_name, path)
+
+    def list_servers(self):
+        """
+        Like ``get_server``, but instead of returning information
+        about just one node, return all the nodes in an array.
+        """
+        return self._ceph_get_server(None)
+
+    def get_metadata(self, svc_type, svc_id):
+        """
+        Fetch the metadata for a particular service.
+
+        :param svc_type: string (e.g., 'mds', 'osd', 'mon')
+        :param svc_id: string
+        :return: dict
+        """
+        return self._ceph_get_metadata(svc_type, svc_id)
+
+    def get_daemon_status(self, svc_type, svc_id):
+        """
+        Fetch the latest status for a particular service daemon.
+
+        :param svc_type: string (e.g., 'rgw')
+        :param svc_id: string
+        :return: dict
+        """
+        return self._ceph_get_daemon_status(svc_type, svc_id)
+
+    def send_command(self, *args, **kwargs):
+        """
+        Called by the plugin to send a command to the mon
+        cluster.  All arguments are passed through unchanged and
+        nothing is returned.
+        """
+        self._ceph_send_command(*args, **kwargs)
+
+    def set_health_checks(self, checks):
+        """
+        Set module's health checks
+
+        Set the module's current map of health checks.  Argument is a
+        dict of check names to info, in this form:
+
+           {
+             'CHECK_FOO': {
+               'severity': 'warning',           # or 'error'
+               'summary': 'summary string',
+               'detail': [ 'list', 'of', 'detail', 'strings' ],
+              },
+             'CHECK_BAR': {
+               'severity': 'error',
+               'summary': 'bars are bad',
+               'detail': [ 'too hard' ],
+             },
+           }
+
+        :param checks: dict of health check dicts
+        """
+        self._ceph_set_health_checks(checks)
+
+    def handle_command(self, cmd):
+        """
+        Called by ceph-mgr to request the plugin to handle one
+        of the commands that it declared in self.COMMANDS
+
+        Return a status code, an output buffer, and an
+        output string.  The output buffer is for data results,
+        the output string is for informative text.
+
+        :param cmd: dict, from Ceph's cmdmap_t
+
+        :return: 3-tuple of (int, str, str)
+        """
+
+        # Should never get called if they didn't declare
+        # any ``COMMANDS``
+        raise NotImplementedError()
+
+    def get_mgr_id(self):
+        """
+        Retrieve the mgr id.
+
+        :return: str
+        """
+        return self._ceph_get_mgr_id()
+
+    def get_config(self, key, default=None):
+        """
+        Retrieve the value of a persistent configuration setting
+
+        :param key: str
+        :param default: value returned when the setting is None
+        :return: str
+        """
+        r = self._ceph_get_config(key)
+        if r is None:
+            return default
+        else:
+            return r
+
+    def get_config_prefix(self, key_prefix):
+        """
+        Retrieve a dict of config values with the given prefix
+
+        :param key_prefix: str
+        :return: dict
+        """
+        return self._ceph_get_config_prefix(key_prefix)
+
+    def get_localized_config(self, key, default=None):
+        """
+        Retrieve localized configuration for this ceph-mgr instance
+
+        The key prefixed with ``<mgr id>/`` is tried first, then the
+        bare key, and finally `default` is used.
+
+        :param key: str
+        :param default: str
+        :return: str
+        """
+        r = self.get_config(self.get_mgr_id() + '/' + key)
+        if r is None:
+            r = self.get_config(key)
+
+        if r is None:
+            r = default
+        return r
+
+    def set_config(self, key, val):
+        """
+        Set the value of a persistent configuration setting
+
+        :param key: str
+        :param val: str
+        """
+        self._ceph_set_config(key, val)
+
+    def set_localized_config(self, key, val):
+        """
+        Set localized configuration for this ceph-mgr instance
+
+        The value is stored under the key prefixed with ``<mgr id>/``.
+
+        :param key: str
+        :param val: str
+        :return: whatever the underlying ``_ceph_set_config`` returns
+        """
+        return self._ceph_set_config(self.get_mgr_id() + '/' + key, val)
+
+    def set_config_json(self, key, val):
+        """
+        Helper for setting json-serialized-config
+
+        :param key: str
+        :param val: json-serializable object
+        """
+        self._ceph_set_config(key, json.dumps(val))
+
+    def get_config_json(self, key):
+        """
+        Helper for getting json-serialized config
+
+        :param key: str
+        :return: object, or None when the setting is None
+        """
+        raw = self.get_config(key)
+        if raw is None:
+            return None
+        else:
+            return json.loads(raw)
+
+    def self_test(self):
+        """
+        Run a self-test on the module. Override this function and implement
+        a best as possible self-test for (automated) testing of the module
+        :return: bool
+        """
+        pass
+
+    def get_osdmap(self):
+        """
+        Get a handle to an OSDMap.  No epoch can be requested through
+        this method.
+        NOTE(review): presumably this is the latest OSDMap -- confirm
+        against the C++ side.
+        :return: OSDMap
+        """
+        return self._ceph_get_osdmap()
+
+    def get_all_perf_counters(self, prio_limit=PRIO_USEFUL):
+        """
+        Return the perf counters currently known to this ceph-mgr
+        instance, filtered by priority equal to or greater than `prio_limit`.
+
+        The result is a map of string to dict, associating services
+        (like "osd.123") with their counters.  The counter
+        dict for each service maps counter paths to a counter
+        info structure, which is the information from
+        the schema, plus an additional "value" member with the latest
+        value.
+
+        Only services of type "mds", "osd" and "mon" are considered.
+        """
+
+        result = defaultdict(dict)
+
+        # TODO: improve C++->Python interface to return just
+        # the latest if that's all we want.
+        def get_latest(daemon_type, daemon_name, counter):
+            data = self.get_counter(daemon_type, daemon_name, counter)[counter]
+            if data:
+                # Last sample, second element (the value)
+                return data[-1][1]
+            else:
+                return 0
+
+        for server in self.list_servers():
+            for service in server['services']:
+                if service['type'] not in ("mds", "osd", "mon"):
+                    continue
+
+                schema = self.get_perf_schema(service['type'], service['id'])
+                if not schema:
+                    # Logger.warn is a deprecated alias of Logger.warning
+                    self.log.warning("No perf counter schema for {0}.{1}".format(
+                        service['type'], service['id']
+                    ))
+                    continue
+
+                # Value is returned in a potentially-multi-service format,
+                # get just the service we're asking about
+                svc_full_name = "{0}.{1}".format(service['type'], service['id'])
+                schema = schema[svc_full_name]
+
+                # Populate latest values
+                for counter_path, counter_schema in schema.items():
+                    # self.log.debug("{0}: {1}".format(
+                    #     counter_path, json.dumps(counter_schema)
+                    # ))
+                    if counter_schema['priority'] < prio_limit:
+                        continue
+
+                    # The schema dict itself is annotated and returned
+                    counter_info = counter_schema
+                    counter_info['value'] = get_latest(service['type'], service['id'], counter_path)
+                    result[svc_full_name][counter_path] = counter_info
+
+        self.log.debug("returning {0} counter".format(len(result)))
+
+        return result
+
+    def set_uri(self, uri):
+        """
+        If the module exposes a service, then call this to publish the
+        address once it is available.
+
+        :param uri: address to publish
+        :return: a string
+        """
+        return self._ceph_set_uri(uri)