remove ceph code
[stor4nfv.git] / src / ceph / src / pybind / mgr / selftest / module.py
diff --git a/src/ceph/src/pybind/mgr/selftest/module.py b/src/ceph/src/pybind/mgr/selftest/module.py
deleted file mode 100644 (file)
index e289aee..0000000
+++ /dev/null
@@ -1,217 +0,0 @@
-
-from mgr_module import MgrModule, CommandResult
-import threading
-import random
-import json
-import errno
-
-
-class Module(MgrModule):
-    """
-    This module is for testing the ceph-mgr python interface from within
-    a running ceph-mgr daemon.
-
-    It implements a sychronous self-test command for calling the functions
-    in the MgrModule interface one by one, and a background "workload"
-    command for causing the module to perform some thrashing-type
-    activities in its serve() thread.
-    """
-
-    WORKLOAD_COMMAND_SPAM = "command_spam"
-    SHUTDOWN = "shutdown"
-
-    WORKLOADS = (WORKLOAD_COMMAND_SPAM, )
-
-    COMMANDS = [
-            {
-                "cmd": "mgr self-test run",
-                "desc": "Run mgr python interface tests",
-                "perm": "r"
-            },
-            {
-                "cmd": "mgr self-test background start name=workload,type=CephString",
-                "desc": "Activate a background workload (one of {0})".format(
-                    ", ".join(WORKLOADS)),
-                "perm": "r"
-            },
-            {
-                "cmd": "mgr self-test background stop",
-                "desc": "Stop background workload if any is running",
-                "perm": "r"
-            },
-            ]
-
-
-
-    def __init__(self, *args, **kwargs):
-        super(Module, self).__init__(*args, **kwargs)
-        self._event = threading.Event()
-        self._workload = None
-
-    def handle_command(self, command):
-        if command['prefix'] == 'mgr self-test run':
-            self._self_test()
-            return 0, '', 'Self-test succeeded'
-
-        elif command['prefix'] == 'mgr self-test background start':
-            if command['workload'] not in self.WORKLOADS:
-                return (-errno.EINVAL, '',
-                        "Workload not found '{0}'".format(command['workload']))
-            self._workload = command['workload']
-            self._event.set()
-            return 0, '', 'Running `{0}` in background'.format(self._workload)
-
-        elif command['prefix'] == 'mgr self-test background stop':
-            if self._workload:
-                was_running = self._workload
-                self._workload = None
-                self._event.set()
-                return 0, '', 'Stopping background workload `{0}`'.format(
-                        was_running)
-            else:
-                return 0, '', 'No background workload was running'
-
-        else:
-            return (-errno.EINVAL, '',
-                    "Command not found '{0}'".format(command['prefix']))
-
-    def _self_test(self):
-        self.log.info("Running self-test procedure...")
-
-        self._self_test_osdmap()
-        self._self_test_getters()
-        self._self_test_config()
-        self._self_test_misc()
-        self._self_test_perf_counters()
-
-    def _self_test_getters(self):
-        self.version
-        self.get_context()
-        self.get_mgr_id()
-
-        # In this function, we will assume that the system is in a steady
-        # state, i.e. if a server/service appears in one call, it will
-        # not have gone by the time we call another function referring to it
-
-        objects = [
-                "fs_map",
-                "osdmap_crush_map_text",
-                "osd_map",
-                "config",
-                "mon_map",
-                "service_map",
-                "osd_metadata",
-                "pg_summary",
-                "pg_status",
-                "pg_dump",
-                "df",
-                "osd_stats",
-                "health",
-                "mon_status",
-                "mgr_map"
-                ]
-        for obj in objects:
-            self.get(obj)
-
-        servers = self.list_servers()
-        for server in servers:
-            self.get_server(server['hostname'])
-
-        osdmap = self.get('osd_map')
-        for o in osdmap['osds']:
-            osd_id = o['osd']
-            self.get_metadata("osd", str(osd_id))
-
-        self.get_daemon_status("osd", "0")
-        #send_command
-
-    def _self_test_config(self):
-        # This is not a strong test (can't tell if values really
-        # persisted), it's just for the python interface bit.
-
-        self.set_config("testkey", "testvalue")
-        assert self.get_config("testkey") == "testvalue"
-
-        self.set_localized_config("testkey", "testvalue")
-        assert self.get_localized_config("testkey") == "testvalue"
-
-        self.set_config_json("testjsonkey", {"testblob": 2})
-        assert self.get_config_json("testjsonkey") == {"testblob": 2}
-
-        assert sorted(self.get_config_prefix("test").keys()) == sorted(
-                ["testkey", "testjsonkey"])
-
-    def _self_test_perf_counters(self):
-        self.get_perf_schema("osd", "0")
-        self.get_counter("osd", "0", "osd.op")
-        #get_counter
-        #get_all_perf_coutners
-
-    def _self_test_misc(self):
-        self.set_uri("http://this.is.a.test.com")
-        self.set_health_checks({})
-
-    def _self_test_osdmap(self):
-        osdmap = self.get_osdmap()
-        osdmap.get_epoch()
-        osdmap.get_crush_version()
-        osdmap.dump()
-
-        inc = osdmap.new_incremental()
-        osdmap.apply_incremental(inc)
-        inc.get_epoch()
-        inc.dump()
-
-        crush = osdmap.get_crush()
-        crush.dump()
-        crush.get_item_name(-1)
-        crush.get_item_weight(-1)
-        crush.find_takes()
-        crush.get_take_weight_osd_map(-1)
-
-        #osdmap.get_pools_by_take()
-        #osdmap.calc_pg_upmaps()
-        #osdmap.map_pools_pgs_up()
-
-        #inc.set_osd_reweights
-        #inc.set_crush_compat_weight_set_weights
-
-        self.log.info("Finished self-test procedure.")
-
-    def shutdown(self):
-        self._workload = self.SHUTDOWN
-        self._event.set()
-
-    def _command_spam(self):
-        self.log.info("Starting command_spam workload...")
-        while not self._event.is_set():
-            osdmap = self.get_osdmap()
-            dump = osdmap.dump()
-            count = len(dump['osds'])
-            i = int(random.random() * count)
-            w = random.random()
-
-            result = CommandResult('')
-            self.send_command(result, 'mon', '', json.dumps({
-                'prefix': 'osd reweight',
-                'id': i,
-                'weight': w
-                }), '')
-
-            crush = osdmap.get_crush().dump()
-            r, outb, outs = result.wait()
-
-        self._event.clear()
-        self.log.info("Ended command_spam workload...")
-
-    def serve(self):
-        while True:
-            if self._workload == self.WORKLOAD_COMMAND_SPAM:
-                self._command_spam()
-            elif self._workload == self.SHUTDOWN:
-                self.log.info("Shutting down...")
-                break
-            else:
-                self.log.info("Waiting for workload request...")
-                self._event.wait()
-                self._event.clear()