1 ##############################################################################
2 # Copyright (c) 2018 Nokia and others.
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
12 import multiprocessing
16 from yardstick.benchmark.runners import arithmetic
17 from yardstick.common import exceptions as y_exc
20 class ArithmeticRunnerTest(unittest.TestCase):
21 class MyMethod(object):
22 SLA_VALIDATION_ERROR_SIDE_EFFECT = 1
23 BROAD_EXCEPTION_SIDE_EFFECT = 2
25 def __init__(self, side_effect=0):
27 self.side_effect = side_effect
29 def __call__(self, data):
31 data['my_key'] = self.count
32 if self.side_effect == self.SLA_VALIDATION_ERROR_SIDE_EFFECT:
33 raise y_exc.SLAValidationError(case_name='My Case',
34 error_msg='my error message')
35 elif self.side_effect == self.BROAD_EXCEPTION_SIDE_EFFECT:
36 raise y_exc.YardstickException
43 'iter_type': 'nested_for_loops',
62 self.benchmark = mock.Mock()
63 self.benchmark_cls = mock.Mock(return_value=self.benchmark)
65 def _assert_defaults__worker_process_run_setup_and_teardown(self):
66 self.benchmark_cls.assert_called_once_with(self.scenario_cfg, {})
67 self.benchmark.setup.assert_called_once()
68 self.benchmark.teardown.assert_called_once()
70 @mock.patch.object(os, 'getpid')
71 @mock.patch.object(multiprocessing, 'Process')
72 def test__run_benchmark_called_with(self, mock_multiprocessing_process,
74 mock_os_getpid.return_value = 101
76 runner = arithmetic.ArithmeticRunner({})
77 benchmark_cls = mock.Mock()
78 runner._run_benchmark(benchmark_cls, 'my_method', self.scenario_cfg,
80 mock_multiprocessing_process.assert_called_once_with(
81 name='Arithmetic-some_type-101',
82 target=arithmetic._worker_process,
83 args=(runner.result_queue, benchmark_cls, 'my_method',
84 self.scenario_cfg, {}, runner.aborted, runner.output_queue))
86 @mock.patch.object(os, 'getpid')
87 def test__worker_process_runner_id(self, mock_os_getpid):
88 mock_os_getpid.return_value = 101
90 arithmetic._worker_process(mock.Mock(), self.benchmark_cls,
91 'my_method', self.scenario_cfg, {},
92 multiprocessing.Event(), mock.Mock())
94 self.assertEqual(self.scenario_cfg['runner']['runner_id'], 101)
96 @mock.patch.object(time, 'sleep')
97 def test__worker_process_calls_nested_for_loops(self, mock_time_sleep):
98 self.scenario_cfg['runner']['interval'] = 99
100 arithmetic._worker_process(mock.Mock(), self.benchmark_cls,
101 'my_method', self.scenario_cfg, {},
102 multiprocessing.Event(), mock.Mock())
104 self._assert_defaults__worker_process_run_setup_and_teardown()
105 self.benchmark.my_method.assert_has_calls([mock.call({})] * 8)
106 self.assertEqual(self.benchmark.my_method.call_count, 8)
107 mock_time_sleep.assert_has_calls([mock.call(99)] * 8)
108 self.assertEqual(mock_time_sleep.call_count, 8)
110 @mock.patch.object(time, 'sleep')
111 def test__worker_process_calls_tuple_loops(self, mock_time_sleep):
112 self.scenario_cfg['runner']['interval'] = 99
113 self.scenario_cfg['runner']['iter_type'] = 'tuple_loops'
115 arithmetic._worker_process(mock.Mock(), self.benchmark_cls,
116 'my_method', self.scenario_cfg, {},
117 multiprocessing.Event(), mock.Mock())
119 self._assert_defaults__worker_process_run_setup_and_teardown()
120 self.benchmark.my_method.assert_has_calls([mock.call({})] * 2)
121 self.assertEqual(self.benchmark.my_method.call_count, 2)
122 mock_time_sleep.assert_has_calls([mock.call(99)] * 2)
123 self.assertEqual(mock_time_sleep.call_count, 2)
125 def test__worker_process_stored_options_nested_for_loops(self):
126 arithmetic._worker_process(mock.Mock(), self.benchmark_cls,
127 'my_method', self.scenario_cfg, {},
128 multiprocessing.Event(), mock.Mock())
130 self.assertDictEqual(self.scenario_cfg['options'],
131 {'stride': 128, 'size': 2000})
133 def test__worker_process_stored_options_tuple_loops(self):
134 self.scenario_cfg['runner']['iter_type'] = 'tuple_loops'
136 arithmetic._worker_process(mock.Mock(), self.benchmark_cls,
137 'my_method', self.scenario_cfg, {},
138 multiprocessing.Event(), mock.Mock())
140 self.assertDictEqual(self.scenario_cfg['options'],
141 {'stride': 128, 'size': 1000})
143 def test__worker_process_aborted_set_early(self):
144 aborted = multiprocessing.Event()
146 arithmetic._worker_process(mock.Mock(), self.benchmark_cls,
147 'my_method', self.scenario_cfg, {},
148 aborted, mock.Mock())
150 self._assert_defaults__worker_process_run_setup_and_teardown()
151 self.assertEqual(self.scenario_cfg['options'], {})
152 self.benchmark.my_method.assert_not_called()
154 def test__worker_process_output_queue_nested_for_loops(self):
155 self.benchmark.my_method = self.MyMethod()
157 output_queue = multiprocessing.Queue()
158 arithmetic._worker_process(mock.Mock(), self.benchmark_cls,
159 'my_method', self.scenario_cfg, {},
160 multiprocessing.Event(), output_queue)
163 self._assert_defaults__worker_process_run_setup_and_teardown()
164 self.assertEqual(self.benchmark.my_method.count, 109)
166 while not output_queue.empty():
167 result.append(output_queue.get())
168 self.assertListEqual(result, [102, 103, 104, 105, 106, 107, 108, 109])
170 def test__worker_process_output_queue_tuple_loops(self):
171 self.scenario_cfg['runner']['iter_type'] = 'tuple_loops'
172 self.benchmark.my_method = self.MyMethod()
174 output_queue = multiprocessing.Queue()
175 arithmetic._worker_process(mock.Mock(), self.benchmark_cls,
176 'my_method', self.scenario_cfg, {},
177 multiprocessing.Event(), output_queue)
180 self._assert_defaults__worker_process_run_setup_and_teardown()
181 self.assertEqual(self.benchmark.my_method.count, 103)
183 while not output_queue.empty():
184 result.append(output_queue.get())
185 self.assertListEqual(result, [102, 103])
187 def test__worker_process_queue_nested_for_loops(self):
188 self.benchmark.my_method = self.MyMethod()
190 queue = multiprocessing.Queue()
191 timestamp = time.time()
192 arithmetic._worker_process(queue, self.benchmark_cls, 'my_method',
193 self.scenario_cfg, {},
194 multiprocessing.Event(), mock.Mock())
197 self._assert_defaults__worker_process_run_setup_and_teardown()
198 self.assertEqual(self.benchmark.my_method.count, 109)
200 while not queue.empty():
203 self.assertEqual(result['errors'], '')
204 self.assertEqual(result['data'], {'my_key': count + 101})
205 self.assertEqual(result['sequence'], count)
206 self.assertGreater(result['timestamp'], timestamp)
207 timestamp = result['timestamp']
209 def test__worker_process_queue_tuple_loops(self):
210 self.scenario_cfg['runner']['iter_type'] = 'tuple_loops'
211 self.benchmark.my_method = self.MyMethod()
213 queue = multiprocessing.Queue()
214 timestamp = time.time()
215 arithmetic._worker_process(queue, self.benchmark_cls, 'my_method',
216 self.scenario_cfg, {},
217 multiprocessing.Event(), mock.Mock())
220 self._assert_defaults__worker_process_run_setup_and_teardown()
221 self.assertEqual(self.benchmark.my_method.count, 103)
223 while not queue.empty():
226 self.assertEqual(result['errors'], '')
227 self.assertEqual(result['data'], {'my_key': count + 101})
228 self.assertEqual(result['sequence'], count)
229 self.assertGreater(result['timestamp'], timestamp)
230 timestamp = result['timestamp']
232 def test__worker_process_except_sla_validation_error_no_sla_cfg(self):
233 self.benchmark.my_method = mock.Mock(
234 side_effect=y_exc.SLAValidationError)
236 arithmetic._worker_process(mock.Mock(), self.benchmark_cls,
237 'my_method', self.scenario_cfg, {},
238 multiprocessing.Event(), mock.Mock())
240 self._assert_defaults__worker_process_run_setup_and_teardown()
241 self.assertEqual(self.benchmark.my_method.call_count, 8)
242 self.assertDictEqual(self.scenario_cfg['options'],
243 {'stride': 128, 'size': 2000})
245 def test__worker_process_output_on_sla_validation_error_no_sla_cfg(self):
246 self.benchmark.my_method = self.MyMethod(
247 side_effect=self.MyMethod.SLA_VALIDATION_ERROR_SIDE_EFFECT)
249 queue = multiprocessing.Queue()
250 output_queue = multiprocessing.Queue()
251 timestamp = time.time()
252 arithmetic._worker_process(queue, self.benchmark_cls, 'my_method',
253 self.scenario_cfg, {},
254 multiprocessing.Event(), output_queue)
257 self._assert_defaults__worker_process_run_setup_and_teardown()
258 self.assertEqual(self.benchmark.my_method.count, 109)
259 self.assertDictEqual(self.scenario_cfg['options'],
260 {'stride': 128, 'size': 2000})
262 while not queue.empty():
265 self.assertEqual(result['errors'], '')
266 self.assertEqual(result['data'], {'my_key': count + 101})
267 self.assertEqual(result['sequence'], count)
268 self.assertGreater(result['timestamp'], timestamp)
269 timestamp = result['timestamp']
270 self.assertEqual(count, 8)
271 self.assertTrue(output_queue.empty())
273 def test__worker_process_except_sla_validation_error_sla_cfg_monitor(self):
274 self.scenario_cfg['sla'] = {'action': 'monitor'}
275 self.benchmark.my_method = mock.Mock(
276 side_effect=y_exc.SLAValidationError)
278 arithmetic._worker_process(mock.Mock(), self.benchmark_cls,
279 'my_method', self.scenario_cfg, {},
280 multiprocessing.Event(), mock.Mock())
282 self._assert_defaults__worker_process_run_setup_and_teardown()
283 self.assertEqual(self.benchmark.my_method.call_count, 8)
284 self.assertDictEqual(self.scenario_cfg['options'],
285 {'stride': 128, 'size': 2000})
287 def test__worker_process_output_sla_validation_error_sla_cfg_monitor(self):
288 self.scenario_cfg['sla'] = {'action': 'monitor'}
289 self.benchmark.my_method = self.MyMethod(
290 side_effect=self.MyMethod.SLA_VALIDATION_ERROR_SIDE_EFFECT)
292 queue = multiprocessing.Queue()
293 output_queue = multiprocessing.Queue()
294 timestamp = time.time()
295 arithmetic._worker_process(queue, self.benchmark_cls, 'my_method',
296 self.scenario_cfg, {},
297 multiprocessing.Event(), output_queue)
300 self._assert_defaults__worker_process_run_setup_and_teardown()
301 self.assertEqual(self.benchmark.my_method.count, 109)
302 self.assertDictEqual(self.scenario_cfg['options'],
303 {'stride': 128, 'size': 2000})
305 while not queue.empty():
308 self.assertEqual(result['errors'],
309 ('My Case SLA validation failed. '
310 'Error: my error message',))
311 self.assertEqual(result['data'], {'my_key': count + 101})
312 self.assertEqual(result['sequence'], count)
313 self.assertGreater(result['timestamp'], timestamp)
314 timestamp = result['timestamp']
315 self.assertEqual(count, 8)
316 self.assertTrue(output_queue.empty())
318 def test__worker_process_raise_sla_validation_error_sla_cfg_assert(self):
319 self.scenario_cfg['sla'] = {'action': 'assert'}
320 self.benchmark.my_method = mock.Mock(
321 side_effect=y_exc.SLAValidationError)
323 with self.assertRaises(y_exc.SLAValidationError):
324 arithmetic._worker_process(mock.Mock(), self.benchmark_cls,
325 'my_method', self.scenario_cfg, {},
326 multiprocessing.Event(), mock.Mock())
327 self.benchmark_cls.assert_called_once_with(self.scenario_cfg, {})
328 self.benchmark.my_method.assert_called_once()
329 self.benchmark.setup.assert_called_once()
330 self.benchmark.teardown.assert_not_called()
332 def test__worker_process_output_sla_validation_error_sla_cfg_assert(self):
333 self.scenario_cfg['sla'] = {'action': 'assert'}
334 self.benchmark.my_method = self.MyMethod(
335 side_effect=self.MyMethod.SLA_VALIDATION_ERROR_SIDE_EFFECT)
337 queue = multiprocessing.Queue()
338 output_queue = multiprocessing.Queue()
339 with self.assertRaisesRegexp(
340 y_exc.SLAValidationError,
341 'My Case SLA validation failed. Error: my error message'):
342 arithmetic._worker_process(queue, self.benchmark_cls, 'my_method',
343 self.scenario_cfg, {},
344 multiprocessing.Event(), output_queue)
347 self.benchmark_cls.assert_called_once_with(self.scenario_cfg, {})
348 self.benchmark.setup.assert_called_once()
349 self.assertEqual(self.benchmark.my_method.count, 102)
350 self.benchmark.teardown.assert_not_called()
351 self.assertTrue(queue.empty())
352 self.assertTrue(output_queue.empty())
354 def test__worker_process_broad_exception_no_sla_cfg_early_exit(self):
355 self.benchmark.my_method = mock.Mock(
356 side_effect=y_exc.YardstickException)
358 arithmetic._worker_process(mock.Mock(), self.benchmark_cls,
359 'my_method', self.scenario_cfg, {},
360 multiprocessing.Event(), mock.Mock())
362 self._assert_defaults__worker_process_run_setup_and_teardown()
363 self.benchmark.my_method.assert_called_once()
364 self.assertDictEqual(self.scenario_cfg['options'],
365 {'stride': 64, 'size': 500})
367 def test__worker_process_output_on_broad_exception_no_sla_cfg(self):
368 self.benchmark.my_method = self.MyMethod(
369 side_effect=self.MyMethod.BROAD_EXCEPTION_SIDE_EFFECT)
371 queue = multiprocessing.Queue()
372 output_queue = multiprocessing.Queue()
373 timestamp = time.time()
374 arithmetic._worker_process(queue, self.benchmark_cls, 'my_method',
375 self.scenario_cfg, {},
376 multiprocessing.Event(), output_queue)
379 self._assert_defaults__worker_process_run_setup_and_teardown()
380 self.assertEqual(self.benchmark.my_method.count, 102)
381 self.assertDictEqual(self.scenario_cfg['options'],
382 {'stride': 64, 'size': 500})
383 self.assertEqual(queue.qsize(), 1)
385 self.assertGreater(result['timestamp'], timestamp)
386 self.assertEqual(result['data'], {'my_key': 102})
387 self.assertRegexpMatches(
389 'YardstickException: An unknown exception occurred.')
390 self.assertEqual(result['sequence'], 1)
391 self.assertTrue(output_queue.empty())
393 def test__worker_process_broad_exception_sla_cfg_not_none(self):
394 self.scenario_cfg['sla'] = {'action': 'some action'}
395 self.benchmark.my_method = mock.Mock(
396 side_effect=y_exc.YardstickException)
398 arithmetic._worker_process(mock.Mock(), self.benchmark_cls,
399 'my_method', self.scenario_cfg, {},
400 multiprocessing.Event(), mock.Mock())
402 self._assert_defaults__worker_process_run_setup_and_teardown()
403 self.assertEqual(self.benchmark.my_method.call_count, 8)
404 self.assertDictEqual(self.scenario_cfg['options'],
405 {'stride': 128, 'size': 2000})
407 def test__worker_process_output_on_broad_exception_sla_cfg_not_none(self):
408 self.scenario_cfg['sla'] = {'action': 'some action'}
409 self.benchmark.my_method = self.MyMethod(
410 side_effect=self.MyMethod.BROAD_EXCEPTION_SIDE_EFFECT)
412 queue = multiprocessing.Queue()
413 output_queue = multiprocessing.Queue()
414 timestamp = time.time()
415 arithmetic._worker_process(queue, self.benchmark_cls, 'my_method',
416 self.scenario_cfg, {},
417 multiprocessing.Event(), output_queue)
420 self._assert_defaults__worker_process_run_setup_and_teardown()
421 self.assertEqual(self.benchmark.my_method.count, 109)
422 self.assertDictEqual(self.scenario_cfg['options'],
423 {'stride': 128, 'size': 2000})
424 self.assertTrue(output_queue.empty())
426 while not queue.empty():
429 self.assertGreater(result['timestamp'], timestamp)
430 self.assertEqual(result['data'], {'my_key': count + 101})
431 self.assertRegexpMatches(
433 'YardstickException: An unknown exception occurred.')
434 self.assertEqual(result['sequence'], count)
436 def test__worker_process_benchmark_teardown_on_broad_exception(self):
437 self.benchmark.teardown = mock.Mock(
438 side_effect=y_exc.YardstickException)
440 with self.assertRaises(SystemExit) as raised:
441 arithmetic._worker_process(mock.Mock(), self.benchmark_cls,
442 'my_method', self.scenario_cfg, {},
443 multiprocessing.Event(), mock.Mock())
444 self.assertEqual(raised.exception.code, 1)
445 self._assert_defaults__worker_process_run_setup_and_teardown()
446 self.assertEqual(self.benchmark.my_method.call_count, 8)