e2a21c23c9c011e196ad3f0626f637b51e154412
[yardstick.git] / yardstick / benchmark / runners / duration.py
1 ##############################################################################
2 # Copyright (c) 2015 Ericsson AB and others.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9
10 '''A runner that runs a specific time before it returns
11 '''
12
13 import os
14 import multiprocessing
15 import logging
16 import traceback
17 import time
18
19 from yardstick.benchmark.runners import base
20
21 LOG = logging.getLogger(__name__)
22
23
24 def _worker_process(queue, cls, method_name, scenario_cfg,
25                     context_cfg, aborted):
26
27     sequence = 1
28
29     runner_cfg = scenario_cfg['runner']
30
31     interval = runner_cfg.get("interval", 1)
32     duration = runner_cfg.get("duration", 60)
33     LOG.info("worker START, duration %d sec, class %s", duration, cls)
34
35     runner_cfg['runner_id'] = os.getpid()
36
37     benchmark = cls(scenario_cfg, context_cfg)
38     benchmark.setup()
39     method = getattr(benchmark, method_name)
40
41     sla_action = None
42     if "sla" in scenario_cfg:
43         sla_action = scenario_cfg["sla"].get("action", "assert")
44
45     queue.put({'runner_id': runner_cfg['runner_id'],
46                'scenario_cfg': scenario_cfg,
47                'context_cfg': context_cfg})
48
49     start = time.time()
50     while True:
51
52         LOG.debug("runner=%(runner)s seq=%(sequence)s START" %
53                   {"runner": runner_cfg["runner_id"], "sequence": sequence})
54
55         data = {}
56         errors = ""
57
58         try:
59             method(data)
60         except AssertionError as assertion:
61             # SLA validation failed in scenario, determine what to do now
62             if sla_action == "assert":
63                 raise
64             elif sla_action == "monitor":
65                 LOG.warning("SLA validation failed: %s" % assertion.args)
66                 errors = assertion.args
67         except Exception as e:
68             errors = traceback.format_exc()
69             LOG.exception(e)
70
71         time.sleep(interval)
72
73         benchmark_output = {
74             'timestamp': time.time(),
75             'sequence': sequence,
76             'data': data,
77             'errors': errors
78         }
79
80         record = {'runner_id': runner_cfg['runner_id'],
81                   'benchmark': benchmark_output}
82
83         queue.put(record)
84
85         LOG.debug("runner=%(runner)s seq=%(sequence)s END" %
86                   {"runner": runner_cfg["runner_id"], "sequence": sequence})
87
88         sequence += 1
89
90         if (errors and sla_action is None) or \
91                 (time.time() - start > duration or aborted.is_set()):
92             LOG.info("worker END")
93             break
94
95     benchmark.teardown()
96
97
98 class DurationRunner(base.Runner):
99     '''Run a scenario for a certain amount of time
100
101 If the scenario ends before the time has elapsed, it will be started again.
102
103   Parameters
104     duration - amount of time the scenario will be run for
105         type:    int
106         unit:    seconds
107         default: 1 sec
108     interval - time to wait between each scenario invocation
109         type:    int
110         unit:    seconds
111         default: 1 sec
112     '''
113     __execution_type__ = 'Duration'
114
115     def _run_benchmark(self, cls, method, scenario_cfg, context_cfg):
116         self.process = multiprocessing.Process(
117             target=_worker_process,
118             args=(self.result_queue, cls, method, scenario_cfg,
119                   context_cfg, self.aborted))
120         self.process.start()