94de45d1eae98a002983c7af23a7fdec826e4e7f
[yardstick.git] / yardstick / benchmark / runners / base.py
1 # Copyright 2014: Mirantis Inc.
2 # All Rights Reserved.
3 #
4 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
5 #    not use this file except in compliance with the License. You may obtain
6 #    a copy of the License at
7 #
8 #         http://www.apache.org/licenses/LICENSE-2.0
9 #
10 #    Unless required by applicable law or agreed to in writing, software
11 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 #    License for the specific language governing permissions and limitations
14 #    under the License.
15 #
16 # This is a modified copy of ``rally/rally/benchmark/runners/base.py``
17
18 import importlib
19 import logging
20 import multiprocessing
21 import subprocess
22 import time
23 import traceback
24
25 from six import moves
26
27 from yardstick.benchmark.scenarios import base as base_scenario
28 from yardstick.common import utils
29 from yardstick.dispatcher.base import Base as DispatcherBase
30
31
# Module-level logger shared by the helper functions and runner classes below.
log = logging.getLogger(__name__)
33
34
35 def _execute_shell_command(command):
36     """execute shell script with error handling"""
37     exitcode = 0
38     try:
39         output = subprocess.check_output(command, shell=True)
40     except subprocess.CalledProcessError:
41         exitcode = -1
42         output = traceback.format_exc()
43         log.error("exec command '%s' error:\n ", command)
44         log.error(traceback.format_exc())
45
46     return exitcode, output
47
48
def _single_action(seconds, command, queue):
    """Entry point of the single-shot action process.

    Sleeps ``seconds``, runs ``command`` once via the shell and puts the
    result on ``queue`` as ``{'single-action-data': data}``. On failure,
    ``data`` is the error traceback text returned by
    ``_execute_shell_command``.
    """
    log.debug("single action, fires after %d seconds (from now)", seconds)
    time.sleep(seconds)
    log.debug("single action: executing command: '%s'", command)
    ret_code, data = _execute_shell_command(command)
    if ret_code < 0:
        log.error("single action error! command:%s", command)
    else:
        log.debug("single action data: \n%s", data)
    # Success and failure are reported the same way.
    queue.put({'single-action-data': data})
61
62
def _periodic_action(interval, command, queue):
    """Entry point of the periodic action process.

    Every ``interval`` seconds, runs ``command`` via the shell and puts
    ``{'periodic-action-data': data}`` on ``queue``. The loop only ends on
    its own when the command fails. In that case the failure (traceback text
    from ``_execute_shell_command``) is reported once and the function
    returns. Otherwise the process runs until it is terminated externally.
    """
    log.debug("periodic action, fires every: %d seconds", interval)
    while True:
        time.sleep(interval)
        log.debug("periodic action, executing command: '%s'", command)
        ret_code, data = _execute_shell_command(command)
        if ret_code < 0:
            log.error("periodic action error! command:%s", command)
            queue.put({'periodic-action-data': data})
            break
        log.debug("periodic action data: \n%s", data)
        queue.put({'periodic-action-data': data})
78
79
class ScenarioOutput(dict):
    """Dict-like collector of scenario output that is pushed onto a queue.

    A scenario fills the instance like a normal ``dict`` and calls ``push()``
    to send a record to the queue given at construction. Every keyword
    argument passed to the constructor is kept in ``result_ext`` and also set
    as an attribute of the same name. Its *current* attribute value is added
    to each pushed record.
    """

    # Seconds push() waits for room in the queue before ``put`` times out.
    QUEUE_PUT_TIMEOUT = 10

    def __init__(self, queue, **kwargs):
        super(ScenarioOutput, self).__init__()
        self._queue = queue
        # Extra top-level fields merged into every pushed record. They are
        # also exposed as attributes so they can be updated after
        # construction. NOTE(review): a kwarg named like an existing
        # attribute (e.g. ``push``) would shadow it.
        self.result_ext = dict()
        for key, val in kwargs.items():
            self.result_ext[key] = val
            setattr(self, key, val)

    def push(self, data=None, add_timestamp=True):
        """Put one record on the output queue.

        :param data: payload to send. Defaults to a snapshot copy of this
            dict's current contents.
        :param add_timestamp: when true, the record is
            ``{'timestamp': time.time(), 'data': data}``. Otherwise ``data``
            itself is the record.

        The constructor's extra fields are written into the record at top
        level, using their current attribute values. The caller's ``data``
        is never modified.
        """
        if data is None:
            data = dict(self)

        if add_timestamp:
            result = {'timestamp': time.time(), 'data': data}
        elif self.result_ext:
            # Copy first: the extra fields are added below, and writing them
            # into the caller's own dict would be a surprising side effect.
            result = dict(data)
        else:
            result = data

        for key in self.result_ext:
            result[key] = getattr(self, key)

        self._queue.put(result, True, self.QUEUE_PUT_TIMEOUT)
105
106
class Runner(object):
    """Base class of the scenario runners.

    Subclasses set ``__execution_type__`` and implement ``_run_benchmark()``.
    ``Runner.get()`` instantiates the subclass matching a runner config's
    ``type``. Every instance registers itself in the class-level ``runners``
    list so that ``terminate_all()`` can clean them up.
    """

    # Every registered runner instance: appended in __init__, removed by
    # release().
    runners = []

    @staticmethod
    def get_cls(runner_type):
        """Return the Runner subclass whose ``__execution_type__`` matches.

        :raises RuntimeError: if no subclass handles ``runner_type``.
        """
        for runner in utils.itersubclasses(Runner):
            if runner_type == runner.__execution_type__:
                return runner
        raise RuntimeError("No such runner_type %s" % runner_type)

    @staticmethod
    def get_types():
        """Return a list of the known runner classes (subclasses of Runner)."""
        return list(utils.itersubclasses(Runner))

    @staticmethod
    def get(runner_cfg):
        """Return an instance of the runner for ``runner_cfg["type"]``."""
        return Runner.get_cls(runner_cfg["type"])(runner_cfg)

    @staticmethod
    def release(runner):
        """Remove ``runner`` from the registry (no-op if it is not there)."""
        if runner in Runner.runners:
            Runner.runners.remove(runner)

    @staticmethod
    def terminate(runner):
        """Terminate the runner's benchmark process if it is still alive."""
        if runner.process and runner.process.is_alive():
            runner.process.terminate()

    @staticmethod
    def terminate_all():
        """Terminate the processes of every registered runner and release it."""
        log.debug("Terminating all runners")

        # Iterate over a snapshot: release() removes entries from
        # Runner.runners, and removing items from a list while iterating it
        # directly skips every other element, leaving runners un-terminated.
        for runner in list(Runner.runners):
            log.debug("Terminating runner: %s", runner)
            if runner.process:
                runner.process.terminate()
                runner.process.join()
            if runner.periodic_action_process:
                log.debug("Terminating periodic action process")
                runner.periodic_action_process.terminate()
                runner.periodic_action_process = None
            Runner.release(runner)

    def __init__(self, config):
        self.task_id = None
        self.case_name = None
        self.config = config
        self.periodic_action_process = None
        # Drained by get_output() and get_result() respectively; the pre/post
        # and single/periodic action data also goes to result_queue.
        self.output_queue = multiprocessing.Queue()
        self.result_queue = multiprocessing.Queue()
        # Benchmark process; poll() and join() expect a subclass to set it.
        self.process = None
        # Set by abort(), cleared at the start of every run().
        self.aborted = multiprocessing.Event()
        Runner.runners.append(self)

    def run_post_stop_action(self):
        """Run a potentially configured post-stop action.

        The command output (or error traceback) is put on ``result_queue``
        under the ``'post-stop-action-data'`` key.
        """
        if "post-stop-action" in self.config:
            command = self.config["post-stop-action"]["command"]
            log.debug("post stop action: command: '%s'", command)
            ret_code, data = _execute_shell_command(command)
            if ret_code < 0:
                log.error("post action error! command:%s", command)
                self.result_queue.put({'post-stop-action-data': data})
                return
            log.debug("post-stop data: \n%s", data)
            self.result_queue.put({'post-stop-action-data': data})

    def _run_benchmark(self, cls, method_name, scenario_cfg, context_cfg):
        """Start the benchmark; must be implemented by every subclass."""
        raise NotImplementedError

    def run(self, scenario_cfg, context_cfg):
        """Run the scenario described by ``scenario_cfg``.

        Resolves the scenario class from ``scenario_cfg["type"]`` and runs the
        optional pre-start action. If that action fails, it returns without
        starting anything else. Otherwise it starts the optional single-shot
        and periodic action processes and delegates to ``_run_benchmark()``.
        """
        scenario_type = scenario_cfg["type"]
        class_name = base_scenario.Scenario.get(scenario_type)
        path_split = class_name.split(".")
        module_path = ".".join(path_split[:-1])
        module = importlib.import_module(module_path)
        cls = getattr(module, path_split[-1])

        self.config['object'] = class_name
        self.case_name = scenario_cfg['tc']
        self.task_id = scenario_cfg['task_id']
        self.aborted.clear()

        # run a potentially configured pre-start action
        if "pre-start-action" in self.config:
            command = self.config["pre-start-action"]["command"]
            log.debug("pre start action: command: '%s'", command)
            ret_code, data = _execute_shell_command(command)
            if ret_code < 0:
                log.error("pre-start action error! command:%s", command)
                self.result_queue.put({'pre-start-action-data': data})
                # the benchmark is not started when the pre-start action fails
                return
            log.debug("pre-start data: \n%s", data)
            self.result_queue.put({'pre-start-action-data': data})

        if "single-shot-action" in self.config:
            single_action_process = multiprocessing.Process(
                target=_single_action,
                name="single-shot-action",
                args=(self.config["single-shot-action"]["after"],
                      self.config["single-shot-action"]["command"],
                      self.result_queue))
            single_action_process.start()

        if "periodic-action" in self.config:
            self.periodic_action_process = multiprocessing.Process(
                target=_periodic_action,
                name="periodic-action",
                args=(self.config["periodic-action"]["interval"],
                      self.config["periodic-action"]["command"],
                      self.result_queue))
            self.periodic_action_process.start()

        self._run_benchmark(cls, "run", scenario_cfg, context_cfg)

    def abort(self):
        """Abort the execution of a scenario"""
        self.aborted.set()

    # Default number of seconds to wait on the benchmark process per
    # poll()/join() iteration.
    QUEUE_JOIN_INTERVAL = 5

    def poll(self, timeout=QUEUE_JOIN_INTERVAL):
        """Wait up to ``timeout`` seconds; return the process exit code.

        The exit code is None while the process is still running.
        """
        self.process.join(timeout)
        return self.process.exitcode

    def join(self, outputs, result, interval=QUEUE_JOIN_INTERVAL):
        """Wait for the benchmark process to finish and collect its data.

        :param outputs: dict updated in place with the drained outputs.
        :param result: list extended in place with the drained results.
        :param interval: seconds to wait on the process between drains.
        :returns: the benchmark process exit code.

        After the process exits, this also stops the periodic action process
        and runs the post-stop action.
        """
        while self.process.exitcode is None:
            # drain the queue while we are running otherwise we won't terminate
            outputs.update(self.get_output())
            result.extend(self.get_result())
            self.process.join(interval)
        # drain after the process has exited
        outputs.update(self.get_output())
        result.extend(self.get_result())

        self.process.terminate()
        if self.periodic_action_process:
            self.periodic_action_process.join(1)
            self.periodic_action_process.terminate()
            self.periodic_action_process = None

        self.run_post_stop_action()
        return self.process.exitcode

    def get_output(self):
        """Drain ``output_queue``, merging every received item into one dict."""
        result = {}
        while not self.output_queue.empty():
            log.debug("output_queue size %s", self.output_queue.qsize())
            try:
                result.update(self.output_queue.get(True, 1))
            except moves.queue.Empty:
                pass
        return result

    def get_result(self):
        """Drain ``result_queue`` into a list of records.

        When the configured dispatcher includes ``influxdb``, each record is
        also uploaded through the Influxdb dispatcher as it is drained.
        """
        result = []

        dispatcher = self.config['output_config']['DEFAULT']['dispatcher']
        output_in_influxdb = 'influxdb' in dispatcher

        while not self.result_queue.empty():
            log.debug("result_queue size %s", self.result_queue.qsize())
            try:
                one_record = self.result_queue.get(True, 1)
            except moves.queue.Empty:
                pass
            else:
                if output_in_influxdb:
                    self._output_to_influxdb(one_record)

                result.append(one_record)
        return result

    def _output_to_influxdb(self, record):
        """Upload one result record through the Influxdb dispatcher.

        NOTE(review): ``next()`` has no default, so StopIteration escapes if
        no dispatcher of type 'Influxdb' is returned.
        """
        dispatchers = DispatcherBase.get(self.config['output_config'])
        dispatcher = next((d for d in dispatchers
                           if d.__dispatcher_type__ == 'Influxdb'))
        dispatcher.upload_one_record(record, self.case_name, '',
                                     task_id=self.task_id)
296         dispatcher = next((d for d in dispatchers if d.__dispatcher_type__ == 'Influxdb'))
297         dispatcher.upload_one_record(record, self.case_name, '', task_id=self.task_id)