Merge "NSB: cancel all queue join threads"
[yardstick.git] / yardstick / benchmark / runners / dynamictp.py
1 # Copyright 2016: Nokia
2 # All Rights Reserved.
3 #
4 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
5 #    not use this file except in compliance with the License. You may obtain
6 #    a copy of the License at
7 #
8 #         http://www.apache.org/licenses/LICENSE-2.0
9 #
10 #    Unless required by applicable law or agreed to in writing, software
11 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 #    License for the specific language governing permissions and limitations
14 #    under the License.
15
16 # yardstick comment: this is a modified copy of
17 # rally/rally/benchmark/runners/constant.py
18
19 """A runner that searches for the max throughput with binary search
20 """
21
22 import os
23 import multiprocessing
24 import logging
25 import traceback
26 import time
27
28 from yardstick.benchmark.runners import base
29
30 LOG = logging.getLogger(__name__)
31
32
def _worker_process(queue, cls, method_name, scenario_cfg,
                    context_cfg, aborted):  # pragma: no cover
    """Search for the highest packet rate a scenario sustains.

    For each of the configured iterations the scenario method is invoked
    repeatedly with a requested rate stored in
    ``scenario_cfg['options']['pps']``.  An ``AssertionError`` raised by the
    method is treated as "rate too high"; any other outcome is treated as a
    sustainable rate.  The requested rate is narrowed between ``search_min``
    and ``search_max`` until the window is smaller than ``delta``, the
    window collapses, or ten attempts have been made.

    :param queue: result sink; must provide ``cancel_join_thread()`` and
        ``put()`` (the runner in this module passes its ``result_queue``).
    :param cls: scenario class, instantiated as
        ``cls(scenario_cfg, context_cfg)``.
    :param method_name: name of the scenario method to call; it receives a
        dict that it is expected to fill with ``'packets_per_second'``.
    :param scenario_cfg: scenario configuration.  ``scenario_cfg['runner']``
        is read for ``iterations`` (default 1), ``interval`` (default 1,
        passed to ``time.sleep``), ``run_step`` (default
        ``"setup,run,teardown"``) and ``delta`` (default 1000);
        ``scenario_cfg['options']['pps']`` (default 1000000) is the initial
        rate and is overwritten during the search.
    :param context_cfg: context configuration handed to the scenario.
    :param aborted: object with ``is_set()``; once set, the worker stops
        after the current attempt.

    Two kinds of items are put on ``queue``: first a dict with
    ``runner_id``, ``scenario_cfg`` and ``context_cfg``; afterwards one
    ``{'runner_id': ..., 'benchmark': {...}}`` record per finished search,
    or per failed attempt, with ``timestamp``, ``sequence``, ``data`` and
    ``errors``.
    """

    queue.cancel_join_thread()
    runner_cfg = scenario_cfg['runner']
    iterations = runner_cfg.get("iterations", 1)
    interval = runner_cfg.get("interval", 1)
    run_step = runner_cfg.get("run_step", "setup,run,teardown")
    delta = runner_cfg.get("delta", 1000)
    options_cfg = scenario_cfg['options']
    initial_rate = options_cfg.get("pps", 1000000)
    LOG.info("worker START, class %s", cls)

    runner_cfg['runner_id'] = os.getpid()

    benchmark = cls(scenario_cfg, context_cfg)
    if "setup" in run_step:
        benchmark.setup()

    method = getattr(benchmark, method_name)

    queue.put({'runner_id': runner_cfg['runner_id'],
               'scenario_cfg': scenario_cfg,
               'context_cfg': context_cfg})

    if "run" in run_step:
        iterator = 0
        search_max = initial_rate
        search_min = 0
        # Set on error or abort so that the outer loop terminates as well,
        # instead of re-running the scenario for the remaining iterations.
        stop_worker = False
        while iterator < iterations and not stop_worker:
            # Re-open the lower bound for a new search round.
            search_min = int(search_min / 2)
            scenario_cfg['options']['pps'] = search_max
            search_max_found = False
            sequence = 0

            last_min_data = {}
            last_min_data['packets_per_second'] = 0

            while True:
                sequence += 1

                data = {}
                errors = ""
                too_high = False

                LOG.debug("sequence: %s search_min: %s search_max: %s",
                          sequence, search_min, search_max)

                try:
                    method(data)
                except AssertionError as assertion:
                    # Lazy logging arguments: formatting with
                    # ``% assertion.args`` raises TypeError when the
                    # assertion carries zero or several arguments.
                    LOG.warning("SLA validation failed: %s", assertion)
                    too_high = True
                except Exception as e:
                    errors = traceback.format_exc()
                    LOG.exception(e)

                actual_pps = data.get('packets_per_second')

                if actual_pps is None:
                    # The scenario produced no measurement, so the search
                    # window cannot be updated; report this as an error.
                    if not errors:
                        errors = ("scenario did not report "
                                  "'packets_per_second'")
                        LOG.error(errors)
                elif too_high:
                    search_max = actual_pps

                    if not search_max_found:
                        search_max_found = True
                else:
                    last_min_data = data
                    search_min = actual_pps

                    # Check if the actual rate is well below the asked rate
                    if scenario_cfg['options']['pps'] > actual_pps * 1.5:
                        search_max = actual_pps
                        LOG.debug("Sender reached max tput: %s", search_max)
                    elif not search_max_found:
                        search_max = int(actual_pps * 1.5)

                # Search ends when the window is narrower than delta, has
                # collapsed, or after ten attempts.
                search_done = ((search_max - search_min) < delta or
                               search_max <= search_min or
                               10 <= sequence)

                if search_done or errors:
                    # Prefer the last sustainable measurement, if any.
                    if last_min_data['packets_per_second'] > 0:
                        data = last_min_data

                    benchmark_output = {
                        'timestamp': time.time(),
                        'sequence': sequence,
                        'data': data,
                        'errors': errors
                    }

                    record = {
                        'runner_id': runner_cfg['runner_id'],
                        'benchmark': benchmark_output
                    }

                    queue.put(record)

                if errors or aborted.is_set():
                    stop_worker = True

                if stop_worker or search_done:
                    LOG.info("worker END")
                    break

                if not search_max_found:
                    scenario_cfg['options']['pps'] = search_max
                else:
                    # Floor division keeps the requested rate integral on
                    # Python 3 when the bounds are integers.
                    scenario_cfg['options']['pps'] = \
                        (search_max - search_min) // 2 + search_min

                time.sleep(interval)

            iterator += 1
            LOG.debug("iterator: %s iterations: %s", iterator, iterations)

    if "teardown" in run_step:
        benchmark.teardown()
146
147
class IterationRunner(base.Runner):
    """Runner that searches for the maximum throughput of a scenario.

    The search itself is carried out by ``_worker_process`` in a separate
    process; this class only launches that process.

      Parameters
        interval - time to wait between each scenario invocation
            type:    int
            unit:    seconds
            default: 1 sec
        delta - stop condition for the search.
            type:    int
            unit:    pps
            default: 1000 pps
    """
    __execution_type__ = 'Dynamictp'

    def _run_benchmark(self, cls, method, scenario_cfg, context_cfg):
        """Spawn the worker process that performs the throughput search.

        The process handle is stored on ``self.process`` before it is
        started.
        """
        worker_args = (self.result_queue, cls, method, scenario_cfg,
                       context_cfg, self.aborted)
        worker = multiprocessing.Process(target=_worker_process,
                                         args=worker_args)
        self.process = worker
        worker.start()