# yardstick/benchmark/runners/dynamictp.py
# Copyright 2016: Nokia
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

# yardstick comment: this is a modified copy of
# rally/rally/benchmark/runners/constant.py

"""A runner that searches for the max throughput with binary search
"""

import logging
import multiprocessing
import os
import time
import traceback

from yardstick.benchmark.runners import base

LOG = logging.getLogger(__name__)


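# The worker runs the scenario's setup/run/teardown steps in one process and,
# during the run step, searches for the highest packet rate that still passes
# the scenario's SLA check.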
def _worker_process(queue, cls, method_name, scenario_cfg,
                    context_cfg, aborted):  # pragma: no cover

    runner_cfg = scenario_cfg['runner']
    iterations = runner_cfg.get("iterations", 1)
    interval = runner_cfg.get("interval", 1)
    run_step = runner_cfg.get("run_step", "setup,run,teardown")
    delta = runner_cfg.get("delta", 1000)
    options_cfg = scenario_cfg['options']
    initial_rate = options_cfg.get("pps", 1000000)
    LOG.info("worker START, class %s", cls)

    runner_cfg['runner_id'] = os.getpid()

    benchmark = cls(scenario_cfg, context_cfg)
    if "setup" in run_step:
        benchmark.setup()

    method = getattr(benchmark, method_name)

    queue.put({'runner_id': runner_cfg['runner_id'],
               'scenario_cfg': scenario_cfg,
               'context_cfg': context_cfg})

    if "run" in run_step:
        iterator = 0
        search_max = initial_rate
        search_min = 0
        while iterator < iterations:
            search_min = int(search_min / 2)
            scenario_cfg['options']['pps'] = search_max
            search_max_found = False
            max_throughput_found = False
            sequence = 0

            last_min_data = {'packets_per_second': 0}

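            # Binary-search loop: each pass offers scenario_cfg['options']['pps']
            # to the scenario and tightens search_min/search_max from the result.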
            while True:
                sequence += 1

                data = {}
                errors = ""
                too_high = False

                LOG.debug("sequence: %s search_min: %s search_max: %s",
                          sequence, search_min, search_max)

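                # One probe at the current offered rate: the scenario method
                # fills 'data' and raises AssertionError when the SLA is
                # violated, which marks the rate as too high.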
                try:
                    method(data)
                except AssertionError as assertion:
                    LOG.warning("SLA validation failed: %s", assertion.args)
                    too_high = True
                except Exception as e:
                    errors = traceback.format_exc()
                    LOG.exception(e)

                # 'data' stays empty if the scenario raised, so fall back to 0
                actual_pps = data.get('packets_per_second', 0)

                if too_high:
                    search_max = actual_pps

                    if not search_max_found:
                        search_max_found = True
                else:
                    last_min_data = data
                    search_min = actual_pps

                    # Check if the actual rate is well below the asked rate
                    if scenario_cfg['options']['pps'] > actual_pps * 1.5:
                        search_max = actual_pps
                        LOG.debug("Sender reached max tput: %s", search_max)
                    elif not search_max_found:
                        search_max = int(actual_pps * 1.5)

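                # Stop once the window is narrower than 'delta' pps, the bounds
                # have crossed, or 10 probes have been made; report the last
                # measurement that met the SLA if there was one.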
                if ((search_max - search_min) < delta) or \
                   (search_max <= search_min) or (10 <= sequence):
                    if last_min_data['packets_per_second'] > 0:
                        data = last_min_data

                    benchmark_output = {
                        'timestamp': time.time(),
                        'sequence': sequence,
                        'data': data,
                        'errors': errors
                    }

                    record = {
                        'runner_id': runner_cfg['runner_id'],
                        'benchmark': benchmark_output
                    }

                    queue.put(record)
                    max_throughput_found = True

                if errors or aborted.is_set() or max_throughput_found:
                    LOG.info("worker END")
                    break

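                # Next offered rate: push the estimated maximum higher until an
                # SLA failure (or the sender's own limit) fixes an upper bound,
                # then bisect the remaining window.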
                if not search_max_found:
                    scenario_cfg['options']['pps'] = search_max
                else:
                    scenario_cfg['options']['pps'] = \
                        (search_max - search_min) // 2 + search_min

                time.sleep(interval)

            iterator += 1
            LOG.debug("iterator: %s iterations: %s", iterator, iterations)

    if "teardown" in run_step:
        benchmark.teardown()


class IterationRunner(base.Runner):
    '''Run a scenario to find the max throughput

The worker starts from the configured 'pps' rate and binary-searches for the
highest rate that still meets the scenario SLA, stopping once the search
window is smaller than 'delta'.

  Parameters
    interval - time to wait between each scenario invocation
        type:    int
        unit:    seconds
        default: 1 sec
    delta - stop condition for the search.
        type:    int
        unit:    pps
        default: 1000 pps
    iterations - number of times the search is repeated
        type:    int
        unit:    na
        default: 1
    '''
    __execution_type__ = 'Dynamictp'

    def _run_benchmark(self, cls, method, scenario_cfg, context_cfg):
        self.process = multiprocessing.Process(
            target=_worker_process,
            args=(self.result_queue, cls, method, scenario_cfg,
                  context_cfg, self.aborted))
        self.process.start()
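

# A minimal sketch (assumed shape, not taken from a real test case) of the
# scenario_cfg this runner expects: the 'runner' section selects the runner by
# its __execution_type__ and carries the search settings, while 'options'
# carries the initial offered rate. Values below are illustrative assumptions.
#
# scenario_cfg = {
#     'runner': {'type': 'Dynamictp', 'iterations': 1,
#                'interval': 1, 'delta': 1000},
#     'options': {'pps': 1000000},
# }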