1 # Copyright (c) 2016-2017 Intel Corporation
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """ AMQP Consumer scenario definition """
16 from __future__ import absolute_import
17 from __future__ import print_function
20 from pika.exceptions import AMQPConnectionError
23 class AmqpConsumer(object):
24 """ This Class handles amqp consumer and collects collectd data """
25 EXCHANGE = 'amq.fanout'
26 EXCHANGE_TYPE = 'fanout'
28 ROUTING_KEY = 'collectd'
30 def __init__(self, amqp_url, queue):
31 super(AmqpConsumer, self).__init__()
32 self._connection = None
35 self._consumer_tag = None
40 """ connect to amqp url """
42 return pika.SelectConnection(pika.URLParameters(self._url),
43 self.on_connection_open,
44 stop_ioloop_on_close=False)
45 except AMQPConnectionError:
def on_connection_open(self, unused_connection):
    """ Callback from pika once the connection is open.

    Logs the object handed over by pika, registers
    ``on_connection_closed`` as the connection's close callback and then
    requests a channel whose open callback is ``on_channel_open``.

    :param unused_connection: value passed by pika; only logged here
    """
    logging.info("list of unused connections %s", unused_connection)
    # Both registrations go through the connection stored on the instance,
    # not through the callback argument.
    conn = self._connection
    conn.add_on_close_callback(self.on_connection_closed)
    conn.channel(on_open_callback=self.on_channel_open)
54 def on_connection_closed(self, connection, reply_code, reply_text):
55 """ close the amqp connections. if force close, try re-connect """
56 logging.info("amqp connection (%s)", connection)
59 self._connection.ioloop.stop()
61 logging.debug(('Connection closed, reopening in 5 sec: (%s) %s',
62 reply_code, reply_text))
63 self._connection.add_timeout(5, self.reconnect)
66 """ re-connect amqp consumer"""
67 self._connection.ioloop.stop()
70 self._connection = self.connect()
71 self._connection.ioloop.start()
73 def on_channel_open(self, channel):
74 """ add close callback & setup exchange """
75 self._channel = channel
76 self.add_on_channel_close_callback()
77 self._channel.exchange_declare(self.on_exchange_declareok,
80 durable=True, auto_delete=False)
def add_on_channel_close_callback(self):
    """ Register ``on_channel_closed`` as the close callback of the channel """
    channel = self._channel
    channel.add_on_close_callback(self.on_channel_closed)
def on_channel_closed(self, channel, reply_code, reply_text):
    """ Channel close callback: log the details, then close the connection.

    :param channel: the channel that was closed (only logged)
    :param reply_code: code reported with the close (only logged)
    :param reply_text: text reported with the close (only logged)
    """
    logging.info(
        "amqp channel closed channel(%s), reply_code(%s) reply_text(%s)",
        channel, reply_code, reply_text)
    connection = self._connection
    connection.close()
def on_exchange_declareok(self, unused_frame):
    """ Callback after the exchange declare: go on to set up the queue.

    :param unused_frame: frame delivered with the callback; only logged
    """
    logging.info("amqp exchange unused frame (%s)", unused_frame)
    # The queue name comes from the class-level QUEUE attribute.
    queue_name = self.QUEUE
    self.setup_queue(queue_name)
def setup_queue(self, queue_name):
    """ Declare ``queue_name`` on the channel.

    ``on_queue_declareok`` is handed to the channel as the callback for
    the declare request.

    :param queue_name: name of the queue to declare
    """
    logging.info("amqp queue name (%s)", queue_name)
    declared_cb = self.on_queue_declareok
    self._channel.queue_declare(declared_cb, queue_name)
def on_queue_declareok(self, method_frame):
    """ Callback after the queue declare: bind the queue to the exchange.

    Binds ``QUEUE`` to ``EXCHANGE`` using ``ROUTING_KEY`` and passes
    ``_on_bindok`` as the callback for the bind request.

    :param method_frame: frame delivered with the callback; only logged
    """
    logging.info("amqp queue method frame (%s)", method_frame)
    bind_args = (self._on_bindok, self.QUEUE, self.EXCHANGE,
                 self.ROUTING_KEY)
    self._channel.queue_bind(*bind_args)
109 def _on_bindok(self, unused_frame):
110 """ call back on bind start consuming data from amqp queue """
111 logging.info("amqp unused frame %s", unused_frame)
112 self.add_on_cancel_callback()
113 self._consumer_tag = self._channel.basic_consume(self.on_message,
def add_on_cancel_callback(self):
    """ Register ``on_consumer_cancelled`` as the channel's cancel callback """
    cancel_handler = self.on_consumer_cancelled
    self._channel.add_on_cancel_callback(cancel_handler)
120 def on_consumer_cancelled(self, method_frame):
121 """ on cancel close the channel """
122 logging.info("amqp method frame %s", method_frame)
124 self._channel.close()
def on_message(self, unused_channel, basic_deliver, properties, body):
    """ Parse a message received from the amqp server (collectd).

    The body is split on whitespace; the second field becomes the key and
    the fourth field the value of a one-entry dict that is put on the
    result queue. The message is then acknowledged.

    A body with fewer than four fields cannot be parsed. It is logged and
    dropped instead of raising ``IndexError`` out of the consumer callback,
    and it is still acknowledged so it is not left unacked on the channel.

    :param unused_channel: channel the message arrived on (only logged)
    :param basic_deliver: delivery info; its ``delivery_tag`` is acked
    :param properties: message properties (only logged)
    :param body: raw message payload
    """
    logging.info("amqp unused channel %s, properties %s",
                 unused_channel, properties)
    # NOTE(review): presumably a collectd "PUTVAL <id> <opts> <values>"
    # line, which yields exactly four fields -- confirm against the
    # collectd amqp plugin configuration in use.
    metrics = body.split()
    if len(metrics) >= 4:
        self._queue.put({metrics[1]: metrics[3]})
    else:
        logging.warning("amqp malformed collectd message dropped: %r", body)
    self.ack_message(basic_deliver.delivery_tag)
def ack_message(self, delivery_tag):
    """ Acknowledge the amqp message identified by ``delivery_tag``.

    :param delivery_tag: tag forwarded to the channel's ``basic_ack``
    """
    channel = self._channel
    channel.basic_ack(delivery_tag)
def on_cancelok(self, unused_frame):
    """ Cancel-ok callback: log the frame and close the amqp channel.

    :param unused_frame: frame delivered with the callback; only logged
    """
    logging.info("amqp unused frame %s", unused_frame)
    channel = self._channel
    channel.close()
144 """ Initiate amqp connection. """
145 self._connection = self.connect()
146 self._connection.ioloop.start()
149 """ stop amqp consuming data """
152 self._channel.basic_cancel(self.on_cancelok, self._consumer_tag)
155 self._connection.ioloop.start()
def close_connection(self):
    """ Close the amqp connection held by this consumer """
    connection = self._connection
    connection.close()