8483226796fe89bbb3bf489d8bb4ff970211cf96
[yardstick.git] / yardstick / benchmark / runners / base.py
1 ##############################################################################
2 # Copyright (c) 2015 Ericsson AB and others.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9
10 import importlib
11 import logging
12 import multiprocessing
13 import subprocess
14 import time
15
16 log = logging.getLogger(__name__)
17
18 import yardstick.common.utils as utils
19 from yardstick.benchmark.scenarios import base as base_scenario
20 from yardstick.dispatcher.base import Base as DispatcherBase
21
22
def _output_serializer_main(filename, queue):
    '''Entrypoint for the singleton subprocess writing to the output file.

    Every record passes through this one process, so several scenario
    instances can run at the same time without messing up the output file.

    :param filename: path handed to the "File" dispatcher as "file_path"
    :param queue: queue the records are read from; the string
                  '_TERMINATE_' ends the loop
    '''
    dispatcher = DispatcherBase.get({"type": "File", "file_path": filename})

    # queue.get blocks until data becomes available; iteration stops as soon
    # as the '_TERMINATE_' sentinel is received
    for record in iter(queue.get, '_TERMINATE_'):
        dispatcher.record_result_data(record)
40
41
def _single_action(seconds, command, queue):
    '''Entrypoint for the single action process.

    Sleeps for ``seconds``, then runs ``command`` once through the shell and
    logs its captured standard output at debug level.

    :param seconds: delay before the command is executed
    :param command: shell command line to execute
    :param queue: accepted for signature symmetry; not used in this function
    '''
    log.debug("single action, fires after %d seconds (from now)", seconds)
    time.sleep(seconds)

    log.debug("single action: executing command: '%s'", command)
    output = subprocess.check_output(command, shell=True)
    log.debug("\n%s", output)
49
50
def _periodic_action(interval, command, queue):
    '''Entrypoint for the periodic action process.

    Repeats forever: sleep ``interval`` seconds, run ``command`` through the
    shell, log its captured standard output at debug level. The loop has no
    exit condition of its own; within this file the owning Runner stops it by
    calling terminate() on the process. An exception from the command (for
    example CalledProcessError on a non-zero exit status) is not caught and
    therefore ends the process.

    :param interval: seconds to sleep before each execution
    :param command: shell command line to execute
    :param queue: accepted for signature symmetry; not used in this function
    '''
    log.debug("periodic action, fires every: %d seconds", interval)
    while True:
        time.sleep(interval)
        log.debug("periodic action, executing command: '%s'", command)
        data = subprocess.check_output(command, shell=True)
        # let the logging framework format the message only when needed
        log.debug("\n%s", data)
61
62
class Runner(object):
    '''Base class for scenario runners.

    Besides the per-runner behaviour, the class keeps a registry of all live
    runner instances and the single output serializer ("Dumper") subprocess
    they share. The dump process is started when the first runner is created
    through get() and stopped when the last one is released.

    This class calls self._run_benchmark() and reads self.process but defines
    neither; presumably concrete runner subclasses provide both.
    '''
    # queue that carries result records to the dump process
    queue = None
    # the "Dumper" subprocess running _output_serializer_main
    dump_process = None
    # every runner instance that has been created and not yet released
    runners = []

    @staticmethod
    def get_cls(runner_type):
        '''Return the Runner subclass whose __execution_type__ matches.

        :raises RuntimeError: if no subclass declares ``runner_type``
        '''
        for runner in utils.itersubclasses(Runner):
            if runner_type == runner.__execution_type__:
                return runner
        raise RuntimeError("No such runner_type %s" % runner_type)

    @staticmethod
    def get_types():
        '''Return a list of the known runner classes (the classes themselves,
        not their names).
        '''
        return list(utils.itersubclasses(Runner))

    @staticmethod
    def get(config):
        """Return an instance of the scenario runner for config["type"].

        If no runner exists yet, the output serializer subprocess writing to
        config["output_filename"] is started first.
        """
        # if there is no runner, start the output serializer subprocess
        if not Runner.runners:
            log.debug("Starting dump process file '%s'",
                      config["output_filename"])
            Runner.queue = multiprocessing.Queue()
            Runner.dump_process = multiprocessing.Process(
                target=_output_serializer_main,
                name="Dumper",
                args=(config["output_filename"], Runner.queue))
            Runner.dump_process.start()

        return Runner.get_cls(config["type"])(config, Runner.queue)

    @staticmethod
    def release(runner):
        '''Remove the runner from the registry.

        When the last runner is released, the dump process is told to stop
        and is joined.
        '''
        Runner.runners.remove(runner)
        # if this was the last runner, stop the output serializer subprocess
        if not Runner.runners:
            log.debug("Stopping dump process")
            Runner.queue.put('_TERMINATE_')
            Runner.dump_process.join()

    @staticmethod
    def terminate_all():
        '''Terminate all runners (subprocesses) and release them.'''
        log.debug("Terminating all runners")
        # release() removes entries from Runner.runners, so walk a snapshot;
        # iterating the live list would skip every second runner
        for runner in list(Runner.runners):
            if runner.periodic_action_process:
                log.debug("Terminating periodic action process")
                runner.periodic_action_process.terminate()
                runner.periodic_action_process = None
            # a runner that was created but never started has no process
            process = getattr(runner, "process", None)
            if process is not None:
                process.terminate()
                process.join()
            Runner.release(runner)

    def __init__(self, config, queue):
        '''Store the configuration and register the new runner.

        :param config: runner configuration dictionary
        :param queue: queue on which results are put for the dump process
        '''
        self.context = {}
        self.config = config
        self.periodic_action_process = None
        self.result_queue = queue
        Runner.runners.append(self)

    def run_pre_start_action(self):
        '''Run a potentially configured pre-start action.

        The command output is logged and put on the result queue.
        '''
        if "pre-start-action" in self.config:
            command = self.config["pre-start-action"]["command"]
            log.debug("pre start action: command: '%s'", command)
            data = subprocess.check_output(command, shell=True)
            log.debug("pre-start data: \n%s", data)
            output = "{'pre-start-action-data': %s}" % data
            self.result_queue.put(output)

    def run_post_stop_action(self):
        '''Run a potentially configured post-stop action.

        The command output is logged and put on the result queue.
        '''
        if "post-stop-action" in self.config:
            command = self.config["post-stop-action"]["command"]
            log.debug("post stop action: command: '%s'", command)
            data = subprocess.check_output(command, shell=True)
            log.debug("post-stop data: \n%s", data)
            output = "{'post-stop-action-data': %s}" % data
            self.result_queue.put(output)

    def run(self, scenario_type, scenario_args):
        '''Resolve the scenario class, start the configured helper actions
        and hand over to self._run_benchmark().

        :param scenario_type: key passed to Scenario.get(), which yields the
                              dotted path of the scenario class
        :param scenario_args: passed through to _run_benchmark() unchanged
        '''
        class_name = base_scenario.Scenario.get(scenario_type)
        path_split = class_name.split(".")
        module_path = ".".join(path_split[:-1])
        module = importlib.import_module(module_path)
        cls = getattr(module, path_split[-1])

        self.config['object'] = class_name

        self.run_pre_start_action()

        if "single-shot-action" in self.config:
            # fire-and-forget: no reference to this process is kept
            single_action_process = multiprocessing.Process(
                target=_single_action,
                name="single-shot-action",
                args=(self.config["single-shot-action"]["after"],
                      self.config["single-shot-action"]["command"],
                      self.result_queue))
            single_action_process.start()

        if "periodic-action" in self.config:
            # kept on the instance so join()/terminate_all() can stop it
            self.periodic_action_process = multiprocessing.Process(
                target=_periodic_action,
                name="periodic-action",
                args=(self.config["periodic-action"]["interval"],
                      self.config["periodic-action"]["command"],
                      self.result_queue))
            self.periodic_action_process.start()

        self._run_benchmark(cls, "run", scenario_args)

    def join(self):
        '''Wait for the runner process, stop the periodic action, run the
        post-stop action and return the runner process exit code.
        '''
        self.process.join()
        if self.periodic_action_process:
            self.periodic_action_process.terminate()
            self.periodic_action_process = None
        self.run_post_stop_action()
        return self.process.exitcode