1 ##############################################################################
2 # Copyright (c) 2018 Nokia and others.
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
12 import multiprocessing
16 from yardstick.benchmark.runners import duration
17 from yardstick.common import exceptions as y_exc
20 class DurationRunnerTest(unittest.TestCase):
21 class MyMethod(object):
22 SLA_VALIDATION_ERROR_SIDE_EFFECT = 1
23 BROAD_EXCEPTION_SIDE_EFFECT = 2
25 def __init__(self, side_effect=0):
27 self.side_effect = side_effect
29 def __call__(self, data):
31 data['my_key'] = self.count
32 if self.side_effect == self.SLA_VALIDATION_ERROR_SIDE_EFFECT:
33 raise y_exc.SLAValidationError(case_name='My Case',
34 error_msg='my error message')
35 elif self.side_effect == self.BROAD_EXCEPTION_SIDE_EFFECT:
36 raise y_exc.YardstickException
41 'runner': {'interval': 0, "duration": 0},
45 self.benchmark = mock.Mock()
46 self.benchmark_cls = mock.Mock(return_value=self.benchmark)
48 def _assert_defaults__worker_run_setup_and_teardown(self):
49 self.benchmark_cls.assert_called_once_with(self.scenario_cfg, {})
50 self.benchmark.setup.assert_called_once()
51 self.benchmark.teardown.assert_called_once()
53 def _assert_defaults__worker_run_one_iteration(self):
54 self.benchmark.pre_run_wait_time.assert_called_once_with(0)
55 self.benchmark.my_method.assert_called_once_with({})
56 self.benchmark.post_run_wait_time.assert_called_once_with(0)
58 @mock.patch.object(os, 'getpid')
59 @mock.patch.object(multiprocessing, 'Process')
60 def test__run_benchmark_called_with(self, mock_multiprocessing_process,
62 mock_os_getpid.return_value = 101
64 runner = duration.DurationRunner({})
65 benchmark_cls = mock.Mock()
66 runner._run_benchmark(benchmark_cls, 'my_method', self.scenario_cfg,
68 mock_multiprocessing_process.assert_called_once_with(
69 name='Duration-some_type-101',
70 target=duration._worker_process,
71 args=(runner.result_queue, benchmark_cls, 'my_method',
72 self.scenario_cfg, {}, runner.aborted, runner.output_queue))
74 @mock.patch.object(os, 'getpid')
75 def test__worker_process_runner_id(self, mock_os_getpid):
76 mock_os_getpid.return_value = 101
78 duration._worker_process(mock.Mock(), self.benchmark_cls, 'my_method',
79 self.scenario_cfg, {},
80 multiprocessing.Event(), mock.Mock())
82 self.assertEqual(self.scenario_cfg['runner']['runner_id'], 101)
84 def test__worker_process_called_with_cfg(self):
85 duration._worker_process(mock.Mock(), self.benchmark_cls, 'my_method',
86 self.scenario_cfg, {},
87 multiprocessing.Event(), mock.Mock())
89 self._assert_defaults__worker_run_setup_and_teardown()
90 self._assert_defaults__worker_run_one_iteration()
92 def test__worker_process_called_with_cfg_loop(self):
93 self.scenario_cfg['runner']['duration'] = 0.01
95 duration._worker_process(mock.Mock(), self.benchmark_cls, 'my_method',
96 self.scenario_cfg, {},
97 multiprocessing.Event(), mock.Mock())
99 self._assert_defaults__worker_run_setup_and_teardown()
100 self.assertGreater(self.benchmark.pre_run_wait_time.call_count, 0)
101 self.assertGreater(self.benchmark.my_method.call_count, 0)
102 self.assertGreater(self.benchmark.post_run_wait_time.call_count, 0)
104 def test__worker_process_called_without_cfg(self):
105 scenario_cfg = {'runner': {}}
106 aborted = multiprocessing.Event()
109 duration._worker_process(mock.Mock(), self.benchmark_cls, 'my_method',
110 scenario_cfg, {}, aborted, mock.Mock())
112 self.benchmark_cls.assert_called_once_with(scenario_cfg, {})
113 self.benchmark.setup.assert_called_once()
114 self.benchmark.pre_run_wait_time.assert_called_once_with(1)
115 self.benchmark.my_method.assert_called_once_with({})
116 self.benchmark.post_run_wait_time.assert_called_once_with(1)
117 self.benchmark.teardown.assert_called_once()
119 def test__worker_process_output_queue(self):
120 self.benchmark.my_method = mock.Mock(return_value='my_result')
122 output_queue = multiprocessing.Queue()
123 duration._worker_process(mock.Mock(), self.benchmark_cls, 'my_method',
124 self.scenario_cfg, {},
125 multiprocessing.Event(), output_queue)
128 self._assert_defaults__worker_run_setup_and_teardown()
129 self._assert_defaults__worker_run_one_iteration()
130 self.assertEquals(output_queue.get(), 'my_result')
132 def test__worker_process_output_queue_multiple_iterations(self):
133 self.scenario_cfg['runner']['duration'] = 0.01
134 self.benchmark.my_method = self.MyMethod()
136 output_queue = multiprocessing.Queue()
137 duration._worker_process(mock.Mock(), self.benchmark_cls, 'my_method',
138 self.scenario_cfg, {},
139 multiprocessing.Event(), output_queue)
142 self._assert_defaults__worker_run_setup_and_teardown()
143 self.assertGreater(self.benchmark.pre_run_wait_time.call_count, 0)
144 self.assertGreater(self.benchmark.my_method.count, 1)
145 self.assertGreater(self.benchmark.post_run_wait_time.call_count, 0)
148 while not output_queue.empty():
150 self.assertEquals(output_queue.get(), count)
152 def test__worker_process_queue(self):
153 self.benchmark.my_method = self.MyMethod()
155 queue = multiprocessing.Queue()
156 timestamp = time.time()
157 duration._worker_process(queue, self.benchmark_cls, 'my_method',
158 self.scenario_cfg, {},
159 multiprocessing.Event(), mock.Mock())
162 self._assert_defaults__worker_run_setup_and_teardown()
163 self.benchmark.pre_run_wait_time.assert_called_once_with(0)
164 self.benchmark.post_run_wait_time.assert_called_once_with(0)
167 self.assertGreater(result['timestamp'], timestamp)
168 self.assertEqual(result['errors'], '')
169 self.assertEqual(result['data'], {'my_key': 102})
170 self.assertEqual(result['sequence'], 1)
172 def test__worker_process_queue_multiple_iterations(self):
173 self.scenario_cfg['runner']['duration'] = 0.5
174 self.benchmark.my_method = self.MyMethod()
176 queue = multiprocessing.Queue()
177 timestamp = time.time()
178 duration._worker_process(queue, self.benchmark_cls, 'my_method',
179 self.scenario_cfg, {},
180 multiprocessing.Event(), mock.Mock())
183 self._assert_defaults__worker_run_setup_and_teardown()
184 self.assertGreater(self.benchmark.pre_run_wait_time.call_count, 0)
185 self.assertGreater(self.benchmark.my_method.count, 1)
186 self.assertGreater(self.benchmark.post_run_wait_time.call_count, 0)
189 while not queue.empty():
192 self.assertGreater(result['timestamp'], timestamp)
193 self.assertEqual(result['errors'], '')
194 self.assertEqual(result['data'], {'my_key': count + 101})
195 self.assertEqual(result['sequence'], count)
197 def test__worker_process_except_sla_validation_error_no_sla_cfg(self):
198 self.benchmark.my_method = mock.Mock(
199 side_effect=y_exc.SLAValidationError)
201 duration._worker_process(mock.Mock(), self.benchmark_cls, 'my_method',
202 self.scenario_cfg, {},
203 multiprocessing.Event(), mock.Mock())
205 self._assert_defaults__worker_run_setup_and_teardown()
206 self._assert_defaults__worker_run_one_iteration()
208 def test__worker_process_except_sla_validation_error_sla_cfg_monitor(self):
209 self.scenario_cfg['sla'] = {'action': 'monitor'}
210 self.benchmark.my_method = mock.Mock(
211 side_effect=y_exc.SLAValidationError)
213 duration._worker_process(mock.Mock(), self.benchmark_cls, 'my_method',
214 self.scenario_cfg, {},
215 multiprocessing.Event(), mock.Mock())
217 self._assert_defaults__worker_run_setup_and_teardown()
218 self._assert_defaults__worker_run_one_iteration()
220 def test__worker_process_raise_sla_validation_error_sla_cfg_default(self):
221 self.scenario_cfg['sla'] = {}
222 self.benchmark.my_method = mock.Mock(
223 side_effect=y_exc.SLAValidationError)
225 with self.assertRaises(y_exc.SLAValidationError):
226 duration._worker_process(mock.Mock(), self.benchmark_cls,
227 'my_method', self.scenario_cfg, {},
228 multiprocessing.Event(), mock.Mock())
230 self.benchmark_cls.assert_called_once_with(self.scenario_cfg, {})
231 self.benchmark.setup.assert_called_once()
232 self.benchmark.pre_run_wait_time.assert_called_once_with(0)
233 self.benchmark.my_method.assert_called_once_with({})
235 def test__worker_process_raise_sla_validation_error_sla_cfg_assert(self):
236 self.scenario_cfg['sla'] = {'action': 'assert'}
237 self.benchmark.my_method = mock.Mock(
238 side_effect=y_exc.SLAValidationError)
240 with self.assertRaises(y_exc.SLAValidationError):
241 duration._worker_process(mock.Mock(), self.benchmark_cls,
242 'my_method', self.scenario_cfg, {},
243 multiprocessing.Event(), mock.Mock())
245 self.benchmark_cls.assert_called_once_with(self.scenario_cfg, {})
246 self.benchmark.setup.assert_called_once()
247 self.benchmark.pre_run_wait_time.assert_called_once_with(0)
248 self.benchmark.my_method.assert_called_once_with({})
250 def test__worker_process_queue_on_sla_validation_error_monitor(self):
251 self.scenario_cfg['sla'] = {'action': 'monitor'}
252 self.benchmark.my_method = self.MyMethod(
253 side_effect=self.MyMethod.SLA_VALIDATION_ERROR_SIDE_EFFECT)
255 queue = multiprocessing.Queue()
256 timestamp = time.time()
257 duration._worker_process(queue, self.benchmark_cls, 'my_method',
258 self.scenario_cfg, {},
259 multiprocessing.Event(), mock.Mock())
262 self._assert_defaults__worker_run_setup_and_teardown()
263 self.benchmark.pre_run_wait_time.assert_called_once_with(0)
264 self.benchmark.post_run_wait_time.assert_called_once_with(0)
267 self.assertGreater(result['timestamp'], timestamp)
268 self.assertEqual(result['errors'], ('My Case SLA validation failed. '
269 'Error: my error message',))
270 self.assertEqual(result['data'], {'my_key': 102})
271 self.assertEqual(result['sequence'], 1)
273 def test__worker_process_broad_exception(self):
274 self.benchmark.my_method = mock.Mock(
275 side_effect=y_exc.YardstickException)
277 duration._worker_process(mock.Mock(), self.benchmark_cls, 'my_method',
278 self.scenario_cfg, {},
279 multiprocessing.Event(), mock.Mock())
281 self._assert_defaults__worker_run_setup_and_teardown()
282 self._assert_defaults__worker_run_one_iteration()
284 def test__worker_process_queue_on_broad_exception(self):
285 self.benchmark.my_method = self.MyMethod(
286 side_effect=self.MyMethod.BROAD_EXCEPTION_SIDE_EFFECT)
288 queue = multiprocessing.Queue()
289 timestamp = time.time()
290 duration._worker_process(queue, self.benchmark_cls, 'my_method',
291 self.scenario_cfg, {},
292 multiprocessing.Event(), mock.Mock())
295 self._assert_defaults__worker_run_setup_and_teardown()
296 self.benchmark.pre_run_wait_time.assert_called_once_with(0)
297 self.benchmark.post_run_wait_time.assert_called_once_with(0)
300 self.assertGreater(result['timestamp'], timestamp)
301 self.assertNotEqual(result['errors'], '')
302 self.assertEqual(result['data'], {'my_key': 102})
303 self.assertEqual(result['sequence'], 1)
305 def test__worker_process_benchmark_teardown_on_broad_exception(self):
306 self.benchmark.teardown = mock.Mock(
307 side_effect=y_exc.YardstickException)
309 with self.assertRaises(SystemExit) as raised:
310 duration._worker_process(mock.Mock(), self.benchmark_cls,
311 'my_method', self.scenario_cfg, {},
312 multiprocessing.Event(), mock.Mock())
313 self.assertEqual(raised.exception.code, 1)
314 self._assert_defaults__worker_run_setup_and_teardown()
315 self._assert_defaults__worker_run_one_iteration()