MessagingConsumer accepts messages from multiple producers 01/55701/4
authorRodolfo Alonso Hernandez <rodolfo.alonso.hernandez@intel.com>
Fri, 13 Apr 2018 17:40:38 +0000 (18:40 +0100)
committerRodolfo Alonso Hernandez <rodolfo.alonso.hernandez@intel.com>
Thu, 19 Apr 2018 08:02:39 +0000 (09:02 +0100)
The messaging consumer now can store a list of PID of several producers.
The notification handler can compare the procedence of a message from
a list of PID.

JIRA: YARDSTICK-1074

Change-Id: I193f83c2b471e5bf1298ac728be52533aded0c1a
Signed-off-by: Rodolfo Alonso Hernandez <rodolfo.alonso.hernandez@intel.com>
yardstick/common/messaging/consumer.py
yardstick/tests/functional/common/messaging/test_messaging.py

index a0feeb3..24ec6f1 100644 (file)
@@ -29,9 +29,9 @@ LOG = logging.getLogger(__name__)
 class NotificationHandler(object):
     """Abstract class to define a endpoint object for a MessagingConsumer"""
 
-    def __init__(self, id, ctx_pid, queue):
-        self._id = id
-        self._ctx_pid = ctx_pid
+    def __init__(self, _id, ctx_pids, queue):
+        self._id = _id
+        self._ctx_pids = ctx_pids
         self._queue = queue
 
 
@@ -43,12 +43,12 @@ class MessagingConsumer(object):
     the messages published by a `MessagingNotifier`.
     """
 
-    def __init__(self, topic, pid, endpoints, fanout=True):
+    def __init__(self, topic, pids, endpoints, fanout=True):
         """Init function.
 
         :param topic: (string) MQ exchange topic
-        :param pid: (int) PID of the process implementing the MQ Notifier which
-                    will be in the message context
+        :param pids: (list of int) list of PIDs of the processes implementing
+                     the MQ Notifier which will be in the message context
         :param endpoints: (list of class) list of classes implementing the
                           methods (see `MessagingNotifier.send_message) used by
                           the Notifier
@@ -58,7 +58,7 @@ class MessagingConsumer(object):
         :returns: `MessagingConsumer` class object
         """
 
-        self._pid = pid
+        self._pids = pids
         self._endpoints = endpoints
         self._transport = oslo_messaging.get_rpc_transport(
             cfg.CONF, url=messaging.TRANSPORT_URL)
index 96deeb3..9987434 100644 (file)
@@ -13,7 +13,6 @@
 # limitations under the License.
 
 import multiprocessing
-import os
 import time
 
 from yardstick.common.messaging import consumer
@@ -33,24 +32,25 @@ class DummyPayload(payloads.Payload):
 class DummyEndpoint(consumer.NotificationHandler):
 
     def info(self, ctxt, **kwargs):
-        if ctxt['pid'] == self._ctx_pid:
-            self._queue.put('ID {}, data: {}'.format(self._id, kwargs['data']))
+        if ctxt['pid'] in self._ctx_pids:
+            self._queue.put('ID {}, data: {}, pid: {}'.format(
+                self._id, kwargs['data'], ctxt['pid']))
 
 
 class DummyConsumer(consumer.MessagingConsumer):
 
-    def __init__(self, id, ctx_pid, queue):
-        self._id = id
-        endpoints = [DummyEndpoint(id, ctx_pid, queue)]
-        super(DummyConsumer, self).__init__(TOPIC, ctx_pid, endpoints)
+    def __init__(self, _id, ctx_pids, queue):
+        self._id = _id
+        endpoints = [DummyEndpoint(_id, ctx_pids, queue)]
+        super(DummyConsumer, self).__init__(TOPIC, ctx_pids, endpoints)
 
 
 class DummyProducer(producer.MessagingProducer):
     pass
 
 
-def _run_consumer(id, ctx_pid, queue):
-    _consumer = DummyConsumer(id, ctx_pid, queue)
+def _run_consumer(_id, ctx_pids, queue):
+    _consumer = DummyConsumer(_id, ctx_pids, queue)
     _consumer.start_rpc_server()
     _consumer.wait()
 
@@ -65,30 +65,35 @@ class MessagingTestCase(base.BaseFunctionalTestCase):
     def test_run_five_consumers(self):
         output_queue = multiprocessing.Queue()
         num_consumers = 10
-        ctx_id = os.getpid()
-        producer = DummyProducer(TOPIC, pid=ctx_id)
+        ctx_1 = 100001
+        ctx_2 = 100002
+        producers = [DummyProducer(TOPIC, pid=ctx_1),
+                     DummyProducer(TOPIC, pid=ctx_2)]
 
         processes = []
         for i in range(num_consumers):
             processes.append(multiprocessing.Process(
                 name='consumer_{}'.format(i),
                 target=_run_consumer,
-                args=(i, ctx_id, output_queue)))
+                args=(i, [ctx_1, ctx_2], output_queue)))
             processes[i].start()
         self.addCleanup(self._terminate_consumers, num_consumers, processes)
 
         time.sleep(2)  # Let consumers to create the listeners
-        producer.send_message(METHOD_INFO, DummyPayload(version=1,
-                                                        data='message 0'))
-        producer.send_message(METHOD_INFO, DummyPayload(version=1,
-                                                        data='message 1'))
-        time.sleep(2)  # Let consumers attend the calls
+        for producer in producers:
+            for message in ['message 0', 'message 1']:
+                producer.send_message(METHOD_INFO,
+                                      DummyPayload(version=1, data=message))
 
+        time.sleep(2)  # Let consumers attend the calls
         output = []
         while not output_queue.empty():
             output.append(output_queue.get(True, 1))
 
-        self.assertEqual(num_consumers * 2, len(output))
+        self.assertEqual(num_consumers * 4, len(output))
+        msg_template = 'ID {}, data: {}, pid: {}'
         for i in range(num_consumers):
-            self.assertIn('ID {}, data: {}'.format(1, 'message 0'), output)
-            self.assertIn('ID {}, data: {}'.format(1, 'message 1'), output)
+            for ctx in [ctx_1, ctx_2]:
+                for message in ['message 0', 'message 1']:
+                    msg = msg_template.format(i, message, ctx)
+                    self.assertIn(msg, output)