+++ /dev/null
-
-from mgr_module import MgrModule, CommandResult
-import threading
-import random
-import json
-import errno
-
-
-class Module(MgrModule):
- """
- This module is for testing the ceph-mgr python interface from within
- a running ceph-mgr daemon.
-
- It implements a sychronous self-test command for calling the functions
- in the MgrModule interface one by one, and a background "workload"
- command for causing the module to perform some thrashing-type
- activities in its serve() thread.
- """
-
- WORKLOAD_COMMAND_SPAM = "command_spam"
- SHUTDOWN = "shutdown"
-
- WORKLOADS = (WORKLOAD_COMMAND_SPAM, )
-
- COMMANDS = [
- {
- "cmd": "mgr self-test run",
- "desc": "Run mgr python interface tests",
- "perm": "r"
- },
- {
- "cmd": "mgr self-test background start name=workload,type=CephString",
- "desc": "Activate a background workload (one of {0})".format(
- ", ".join(WORKLOADS)),
- "perm": "r"
- },
- {
- "cmd": "mgr self-test background stop",
- "desc": "Stop background workload if any is running",
- "perm": "r"
- },
- ]
-
-
-
- def __init__(self, *args, **kwargs):
- super(Module, self).__init__(*args, **kwargs)
- self._event = threading.Event()
- self._workload = None
-
- def handle_command(self, command):
- if command['prefix'] == 'mgr self-test run':
- self._self_test()
- return 0, '', 'Self-test succeeded'
-
- elif command['prefix'] == 'mgr self-test background start':
- if command['workload'] not in self.WORKLOADS:
- return (-errno.EINVAL, '',
- "Workload not found '{0}'".format(command['workload']))
- self._workload = command['workload']
- self._event.set()
- return 0, '', 'Running `{0}` in background'.format(self._workload)
-
- elif command['prefix'] == 'mgr self-test background stop':
- if self._workload:
- was_running = self._workload
- self._workload = None
- self._event.set()
- return 0, '', 'Stopping background workload `{0}`'.format(
- was_running)
- else:
- return 0, '', 'No background workload was running'
-
- else:
- return (-errno.EINVAL, '',
- "Command not found '{0}'".format(command['prefix']))
-
- def _self_test(self):
- self.log.info("Running self-test procedure...")
-
- self._self_test_osdmap()
- self._self_test_getters()
- self._self_test_config()
- self._self_test_misc()
- self._self_test_perf_counters()
-
- def _self_test_getters(self):
- self.version
- self.get_context()
- self.get_mgr_id()
-
- # In this function, we will assume that the system is in a steady
- # state, i.e. if a server/service appears in one call, it will
- # not have gone by the time we call another function referring to it
-
- objects = [
- "fs_map",
- "osdmap_crush_map_text",
- "osd_map",
- "config",
- "mon_map",
- "service_map",
- "osd_metadata",
- "pg_summary",
- "pg_status",
- "pg_dump",
- "df",
- "osd_stats",
- "health",
- "mon_status",
- "mgr_map"
- ]
- for obj in objects:
- self.get(obj)
-
- servers = self.list_servers()
- for server in servers:
- self.get_server(server['hostname'])
-
- osdmap = self.get('osd_map')
- for o in osdmap['osds']:
- osd_id = o['osd']
- self.get_metadata("osd", str(osd_id))
-
- self.get_daemon_status("osd", "0")
- #send_command
-
- def _self_test_config(self):
- # This is not a strong test (can't tell if values really
- # persisted), it's just for the python interface bit.
-
- self.set_config("testkey", "testvalue")
- assert self.get_config("testkey") == "testvalue"
-
- self.set_localized_config("testkey", "testvalue")
- assert self.get_localized_config("testkey") == "testvalue"
-
- self.set_config_json("testjsonkey", {"testblob": 2})
- assert self.get_config_json("testjsonkey") == {"testblob": 2}
-
- assert sorted(self.get_config_prefix("test").keys()) == sorted(
- ["testkey", "testjsonkey"])
-
- def _self_test_perf_counters(self):
- self.get_perf_schema("osd", "0")
- self.get_counter("osd", "0", "osd.op")
- #get_counter
- #get_all_perf_coutners
-
- def _self_test_misc(self):
- self.set_uri("http://this.is.a.test.com")
- self.set_health_checks({})
-
- def _self_test_osdmap(self):
- osdmap = self.get_osdmap()
- osdmap.get_epoch()
- osdmap.get_crush_version()
- osdmap.dump()
-
- inc = osdmap.new_incremental()
- osdmap.apply_incremental(inc)
- inc.get_epoch()
- inc.dump()
-
- crush = osdmap.get_crush()
- crush.dump()
- crush.get_item_name(-1)
- crush.get_item_weight(-1)
- crush.find_takes()
- crush.get_take_weight_osd_map(-1)
-
- #osdmap.get_pools_by_take()
- #osdmap.calc_pg_upmaps()
- #osdmap.map_pools_pgs_up()
-
- #inc.set_osd_reweights
- #inc.set_crush_compat_weight_set_weights
-
- self.log.info("Finished self-test procedure.")
-
- def shutdown(self):
- self._workload = self.SHUTDOWN
- self._event.set()
-
- def _command_spam(self):
- self.log.info("Starting command_spam workload...")
- while not self._event.is_set():
- osdmap = self.get_osdmap()
- dump = osdmap.dump()
- count = len(dump['osds'])
- i = int(random.random() * count)
- w = random.random()
-
- result = CommandResult('')
- self.send_command(result, 'mon', '', json.dumps({
- 'prefix': 'osd reweight',
- 'id': i,
- 'weight': w
- }), '')
-
- crush = osdmap.get_crush().dump()
- r, outb, outs = result.wait()
-
- self._event.clear()
- self.log.info("Ended command_spam workload...")
-
- def serve(self):
- while True:
- if self._workload == self.WORKLOAD_COMMAND_SPAM:
- self._command_spam()
- elif self._workload == self.SHUTDOWN:
- self.log.info("Shutting down...")
- break
- else:
- self.log.info("Waiting for workload request...")
- self._event.wait()
- self._event.clear()