X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=yardstick%2Fbenchmark%2Frunners%2Fbase.py;h=7c76e42df628ba54d3bd221317ead042b2fc7dde;hb=07249e010dd9837d63f3090f1eac0fc6763b968f;hp=0e029271365f3259a64112385127f52709404790;hpb=f036e9898a69f5041f9cde02e3652c29e2de1643;p=yardstick.git diff --git a/yardstick/benchmark/runners/base.py b/yardstick/benchmark/runners/base.py index 0e0292713..7c76e42df 100755 --- a/yardstick/benchmark/runners/base.py +++ b/yardstick/benchmark/runners/base.py @@ -35,15 +35,18 @@ log = logging.getLogger(__name__) CONF = cfg.CONF -def _output_serializer_main(filename, queue): - '''entrypoint for the singleton subprocess writing to outfile +def _output_serializer_main(filename, queue, config): + """entrypoint for the singleton subprocess writing to outfile Use of this process enables multiple instances of a scenario without messing up the output file. - ''' - config = {} - config["type"] = CONF.dispatcher.capitalize() - config["file_path"] = filename - dispatcher = DispatcherBase.get(config) + """ + out_type = config['yardstick'].get('DEFAULT', {}).get('dispatcher', 'file') + conf = { + 'type': out_type.capitalize(), + 'file_path': filename + } + + dispatcher = DispatcherBase.get(conf, config) while True: # blocks until data becomes available @@ -56,7 +59,7 @@ def _output_serializer_main(filename, queue): def _execute_shell_command(command): - '''execute shell script with error handling''' + """execute shell script with error handling""" exitcode = 0 output = [] try: @@ -71,7 +74,7 @@ def _execute_shell_command(command): def _single_action(seconds, command, queue): - '''entrypoint for the single action process''' + """entrypoint for the single action process""" log.debug("single action, fires after %d seconds (from now)", seconds) time.sleep(seconds) log.debug("single action: executing command: '%s'", command) @@ -85,7 +88,7 @@ def _single_action(seconds, command, queue): def _periodic_action(interval, command, queue): - '''entrypoint for the periodic action process''' + """entrypoint for the periodic action process""" log.debug("periodic action, fires every: %d seconds", interval) time_spent = 0 while True: @@ -108,7 +111,7 @@ class Runner(object): @staticmethod def get_cls(runner_type): - '''return class of specified type''' + """return class of specified type""" for runner in utils.itersubclasses(Runner): if runner_type == runner.__execution_type__: return runner @@ -116,32 +119,32 @@ class Runner(object): @staticmethod def get_types(): - '''return a list of known runner type (class) names''' + """return a list of known runner type (class) names""" types = [] for runner in utils.itersubclasses(Runner): types.append(runner) return types @staticmethod - def get(config): + def get(runner_cfg, config): """Returns instance of a scenario runner for execution type. """ # if there is no runner, start the output serializer subprocess - if len(Runner.runners) == 0: + if not Runner.runners: log.debug("Starting dump process file '%s'", - config["output_filename"]) + runner_cfg["output_filename"]) Runner.queue = multiprocessing.Queue() Runner.dump_process = multiprocessing.Process( target=_output_serializer_main, name="Dumper", - args=(config["output_filename"], Runner.queue)) + args=(runner_cfg["output_filename"], Runner.queue, config)) Runner.dump_process.start() - return Runner.get_cls(config["type"])(config, Runner.queue) + return Runner.get_cls(runner_cfg["type"])(runner_cfg, Runner.queue) @staticmethod def release_dump_process(): - '''Release the dumper process''' + """Release the dumper process""" log.debug("Stopping dump process") if Runner.dump_process: Runner.queue.put('_TERMINATE_') @@ -150,27 +153,27 @@ class Runner(object): @staticmethod def release(runner): - '''Release the runner''' + """Release the runner""" if runner in Runner.runners: Runner.runners.remove(runner) # if this was the last runner, stop the output serializer subprocess - if len(Runner.runners) == 0: + if not Runner.runners: Runner.release_dump_process() @staticmethod def terminate(runner): - '''Terminate the runner''' + """Terminate the runner""" if runner.process and runner.process.is_alive(): runner.process.terminate() @staticmethod def terminate_all(): - '''Terminate all runners (subprocesses)''' + """Terminate all runners (subprocesses)""" log.debug("Terminating all runners") # release dumper process as some errors before any runner is created - if len(Runner.runners) == 0: + if not Runner.runners: Runner.release_dump_process() return @@ -194,7 +197,7 @@ class Runner(object): Runner.runners.append(self) def run_post_stop_action(self): - '''run a potentially configured post-stop action''' + """run a potentially configured post-stop action""" if "post-stop-action" in self.config: command = self.config["post-stop-action"]["command"] log.debug("post stop action: command: '%s'", command) @@ -250,7 +253,7 @@ class Runner(object): self._run_benchmark(cls, "run", scenario_cfg, context_cfg) def abort(self): - '''Abort the execution of a scenario''' + """Abort the execution of a scenario""" self.aborted.set() def join(self, timeout=None):