support for ipv6
[yardstick.git] / yardstick / benchmark / runners / iteration.py
1 ##############################################################################
2 # Copyright (c) 2015 Huawei Technologies Co.,Ltd and others.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9
10 '''A runner that runs a configurable number of times before it returns
11 '''
12
13 import os
14 import multiprocessing
15 import logging
16 import traceback
17 import time
18
19 from yardstick.benchmark.runners import base
20
21 LOG = logging.getLogger(__name__)
22
23
def _worker_process(queue, cls, method_name, scenario_cfg,
                    context_cfg, aborted):
    '''Run a scenario a configurable number of times and report each result.

    Intended to be the target of a worker process (see IterationRunner).

    Parameters
      queue - object with a put() method; receives one record describing
          the run configuration, then one record per iteration
      cls - scenario class, instantiated as cls(scenario_cfg, context_cfg)
      method_name - name of the method on the scenario instance invoked
          once per iteration with a dict it may fill with result data
      scenario_cfg - scenario configuration; scenario_cfg['runner'] holds
          "interval" (default 1), "iterations" (default 1) and "run_step"
          (default "setup,run,teardown"); an optional "sla" section
          selects the SLA action (default "assert")
      context_cfg - context configuration passed through to the scenario
      aborted - object with an is_set() method; when set, the loop stops
          after the current iteration

    Raises
      AssertionError - re-raised from the scenario when the SLA action is
          "assert"; teardown is still executed before it propagates
    '''

    sequence = 1

    runner_cfg = scenario_cfg['runner']

    interval = runner_cfg.get("interval", 1)
    iterations = runner_cfg.get("iterations", 1)
    run_step = runner_cfg.get("run_step", "setup,run,teardown")
    LOG.info("worker START, iterations %d times, class %s", iterations, cls)

    runner_id = os.getpid()
    runner_cfg['runner_id'] = runner_id

    benchmark = cls(scenario_cfg, context_cfg)
    if "setup" in run_step:
        benchmark.setup()

    # Everything after a successful setup is guarded so that teardown also
    # runs when the scenario aborts the run (e.g. SLA action "assert").
    try:
        method = getattr(benchmark, method_name)

        queue.put({'runner_id': runner_id,
                   'scenario_cfg': scenario_cfg,
                   'context_cfg': context_cfg})

        sla_action = None
        if "sla" in scenario_cfg:
            sla_action = scenario_cfg["sla"].get("action", "assert")

        if "run" in run_step:
            while True:
                log_ctx = {"runner": runner_id, "sequence": sequence}
                LOG.debug("runner=%(runner)s seq=%(sequence)s START", log_ctx)

                data = {}
                errors = ""

                try:
                    method(data)
                except AssertionError as assertion:
                    # SLA validation failed in scenario, determine what to
                    # do now
                    if sla_action == "assert":
                        raise
                    if sla_action == "monitor":
                        # Lazy logging args: "%" formatting with the args
                        # tuple itself breaks when it does not hold exactly
                        # one element (e.g. a bare "assert").
                        LOG.warning("SLA validation failed: %s",
                                    assertion.args)
                        errors = assertion.args
                    else:
                        # No (or an unrecognised) SLA action: do not drop
                        # the failure silently, report it like any other
                        # exception raised by the scenario.
                        errors = traceback.format_exc()
                        LOG.exception(assertion)
                except Exception as e:
                    errors = traceback.format_exc()
                    LOG.exception(e)

                time.sleep(interval)

                benchmark_output = {
                    'timestamp': time.time(),
                    'sequence': sequence,
                    'data': data,
                    'errors': errors
                }

                queue.put({'runner_id': runner_id,
                           'benchmark': benchmark_output})

                LOG.debug("runner=%(runner)s seq=%(sequence)s END", log_ctx)

                sequence += 1

                # Without an SLA section any error ends the run; otherwise
                # stop once all iterations are done or an abort is requested.
                if (errors and sla_action is None) or \
                        (sequence > iterations or aborted.is_set()):
                    LOG.info("worker END")
                    break
    finally:
        if "teardown" in run_step:
            benchmark.teardown()
100
101
class IterationRunner(base.Runner):
    '''Run a scenario for a configurable number of times

The scenario is invoked repeatedly, once per iteration, by a worker process
that is started when the benchmark is run.

  Parameters
    iterations - amount of times the scenario will be run for
        type:    int
        unit:    na
        default: 1
    interval - time to wait between each scenario invocation
        type:    int
        unit:    seconds
        default: 1 sec
    '''
    __execution_type__ = 'Iteration'

    def _run_benchmark(self, cls, method, scenario_cfg, context_cfg):
        # Positional arguments handed to _worker_process in the child process.
        worker_args = (self.result_queue, cls, method, scenario_cfg,
                       context_cfg, self.aborted)
        worker = multiprocessing.Process(target=_worker_process,
                                         args=worker_args)
        # Keep a handle on the process before starting it.
        self.process = worker
        worker.start()