1 # Copyright (c) 2018 Intel Corporation
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
15 import multiprocessing
19 from yardstick.common.messaging import consumer
20 from yardstick.common.messaging import payloads
21 from yardstick.common.messaging import producer
22 from yardstick.tests.functional import base
29 class DummyPayload(payloads.Payload):
30 REQUIRED_FIELDS = {'version', 'data'}
33 class DummyEndpoint(consumer.NotificationHandler):
35 def info(self, ctxt, **kwargs):
36 if ctxt['pid'] == self._ctx_pid:
37 self._queue.put('ID {}, data: {}'.format(self._id, kwargs['data']))
40 class DummyConsumer(consumer.MessagingConsumer):
42 def __init__(self, id, ctx_pid, queue):
44 endpoints = [DummyEndpoint(id, ctx_pid, queue)]
45 super(DummyConsumer, self).__init__(TOPIC, ctx_pid, endpoints)
48 class DummyProducer(producer.MessagingProducer):
52 def _run_consumer(id, ctx_pid, queue):
53 _consumer = DummyConsumer(id, ctx_pid, queue)
54 _consumer.start_rpc_server()
58 class MessagingTestCase(base.BaseFunctionalTestCase):
61 def _terminate_consumers(num_consumers, processes):
62 for i in range(num_consumers):
63 processes[i].terminate()
65 def test_run_five_consumers(self):
66 output_queue = multiprocessing.Queue()
69 producer = DummyProducer(TOPIC, pid=ctx_id)
72 for i in range(num_consumers):
73 processes.append(multiprocessing.Process(
74 name='consumer_{}'.format(i),
76 args=(i, ctx_id, output_queue)))
78 self.addCleanup(self._terminate_consumers, num_consumers, processes)
80 time.sleep(2) # Let consumers to create the listeners
81 producer.send_message(METHOD_INFO, DummyPayload(version=1,
83 producer.send_message(METHOD_INFO, DummyPayload(version=1,
85 time.sleep(2) # Let consumers attend the calls
88 while not output_queue.empty():
89 output.append(output_queue.get(True, 1))
91 self.assertEqual(num_consumers * 2, len(output))
92 for i in range(num_consumers):
93 self.assertIn('ID {}, data: {}'.format(1, 'message 0'), output)
94 self.assertIn('ID {}, data: {}'.format(1, 'message 1'), output)