30fa0763989dd6742cfb143d6ca5f51660e0543a
[yardstick.git] / yardstick / benchmark / runners / base.py
1 ##############################################################################
2 # Copyright (c) 2015 Ericsson AB and others.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9
10 import importlib
11 import json
12 import logging
13 import multiprocessing
14 import subprocess
15 import time
16
17 log = logging.getLogger(__name__)
18
19 import yardstick.common.utils as utils
20 from yardstick.benchmark.scenarios import base as base_scenario
21
22
def _output_serializer_main(filename, queue):
    '''Entry point of the single subprocess that writes to the output file.

    Funnelling every record through this one process lets several scenario
    instances run at the same time without messing up the output file.

    :param filename: path of the output file; it is opened in 'a+' mode
    :param queue: object whose get() blocks until a record is available
    '''
    with open(filename, 'a+') as outfile:
        # iter() calls queue.get() over and over (each call blocks until
        # data becomes available) and stops at the '_TERMINATE_' marker;
        # leaving the with-block then closes the file
        for record in iter(queue.get, '_TERMINATE_'):
            json.dump(record, outfile)
            outfile.write('\n')
38
39
def _single_action(seconds, command, queue):
    '''Entry point of the single action process.

    Sleeps for ``seconds`` seconds, runs ``command`` once through the shell
    and logs what the command printed at debug level.

    :param seconds: delay before the command is executed
    :param command: shell command line to execute
    :param queue: part of the process signature; not used in this function
    '''
    log.debug("single action, fires after %d seconds (from now)", seconds)
    time.sleep(seconds)
    log.debug("single action: executing command: '%s'", command)
    # check_output() returns whatever the command wrote to stdout
    output = subprocess.check_output(command, shell=True)
    log.debug("\n%s", output)
47
48
def _periodic_action(interval, command, queue):
    '''Entry point of the periodic action process.

    Loops forever: sleeps ``interval`` seconds, runs ``command`` through the
    shell and logs its output at debug level. The function never returns on
    its own; the loop only ends when an exception propagates (for example
    subprocess.CalledProcessError when the command exits non-zero) or when
    the process running it is terminated from outside.

    :param interval: number of seconds to sleep before each execution
    :param command: shell command line to execute
    :param queue: part of the process signature; not used in this function
    '''
    log.debug("periodic action, fires every: %d seconds", interval)
    while True:
        time.sleep(interval)
        log.debug("periodic action, executing command: '%s'", command)
        data = subprocess.check_output(command, shell=True)
        # hand the output to the logger as an argument so the message is
        # only formatted when debug logging is actually enabled
        log.debug("\n%s", data)
59
60
class Runner(object):
    '''Base class of the scenario runners.

    The class itself keeps track of every live runner in ``Runner.runners``
    and owns the single "Dumper" subprocess that all runners send their
    results to through ``Runner.queue``. The dumper is started by get() when
    no runner exists yet and stopped by release() when the last runner is
    released.

    The code below uses ``__execution_type__``, ``_run_benchmark`` and a
    ``process`` attribute that are not defined in this class; they are
    expected to be provided by the concrete runner subclasses.
    '''
    # queue feeding the output serializer subprocess, shared by all runners
    queue = None
    # the output serializer subprocess; set by get()
    dump_process = None
    # every runner that has been created and not yet released
    runners = []

    @staticmethod
    def get_cls(runner_type):
        '''Return the runner class whose __execution_type__ is runner_type.

        :raises RuntimeError: if no subclass of Runner matches
        '''
        for runner in utils.itersubclasses(Runner):
            if runner_type == runner.__execution_type__:
                return runner
        raise RuntimeError("No such runner_type %s" % runner_type)

    @staticmethod
    def get_types():
        '''Return a list of the known runner classes.'''
        return list(utils.itersubclasses(Runner))

    @staticmethod
    def get(config):
        '''Return an instance of the scenario runner for config["type"].

        When no other runner exists yet, the output serializer subprocess
        is started before the runner is instantiated.

        :param config: runner configuration; "type" selects the runner class
            and "output_filename" is the file the serializer appends to
        :raises RuntimeError: if config["type"] matches no runner class
        '''
        # Resolve the class before anything is started: an unknown type must
        # not leave a dumper subprocess behind that nobody will ever send
        # '_TERMINATE_' to.
        runner_cls = Runner.get_cls(config["type"])

        # if there is no runner, start the output serializer subprocess
        if not Runner.runners:
            log.debug("Starting dump process file '%s'",
                      config["output_filename"])
            Runner.queue = multiprocessing.Queue()
            Runner.dump_process = multiprocessing.Process(
                target=_output_serializer_main,
                name="Dumper",
                args=(config["output_filename"], Runner.queue))
            Runner.dump_process.start()

        return runner_cls(config, Runner.queue)

    @staticmethod
    def release(runner):
        '''Release the runner.

        Removes it from the list of live runners and, if it was the last
        one, tells the output serializer subprocess to stop and waits for
        it to exit.
        '''
        Runner.runners.remove(runner)
        # if this was the last runner, stop the output serializer subprocess
        if not Runner.runners:
            log.debug("Stopping dump process")
            Runner.queue.put('_TERMINATE_')
            Runner.dump_process.join()

    @staticmethod
    def terminate_all():
        '''Terminate all runners (subprocesses) and release them.'''
        log.debug("Terminating all runners")
        # release() removes the runner from Runner.runners, so walk over a
        # snapshot; iterating the live list would skip every other runner
        for runner in list(Runner.runners):
            if runner.periodic_action_process:
                log.debug("Terminating periodic action process")
                runner.periodic_action_process.terminate()
                runner.periodic_action_process = None
            runner.process.terminate()
            runner.process.join()
            Runner.release(runner)

    def __init__(self, config, queue):
        '''Store the configuration and register the runner as live.

        :param config: runner configuration dictionary
        :param queue: queue the runner puts its results on
        '''
        self.context = {}
        self.config = config
        self.periodic_action_process = None
        self.result_queue = queue
        Runner.runners.append(self)

    def run_pre_start_action(self):
        '''Run a potentially configured pre-start action.

        Does nothing unless the configuration has a "pre-start-action"
        entry; otherwise its "command" is run through the shell and the
        output is put on the result queue.
        '''
        if "pre-start-action" in self.config:
            command = self.config["pre-start-action"]["command"]
            log.debug("pre start action: command: '%s'", command)
            data = subprocess.check_output(command, shell=True)
            log.debug("pre-start data: \n%s", data)
            # NOTE(review): the record is a formatted string, not a dict, so
            # the output serializer dumps it as one JSON string - confirm
            # that consumers of the output file expect this
            output = "{'pre-start-action-data': %s}" % data
            self.result_queue.put(output)

    def run_post_stop_action(self):
        '''Run a potentially configured post-stop action.

        Does nothing unless the configuration has a "post-stop-action"
        entry; otherwise its "command" is run through the shell and the
        output is put on the result queue.
        '''
        if "post-stop-action" in self.config:
            command = self.config["post-stop-action"]["command"]
            log.debug("post stop action: command: '%s'", command)
            data = subprocess.check_output(command, shell=True)
            log.debug("post-stop data: \n%s", data)
            # NOTE(review): same string-shaped record as the pre-start
            # action - confirm before changing the format
            output = "{'post-stop-action-data': %s}" % data
            self.result_queue.put(output)

    def run(self, scenario_type, scenario_args):
        '''Start the benchmark for the given scenario type.

        Imports the scenario class, runs the pre-start action, starts the
        optional single-shot and periodic action processes and finally
        hands over to _run_benchmark().

        :param scenario_type: looked up through base_scenario.Scenario.get()
            to obtain the dotted path of the scenario class
        :param scenario_args: passed on unchanged to _run_benchmark()
        '''
        class_name = base_scenario.Scenario.get(scenario_type)
        # split "package.module.Class" into the module path and class name
        path_split = class_name.split(".")
        module_path = ".".join(path_split[:-1])
        module = importlib.import_module(module_path)
        cls = getattr(module, path_split[-1])

        self.config['object'] = class_name

        self.run_pre_start_action()

        if "single-shot-action" in self.config:
            # the process is started and then left alone; no reference to
            # it is kept on the runner
            single_action_process = multiprocessing.Process(
                target=_single_action,
                name="single-shot-action",
                args=(self.config["single-shot-action"]["after"],
                      self.config["single-shot-action"]["command"],
                      self.result_queue))
            single_action_process.start()

        if "periodic-action" in self.config:
            # kept on the runner so join()/terminate_all() can stop it
            self.periodic_action_process = multiprocessing.Process(
                target=_periodic_action,
                name="periodic-action",
                args=(self.config["periodic-action"]["interval"],
                      self.config["periodic-action"]["command"],
                      self.result_queue))
            self.periodic_action_process.start()

        self._run_benchmark(cls, "run", scenario_args)

    def join(self):
        '''Wait for the runner process and return its exit code.

        After the process has finished, a running periodic action process
        is terminated and the post-stop action is run.
        '''
        self.process.join()
        if self.periodic_action_process:
            self.periodic_action_process.terminate()
            self.periodic_action_process = None
        self.run_post_stop_action()
        return self.process.exitcode