Add error handling for failing script calls
[yardstick.git] / yardstick / benchmark / runners / base.py
1 ##############################################################################
2 # Copyright (c) 2015 Ericsson AB and others.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9
10 import importlib
11 import logging
12 import multiprocessing
13 import subprocess
14 import time
15 import traceback
16
17 log = logging.getLogger(__name__)
18
19 from oslo_config import cfg
20
21 import yardstick.common.utils as utils
22 from yardstick.benchmark.scenarios import base as base_scenario
23 from yardstick.dispatcher.base import Base as DispatcherBase
24
25 CONF = cfg.CONF
26
27
def _output_serializer_main(filename, queue):
    '''entrypoint for the singleton subprocess writing to outfile

    Every result record is funnelled through this one process, which lets
    several instances of a scenario run at once without garbling the
    output file. Records are read from ``queue`` and handed to the
    configured dispatcher until the string '_TERMINATE_' arrives.
    '''
    dispatcher = DispatcherBase.get({
        "type": CONF.dispatcher.capitalize(),
        "file_path": filename,
    })

    # queue.get() blocks until a record is available; iter() keeps calling
    # it and stops as soon as it returns the '_TERMINATE_' sentinel.
    for record in iter(queue.get, '_TERMINATE_'):
        dispatcher.record_result_data(record)
45
46
47 def _execute_shell_command(command):
48     '''execute shell script with error handling'''
49     exitcode = 0
50     output = []
51     try:
52         output = subprocess.check_output(command, shell=True)
53     except Exception:
54         exitcode = -1
55         output = traceback.format_exc()
56         log.error("exec command '%s' error:\n " % command)
57         log.error(traceback.format_exc())
58
59     return exitcode, output
60
61
def _single_action(seconds, command, queue):
    '''entrypoint for the single action process

    Sleeps ``seconds``, runs ``command`` once via _execute_shell_command
    and puts the returned data on ``queue`` under 'single-action-data'.
    The data is recorded whether or not the command succeeded; on failure
    it is the error text returned by _execute_shell_command.
    '''
    log.debug("single action, fires after %d seconds (from now)", seconds)
    time.sleep(seconds)
    log.debug("single action: executing command: '%s'", command)
    ret_code, data = _execute_shell_command(command)
    if ret_code < 0:
        log.error("single action error! command:%s", command)
    else:
        log.debug("single action data: \n%s", data)
    queue.put({'single-action-data': data})
74
75
def _periodic_action(interval, command, queue):
    '''entrypoint for the periodic action process

    Every ``interval`` seconds runs ``command`` via _execute_shell_command
    and puts the returned data on ``queue`` under 'periodic-action-data'.
    The first failing run is still recorded (its data is the error text)
    and then ends the loop; otherwise the loop runs until the process is
    terminated from outside.
    '''
    log.debug("periodic action, fires every: %d seconds", interval)
    while True:
        time.sleep(interval)
        log.debug("periodic action, executing command: '%s'", command)
        ret_code, data = _execute_shell_command(command)
        if ret_code < 0:
            log.error("periodic action error! command:%s", command)
            queue.put({'periodic-action-data': data})
            break
        log.debug("periodic action data: \n%s", data)
        queue.put({'periodic-action-data': data})
91
92
class Runner(object):
    '''Base class for scenario runners.

    State kept on the class is shared by every runner in this process:
    ``runners`` is the registry of live instances, ``queue`` is the result
    queue, and ``dump_process`` is the single serializer subprocess that
    drains that queue into the output file.
    '''
    queue = None
    dump_process = None
    runners = []

    @staticmethod
    def get_cls(runner_type):
        '''return class of specified type

        Raises RuntimeError if no Runner subclass has
        ``__execution_type__ == runner_type``.
        '''
        for runner in utils.itersubclasses(Runner):
            if runner_type == runner.__execution_type__:
                return runner
        raise RuntimeError("No such runner_type %s" % runner_type)

    @staticmethod
    def get_types():
        '''return a list of the known runner classes (Runner subclasses)'''
        return list(utils.itersubclasses(Runner))

    @staticmethod
    def get(config):
        """Returns instance of a scenario runner for execution type.

        When no runner is registered yet, this also creates the shared
        result queue and starts the serializer subprocess writing to
        ``config["output_filename"]``.
        """
        # if there is no runner, start the output serializer subprocess
        if not Runner.runners:
            log.debug("Starting dump process file '%s'",
                      config["output_filename"])
            Runner.queue = multiprocessing.Queue()
            Runner.dump_process = multiprocessing.Process(
                target=_output_serializer_main,
                name="Dumper",
                args=(config["output_filename"], Runner.queue))
            Runner.dump_process.start()

        return Runner.get_cls(config["type"])(config, Runner.queue)

    @staticmethod
    def release_dump_process():
        '''Release the dumper process

        Sends the '_TERMINATE_' sentinel and waits for the dumper to exit.
        '''
        log.debug("Stopping dump process")
        if Runner.dump_process:
            Runner.queue.put('_TERMINATE_')
            Runner.dump_process.join()
            Runner.dump_process = None

    @staticmethod
    def release(runner):
        '''Release the runner'''
        Runner.runners.remove(runner)

        # if this was the last runner, stop the output serializer subprocess
        if not Runner.runners:
            Runner.release_dump_process()

    @staticmethod
    def terminate_all():
        '''Terminate all runners (subprocesses)'''
        log.debug("Terminating all runners")

        # release dumper process as some errors before any runner is created
        if not Runner.runners:
            Runner.release_dump_process()
            return

        # Iterate over a snapshot: Runner.release() removes each runner from
        # Runner.runners, and mutating the list being iterated would skip
        # every other runner (leaving it, and the dumper, running).
        for runner in list(Runner.runners):
            log.debug("Terminating runner: %s", runner)
            if runner.process:
                runner.process.terminate()
                runner.process.join()
            if runner.periodic_action_process:
                log.debug("Terminating periodic action process")
                runner.periodic_action_process.terminate()
                # reap the terminated child so it does not linger as a zombie
                runner.periodic_action_process.join()
                runner.periodic_action_process = None
            Runner.release(runner)

    def __init__(self, config, queue):
        self.context = {}
        self.config = config
        self.periodic_action_process = None
        self.result_queue = queue
        # presumably set by the subclass's _run_benchmark; stays None if
        # run() aborts early (see the pre-start action in run())
        self.process = None
        # register the new instance in the class-wide registry
        Runner.runners.append(self)

    def run_post_stop_action(self):
        '''run a potentially configured post-stop action

        The command's data (or the error text on failure) is put on the
        result queue under 'post-stop-action-data'.
        '''
        if "post-stop-action" not in self.config:
            return
        command = self.config["post-stop-action"]["command"]
        log.debug("post stop action: command: '%s'", command)
        ret_code, data = _execute_shell_command(command)
        if ret_code < 0:
            log.error("post action error! command:%s", command)
        else:
            log.debug("post-stop data: \n%s", data)
        self.result_queue.put({'post-stop-action-data': data})

    def run(self, scenario_type, scenario_cfg):
        '''Resolve the scenario class and start the benchmark.

        Runs the optional pre-start action first; if that fails, its error
        text is recorded and the benchmark is NOT started. Otherwise starts
        the optional single-shot and periodic action processes and hands
        off to ``self._run_benchmark`` (presumably defined by subclasses).
        '''
        class_name = base_scenario.Scenario.get(scenario_type)
        path_split = class_name.split(".")
        module_path = ".".join(path_split[:-1])
        module = importlib.import_module(module_path)
        cls = getattr(module, path_split[-1])

        self.config['object'] = class_name

        # run a potentially configured pre-start action
        if "pre-start-action" in self.config:
            command = self.config["pre-start-action"]["command"]
            log.debug("pre start action: command: '%s'", command)
            ret_code, data = _execute_shell_command(command)
            if ret_code < 0:
                log.error("pre-start action error! command:%s", command)
                self.result_queue.put({'pre-start-action-data': data})
                # abort: self.process stays None, which join() tolerates
                return
            log.debug("pre-start data: \n%s", data)
            self.result_queue.put({'pre-start-action-data': data})

        if "single-shot-action" in self.config:
            single_action_process = multiprocessing.Process(
                target=_single_action,
                name="single-shot-action",
                args=(self.config["single-shot-action"]["after"],
                      self.config["single-shot-action"]["command"],
                      self.result_queue))
            single_action_process.start()

        if "periodic-action" in self.config:
            self.periodic_action_process = multiprocessing.Process(
                target=_periodic_action,
                name="periodic-action",
                args=(self.config["periodic-action"]["interval"],
                      self.config["periodic-action"]["command"],
                      self.result_queue))
            self.periodic_action_process.start()

        self._run_benchmark(cls, "run", scenario_cfg)

    def join(self):
        '''Wait for the benchmark, stop the periodic action, run post-stop.

        Returns the benchmark process exit code, or -1 when no benchmark
        process was started (e.g. run() returned early because the
        pre-start action failed).
        '''
        if self.process is not None:
            self.process.join()
        if self.periodic_action_process:
            self.periodic_action_process.terminate()
            # reap the terminated child so it does not linger as a zombie
            self.periodic_action_process.join()
            self.periodic_action_process = None

        self.run_post_stop_action()
        if self.process is None:
            return -1
        return self.process.exitcode