1 # Copyright (c) 2018 Intel Corporation
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
18 from oslo_config import cfg
22 from yardstick.common import messaging
25 LOG = logging.getLogger(__name__)
@six.add_metaclass(abc.ABCMeta)
class NotificationHandler(object):
    """Abstract class to define an endpoint object for a MessagingConsumer.

    The class is given the ``abc.ABCMeta`` metaclass through
    ``six.add_metaclass``.
    """

    def __init__(self, _id, ctx_pids, queue):
        """Initialise the handler.

        :param _id: identifier of this handler -- TODO confirm the exact
                    semantics against the callers
        :param ctx_pids: value kept on the instance as ``_ctx_pids``;
                         presumably the PIDs expected in the message
                         context -- TODO confirm
        :param queue: queue object supplied by the caller -- TODO confirm
                      its type and how it is consumed
        """
        self._ctx_pids = ctx_pids
@six.add_metaclass(abc.ABCMeta)
class MessagingConsumer(object):
    """Abstract class to implement an MQ consumer.

    This abstract class allows a class implementing this interface to receive
    the messages published by a `MessagingNotifier`.
    """

    def __init__(self, topic, pids, endpoints, fanout=True):
        """Create the transport, the target and the RPC server of the consumer.

        :param topic: (string) MQ exchange topic
        :param pids: (list of int) list of PIDs of the processes implementing
                     the MQ Notifier which will be in the message context
        :param endpoints: (list of class) list of classes implementing the
                          methods (see `MessagingNotifier.send_message`);
                          handed to the RPC server as its endpoints
        :param fanout: (bool) MQ clients may request that a copy of the message
                       be delivered to all servers listening on a topic by
                       setting fanout to ``True``, rather than just one of them
        :returns: `MessagingConsumer` class object
        """
        self._endpoints = endpoints
        # The transport URL is taken from the yardstick messaging module and
        # passed explicitly next to the global oslo.config object.
        self._transport = oslo_messaging.get_rpc_transport(
            cfg.CONF, url=messaging.TRANSPORT_URL)
        # The target combines the caller's topic and fanout flag with the
        # server name defined in the yardstick messaging module.
        self._target = oslo_messaging.Target(topic=topic, fanout=fanout,
                                             server=messaging.SERVER)
        # The server is only built here, from the transport, target and
        # endpoints prepared above.
        self._server = oslo_messaging.get_rpc_server(
            self._transport, self._target, self._endpoints,
            executor=messaging.RPC_SERVER_EXECUTOR,
            access_policy=oslo_messaging.DefaultRPCAccessPolicy)

    def start_rpc_server(self):
        """Start the RPC server."""

    def stop_rpc_server(self):
        """Stop the RPC server."""
83 """Wait for message processing to complete (blocking)."""