57903ebb9949be4dbe15e4729269ace2eabe2852
[yardstick.git] / yardstick / benchmark / runners / base.py
1 # Copyright 2014: Mirantis Inc.
2 # All Rights Reserved.
3 #
4 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
5 #    not use this file except in compliance with the License. You may obtain
6 #    a copy of the License at
7 #
8 #         http://www.apache.org/licenses/LICENSE-2.0
9 #
10 #    Unless required by applicable law or agreed to in writing, software
11 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 #    License for the specific language governing permissions and limitations
14 #    under the License.
15
16 # yardstick comment: this is a modified copy of
17 # rally/rally/benchmark/runners/base.py
18
19 from __future__ import absolute_import
20 import importlib
21 import logging
22 import multiprocessing
23 import subprocess
24 import time
25 import traceback
26
27 import yardstick.common.utils as utils
28 from yardstick.benchmark.scenarios import base as base_scenario
29
30 log = logging.getLogger(__name__)
31
32
33 def _execute_shell_command(command):
34     """execute shell script with error handling"""
35     exitcode = 0
36     output = []
37     try:
38         output = subprocess.check_output(command, shell=True)
39     except Exception:
40         exitcode = -1
41         output = traceback.format_exc()
42         log.error("exec command '%s' error:\n ", command)
43         log.error(traceback.format_exc())
44
45     return exitcode, output
46
47
48 def _single_action(seconds, command, queue):
49     """entrypoint for the single action process"""
50     queue.cancel_join_thread()
51     log.debug("single action, fires after %d seconds (from now)", seconds)
52     time.sleep(seconds)
53     log.debug("single action: executing command: '%s'", command)
54     ret_code, data = _execute_shell_command(command)
55     if ret_code < 0:
56         log.error("single action error! command:%s", command)
57         queue.put({'single-action-data': data})
58         return
59     log.debug("single action data: \n%s", data)
60     queue.put({'single-action-data': data})
61
62
63 def _periodic_action(interval, command, queue):
64     """entrypoint for the periodic action process"""
65     queue.cancel_join_thread()
66     log.debug("periodic action, fires every: %d seconds", interval)
67     time_spent = 0
68     while True:
69         time.sleep(interval)
70         time_spent += interval
71         log.debug("periodic action, executing command: '%s'", command)
72         ret_code, data = _execute_shell_command(command)
73         if ret_code < 0:
74             log.error("periodic action error! command:%s", command)
75             queue.put({'periodic-action-data': data})
76             break
77         log.debug("periodic action data: \n%s", data)
78         queue.put({'periodic-action-data': data})
79
80
81 class Runner(object):
82     runners = []
83
84     @staticmethod
85     def get_cls(runner_type):
86         """return class of specified type"""
87         for runner in utils.itersubclasses(Runner):
88             if runner_type == runner.__execution_type__:
89                 return runner
90         raise RuntimeError("No such runner_type %s" % runner_type)
91
92     @staticmethod
93     def get_types():
94         """return a list of known runner type (class) names"""
95         types = []
96         for runner in utils.itersubclasses(Runner):
97             types.append(runner)
98         return types
99
100     @staticmethod
101     def get(runner_cfg):
102         """Returns instance of a scenario runner for execution type.
103         """
104         return Runner.get_cls(runner_cfg["type"])(runner_cfg)
105
106     @staticmethod
107     def release(runner):
108         """Release the runner"""
109         if runner in Runner.runners:
110             Runner.runners.remove(runner)
111
112     @staticmethod
113     def terminate(runner):
114         """Terminate the runner"""
115         if runner.process and runner.process.is_alive():
116             runner.process.terminate()
117
118     @staticmethod
119     def terminate_all():
120         """Terminate all runners (subprocesses)"""
121         log.debug("Terminating all runners")
122
123         # release dumper process as some errors before any runner is created
124         if not Runner.runners:
125             return
126
127         for runner in Runner.runners:
128             log.debug("Terminating runner: %s", runner)
129             if runner.process:
130                 runner.process.terminate()
131                 runner.process.join()
132             if runner.periodic_action_process:
133                 log.debug("Terminating periodic action process")
134                 runner.periodic_action_process.terminate()
135                 runner.periodic_action_process = None
136             Runner.release(runner)
137
138     def __init__(self, config):
139         self.config = config
140         self.periodic_action_process = None
141         self.output_queue = multiprocessing.Queue()
142         self.output_queue.cancel_join_thread()
143         self.result_queue = multiprocessing.Queue()
144         self.result_queue.cancel_join_thread()
145         self.process = None
146         self.aborted = multiprocessing.Event()
147         Runner.runners.append(self)
148
149     def run_post_stop_action(self):
150         """run a potentially configured post-stop action"""
151         if "post-stop-action" in self.config:
152             command = self.config["post-stop-action"]["command"]
153             log.debug("post stop action: command: '%s'", command)
154             ret_code, data = _execute_shell_command(command)
155             if ret_code < 0:
156                 log.error("post action error! command:%s", command)
157                 self.result_queue.put({'post-stop-action-data': data})
158                 return
159             log.debug("post-stop data: \n%s", data)
160             self.result_queue.put({'post-stop-action-data': data})
161
162     def _run_benchmark(self, cls, method_name, scenario_cfg, context_cfg):
163         raise NotImplementedError
164
165     def run(self, scenario_cfg, context_cfg):
166         scenario_type = scenario_cfg["type"]
167         class_name = base_scenario.Scenario.get(scenario_type)
168         path_split = class_name.split(".")
169         module_path = ".".join(path_split[:-1])
170         module = importlib.import_module(module_path)
171         cls = getattr(module, path_split[-1])
172
173         self.config['object'] = class_name
174         self.aborted.clear()
175
176         # run a potentially configured pre-start action
177         if "pre-start-action" in self.config:
178             command = self.config["pre-start-action"]["command"]
179             log.debug("pre start action: command: '%s'", command)
180             ret_code, data = _execute_shell_command(command)
181             if ret_code < 0:
182                 log.error("pre-start action error! command:%s", command)
183                 self.result_queue.put({'pre-start-action-data': data})
184                 return
185             log.debug("pre-start data: \n%s", data)
186             self.result_queue.put({'pre-start-action-data': data})
187
188         if "single-shot-action" in self.config:
189             single_action_process = multiprocessing.Process(
190                 target=_single_action,
191                 name="single-shot-action",
192                 args=(self.config["single-shot-action"]["after"],
193                       self.config["single-shot-action"]["command"],
194                       self.result_queue))
195             single_action_process.start()
196
197         if "periodic-action" in self.config:
198             self.periodic_action_process = multiprocessing.Process(
199                 target=_periodic_action,
200                 name="periodic-action",
201                 args=(self.config["periodic-action"]["interval"],
202                       self.config["periodic-action"]["command"],
203                       self.result_queue))
204             self.periodic_action_process.start()
205
206         self._run_benchmark(cls, "run", scenario_cfg, context_cfg)
207
208     def abort(self):
209         """Abort the execution of a scenario"""
210         self.aborted.set()
211
212     def join(self, timeout=None):
213         self.process.join(timeout)
214         if self.periodic_action_process:
215             self.periodic_action_process.terminate()
216             self.periodic_action_process = None
217
218         self.run_post_stop_action()
219         return self.process.exitcode
220
221     def get_output(self):
222         result = {}
223         while not self.output_queue.empty():
224             result.update(self.output_queue.get())
225         return result
226
227     def get_result(self):
228         result = []
229         while not self.result_queue.empty():
230             result.append(self.result_queue.get())
231         return result