1 # Copyright (c) 2018 Intel Corporation
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
15 import multiprocessing
22 from yardstick.benchmark.runners import iteration_ipc
23 from yardstick.common import messaging
24 from yardstick.common.messaging import payloads
25 from yardstick.tests.unit import base as ut_base
28 class RunnerIterationIPCEndpointTestCase(ut_base.BaseUnitTestCase):
31 self._id = uuid.uuid1().int
32 self._ctx_ids = [uuid.uuid1().int, uuid.uuid1().int]
33 self._queue = multiprocessing.Queue()
34 self.runner = iteration_ipc.RunnerIterationIPCEndpoint(
35 self._id, self._ctx_ids, self._queue)
36 self._kwargs = {'version': 1, 'iteration': 10, 'kpi': {}}
37 self._pload_dict = payloads.TrafficGeneratorPayload.dict_to_obj(
38 self._kwargs).obj_to_dict()
40 def test_tg_method_started(self):
42 ctxt = {'id': self._ctx_ids[0]}
43 self.runner.tg_method_started(ctxt, **self._kwargs)
47 while not self._queue.empty():
48 output.append(self._queue.get(True, 1))
50 self.assertEqual(1, len(output))
51 self.assertEqual(self._ctx_ids[0], output[0]['id'])
52 self.assertEqual(messaging.TG_METHOD_STARTED, output[0]['action'])
53 self.assertEqual(self._pload_dict, output[0]['payload'].obj_to_dict())
55 def test_tg_method_finished(self):
57 ctxt = {'id': self._ctx_ids[0]}
58 self.runner.tg_method_finished(ctxt, **self._kwargs)
62 while not self._queue.empty():
63 output.append(self._queue.get(True, 1))
65 self.assertEqual(1, len(output))
66 self.assertEqual(self._ctx_ids[0], output[0]['id'])
67 self.assertEqual(messaging.TG_METHOD_FINISHED, output[0]['action'])
68 self.assertEqual(self._pload_dict, output[0]['payload'].obj_to_dict())
70 def test_tg_method_iteration(self):
72 ctxt = {'id': self._ctx_ids[0]}
73 self.runner.tg_method_iteration(ctxt, **self._kwargs)
77 while not self._queue.empty():
78 output.append(self._queue.get(True, 1))
80 self.assertEqual(1, len(output))
81 self.assertEqual(self._ctx_ids[0], output[0]['id'])
82 self.assertEqual(messaging.TG_METHOD_ITERATION, output[0]['action'])
83 self.assertEqual(self._pload_dict, output[0]['payload'].obj_to_dict())
86 class RunnerIterationIPCConsumerTestCase(ut_base.BaseUnitTestCase):
89 self._id = uuid.uuid1().int
90 self._ctx_ids = [uuid.uuid1().int, uuid.uuid1().int]
91 self.consumer = iteration_ipc.RunnerIterationIPCConsumer(
92 self._id, self._ctx_ids)
93 self.consumer._queue = mock.Mock()
96 self.assertEqual({self._ctx_ids[0]: [], self._ctx_ids[1]: []},
97 self.consumer._kpi_per_id)
99 def test_is_all_kpis_received_in_iteration(self):
100 payload = payloads.TrafficGeneratorPayload(
101 version=1, iteration=1, kpi={})
102 msg1 = {'action': messaging.TG_METHOD_ITERATION,
103 'id': self._ctx_ids[0], 'payload': payload}
104 msg2 = {'action': messaging.TG_METHOD_ITERATION,
105 'id': self._ctx_ids[1], 'payload': payload}
106 self.consumer.iteration_index = 1
108 self.consumer._queue.empty.side_effect = [False, True]
109 self.consumer._queue.get.return_value = msg1
110 self.assertFalse(self.consumer.is_all_kpis_received_in_iteration())
112 self.consumer._queue.empty.side_effect = [False, True]
113 self.consumer._queue.get.return_value = msg2
114 self.assertTrue(self.consumer.is_all_kpis_received_in_iteration())
117 class IterationIPCRunnerTestCase(ut_base.BaseUnitTestCase):
119 @mock.patch.object(iteration_ipc, '_worker_process')
120 @mock.patch.object(os, 'getpid', return_value=12345678)
121 @mock.patch.object(multiprocessing, 'Process', return_value=mock.Mock())
122 def test__run_benchmark(self, mock_process, mock_getpid, mock_worker):
124 scenario_cfg = {'type': 'scenario_type'}
125 context_cfg = 'context_cfg'
126 name = '%s-%s-%s' % ('IterationIPC', 'scenario_type', 12345678)
127 runner = iteration_ipc.IterationIPCRunner(mock.ANY)
128 mock_getpid.reset_mock()
130 runner._run_benchmark('class', method, scenario_cfg, context_cfg)
131 mock_process.assert_called_once_with(
134 args=(runner.result_queue, 'class', method, scenario_cfg,
135 context_cfg, runner.aborted, runner.output_queue))
136 mock_getpid.assert_called_once()