Prohibit the importation of a list of libraries
[yardstick.git] / yardstick / benchmark / runners / base.py
1 # Copyright 2014: Mirantis Inc.
2 # All Rights Reserved.
3 #
4 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
5 #    not use this file except in compliance with the License. You may obtain
6 #    a copy of the License at
7 #
8 #         http://www.apache.org/licenses/LICENSE-2.0
9 #
10 #    Unless required by applicable law or agreed to in writing, software
11 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 #    License for the specific language governing permissions and limitations
14 #    under the License.
15
16 # yardstick comment: this is a modified copy of
17 # rally/rally/benchmark/runners/base.py
18
19 from __future__ import absolute_import
20
21 import logging
22 import multiprocessing
23 import subprocess
24 import time
25 import traceback
26
27 import importlib
28
29 from six.moves.queue import Empty
30
31 import yardstick.common.utils as utils
32 from yardstick.benchmark.scenarios import base as base_scenario
33
34 log = logging.getLogger(__name__)
35
36
def _execute_shell_command(command):
    """Run ``command`` through the shell and capture what it prints.

    Returns an ``(exitcode, output)`` pair: ``(0, <captured stdout>)`` when
    ``subprocess.check_output`` succeeds, or ``(-1, <formatted traceback>)``
    when it raises. Failures are logged and never propagated to the caller.
    """
    try:
        captured = subprocess.check_output(command, shell=True)
    except Exception:
        # the traceback text doubles as the "output" handed back to callers
        failure = traceback.format_exc()
        log.error("exec command '%s' error:\n ", command)
        log.error(failure)
        return -1, failure
    return 0, captured
49
50
def _single_action(seconds, command, queue):
    """Process entry point: wait, run ``command`` once, publish its output.

    Sleeps for ``seconds``, executes ``command`` via
    ``_execute_shell_command`` and puts ``{'single-action-data': data}`` on
    ``queue``. ``data`` is whatever ``_execute_shell_command`` returned as
    its second element, for both the success and the failure case.
    """
    log.debug("single action, fires after %d seconds (from now)", seconds)
    time.sleep(seconds)
    log.debug("single action: executing command: '%s'", command)
    status, payload = _execute_shell_command(command)
    if status < 0:
        log.error("single action error! command:%s", command)
    else:
        log.debug("single action data: \n%s", payload)
    # the payload is published whether or not the command succeeded
    queue.put({'single-action-data': payload})
63
64
def _periodic_action(interval, command, queue):
    """Process entry point: run ``command`` every ``interval`` seconds.

    Each round sleeps for ``interval``, executes ``command`` via
    ``_execute_shell_command`` and puts ``{'periodic-action-data': data}``
    on ``queue``. The loop only ends by itself when a round fails (negative
    return code); the failing round's data is still published before the
    loop exits.
    """
    log.debug("periodic action, fires every: %d seconds", interval)
    while True:
        time.sleep(interval)
        log.debug("periodic action, executing command: '%s'", command)
        ret_code, data = _execute_shell_command(command)
        if ret_code < 0:
            log.error("periodic action error! command:%s", command)
            queue.put({'periodic-action-data': data})
            # a failing command stops the periodic loop
            break
        log.debug("periodic action data: \n%s", data)
        queue.put({'periodic-action-data': data})
80
81
class Runner(object):
    """Base class for scenario runners.

    Subclasses are looked up through their ``__execution_type__`` attribute
    (see ``get_cls``) and must implement ``_run_benchmark``. Every instance
    registers itself in the class-level ``runners`` list on construction.

    NOTE(review): ``self.process`` is initialised to ``None`` here and is
    presumably assigned by a subclass' ``_run_benchmark`` -- that code is
    not part of this class; ``poll`` and ``join`` require it to be set.
    """

    # Registry of runner instances: appended to by __init__, emptied by
    # release() / terminate_all().
    runners = []

    @staticmethod
    def get_cls(runner_type):
        """Return the Runner subclass whose ``__execution_type__`` matches.

        :param runner_type: value compared with each subclass'
            ``__execution_type__``
        :raises RuntimeError: if no subclass declares that type
        """
        for runner in utils.itersubclasses(Runner):
            if runner_type == runner.__execution_type__:
                return runner
        raise RuntimeError("No such runner_type %s" % runner_type)

    @staticmethod
    def get_types():
        """Return a list of the known runner classes (Runner subclasses)"""
        return list(utils.itersubclasses(Runner))

    @staticmethod
    def get(runner_cfg):
        """Return a runner instance for the execution type in the config.

        :param runner_cfg: runner configuration; ``runner_cfg["type"]``
            selects the class and the whole mapping is passed to it
        """
        return Runner.get_cls(runner_cfg["type"])(runner_cfg)

    @staticmethod
    def release(runner):
        """Remove the runner from the registry (no-op if not registered)"""
        if runner in Runner.runners:
            Runner.runners.remove(runner)

    @staticmethod
    def terminate(runner):
        """Terminate the runner's process if it is still alive"""
        if runner.process and runner.process.is_alive():
            runner.process.terminate()

    @staticmethod
    def terminate_all():
        """Terminate all runners (subprocesses) and release them"""
        log.debug("Terminating all runners", exc_info=True)

        # release dumper process as some errors before any runner is created
        if not Runner.runners:
            return

        # Iterate over a snapshot: release() removes the runner from
        # Runner.runners, and removing items from a list while iterating
        # over it makes the loop skip the element that follows each removal.
        for runner in list(Runner.runners):
            log.debug("Terminating runner: %s", runner)
            if runner.process:
                runner.process.terminate()
                runner.process.join()
            if runner.periodic_action_process:
                log.debug("Terminating periodic action process")
                runner.periodic_action_process.terminate()
                runner.periodic_action_process = None
            Runner.release(runner)

    def __init__(self, config):
        """Store the config, create the queues and register the runner.

        :param config: runner configuration mapping; optional keys read
            later are "pre-start-action", "single-shot-action",
            "periodic-action" and "post-stop-action"
        """
        self.config = config
        self.periodic_action_process = None
        self.output_queue = multiprocessing.Queue()
        self.result_queue = multiprocessing.Queue()
        self.process = None
        self.aborted = multiprocessing.Event()
        Runner.runners.append(self)

    def run_post_stop_action(self):
        """Run the post-stop action, if one is configured.

        The command output (or the traceback text on failure) is put on the
        result queue as ``{'post-stop-action-data': data}``.
        """
        if "post-stop-action" in self.config:
            command = self.config["post-stop-action"]["command"]
            log.debug("post stop action: command: '%s'", command)
            ret_code, data = _execute_shell_command(command)
            if ret_code < 0:
                log.error("post action error! command:%s", command)
                self.result_queue.put({'post-stop-action-data': data})
                return
            log.debug("post-stop data: \n%s", data)
            self.result_queue.put({'post-stop-action-data': data})

    def _run_benchmark(self, cls, method_name, scenario_cfg, context_cfg):
        """Start the benchmark; must be implemented by subclasses."""
        raise NotImplementedError

    def run(self, scenario_cfg, context_cfg):
        """Resolve the scenario class, run configured actions, start the run.

        The dotted class path returned by ``base_scenario.Scenario.get`` for
        ``scenario_cfg["type"]`` is imported and handed to
        ``_run_benchmark``. If a configured pre-start action fails, its
        data is queued and the benchmark is NOT started.
        """
        scenario_type = scenario_cfg["type"]
        class_name = base_scenario.Scenario.get(scenario_type)
        path_split = class_name.split(".")
        module_path = ".".join(path_split[:-1])
        module = importlib.import_module(module_path)
        cls = getattr(module, path_split[-1])

        self.config['object'] = class_name
        self.aborted.clear()

        # run a potentially configured pre-start action
        if "pre-start-action" in self.config:
            command = self.config["pre-start-action"]["command"]
            log.debug("pre start action: command: '%s'", command)
            ret_code, data = _execute_shell_command(command)
            if ret_code < 0:
                log.error("pre-start action error! command:%s", command)
                self.result_queue.put({'pre-start-action-data': data})
                return
            log.debug("pre-start data: \n%s", data)
            self.result_queue.put({'pre-start-action-data': data})

        # one-off command fired "after" seconds, in its own process
        if "single-shot-action" in self.config:
            single_action_process = multiprocessing.Process(
                target=_single_action,
                name="single-shot-action",
                args=(self.config["single-shot-action"]["after"],
                      self.config["single-shot-action"]["command"],
                      self.result_queue))
            single_action_process.start()

        # repeating command; the process is kept so join()/terminate_all()
        # can stop it later
        if "periodic-action" in self.config:
            self.periodic_action_process = multiprocessing.Process(
                target=_periodic_action,
                name="periodic-action",
                args=(self.config["periodic-action"]["interval"],
                      self.config["periodic-action"]["command"],
                      self.result_queue))
            self.periodic_action_process.start()

        self._run_benchmark(cls, "run", scenario_cfg, context_cfg)

    def abort(self):
        """Abort the execution of a scenario"""
        self.aborted.set()

    # default timeout, in seconds, used by poll() and join()
    QUEUE_JOIN_INTERVAL = 5

    def poll(self, timeout=QUEUE_JOIN_INTERVAL):
        """Wait up to ``timeout`` seconds and return the process exit code.

        The exit code is ``None`` while the process is still running.
        """
        self.process.join(timeout)
        return self.process.exitcode

    def join(self, outputs, result, interval=QUEUE_JOIN_INTERVAL):
        """Wait for the runner process, collecting its queued data.

        :param outputs: mapping updated in place with the queued outputs
        :param result: list extended in place with the queued results
        :param interval: seconds to wait between two queue drains
        :returns: the exit code of the runner process

        NOTE(review): the post-stop action runs after the final drain, so
        its data stays on the result queue until ``get_result`` is called
        again.
        """
        while self.process.exitcode is None:
            # drain the queue while we are running otherwise we won't terminate
            outputs.update(self.get_output())
            result.extend(self.get_result())
            self.process.join(interval)
        # drain after the process has exited
        outputs.update(self.get_output())
        result.extend(self.get_result())

        self.process.terminate()
        if self.periodic_action_process:
            # give the periodic process a moment to finish, then stop it
            self.periodic_action_process.join(1)
            self.periodic_action_process.terminate()
            self.periodic_action_process = None

        self.run_post_stop_action()
        return self.process.exitcode

    def get_output(self):
        """Drain the output queue and merge its items into one dict"""
        result = {}
        while not self.output_queue.empty():
            log.debug("output_queue size %s", self.output_queue.qsize())
            try:
                result.update(self.output_queue.get(True, 1))
            except Empty:
                # nothing arrived within the 1 second get() timeout; the
                # loop condition decides whether to try again
                pass
        return result

    def get_result(self):
        """Drain the result queue and return its items as a list"""
        result = []
        while not self.result_queue.empty():
            log.debug("result_queue size %s", self.result_queue.qsize())
            try:
                result.append(self.result_queue.get(True, 1))
            except Empty:
                # nothing arrived within the 1 second get() timeout; the
                # loop condition decides whether to try again
                pass
        return result