Merge "Bugfix: KeyError when using http dispatcher"
[yardstick.git] / yardstick / benchmark / runners / base.py
1 # Copyright 2014: Mirantis Inc.
2 # All Rights Reserved.
3 #
4 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
5 #    not use this file except in compliance with the License. You may obtain
6 #    a copy of the License at
7 #
8 #         http://www.apache.org/licenses/LICENSE-2.0
9 #
10 #    Unless required by applicable law or agreed to in writing, software
11 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 #    License for the specific language governing permissions and limitations
14 #    under the License.
15
16 # yardstick comment: this is a modified copy of
17 # rally/rally/benchmark/runners/base.py
18
19 from __future__ import absolute_import
20 import importlib
21 import logging
22 import multiprocessing
23 import subprocess
24 import time
25 import os
26 import traceback
27
28 from oslo_config import cfg
29
30 import yardstick.common.utils as utils
31 from yardstick.benchmark.scenarios import base as base_scenario
32 from yardstick.dispatcher.base import Base as DispatcherBase
33
34 log = logging.getLogger(__name__)
35
36 CONF = cfg.CONF
37
38
39 def _output_serializer_main(filename, queue, config):
40     """entrypoint for the singleton subprocess writing to outfile
41     Use of this process enables multiple instances of a scenario without
42     messing up the output file.
43     """
44     try:
45         out_type = config['yardstick'].get('DEFAULT', {})['dispatcher']
46     except KeyError:
47         out_type = os.environ.get('DISPATCHER', 'file')
48
49     conf = {
50         'type': out_type.capitalize(),
51         'file_path': filename
52     }
53
54     dispatcher = DispatcherBase.get(conf, config)
55
56     while True:
57         # blocks until data becomes available
58         record = queue.get()
59         if record == '_TERMINATE_':
60             dispatcher.flush_result_data()
61             break
62         else:
63             dispatcher.record_result_data(record)
64
65
66 def _execute_shell_command(command):
67     """execute shell script with error handling"""
68     exitcode = 0
69     output = []
70     try:
71         output = subprocess.check_output(command, shell=True)
72     except Exception:
73         exitcode = -1
74         output = traceback.format_exc()
75         log.error("exec command '%s' error:\n ", command)
76         log.error(traceback.format_exc())
77
78     return exitcode, output
79
80
81 def _single_action(seconds, command, queue):
82     """entrypoint for the single action process"""
83     log.debug("single action, fires after %d seconds (from now)", seconds)
84     time.sleep(seconds)
85     log.debug("single action: executing command: '%s'", command)
86     ret_code, data = _execute_shell_command(command)
87     if ret_code < 0:
88         log.error("single action error! command:%s", command)
89         queue.put({'single-action-data': data})
90         return
91     log.debug("single action data: \n%s", data)
92     queue.put({'single-action-data': data})
93
94
95 def _periodic_action(interval, command, queue):
96     """entrypoint for the periodic action process"""
97     log.debug("periodic action, fires every: %d seconds", interval)
98     time_spent = 0
99     while True:
100         time.sleep(interval)
101         time_spent += interval
102         log.debug("periodic action, executing command: '%s'", command)
103         ret_code, data = _execute_shell_command(command)
104         if ret_code < 0:
105             log.error("periodic action error! command:%s", command)
106             queue.put({'periodic-action-data': data})
107             break
108         log.debug("periodic action data: \n%s", data)
109         queue.put({'periodic-action-data': data})
110
111
112 class Runner(object):
113     queue = None
114     dump_process = None
115     runners = []
116
117     @staticmethod
118     def get_cls(runner_type):
119         """return class of specified type"""
120         for runner in utils.itersubclasses(Runner):
121             if runner_type == runner.__execution_type__:
122                 return runner
123         raise RuntimeError("No such runner_type %s" % runner_type)
124
125     @staticmethod
126     def get_types():
127         """return a list of known runner type (class) names"""
128         types = []
129         for runner in utils.itersubclasses(Runner):
130             types.append(runner)
131         return types
132
133     @staticmethod
134     def get(runner_cfg, config):
135         """Returns instance of a scenario runner for execution type.
136         """
137         # if there is no runner, start the output serializer subprocess
138         if not Runner.runners:
139             log.debug("Starting dump process file '%s'",
140                       runner_cfg["output_filename"])
141             Runner.queue = multiprocessing.Queue()
142             Runner.dump_process = multiprocessing.Process(
143                 target=_output_serializer_main,
144                 name="Dumper",
145                 args=(runner_cfg["output_filename"], Runner.queue, config))
146             Runner.dump_process.start()
147
148         return Runner.get_cls(runner_cfg["type"])(runner_cfg, Runner.queue)
149
150     @staticmethod
151     def release_dump_process():
152         """Release the dumper process"""
153         log.debug("Stopping dump process")
154         if Runner.dump_process:
155             Runner.queue.put('_TERMINATE_')
156             Runner.dump_process.join()
157             Runner.dump_process = None
158
159     @staticmethod
160     def release(runner):
161         """Release the runner"""
162         if runner in Runner.runners:
163             Runner.runners.remove(runner)
164
165         # if this was the last runner, stop the output serializer subprocess
166         if not Runner.runners:
167             Runner.release_dump_process()
168
169     @staticmethod
170     def terminate(runner):
171         """Terminate the runner"""
172         if runner.process and runner.process.is_alive():
173             runner.process.terminate()
174
175     @staticmethod
176     def terminate_all():
177         """Terminate all runners (subprocesses)"""
178         log.debug("Terminating all runners")
179
180         # release dumper process as some errors before any runner is created
181         if not Runner.runners:
182             Runner.release_dump_process()
183             return
184
185         for runner in Runner.runners:
186             log.debug("Terminating runner: %s", runner)
187             if runner.process:
188                 runner.process.terminate()
189                 runner.process.join()
190             if runner.periodic_action_process:
191                 log.debug("Terminating periodic action process")
192                 runner.periodic_action_process.terminate()
193                 runner.periodic_action_process = None
194             Runner.release(runner)
195
196     def __init__(self, config, queue):
197         self.config = config
198         self.periodic_action_process = None
199         self.result_queue = queue
200         self.process = None
201         self.aborted = multiprocessing.Event()
202         Runner.runners.append(self)
203
204     def run_post_stop_action(self):
205         """run a potentially configured post-stop action"""
206         if "post-stop-action" in self.config:
207             command = self.config["post-stop-action"]["command"]
208             log.debug("post stop action: command: '%s'", command)
209             ret_code, data = _execute_shell_command(command)
210             if ret_code < 0:
211                 log.error("post action error! command:%s", command)
212                 self.result_queue.put({'post-stop-action-data': data})
213                 return
214             log.debug("post-stop data: \n%s", data)
215             self.result_queue.put({'post-stop-action-data': data})
216
217     def run(self, scenario_cfg, context_cfg):
218         scenario_type = scenario_cfg["type"]
219         class_name = base_scenario.Scenario.get(scenario_type)
220         path_split = class_name.split(".")
221         module_path = ".".join(path_split[:-1])
222         module = importlib.import_module(module_path)
223         cls = getattr(module, path_split[-1])
224
225         self.config['object'] = class_name
226         self.aborted.clear()
227
228         # run a potentially configured pre-start action
229         if "pre-start-action" in self.config:
230             command = self.config["pre-start-action"]["command"]
231             log.debug("pre start action: command: '%s'", command)
232             ret_code, data = _execute_shell_command(command)
233             if ret_code < 0:
234                 log.error("pre-start action error! command:%s", command)
235                 self.result_queue.put({'pre-start-action-data': data})
236                 return
237             log.debug("pre-start data: \n%s", data)
238             self.result_queue.put({'pre-start-action-data': data})
239
240         if "single-shot-action" in self.config:
241             single_action_process = multiprocessing.Process(
242                 target=_single_action,
243                 name="single-shot-action",
244                 args=(self.config["single-shot-action"]["after"],
245                       self.config["single-shot-action"]["command"],
246                       self.result_queue))
247             single_action_process.start()
248
249         if "periodic-action" in self.config:
250             self.periodic_action_process = multiprocessing.Process(
251                 target=_periodic_action,
252                 name="periodic-action",
253                 args=(self.config["periodic-action"]["interval"],
254                       self.config["periodic-action"]["command"],
255                       self.result_queue))
256             self.periodic_action_process.start()
257
258         self._run_benchmark(cls, "run", scenario_cfg, context_cfg)
259
260     def abort(self):
261         """Abort the execution of a scenario"""
262         self.aborted.set()
263
264     def join(self, timeout=None):
265         self.process.join(timeout)
266         if self.periodic_action_process:
267             self.periodic_action_process.terminate()
268             self.periodic_action_process = None
269
270         self.run_post_stop_action()
271         return self.process.exitcode