MessagingConsumer accepts messages from multiple producers
[yardstick.git] / yardstick / common / messaging / consumer.py
1 # Copyright (c) 2018 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import abc
16 import logging
17
18 from oslo_config import cfg
19 import oslo_messaging
20 import six
21
22 from yardstick.common import messaging
23
24
25 LOG = logging.getLogger(__name__)
26
27
28 @six.add_metaclass(abc.ABCMeta)
29 class NotificationHandler(object):
30     """Abstract class to define a endpoint object for a MessagingConsumer"""
31
32     def __init__(self, _id, ctx_pids, queue):
33         self._id = _id
34         self._ctx_pids = ctx_pids
35         self._queue = queue
36
37
38 @six.add_metaclass(abc.ABCMeta)
39 class MessagingConsumer(object):
40     """Abstract class to implement a MQ consumer
41
42     This abstract class allows a class implementing this interface to receive
43     the messages published by a `MessagingNotifier`.
44     """
45
46     def __init__(self, topic, pids, endpoints, fanout=True):
47         """Init function.
48
49         :param topic: (string) MQ exchange topic
50         :param pids: (list of int) list of PIDs of the processes implementing
51                      the MQ Notifier which will be in the message context
52         :param endpoints: (list of class) list of classes implementing the
53                           methods (see `MessagingNotifier.send_message) used by
54                           the Notifier
55         :param fanout: (bool) MQ clients may request that a copy of the message
56                        be delivered to all servers listening on a topic by
57                        setting fanout to ``True``, rather than just one of them
58         :returns: `MessagingConsumer` class object
59         """
60
61         self._pids = pids
62         self._endpoints = endpoints
63         self._transport = oslo_messaging.get_rpc_transport(
64             cfg.CONF, url=messaging.TRANSPORT_URL)
65         self._target = oslo_messaging.Target(topic=topic, fanout=fanout,
66                                              server=messaging.SERVER)
67         self._server = oslo_messaging.get_rpc_server(
68             self._transport, self._target, self._endpoints,
69             executor=messaging.RPC_SERVER_EXECUTOR,
70             access_policy=oslo_messaging.DefaultRPCAccessPolicy)
71
72     def start_rpc_server(self):
73         """Start the RPC server."""
74         if self._server:
75             self._server.start()
76
77     def stop_rpc_server(self):
78         """Stop the RPC server."""
79         if self._server:
80             self._server.stop()
81
82     def wait(self):
83         """Wait for message processing to complete (blocking)."""
84         if self._server:
85             self._server.wait()