Merge "Update plotter.py to new yardstick.out format"
[yardstick.git] / yardstick / benchmark / runners / duration.py
1 # Copyright 2014: Mirantis Inc.
2 # All Rights Reserved.
3 #
4 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
5 #    not use this file except in compliance with the License. You may obtain
6 #    a copy of the License at
7 #
8 #         http://www.apache.org/licenses/LICENSE-2.0
9 #
10 #    Unless required by applicable law or agreed to in writing, software
11 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 #    License for the specific language governing permissions and limitations
14 #    under the License.
15
16 # yardstick comment: this is a modified copy of
17 # rally/rally/benchmark/runners/constant.py
18
19 '''A runner that runs a specific time before it returns
20 '''
21
22 import os
23 import multiprocessing
24 import logging
25 import traceback
26 import time
27
28 from yardstick.benchmark.runners import base
29
30 LOG = logging.getLogger(__name__)
31
32
33 def _worker_process(queue, cls, method_name, scenario_cfg,
34                     context_cfg, aborted):
35
36     sequence = 1
37
38     runner_cfg = scenario_cfg['runner']
39
40     interval = runner_cfg.get("interval", 1)
41     duration = runner_cfg.get("duration", 60)
42     LOG.info("worker START, duration %d sec, class %s", duration, cls)
43
44     runner_cfg['runner_id'] = os.getpid()
45
46     benchmark = cls(scenario_cfg, context_cfg)
47     benchmark.setup()
48     method = getattr(benchmark, method_name)
49
50     sla_action = None
51     if "sla" in scenario_cfg:
52         sla_action = scenario_cfg["sla"].get("action", "assert")
53
54     queue.put({'runner_id': runner_cfg['runner_id'],
55                'scenario_cfg': scenario_cfg,
56                'context_cfg': context_cfg})
57
58     start = time.time()
59     while True:
60
61         LOG.debug("runner=%(runner)s seq=%(sequence)s START" %
62                   {"runner": runner_cfg["runner_id"], "sequence": sequence})
63
64         data = {}
65         errors = ""
66
67         try:
68             method(data)
69         except AssertionError as assertion:
70             # SLA validation failed in scenario, determine what to do now
71             if sla_action == "assert":
72                 raise
73             elif sla_action == "monitor":
74                 LOG.warning("SLA validation failed: %s" % assertion.args)
75                 errors = assertion.args
76         except Exception as e:
77             errors = traceback.format_exc()
78             LOG.exception(e)
79
80         time.sleep(interval)
81
82         benchmark_output = {
83             'timestamp': time.time(),
84             'sequence': sequence,
85             'data': data,
86             'errors': errors
87         }
88
89         record = {'runner_id': runner_cfg['runner_id'],
90                   'benchmark': benchmark_output}
91
92         queue.put(record)
93
94         LOG.debug("runner=%(runner)s seq=%(sequence)s END" %
95                   {"runner": runner_cfg["runner_id"], "sequence": sequence})
96
97         sequence += 1
98
99         if (errors and sla_action is None) or \
100                 (time.time() - start > duration or aborted.is_set()):
101             LOG.info("worker END")
102             break
103
104     benchmark.teardown()
105
106
107 class DurationRunner(base.Runner):
108     '''Run a scenario for a certain amount of time
109
110 If the scenario ends before the time has elapsed, it will be started again.
111
112   Parameters
113     duration - amount of time the scenario will be run for
114         type:    int
115         unit:    seconds
116         default: 1 sec
117     interval - time to wait between each scenario invocation
118         type:    int
119         unit:    seconds
120         default: 1 sec
121     '''
122     __execution_type__ = 'Duration'
123
124     def _run_benchmark(self, cls, method, scenario_cfg, context_cfg):
125         self.process = multiprocessing.Process(
126             target=_worker_process,
127             args=(self.result_queue, cls, method, scenario_cfg,
128                   context_cfg, self.aborted))
129         self.process.start()