Merge "Update plotter.py to new yardstick.out format"
[yardstick.git] / yardstick / benchmark / runners / base.py
1 # Copyright 2014: Mirantis Inc.
2 # All Rights Reserved.
3 #
4 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
5 #    not use this file except in compliance with the License. You may obtain
6 #    a copy of the License at
7 #
8 #         http://www.apache.org/licenses/LICENSE-2.0
9 #
10 #    Unless required by applicable law or agreed to in writing, software
11 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 #    License for the specific language governing permissions and limitations
14 #    under the License.
15
16 # yardstick comment: this is a modified copy of
17 # rally/rally/benchmark/runners/base.py
18
19 import importlib
20 import logging
21 import multiprocessing
22 import subprocess
23 import time
24 import traceback
25
26 from oslo_config import cfg
27
28 import yardstick.common.utils as utils
29 from yardstick.benchmark.scenarios import base as base_scenario
30 from yardstick.dispatcher.base import Base as DispatcherBase
31
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Global oslo.config object; CONF.dispatcher is read by the output serializer
# to choose which dispatcher type to instantiate.
CONF = cfg.CONF
35
36
def _output_serializer_main(filename, queue):
    '''Entry point of the single subprocess that writes result records.

    Funnelling every record through this one process lets several scenario
    instances run at the same time without corrupting the output file.

    filename -- passed to the dispatcher as its "file_path"
    queue    -- queue the records are read from; the string '_TERMINATE_'
                is the stop marker
    '''
    dispatcher = DispatcherBase.get({
        "type": CONF.dispatcher.capitalize(),
        "file_path": filename,
    })

    # iter(callable, sentinel) keeps calling queue.get() -- which blocks
    # until data becomes available -- and stops as soon as the value read
    # equals the stop marker.
    for record in iter(queue.get, '_TERMINATE_'):
        dispatcher.record_result_data(record)

    # stop marker received: flush whatever the dispatcher still holds
    dispatcher.flush_result_data()
55
56
57 def _execute_shell_command(command):
58     '''execute shell script with error handling'''
59     exitcode = 0
60     output = []
61     try:
62         output = subprocess.check_output(command, shell=True)
63     except Exception:
64         exitcode = -1
65         output = traceback.format_exc()
66         log.error("exec command '%s' error:\n " % command)
67         log.error(traceback.format_exc())
68
69     return exitcode, output
70
71
def _single_action(seconds, command, queue):
    '''Entry point of the single action process.

    Sleeps for `seconds`, runs `command` through the shell once and puts
    {'single-action-data': <output or traceback>} on `queue`, whether or
    not the command succeeded.
    '''
    log.debug("single action, fires after %d seconds (from now)", seconds)
    time.sleep(seconds)
    log.debug("single action: executing command: '%s'", command)

    status, payload = _execute_shell_command(command)
    if status < 0:
        log.error("single action error! command:%s" % command)
    else:
        log.debug("single action data: \n%s" % payload)

    # on failure the payload is the traceback text, otherwise the output
    queue.put({'single-action-data': payload})
84
85
def _periodic_action(interval, command, queue):
    '''Entry point of the periodic action process.

    Every `interval` seconds runs `command` through the shell and puts
    {'periodic-action-data': <output or traceback>} on `queue`.  The loop
    ends after the first failing run; otherwise it goes on until the
    process is terminated from outside.
    '''
    log.debug("periodic action, fires every: %d seconds", interval)
    elapsed = 0  # running total of the sleep time; not read anywhere here
    while True:
        time.sleep(interval)
        elapsed += interval
        log.debug("periodic action, executing command: '%s'", command)

        status, payload = _execute_shell_command(command)
        failed = status < 0
        if failed:
            log.error("periodic action error! command:%s", command)
        else:
            log.debug("periodic action data: \n%s" % payload)

        # the failing run is reported as well (payload is the traceback)
        queue.put({'periodic-action-data': payload})
        if failed:
            break
101
102
class Runner(object):
    '''Base class of the scenario runners and registry of the live ones.

    The class attributes hold state shared by all runners: the result
    queue, the "Dumper" subprocess draining that queue, and the list of
    runner instances that have been created and not yet released.

    _run_benchmark() is called by run() but not defined in this class;
    presumably each subclass provides it -- confirm against the subclasses.
    '''
    # Result queue shared by all runners; created by get() together with
    # the dumper process.
    queue = None
    # Subprocess running _output_serializer_main, or None when not started.
    dump_process = None
    # Every runner instance appends itself here in __init__.
    runners = []

    @staticmethod
    def get_cls(runner_type):
        '''Return the Runner subclass whose __execution_type__ matches.

        Raises RuntimeError when no subclass declares `runner_type`.
        '''
        for runner in utils.itersubclasses(Runner):
            if runner_type == runner.__execution_type__:
                return runner
        raise RuntimeError("No such runner_type %s" % runner_type)

    @staticmethod
    def get_types():
        '''Return a list of the known runner classes (Runner subclasses)'''
        return list(utils.itersubclasses(Runner))

    @staticmethod
    def get(config):
        '''Return a runner instance for the execution type config["type"].

        When no runner exists yet, the output serializer subprocess is
        started first, writing to config["output_filename"].
        '''
        # if there is no runner, start the output serializer subprocess
        if not Runner.runners:
            log.debug("Starting dump process file '%s'",
                      config["output_filename"])
            Runner.queue = multiprocessing.Queue()
            Runner.dump_process = multiprocessing.Process(
                target=_output_serializer_main,
                name="Dumper",
                args=(config["output_filename"], Runner.queue))
            Runner.dump_process.start()

        return Runner.get_cls(config["type"])(config, Runner.queue)

    @staticmethod
    def release_dump_process():
        '''Release the dumper process

        Sends the '_TERMINATE_' marker through the queue and waits for the
        process to exit.  Does nothing when no dumper process is running.
        '''
        log.debug("Stopping dump process")
        if Runner.dump_process:
            Runner.queue.put('_TERMINATE_')
            Runner.dump_process.join()
            Runner.dump_process = None

    @staticmethod
    def release(runner):
        '''Release the runner

        Removes it from the registry and stops the output serializer
        subprocess once the registry is empty.
        '''
        if runner in Runner.runners:
            Runner.runners.remove(runner)

        # if this was the last runner, stop the output serializer subprocess
        if not Runner.runners:
            Runner.release_dump_process()

    @staticmethod
    def terminate(runner):
        '''Terminate the runner's process if it is still alive'''
        if runner.process and runner.process.is_alive():
            runner.process.terminate()

    @staticmethod
    def terminate_all():
        '''Terminate all runners (subprocesses)'''
        log.debug("Terminating all runners")

        # release dumper process as some errors before any runner is created
        if not Runner.runners:
            Runner.release_dump_process()
            return

        # Iterate over a snapshot: release() removes the runner from
        # Runner.runners, and removing from a list while looping over that
        # same list skips every second element.
        for runner in list(Runner.runners):
            log.debug("Terminating runner: %s", runner)
            if runner.process:
                runner.process.terminate()
                runner.process.join()
            if runner.periodic_action_process:
                log.debug("Terminating periodic action process")
                runner.periodic_action_process.terminate()
                runner.periodic_action_process = None
            Runner.release(runner)

    def __init__(self, config, queue):
        '''Store the configuration and register the new runner.

        config -- runner configuration (dict-like)
        queue  -- queue that result records are put on
        '''
        self.config = config
        self.periodic_action_process = None
        self.result_queue = queue
        # not assigned anywhere else in this class; presumably set by the
        # subclass' _run_benchmark() -- confirm
        self.process = None
        # set by abort(), cleared at the start of run()
        self.aborted = multiprocessing.Event()
        Runner.runners.append(self)

    def run_post_stop_action(self):
        '''Run a potentially configured post-stop action

        The command output (or the traceback text on failure) is put on
        the result queue under the key 'post-stop-action-data'.
        '''
        if "post-stop-action" not in self.config:
            return

        command = self.config["post-stop-action"]["command"]
        log.debug("post stop action: command: '%s'", command)
        ret_code, data = _execute_shell_command(command)
        if ret_code < 0:
            log.error("post action error! command:%s", command)
        else:
            log.debug("post-stop data: \n%s", data)
        self.result_queue.put({'post-stop-action-data': data})

    def run(self, scenario_cfg, context_cfg):
        '''Resolve the scenario class, run the configured actions and start
        the benchmark.

        scenario_cfg["type"] is resolved to a dotted class path, which is
        then imported.  A failing pre-start action is reported on the
        result queue and aborts the run before anything else is started.
        '''
        scenario_type = scenario_cfg["type"]
        class_name = base_scenario.Scenario.get(scenario_type)
        # "package.module.ClassName" -> module path + class name
        module_path, _, cls_name = class_name.rpartition(".")
        module = importlib.import_module(module_path)
        cls = getattr(module, cls_name)

        self.config['object'] = class_name
        self.aborted.clear()

        # run a potentially configured pre-start action
        if "pre-start-action" in self.config:
            command = self.config["pre-start-action"]["command"]
            log.debug("pre start action: command: '%s'", command)
            ret_code, data = _execute_shell_command(command)
            if ret_code < 0:
                log.error("pre-start action error! command:%s", command)
                self.result_queue.put({'pre-start-action-data': data})
                return
            log.debug("pre-start data: \n%s", data)
            self.result_queue.put({'pre-start-action-data': data})

        if "single-shot-action" in self.config:
            single_action_process = multiprocessing.Process(
                target=_single_action,
                name="single-shot-action",
                args=(self.config["single-shot-action"]["after"],
                      self.config["single-shot-action"]["command"],
                      self.result_queue))
            single_action_process.start()

        if "periodic-action" in self.config:
            self.periodic_action_process = multiprocessing.Process(
                target=_periodic_action,
                name="periodic-action",
                args=(self.config["periodic-action"]["interval"],
                      self.config["periodic-action"]["command"],
                      self.result_queue))
            self.periodic_action_process.start()

        self._run_benchmark(cls, "run", scenario_cfg, context_cfg)

    def abort(self):
        '''Abort the execution of a scenario'''
        self.aborted.set()

    def join(self, timeout=None):
        '''Wait for the runner process and return its exit code.

        Also stops the periodic action process, if any, and runs the
        post-stop action.
        '''
        # NOTE(review): self.process is still None when run() returned
        # early after a failed pre-start action; joining then raises
        # AttributeError -- confirm how callers are expected to handle it.
        self.process.join(timeout)
        if self.periodic_action_process:
            self.periodic_action_process.terminate()
            self.periodic_action_process = None

        self.run_post_stop_action()
        return self.process.exitcode