1 # Copyright (c) 2018 Intel Corporation
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
17 import multiprocessing
20 from yardstick.benchmark.runners import proxduration
21 from yardstick.common import constants
22 from yardstick.common import exceptions as y_exc
25 class ProxDurationRunnerTest(unittest.TestCase):
27 class MyMethod(object):
28 SLA_VALIDATION_ERROR_SIDE_EFFECT = 1
29 BROAD_EXCEPTION_SIDE_EFFECT = 2
31 def __init__(self, side_effect=0):
33 self.side_effect = side_effect
35 def __call__(self, data):
37 data['my_key'] = self.count
38 if self.side_effect == self.SLA_VALIDATION_ERROR_SIDE_EFFECT:
39 raise y_exc.SLAValidationError(case_name='My Case',
40 error_msg='my error message')
41 elif self.side_effect == self.BROAD_EXCEPTION_SIDE_EFFECT:
42 raise y_exc.YardstickException
47 'runner': {'interval': 0, "duration": 0},
51 self.benchmark = mock.Mock()
52 self.benchmark_cls = mock.Mock(return_value=self.benchmark)
54 def _assert_defaults__worker_run_setup_and_teardown(self):
55 self.benchmark_cls.assert_called_once_with(self.scenario_cfg, {})
56 self.benchmark.setup.assert_called_once()
57 self.benchmark.teardown.assert_called_once()
59 @mock.patch.object(os, 'getpid')
60 @mock.patch.object(multiprocessing, 'Process')
61 def test__run_benchmark_called_with(self, mock_multiprocessing_process,
63 mock_os_getpid.return_value = 101
65 runner = proxduration.ProxDurationRunner({})
66 benchmark_cls = mock.Mock()
67 runner._run_benchmark(benchmark_cls, 'my_method', self.scenario_cfg,
69 mock_multiprocessing_process.assert_called_once_with(
70 name='ProxDuration-some_type-101',
71 target=proxduration._worker_process,
72 args=(runner.result_queue, benchmark_cls, 'my_method',
73 self.scenario_cfg, {}, runner.aborted, runner.output_queue))
75 @mock.patch.object(os, 'getpid')
76 def test__worker_process_runner_id(self, mock_os_getpid):
77 mock_os_getpid.return_value = 101
78 self.scenario_cfg["runner"] = {"sampled": True, "duration": 0.1}
79 proxduration._worker_process(
80 mock.Mock(), self.benchmark_cls, 'my_method', self.scenario_cfg,
81 {}, multiprocessing.Event(), mock.Mock())
83 self.assertEqual(101, self.scenario_cfg['runner']['runner_id'])
85 def test__worker_process_called_with_cfg(self):
86 self.scenario_cfg["runner"] = {"sampled": True, "duration": 0.1}
87 proxduration._worker_process(
88 mock.Mock(), self.benchmark_cls, 'my_method', self.scenario_cfg,
89 {}, multiprocessing.Event(), mock.Mock())
91 self._assert_defaults__worker_run_setup_and_teardown()
93 def test__worker_process_called_with_cfg_loop(self):
94 self.scenario_cfg["runner"] = {"sampled": True, "duration": 0.1}
95 proxduration._worker_process(
96 mock.Mock(), self.benchmark_cls, 'my_method', self.scenario_cfg,
97 {}, multiprocessing.Event(), mock.Mock())
99 self._assert_defaults__worker_run_setup_and_teardown()
100 self.assertGreater(self.benchmark.my_method.call_count, 0)
102 def test__worker_process_called_without_cfg(self):
103 scenario_cfg = {'runner': {}}
104 aborted = multiprocessing.Event()
106 proxduration._worker_process(
107 mock.Mock(), self.benchmark_cls, 'my_method', scenario_cfg, {},
108 aborted, mock.Mock())
110 self.benchmark_cls.assert_called_once_with(scenario_cfg, {})
111 self.benchmark.setup.assert_called_once()
112 self.benchmark.teardown.assert_called_once()
114 def test__worker_process_output_queue(self):
115 self.benchmark.my_method = mock.Mock(return_value='my_result')
116 self.scenario_cfg["runner"] = {"sampled": True, "duration": 0.1}
117 output_queue = mock.Mock()
118 proxduration._worker_process(
119 mock.Mock(), self.benchmark_cls, 'my_method', self.scenario_cfg,
120 {}, multiprocessing.Event(), output_queue)
122 self._assert_defaults__worker_run_setup_and_teardown()
123 output_queue.put.assert_has_calls(
124 [mock.call('my_result', True, constants.QUEUE_PUT_TIMEOUT)])
126 def test__worker_process_output_queue_multiple_iterations(self):
127 self.scenario_cfg["runner"] = {"sampled": True, "duration": 0.1}
128 self.benchmark.my_method = self.MyMethod()
129 output_queue = mock.Mock()
130 proxduration._worker_process(
131 mock.Mock(), self.benchmark_cls, 'my_method', self.scenario_cfg,
132 {}, multiprocessing.Event(), output_queue)
134 self._assert_defaults__worker_run_setup_and_teardown()
135 for idx in range(102, 101 + len(output_queue.method_calls)):
136 output_queue.put.assert_has_calls(
137 [mock.call(idx, True, constants.QUEUE_PUT_TIMEOUT)])
139 def test__worker_process_queue(self):
140 self.benchmark.my_method = self.MyMethod()
141 self.scenario_cfg["runner"] = {"sampled": True, "duration": 0.1}
143 proxduration._worker_process(
144 queue, self.benchmark_cls, 'my_method', self.scenario_cfg, {},
145 multiprocessing.Event(), mock.Mock())
147 self._assert_defaults__worker_run_setup_and_teardown()
148 benchmark_output = {'timestamp': mock.ANY,
150 'data': {'my_key': 102},
152 queue.put.assert_has_calls(
153 [mock.call(benchmark_output, True, constants.QUEUE_PUT_TIMEOUT)])
155 def test__worker_process_queue_multiple_iterations(self):
156 self.scenario_cfg["runner"] = {"sampled": True, "duration": 0.1}
157 self.benchmark.my_method = self.MyMethod()
159 proxduration._worker_process(
160 queue, self.benchmark_cls, 'my_method', self.scenario_cfg, {},
161 multiprocessing.Event(), mock.Mock())
163 self._assert_defaults__worker_run_setup_and_teardown()
164 for idx in range(102, 101 + len(queue.method_calls)):
165 benchmark_output = {'timestamp': mock.ANY,
166 'sequence': idx - 101,
167 'data': {'my_key': idx},
169 queue.put.assert_has_calls(
170 [mock.call(benchmark_output, True,
171 constants.QUEUE_PUT_TIMEOUT)])
173 def test__worker_process_except_sla_validation_error_no_sla_cfg(self):
174 self.benchmark.my_method = mock.Mock(
175 side_effect=y_exc.SLAValidationError)
176 self.scenario_cfg["runner"] = {"sampled": True, "duration": 0.1}
177 proxduration._worker_process(
178 mock.Mock(), self.benchmark_cls, 'my_method', self.scenario_cfg,
179 {}, multiprocessing.Event(), mock.Mock())
181 self._assert_defaults__worker_run_setup_and_teardown()
183 @mock.patch.object(proxduration.LOG, 'warning')
184 def test__worker_process_except_sla_validation_error_sla_cfg_monitor(
186 self.scenario_cfg['sla'] = {'action': 'monitor'}
187 self.scenario_cfg["runner"] = {"sampled": True, "duration": 0.1}
188 self.benchmark.my_method = mock.Mock(
189 side_effect=y_exc.SLAValidationError)
190 proxduration._worker_process(
191 mock.Mock(), self.benchmark_cls, 'my_method', self.scenario_cfg,
192 {}, multiprocessing.Event(), mock.Mock())
194 self._assert_defaults__worker_run_setup_and_teardown()
196 def test__worker_process_raise_sla_validation_error_sla_cfg_default(self):
197 self.scenario_cfg['sla'] = {}
198 self.scenario_cfg["runner"] = {"sampled": True, "duration": 0.1}
199 self.benchmark.my_method = mock.Mock(
200 side_effect=y_exc.SLAValidationError)
201 with self.assertRaises(y_exc.SLAValidationError):
202 proxduration._worker_process(
203 mock.Mock(), self.benchmark_cls, 'my_method',
204 self.scenario_cfg, {}, multiprocessing.Event(), mock.Mock())
206 self.benchmark_cls.assert_called_once_with(self.scenario_cfg, {})
207 self.benchmark.setup.assert_called_once()
208 self.benchmark.my_method.assert_called_once_with({})
210 def test__worker_process_raise_sla_validation_error_sla_cfg_assert(self):
211 self.scenario_cfg["runner"] = {"sampled": True, "duration": 0.1}
212 self.scenario_cfg['sla'] = {'action': 'assert'}
213 self.benchmark.my_method = mock.Mock(
214 side_effect=y_exc.SLAValidationError)
216 with self.assertRaises(y_exc.SLAValidationError):
217 proxduration._worker_process(
218 mock.Mock(), self.benchmark_cls, 'my_method',
219 self.scenario_cfg, {}, multiprocessing.Event(), mock.Mock())
221 self.benchmark_cls.assert_called_once_with(self.scenario_cfg, {})
222 self.benchmark.setup.assert_called_once()
223 self.benchmark.my_method.assert_called_once_with({})
225 @mock.patch.object(proxduration.LOG, 'warning')
226 def test__worker_process_queue_on_sla_validation_error_monitor(
228 self.scenario_cfg['sla'] = {'action': 'monitor'}
229 self.scenario_cfg["runner"] = {"sampled": True, "duration": 0.1}
230 self.benchmark.my_method = self.MyMethod(
231 side_effect=self.MyMethod.SLA_VALIDATION_ERROR_SIDE_EFFECT)
233 proxduration._worker_process(
234 queue, self.benchmark_cls, 'my_method', self.scenario_cfg, {},
235 multiprocessing.Event(), mock.Mock())
237 self._assert_defaults__worker_run_setup_and_teardown()
238 benchmark_output = {'timestamp': mock.ANY,
240 'data': {'my_key': 102},
241 'errors': ('My Case SLA validation failed. '
242 'Error: my error message', )}
243 queue.put.assert_has_calls(
244 [mock.call(benchmark_output, True, constants.QUEUE_PUT_TIMEOUT)])
246 @mock.patch.object(proxduration.LOG, 'exception')
247 def test__worker_process_broad_exception(self, *args):
248 self.benchmark.my_method = mock.Mock(
249 side_effect=y_exc.YardstickException)
250 self.scenario_cfg["runner"] = {"sampled": True, "duration": 0.1}
251 proxduration._worker_process(
252 mock.Mock(), self.benchmark_cls, 'my_method',
253 self.scenario_cfg, {}, multiprocessing.Event(), mock.Mock())
255 self._assert_defaults__worker_run_setup_and_teardown()
257 @mock.patch.object(proxduration.LOG, 'exception')
258 def test__worker_process_queue_on_broad_exception(self, *args):
259 self.benchmark.my_method = self.MyMethod(
260 side_effect=self.MyMethod.BROAD_EXCEPTION_SIDE_EFFECT)
261 self.scenario_cfg["runner"] = {"sampled": True, "duration": 0.1}
263 proxduration._worker_process(
264 queue, self.benchmark_cls, 'my_method', self.scenario_cfg, {},
265 multiprocessing.Event(), mock.Mock())
267 benchmark_output = {'timestamp': mock.ANY,
269 'data': {'my_key': 102},
271 queue.put.assert_has_calls(
272 [mock.call(benchmark_output, True, constants.QUEUE_PUT_TIMEOUT)])
274 @mock.patch.object(proxduration.LOG, 'exception')
275 def test__worker_process_benchmark_teardown_on_broad_exception(
277 self.benchmark.teardown = mock.Mock(
278 side_effect=y_exc.YardstickException)
279 self.scenario_cfg["runner"] = {"sampled": True, "duration": 0.1}
281 with self.assertRaises(SystemExit) as raised:
282 proxduration._worker_process(
283 mock.Mock(), self.benchmark_cls, 'my_method',
284 self.scenario_cfg, {}, multiprocessing.Event(), mock.Mock())
285 self.assertEqual(1, raised.exception.code)
286 self._assert_defaults__worker_run_setup_and_teardown()