Set TEST_DB_URL for storperf
[yardstick.git] / yardstick / benchmark / runners / base.py
1 # Copyright 2014: Mirantis Inc.
2 # All Rights Reserved.
3 #
4 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
5 #    not use this file except in compliance with the License. You may obtain
6 #    a copy of the License at
7 #
8 #         http://www.apache.org/licenses/LICENSE-2.0
9 #
10 #    Unless required by applicable law or agreed to in writing, software
11 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 #    License for the specific language governing permissions and limitations
14 #    under the License.
15
16 # yardstick comment: this is a modified copy of
17 # rally/rally/benchmark/runners/base.py
18
19 from __future__ import absolute_import
20 import importlib
21 import logging
22 import multiprocessing
23 import subprocess
24 import time
25 import traceback
26
27 import yardstick.common.utils as utils
28 from yardstick.benchmark.scenarios import base as base_scenario
29
30 log = logging.getLogger(__name__)
31
32
33 def _execute_shell_command(command):
34     """execute shell script with error handling"""
35     exitcode = 0
36     output = []
37     try:
38         output = subprocess.check_output(command, shell=True)
39     except Exception:
40         exitcode = -1
41         output = traceback.format_exc()
42         log.error("exec command '%s' error:\n ", command)
43         log.error(traceback.format_exc())
44
45     return exitcode, output
46
47
def _single_action(seconds, command, queue):
    """Entry point for the single action process.

    Sleeps for ``seconds``, runs ``command`` once and publishes whatever
    the command helper returned (its output, or the traceback text on
    failure) on ``queue`` under the ``'single-action-data'`` key.

    :param seconds: delay before the command is executed
    :param command: shell command line to execute
    :param queue: queue-like object whose ``put`` receives the result dict
    """
    log.debug("single action, fires after %d seconds (from now)", seconds)
    time.sleep(seconds)

    log.debug("single action: executing command: '%s'", command)
    status, payload = _execute_shell_command(command)

    if status >= 0:
        log.debug("single action data: \n%s", payload)
    else:
        log.error("single action error! command:%s", command)

    # The payload is reported whether or not the command succeeded.
    queue.put({'single-action-data': payload})
60
61
def _periodic_action(interval, command, queue):
    """Entry point for the periodic action process.

    Loops forever: sleeps ``interval`` seconds, runs ``command`` and puts
    the result on ``queue`` under the ``'periodic-action-data'`` key. The
    loop only ends by itself when the command fails; the failure data is
    still queued before leaving. Otherwise the caller is expected to stop
    the process from outside.

    :param interval: pause before each execution of the command
    :param command: shell command line to execute
    :param queue: queue-like object whose ``put`` receives the result dicts
    """
    log.debug("periodic action, fires every: %d seconds", interval)
    while True:
        time.sleep(interval)
        log.debug("periodic action, executing command: '%s'", command)
        ret_code, data = _execute_shell_command(command)
        failed = ret_code < 0

        if failed:
            log.error("periodic action error! command:%s", command)
        else:
            log.debug("periodic action data: \n%s", data)

        # Output (or traceback text on failure) is always reported.
        queue.put({'periodic-action-data': data})

        if failed:
            # Stop repeating a command that has already failed once.
            break
77
78
class Runner(object):
    """Base class for scenario runners and registry of live instances.

    Concrete runner types are subclasses that declare an
    ``__execution_type__`` (matched by :meth:`get_cls`) and provide a
    ``_run_benchmark`` method (called by :meth:`run`); neither is defined
    in this class. ``self.process`` starts as ``None`` and is presumably
    assigned by the subclass's ``_run_benchmark`` -- confirm against the
    concrete runners.
    """

    # Every instance registers itself here in __init__ and is removed again
    # by release(); terminate_all() walks this list.
    runners = []

    @staticmethod
    def get_cls(runner_type):
        """Return the Runner subclass registered for ``runner_type``.

        :param runner_type: value compared with each subclass's
            ``__execution_type__``
        :raises RuntimeError: if no subclass matches
        """
        for runner in utils.itersubclasses(Runner):
            if runner_type == runner.__execution_type__:
                return runner
        raise RuntimeError("No such runner_type %s" % runner_type)

    @staticmethod
    def get_types():
        """Return a list of the known runner classes.

        The list contains the class objects yielded by
        ``utils.itersubclasses(Runner)``, not their names.
        """
        return list(utils.itersubclasses(Runner))

    @staticmethod
    def get(runner_cfg):
        """Return a runner instance for ``runner_cfg["type"]``.

        The matching class is instantiated with ``runner_cfg`` as its only
        argument.
        """
        return Runner.get_cls(runner_cfg["type"])(runner_cfg)

    @staticmethod
    def release(runner):
        """Remove ``runner`` from the registry; a no-op if it is not there."""
        if runner in Runner.runners:
            Runner.runners.remove(runner)

    @staticmethod
    def terminate(runner):
        """Terminate the runner's process if it exists and is still alive."""
        if runner.process and runner.process.is_alive():
            runner.process.terminate()

    @staticmethod
    def terminate_all():
        """Terminate all registered runners (subprocesses) and release them.

        For each runner the main process is terminated and joined, a
        running periodic action process is terminated, and the runner is
        removed from the registry. Safe to call when no runner was ever
        created.
        """
        log.debug("Terminating all runners")

        # Iterate over a snapshot: release() removes the runner from
        # Runner.runners, and mutating the list while iterating over it
        # would skip every other runner and leave it running/registered.
        for runner in list(Runner.runners):
            log.debug("Terminating runner: %s", runner)
            if runner.process:
                runner.process.terminate()
                runner.process.join()
            if runner.periodic_action_process:
                log.debug("Terminating periodic action process")
                runner.periodic_action_process.terminate()
                runner.periodic_action_process = None
            Runner.release(runner)

    def __init__(self, config):
        """Store ``config``, create the queues and register the instance.

        :param config: runner configuration mapping; the optional keys
            ``pre-start-action``, ``single-shot-action``,
            ``periodic-action`` and ``post-stop-action`` are read later
        """
        self.config = config
        self.periodic_action_process = None
        self.output_queue = multiprocessing.Queue()
        self.result_queue = multiprocessing.Queue()
        self.process = None
        self.aborted = multiprocessing.Event()
        Runner.runners.append(self)

    def run_post_stop_action(self):
        """Run the post-stop action if one is configured.

        The command's output (or the traceback text on failure) is put on
        ``result_queue`` under the ``'post-stop-action-data'`` key.
        """
        if "post-stop-action" not in self.config:
            return

        command = self.config["post-stop-action"]["command"]
        log.debug("post stop action: command: '%s'", command)
        ret_code, data = _execute_shell_command(command)
        if ret_code < 0:
            log.error("post action error! command:%s", command)
        else:
            log.debug("post-stop data: \n%s", data)
        self.result_queue.put({'post-stop-action-data': data})

    def run(self, scenario_cfg, context_cfg):
        """Resolve the scenario class, start side actions, run the benchmark.

        ``base_scenario.Scenario.get`` supplies the dotted path of the
        scenario class, which is imported here and handed to
        ``_run_benchmark``. If a configured pre-start action fails, its
        data is queued and the benchmark is NOT started.

        :param scenario_cfg: scenario configuration; ``"type"`` selects the
            scenario class
        :param context_cfg: context configuration, passed through unchanged
        """
        scenario_type = scenario_cfg["type"]
        class_name = base_scenario.Scenario.get(scenario_type)
        path_split = class_name.split(".")
        module_path = ".".join(path_split[:-1])
        module = importlib.import_module(module_path)
        cls = getattr(module, path_split[-1])

        self.config['object'] = class_name
        self.aborted.clear()

        # run a potentially configured pre-start action
        if "pre-start-action" in self.config:
            command = self.config["pre-start-action"]["command"]
            log.debug("pre start action: command: '%s'", command)
            ret_code, data = _execute_shell_command(command)
            if ret_code < 0:
                log.error("pre-start action error! command:%s", command)
                self.result_queue.put({'pre-start-action-data': data})
                # A failed pre-start action aborts the run.
                return
            log.debug("pre-start data: \n%s", data)
            self.result_queue.put({'pre-start-action-data': data})

        if "single-shot-action" in self.config:
            # Fire-and-forget: no reference to this process is kept.
            single_action_process = multiprocessing.Process(
                target=_single_action,
                name="single-shot-action",
                args=(self.config["single-shot-action"]["after"],
                      self.config["single-shot-action"]["command"],
                      self.result_queue))
            single_action_process.start()

        if "periodic-action" in self.config:
            # Kept on the instance so join()/terminate_all() can stop it.
            self.periodic_action_process = multiprocessing.Process(
                target=_periodic_action,
                name="periodic-action",
                args=(self.config["periodic-action"]["interval"],
                      self.config["periodic-action"]["command"],
                      self.result_queue))
            self.periodic_action_process.start()

        self._run_benchmark(cls, "run", scenario_cfg, context_cfg)

    def abort(self):
        """Abort the execution of a scenario by setting the aborted event."""
        self.aborted.set()

    def join(self, timeout=None):
        """Wait for the runner process, then stop side actions.

        After joining, a running periodic action process is terminated and
        the post-stop action (if configured) is executed.

        :param timeout: passed to ``process.join``
        :returns: the runner process's ``exitcode``
        """
        self.process.join(timeout)
        if self.periodic_action_process:
            self.periodic_action_process.terminate()
            self.periodic_action_process = None

        self.run_post_stop_action()
        return self.process.exitcode

    def get_output(self):
        """Drain ``output_queue`` and merge all queued dicts into one.

        Later entries overwrite earlier ones on key collisions.
        """
        result = {}
        while not self.output_queue.empty():
            result.update(self.output_queue.get())
        return result

    def get_result(self):
        """Drain ``result_queue`` and return the queued items as a list."""
        result = []
        while not self.result_queue.empty():
            result.append(self.result_queue.get())
        return result