Enable traffic generator PID in "NSPerf" scenario setup
[yardstick.git] / yardstick / tests / functional / common / messaging / test_messaging.py
1 # Copyright (c) 2018 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import multiprocessing
16 import time
17
18 from yardstick.common.messaging import consumer
19 from yardstick.common.messaging import payloads
20 from yardstick.common.messaging import producer
21 from yardstick.tests.functional import base
22
23
24 TOPIC = 'topic_MQ'
25 METHOD_INFO = 'info'
26
27
28 class DummyPayload(payloads.Payload):
29     REQUIRED_FIELDS = {'version', 'data'}
30
31
32 class DummyEndpoint(consumer.NotificationHandler):
33
34     def info(self, ctxt, **kwargs):
35         if ctxt['id'] in self._ctx_ids:
36             self._queue.put('Nr {}, data: {}, id: {}'.format(
37                 self._id, kwargs['data'], ctxt['id']))
38
39
40 class DummyConsumer(consumer.MessagingConsumer):
41
42     def __init__(self, _id, ctx_ids, queue):
43         self._id = _id
44         endpoints = [DummyEndpoint(_id, ctx_ids, queue)]
45         super(DummyConsumer, self).__init__(TOPIC, ctx_ids, endpoints)
46
47
48 class DummyProducer(producer.MessagingProducer):
49     pass
50
51
52 def _run_consumer(_id, ctx_ids, queue):
53     _consumer = DummyConsumer(_id, ctx_ids, queue)
54     _consumer.start_rpc_server()
55     _consumer.wait()
56
57
58 class MessagingTestCase(base.BaseFunctionalTestCase):
59
60     @staticmethod
61     def _terminate_consumers(num_consumers, processes):
62         for i in range(num_consumers):
63             processes[i].terminate()
64
65     def test_run_five_consumers(self):
66         output_queue = multiprocessing.Queue()
67         num_consumers = 10
68         ctx_1 = 100001
69         ctx_2 = 100002
70         producers = [DummyProducer(TOPIC, _id=ctx_1),
71                      DummyProducer(TOPIC, _id=ctx_2)]
72
73         processes = []
74         for i in range(num_consumers):
75             processes.append(multiprocessing.Process(
76                 name='consumer_{}'.format(i),
77                 target=_run_consumer,
78                 args=(i, [ctx_1, ctx_2], output_queue)))
79             processes[i].start()
80         self.addCleanup(self._terminate_consumers, num_consumers, processes)
81
82         time.sleep(2)  # Let consumers to create the listeners
83         for producer in producers:
84             for message in ['message 0', 'message 1']:
85                 producer.send_message(METHOD_INFO,
86                                       DummyPayload(version=1, data=message))
87
88         time.sleep(2)  # Let consumers attend the calls
89         output = []
90         while not output_queue.empty():
91             output.append(output_queue.get(True, 1))
92
93         self.assertEqual(num_consumers * 4, len(output))
94         msg_template = 'Nr {}, data: {}, id: {}'
95         for i in range(num_consumers):
96             for ctx in [ctx_1, ctx_2]:
97                 for message in ['message 0', 'message 1']:
98                     msg = msg_template.format(i, message, ctx)
99                     self.assertIn(msg, output)