Merge "Addition of Configurable Sampling and Configurable Confirmation Retry"
[yardstick.git] / yardstick / tests / unit / benchmark / runner / test_proxduration.py
1 ##############################################################################
2 # Copyright (c) 2018 Nokia and others.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9
10 import mock
11 import unittest
12 import multiprocessing
13 import os
14 import time
15
16 from yardstick.benchmark.runners import proxduration
17 from yardstick.common import exceptions as y_exc
18
19
20 class ProxDurationRunnerTest(unittest.TestCase):
21     class MyMethod(object):
22         SLA_VALIDATION_ERROR_SIDE_EFFECT = 1
23         BROAD_EXCEPTION_SIDE_EFFECT = 2
24
25         def __init__(self, side_effect=0):
26             self.count = 101
27             self.side_effect = side_effect
28
29         def __call__(self, data):
30             self.count += 1
31             data['my_key'] = self.count
32             if self.side_effect == self.SLA_VALIDATION_ERROR_SIDE_EFFECT:
33                 raise y_exc.SLAValidationError(case_name='My Case',
34                                                error_msg='my error message')
35             elif self.side_effect == self.BROAD_EXCEPTION_SIDE_EFFECT:
36                 raise y_exc.YardstickException
37             return self.count
38
39     def setUp(self):
40         self.scenario_cfg = {
41             'runner': {'interval': 0, "duration": 0},
42             'type': 'some_type'
43         }
44
45         self.benchmark = mock.Mock()
46         self.benchmark_cls = mock.Mock(return_value=self.benchmark)
47
48     def _assert_defaults__worker_run_setup_and_teardown(self):
49         self.benchmark_cls.assert_called_once_with(self.scenario_cfg, {})
50         self.benchmark.setup.assert_called_once()
51         self.benchmark.teardown.assert_called_once()
52
53     @mock.patch.object(os, 'getpid')
54     @mock.patch.object(multiprocessing, 'Process')
55     def test__run_benchmark_called_with(self, mock_multiprocessing_process,
56                                         mock_os_getpid):
57         mock_os_getpid.return_value = 101
58
59         runner = proxduration.ProxDurationRunner({})
60         benchmark_cls = mock.Mock()
61         runner._run_benchmark(benchmark_cls, 'my_method', self.scenario_cfg,
62                               {})
63         mock_multiprocessing_process.assert_called_once_with(
64             name='ProxDuration-some_type-101',
65             target=proxduration._worker_process,
66             args=(runner.result_queue, benchmark_cls, 'my_method',
67                   self.scenario_cfg, {}, runner.aborted, runner.output_queue))
68
69     @mock.patch.object(os, 'getpid')
70     def test__worker_process_runner_id(self, mock_os_getpid):
71         mock_os_getpid.return_value = 101
72         self.scenario_cfg["runner"] = {"sampled": True, "duration": 1}
73         proxduration._worker_process(mock.Mock(), self.benchmark_cls, 'my_method',
74                                  self.scenario_cfg, {},
75                                  multiprocessing.Event(), mock.Mock())
76
77         self.assertEqual(self.scenario_cfg['runner']['runner_id'], 101)
78
79     def test__worker_process_called_with_cfg(self):
80         self.scenario_cfg["runner"] = {"sampled": True, "duration": 1}
81         proxduration._worker_process(mock.Mock(), self.benchmark_cls, 'my_method',
82                                  self.scenario_cfg, {},
83                                  multiprocessing.Event(), mock.Mock())
84
85         self._assert_defaults__worker_run_setup_and_teardown()
86
87     def test__worker_process_called_with_cfg_loop(self):
88         self.scenario_cfg["runner"] = {"sampled": True, "duration": 0.01}
89         proxduration._worker_process(mock.Mock(), self.benchmark_cls, 'my_method',
90                                  self.scenario_cfg, {},
91                                  multiprocessing.Event(), mock.Mock())
92
93         self._assert_defaults__worker_run_setup_and_teardown()
94         self.assertGreater(self.benchmark.my_method.call_count, 2)
95
96     def test__worker_process_called_without_cfg(self):
97         scenario_cfg = {'runner': {}}
98
99         aborted = multiprocessing.Event()
100         aborted.set()
101
102         proxduration._worker_process(mock.Mock(), self.benchmark_cls, 'my_method',
103                                  scenario_cfg, {}, aborted, mock.Mock())
104
105         self.benchmark_cls.assert_called_once_with(scenario_cfg, {})
106         self.benchmark.setup.assert_called_once()
107         self.benchmark.teardown.assert_called_once()
108
109     def test__worker_process_output_queue(self):
110         self.benchmark.my_method = mock.Mock(return_value='my_result')
111         self.scenario_cfg["runner"] = {"sampled": True, "duration": 1}
112         output_queue = multiprocessing.Queue()
113         proxduration._worker_process(mock.Mock(), self.benchmark_cls, 'my_method',
114                                  self.scenario_cfg, {},
115                                  multiprocessing.Event(), output_queue)
116         time.sleep(0.1)
117
118         self._assert_defaults__worker_run_setup_and_teardown()
119         self.assertEquals(output_queue.get(), 'my_result')
120
121     def test__worker_process_output_queue_multiple_iterations(self):
122         self.scenario_cfg["runner"] = {"sampled": True, "duration": 1}
123         self.benchmark.my_method = self.MyMethod()
124
125         output_queue = multiprocessing.Queue()
126         proxduration._worker_process(mock.Mock(), self.benchmark_cls, 'my_method',
127                                  self.scenario_cfg, {},
128                                  multiprocessing.Event(), output_queue)
129         time.sleep(0.1)
130
131         self._assert_defaults__worker_run_setup_and_teardown()
132         self.assertGreater(self.benchmark.my_method.count, 103)
133
134         count = 101
135         while not output_queue.empty():
136             count += 1
137             self.assertEquals(output_queue.get(), count)
138
139     def test__worker_process_queue(self):
140         self.benchmark.my_method = self.MyMethod()
141         self.scenario_cfg["runner"] = {"sampled": True, "duration": 1}
142         queue = multiprocessing.Queue()
143         timestamp = time.time()
144         proxduration._worker_process(queue, self.benchmark_cls, 'my_method',
145                                  self.scenario_cfg, {},
146                                  multiprocessing.Event(), mock.Mock())
147         time.sleep(0.1)
148
149         self._assert_defaults__worker_run_setup_and_teardown()
150
151         result = queue.get()
152         self.assertGreater(result['timestamp'], timestamp)
153         self.assertEqual(result['errors'], '')
154         self.assertEqual(result['data'], {'my_key': 102})
155         self.assertEqual(result['sequence'], 1)
156
157     def test__worker_process_queue_multiple_iterations(self):
158         self.scenario_cfg["runner"] = {"sampled": True, "duration": 1}
159         self.benchmark.my_method = self.MyMethod()
160
161         queue = multiprocessing.Queue()
162         timestamp = time.time()
163         proxduration._worker_process(queue, self.benchmark_cls, 'my_method',
164                                  self.scenario_cfg, {},
165                                  multiprocessing.Event(), mock.Mock())
166         time.sleep(0.1)
167
168         self._assert_defaults__worker_run_setup_and_teardown()
169         self.assertGreater(self.benchmark.my_method.count, 103)
170
171         count = 0
172         while not queue.empty():
173             count += 1
174             result = queue.get()
175             self.assertGreater(result['timestamp'], timestamp)
176             self.assertEqual(result['errors'], '')
177             self.assertEqual(result['data'], {'my_key': count + 101})
178             self.assertEqual(result['sequence'], count)
179
180     def test__worker_process_except_sla_validation_error_no_sla_cfg(self):
181         self.benchmark.my_method = mock.Mock(
182             side_effect=y_exc.SLAValidationError)
183         self.scenario_cfg["runner"] = {"sampled": True, "duration": 1}
184         proxduration._worker_process(mock.Mock(), self.benchmark_cls, 'my_method',
185                                  self.scenario_cfg, {},
186                                  multiprocessing.Event(), mock.Mock())
187
188         self._assert_defaults__worker_run_setup_and_teardown()
189
190     def test__worker_process_except_sla_validation_error_sla_cfg_monitor(self):
191         self.scenario_cfg['sla'] = {'action': 'monitor'}
192         self.scenario_cfg["runner"] = {"sampled": True, "duration": 1}
193         self.benchmark.my_method = mock.Mock(
194             side_effect=y_exc.SLAValidationError)
195
196         proxduration._worker_process(mock.Mock(), self.benchmark_cls, 'my_method',
197                                  self.scenario_cfg, {},
198                                  multiprocessing.Event(), mock.Mock())
199
200         self._assert_defaults__worker_run_setup_and_teardown()
201
202     def test__worker_process_raise_sla_validation_error_sla_cfg_default(self):
203         self.scenario_cfg['sla'] = {}
204         self.scenario_cfg["runner"] = {"sampled": True, "duration": 1}
205         self.benchmark.my_method = mock.Mock(
206             side_effect=y_exc.SLAValidationError)
207
208         with self.assertRaises(y_exc.SLAValidationError):
209             proxduration._worker_process(mock.Mock(), self.benchmark_cls,
210                                      'my_method', self.scenario_cfg, {},
211                                      multiprocessing.Event(), mock.Mock())
212
213         self.benchmark_cls.assert_called_once_with(self.scenario_cfg, {})
214         self.benchmark.setup.assert_called_once()
215         self.benchmark.my_method.assert_called_once_with({})
216
217     def test__worker_process_raise_sla_validation_error_sla_cfg_assert(self):
218         self.scenario_cfg["runner"] = {"sampled": True, "duration": 1}
219         self.scenario_cfg['sla'] = {'action': 'assert'}
220         self.benchmark.my_method = mock.Mock(
221             side_effect=y_exc.SLAValidationError)
222
223         with self.assertRaises(y_exc.SLAValidationError):
224             proxduration._worker_process(mock.Mock(), self.benchmark_cls,
225                                      'my_method', self.scenario_cfg, {},
226                                      multiprocessing.Event(), mock.Mock())
227
228         self.benchmark_cls.assert_called_once_with(self.scenario_cfg, {})
229         self.benchmark.setup.assert_called_once()
230         self.benchmark.my_method.assert_called_once_with({})
231
232     def test__worker_process_queue_on_sla_validation_error_monitor(self):
233         self.scenario_cfg['sla'] = {'action': 'monitor'}
234         self.scenario_cfg["runner"] = {"sampled": True, "duration": 1}
235         self.benchmark.my_method = self.MyMethod(
236             side_effect=self.MyMethod.SLA_VALIDATION_ERROR_SIDE_EFFECT)
237
238         queue = multiprocessing.Queue()
239         timestamp = time.time()
240         proxduration._worker_process(queue, self.benchmark_cls, 'my_method',
241                                  self.scenario_cfg, {},
242                                  multiprocessing.Event(), mock.Mock())
243         time.sleep(0.1)
244
245         self._assert_defaults__worker_run_setup_and_teardown()
246
247         result = queue.get()
248         self.assertGreater(result['timestamp'], timestamp)
249         self.assertEqual(result['errors'], ('My Case SLA validation failed. '
250                                             'Error: my error message',))
251         self.assertEqual(result['data'], {'my_key': 102})
252         self.assertEqual(result['sequence'], 1)
253
254     def test__worker_process_broad_exception(self):
255         self.benchmark.my_method = mock.Mock(
256             side_effect=y_exc.YardstickException)
257         self.scenario_cfg["runner"] = {"sampled": True, "duration": 1}
258         proxduration._worker_process(mock.Mock(), self.benchmark_cls, 'my_method',
259                                  self.scenario_cfg, {},
260                                  multiprocessing.Event(), mock.Mock())
261
262         self._assert_defaults__worker_run_setup_and_teardown()
263
264     def test__worker_process_queue_on_broad_exception(self):
265         self.benchmark.my_method = self.MyMethod(
266             side_effect=self.MyMethod.BROAD_EXCEPTION_SIDE_EFFECT)
267
268         self.scenario_cfg["runner"] = {"sampled": True, "duration": 1}
269         queue = multiprocessing.Queue()
270         timestamp = time.time()
271         proxduration._worker_process(queue, self.benchmark_cls, 'my_method',
272                                  self.scenario_cfg, {},
273                                  multiprocessing.Event(), mock.Mock())
274         time.sleep(0.1)
275
276         self._assert_defaults__worker_run_setup_and_teardown()
277
278         result = queue.get()
279         self.assertGreater(result['timestamp'], timestamp)
280         self.assertNotEqual(result['errors'], '')
281         self.assertEqual(result['data'], {'my_key': 102})
282         self.assertEqual(result['sequence'], 1)
283
284     def test__worker_process_benchmark_teardown_on_broad_exception(self):
285         self.benchmark.teardown = mock.Mock(
286             side_effect=y_exc.YardstickException)
287
288         self.scenario_cfg["runner"] = {"sampled": True, "duration": 1}
289
290         with self.assertRaises(SystemExit) as raised:
291             proxduration._worker_process(mock.Mock(), self.benchmark_cls,
292                                      'my_method', self.scenario_cfg, {},
293                                      multiprocessing.Event(), mock.Mock())
294         self.assertEqual(raised.exception.code, 1)
295         self._assert_defaults__worker_run_setup_and_teardown()