Support general configuration file
[yardstick.git] / yardstick / benchmark / runners / base.py
1 ##############################################################################
2 # Copyright (c) 2015 Ericsson AB and others.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9
10 import importlib
11 import logging
12 import multiprocessing
13 import subprocess
14 import time
15
16 log = logging.getLogger(__name__)
17
18 from oslo_config import cfg
19
20 import yardstick.common.utils as utils
21 from yardstick.benchmark.scenarios import base as base_scenario
22 from yardstick.dispatcher.base import Base as DispatcherBase
23
24 CONF = cfg.CONF
25
26
27 def _output_serializer_main(filename, queue):
28     '''entrypoint for the singleton subprocess writing to outfile
29     Use of this process enables multiple instances of a scenario without
30     messing up the output file.
31     '''
32     config = {}
33     config["type"] = CONF.dispatcher.capitalize()
34     config["file_path"] = filename
35     dispatcher = DispatcherBase.get(config)
36
37     while True:
38         # blocks until data becomes available
39         record = queue.get()
40         if record == '_TERMINATE_':
41             break
42         else:
43             dispatcher.record_result_data(record)
44
45
46 def _single_action(seconds, command, queue):
47     '''entrypoint for the single action process'''
48     log.debug("single action, fires after %d seconds (from now)", seconds)
49     time.sleep(seconds)
50     log.debug("single action: executing command: '%s'", command)
51     data = subprocess.check_output(command, shell=True)
52     log.debug("\n%s" % data)
53
54
55 def _periodic_action(interval, command, queue):
56     '''entrypoint for the periodic action process'''
57     log.debug("periodic action, fires every: %d seconds", interval)
58     time_spent = 0
59     while True:
60         time.sleep(interval)
61         time_spent += interval
62         log.debug("periodic action, executing command: '%s'", command)
63         data = subprocess.check_output(command, shell=True)
64         log.debug("\n%s" % data)
65
66
67 class Runner(object):
68     queue = None
69     dump_process = None
70     runners = []
71
72     @staticmethod
73     def get_cls(runner_type):
74         '''return class of specified type'''
75         for runner in utils.itersubclasses(Runner):
76             if runner_type == runner.__execution_type__:
77                 return runner
78         raise RuntimeError("No such runner_type %s" % runner_type)
79
80     @staticmethod
81     def get_types():
82         '''return a list of known runner type (class) names'''
83         types = []
84         for runner in utils.itersubclasses(Runner):
85             types.append(runner)
86         return types
87
88     @staticmethod
89     def get(config):
90         """Returns instance of a scenario runner for execution type.
91         """
92         # if there is no runner, start the output serializer subprocess
93         if len(Runner.runners) == 0:
94             log.debug("Starting dump process file '%s'" %
95                       config["output_filename"])
96             Runner.queue = multiprocessing.Queue()
97             Runner.dump_process = multiprocessing.Process(
98                 target=_output_serializer_main,
99                 name="Dumper",
100                 args=(config["output_filename"], Runner.queue))
101             Runner.dump_process.start()
102
103         return Runner.get_cls(config["type"])(config, Runner.queue)
104
105     @staticmethod
106     def release(runner):
107         '''Release the runner'''
108         Runner.runners.remove(runner)
109         # if this was the last runner, stop the output serializer subprocess
110         if len(Runner.runners) == 0:
111             log.debug("Stopping dump process")
112             Runner.queue.put('_TERMINATE_')
113             Runner.dump_process.join()
114
115     @staticmethod
116     def terminate_all():
117         '''Terminate all runners (subprocesses)'''
118         log.debug("Terminating all runners")
119         for runner in Runner.runners:
120             if runner.periodic_action_process:
121                 log.debug("Terminating periodic action process")
122                 runner.periodic_action_process.terminate()
123                 runner.periodic_action_process = None
124             runner.process.terminate()
125             runner.process.join()
126             Runner.release(runner)
127
128     def __init__(self, config, queue):
129         self.context = {}
130         self.config = config
131         self.periodic_action_process = None
132         self.result_queue = queue
133         Runner.runners.append(self)
134
135     def run_pre_start_action(self):
136         '''run a potentially configured pre-start action'''
137         if "pre-start-action" in self.config:
138             command = self.config["pre-start-action"]["command"]
139             log.debug("pre start action: command: '%s'" % command)
140             data = subprocess.check_output(command, shell=True)
141             log.debug("pre-start data: \n%s" % data)
142             output = "{'pre-start-action-data': %s}" % data
143             self.result_queue.put(output)
144
145     def run_post_stop_action(self):
146         '''run a potentially configured post-stop action'''
147         if "post-stop-action" in self.config:
148             command = self.config["post-stop-action"]["command"]
149             log.debug("post stop action: command: '%s'" % command)
150             data = subprocess.check_output(command, shell=True)
151             log.debug("post-stop data: \n%s" % data)
152             output = "{'post-stop-action-data': %s}" % data
153             self.result_queue.put(output)
154
155     def run(self, scenario_type, scenario_cfg):
156         class_name = base_scenario.Scenario.get(scenario_type)
157         path_split = class_name.split(".")
158         module_path = ".".join(path_split[:-1])
159         module = importlib.import_module(module_path)
160         cls = getattr(module, path_split[-1])
161
162         self.config['object'] = class_name
163
164         self.run_pre_start_action()
165
166         if "single-shot-action" in self.config:
167             single_action_process = multiprocessing.Process(
168                 target=_single_action,
169                 name="single-shot-action",
170                 args=(self.config["single-shot-action"]["after"],
171                       self.config["single-shot-action"]["command"],
172                       self.result_queue))
173             single_action_process.start()
174
175         if "periodic-action" in self.config:
176             self.periodic_action_process = multiprocessing.Process(
177                 target=_periodic_action,
178                 name="periodic-action",
179                 args=(self.config["periodic-action"]["interval"],
180                       self.config["periodic-action"]["command"],
181                       self.result_queue))
182             self.periodic_action_process.start()
183
184         self._run_benchmark(cls, "run", scenario_cfg)
185
186     def join(self):
187         self.process.join()
188         if self.periodic_action_process:
189             self.periodic_action_process.terminate()
190             self.periodic_action_process = None
191         self.run_post_stop_action()
192         return self.process.exitcode