Merge "add CheckConnectivity scenario"
[yardstick.git] / yardstick / network_services / nfvi / collectd.py
1 # Copyright (c) 2016-2017 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """ AMQP Consumer scenario definition """
15
16 from __future__ import absolute_import
17 from __future__ import print_function
18 import logging
19 import pika
20 from pika.exceptions import AMQPConnectionError
21
22
class AmqpConsumer(object):
    """ Asynchronous AMQP consumer that forwards collectd data to a queue.

    The class is driven by callbacks registered on a
    ``pika.SelectConnection``.  The callbacks chain in this order:
    connection open -> channel open -> exchange declare -> queue declare ->
    queue bind -> consume.  Every consumed message is parsed by
    :meth:`on_message` and put on the queue object given to the constructor.
    """
    # Exchange declared on the channel, and its type.
    EXCHANGE = 'amq.fanout'
    EXCHANGE_TYPE = 'fanout'
    # AMQP queue name used for declare/bind/consume (not the result queue).
    # NOTE(review): an empty name presumably lets the broker pick one and
    # later calls refer to the last declared queue -- confirm with pika docs.
    QUEUE = ''
    ROUTING_KEY = 'collectd'

    def __init__(self, amqp_url, queue):
        """ Store the settings; the connection is only opened by run().

        :param amqp_url: URL handed to ``pika.URLParameters`` on connect
        :param queue: object receiving parsed samples through ``put()``; it
            must also provide ``cancel_join_thread()`` (presumably a
            ``multiprocessing.Queue`` -- verify against the caller)
        """
        super(AmqpConsumer, self).__init__()
        self._connection = None
        self._channel = None
        # Set by stop(); tells the close callbacks not to reconnect.
        self._closing = False
        self._consumer_tag = None
        self._url = amqp_url
        self._queue = queue
        self._queue.cancel_join_thread()

    def connect(self):
        """ Create the pika connection for the configured amqp url.

        :return: the new ``pika.SelectConnection``
        :raises RuntimeError: when pika reports an AMQP connection error
        """
        try:
            return pika.SelectConnection(pika.URLParameters(self._url),
                                         self.on_connection_open,
                                         stop_ioloop_on_close=False)
        except AMQPConnectionError:
            # The URL is left out of the message: it may hold credentials.
            raise RuntimeError("Unable to connect to the AMQP server")

    def on_connection_open(self, unused_connection):
        """ Callback from pika: register close handler and open a channel """
        logging.info("list of unused connections %s", unused_connection)
        self._connection.add_on_close_callback(self.on_connection_closed)
        self._connection.channel(on_open_callback=self.on_channel_open)

    def on_connection_closed(self, connection, reply_code, reply_text):
        """ Callback from pika when the connection is closed.

        Stops the ioloop when the close was requested through stop();
        otherwise schedules reconnect() in 5 seconds.
        """
        logging.info("amqp connection (%s)", connection)
        self._channel = None
        if self._closing:
            self._connection.ioloop.stop()
        else:
            # Format string and arguments are passed separately so that
            # logging interpolates them (a single tuple is logged verbatim).
            logging.debug('Connection closed, reopening in 5 sec: (%s) %s',
                          reply_code, reply_text)
            self._connection.add_timeout(5, self.reconnect)

    def reconnect(self):
        """ Stop the current ioloop and, unless closing, connect again """
        self._connection.ioloop.stop()

        if not self._closing:
            self._connection = self.connect()
            self._connection.ioloop.start()

    def on_channel_open(self, channel):
        """ Keep the channel, add its close callback & declare exchange """
        self._channel = channel
        self.add_on_channel_close_callback()
        self._channel.exchange_declare(self.on_exchange_declareok,
                                       self.EXCHANGE,
                                       self.EXCHANGE_TYPE,
                                       durable=True, auto_delete=False)

    def add_on_channel_close_callback(self):
        """ Register on_channel_closed as the channel close callback """
        self._channel.add_on_close_callback(self.on_channel_closed)

    def on_channel_closed(self, channel, reply_code, reply_text):
        """ Callback on channel close: log it and close the connection """
        logging.info("amqp channel closed channel(%s), "
                     "reply_code(%s) reply_text(%s)",
                     channel, reply_code, reply_text)
        self._connection.close()

    def on_exchange_declareok(self, unused_frame):
        """ Callback once the exchange is declared: set up the queue """
        logging.info("amqp exchange unused frame (%s)", unused_frame)
        self.setup_queue(self.QUEUE)

    def setup_queue(self, queue_name):
        """ Declare the amqp queue `queue_name` on the channel """
        logging.info("amqp queue name (%s)", queue_name)
        self._channel.queue_declare(self.on_queue_declareok, queue_name)

    def on_queue_declareok(self, method_frame):
        """ Callback once the queue is declared: bind it to the exchange """
        logging.info("amqp queue method frame (%s)", method_frame)
        self._channel.queue_bind(self._on_bindok, self.QUEUE,
                                 self.EXCHANGE, self.ROUTING_KEY)

    def _on_bindok(self, unused_frame):
        """ Callback once the queue is bound: start consuming messages """
        logging.info("amqp unused frame %s", unused_frame)
        self.add_on_cancel_callback()
        # The tag is kept so that stop() can cancel this consumer.
        self._consumer_tag = self._channel.basic_consume(self.on_message,
                                                         self.QUEUE)

    def add_on_cancel_callback(self):
        """ Register on_consumer_cancelled as the channel cancel callback """
        self._channel.add_on_cancel_callback(self.on_consumer_cancelled)

    def on_consumer_cancelled(self, method_frame):
        """ Callback on consumer cancel: close the channel if still open """
        logging.info("amqp method frame %s", method_frame)
        if self._channel:
            self._channel.close()

    def on_message(self, unused_channel, basic_deliver, properties, body):
        """ Parse one message received from the amqp server (collectd).

        The body is split on whitespace; the second field becomes the key
        and the fourth field the value of a one-item dict put on the result
        queue.  NOTE(review): this looks like collectd's
        ``PUTVAL <identifier> interval=<n> <time>:<value>`` line -- confirm.

        A body with fewer than four fields is logged and dropped.  The
        message is acknowledged in both cases.
        """
        logging.info("amqp unused channel %s, properties %s",
                     unused_channel, properties)
        metrics = body.rsplit()
        if len(metrics) < 4:
            # Do not let one malformed sample raise out of the callback.
            logging.warning("amqp message ignored, unexpected format: %r",
                            body)
        else:
            self._queue.put({metrics[1]: metrics[3]})
        self.ack_message(basic_deliver.delivery_tag)

    def ack_message(self, delivery_tag):
        """ Acknowledge the amqp message identified by `delivery_tag` """
        self._channel.basic_ack(delivery_tag)

    def on_cancelok(self, unused_frame):
        """ Callback once the consumer is cancelled: close the channel """
        logging.info("amqp unused frame %s", unused_frame)
        # on_connection_closed() may already have reset the channel.
        if self._channel:
            self._channel.close()

    def run(self):
        """ Open the amqp connection and run its ioloop. """
        self._connection = self.connect()
        self._connection.ioloop.start()

    def stop(self):
        """ Stop consuming amqp data.

        Marks the consumer as closing, cancels the consumer on the channel
        (if any) and starts the ioloop of the connection (if any).
        NOTE(review): restarting the ioloop presumably lets the cancel and
        close callbacks run to completion -- confirm with the pika docs.
        """
        self._closing = True
        if self._channel:
            self._channel.basic_cancel(self.on_cancelok, self._consumer_tag)

        if self._connection:
            self._connection.ioloop.start()

    def close_connection(self):
        """ Close the amqp connection, if one has been created """
        if self._connection:
            self._connection.close()