Merge "Add UT: DurationRunner worker normal operation"
[yardstick.git] / yardstick / tests / unit / benchmark / runner / test_iteration_ipc.py
1 # Copyright (c) 2018 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import multiprocessing
16 import time
17 import os
18 import uuid
19
20 import mock
21
22 from yardstick.benchmark.runners import iteration_ipc
23 from yardstick.common import messaging
24 from yardstick.common.messaging import payloads
25 from yardstick.tests.unit import base as ut_base
26
27
class RunnerIterationIPCEndpointTestCase(ut_base.BaseUnitTestCase):
    """Check the messages RunnerIterationIPCEndpoint places on its queue.

    Each test invokes one ``tg_method_*`` endpoint method and verifies that
    exactly one message, carrying the expected context ID, action and
    payload, can be read back from the queue handed to the endpoint.
    """

    def setUp(self):
        self._id = uuid.uuid1().int
        self._ctx_ids = [uuid.uuid1().int, uuid.uuid1().int]
        self._queue = multiprocessing.Queue()
        self.runner = iteration_ipc.RunnerIterationIPCEndpoint(
            self._id, self._ctx_ids, self._queue)
        self._kwargs = {'version': 1, 'iteration': 10, 'kpi': {}}
        # Expected payload: the kwargs converted to a payload object and
        # back to a dictionary.
        self._pload_dict = payloads.TrafficGeneratorPayload.dict_to_obj(
            self._kwargs).obj_to_dict()

    def _read_queue(self):
        """Return every message currently readable from the queue."""
        output = []
        while not self._queue.empty():
            output.append(self._queue.get(True, 1))
        return output

    def _check_tg_method(self, tg_method, action):
        """Invoke an endpoint method and verify the single queued message.

        :param tg_method: bound endpoint method to call with the first
                          context ID and the default payload kwargs
        :param action: action identifier expected in the queued message
        """
        # Precondition: nothing must be pending before the call, otherwise
        # the "exactly one message" check below would be meaningless.
        self.assertTrue(self._queue.empty())
        ctxt = {'id': self._ctx_ids[0]}
        tg_method(ctxt, **self._kwargs)
        # An item put on a multiprocessing.Queue is flushed by a background
        # thread, so empty() may still report True right after a put; give
        # it time to land (presumably the endpoint puts from this process).
        time.sleep(0.2)

        output = self._read_queue()

        self.assertEqual(1, len(output))
        self.assertEqual(self._ctx_ids[0], output[0]['id'])
        self.assertEqual(action, output[0]['action'])
        self.assertEqual(self._pload_dict, output[0]['payload'].obj_to_dict())

    def test_tg_method_started(self):
        self._check_tg_method(self.runner.tg_method_started,
                              messaging.TG_METHOD_STARTED)

    def test_tg_method_finished(self):
        self._check_tg_method(self.runner.tg_method_finished,
                              messaging.TG_METHOD_FINISHED)

    def test_tg_method_iteration(self):
        self._check_tg_method(self.runner.tg_method_iteration,
                              messaging.TG_METHOD_ITERATION)
84
85
class RunnerIterationIPCConsumerTestCase(ut_base.BaseUnitTestCase):
    """Exercise RunnerIterationIPCConsumer with its queue replaced by a mock."""

    def setUp(self):
        self._id = uuid.uuid1().int
        self._ctx_ids = [uuid.uuid1().int for _ in range(2)]
        consumer = iteration_ipc.RunnerIterationIPCConsumer(
            self._id, self._ctx_ids)
        # Swap in a mock queue so each test can script what gets "received".
        consumer._queue = mock.Mock()
        self.consumer = consumer

    def test__init(self):
        # Every context ID must start with its own empty KPI list.
        expected = {ctx_id: [] for ctx_id in self._ctx_ids}
        self.assertEqual(expected, self.consumer._kpi_per_id)

    def test_is_all_kpis_received_in_iteration(self):
        payload = payloads.TrafficGeneratorPayload(
            version=1, iteration=1, kpi={})
        # One iteration message per context ID, all sharing the same payload.
        messages = [{'action': messaging.TG_METHOD_ITERATION,
                     'id': ctx_id,
                     'payload': payload}
                    for ctx_id in self._ctx_ids]
        self.consumer.iteration_index = 1
        queue_mock = self.consumer._queue

        def receive(message):
            # The mocked queue reports one pending item, then empty.
            queue_mock.empty.side_effect = [False, True]
            queue_mock.get.return_value = message
            return self.consumer.is_all_kpis_received_in_iteration()

        first_msg, second_msg = messages
        # Only one of the two contexts has reported so far.
        self.assertFalse(receive(first_msg))
        # The second context reports: the iteration is now complete.
        self.assertTrue(receive(second_msg))
115
116
class IterationIPCRunnerTestCase(ut_base.BaseUnitTestCase):
    """Verify how IterationIPCRunner builds its worker process."""

    @mock.patch.object(iteration_ipc, '_worker_process')
    @mock.patch.object(os, 'getpid', return_value=12345678)
    @mock.patch.object(multiprocessing, 'Process', return_value=mock.Mock())
    def test__run_benchmark(self, mock_process, mock_getpid, mock_worker):
        scenario_cfg = {'type': 'scenario_type'}
        # Expected process name: "IterationIPC-<scenario type>-<pid>", with
        # the PID taken from the patched os.getpid.
        expected_name = '-'.join(['IterationIPC',
                                  scenario_cfg['type'],
                                  str(mock_getpid.return_value)])

        runner = iteration_ipc.IterationIPCRunner(mock.ANY)
        # Forget any getpid calls recorded up to this point, so the final
        # assertion only counts calls made by _run_benchmark.
        mock_getpid.reset_mock()

        runner._run_benchmark('class', 'method', scenario_cfg, 'context_cfg')

        expected_args = (runner.result_queue, 'class', 'method',
                         scenario_cfg, 'context_cfg', runner.aborted,
                         runner.output_queue)
        mock_process.assert_called_once_with(
            name=expected_name,
            target=mock_worker,
            args=expected_args)
        mock_getpid.assert_called_once()