Merge "Added traffic update capability to Ixload TG"
[yardstick.git] / yardstick / benchmark / runners / proxduration.py
1 # Copyright 2014: Mirantis Inc.
2 # All Rights Reserved.
3 #
4 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
5 #    not use this file except in compliance with the License. You may obtain
6 #    a copy of the License at
7 #
8 #         http://www.apache.org/licenses/LICENSE-2.0
9 #
10 #    Unless required by applicable law or agreed to in writing, software
11 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 #    License for the specific language governing permissions and limitations
14 #    under the License.
15
16 # yardstick comment: this is a modified copy of
17 # rally/rally/benchmark/runners/constant.py
18
19 """A runner that runs a specific time before it returns
20 """
21
22 from __future__ import absolute_import
23
24 import os
25 import multiprocessing
26 import logging
27 import traceback
28 import time
29
30 from yardstick.benchmark.runners import base
31 from yardstick.common import exceptions as y_exc
32 from yardstick.common import constants
33
34 LOG = logging.getLogger(__name__)
35
36 def _worker_process(queue, cls, method_name, scenario_cfg,
37                     context_cfg, aborted, output_queue):
38
39     sequence = 1
40
41     runner_cfg = scenario_cfg['runner']
42
43     requested_interval = interval = runner_cfg.get("interval", 1)
44     duration = runner_cfg.get("duration", 60)
45     sampled = runner_cfg.get("sampled", False)
46
47     LOG.info("Worker START, duration is %ds", duration)
48     LOG.debug("class is %s", cls)
49
50     runner_cfg['runner_id'] = os.getpid()
51
52     benchmark = cls(scenario_cfg, context_cfg)
53     benchmark.setup()
54     method = getattr(benchmark, method_name)
55
56     sla_action = None
57     if "sla" in scenario_cfg:
58         sla_action = scenario_cfg["sla"].get("action", "assert")
59
60
61     start = time.time()
62     timeout = start + duration
63     while True:
64
65         LOG.debug("runner=%(runner)s seq=%(sequence)s START",
66                   {"runner": runner_cfg["runner_id"], "sequence": sequence})
67
68         data = {}
69         errors = ""
70
71         benchmark.pre_run_wait_time(interval)
72
73         if sampled:
74             try:
75                 pre_adjustment = time.time()
76                 result = method(data)
77                 post_adjustment = time.time()
78                 if requested_interval > post_adjustment - pre_adjustment:
79                     interval = requested_interval - (post_adjustment - pre_adjustment)
80                 else:
81                     interval = 0
82
83             except y_exc.SLAValidationError as error:
84                 # SLA validation failed in scenario, determine what to do now
85                 if sla_action == "assert":
86                     raise
87                 elif sla_action == "monitor":
88                     LOG.warning("SLA validation failed: %s", error.args)
89                     errors = error.args
90             # catch all exceptions because with multiprocessing we can have un-picklable exception
91             # problems  https://bugs.python.org/issue9400
92             except Exception:  # pylint: disable=broad-except
93                 errors = traceback.format_exc()
94                 LOG.exception("")
95             else:
96                 if result:
97                     # add timeout for put so we don't block test
98                     # if we do timeout we don't care about dropping individual KPIs
99                     output_queue.put(result, True, constants.QUEUE_PUT_TIMEOUT)
100
101             benchmark_output = {
102                 'timestamp': time.time(),
103                 'sequence': sequence,
104                 'data': data,
105                 'errors': errors
106             }
107
108             queue.put(benchmark_output, True, constants.QUEUE_PUT_TIMEOUT)
109         else:
110             LOG.debug("No sample collected ...Sequence %s", sequence)
111
112
113         sequence += 1
114
115         if ((errors and sla_action is None) or time.time() > timeout
116                 or aborted.is_set() or benchmark.is_ended()):
117             LOG.info("Worker END")
118             break
119
120     try:
121         benchmark.teardown()
122     except Exception:
123         # catch any exception in teardown and convert to simple exception
124         # never pass exceptions back to multiprocessing, because some exceptions can
125         # be unpicklable
126         # https://bugs.python.org/issue9400
127         LOG.exception("")
128         raise SystemExit(1)
129
130     LOG.debug("queue.qsize() = %s", queue.qsize())
131     LOG.debug("output_queue.qsize() = %s", output_queue.qsize())
132     LOG.info("Exiting ProxDuration Runner...")
133
134 class ProxDurationRunner(base.Runner):
135     """Run a scenario for a certain amount of time
136
137 If the scenario ends before the time has elapsed, it will be started again.
138
139   Parameters
140     duration - amount of time the scenario will be run for
141         type:    int
142         unit:    seconds
143         default: 60 sec
144     interval - time to wait between each scenario invocation
145         type:    int
146         unit:    seconds
147         default: 1 sec
148     sampled - Sample data is required yes/no
149         type:    boolean
150         unit:    True/False
151         default: False
152     confirmation - Number of confirmation retries
153         type:    int
154         unit:    retry attempts
155         default: 0
156     """
157     __execution_type__ = 'ProxDuration'
158
159     def _run_benchmark(self, cls, method, scenario_cfg, context_cfg):
160         name = "{}-{}-{}".format(self.__execution_type__, scenario_cfg.get("type"), os.getpid())
161         self.process = multiprocessing.Process(
162             name=name,
163             target=_worker_process,
164             args=(self.result_queue, cls, method, scenario_cfg,
165                   context_cfg, self.aborted, self.output_queue))
166         self.process.start()