3a6b2e1d6975383bdb8ccf4d1e7e9eddaa8e5589
[yardstick.git] / yardstick / benchmark / runners / iteration.py
1 ##############################################################################
2 # Copyright (c) 2015 Huawei Technologies Co.,Ltd and others.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9
10 '''A runner that runs a configurable number of times before it returns
11 '''
12
13 import os
14 import multiprocessing
15 import logging
16 import traceback
17 import time
18
19 from yardstick.benchmark.runners import base
20
21 LOG = logging.getLogger(__name__)
22
23
24 def _worker_process(queue, cls, method_name, scenario_cfg):
25
26     sequence = 1
27
28     runner_cfg = scenario_cfg['runner']
29
30     interval = runner_cfg.get("interval", 1)
31     iterations = runner_cfg.get("iterations", 1)
32     LOG.info("worker START, iterations %d times, class %s", iterations, cls)
33
34     runner_cfg['runner_id'] = os.getpid()
35
36     benchmark = cls(runner_cfg)
37     benchmark.setup()
38     method = getattr(benchmark, method_name)
39
40     queue.put({'runner_id': runner_cfg['runner_id'],
41                'scenario_cfg': scenario_cfg})
42
43     sla_action = None
44     if "sla" in scenario_cfg:
45         sla_action = scenario_cfg["sla"].get("action", "assert")
46
47     while True:
48
49         LOG.debug("runner=%(runner)s seq=%(sequence)s START" %
50                   {"runner": runner_cfg["runner_id"], "sequence": sequence})
51
52         data = {}
53         errors = ""
54
55         try:
56             data = method(scenario_cfg)
57         except AssertionError as assertion:
58             # SLA validation failed in scenario, determine what to do now
59             if sla_action == "assert":
60                 raise
61             elif sla_action == "monitor":
62                 LOG.warning("SLA validation failed: %s" % assertion.args)
63                 errors = assertion.args
64         except Exception as e:
65             errors = traceback.format_exc()
66             LOG.exception(e)
67
68         time.sleep(interval)
69
70         benchmark_output = {
71             'timestamp': time.time(),
72             'sequence': sequence,
73             'data': data,
74             'errors': errors
75         }
76
77         record = {'runner_id': runner_cfg['runner_id'],
78                   'benchmark': benchmark_output}
79
80         queue.put(record)
81
82         LOG.debug("runner=%(runner)s seq=%(sequence)s END" %
83                   {"runner": runner_cfg["runner_id"], "sequence": sequence})
84
85         sequence += 1
86
87         if (errors and sla_action is None) or (sequence > iterations):
88             LOG.info("worker END")
89             break
90
91     benchmark.teardown()
92
93
94 class IterationRunner(base.Runner):
95     '''Run a scenario for a configurable number of times
96
97 If the scenario ends before the time has elapsed, it will be started again.
98
99   Parameters
100     iterations - amount of times the scenario will be run for
101         type:    int
102         unit:    na
103         default: 1
104     interval - time to wait between each scenario invocation
105         type:    int
106         unit:    seconds
107         default: 1 sec
108     '''
109     __execution_type__ = 'Iteration'
110
111     def _run_benchmark(self, cls, method, scenario_cfg):
112         self.process = multiprocessing.Process(
113             target=_worker_process,
114             args=(self.result_queue, cls, method, scenario_cfg))
115         self.process.start()