Merge "Workaround to fix Heat stack deletion bug in Shade"
[yardstick.git] / yardstick / benchmark / runners / dynamictp.py
1 # Copyright 2016: Nokia
2 # All Rights Reserved.
3 #
4 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
5 #    not use this file except in compliance with the License. You may obtain
6 #    a copy of the License at
7 #
8 #         http://www.apache.org/licenses/LICENSE-2.0
9 #
10 #    Unless required by applicable law or agreed to in writing, software
11 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 #    License for the specific language governing permissions and limitations
14 #    under the License.
15
16 # yardstick comment: this is a modified copy of
17 # rally/rally/benchmark/runners/constant.py
18
19 """A runner that searches for the max throughput with binary search
20 """
21
22 import logging
23 import multiprocessing
24 import time
25 import traceback
26
27 import os
28
29 from yardstick.benchmark.runners import base
30
31 LOG = logging.getLogger(__name__)
32
33
def _put_benchmark_record(queue, runner_id, sequence, data, errors):
    """Put one timestamped benchmark result for ``runner_id`` on ``queue``."""
    queue.put({
        'runner_id': runner_id,
        'benchmark': {
            'timestamp': time.time(),
            'sequence': sequence,
            'data': data,
            'errors': errors,
        },
    })


def _worker_process(queue, cls, method_name, scenario_cfg,
                    context_cfg, aborted):  # pragma: no cover
    """Search for the highest packet rate the scenario sustains.

    The scenario ``cls`` is instantiated once and its ``method_name`` method
    is invoked repeatedly with an empty dict it is expected to fill with a
    ``'packets_per_second'`` entry.  An ``AssertionError`` raised by the
    method marks the requested rate as too high.  The requested rate
    (``scenario_cfg['options']['pps']``) is then moved between the current
    lower and upper bound until the bounds are closer than ``delta``, the
    bounds cross, or ten attempts have been made.

    :param queue: queue receiving first a dict with ``runner_id``,
        ``scenario_cfg`` and ``context_cfg`` and then the result records
    :param cls: scenario class, called as ``cls(scenario_cfg, context_cfg)``
    :param method_name: name of the scenario method to invoke
    :param scenario_cfg: scenario configuration; ``scenario_cfg['runner']``
        supplies ``iterations`` (default 1), ``interval`` (default 1),
        ``run_step`` (default "setup,run,teardown") and ``delta``
        (default 1000); ``scenario_cfg['options']['pps']`` is the initial
        rate (default 1000000) and is overwritten during the search
    :param context_cfg: context configuration handed to the scenario
    :param aborted: event-like object; once ``is_set()`` is true the search
        stops after the current invocation
    :raises SystemExit: if the scenario teardown raises
    """
    runner_cfg = scenario_cfg['runner']
    iterations = runner_cfg.get("iterations", 1)
    interval = runner_cfg.get("interval", 1)
    run_step = runner_cfg.get("run_step", "setup,run,teardown")
    delta = runner_cfg.get("delta", 1000)
    options_cfg = scenario_cfg['options']
    initial_rate = options_cfg.get("pps", 1000000)
    # Upper bound on scenario invocations within one search.
    max_sequences = 10
    LOG.info("worker START, class %s", cls)

    runner_cfg['runner_id'] = os.getpid()

    benchmark = cls(scenario_cfg, context_cfg)
    if "setup" in run_step:
        benchmark.setup()

    method = getattr(benchmark, method_name)

    queue.put({'runner_id': runner_cfg['runner_id'],
               'scenario_cfg': scenario_cfg,
               'context_cfg': context_cfg})

    if "run" in run_step:
        iterator = 0
        search_max = initial_rate
        search_min = 0
        while iterator < iterations:
            search_min = int(search_min / 2)
            scenario_cfg['options']['pps'] = search_max
            search_max_found = False
            sequence = 0
            errors = ""

            last_min_data = {'packets_per_second': 0}

            while True:
                sequence += 1

                data = {}
                errors = ""
                too_high = False

                LOG.debug("sequence: %s search_min: %s search_max: %s",
                          sequence, search_min, search_max)

                try:
                    method(data)
                except AssertionError as assertion:
                    # Hand the exception to the logger as a lazy argument:
                    # "%"-formatting with assertion.args raises TypeError
                    # when the assertion carries no (or several) arguments.
                    LOG.warning("SLA validation failed: %s", assertion)
                    too_high = True
                except Exception as e:
                    errors = traceback.format_exc()
                    LOG.exception(e)

                actual_pps = data.get('packets_per_second')
                if actual_pps is None and not errors:
                    # Without a measured rate the search cannot continue.
                    errors = "scenario did not report 'packets_per_second'"
                    LOG.error("%s", errors)

                if errors:
                    # Always publish the failure so it is not silently lost,
                    # then stop searching.
                    _put_benchmark_record(queue, runner_cfg['runner_id'],
                                          sequence, data, errors)
                    LOG.info("worker END")
                    break

                if too_high:
                    search_max = actual_pps
                    search_max_found = True
                else:
                    last_min_data = data
                    search_min = actual_pps

                    # Check if the actual rate is well below the asked rate
                    if scenario_cfg['options']['pps'] > actual_pps * 1.5:
                        search_max = actual_pps
                        LOG.debug("Sender reached max tput: %s", search_max)
                    elif not search_max_found:
                        search_max = int(actual_pps * 1.5)

                if ((search_max - search_min) < delta or
                        search_max <= search_min or
                        sequence >= max_sequences):
                    # Report the last rate that passed, if there was one.
                    if last_min_data['packets_per_second'] > 0:
                        data = last_min_data

                    _put_benchmark_record(queue, runner_cfg['runner_id'],
                                          sequence, data, errors)
                    LOG.info("worker END")
                    break

                if aborted.is_set():
                    LOG.info("worker END")
                    break

                if not search_max_found:
                    scenario_cfg['options']['pps'] = search_max
                else:
                    # Keep the rate an integer, as every other update does;
                    # "/" alone yields a float on Python 3.
                    scenario_cfg['options']['pps'] = int(
                        (search_max - search_min) / 2 + search_min)

                time.sleep(interval)

            iterator += 1
            LOG.debug("iterator: %s iterations: %s", iterator, iterations)

            # An abort request or a scenario failure ends the whole run, not
            # only the current search.
            if errors or aborted.is_set():
                break

    if "teardown" in run_step:
        try:
            benchmark.teardown()
        except Exception:
            # catch any exception in teardown and convert to simple exception
            # never pass exceptions back to multiprocessing, because some
            # exceptions can be unpicklable
            # https://bugs.python.org/issue9400
            LOG.exception("")
            raise SystemExit(1)

    LOG.debug("queue.qsize() = %s", queue.qsize())
155
156
class IterationRunner(base.Runner):
    """Run a scenario repeatedly to find the max throughput

The requested packet rate is adjusted between scenario invocations until
the search window is narrower than delta.

  Parameters
    iterations - number of throughput searches to perform
        type:    int
        unit:    na
        default: 1
    interval - time to wait between each scenario invocation
        type:    int
        unit:    seconds
        default: 1 sec
    delta - stop condition for the search.
        type:    int
        unit:    pps
        default: 1000 pps
    """
    __execution_type__ = 'Dynamictp'

    def _run_benchmark(self, cls, method, scenario_cfg, context_cfg):
        """Start the throughput search worker in a separate process."""
        # Process name: execution type, scenario type and the parent's PID.
        process_name = "{}-{}-{}".format(
            self.__execution_type__, scenario_cfg.get("type"), os.getpid())
        worker_args = (self.result_queue, cls, method, scenario_cfg,
                       context_cfg, self.aborted)

        self.process = multiprocessing.Process(
            target=_worker_process, name=process_name, args=worker_args)
        self.process.start()