Fix flake8 errors
[yardstick.git] / yardstick / benchmark / runners / base.py
1 ##############################################################################
2 # Copyright (c) 2015 Ericsson AB and others.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9
10 import importlib
11 import logging
12 import multiprocessing
13 import subprocess
14 import time
15 import traceback
16
17 from oslo_config import cfg
18
19 import yardstick.common.utils as utils
20 from yardstick.benchmark.scenarios import base as base_scenario
21 from yardstick.dispatcher.base import Base as DispatcherBase
22
23 log = logging.getLogger(__name__)
24
25 CONF = cfg.CONF
26
27
def _output_serializer_main(filename, queue):
    '''Entry point of the singleton subprocess writing to outfile.

    Routing every record through this one process enables multiple
    instances of a scenario without messing up the output file.

    :param filename: handed to the dispatcher as its "file_path" setting
    :param queue: queue the records are read from; receiving the string
                  '_TERMINATE_' ends the loop
    '''
    settings = {
        "type": CONF.dispatcher.capitalize(),
        "file_path": filename,
    }
    writer = DispatcherBase.get(settings)

    while True:
        # blocks until data becomes available
        item = queue.get()
        if item == '_TERMINATE_':
            break
        writer.record_result_data(item)

    # sentinel received: let the dispatcher flush its result data and exit
    writer.flush_result_data()
46
47
48 def _execute_shell_command(command):
49     '''execute shell script with error handling'''
50     exitcode = 0
51     output = []
52     try:
53         output = subprocess.check_output(command, shell=True)
54     except Exception:
55         exitcode = -1
56         output = traceback.format_exc()
57         log.error("exec command '%s' error:\n " % command)
58         log.error(traceback.format_exc())
59
60     return exitcode, output
61
62
def _single_action(seconds, command, queue):
    '''Entry point for the single action process.

    Sleeps for the given delay, runs the command once and puts what
    _execute_shell_command returned as data on the queue, whether the
    command succeeded or failed.

    :param seconds: delay in seconds before the command is executed
    :param command: shell command to execute
    :param queue: queue receiving {'single-action-data': <data>}
    '''
    log.debug("single action, fires after %d seconds (from now)", seconds)
    time.sleep(seconds)
    log.debug("single action: executing command: '%s'", command)
    ret_code, data = _execute_shell_command(command)
    # Lazy logging arguments, consistent with the calls above.
    if ret_code < 0:
        log.error("single action error! command:%s", command)
    else:
        log.debug("single action data: \n%s", data)
    # The data is reported in both cases.
    queue.put({'single-action-data': data})
75
76
def _periodic_action(interval, command, queue):
    '''Entry point for the periodic action process.

    Runs the command every `interval` seconds and puts what
    _execute_shell_command returned as data on the queue after each
    execution. The loop ends after the first failing execution; otherwise
    it runs until the process is terminated from outside.

    :param interval: seconds to sleep before each execution
    :param command: shell command to execute
    :param queue: queue receiving {'periodic-action-data': <data>}
    '''
    log.debug("periodic action, fires every: %d seconds", interval)
    while True:
        time.sleep(interval)
        log.debug("periodic action, executing command: '%s'", command)
        ret_code, data = _execute_shell_command(command)
        failed = ret_code < 0
        if failed:
            log.error("periodic action error! command:%s", command)
        else:
            log.debug("periodic action data: \n%s", data)
        # The data is reported in both cases.
        queue.put({'periodic-action-data': data})
        if failed:
            break
92
93
class Runner(object):
    '''Base class of the scenario runners and registry of live instances.

    The class-level attributes are shared by all runners: get() starts one
    output serializer ("Dumper") subprocess and its queue whenever no runner
    is registered, and release() stops it again once the last registered
    runner is gone. Concrete runner classes are found via their
    __execution_type__ attribute and are expected to provide
    _run_benchmark(), which this class calls but does not define.
    '''
    # queue shared with the output serializer subprocess
    queue = None
    # the output serializer subprocess, None while it is not running
    dump_process = None
    # every runner created and not yet released
    runners = []

    @staticmethod
    def get_cls(runner_type):
        '''return class of specified type

        :raises RuntimeError: if no subclass has a matching
                              __execution_type__
        '''
        for runner in utils.itersubclasses(Runner):
            if runner_type == runner.__execution_type__:
                return runner
        raise RuntimeError("No such runner_type %s" % runner_type)

    @staticmethod
    def get_types():
        '''return a list of the known runner types (classes)'''
        return list(utils.itersubclasses(Runner))

    @staticmethod
    def get(config):
        """Returns instance of a scenario runner for execution type.

        Reads config["type"] to select the runner class and
        config["output_filename"] for the output serializer.
        """
        # if there is no runner, start the output serializer subprocess
        if not Runner.runners:
            log.debug("Starting dump process file '%s'",
                      config["output_filename"])
            Runner.queue = multiprocessing.Queue()
            Runner.dump_process = multiprocessing.Process(
                target=_output_serializer_main,
                name="Dumper",
                args=(config["output_filename"], Runner.queue))
            Runner.dump_process.start()

        return Runner.get_cls(config["type"])(config, Runner.queue)

    @staticmethod
    def release_dump_process():
        '''Release the dumper process'''
        log.debug("Stopping dump process")
        if Runner.dump_process:
            # ask the serializer to flush and exit, then wait for it
            Runner.queue.put('_TERMINATE_')
            Runner.dump_process.join()
            Runner.dump_process = None

    @staticmethod
    def release(runner):
        '''Release the runner'''
        if runner in Runner.runners:
            Runner.runners.remove(runner)

        # if this was the last runner, stop the output serializer subprocess
        if not Runner.runners:
            Runner.release_dump_process()

    @staticmethod
    def terminate(runner):
        '''Terminate the runner'''
        if runner.process and runner.process.is_alive():
            runner.process.terminate()

    @staticmethod
    def terminate_all():
        '''Terminate all runners (subprocesses)'''
        log.debug("Terminating all runners")

        # no runner registered (e.g. an error occurred before any runner was
        # created): only the dumper process may need to be released
        if not Runner.runners:
            Runner.release_dump_process()
            return

        # Iterate over a snapshot: release() removes the runner from
        # Runner.runners, and removing from a list while iterating over it
        # would skip every second runner.
        for runner in list(Runner.runners):
            log.debug("Terminating runner: %s", runner)
            if runner.process:
                runner.process.terminate()
                runner.process.join()
            if runner.periodic_action_process:
                log.debug("Terminating periodic action process")
                runner.periodic_action_process.terminate()
                runner.periodic_action_process = None
            Runner.release(runner)

    def __init__(self, config, queue):
        '''Store the configuration and register the runner.

        :param config: runner configuration (dict-like)
        :param queue: queue the runner puts its result records on
        '''
        self.config = config
        self.periodic_action_process = None
        self.result_queue = queue
        # not set here; join() and terminate*() expect a process object,
        # presumably assigned by the subclass's _run_benchmark()
        self.process = None
        self.aborted = multiprocessing.Event()
        Runner.runners.append(self)

    def run_post_stop_action(self):
        '''run a potentially configured post-stop action'''
        if "post-stop-action" not in self.config:
            return

        command = self.config["post-stop-action"]["command"]
        log.debug("post stop action: command: '%s'", command)
        ret_code, data = _execute_shell_command(command)
        if ret_code < 0:
            log.error("post action error! command:%s", command)
        else:
            log.debug("post-stop data: \n%s", data)
        # the data is reported whether the command succeeded or failed
        self.result_queue.put({'post-stop-action-data': data})

    def run(self, scenario_cfg, context_cfg):
        '''Run the scenario with the configured helper actions.

        Resolves the scenario class named by scenario_cfg["type"], runs the
        optional "pre-start-action", starts the optional
        "single-shot-action" and "periodic-action" processes and finally
        calls self._run_benchmark(). If the pre-start action fails, its data
        is queued and the benchmark is not started.
        '''
        scenario_type = scenario_cfg["type"]
        class_name = base_scenario.Scenario.get(scenario_type)
        # class_name is a dotted path: "<module path>.<class name>"
        path_split = class_name.split(".")
        module_path = ".".join(path_split[:-1])
        module = importlib.import_module(module_path)
        cls = getattr(module, path_split[-1])

        self.config['object'] = class_name
        self.aborted.clear()

        # run a potentially configured pre-start action
        if "pre-start-action" in self.config:
            command = self.config["pre-start-action"]["command"]
            log.debug("pre start action: command: '%s'", command)
            ret_code, data = _execute_shell_command(command)
            failed = ret_code < 0
            if failed:
                log.error("pre-start action error! command:%s", command)
            else:
                log.debug("pre-start data: \n%s", data)
            self.result_queue.put({'pre-start-action-data': data})
            if failed:
                return

        if "single-shot-action" in self.config:
            single_action_process = multiprocessing.Process(
                target=_single_action,
                name="single-shot-action",
                args=(self.config["single-shot-action"]["after"],
                      self.config["single-shot-action"]["command"],
                      self.result_queue))
            single_action_process.start()

        if "periodic-action" in self.config:
            self.periodic_action_process = multiprocessing.Process(
                target=_periodic_action,
                name="periodic-action",
                args=(self.config["periodic-action"]["interval"],
                      self.config["periodic-action"]["command"],
                      self.result_queue))
            self.periodic_action_process.start()

        self._run_benchmark(cls, "run", scenario_cfg, context_cfg)

    def abort(self):
        '''Abort the execution of a scenario'''
        self.aborted.set()

    def join(self, timeout=None):
        '''Wait for the runner process and clean up afterwards.

        Terminates a running periodic action process and runs the
        post-stop action once the wait is over.

        :param timeout: passed through to the process's join()
        :returns: exit code of the runner process
        '''
        self.process.join(timeout)
        if self.periodic_action_process:
            self.periodic_action_process.terminate()
            self.periodic_action_process = None

        self.run_post_stop_action()
        return self.process.exitcode