1 # Copyright (c) 2018 Intel Corporation
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
15 import multiprocessing
18 from yardstick.common.messaging import consumer
19 from yardstick.common.messaging import payloads
20 from yardstick.common.messaging import producer
21 from yardstick.tests.functional import base
28 class DummyPayload(payloads.Payload):
29 REQUIRED_FIELDS = {'version', 'data'}
32 class DummyEndpoint(consumer.NotificationHandler):
34 def info(self, ctxt, **kwargs):
35 if ctxt['id'] in self._ctx_ids:
36 self._queue.put('Nr {}, data: {}, id: {}'.format(
37 self._id, kwargs['data'], ctxt['id']))
40 class DummyConsumer(consumer.MessagingConsumer):
42 def __init__(self, _id, ctx_ids, queue):
44 endpoints = [DummyEndpoint(_id, ctx_ids, queue)]
45 super(DummyConsumer, self).__init__(TOPIC, ctx_ids, endpoints)
48 class DummyProducer(producer.MessagingProducer):
52 def _run_consumer(_id, ctx_ids, queue):
53 _consumer = DummyConsumer(_id, ctx_ids, queue)
54 _consumer.start_rpc_server()
58 class MessagingTestCase(base.BaseFunctionalTestCase):
61 def _terminate_consumers(num_consumers, processes):
62 for i in range(num_consumers):
63 processes[i].terminate()
65 def test_run_five_consumers(self):
66 output_queue = multiprocessing.Queue()
70 producers = [DummyProducer(TOPIC, _id=ctx_1),
71 DummyProducer(TOPIC, _id=ctx_2)]
74 for i in range(num_consumers):
75 processes.append(multiprocessing.Process(
76 name='consumer_{}'.format(i),
78 args=(i, [ctx_1, ctx_2], output_queue)))
80 self.addCleanup(self._terminate_consumers, num_consumers, processes)
82 time.sleep(2) # Let consumers to create the listeners
83 for producer in producers:
84 for message in ['message 0', 'message 1']:
85 producer.send_message(METHOD_INFO,
86 DummyPayload(version=1, data=message))
88 time.sleep(2) # Let consumers attend the calls
90 while not output_queue.empty():
91 output.append(output_queue.get(True, 1))
93 self.assertEqual(num_consumers * 4, len(output))
94 msg_template = 'Nr {}, data: {}, id: {}'
95 for i in range(num_consumers):
96 for ctx in [ctx_1, ctx_2]:
97 for message in ['message 0', 'message 1']:
98 msg = msg_template.format(i, message, ctx)
99 self.assertIn(msg, output)