Merge "Fix report generation"
[yardstick.git] / yardstick / benchmark / runners / base.py
1 # Copyright 2014: Mirantis Inc.
2 # All Rights Reserved.
3 #
4 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
5 #    not use this file except in compliance with the License. You may obtain
6 #    a copy of the License at
7 #
8 #         http://www.apache.org/licenses/LICENSE-2.0
9 #
10 #    Unless required by applicable law or agreed to in writing, software
11 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 #    License for the specific language governing permissions and limitations
14 #    under the License.
15
16 # yardstick comment: this is a modified copy of
17 # rally/rally/benchmark/runners/base.py
18
19 from __future__ import absolute_import
20
21 import logging
22 import multiprocessing
23 import subprocess
24 import time
25 import traceback
26 from subprocess import CalledProcessError
27
28 import importlib
29
30 from six.moves.queue import Empty
31
32 import yardstick.common.utils as utils
33 from yardstick.benchmark.scenarios import base as base_scenario
34 from yardstick.dispatcher.base import Base as DispatcherBase
35
36 log = logging.getLogger(__name__)
37
38
39 def _execute_shell_command(command):
40     """execute shell script with error handling"""
41     exitcode = 0
42     try:
43         output = subprocess.check_output(command, shell=True)
44     except CalledProcessError:
45         exitcode = -1
46         output = traceback.format_exc()
47         log.error("exec command '%s' error:\n ", command)
48         log.error(traceback.format_exc())
49
50     return exitcode, output
51
52
def _single_action(seconds, command, queue):
    """Entrypoint of the one-shot action process.

    Waits ``seconds``, runs ``command`` once through the shell and puts
    ``{'single-action-data': <output or error traceback>}`` on ``queue``.
    """
    log.debug("single action, fires after %d seconds (from now)", seconds)
    time.sleep(seconds)
    log.debug("single action: executing command: '%s'", command)
    status, output = _execute_shell_command(command)
    if status >= 0:
        log.debug("single action data: \n%s", output)
    else:
        log.error("single action error! command:%s", command)
    # The payload is queued whether or not the command succeeded.
    queue.put({'single-action-data': output})
65
66
def _periodic_action(interval, command, queue):
    """Entrypoint of the periodic action process.

    Every ``interval`` seconds, runs ``command`` through the shell and puts
    ``{'periodic-action-data': <output>}`` on ``queue``.  The loop ends only
    when the command fails (its traceback is queued first); otherwise the
    process runs until its owner terminates it.
    """
    log.debug("periodic action, fires every: %d seconds", interval)
    while True:
        time.sleep(interval)
        log.debug("periodic action, executing command: '%s'", command)
        ret_code, data = _execute_shell_command(command)
        if ret_code < 0:
            log.error("periodic action error! command:%s", command)
            queue.put({'periodic-action-data': data})
            break
        log.debug("periodic action data: \n%s", data)
        queue.put({'periodic-action-data': data})
82
83
class Runner(object):
    """Base class for scenario runners.

    Concrete runners are subclasses that define ``__execution_type__`` and
    implement ``_run_benchmark``.  Each instance appends itself to the
    class-level ``runners`` registry on construction, so that
    ``terminate_all`` can stop every runner's subprocesses.
    """

    # Registry of Runner instances that have not been released yet.
    runners = []

    @staticmethod
    def get_cls(runner_type):
        """Return the Runner subclass whose ``__execution_type__`` matches.

        :param runner_type: execution type name to look up.
        :raises RuntimeError: if no subclass has that execution type.
        """
        for runner in utils.itersubclasses(Runner):
            if runner_type == runner.__execution_type__:
                return runner
        raise RuntimeError("No such runner_type %s" % runner_type)

    @staticmethod
    def get_types():
        """Return a list of the known runner classes (Runner subclasses)."""
        return list(utils.itersubclasses(Runner))

    @staticmethod
    def get(runner_cfg):
        """Return an instance of the runner named by ``runner_cfg["type"]``.

        The complete ``runner_cfg`` dict is passed to that runner's
        constructor.
        """
        return Runner.get_cls(runner_cfg["type"])(runner_cfg)

    @staticmethod
    def release(runner):
        """Remove ``runner`` from the registry; no-op if it is absent."""
        if runner in Runner.runners:
            Runner.runners.remove(runner)

    @staticmethod
    def terminate(runner):
        """Terminate the runner's benchmark process if it is still alive."""
        if runner.process and runner.process.is_alive():
            runner.process.terminate()

    @staticmethod
    def terminate_all():
        """Terminate all runners (subprocesses) and release them."""
        log.debug("Terminating all runners", exc_info=True)

        # Nothing to clean up when an error happened before any runner was
        # created.
        if not Runner.runners:
            return

        # Iterate over a snapshot: release() removes the runner from
        # Runner.runners, and removing items from a list while iterating
        # over it skips every other element, leaving runners alive.
        for runner in list(Runner.runners):
            log.debug("Terminating runner: %s", runner)
            if runner.process:
                runner.process.terminate()
                runner.process.join()
            if runner.periodic_action_process:
                log.debug("Terminating periodic action process")
                runner.periodic_action_process.terminate()
                runner.periodic_action_process = None
            Runner.release(runner)

    def __init__(self, config):
        """Create a runner from its ``config`` dict and register it.

        :param config: runner configuration.  ``run`` reads the optional
            ``pre-start-action``, ``single-shot-action`` and
            ``periodic-action`` keys; ``get_result`` reads ``output_config``.
        """
        self.task_id = None
        self.case_name = None
        self.config = config
        # Process running the periodic action, when one is configured.
        self.periodic_action_process = None
        # Queues drained by get_output() / get_result(); presumably filled
        # by the benchmark process that subclasses start in _run_benchmark.
        self.output_queue = multiprocessing.Queue()
        self.result_queue = multiprocessing.Queue()
        # Benchmark process.  It is never assigned in this class, so
        # subclasses are expected to set it (poll/join rely on it).
        self.process = None
        # Set by abort(); cleared at the start of every run().
        self.aborted = multiprocessing.Event()
        Runner.runners.append(self)

    def run_post_stop_action(self):
        """Run the configured post-stop action, if any.

        Its output (or error traceback) is put on ``result_queue`` under the
        key ``'post-stop-action-data'``.
        """
        if "post-stop-action" in self.config:
            command = self.config["post-stop-action"]["command"]
            log.debug("post stop action: command: '%s'", command)
            ret_code, data = _execute_shell_command(command)
            if ret_code < 0:
                log.error("post action error! command:%s", command)
                self.result_queue.put({'post-stop-action-data': data})
                return
            log.debug("post-stop data: \n%s", data)
            self.result_queue.put({'post-stop-action-data': data})

    def _run_benchmark(self, cls, method_name, scenario_cfg, context_cfg):
        """Start the benchmark; must be implemented by subclasses."""
        raise NotImplementedError

    def run(self, scenario_cfg, context_cfg):
        """Run the scenario described by ``scenario_cfg``.

        Steps:
        - Resolve the scenario class from ``scenario_cfg["type"]``.
        - Record the test case name and task id.
        - Run the optional pre-start action; if it fails, return without
          starting the benchmark.
        - Start the optional single-shot and periodic action processes.
        - Delegate to ``_run_benchmark``.
        """
        scenario_type = scenario_cfg["type"]
        class_name = base_scenario.Scenario.get(scenario_type)
        # class_name is a dotted path "package.module.ClassName".
        path_split = class_name.split(".")
        module_path = ".".join(path_split[:-1])
        module = importlib.import_module(module_path)
        cls = getattr(module, path_split[-1])

        self.config['object'] = class_name
        self.case_name = scenario_cfg['tc']
        self.task_id = scenario_cfg['task_id']
        self.aborted.clear()

        # run a potentially configured pre-start action
        if "pre-start-action" in self.config:
            command = self.config["pre-start-action"]["command"]
            log.debug("pre start action: command: '%s'", command)
            ret_code, data = _execute_shell_command(command)
            if ret_code < 0:
                log.error("pre-start action error! command:%s", command)
                self.result_queue.put({'pre-start-action-data': data})
                return
            log.debug("pre-start data: \n%s", data)
            self.result_queue.put({'pre-start-action-data': data})

        if "single-shot-action" in self.config:
            # NOTE(review): this process is only kept in a local variable,
            # so terminate_all()/join() never stop or join it.
            single_action_process = multiprocessing.Process(
                target=_single_action,
                name="single-shot-action",
                args=(self.config["single-shot-action"]["after"],
                      self.config["single-shot-action"]["command"],
                      self.result_queue))
            single_action_process.start()

        if "periodic-action" in self.config:
            self.periodic_action_process = multiprocessing.Process(
                target=_periodic_action,
                name="periodic-action",
                args=(self.config["periodic-action"]["interval"],
                      self.config["periodic-action"]["command"],
                      self.result_queue))
            self.periodic_action_process.start()

        self._run_benchmark(cls, "run", scenario_cfg, context_cfg)

    def abort(self):
        """Abort the execution of a scenario by setting the ``aborted`` event."""
        self.aborted.set()

    # Default timeout, in seconds, for joins on the benchmark process.
    QUEUE_JOIN_INTERVAL = 5

    def poll(self, timeout=QUEUE_JOIN_INTERVAL):
        """Wait up to ``timeout`` seconds for the benchmark process.

        :returns: the process exit code, or None if it is still running.
        """
        self.process.join(timeout)
        return self.process.exitcode

    def join(self, outputs, result, interval=QUEUE_JOIN_INTERVAL):
        """Wait for the benchmark process and collect what it produced.

        Keeps draining the queues into ``outputs`` (dict, updated in place)
        and ``result`` (list, extended in place) while the process runs.
        Once it has exited, stops the periodic action and runs the
        post-stop action.

        :returns: the benchmark process exit code.
        """
        while self.process.exitcode is None:
            # drain the queue while we are running otherwise we won't terminate
            outputs.update(self.get_output())
            result.extend(self.get_result())
            self.process.join(interval)
        # drain after the process has exited
        outputs.update(self.get_output())
        result.extend(self.get_result())

        self.process.terminate()
        if self.periodic_action_process:
            self.periodic_action_process.join(1)
            self.periodic_action_process.terminate()
            self.periodic_action_process = None

        self.run_post_stop_action()
        return self.process.exitcode

    def get_output(self):
        """Drain ``output_queue`` and merge its dict items into one dict."""
        result = {}
        while not self.output_queue.empty():
            log.debug("output_queue size %s", self.output_queue.qsize())
            try:
                result.update(self.output_queue.get(True, 1))
            except Empty:
                pass
        return result

    def get_result(self):
        """Drain ``result_queue`` and return its records as a list.

        When the configured dispatcher mentions 'influxdb', each record is
        also uploaded as soon as it is read.  The dispatcher value comes
        from ``output_config['DEFAULT']['dispatcher']``; presumably it is a
        string or a list of names (to be confirmed).
        """
        result = []

        dispatcher = self.config['output_config']['DEFAULT']['dispatcher']
        output_in_influxdb = 'influxdb' in dispatcher

        while not self.result_queue.empty():
            log.debug("result_queue size %s", self.result_queue.qsize())
            try:
                one_record = self.result_queue.get(True, 1)
            except Empty:
                pass
            else:
                if output_in_influxdb:
                    self._output_to_influxdb(one_record)

                result.append(one_record)
        return result

    def _output_to_influxdb(self, record):
        """Upload a single result ``record`` through the Influxdb dispatcher.

        NOTE(review): DispatcherBase.get is called once per record.  Also,
        ``next`` raises StopIteration if no dispatcher has type 'Influxdb'.
        """
        dispatchers = DispatcherBase.get(self.config['output_config'])
        dispatcher = next((d for d in dispatchers
                           if d.__dispatcher_type__ == 'Influxdb'))
        dispatcher.upload_one_record(record, self.case_name, '',
                                     task_id=self.task_id)
273         dispatcher = next((d for d in dispatchers if d.__dispatcher_type__ == 'Influxdb'))
274         dispatcher.upload_one_record(record, self.case_name, '', task_id=self.task_id)