e38ed37494047c5898ff211d11378b7b45e298fa
[yardstick.git] / yardstick / benchmark / runners / iteration.py
1 ##############################################################################
2 # Copyright (c) 2015 Huawei Technologies Co.,Ltd and others.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9
10 '''A runner that runs a configurable number of times before it returns
11 '''
12
13 import os
14 import multiprocessing
15 import logging
16 import traceback
17 import time
18
19 from yardstick.benchmark.runners import base
20
21 LOG = logging.getLogger(__name__)
22
23
24 def _worker_process(queue, cls, method_name, scenario_cfg,
25                     context_cfg, aborted):
26
27     sequence = 1
28
29     runner_cfg = scenario_cfg['runner']
30
31     interval = runner_cfg.get("interval", 1)
32     iterations = runner_cfg.get("iterations", 1)
33     LOG.info("worker START, iterations %d times, class %s", iterations, cls)
34
35     runner_cfg['runner_id'] = os.getpid()
36
37     benchmark = cls(scenario_cfg, context_cfg)
38     benchmark.setup()
39     method = getattr(benchmark, method_name)
40
41     queue.put({'runner_id': runner_cfg['runner_id'],
42                'scenario_cfg': scenario_cfg,
43                'context_cfg': context_cfg})
44
45     sla_action = None
46     if "sla" in scenario_cfg:
47         sla_action = scenario_cfg["sla"].get("action", "assert")
48
49     while True:
50
51         LOG.debug("runner=%(runner)s seq=%(sequence)s START" %
52                   {"runner": runner_cfg["runner_id"], "sequence": sequence})
53
54         data = {}
55         errors = ""
56
57         try:
58             method(data)
59         except AssertionError as assertion:
60             # SLA validation failed in scenario, determine what to do now
61             if sla_action == "assert":
62                 raise
63             elif sla_action == "monitor":
64                 LOG.warning("SLA validation failed: %s" % assertion.args)
65                 errors = assertion.args
66         except Exception as e:
67             errors = traceback.format_exc()
68             LOG.exception(e)
69
70         time.sleep(interval)
71
72         benchmark_output = {
73             'timestamp': time.time(),
74             'sequence': sequence,
75             'data': data,
76             'errors': errors
77         }
78
79         record = {'runner_id': runner_cfg['runner_id'],
80                   'benchmark': benchmark_output}
81
82         queue.put(record)
83
84         LOG.debug("runner=%(runner)s seq=%(sequence)s END" %
85                   {"runner": runner_cfg["runner_id"], "sequence": sequence})
86
87         sequence += 1
88
89         if (errors and sla_action is None) or \
90                 (sequence > iterations or aborted.is_set()):
91             LOG.info("worker END")
92             break
93
94     benchmark.teardown()
95
96
97 class IterationRunner(base.Runner):
98     '''Run a scenario for a configurable number of times
99
100 If the scenario ends before the time has elapsed, it will be started again.
101
102   Parameters
103     iterations - amount of times the scenario will be run for
104         type:    int
105         unit:    na
106         default: 1
107     interval - time to wait between each scenario invocation
108         type:    int
109         unit:    seconds
110         default: 1 sec
111     '''
112     __execution_type__ = 'Iteration'
113
114     def _run_benchmark(self, cls, method, scenario_cfg, context_cfg):
115         self.process = multiprocessing.Process(
116             target=_worker_process,
117             args=(self.result_queue, cls, method, scenario_cfg,
118                   context_cfg, self.aborted))
119         self.process.start()