Create a SampleVNF MQ consumer class
[yardstick.git] / yardstick / common / messaging / consumer.py
1 # Copyright (c) 2018 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import abc
16 import logging
17
18 from oslo_config import cfg
19 import oslo_messaging
20 import six
21
22 from yardstick.common import messaging
23
24
25 LOG = logging.getLogger(__name__)
26
27
28 @six.add_metaclass(abc.ABCMeta)
29 class NotificationHandler(object):
30     """Abstract class to define a endpoint object for a MessagingConsumer"""
31
32     def __init__(self, _id, ctx_ids, queue):
33         super(NotificationHandler, self).__init__()
34         self._id = _id
35         self._ctx_ids = ctx_ids
36         self._queue = queue
37
38
39 @six.add_metaclass(abc.ABCMeta)
40 class MessagingConsumer(object):
41     """Abstract class to implement a MQ consumer
42
43     This abstract class allows a class implementing this interface to receive
44     the messages published by a `MessagingNotifier`.
45     """
46
47     def __init__(self, topic, ids, endpoints, fanout=True):
48         """Init function.
49
50         :param topic: (string) MQ exchange topic
51         :param ids: (list of int) list of IDs of the processes implementing
52                      the MQ Notifier which will be in the message context
53         :param endpoints: (list of class) list of classes implementing the
54                           methods (see `MessagingNotifier.send_message) used by
55                           the Notifier
56         :param fanout: (bool) MQ clients may request that a copy of the message
57                        be delivered to all servers listening on a topic by
58                        setting fanout to ``True``, rather than just one of them
59         :returns: `MessagingConsumer` class object
60         """
61
62         self._ids = ids
63         self._endpoints = endpoints
64         self._transport = oslo_messaging.get_rpc_transport(
65             cfg.CONF, url=messaging.TRANSPORT_URL)
66         self._target = oslo_messaging.Target(topic=topic, fanout=fanout,
67                                              server=messaging.SERVER)
68         self._server = oslo_messaging.get_rpc_server(
69             self._transport, self._target, self._endpoints,
70             executor=messaging.RPC_SERVER_EXECUTOR,
71             access_policy=oslo_messaging.DefaultRPCAccessPolicy)
72
73     def start_rpc_server(self):
74         """Start the RPC server."""
75         if self._server:
76             self._server.start()
77
78     def stop_rpc_server(self):
79         """Stop the RPC server."""
80         if self._server:
81             self._server.stop()
82
83     def wait(self):
84         """Wait for message processing to complete (blocking)."""
85         if self._server:
86             self._server.wait()