1 # Copyright 2014: Mirantis Inc.
4 # Licensed under the Apache License, Version 2.0 (the "License"); you may
5 # not use this file except in compliance with the License. You may obtain
6 # a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 # License for the specific language governing permissions and limitations
16 # yardstick comment: this is a modified copy of
17 # rally/rally/benchmark/runners/base.py
21 import multiprocessing
26 from oslo_config import cfg
28 import yardstick.common.utils as utils
29 from yardstick.benchmark.scenarios import base as base_scenario
30 from yardstick.dispatcher.base import Base as DispatcherBase
32 log = logging.getLogger(__name__)
37 def _output_serializer_main(filename, queue):
38 '''entrypoint for the singleton subprocess writing to outfile
39 Use of this process enables multiple instances of a scenario without
40 messing up the output file.
43 config["type"] = CONF.dispatcher.capitalize()
44 config["file_path"] = filename
45 dispatcher = DispatcherBase.get(config)
48 # blocks until data becomes available
50 if record == '_TERMINATE_':
51 dispatcher.flush_result_data()
54 dispatcher.record_result_data(record)
57 def _execute_shell_command(command):
58 '''execute shell script with error handling'''
62 output = subprocess.check_output(command, shell=True)
65 output = traceback.format_exc()
66 log.error("exec command '%s' error:\n ", command)
67 log.error(traceback.format_exc())
69 return exitcode, output
72 def _single_action(seconds, command, queue):
73 '''entrypoint for the single action process'''
74 log.debug("single action, fires after %d seconds (from now)", seconds)
76 log.debug("single action: executing command: '%s'", command)
77 ret_code, data = _execute_shell_command(command)
79 log.error("single action error! command:%s", command)
80 queue.put({'single-action-data': data})
82 log.debug("single action data: \n%s", data)
83 queue.put({'single-action-data': data})
86 def _periodic_action(interval, command, queue):
87 '''entrypoint for the periodic action process'''
88 log.debug("periodic action, fires every: %d seconds", interval)
92 time_spent += interval
93 log.debug("periodic action, executing command: '%s'", command)
94 ret_code, data = _execute_shell_command(command)
96 log.error("periodic action error! command:%s", command)
97 queue.put({'periodic-action-data': data})
99 log.debug("periodic action data: \n%s", data)
100 queue.put({'periodic-action-data': data})
103 class Runner(object):
109 def get_cls(runner_type):
110 '''return class of specified type'''
111 for runner in utils.itersubclasses(Runner):
112 if runner_type == runner.__execution_type__:
114 raise RuntimeError("No such runner_type %s" % runner_type)
118 '''return a list of known runner type (class) names'''
120 for runner in utils.itersubclasses(Runner):
126 """Returns instance of a scenario runner for execution type.
128 # if there is no runner, start the output serializer subprocess
129 if len(Runner.runners) == 0:
130 log.debug("Starting dump process file '%s'",
131 config["output_filename"])
132 Runner.queue = multiprocessing.Queue()
133 Runner.dump_process = multiprocessing.Process(
134 target=_output_serializer_main,
136 args=(config["output_filename"], Runner.queue))
137 Runner.dump_process.start()
139 return Runner.get_cls(config["type"])(config, Runner.queue)
142 def release_dump_process():
143 '''Release the dumper process'''
144 log.debug("Stopping dump process")
145 if Runner.dump_process:
146 Runner.queue.put('_TERMINATE_')
147 Runner.dump_process.join()
148 Runner.dump_process = None
152 '''Release the runner'''
153 if runner in Runner.runners:
154 Runner.runners.remove(runner)
156 # if this was the last runner, stop the output serializer subprocess
157 if len(Runner.runners) == 0:
158 Runner.release_dump_process()
161 def terminate(runner):
162 '''Terminate the runner'''
163 if runner.process and runner.process.is_alive():
164 runner.process.terminate()
168 '''Terminate all runners (subprocesses)'''
169 log.debug("Terminating all runners")
171 # release dumper process as some errors before any runner is created
172 if len(Runner.runners) == 0:
173 Runner.release_dump_process()
176 for runner in Runner.runners:
177 log.debug("Terminating runner: %s", runner)
179 runner.process.terminate()
180 runner.process.join()
181 if runner.periodic_action_process:
182 log.debug("Terminating periodic action process")
183 runner.periodic_action_process.terminate()
184 runner.periodic_action_process = None
185 Runner.release(runner)
187 def __init__(self, config, queue):
189 self.periodic_action_process = None
190 self.result_queue = queue
192 self.aborted = multiprocessing.Event()
193 Runner.runners.append(self)
195 def run_post_stop_action(self):
196 '''run a potentially configured post-stop action'''
197 if "post-stop-action" in self.config:
198 command = self.config["post-stop-action"]["command"]
199 log.debug("post stop action: command: '%s'", command)
200 ret_code, data = _execute_shell_command(command)
202 log.error("post action error! command:%s", command)
203 self.result_queue.put({'post-stop-action-data': data})
205 log.debug("post-stop data: \n%s", data)
206 self.result_queue.put({'post-stop-action-data': data})
208 def run(self, scenario_cfg, context_cfg):
209 scenario_type = scenario_cfg["type"]
210 class_name = base_scenario.Scenario.get(scenario_type)
211 path_split = class_name.split(".")
212 module_path = ".".join(path_split[:-1])
213 module = importlib.import_module(module_path)
214 cls = getattr(module, path_split[-1])
216 self.config['object'] = class_name
219 # run a potentially configured pre-start action
220 if "pre-start-action" in self.config:
221 command = self.config["pre-start-action"]["command"]
222 log.debug("pre start action: command: '%s'", command)
223 ret_code, data = _execute_shell_command(command)
225 log.error("pre-start action error! command:%s", command)
226 self.result_queue.put({'pre-start-action-data': data})
228 log.debug("pre-start data: \n%s", data)
229 self.result_queue.put({'pre-start-action-data': data})
231 if "single-shot-action" in self.config:
232 single_action_process = multiprocessing.Process(
233 target=_single_action,
234 name="single-shot-action",
235 args=(self.config["single-shot-action"]["after"],
236 self.config["single-shot-action"]["command"],
238 single_action_process.start()
240 if "periodic-action" in self.config:
241 self.periodic_action_process = multiprocessing.Process(
242 target=_periodic_action,
243 name="periodic-action",
244 args=(self.config["periodic-action"]["interval"],
245 self.config["periodic-action"]["command"],
247 self.periodic_action_process.start()
249 self._run_benchmark(cls, "run", scenario_cfg, context_cfg)
252 '''Abort the execution of a scenario'''
255 def join(self, timeout=None):
256 self.process.join(timeout)
257 if self.periodic_action_process:
258 self.periodic_action_process.terminate()
259 self.periodic_action_process = None
261 self.run_post_stop_action()
262 return self.process.exitcode