Merge "NSB PROX NFVi Test does not stop after reaching expected precision"
[yardstick.git] / yardstick / tests / unit / benchmark / runner / test_proxduration.py
# Copyright (c) 2018 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

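"""Unit tests for yardstick.benchmark.runners.proxduration."""
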
import multiprocessing
import os
import unittest

import mock

from yardstick.benchmark.runners import proxduration
from yardstick.common import constants
from yardstick.common import exceptions as y_exc


class ProxDurationRunnerTest(unittest.TestCase):
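    """Tests for proxduration.ProxDurationRunner and _worker_process."""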

    class MyMethod(object):
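        """Callable stand-in for a benchmark method.

        Every call bumps an internal counter and stores it in the passed
        data dict; depending on side_effect the call then raises an
        SLAValidationError or a broad YardstickException, otherwise the
        counter is returned.
        """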
        SLA_VALIDATION_ERROR_SIDE_EFFECT = 1
        BROAD_EXCEPTION_SIDE_EFFECT = 2

        def __init__(self, side_effect=0):
            self.count = 101
            self.side_effect = side_effect

        def __call__(self, data):
            self.count += 1
            data['my_key'] = self.count
            if self.side_effect == self.SLA_VALIDATION_ERROR_SIDE_EFFECT:
                raise y_exc.SLAValidationError(case_name='My Case',
                                               error_msg='my error message')
            elif self.side_effect == self.BROAD_EXCEPTION_SIDE_EFFECT:
                raise y_exc.YardstickException
            return self.count

    def setUp(self):
        self.scenario_cfg = {
            'runner': {'interval': 0, 'duration': 0},
            'type': 'some_type'
        }

        self.benchmark = mock.Mock()
        self.benchmark_cls = mock.Mock(return_value=self.benchmark)

    def _assert_defaults__worker_run_setup_and_teardown(self):
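        """Check the benchmark was created, set up and torn down once."""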
        self.benchmark_cls.assert_called_once_with(self.scenario_cfg, {})
        self.benchmark.setup.assert_called_once()
        self.benchmark.teardown.assert_called_once()

    @mock.patch.object(os, 'getpid')
    @mock.patch.object(multiprocessing, 'Process')
    def test__run_benchmark_called_with(self, mock_multiprocessing_process,
                                        mock_os_getpid):
        mock_os_getpid.return_value = 101

        runner = proxduration.ProxDurationRunner({})
        benchmark_cls = mock.Mock()
        runner._run_benchmark(benchmark_cls, 'my_method', self.scenario_cfg,
                              {})
        mock_multiprocessing_process.assert_called_once_with(
            name='ProxDuration-some_type-101',
            target=proxduration._worker_process,
            args=(runner.result_queue, benchmark_cls, 'my_method',
                  self.scenario_cfg, {}, runner.aborted, runner.output_queue))

    @mock.patch.object(os, 'getpid')
    def test__worker_process_runner_id(self, mock_os_getpid):
        mock_os_getpid.return_value = 101
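        # A short 'duration' keeps the worker loop brief; 'sampled' is assumed
        # to make the runner publish per-iteration results.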
        self.scenario_cfg['runner'] = {'sampled': True, 'duration': 0.1}
        proxduration._worker_process(
            mock.Mock(), self.benchmark_cls, 'my_method', self.scenario_cfg,
            {}, multiprocessing.Event(), mock.Mock())

        self.assertEqual(101, self.scenario_cfg['runner']['runner_id'])

    def test__worker_process_called_with_cfg(self):
        self.scenario_cfg['runner'] = {'sampled': True, 'duration': 0.1}
        proxduration._worker_process(
            mock.Mock(), self.benchmark_cls, 'my_method', self.scenario_cfg,
            {}, multiprocessing.Event(), mock.Mock())

        self._assert_defaults__worker_run_setup_and_teardown()

    def test__worker_process_called_with_cfg_loop(self):
        self.scenario_cfg['runner'] = {'sampled': True, 'duration': 0.1}
        proxduration._worker_process(
            mock.Mock(), self.benchmark_cls, 'my_method', self.scenario_cfg,
            {}, multiprocessing.Event(), mock.Mock())

        self._assert_defaults__worker_run_setup_and_teardown()
        self.assertGreater(self.benchmark.my_method.call_count, 0)

    def test__worker_process_called_without_cfg(self):
        scenario_cfg = {'runner': {}}
        aborted = multiprocessing.Event()
        aborted.set()
        proxduration._worker_process(
            mock.Mock(), self.benchmark_cls, 'my_method', scenario_cfg, {},
            aborted, mock.Mock())

        self.benchmark_cls.assert_called_once_with(scenario_cfg, {})
        self.benchmark.setup.assert_called_once()
        self.benchmark.teardown.assert_called_once()

    def test__worker_process_output_queue(self):
        self.benchmark.my_method = mock.Mock(return_value='my_result')
        self.scenario_cfg['runner'] = {'sampled': True, 'duration': 0.1}
        output_queue = mock.Mock()
        proxduration._worker_process(
            mock.Mock(), self.benchmark_cls, 'my_method', self.scenario_cfg,
            {}, multiprocessing.Event(), output_queue)

        self._assert_defaults__worker_run_setup_and_teardown()
        output_queue.put.assert_has_calls(
            [mock.call('my_result', True, constants.QUEUE_PUT_TIMEOUT)])

    def test__worker_process_output_queue_multiple_iterations(self):
        self.scenario_cfg['runner'] = {'sampled': True, 'duration': 0.1}
        self.benchmark.my_method = self.MyMethod()
        output_queue = mock.Mock()
        proxduration._worker_process(
            mock.Mock(), self.benchmark_cls, 'my_method', self.scenario_cfg,
            {}, multiprocessing.Event(), output_queue)

        self._assert_defaults__worker_run_setup_and_teardown()
        for idx in range(102, 101 + len(output_queue.method_calls)):
            output_queue.put.assert_has_calls(
                [mock.call(idx, True, constants.QUEUE_PUT_TIMEOUT)])

    def test__worker_process_queue(self):
        self.benchmark.my_method = self.MyMethod()
        self.scenario_cfg['runner'] = {'sampled': True, 'duration': 0.1}
        queue = mock.Mock()
        proxduration._worker_process(
            queue, self.benchmark_cls, 'my_method', self.scenario_cfg, {},
            multiprocessing.Event(), mock.Mock())

        self._assert_defaults__worker_run_setup_and_teardown()
        benchmark_output = {'timestamp': mock.ANY,
                            'sequence': 1,
                            'data': {'my_key': 102},
                            'errors': ''}
        queue.put.assert_has_calls(
            [mock.call(benchmark_output, True, constants.QUEUE_PUT_TIMEOUT)])

    def test__worker_process_queue_multiple_iterations(self):
        self.scenario_cfg['runner'] = {'sampled': True, 'duration': 0.1}
        self.benchmark.my_method = self.MyMethod()
        queue = mock.Mock()
        proxduration._worker_process(
            queue, self.benchmark_cls, 'my_method', self.scenario_cfg, {},
            multiprocessing.Event(), mock.Mock())

        self._assert_defaults__worker_run_setup_and_teardown()
        for idx in range(102, 101 + len(queue.method_calls)):
            benchmark_output = {'timestamp': mock.ANY,
                                'sequence': idx - 101,
                                'data': {'my_key': idx},
                                'errors': ''}
            queue.put.assert_has_calls(
                [mock.call(benchmark_output, True,
                           constants.QUEUE_PUT_TIMEOUT)])

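    # SLA handling, as exercised by the tests below: without an 'sla' section
    # the SLAValidationError is swallowed, with action 'monitor' it is only
    # logged as a warning, and with an empty or 'assert' action it propagates
    # out of the worker.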
    def test__worker_process_except_sla_validation_error_no_sla_cfg(self):
        self.benchmark.my_method = mock.Mock(
            side_effect=y_exc.SLAValidationError)
        self.scenario_cfg['runner'] = {'sampled': True, 'duration': 0.1}
        proxduration._worker_process(
            mock.Mock(), self.benchmark_cls, 'my_method', self.scenario_cfg,
            {}, multiprocessing.Event(), mock.Mock())

        self._assert_defaults__worker_run_setup_and_teardown()

    @mock.patch.object(proxduration.LOG, 'warning')
    def test__worker_process_except_sla_validation_error_sla_cfg_monitor(
            self, *args):
        self.scenario_cfg['sla'] = {'action': 'monitor'}
        self.scenario_cfg['runner'] = {'sampled': True, 'duration': 0.1}
        self.benchmark.my_method = mock.Mock(
            side_effect=y_exc.SLAValidationError)
        proxduration._worker_process(
            mock.Mock(), self.benchmark_cls, 'my_method', self.scenario_cfg,
            {}, multiprocessing.Event(), mock.Mock())

        self._assert_defaults__worker_run_setup_and_teardown()

    def test__worker_process_raise_sla_validation_error_sla_cfg_default(self):
        self.scenario_cfg['sla'] = {}
        self.scenario_cfg['runner'] = {'sampled': True, 'duration': 0.1}
        self.benchmark.my_method = mock.Mock(
            side_effect=y_exc.SLAValidationError)
        with self.assertRaises(y_exc.SLAValidationError):
            proxduration._worker_process(
                mock.Mock(), self.benchmark_cls, 'my_method',
                self.scenario_cfg, {}, multiprocessing.Event(), mock.Mock())

        self.benchmark_cls.assert_called_once_with(self.scenario_cfg, {})
        self.benchmark.setup.assert_called_once()
        self.benchmark.my_method.assert_called_once_with({})

    def test__worker_process_raise_sla_validation_error_sla_cfg_assert(self):
        self.scenario_cfg['runner'] = {'sampled': True, 'duration': 0.1}
        self.scenario_cfg['sla'] = {'action': 'assert'}
        self.benchmark.my_method = mock.Mock(
            side_effect=y_exc.SLAValidationError)

        with self.assertRaises(y_exc.SLAValidationError):
            proxduration._worker_process(
                mock.Mock(), self.benchmark_cls, 'my_method',
                self.scenario_cfg, {}, multiprocessing.Event(), mock.Mock())

        self.benchmark_cls.assert_called_once_with(self.scenario_cfg, {})
        self.benchmark.setup.assert_called_once()
        self.benchmark.my_method.assert_called_once_with({})

    @mock.patch.object(proxduration.LOG, 'warning')
    def test__worker_process_queue_on_sla_validation_error_monitor(
            self, *args):
        self.scenario_cfg['sla'] = {'action': 'monitor'}
        self.scenario_cfg['runner'] = {'sampled': True, 'duration': 0.1}
        self.benchmark.my_method = self.MyMethod(
            side_effect=self.MyMethod.SLA_VALIDATION_ERROR_SIDE_EFFECT)
        queue = mock.Mock()
        proxduration._worker_process(
            queue, self.benchmark_cls, 'my_method', self.scenario_cfg, {},
            multiprocessing.Event(), mock.Mock())

        self._assert_defaults__worker_run_setup_and_teardown()
        benchmark_output = {'timestamp': mock.ANY,
                            'sequence': 1,
                            'data': {'my_key': 102},
                            'errors': ('My Case SLA validation failed. '
                                       'Error: my error message', )}
        queue.put.assert_has_calls(
            [mock.call(benchmark_output, True, constants.QUEUE_PUT_TIMEOUT)])

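    # A broad YardstickException raised by the benchmark method is expected
    # to be logged and recorded in the result's 'errors' field rather than
    # aborting the worker.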
    @mock.patch.object(proxduration.LOG, 'exception')
    def test__worker_process_broad_exception(self, *args):
        self.benchmark.my_method = mock.Mock(
            side_effect=y_exc.YardstickException)
        self.scenario_cfg['runner'] = {'sampled': True, 'duration': 0.1}
        proxduration._worker_process(
            mock.Mock(), self.benchmark_cls, 'my_method',
            self.scenario_cfg, {}, multiprocessing.Event(), mock.Mock())

        self._assert_defaults__worker_run_setup_and_teardown()

    @mock.patch.object(proxduration.LOG, 'exception')
    def test__worker_process_queue_on_broad_exception(self, *args):
        self.benchmark.my_method = self.MyMethod(
            side_effect=self.MyMethod.BROAD_EXCEPTION_SIDE_EFFECT)
        self.scenario_cfg['runner'] = {'sampled': True, 'duration': 0.1}
        queue = mock.Mock()
        proxduration._worker_process(
            queue, self.benchmark_cls, 'my_method', self.scenario_cfg, {},
            multiprocessing.Event(), mock.Mock())

        benchmark_output = {'timestamp': mock.ANY,
                            'sequence': 1,
                            'data': {'my_key': 102},
                            'errors': mock.ANY}
        queue.put.assert_has_calls(
            [mock.call(benchmark_output, True, constants.QUEUE_PUT_TIMEOUT)])

    @mock.patch.object(proxduration.LOG, 'exception')
    def test__worker_process_benchmark_teardown_on_broad_exception(
            self, *args):
        self.benchmark.teardown = mock.Mock(
            side_effect=y_exc.YardstickException)
        self.scenario_cfg['runner'] = {'sampled': True, 'duration': 0.1}

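        # A failing teardown is expected to terminate the worker process with
        # exit code 1.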
        with self.assertRaises(SystemExit) as raised:
            proxduration._worker_process(
                mock.Mock(), self.benchmark_cls, 'my_method',
                self.scenario_cfg, {}, multiprocessing.Event(), mock.Mock())
        self.assertEqual(1, raised.exception.code)
        self._assert_defaults__worker_run_setup_and_teardown()