Merge "Add UT: DurationRunner worker normal operation"
[yardstick.git] / yardstick / tests / unit / benchmark / runner / test_duration.py
1 ##############################################################################
2 # Copyright (c) 2018 Nokia and others.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9
10 import mock
11 import unittest
12 import multiprocessing
13 import os
14 import time
15
16 from yardstick.benchmark.runners import duration
17
18
19 class DurationRunnerTest(unittest.TestCase):
20     class MyMethod(object):
21         def __init__(self):
22             self.count = 101
23
24         def __call__(self, data):
25             self.count += 1
26             data['my_key'] = self.count
27             return self.count
28
29     def setUp(self):
30         self.scenario_cfg = {
31             'runner': {'interval': 0, "duration": 0},
32             'type': 'some_type'
33         }
34
35         self.benchmark = mock.Mock()
36         self.benchmark_cls = mock.Mock(return_value=self.benchmark)
37
38     def _assert_defaults__worker_run_setup_and_teardown(self):
39         self.benchmark_cls.assert_called_once_with(self.scenario_cfg, {})
40         self.benchmark.setup.assert_called_once()
41         self.benchmark.teardown.assert_called_once()
42
43     def _assert_defaults__worker_run_one_iteration(self):
44         self.benchmark.pre_run_wait_time.assert_called_once_with(0)
45         self.benchmark.my_method.assert_called_once_with({})
46         self.benchmark.post_run_wait_time.assert_called_once_with(0)
47
48     @mock.patch.object(os, 'getpid')
49     @mock.patch.object(multiprocessing, 'Process')
50     def test__run_benchmark_called_with(self, mock_multiprocessing_process,
51                                         mock_os_getpid):
52         mock_os_getpid.return_value = 101
53
54         runner = duration.DurationRunner({})
55         benchmark_cls = mock.Mock()
56         runner._run_benchmark(benchmark_cls, 'my_method', self.scenario_cfg,
57                               {})
58         mock_multiprocessing_process.assert_called_once_with(
59             name='Duration-some_type-101',
60             target=duration._worker_process,
61             args=(runner.result_queue, benchmark_cls, 'my_method',
62                   self.scenario_cfg, {}, runner.aborted, runner.output_queue))
63
64     @mock.patch.object(os, 'getpid')
65     def test__worker_process_runner_id(self, mock_os_getpid):
66         mock_os_getpid.return_value = 101
67
68         duration._worker_process(mock.Mock(), self.benchmark_cls, 'my_method',
69                                  self.scenario_cfg, {},
70                                  multiprocessing.Event(), mock.Mock())
71
72         self.assertEqual(self.scenario_cfg['runner']['runner_id'], 101)
73
74     def test__worker_process_called_with_cfg(self):
75         duration._worker_process(mock.Mock(), self.benchmark_cls, 'my_method',
76                                  self.scenario_cfg, {},
77                                  multiprocessing.Event(), mock.Mock())
78
79         self._assert_defaults__worker_run_setup_and_teardown()
80         self._assert_defaults__worker_run_one_iteration()
81
82     def test__worker_process_called_with_cfg_loop(self):
83         self.scenario_cfg['runner']['duration'] = 0.01
84
85         duration._worker_process(mock.Mock(), self.benchmark_cls, 'my_method',
86                                  self.scenario_cfg, {},
87                                  multiprocessing.Event(), mock.Mock())
88
89         self._assert_defaults__worker_run_setup_and_teardown()
90         self.assertGreater(self.benchmark.pre_run_wait_time.call_count, 2)
91         self.assertGreater(self.benchmark.my_method.call_count, 2)
92         self.assertGreater(self.benchmark.post_run_wait_time.call_count, 2)
93
94     def test__worker_process_called_without_cfg(self):
95         scenario_cfg = {'runner': {}}
96         aborted = multiprocessing.Event()
97         aborted.set()
98
99         duration._worker_process(mock.Mock(), self.benchmark_cls, 'my_method',
100                                  scenario_cfg, {}, aborted, mock.Mock())
101
102         self.benchmark_cls.assert_called_once_with(scenario_cfg, {})
103         self.benchmark.setup.assert_called_once()
104         self.benchmark.pre_run_wait_time.assert_called_once_with(1)
105         self.benchmark.my_method.assert_called_once_with({})
106         self.benchmark.post_run_wait_time.assert_called_once_with(1)
107         self.benchmark.teardown.assert_called_once()
108
109     def test__worker_process_output_queue(self):
110         self.benchmark.my_method = mock.Mock(return_value='my_result')
111
112         output_queue = multiprocessing.Queue()
113         duration._worker_process(mock.Mock(), self.benchmark_cls, 'my_method',
114                                  self.scenario_cfg, {},
115                                  multiprocessing.Event(), output_queue)
116         time.sleep(0.1)
117
118         self._assert_defaults__worker_run_setup_and_teardown()
119         self._assert_defaults__worker_run_one_iteration()
120         self.assertEquals(output_queue.get(), 'my_result')
121
122     def test__worker_process_output_queue_multiple_iterations(self):
123         self.scenario_cfg['runner']['duration'] = 0.01
124         self.benchmark.my_method = self.MyMethod()
125
126         output_queue = multiprocessing.Queue()
127         duration._worker_process(mock.Mock(), self.benchmark_cls, 'my_method',
128                                  self.scenario_cfg, {},
129                                  multiprocessing.Event(), output_queue)
130         time.sleep(0.1)
131
132         self._assert_defaults__worker_run_setup_and_teardown()
133         self.assertGreater(self.benchmark.pre_run_wait_time.call_count, 2)
134         self.assertGreater(self.benchmark.my_method.count, 103)
135         self.assertGreater(self.benchmark.post_run_wait_time.call_count, 2)
136
137         count = 101
138         while not output_queue.empty():
139             count += 1
140             self.assertEquals(output_queue.get(), count)
141
142     def test__worker_process_queue(self):
143         self.benchmark.my_method = self.MyMethod()
144
145         queue = multiprocessing.Queue()
146         timestamp = time.time()
147         duration._worker_process(queue, self.benchmark_cls, 'my_method',
148                                  self.scenario_cfg, {},
149                                  multiprocessing.Event(), mock.Mock())
150         time.sleep(0.1)
151
152         self._assert_defaults__worker_run_setup_and_teardown()
153         self.benchmark.pre_run_wait_time.assert_called_once_with(0)
154         self.benchmark.post_run_wait_time.assert_called_once_with(0)
155
156         result = queue.get()
157         self.assertGreater(result['timestamp'], timestamp)
158         self.assertEqual(result['errors'], '')
159         self.assertEqual(result['data'], {'my_key': 102})
160         self.assertEqual(result['sequence'], 1)
161
162     def test__worker_process_queue_multiple_iterations(self):
163         self.scenario_cfg['runner']['duration'] = 0.5
164         self.benchmark.my_method = self.MyMethod()
165
166         queue = multiprocessing.Queue()
167         timestamp = time.time()
168         duration._worker_process(queue, self.benchmark_cls, 'my_method',
169                                  self.scenario_cfg, {},
170                                  multiprocessing.Event(), mock.Mock())
171         time.sleep(0.1)
172
173         self._assert_defaults__worker_run_setup_and_teardown()
174         self.assertGreater(self.benchmark.pre_run_wait_time.call_count, 2)
175         self.assertGreater(self.benchmark.my_method.count, 103)
176         self.assertGreater(self.benchmark.post_run_wait_time.call_count, 2)
177
178         count = 0
179         while not queue.empty():
180             count += 1
181             result = queue.get()
182             self.assertGreater(result['timestamp'], timestamp)
183             self.assertEqual(result['errors'], '')
184             self.assertEqual(result['data'], {'my_key': count + 101})
185             self.assertEqual(result['sequence'], count)