dynamictp: fix flake8 warning
[yardstick.git] / yardstick / benchmark / runners / base.py
1 # Copyright 2014: Mirantis Inc.
2 # All Rights Reserved.
3 #
4 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
5 #    not use this file except in compliance with the License. You may obtain
6 #    a copy of the License at
7 #
8 #         http://www.apache.org/licenses/LICENSE-2.0
9 #
10 #    Unless required by applicable law or agreed to in writing, software
11 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 #    License for the specific language governing permissions and limitations
14 #    under the License.
15
16 # yardstick comment: this is a modified copy of
17 # rally/rally/benchmark/runners/base.py
18
19 from __future__ import absolute_import
20 import importlib
21 import logging
22 import multiprocessing
23 import subprocess
24 import time
25 import traceback
26
27 import yardstick.common.utils as utils
28 from yardstick.benchmark.scenarios import base as base_scenario
29
30 log = logging.getLogger(__name__)
31
32
33 def _execute_shell_command(command):
34     """execute shell script with error handling"""
35     exitcode = 0
36     output = []
37     try:
38         output = subprocess.check_output(command, shell=True)
39     except Exception:
40         exitcode = -1
41         output = traceback.format_exc()
42         log.error("exec command '%s' error:\n ", command)
43         log.error(traceback.format_exc())
44
45     return exitcode, output
46
47
48 def _single_action(seconds, command, queue):
49     """entrypoint for the single action process"""
50     log.debug("single action, fires after %d seconds (from now)", seconds)
51     time.sleep(seconds)
52     log.debug("single action: executing command: '%s'", command)
53     ret_code, data = _execute_shell_command(command)
54     if ret_code < 0:
55         log.error("single action error! command:%s", command)
56         queue.put({'single-action-data': data})
57         return
58     log.debug("single action data: \n%s", data)
59     queue.put({'single-action-data': data})
60
61
62 def _periodic_action(interval, command, queue):
63     """entrypoint for the periodic action process"""
64     log.debug("periodic action, fires every: %d seconds", interval)
65     time_spent = 0
66     while True:
67         time.sleep(interval)
68         time_spent += interval
69         log.debug("periodic action, executing command: '%s'", command)
70         ret_code, data = _execute_shell_command(command)
71         if ret_code < 0:
72             log.error("periodic action error! command:%s", command)
73             queue.put({'periodic-action-data': data})
74             break
75         log.debug("periodic action data: \n%s", data)
76         queue.put({'periodic-action-data': data})
77
78
79 class Runner(object):
80     runners = []
81
82     @staticmethod
83     def get_cls(runner_type):
84         """return class of specified type"""
85         for runner in utils.itersubclasses(Runner):
86             if runner_type == runner.__execution_type__:
87                 return runner
88         raise RuntimeError("No such runner_type %s" % runner_type)
89
90     @staticmethod
91     def get_types():
92         """return a list of known runner type (class) names"""
93         types = []
94         for runner in utils.itersubclasses(Runner):
95             types.append(runner)
96         return types
97
98     @staticmethod
99     def get(runner_cfg):
100         """Returns instance of a scenario runner for execution type.
101         """
102         return Runner.get_cls(runner_cfg["type"])(runner_cfg)
103
104     @staticmethod
105     def release(runner):
106         """Release the runner"""
107         if runner in Runner.runners:
108             Runner.runners.remove(runner)
109
110     @staticmethod
111     def terminate(runner):
112         """Terminate the runner"""
113         if runner.process and runner.process.is_alive():
114             runner.process.terminate()
115
116     @staticmethod
117     def terminate_all():
118         """Terminate all runners (subprocesses)"""
119         log.debug("Terminating all runners")
120
121         # release dumper process as some errors before any runner is created
122         if not Runner.runners:
123             return
124
125         for runner in Runner.runners:
126             log.debug("Terminating runner: %s", runner)
127             if runner.process:
128                 runner.process.terminate()
129                 runner.process.join()
130             if runner.periodic_action_process:
131                 log.debug("Terminating periodic action process")
132                 runner.periodic_action_process.terminate()
133                 runner.periodic_action_process = None
134             Runner.release(runner)
135
136     def __init__(self, config):
137         self.config = config
138         self.periodic_action_process = None
139         self.output_queue = multiprocessing.Queue()
140         self.result_queue = multiprocessing.Queue()
141         self.process = None
142         self.aborted = multiprocessing.Event()
143         Runner.runners.append(self)
144
145     def run_post_stop_action(self):
146         """run a potentially configured post-stop action"""
147         if "post-stop-action" in self.config:
148             command = self.config["post-stop-action"]["command"]
149             log.debug("post stop action: command: '%s'", command)
150             ret_code, data = _execute_shell_command(command)
151             if ret_code < 0:
152                 log.error("post action error! command:%s", command)
153                 self.result_queue.put({'post-stop-action-data': data})
154                 return
155             log.debug("post-stop data: \n%s", data)
156             self.result_queue.put({'post-stop-action-data': data})
157
158     def _run_benchmark(self, cls, method_name, scenario_cfg, context_cfg):
159         raise NotImplementedError
160
161     def run(self, scenario_cfg, context_cfg):
162         scenario_type = scenario_cfg["type"]
163         class_name = base_scenario.Scenario.get(scenario_type)
164         path_split = class_name.split(".")
165         module_path = ".".join(path_split[:-1])
166         module = importlib.import_module(module_path)
167         cls = getattr(module, path_split[-1])
168
169         self.config['object'] = class_name
170         self.aborted.clear()
171
172         # run a potentially configured pre-start action
173         if "pre-start-action" in self.config:
174             command = self.config["pre-start-action"]["command"]
175             log.debug("pre start action: command: '%s'", command)
176             ret_code, data = _execute_shell_command(command)
177             if ret_code < 0:
178                 log.error("pre-start action error! command:%s", command)
179                 self.result_queue.put({'pre-start-action-data': data})
180                 return
181             log.debug("pre-start data: \n%s", data)
182             self.result_queue.put({'pre-start-action-data': data})
183
184         if "single-shot-action" in self.config:
185             single_action_process = multiprocessing.Process(
186                 target=_single_action,
187                 name="single-shot-action",
188                 args=(self.config["single-shot-action"]["after"],
189                       self.config["single-shot-action"]["command"],
190                       self.result_queue))
191             single_action_process.start()
192
193         if "periodic-action" in self.config:
194             self.periodic_action_process = multiprocessing.Process(
195                 target=_periodic_action,
196                 name="periodic-action",
197                 args=(self.config["periodic-action"]["interval"],
198                       self.config["periodic-action"]["command"],
199                       self.result_queue))
200             self.periodic_action_process.start()
201
202         self._run_benchmark(cls, "run", scenario_cfg, context_cfg)
203
204     def abort(self):
205         """Abort the execution of a scenario"""
206         self.aborted.set()
207
208     def join(self, timeout=None):
209         self.process.join(timeout)
210         if self.periodic_action_process:
211             self.periodic_action_process.terminate()
212             self.periodic_action_process = None
213
214         self.run_post_stop_action()
215         return self.process.exitcode
216
217     def get_output(self):
218         result = {}
219         while not self.output_queue.empty():
220             result.update(self.output_queue.get())
221         return result
222
223     def get_result(self):
224         result = []
225         while not self.result_queue.empty():
226             result.append(self.result_queue.get())
227         return result