1 # Copyright (c) 2018 Intel Corporation
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
16 from oslo_config import cfg
19 from yardstick.common import messaging
20 from yardstick.common.messaging import consumer
21 from yardstick.tests.unit import base as ut_base
# Endpoint class used as test data: the class object itself (not an instance)
# is placed in the endpoints list handed to _MessagingConsumer and is expected
# again in the get_rpc_server assertion of the test case.
# NOTE(review): the class body is not visible in this view.
24 class TestEndPoint(object):
# Test-local subclass of consumer.MessagingConsumer; the test case
# instantiates this subclass rather than the imported base class.
# NOTE(review): the class body is not visible in this view.
29 class _MessagingConsumer(consumer.MessagingConsumer):
# Test case checking which oslo_messaging calls are made when a
# _MessagingConsumer is constructed.
# NOTE(review): the test method's ``def`` line, the last ``as`` target of the
# ``with`` statement and the closing arguments of the _MessagingConsumer(...)
# call are not visible in this view; neither are the imports that bind
# ``mock`` and ``oslo_messaging``.
33 class MessagingConsumerTestCase(ut_base.BaseUnitTestCase):
# Patch get_rpc_server, get_rpc_transport and Target on oslo_messaging for
# the duration of the block. The transport and target mocks are then given
# sentinel string return values so the later assertion can check that those
# exact values are passed on to get_rpc_server.
36 with mock.patch.object(oslo_messaging, 'get_rpc_server') as \
37 mock_get_rpc_server, \
38 mock.patch.object(oslo_messaging, 'get_rpc_transport') as \
39 mock_get_rpc_transport, \
40 mock.patch.object(oslo_messaging, 'Target') as \
42 mock_get_rpc_transport.return_value = 'test_rpc_transport'
43 mock_Target.return_value = 'test_Target'
# Build the consumer; the instance is not kept, only the calls recorded by
# the mocks are inspected afterwards.
45 _MessagingConsumer('test_topic', 'test_pid', [TestEndPoint],
# The transport must be requested exactly once, with the global oslo config
# object and the transport URL constant from yardstick.common.messaging.
47 mock_get_rpc_transport.assert_called_once_with(
48 cfg.CONF, url=messaging.TRANSPORT_URL)
# The target must be created exactly once for the given topic, with fanout
# enabled and the server name constant from yardstick.common.messaging.
49 mock_Target.assert_called_once_with(
50 topic='test_topic', fanout=True, server=messaging.SERVER)
# The RPC server must be created exactly once from the sentinel transport and
# target, the same endpoint list, the configured executor and the default
# RPC access policy.
51 mock_get_rpc_server.assert_called_once_with(
52 'test_rpc_transport', 'test_Target', [TestEndPoint],
53 executor=messaging.RPC_SERVER_EXECUTOR,
54 access_policy=oslo_messaging.DefaultRPCAccessPolicy)