Merge "Use Chart.js for graphs in HTML reports"
[yardstick.git] / yardstick / benchmark / runners / base.py
1 # Copyright 2014: Mirantis Inc.
2 # All Rights Reserved.
3 #
4 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
5 #    not use this file except in compliance with the License. You may obtain
6 #    a copy of the License at
7 #
8 #         http://www.apache.org/licenses/LICENSE-2.0
9 #
10 #    Unless required by applicable law or agreed to in writing, software
11 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 #    License for the specific language governing permissions and limitations
14 #    under the License.
15 #
16 # This is a modified copy of ``rally/rally/benchmark/runners/base.py``
17
18 import importlib
19 import logging
20 import multiprocessing
21 import subprocess
22 import time
23 import traceback
24
25 from six import moves
26
27 from yardstick.benchmark.scenarios import base as base_scenario
28 from yardstick.common import messaging
29 from yardstick.common.messaging import payloads
30 from yardstick.common.messaging import producer
31 from yardstick.common import utils
32 from yardstick.dispatcher.base import Base as DispatcherBase
33
34
# Module-level logger shared by the helpers and classes below; it is named
# after this module via ``__name__``.
log = logging.getLogger(__name__)
36
37
38 def _execute_shell_command(command):
39     """execute shell script with error handling"""
40     exitcode = 0
41     try:
42         output = subprocess.check_output(command, shell=True)
43     except subprocess.CalledProcessError:
44         exitcode = -1
45         output = traceback.format_exc()
46         log.error("exec command '%s' error:\n ", command)
47         log.error(traceback.format_exc())
48
49     return exitcode, output
50
51
def _single_action(seconds, command, queue):
    """Process entry point: wait ``seconds``, run ``command`` once, publish.

    The data returned by ``_execute_shell_command`` is put on ``queue`` under
    the 'single-action-data' key, whether the command succeeded or failed.
    """
    log.debug("single action, fires after %d seconds (from now)", seconds)
    time.sleep(seconds)
    log.debug("single action: executing command: '%s'", command)
    status, payload = _execute_shell_command(command)
    if status >= 0:
        log.debug("single action data: \n%s", payload)
    else:
        log.error("single action error! command:%s", command)
    # the data is published in both cases
    queue.put({'single-action-data': payload})
64
65
def _periodic_action(interval, command, queue):
    """entrypoint for the periodic action process

    Repeatedly sleeps ``interval`` seconds, runs ``command`` through
    ``_execute_shell_command`` and puts the returned data on ``queue`` under
    the 'periodic-action-data' key. The loop ends after the first failing
    command; the data of that failing run is still published.
    """
    log.debug("periodic action, fires every: %d seconds", interval)
    while True:
        time.sleep(interval)
        log.debug("periodic action, executing command: '%s'", command)
        ret_code, data = _execute_shell_command(command)
        failed = ret_code < 0
        if failed:
            log.error("periodic action error! command:%s", command)
        else:
            log.debug("periodic action data: \n%s", data)
        queue.put({'periodic-action-data': data})
        if failed:
            # stop repeating once the command has failed
            break
81
82
class Runner(object):
    """Base class for scenario runners.

    Every instance appends itself to the class-level ``runners`` list when it
    is constructed and stays there until ``release`` removes it. Concrete
    runners are subclasses that implement ``_run_benchmark``; ``get_cls``
    finds them by comparing their ``__execution_type__`` attribute.
    """

    # runner instances created so far and not yet released
    runners = []

    # default timeout handed to ``Process.join`` by ``poll`` and ``join``
    QUEUE_JOIN_INTERVAL = 5

    @staticmethod
    def get_cls(runner_type):
        """return class of specified type

        :raises RuntimeError: if no subclass of ``Runner`` has an
            ``__execution_type__`` equal to ``runner_type``
        """
        for runner in utils.itersubclasses(Runner):
            if runner_type == runner.__execution_type__:
                return runner
        raise RuntimeError("No such runner_type %s" % runner_type)

    @staticmethod
    def get_types():
        """return a list of known runner types (classes)"""
        return list(utils.itersubclasses(Runner))

    @staticmethod
    def get(runner_cfg):
        """Returns instance of a scenario runner for execution type.

        The class is selected with ``runner_cfg["type"]`` and instantiated
        with ``runner_cfg`` as its only argument.
        """
        return Runner.get_cls(runner_cfg["type"])(runner_cfg)

    @staticmethod
    def release(runner):
        """Release the runner, i.e. drop it from ``Runner.runners``

        Releasing a runner that is not registered is a no-op.
        """
        if runner in Runner.runners:
            Runner.runners.remove(runner)

    @staticmethod
    def terminate(runner):
        """Terminate the runner process if it exists and is still alive"""
        if runner.process and runner.process.is_alive():
            runner.process.terminate()

    @staticmethod
    def terminate_all():
        """Terminate all runners (subprocesses)

        Each registered runner has its process terminated and joined, its
        periodic action process terminated, and is then released.
        """
        log.debug("Terminating all runners")

        # Iterate over a snapshot: Runner.release() removes the runner from
        # Runner.runners, and removing items from the list being iterated
        # makes the loop skip every other runner.
        for runner in list(Runner.runners):
            log.debug("Terminating runner: %s", runner)
            if runner.process:
                runner.process.terminate()
                runner.process.join()
            if runner.periodic_action_process:
                log.debug("Terminating periodic action process")
                runner.periodic_action_process.terminate()
                runner.periodic_action_process = None
            Runner.release(runner)

    def __init__(self, config):
        # both are filled in by run() from the scenario configuration
        self.task_id = None
        self.case_name = None
        self.config = config
        # created by run() when a "periodic-action" is configured
        self.periodic_action_process = None
        # drained by get_output() and get_result() respectively
        self.output_queue = multiprocessing.Queue()
        self.result_queue = multiprocessing.Queue()
        # never assigned in this class; presumably set by the subclass'
        # _run_benchmark() -- TODO confirm against the concrete runners
        self.process = None
        # set by abort(), cleared at the start of every run()
        self.aborted = multiprocessing.Event()
        Runner.runners.append(self)

    def run_post_stop_action(self):
        """run a potentially configured post-stop action

        The command's data is put on the result queue under the
        'post-stop-action-data' key, whether the command succeeded or failed.
        """
        if "post-stop-action" not in self.config:
            return

        command = self.config["post-stop-action"]["command"]
        log.debug("post stop action: command: '%s'", command)
        ret_code, data = _execute_shell_command(command)
        if ret_code < 0:
            log.error("post action error! command:%s", command)
        else:
            log.debug("post-stop data: \n%s", data)
        self.result_queue.put({'post-stop-action-data': data})

    def _run_benchmark(self, cls, method_name, scenario_cfg, context_cfg):
        """Start the benchmark; must be implemented by subclasses"""
        raise NotImplementedError

    def run(self, scenario_cfg, context_cfg):
        """Prepare and start the scenario described by ``scenario_cfg``

        Resolves the scenario class from ``scenario_cfg["type"]``, runs the
        optional pre-start action, starts the optional single-shot and
        periodic action processes and finally calls ``_run_benchmark``.

        If the pre-start action fails, its data is put on the result queue
        and the method returns without starting anything else.
        """
        scenario_type = scenario_cfg["type"]
        # dotted "package.module.ClassName" path of the scenario class
        class_name = base_scenario.Scenario.get(scenario_type)
        path_split = class_name.split(".")
        module_path = ".".join(path_split[:-1])
        module = importlib.import_module(module_path)
        cls = getattr(module, path_split[-1])

        self.config['object'] = class_name
        self.case_name = scenario_cfg['tc']
        self.task_id = scenario_cfg['task_id']
        self.aborted.clear()

        # run a potentially configured pre-start action
        if "pre-start-action" in self.config:
            command = self.config["pre-start-action"]["command"]
            log.debug("pre start action: command: '%s'", command)
            ret_code, data = _execute_shell_command(command)
            failed = ret_code < 0
            if failed:
                log.error("pre-start action error! command:%s", command)
            else:
                log.debug("pre-start data: \n%s", data)
            self.result_queue.put({'pre-start-action-data': data})
            if failed:
                # a failed pre-start action cancels the whole run
                return

        if "single-shot-action" in self.config:
            single_action_process = multiprocessing.Process(
                target=_single_action,
                name="single-shot-action",
                args=(self.config["single-shot-action"]["after"],
                      self.config["single-shot-action"]["command"],
                      self.result_queue))
            single_action_process.start()

        if "periodic-action" in self.config:
            self.periodic_action_process = multiprocessing.Process(
                target=_periodic_action,
                name="periodic-action",
                args=(self.config["periodic-action"]["interval"],
                      self.config["periodic-action"]["command"],
                      self.result_queue))
            self.periodic_action_process.start()

        self._run_benchmark(cls, "run", scenario_cfg, context_cfg)

    def abort(self):
        """Abort the execution of a scenario"""
        self.aborted.set()

    def poll(self, timeout=QUEUE_JOIN_INTERVAL):
        """Wait up to ``timeout`` for the runner process

        :returns: the process exit code, ``None`` while it is still running
        """
        self.process.join(timeout)
        return self.process.exitcode

    def join(self, outputs, result, interval=QUEUE_JOIN_INTERVAL):
        """Collect output and results until the runner process has exited

        :param outputs: dict, updated in place with the queued outputs
        :param result: list, extended in place with the queued records
        :param interval: timeout of each wait on the runner process
        :returns: the exit code of the runner process

        After the process has exited the periodic action process is stopped
        and the post-stop action is run.
        """
        while self.process.exitcode is None:
            # drain the queue while we are running otherwise we won't terminate
            outputs.update(self.get_output())
            result.extend(self.get_result())
            self.process.join(interval)
        # drain after the process has exited
        outputs.update(self.get_output())
        result.extend(self.get_result())

        self.process.terminate()
        if self.periodic_action_process:
            # give it one second to finish before terminating it
            self.periodic_action_process.join(1)
            self.periodic_action_process.terminate()
            self.periodic_action_process = None

        self.run_post_stop_action()
        return self.process.exitcode

    def get_output(self):
        """Drain the output queue and merge its items into one dict"""
        result = {}
        while not self.output_queue.empty():
            log.debug("output_queue size %s", self.output_queue.qsize())
            try:
                result.update(self.output_queue.get(True, 1))
            except moves.queue.Empty:
                # get() timed out although empty() reported data; the loop
                # condition decides whether to try again
                pass
        return result

    def get_result(self):
        """Drain the result queue and return its records as a list

        When the configured dispatcher setting contains 'influxdb', every
        record is also uploaded via ``_output_to_influxdb`` as it is read.
        """
        result = []

        dispatcher = self.config['output_config']['DEFAULT']['dispatcher']
        output_in_influxdb = 'influxdb' in dispatcher

        while not self.result_queue.empty():
            log.debug("result_queue size %s", self.result_queue.qsize())
            try:
                one_record = self.result_queue.get(True, 1)
            except moves.queue.Empty:
                # get() timed out although empty() reported data; the loop
                # condition decides whether to try again
                pass
            else:
                if output_in_influxdb:
                    self._output_to_influxdb(one_record)

                result.append(one_record)
        return result

    def _output_to_influxdb(self, record):
        """Upload ``record`` through the dispatcher of type 'Influxdb'"""
        dispatchers = DispatcherBase.get(self.config['output_config'])
        # NOTE: next() has no default here, so StopIteration is raised when
        # none of the dispatchers is of type 'Influxdb'
        dispatcher = next(d for d in dispatchers
                          if d.__dispatcher_type__ == 'Influxdb')
        dispatcher.upload_one_record(record, self.case_name, '',
                                     task_id=self.task_id)
274
275
class RunnerProducer(producer.MessagingProducer):
    """Message producer publishing runner events on the runner topic"""

    def __init__(self, _id):
        super(RunnerProducer, self).__init__(messaging.TOPIC_RUNNER, _id=_id)

    def start_iteration(self, version=1, data=None):
        """Send a "start iteration" message; falsy ``data`` becomes ``{}``"""
        payload = payloads.RunnerPayload(version=version, data=data or {})
        self.send_message(messaging.RUNNER_METHOD_START_ITERATION, payload)

    def stop_iteration(self, version=1, data=None):
        """Send a "stop iteration" message; falsy ``data`` becomes ``{}``"""
        payload = payloads.RunnerPayload(version=version, data=data or {})
        self.send_message(messaging.RUNNER_METHOD_STOP_ITERATION, payload)