# Copyright (c) 2016-2017 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import absolute_import

import multiprocessing
import unittest

try:
    from unittest import mock
except ImportError:  # Python 2: fall back to the ``mock`` backport
    import mock

from yardstick.network_services.nfvi.collectd import AmqpConsumer


class TestAmqpConsumer(unittest.TestCase):
    """Smoke tests for the collectd ``AmqpConsumer`` callbacks.

    Each test replaces the consumer's connection and/or channel with mocks,
    invokes a single method and checks that it returns ``None``. The two
    exceptions are ``test___init__`` (checks the stored URL) and
    ``test_connect`` (expects ``RuntimeError``).

    Plain ``mock.Mock()`` objects are used for the connection: ``autospec``
    is not a ``Mock`` constructor option (it belongs to ``mock.patch`` and
    ``mock.create_autospec``), so passing it only creates a stray attribute
    named ``autospec`` and applies no spec at all.

    NOTE(review): the reviewed copy of this file had lost several lines: the
    ``setUp``/``test_run``/``test_stop`` headers and the trailing arguments
    of three calls. They are restored below and flagged inline; confirm them
    against the real ``AmqpConsumer`` signatures.
    """

    def setUp(self):
        """Build a fresh consumer and result queue for every test."""
        self.queue = multiprocessing.Queue()
        self.url = 'amqp://admin:admin@1.1.1.1:5672/%2F'
        self.amqp_consumer = AmqpConsumer(self.url, self.queue)

    def test___init__(self):
        """The consumer keeps the URL it was constructed with."""
        self.assertEqual(self.url, self.amqp_consumer._url)

    def test_connect(self):
        """``connect`` is expected to raise ``RuntimeError`` in this setup."""
        with self.assertRaises(RuntimeError):
            self.amqp_consumer.connect()

    def test_on_connection_open(self):
        """``on_connection_open`` returns ``None`` with a mocked connection."""
        connection = mock.Mock()
        connection.add_on_close_callback = mock.Mock(return_value=0)
        connection.channel = mock.Mock(return_value=0)
        self.amqp_consumer._connection = connection
        self.assertIsNone(self.amqp_consumer.on_connection_open(10))

    def test_on_connection_closed(self):
        """``on_connection_closed`` returns ``None`` for both ``_closing``
        states."""
        connection = mock.Mock()
        connection.ioloop = mock.Mock()
        connection.ioloop.stop = mock.Mock(return_value=0)
        connection.add_timeout = mock.Mock(return_value=0)
        self.amqp_consumer._connection = connection
        # NOTE(review): the third argument was missing from the reviewed
        # source; "Not Found" is supplied as the reply text for code 404.
        # Confirm against the original test.
        for closing in (True, False):
            self.amqp_consumer._closing = closing
            self.assertIsNone(
                self.amqp_consumer.on_connection_closed("", 404,
                                                        "Not Found"))

    def test_reconnect(self):
        """``reconnect`` returns ``None`` when ``connect`` is stubbed out."""
        connection = mock.Mock()
        connection.ioloop = mock.Mock()
        connection.ioloop.stop = mock.Mock(return_value=0)
        self.amqp_consumer._connection = connection
        self.amqp_consumer.connect = mock.Mock(return_value=0)
        self.amqp_consumer._closing = True
        self.assertIsNone(self.amqp_consumer.reconnect())

    def test_on_channel_open(self):
        """``on_channel_open`` returns ``None`` for a mocked channel."""
        connection = mock.Mock()
        connection.add_on_close_callback = mock.Mock(return_value=0)
        self.amqp_consumer._connection = connection
        channel = mock.Mock()
        channel.exchange_declare = mock.Mock(return_value=0)
        self.amqp_consumer._channel = channel
        self.amqp_consumer.add_on_channel_close_callback = mock.Mock()
        self.assertIsNone(self.amqp_consumer.on_channel_open(channel))

    def test_add_on_channel_close_callback(self):
        """``add_on_channel_close_callback`` returns ``None``."""
        connection = mock.Mock()
        connection.add_on_close_callback = mock.Mock(return_value=0)
        self.amqp_consumer._connection = connection
        channel = mock.Mock()
        channel.add_on_close_callback = mock.Mock()
        self.amqp_consumer._channel = channel
        self.assertIsNone(self.amqp_consumer.add_on_channel_close_callback())

    def test_on_channel_closed(self):
        """``on_channel_closed`` returns ``None`` with a mocked connection."""
        connection = mock.Mock()
        connection.close = mock.Mock(return_value=0)
        self.amqp_consumer._connection = connection
        _channel = mock.Mock()
        # NOTE(review): the arguments after ``_channel`` were missing from
        # the reviewed source; empty reply code/text are assumed here.
        # Confirm against ``AmqpConsumer.on_channel_closed``.
        self.assertIsNone(
            self.amqp_consumer.on_channel_closed(_channel, "", ""))

    # The method name below (``ion``) looks like a typo for ``on``; it is
    # kept unchanged so existing test IDs stay stable.
    def test_ion_exchange_declareok(self):
        """``on_exchange_declareok`` returns ``None`` with ``setup_queue``
        stubbed."""
        self.amqp_consumer.setup_queue = mock.Mock(return_value=0)
        self.assertIsNone(self.amqp_consumer.on_exchange_declareok(10))

    def test_setup_queue(self):
        """``setup_queue`` returns ``None`` with a mocked channel."""
        channel = mock.Mock()
        channel.add_on_close_callback = mock.Mock()
        self.amqp_consumer._channel = channel
        self.assertIsNone(self.amqp_consumer.setup_queue("collectd"))

    def test_on_queue_declareok(self):
        """``on_queue_declareok`` returns ``None`` with a mocked channel."""
        channel = mock.Mock()
        channel.queue_bind = mock.Mock()
        self.amqp_consumer._channel = channel
        self.assertIsNone(self.amqp_consumer.on_queue_declareok(10))

    def test__on_bindok(self):
        """``_on_bindok`` returns ``None`` with channel and cancel hook
        stubbed."""
        channel = mock.Mock()
        channel.basic_consume = mock.Mock()
        self.amqp_consumer._channel = channel
        self.amqp_consumer.add_on_cancel_callback = mock.Mock()
        self.assertIsNone(self.amqp_consumer._on_bindok(10))

    def test_add_on_cancel_callback(self):
        """``add_on_cancel_callback`` returns ``None``."""
        channel = mock.Mock()
        channel.add_on_cancel_callback = mock.Mock()
        self.amqp_consumer._channel = channel
        self.assertIsNone(self.amqp_consumer.add_on_cancel_callback())

    def test_on_consumer_cancelled(self):
        """``on_consumer_cancelled`` returns ``None`` with a mocked
        channel."""
        channel = mock.Mock()
        channel.close = mock.Mock()
        self.amqp_consumer._channel = channel
        self.assertIsNone(self.amqp_consumer.on_consumer_cancelled(10))

    def test_on_message(self):
        """``on_message`` returns ``None`` with ``ack_message`` stubbed."""
        body = "msg {} cpu/cpu-0/ipc 101010:10"
        # NOTE(review): the ``properties`` definition and the tail of the
        # ``on_message`` call were missing from the reviewed source. The
        # call is presumably (channel, basic_deliver, properties, body);
        # confirm against ``AmqpConsumer.on_message``.
        properties = ""
        basic_deliver = mock.Mock()
        basic_deliver.delivery_tag = mock.Mock(return_value=0)
        self.amqp_consumer.ack_message = mock.Mock()
        self.assertIsNone(
            self.amqp_consumer.on_message(10, basic_deliver,
                                          properties, body))

    def test_ack_message(self):
        """``ack_message`` returns ``None`` with a mocked channel."""
        channel = mock.Mock()
        channel.basic_ack = mock.Mock()
        self.amqp_consumer._channel = channel
        self.assertIsNone(self.amqp_consumer.ack_message(10))

    def test_on_cancelok(self):
        """``on_cancelok`` returns ``None`` with a mocked channel."""
        channel = mock.Mock()
        channel.close = mock.Mock()
        self.amqp_consumer._channel = channel
        self.assertIsNone(self.amqp_consumer.on_cancelok(10))

    # NOTE(review): the ``def`` line of this test was missing from the
    # reviewed source; the name follows the ``test_<method>`` pattern used
    # by every other test in this class.
    def test_run(self):
        """``run`` returns ``None`` when ``connect`` and the ioloop are
        stubbed."""
        self.amqp_consumer._connection = mock.Mock()
        self.amqp_consumer.connect = mock.Mock()
        self.amqp_consumer._connection.ioloop.start = mock.Mock()
        self.assertIsNone(self.amqp_consumer.run())

    # NOTE(review): the ``def`` line of this test was missing from the
    # reviewed source; the name follows the ``test_<method>`` pattern used
    # by every other test in this class.
    def test_stop(self):
        """``stop`` returns ``None`` with connection and channel stubbed."""
        self.amqp_consumer._connection = mock.Mock()
        self.amqp_consumer.connect = mock.Mock()
        self.amqp_consumer._connection.ioloop.start = mock.Mock()
        channel = mock.Mock()
        channel.basic_cancel = mock.Mock()
        self.amqp_consumer._channel = channel
        self.assertIsNone(self.amqp_consumer.stop())

    def test_close_connection(self):
        """``close_connection`` returns ``None`` with a mocked connection."""
        connection = mock.Mock()
        connection.close = mock.Mock()
        self.amqp_consumer._connection = connection
        self.assertIsNone(self.amqp_consumer.close_connection())