1 # Copyright (c) 2018 Intel Corporation
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
18 from oslo_config import cfg
22 from yardstick.common import messaging
25 LOG = logging.getLogger(__name__)
28 @six.add_metaclass(abc.ABCMeta)
29 class NotificationHandler(object):
30 """Abstract class to define a endpoint object for a MessagingConsumer"""
32 def __init__(self, _id, ctx_ids, queue):
33 super(NotificationHandler, self).__init__()
35 self._ctx_ids = ctx_ids
39 @six.add_metaclass(abc.ABCMeta)
40 class MessagingConsumer(object):
41 """Abstract class to implement a MQ consumer
43 This abstract class allows a class implementing this interface to receive
44 the messages published by a `MessagingNotifier`.
47 def __init__(self, topic, ids, endpoints, fanout=True):
50 :param topic: (string) MQ exchange topic
51 :param ids: (list of int) list of IDs of the processes implementing
52 the MQ Notifier which will be in the message context
53 :param endpoints: (list of class) list of classes implementing the
54 methods (see `MessagingNotifier.send_message) used by
56 :param fanout: (bool) MQ clients may request that a copy of the message
57 be delivered to all servers listening on a topic by
58 setting fanout to ``True``, rather than just one of them
59 :returns: `MessagingConsumer` class object
63 self._endpoints = endpoints
64 self._transport = oslo_messaging.get_rpc_transport(
65 cfg.CONF, url=messaging.TRANSPORT_URL)
66 self._target = oslo_messaging.Target(topic=topic, fanout=fanout,
67 server=messaging.SERVER)
68 self._server = oslo_messaging.get_rpc_server(
69 self._transport, self._target, self._endpoints,
70 executor=messaging.RPC_SERVER_EXECUTOR,
71 access_policy=oslo_messaging.DefaultRPCAccessPolicy)
73 def start_rpc_server(self):
74 """Start the RPC server."""
78 def stop_rpc_server(self):
79 """Stop the RPC server."""
84 """Wait for message processing to complete (blocking)."""