Merge "Cleanup CGNAPT unit tests"
[yardstick.git] / yardstick / benchmark / runners / dynamictp.py
1 # Copyright 2016: Nokia
2 # All Rights Reserved.
3 #
4 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
5 #    not use this file except in compliance with the License. You may obtain
6 #    a copy of the License at
7 #
8 #         http://www.apache.org/licenses/LICENSE-2.0
9 #
10 #    Unless required by applicable law or agreed to in writing, software
11 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 #    License for the specific language governing permissions and limitations
14 #    under the License.
15
16 # yardstick comment: this is a modified copy of
17 # rally/rally/benchmark/runners/constant.py
18
19 """A runner that searches for the max throughput with binary search
20 """
21
22 import logging
23 import multiprocessing
24 import time
25 import traceback
26
27 import os
28
29 from yardstick.benchmark.runners import base
30 from yardstick.common import exceptions as y_exc
31
32 LOG = logging.getLogger(__name__)
33
34
def _worker_process(queue, cls, method_name, scenario_cfg,
                    context_cfg, aborted):  # pragma: no cover
    """Search for the highest packet rate a scenario sustains.

    Runs in a separate process. The scenario is executed repeatedly while
    ``scenario_cfg['options']['pps']`` is adjusted between runs: a run that
    raises ``SLAValidationError`` lowers the upper bound of the search
    window, any other completed run raises the lower bound. A search ends
    when the window is narrower than ``delta``, when the bounds meet or
    cross, after 10 runs, or as soon as a run fails with an unexpected
    error.

    Runner options read from ``scenario_cfg['runner']``:
        iterations - number of searches to perform (default 1)
        interval   - seconds to sleep between two runs (default 1)
        run_step   - which of "setup", "run", "teardown" to execute
                     (default "setup,run,teardown")
        delta      - window width, in pps, that stops a search
                     (default 1000)

    :param queue: receives one dict describing the runner, then one result
                  record (``runner_id`` + ``benchmark``) per finished search
    :param cls: scenario class, instantiated as
                ``cls(scenario_cfg, context_cfg)``
    :param method_name: name of the scenario method to call; it is invoked
                        with an empty dict that it is expected to fill with
                        at least ``packets_per_second``
    :param scenario_cfg: scenario configuration; ``options.pps`` gives the
                         initial rate (default 1000000) and is overwritten
                         before every run
    :param context_cfg: context configuration, passed through to ``cls``
    :param aborted: event-like object; once ``is_set()`` is true the worker
                    stops searching
    :raises SystemExit: if the scenario teardown raises
    """

    runner_cfg = scenario_cfg['runner']
    iterations = runner_cfg.get("iterations", 1)
    interval = runner_cfg.get("interval", 1)
    run_step = runner_cfg.get("run_step", "setup,run,teardown")
    delta = runner_cfg.get("delta", 1000)
    options_cfg = scenario_cfg['options']
    initial_rate = options_cfg.get("pps", 1000000)
    LOG.info("worker START, class %s", cls)

    runner_cfg['runner_id'] = os.getpid()

    benchmark = cls(scenario_cfg, context_cfg)
    if "setup" in run_step:
        benchmark.setup()

    method = getattr(benchmark, method_name)

    queue.put({'runner_id': runner_cfg['runner_id'],
               'scenario_cfg': scenario_cfg,
               'context_cfg': context_cfg})

    if "run" in run_step:
        iterator = 0
        search_max = initial_rate
        search_min = 0
        while iterator < iterations:
            # Every new search restarts from the previous upper bound and
            # half of the previous lower bound.
            search_min = int(search_min / 2)
            scenario_cfg['options']['pps'] = search_max
            search_max_found = False
            search_done = False
            sequence = 0

            last_min_data = {'packets_per_second': 0}

            while True:
                sequence += 1

                data = {}
                errors = ""
                too_high = False

                LOG.debug("sequence: %s search_min: %s search_max: %s",
                          sequence, search_min, search_max)

                try:
                    method(data)
                except y_exc.SLAValidationError as error:
                    LOG.warning("SLA validation failed: %s", error.args)
                    too_high = True
                except Exception as e:  # pylint: disable=broad-except
                    errors = traceback.format_exc()
                    LOG.exception(e)

                # The scenario may have failed before it stored a rate, so
                # the key cannot be assumed to exist.
                actual_pps = data.get('packets_per_second')

                if actual_pps is None:
                    # Without a measured rate the search cannot continue;
                    # report it as an error instead of dying on a KeyError.
                    if not errors:
                        errors = ("scenario result does not contain "
                                  "'packets_per_second'")
                        LOG.error(errors)
                elif too_high:
                    search_max = actual_pps
                    search_max_found = True
                else:
                    last_min_data = data
                    search_min = actual_pps

                    # Check if the actual rate is well below the asked rate
                    if scenario_cfg['options']['pps'] > actual_pps * 1.5:
                        search_max = actual_pps
                        LOG.debug("Sender reached max tput: %s", search_max)
                    elif not search_max_found:
                        search_max = int(actual_pps * 1.5)

                converged = ((search_max - search_min) < delta or
                             search_max <= search_min or
                             sequence >= 10)

                # A failed run always produces a record so that the captured
                # error reaches the consumer of the queue.
                if errors or converged:
                    # Prefer the last run that did not break the SLA.
                    if last_min_data['packets_per_second'] > 0:
                        data = last_min_data

                    benchmark_output = {
                        'timestamp': time.time(),
                        'sequence': sequence,
                        'data': data,
                        'errors': errors
                    }

                    record = {
                        'runner_id': runner_cfg['runner_id'],
                        'benchmark': benchmark_output
                    }

                    queue.put(record)
                    search_done = True

                if search_done or aborted.is_set():
                    LOG.info("worker END")
                    break

                if not search_max_found:
                    scenario_cfg['options']['pps'] = search_max
                else:
                    # Probe the middle of the window; keep the rate integral
                    # like every other rate computed here.
                    scenario_cfg['options']['pps'] = \
                        int((search_max - search_min) / 2 + search_min)

                time.sleep(interval)

            # An abort request ends the whole worker, not just this search.
            if aborted.is_set():
                break

            iterator += 1
            LOG.debug("iterator: %s iterations: %s", iterator, iterations)

    if "teardown" in run_step:
        try:
            benchmark.teardown()
        except Exception:
            # catch any exception in teardown and convert to simple exception
            # never pass exceptions back to multiprocessing, because some exceptions can
            # be unpicklable
            # https://bugs.python.org/issue9400
            LOG.exception("")
            raise SystemExit(1)

    LOG.debug("queue.qsize() = %s", queue.qsize())
156
157
class IterationRunner(base.Runner):
    """Run a scenario repeatedly to find the max throughput

    The scenario is executed in a worker process which changes the requested
    packet rate between invocations until the search window is narrower than
    delta or the worker's attempt limit is reached.

      Parameters
        interval - time to wait between each scenario invocation
            type:    int
            unit:    seconds
            default: 1 sec
        delta - stop condition for the search.
            type:    int
            unit:    pps
            default: 1000 pps
        iterations - number of searches to perform
            type:    int
            unit:    na
            default: 1
    """
    __execution_type__ = 'Dynamictp'

    def _run_benchmark(self, cls, method, scenario_cfg, context_cfg):
        """Create and start the worker process that performs the search."""
        worker_name = "{}-{}-{}".format(
            self.__execution_type__, scenario_cfg.get("type"), os.getpid())
        worker_args = (self.result_queue, cls, method, scenario_cfg,
                       context_cfg, self.aborted)
        worker = multiprocessing.Process(
            name=worker_name, target=_worker_process, args=worker_args)
        # Publish the handle before starting, as the original code does.
        self.process = worker
        worker.start()