1 ##############################################################################
2 # Copyright (c) 2015 Ericsson AB and others.
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
12 import multiprocessing
17 log = logging.getLogger(__name__)
19 from oslo_config import cfg
21 import yardstick.common.utils as utils
22 from yardstick.benchmark.scenarios import base as base_scenario
23 from yardstick.dispatcher.base import Base as DispatcherBase
28 def _output_serializer_main(filename, queue):
29 '''entrypoint for the singleton subprocess writing to outfile
30 Use of this process enables multiple instances of a scenario without
31 messing up the output file.
34 config["type"] = CONF.dispatcher.capitalize()
35 config["file_path"] = filename
36 dispatcher = DispatcherBase.get(config)
39 # blocks until data becomes available
41 if record == '_TERMINATE_':
42 dispatcher.flush_result_data()
45 dispatcher.record_result_data(record)
48 def _execute_shell_command(command):
49 '''execute shell script with error handling'''
53 output = subprocess.check_output(command, shell=True)
56 output = traceback.format_exc()
57 log.error("exec command '%s' error:\n " % command)
58 log.error(traceback.format_exc())
60 return exitcode, output
63 def _single_action(seconds, command, queue):
64 '''entrypoint for the single action process'''
65 log.debug("single action, fires after %d seconds (from now)", seconds)
67 log.debug("single action: executing command: '%s'", command)
68 ret_code, data = _execute_shell_command(command)
70 log.error("single action error! command:%s" % command)
71 queue.put({'single-action-data': data})
73 log.debug("single action data: \n%s" % data)
74 queue.put({'single-action-data': data})
77 def _periodic_action(interval, command, queue):
78 '''entrypoint for the periodic action process'''
79 log.debug("periodic action, fires every: %d seconds", interval)
83 time_spent += interval
84 log.debug("periodic action, executing command: '%s'", command)
85 ret_code, data = _execute_shell_command(command)
87 log.error("periodic action error! command:%s", command)
88 queue.put({'periodic-action-data': data})
90 log.debug("periodic action data: \n%s" % data)
91 queue.put({'periodic-action-data': data})
100 def get_cls(runner_type):
101 '''return class of specified type'''
102 for runner in utils.itersubclasses(Runner):
103 if runner_type == runner.__execution_type__:
105 raise RuntimeError("No such runner_type %s" % runner_type)
109 '''return a list of known runner type (class) names'''
111 for runner in utils.itersubclasses(Runner):
117 """Returns instance of a scenario runner for execution type.
119 # if there is no runner, start the output serializer subprocess
120 if len(Runner.runners) == 0:
121 log.debug("Starting dump process file '%s'" %
122 config["output_filename"])
123 Runner.queue = multiprocessing.Queue()
124 Runner.dump_process = multiprocessing.Process(
125 target=_output_serializer_main,
127 args=(config["output_filename"], Runner.queue))
128 Runner.dump_process.start()
130 return Runner.get_cls(config["type"])(config, Runner.queue)
133 def release_dump_process():
134 '''Release the dumper process'''
135 log.debug("Stopping dump process")
136 if Runner.dump_process:
137 Runner.queue.put('_TERMINATE_')
138 Runner.dump_process.join()
139 Runner.dump_process = None
143 '''Release the runner'''
144 if runner in Runner.runners:
145 Runner.runners.remove(runner)
147 # if this was the last runner, stop the output serializer subprocess
148 if len(Runner.runners) == 0:
149 Runner.release_dump_process()
152 def terminate(runner):
153 '''Terminate the runner'''
154 if runner.process and runner.process.is_alive():
155 runner.process.terminate()
159 '''Terminate all runners (subprocesses)'''
160 log.debug("Terminating all runners")
162 # release dumper process as some errors before any runner is created
163 if len(Runner.runners) == 0:
164 Runner.release_dump_process()
167 for runner in Runner.runners:
168 log.debug("Terminating runner: %s", runner)
170 runner.process.terminate()
171 runner.process.join()
172 if runner.periodic_action_process:
173 log.debug("Terminating periodic action process")
174 runner.periodic_action_process.terminate()
175 runner.periodic_action_process = None
176 Runner.release(runner)
178 def __init__(self, config, queue):
180 self.periodic_action_process = None
181 self.result_queue = queue
183 self.aborted = multiprocessing.Event()
184 Runner.runners.append(self)
186 def run_post_stop_action(self):
187 '''run a potentially configured post-stop action'''
188 if "post-stop-action" in self.config:
189 command = self.config["post-stop-action"]["command"]
190 log.debug("post stop action: command: '%s'" % command)
191 ret_code, data = _execute_shell_command(command)
193 log.error("post action error! command:%s", command)
194 self.result_queue.put({'post-stop-action-data': data})
196 log.debug("post-stop data: \n%s" % data)
197 self.result_queue.put({'post-stop-action-data': data})
199 def run(self, scenario_cfg, context_cfg):
200 scenario_type = scenario_cfg["type"]
201 class_name = base_scenario.Scenario.get(scenario_type)
202 path_split = class_name.split(".")
203 module_path = ".".join(path_split[:-1])
204 module = importlib.import_module(module_path)
205 cls = getattr(module, path_split[-1])
207 self.config['object'] = class_name
210 # run a potentially configured pre-start action
211 if "pre-start-action" in self.config:
212 command = self.config["pre-start-action"]["command"]
213 log.debug("pre start action: command: '%s'" % command)
214 ret_code, data = _execute_shell_command(command)
216 log.error("pre-start action error! command:%s", command)
217 self.result_queue.put({'pre-start-action-data': data})
219 log.debug("pre-start data: \n%s" % data)
220 self.result_queue.put({'pre-start-action-data': data})
222 if "single-shot-action" in self.config:
223 single_action_process = multiprocessing.Process(
224 target=_single_action,
225 name="single-shot-action",
226 args=(self.config["single-shot-action"]["after"],
227 self.config["single-shot-action"]["command"],
229 single_action_process.start()
231 if "periodic-action" in self.config:
232 self.periodic_action_process = multiprocessing.Process(
233 target=_periodic_action,
234 name="periodic-action",
235 args=(self.config["periodic-action"]["interval"],
236 self.config["periodic-action"]["command"],
238 self.periodic_action_process.start()
240 self._run_benchmark(cls, "run", scenario_cfg, context_cfg)
243 '''Abort the execution of a scenario'''
246 def join(self, timeout=None):
247 self.process.join(timeout)
248 if self.periodic_action_process:
249 self.periodic_action_process.terminate()
250 self.periodic_action_process = None
252 self.run_post_stop_action()
253 return self.process.exitcode