drain runner queue and undo cancel_join_thread
[yardstick.git] / yardstick / benchmark / runners / dynamictp.py
1 # Copyright 2016: Nokia
2 # All Rights Reserved.
3 #
4 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
5 #    not use this file except in compliance with the License. You may obtain
6 #    a copy of the License at
7 #
8 #         http://www.apache.org/licenses/LICENSE-2.0
9 #
10 #    Unless required by applicable law or agreed to in writing, software
11 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 #    License for the specific language governing permissions and limitations
14 #    under the License.
15
16 # yardstick comment: this is a modified copy of
17 # rally/rally/benchmark/runners/constant.py
18
19 """A runner that searches for the max throughput with binary search
20 """
21
22 import os
23 import multiprocessing
24 import logging
25 import traceback
26 import time
27
28 from yardstick.benchmark.runners import base
29
30 LOG = logging.getLogger(__name__)
31
32
def _worker_process(queue, cls, method_name, scenario_cfg,
                    context_cfg, aborted):  # pragma: no cover
    """Search for the highest packet rate the scenario sustains.

    Instantiates ``cls(scenario_cfg, context_cfg)`` and repeatedly calls its
    ``method_name`` method with an empty dict that the method is expected to
    fill with a ``'packets_per_second'`` measurement. An ``AssertionError``
    raised by the method is treated as "requested rate too high" and lowers
    the upper search bound; a successful call raises the lower bound. The
    next requested rate is written to ``scenario_cfg['options']['pps']``.

    A search ends when the bounds are closer than ``delta``, when they cross,
    or after 10 calls; one result record is then put on ``queue``. A search
    also ends when the method raises any other exception or when ``aborted``
    is set.

    Settings read from ``scenario_cfg['runner']``:
        iterations - number of searches to perform (default 1)
        interval   - seconds slept between calls (default 1)
        run_step   - which of "setup", "run", "teardown" to execute
                     (default "setup,run,teardown")
        delta      - bound distance that stops a search (default 1000)
    ``scenario_cfg['options']['pps']`` is the initial rate (default 1000000).

    :param queue: object with ``put()`` and ``qsize()`` receiving the records
    :param cls: benchmark class to instantiate
    :param method_name: name of the benchmark method to call
    :param scenario_cfg: scenario configuration dict (mutated in place)
    :param context_cfg: context configuration, passed to ``cls`` and echoed
        in the first queue record
    :param aborted: object whose ``is_set()`` requests early termination
    :raises SystemExit: with code 1 when ``benchmark.teardown()`` fails
    """

    runner_cfg = scenario_cfg['runner']
    iterations = runner_cfg.get("iterations", 1)
    interval = runner_cfg.get("interval", 1)
    run_step = runner_cfg.get("run_step", "setup,run,teardown")
    delta = runner_cfg.get("delta", 1000)
    options_cfg = scenario_cfg['options']
    initial_rate = options_cfg.get("pps", 1000000)
    LOG.info("worker START, class %s", cls)

    runner_cfg['runner_id'] = os.getpid()

    benchmark = cls(scenario_cfg, context_cfg)
    if "setup" in run_step:
        benchmark.setup()

    method = getattr(benchmark, method_name)

    queue.put({'runner_id': runner_cfg['runner_id'],
               'scenario_cfg': scenario_cfg,
               'context_cfg': context_cfg})

    def publish(sequence, data, errors):
        """Put one benchmark result record on the queue."""
        queue.put({
            'runner_id': runner_cfg['runner_id'],
            'benchmark': {
                'timestamp': time.time(),
                'sequence': sequence,
                'data': data,
                'errors': errors
            }
        })

    if "run" in run_step:
        iterator = 0
        search_max = initial_rate
        search_min = 0
        while iterator < iterations:
            # Each new search restarts from half of the previous lower bound
            # and from the previous upper bound.
            search_min = int(search_min / 2)
            scenario_cfg['options']['pps'] = search_max
            search_max_found = False
            max_throughput_found = False
            sequence = 0

            # Last measurement that did not violate the SLA.
            last_min_data = {'packets_per_second': 0}

            while True:
                sequence += 1

                data = {}
                errors = ""
                too_high = False

                LOG.debug("sequence: %s search_min: %s search_max: %s",
                          sequence, search_min, search_max)

                try:
                    method(data)
                except AssertionError as assertion:
                    # Let the logger do the formatting: applying "%" to
                    # assertion.args raises TypeError when the assertion
                    # carries no message or more than one argument.
                    LOG.warning("SLA validation failed: %s", assertion)
                    too_high = True
                except Exception as e:
                    errors = traceback.format_exc()
                    LOG.exception(e)

                if 'packets_per_second' not in data:
                    # The call produced no measurement, so the search cannot
                    # continue. Report the failure instead of dying with a
                    # KeyError, and fall through to teardown.
                    if not errors:
                        errors = "benchmark reported no 'packets_per_second'"
                    publish(sequence, data, errors)
                    LOG.info("worker END")
                    break

                actual_pps = data['packets_per_second']

                if too_high:
                    search_max = actual_pps
                    search_max_found = True
                else:
                    last_min_data = data
                    search_min = actual_pps

                    # Check if the actual rate is well below the asked rate
                    if scenario_cfg['options']['pps'] > actual_pps * 1.5:
                        search_max = actual_pps
                        LOG.debug("Sender reached max tput: %s", search_max)
                    elif not search_max_found:
                        search_max = int(actual_pps * 1.5)

                if ((search_max - search_min) < delta) or \
                   (search_max <= search_min) or (10 <= sequence):
                    # Prefer the last SLA-compliant measurement, if any.
                    if last_min_data['packets_per_second'] > 0:
                        data = last_min_data

                    publish(sequence, data, errors)
                    max_throughput_found = True

                if errors or aborted.is_set() or max_throughput_found:
                    LOG.info("worker END")
                    break

                if not search_max_found:
                    scenario_cfg['options']['pps'] = search_max
                else:
                    # Bisect the remaining interval.
                    scenario_cfg['options']['pps'] = \
                        (search_max - search_min) / 2 + search_min

                time.sleep(interval)

            iterator += 1
            LOG.debug("iterator: %s iterations: %s", iterator, iterations)

    if "teardown" in run_step:
        try:
            benchmark.teardown()
        except Exception:
            # catch any exception in teardown and convert to simple exception
            # never pass exceptions back to multiprocessing, because some exceptions can
            # be unpicklable
            # https://bugs.python.org/issue9400
            LOG.exception("")
            raise SystemExit(1)

    LOG.debug("queue.qsize() = %s", queue.qsize())
155
156
class IterationRunner(base.Runner):
    """Runner that searches for the maximum throughput of a scenario.

    The search is executed by ``_worker_process`` in a separate
    ``multiprocessing.Process``, which is stored on ``self.process``.

      Parameters
        interval - time to wait between each scenario invocation
            type:    int
            unit:    seconds
            default: 1 sec
        delta - stop condition for the search.
            type:    int
            unit:    pps
            default: 1000 pps
    """
    __execution_type__ = 'Dynamictp'

    def _run_benchmark(self, cls, method, scenario_cfg, context_cfg):
        """Spawn the worker process for the given benchmark and start it."""
        worker_args = (self.result_queue, cls, method, scenario_cfg,
                       context_cfg, self.aborted)
        worker = multiprocessing.Process(target=_worker_process,
                                         args=worker_args)
        # Keep the handle on the runner before the child begins executing.
        self.process = worker
        worker.start()