Merge "Separate out test_parse_to_value_exception()"
[yardstick.git] / yardstick / benchmark / runners / proxduration.py
1 # Copyright 2014: Mirantis Inc.
2 # All Rights Reserved.
3 #
4 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
5 #    not use this file except in compliance with the License. You may obtain
6 #    a copy of the License at
7 #
8 #         http://www.apache.org/licenses/LICENSE-2.0
9 #
10 #    Unless required by applicable law or agreed to in writing, software
11 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 #    License for the specific language governing permissions and limitations
14 #    under the License.
15
16 # yardstick comment: this is a modified copy of
17 # rally/rally/benchmark/runners/constant.py
18
19 """A runner that runs a specific time before it returns
20 """
21
22 from __future__ import absolute_import
23
24 import os
25 import multiprocessing
26 import logging
27 import traceback
28 import time
29
30 from yardstick.benchmark.runners import base
31 from yardstick.common import exceptions as y_exc
32 from yardstick.common import constants
33
34 LOG = logging.getLogger(__name__)
35
36 def _worker_process(queue, cls, method_name, scenario_cfg,
37                     context_cfg, aborted, output_queue):
38
39     sequence = 1
40
41     runner_cfg = scenario_cfg['runner']
42
43     requested_interval = interval = runner_cfg.get("interval", 1)
44     duration = runner_cfg.get("duration", 60)
45     sampled = runner_cfg.get("sampled", False)
46
47     LOG.info("Worker START, duration is %ds", duration)
48     LOG.debug("class is %s", cls)
49
50     runner_cfg['runner_id'] = os.getpid()
51
52     benchmark = cls(scenario_cfg, context_cfg)
53     benchmark.setup()
54     method = getattr(benchmark, method_name)
55
56     sla_action = None
57     if "sla" in scenario_cfg:
58         sla_action = scenario_cfg["sla"].get("action", "assert")
59
60
61     start = time.time()
62     timeout = start + duration
63     while True:
64
65         LOG.debug("runner=%(runner)s seq=%(sequence)s START",
66                   {"runner": runner_cfg["runner_id"], "sequence": sequence})
67
68         data = {}
69         errors = ""
70
71         benchmark.pre_run_wait_time(interval)
72
73         if sampled:
74             try:
75                 pre_adjustment = time.time()
76                 result = method(data)
77                 post_adjustment = time.time()
78                 if requested_interval > post_adjustment - pre_adjustment:
79                     interval = requested_interval - (post_adjustment - pre_adjustment)
80                 else:
81                     interval = 0
82
83             except y_exc.SLAValidationError as error:
84                 # SLA validation failed in scenario, determine what to do now
85                 if sla_action == "assert":
86                     raise
87                 elif sla_action == "monitor":
88                     LOG.warning("SLA validation failed: %s", error.args)
89                     errors = error.args
90             # catch all exceptions because with multiprocessing we can have un-picklable exception
91             # problems  https://bugs.python.org/issue9400
92             except Exception:  # pylint: disable=broad-except
93                 errors = traceback.format_exc()
94                 LOG.exception("")
95             else:
96                 if result:
97                     # add timeout for put so we don't block test
98                     # if we do timeout we don't care about dropping individual KPIs
99                     output_queue.put(result, True, constants.QUEUE_PUT_TIMEOUT)
100
101             benchmark_output = {
102                 'timestamp': time.time(),
103                 'sequence': sequence,
104                 'data': data,
105                 'errors': errors
106             }
107
108             queue.put(benchmark_output, True, constants.QUEUE_PUT_TIMEOUT)
109         else:
110             LOG.debug("No sample collected ...Sequence %s", sequence)
111
112
113         sequence += 1
114
115         if (errors and sla_action is None) or time.time() > timeout or aborted.is_set():
116             LOG.info("Worker END")
117             break
118
119     try:
120         benchmark.teardown()
121     except Exception:
122         # catch any exception in teardown and convert to simple exception
123         # never pass exceptions back to multiprocessing, because some exceptions can
124         # be unpicklable
125         # https://bugs.python.org/issue9400
126         LOG.exception("")
127         raise SystemExit(1)
128
129     LOG.debug("queue.qsize() = %s", queue.qsize())
130     LOG.debug("output_queue.qsize() = %s", output_queue.qsize())
131     LOG.info("Exiting ProxDuration Runner...")
132
133 class ProxDurationRunner(base.Runner):
134     """Run a scenario for a certain amount of time
135
136 If the scenario ends before the time has elapsed, it will be started again.
137
138   Parameters
139     duration - amount of time the scenario will be run for
140         type:    int
141         unit:    seconds
142         default: 60 sec
143     interval - time to wait between each scenario invocation
144         type:    int
145         unit:    seconds
146         default: 1 sec
147     sampled - Sample data is required yes/no
148         type:    boolean
149         unit:    True/False
150         default: False
151     confirmation - Number of confirmation retries
152         type:    int
153         unit:    retry attempts
154         default: 0
155     """
156     __execution_type__ = 'ProxDuration'
157
158     def _run_benchmark(self, cls, method, scenario_cfg, context_cfg):
159         name = "{}-{}-{}".format(self.__execution_type__, scenario_cfg.get("type"), os.getpid())
160         self.process = multiprocessing.Process(
161             name=name,
162             target=_worker_process,
163             args=(self.result_queue, cls, method, scenario_cfg,
164                   context_cfg, self.aborted, self.output_queue))
165         self.process.start()