1 ##############################################################################
2 # Copyright (c) 2018 Nokia and others.
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
12 import multiprocessing
16 from yardstick.benchmark.runners import proxduration
17 from yardstick.common import exceptions as y_exc
20 class ProxDurationRunnerTest(unittest.TestCase):
21 class MyMethod(object):
22 SLA_VALIDATION_ERROR_SIDE_EFFECT = 1
23 BROAD_EXCEPTION_SIDE_EFFECT = 2
25 def __init__(self, side_effect=0):
27 self.side_effect = side_effect
29 def __call__(self, data):
31 data['my_key'] = self.count
32 if self.side_effect == self.SLA_VALIDATION_ERROR_SIDE_EFFECT:
33 raise y_exc.SLAValidationError(case_name='My Case',
34 error_msg='my error message')
35 elif self.side_effect == self.BROAD_EXCEPTION_SIDE_EFFECT:
36 raise y_exc.YardstickException
41 'runner': {'interval': 0, "duration": 0},
45 self.benchmark = mock.Mock()
46 self.benchmark_cls = mock.Mock(return_value=self.benchmark)
48 def _assert_defaults__worker_run_setup_and_teardown(self):
49 self.benchmark_cls.assert_called_once_with(self.scenario_cfg, {})
50 self.benchmark.setup.assert_called_once()
51 self.benchmark.teardown.assert_called_once()
53 @mock.patch.object(os, 'getpid')
54 @mock.patch.object(multiprocessing, 'Process')
55 def test__run_benchmark_called_with(self, mock_multiprocessing_process,
57 mock_os_getpid.return_value = 101
59 runner = proxduration.ProxDurationRunner({})
60 benchmark_cls = mock.Mock()
61 runner._run_benchmark(benchmark_cls, 'my_method', self.scenario_cfg,
63 mock_multiprocessing_process.assert_called_once_with(
64 name='ProxDuration-some_type-101',
65 target=proxduration._worker_process,
66 args=(runner.result_queue, benchmark_cls, 'my_method',
67 self.scenario_cfg, {}, runner.aborted, runner.output_queue))
69 @mock.patch.object(os, 'getpid')
70 def test__worker_process_runner_id(self, mock_os_getpid):
71 mock_os_getpid.return_value = 101
72 self.scenario_cfg["runner"] = {"sampled": True, "duration": 1}
73 proxduration._worker_process(mock.Mock(), self.benchmark_cls, 'my_method',
74 self.scenario_cfg, {},
75 multiprocessing.Event(), mock.Mock())
77 self.assertEqual(self.scenario_cfg['runner']['runner_id'], 101)
79 def test__worker_process_called_with_cfg(self):
80 self.scenario_cfg["runner"] = {"sampled": True, "duration": 1}
81 proxduration._worker_process(mock.Mock(), self.benchmark_cls, 'my_method',
82 self.scenario_cfg, {},
83 multiprocessing.Event(), mock.Mock())
85 self._assert_defaults__worker_run_setup_and_teardown()
87 def test__worker_process_called_with_cfg_loop(self):
88 self.scenario_cfg["runner"] = {"sampled": True, "duration": 0.01}
89 proxduration._worker_process(mock.Mock(), self.benchmark_cls, 'my_method',
90 self.scenario_cfg, {},
91 multiprocessing.Event(), mock.Mock())
93 self._assert_defaults__worker_run_setup_and_teardown()
94 self.assertGreater(self.benchmark.my_method.call_count, 2)
96 def test__worker_process_called_without_cfg(self):
97 scenario_cfg = {'runner': {}}
99 aborted = multiprocessing.Event()
102 proxduration._worker_process(mock.Mock(), self.benchmark_cls, 'my_method',
103 scenario_cfg, {}, aborted, mock.Mock())
105 self.benchmark_cls.assert_called_once_with(scenario_cfg, {})
106 self.benchmark.setup.assert_called_once()
107 self.benchmark.teardown.assert_called_once()
109 def test__worker_process_output_queue(self):
110 self.benchmark.my_method = mock.Mock(return_value='my_result')
111 self.scenario_cfg["runner"] = {"sampled": True, "duration": 1}
112 output_queue = multiprocessing.Queue()
113 proxduration._worker_process(mock.Mock(), self.benchmark_cls, 'my_method',
114 self.scenario_cfg, {},
115 multiprocessing.Event(), output_queue)
118 self._assert_defaults__worker_run_setup_and_teardown()
119 self.assertEquals(output_queue.get(), 'my_result')
121 def test__worker_process_output_queue_multiple_iterations(self):
122 self.scenario_cfg["runner"] = {"sampled": True, "duration": 1}
123 self.benchmark.my_method = self.MyMethod()
125 output_queue = multiprocessing.Queue()
126 proxduration._worker_process(mock.Mock(), self.benchmark_cls, 'my_method',
127 self.scenario_cfg, {},
128 multiprocessing.Event(), output_queue)
131 self._assert_defaults__worker_run_setup_and_teardown()
132 self.assertGreater(self.benchmark.my_method.count, 103)
135 while not output_queue.empty():
137 self.assertEquals(output_queue.get(), count)
139 def test__worker_process_queue(self):
140 self.benchmark.my_method = self.MyMethod()
141 self.scenario_cfg["runner"] = {"sampled": True, "duration": 1}
142 queue = multiprocessing.Queue()
143 timestamp = time.time()
144 proxduration._worker_process(queue, self.benchmark_cls, 'my_method',
145 self.scenario_cfg, {},
146 multiprocessing.Event(), mock.Mock())
149 self._assert_defaults__worker_run_setup_and_teardown()
152 self.assertGreater(result['timestamp'], timestamp)
153 self.assertEqual(result['errors'], '')
154 self.assertEqual(result['data'], {'my_key': 102})
155 self.assertEqual(result['sequence'], 1)
157 def test__worker_process_queue_multiple_iterations(self):
158 self.scenario_cfg["runner"] = {"sampled": True, "duration": 1}
159 self.benchmark.my_method = self.MyMethod()
161 queue = multiprocessing.Queue()
162 timestamp = time.time()
163 proxduration._worker_process(queue, self.benchmark_cls, 'my_method',
164 self.scenario_cfg, {},
165 multiprocessing.Event(), mock.Mock())
168 self._assert_defaults__worker_run_setup_and_teardown()
169 self.assertGreater(self.benchmark.my_method.count, 103)
172 while not queue.empty():
175 self.assertGreater(result['timestamp'], timestamp)
176 self.assertEqual(result['errors'], '')
177 self.assertEqual(result['data'], {'my_key': count + 101})
178 self.assertEqual(result['sequence'], count)
180 def test__worker_process_except_sla_validation_error_no_sla_cfg(self):
181 self.benchmark.my_method = mock.Mock(
182 side_effect=y_exc.SLAValidationError)
183 self.scenario_cfg["runner"] = {"sampled": True, "duration": 1}
184 proxduration._worker_process(mock.Mock(), self.benchmark_cls, 'my_method',
185 self.scenario_cfg, {},
186 multiprocessing.Event(), mock.Mock())
188 self._assert_defaults__worker_run_setup_and_teardown()
190 def test__worker_process_except_sla_validation_error_sla_cfg_monitor(self):
191 self.scenario_cfg['sla'] = {'action': 'monitor'}
192 self.scenario_cfg["runner"] = {"sampled": True, "duration": 1}
193 self.benchmark.my_method = mock.Mock(
194 side_effect=y_exc.SLAValidationError)
196 proxduration._worker_process(mock.Mock(), self.benchmark_cls, 'my_method',
197 self.scenario_cfg, {},
198 multiprocessing.Event(), mock.Mock())
200 self._assert_defaults__worker_run_setup_and_teardown()
202 def test__worker_process_raise_sla_validation_error_sla_cfg_default(self):
203 self.scenario_cfg['sla'] = {}
204 self.scenario_cfg["runner"] = {"sampled": True, "duration": 1}
205 self.benchmark.my_method = mock.Mock(
206 side_effect=y_exc.SLAValidationError)
208 with self.assertRaises(y_exc.SLAValidationError):
209 proxduration._worker_process(mock.Mock(), self.benchmark_cls,
210 'my_method', self.scenario_cfg, {},
211 multiprocessing.Event(), mock.Mock())
213 self.benchmark_cls.assert_called_once_with(self.scenario_cfg, {})
214 self.benchmark.setup.assert_called_once()
215 self.benchmark.my_method.assert_called_once_with({})
217 def test__worker_process_raise_sla_validation_error_sla_cfg_assert(self):
218 self.scenario_cfg["runner"] = {"sampled": True, "duration": 1}
219 self.scenario_cfg['sla'] = {'action': 'assert'}
220 self.benchmark.my_method = mock.Mock(
221 side_effect=y_exc.SLAValidationError)
223 with self.assertRaises(y_exc.SLAValidationError):
224 proxduration._worker_process(mock.Mock(), self.benchmark_cls,
225 'my_method', self.scenario_cfg, {},
226 multiprocessing.Event(), mock.Mock())
228 self.benchmark_cls.assert_called_once_with(self.scenario_cfg, {})
229 self.benchmark.setup.assert_called_once()
230 self.benchmark.my_method.assert_called_once_with({})
232 def test__worker_process_queue_on_sla_validation_error_monitor(self):
233 self.scenario_cfg['sla'] = {'action': 'monitor'}
234 self.scenario_cfg["runner"] = {"sampled": True, "duration": 1}
235 self.benchmark.my_method = self.MyMethod(
236 side_effect=self.MyMethod.SLA_VALIDATION_ERROR_SIDE_EFFECT)
238 queue = multiprocessing.Queue()
239 timestamp = time.time()
240 proxduration._worker_process(queue, self.benchmark_cls, 'my_method',
241 self.scenario_cfg, {},
242 multiprocessing.Event(), mock.Mock())
245 self._assert_defaults__worker_run_setup_and_teardown()
248 self.assertGreater(result['timestamp'], timestamp)
249 self.assertEqual(result['errors'], ('My Case SLA validation failed. '
250 'Error: my error message',))
251 self.assertEqual(result['data'], {'my_key': 102})
252 self.assertEqual(result['sequence'], 1)
254 def test__worker_process_broad_exception(self):
255 self.benchmark.my_method = mock.Mock(
256 side_effect=y_exc.YardstickException)
257 self.scenario_cfg["runner"] = {"sampled": True, "duration": 1}
258 proxduration._worker_process(mock.Mock(), self.benchmark_cls, 'my_method',
259 self.scenario_cfg, {},
260 multiprocessing.Event(), mock.Mock())
262 self._assert_defaults__worker_run_setup_and_teardown()
264 def test__worker_process_queue_on_broad_exception(self):
265 self.benchmark.my_method = self.MyMethod(
266 side_effect=self.MyMethod.BROAD_EXCEPTION_SIDE_EFFECT)
268 self.scenario_cfg["runner"] = {"sampled": True, "duration": 1}
269 queue = multiprocessing.Queue()
270 timestamp = time.time()
271 proxduration._worker_process(queue, self.benchmark_cls, 'my_method',
272 self.scenario_cfg, {},
273 multiprocessing.Event(), mock.Mock())
276 self._assert_defaults__worker_run_setup_and_teardown()
279 self.assertGreater(result['timestamp'], timestamp)
280 self.assertNotEqual(result['errors'], '')
281 self.assertEqual(result['data'], {'my_key': 102})
282 self.assertEqual(result['sequence'], 1)
284 def test__worker_process_benchmark_teardown_on_broad_exception(self):
285 self.benchmark.teardown = mock.Mock(
286 side_effect=y_exc.YardstickException)
288 self.scenario_cfg["runner"] = {"sampled": True, "duration": 1}
290 with self.assertRaises(SystemExit) as raised:
291 proxduration._worker_process(mock.Mock(), self.benchmark_cls,
292 'my_method', self.scenario_cfg, {},
293 multiprocessing.Event(), mock.Mock())
294 self.assertEqual(raised.exception.code, 1)
295 self._assert_defaults__worker_run_setup_and_teardown()