JIRA: BOTTLENECKS-29
[bottlenecks.git] / vstf / vstf / rpc_frame_work / rpc_consumer.py
1 ##############################################################################
2 # Copyright (c) 2015 Huawei Technologies Co.,Ltd and others.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9
10 import logging
11 import time
12
13 import stevedore
14
15 import pika
16 from vstf.common import message
17 from vstf.rpc_frame_work import constant
18
19 LOGGER = logging.getLogger(__name__)
20
21
22 class VstfConsumer(object):
23     """This is an example consumer that will handle unexpected interactions
24     with RabbitMQ such as channel and connection closures.
25
26     If RabbitMQ closes the connection, it will reopen it. You should
27     look at the output, as there are limited reasons why the connection may
28     be closed, which usually are tied to permission related issues or
29     socket timeouts.
30
31     If the channel is closed, it will indicate a problem with one of the
32     commands that were issued and that should surface in the output as well.
33
34     """
35
36     def __init__(self, agent,
37                  user='guest',
38                  passwd='guest',
39                  host='localhost',
40                  port='5672',
41                  agent_id="agent"):
42         """Create a new instance of the consumer class, passing in the AMQP
43         URL used to connect to RabbitMQ.
44
45         :param str    user: The user name of RabbitMQ server
46         :param str    passwd: The passwd of RabbitMQ server
47         :param str    host: The ip of RabbitMQ server
48         :param str    port: connection's port
49
50         """
51         self._connection = None
52         self._channel = None
53         self._closing = False
54         self._consumer_tag = None
55         self.user = user
56         self.passwd = passwd
57         self.srv = host
58         self.port = port
59         self.agent_id = agent_id
60         self.url = 'amqp://' + self.user + ':' + self.passwd + '@' + self.srv + ':' + self.port + '/%2F'
61
62         # load the agent_funcs
63         try:
64             self.agent_ops = stevedore.driver.DriverManager(
65                 namespace="agent.plugins",
66                 name=agent,
67                 invoke_on_load=True)
68         except Exception as e:
69             LOGGER.error(message.dumpstrace())
70             raise e
71
72         super(VstfConsumer, self).__init__()
73
74     def connect(self):
75         """This method connects to RabbitMQ, returning the connection handle.
76         When the connection is established, the on_connection_open method
77         will be invoked by pika.
78
79         :rtype: pika.SelectConnection
80
81         """
82         LOGGER.info('Connecting to %s:%s', self.srv, self.port)
83         return pika.SelectConnection(pika.URLParameters(self.url),
84                                      self.on_connection_open,
85                                      stop_ioloop_on_close=False)
86
87     #         return pika.SelectConnection(pika.ConnectionParameters(host="%s:%s" %(self.srv,self.port)),
88     #                                      self.on_connection_open,
89     #                                      stop_ioloop_on_close=False)
90
91     def on_connection_open(self, unused_connection):
92         """This method is called by pika once the connection to RabbitMQ has
93         been established. It passes the handle to the connection object in
94         case we need it, but in this case, we'll just mark it unused.
95
96         :type unused_connection: pika.SelectConnection
97
98         """
99         LOGGER.info('Connection opened')
100         self.add_on_connection_close_callback()
101         self.open_channel()
102
103     def add_on_connection_close_callback(self):
104         """This method adds an on close callback that will be invoked by pika
105         when RabbitMQ closes the connection to the publisher unexpectedly.
106
107         """
108         LOGGER.info('Adding connection close callback')
109         self._connection.add_on_close_callback(self.on_connection_closed)
110
111     def on_connection_closed(self, connection, reply_code, reply_text):
112         """This method is invoked by pika when the connection to RabbitMQ is
113         closed unexpectedly. Since it is unexpected, we will reconnect to
114         RabbitMQ if it disconnects.
115
116         :param pika.connection.Connection connection: The closed connection obj
117         :param int reply_code: The server provided reply_code if given
118         :param str reply_text: The server provided reply_text if given
119
120         """
121         self._channel = None
122         if self._closing:
123             self._connection.ioloop.stop()
124         else:
125             LOGGER.warning('Connection closed, reopening in 2 seconds: (%s) %s',
126                            reply_code, reply_text)
127             self._connection.add_timeout(2, self.reconnect)
128
129     def reconnect(self):
130         """Will be invoked by the IOLoop timer if the connection is
131         closed. See the on_connection_closed method.
132
133         """
134         # This is the old connection IOLoop instance, stop its ioloop
135         # Sometimes the broken connection may be exception
136         try:
137             self._connection.ioloop.stop()
138         except Exception:
139             pass
140
141         while not self._closing:
142             # Create a new connection
143             try:
144                 self._connection = self.connect()
145             except Exception:
146                 LOGGER.error(message.dumpstrace())
147                 time.sleep(3)
148                 continue
149             break
150
151         # There is now a new connection, needs a new ioloop to run
152         self._connection.ioloop.start()
153
154     def open_channel(self):
155         """Open a new channel with RabbitMQ by issuing the Channel.Open RPC
156         command. When RabbitMQ responds that the channel is open, the
157         on_channel_open callback will be invoked by pika.
158
159         """
160         LOGGER.info('Creating a new channel')
161         self._connection.channel(on_open_callback=self.on_channel_open)
162
163     def on_channel_open(self, channel):
164         """This method is invoked by pika when the channel has been opened.
165         The channel object is passed in so we can make use of it.
166
167         Since the channel is now open, we'll declare the exchange to use.
168
169         :param pika.channel.Channel channel: The channel object
170
171         """
172         LOGGER.info('Channel opened')
173         self._channel = channel
174         self.add_on_channel_close_callback()
175         self.setup_exchanges()
176
177     def add_on_channel_close_callback(self):
178         """This method tells pika to call the on_channel_closed method if
179         RabbitMQ unexpectedly closes the channel.
180
181         """
182         LOGGER.info('Adding channel close callback')
183         self._channel.add_on_close_callback(self.on_channel_closed)
184
185     def on_channel_closed(self, channel, reply_code, reply_text):
186         """Invoked by pika when RabbitMQ unexpectedly closes the channel.
187         Channels are usually closed if you attempt to do something that
188         violates the protocol, such as re-declare an exchange or queue with
189         different parameters. In this case, we'll close the connection
190         to shutdown the object.
191
192         :param pika.channel.Channel: The closed channel
193         :param int reply_code: The numeric reason the channel was closed
194         :param str reply_text: The text reason the channel was closed
195
196         """
197         LOGGER.warning('Channel %i was closed: (%s) %s',
198                        channel, reply_code, reply_text)
199         self._connection.close()
200
201     def setup_exchanges(self):
202         """Setup the exchange on RabbitMQ by invoking the Exchange.Declare RPC
203         command. When it is complete, the on_exchange_declareok method will
204         be invoked by pika.
205
206         :param str|unicode exchange_name: The name of the exchange to declare
207
208         """
209         LOGGER.info('Declaring %s exchange %s', constant.DIRECT, constant.exchange_d)
210         self._channel.exchange_declare(self.on_direct_exchange_declareok,
211                                        constant.exchange_d,
212                                        constant.DIRECT)
213
214     #         LOGGER.info('Declaring %s exchange %s', constant.FAN, constant.fan_exchange)
215     #         self._channel.exchange_declare(self.on_fan_exchange_declareok,
216     #                                        constant.fan_exchange,
217     #                                        constant.FAN)
218
219     def on_fan_exchange_declareok(self, unused_frame):
220         """Invoked by pika when RabbitMQ has finished the Exchange.Declare RPC
221         command.
222
223         :param pika.Frame.Method unused_frame: Exchange.DeclareOk response frame
224
225         """
226         LOGGER.info('Exchange declared')
227         pass
228
229     def on_direct_exchange_declareok(self, unused_frame):
230         queue_name = constant.queue_common + self.agent_id
231         self.setup_queue(queue_name, self.on_direct_queue_declareok)
232
233     def setup_queue(self, queue_name, next_ops):
234         """Setup the queue on RabbitMQ by invoking the Queue.Declare RPC
235         command. When it is complete, the on_queue_declareok method will
236         be invoked by pika.
237
238         :param str|unicode queue_name: The name of the queue to declare.
239
240         """
241         LOGGER.info('Declaring queue %s', queue_name)
242         self._channel.queue_declare(next_ops,
243                                     queue=queue_name,
244                                     exclusive=True)
245
246     def on_direct_queue_declareok(self, method_frame):
247         """Method invoked by pika when the Queue.Declare RPC call made in
248         setup_queue has completed. In this method we will bind the queue
249         and exchange together with the routing key by issuing the Queue.Bind
250         RPC command. When this command is complete, the on_bindok method will
251         be invoked by pika.
252
253         :param pika.frame.Method method_frame: The Queue.DeclareOk frame
254
255         """
256         queue_name = constant.queue_common + self.agent_id
257         LOGGER.info('Binding %s to %s with %s',
258                     queue_name, constant.exchange_d, queue_name)
259         self._channel.queue_bind(self.on_bindok, queue_name,
260                                  constant.exchange_d, queue_name)
261
262     def on_bindok(self, unused_frame):
263         """Invoked by pika when the Queue.Bind method has completed. At this
264         point we will start consuming messages by calling start_consuming
265         which will invoke the needed RPC commands to start the process.
266
267         :param pika.frame.Method unused_frame: The Queue.BindOk response frame
268
269         """
270         LOGGER.info('Queue bound')
271         self.start_consuming()
272
273     def start_consuming(self):
274         """This method sets up the consumer by first calling
275         add_on_cancel_callback so that the object is notified if RabbitMQ
276         cancels the consumer. It then issues the Basic.Consume RPC command
277         which returns the consumer tag that is used to uniquely identify the
278         consumer with RabbitMQ. We keep the value to use it when we want to
279         cancel consuming. The on_message method is passed in as a callback pika
280         will invoke when a message is fully received.
281
282         """
283         queue_name = constant.queue_common + self.agent_id
284         LOGGER.info('Issuing consumer related RPC commands')
285         self.add_on_cancel_callback()
286         self._consumer_tag = self._channel.basic_consume(self.on_message,
287                                                          queue_name)
288
289     def add_on_cancel_callback(self):
290         """Add a callback that will be invoked if RabbitMQ cancels the consumer
291         for some reason. If RabbitMQ does cancel the consumer,
292         on_consumer_cancelled will be invoked by pika.
293
294         """
295         LOGGER.info('Adding consumer cancellation callback')
296         self._channel.add_on_cancel_callback(self.on_consumer_cancelled)
297
298     def on_consumer_cancelled(self, method_frame):
299         """Invoked by pika when RabbitMQ sends a Basic.Cancel for a consumer
300         receiving messages.
301
302         :param pika.frame.Method method_frame: The Basic.Cancel frame
303
304         """
305         LOGGER.info('Consumer was cancelled remotely, shutting down: %r',
306                     method_frame)
307         if self._channel:
308             self._channel.close()
309
310     def on_message(self, respone_chanl, basic_deliver, properties, body):
311         """Invoked by pika when a message is delivered from RabbitMQ. The
312         channel is passed for your convenience. The basic_deliver object that
313         is passed in carries the exchange, routing key, delivery tag and
314         a redelivered flag for the message. The properties passed in is an
315         instance of BasicProperties with the message properties and the body
316         is the message that was sent.
317
318         :param pika.channel.Channel unused_channel: The channel object
319         :param pika.Spec.Basic.Deliver: basic_deliver method
320         :param pika.Spec.BasicProperties: properties
321         :param str|unicode body: The message body
322
323         """
324         LOGGER.info('Received message # %s from %s: %s',
325                     basic_deliver.delivery_tag, properties.app_id, body)
326         try:
327             msg = message.decode(body)
328             head = message.get_context(msg)
329             main_body = message.get_body(msg)
330
331             LOGGER.debug("recive the msg: head:%(h)s, body:%(b)s",
332                          {'h': head,
333                           'b': main_body})
334
335             func = getattr(self.agent_ops.driver, main_body.get('method'))
336             response = func(**main_body.get('args'))
337         except Exception as e:
338             LOGGER.error(message.dumpstrace())
339             LOGGER.error("request happend error")
340             response = {'exception': {'name': e.__class__.__name__,
341                                       'message': e.message,
342                                       'args': e.args}}
343         finally:
344             response = message.add_context(response, **head)
345             LOGGER.debug("response the msg: head:%(h)s, body:%(b)s",
346                          {'h': response.get('head'), 'b': response.get('body')})
347
348             respone_chanl.basic_publish(exchange=constant.exchange_d,
349                                         routing_key=properties.reply_to,
350                                         properties=pika.BasicProperties(correlation_id=properties.correlation_id),
351                                         body=message.encode(response)
352                                         )
353             # no matter what happend, tell the mq-server to drop this msg.
354
355         self.acknowledge_message(basic_deliver.delivery_tag)
356
357     def acknowledge_message(self, delivery_tag):
358         """Acknowledge the message delivery from RabbitMQ by sending a
359         Basic.Ack RPC method for the delivery tag.
360
361         :param int delivery_tag: The delivery tag from the Basic.Deliver frame
362
363         """
364         LOGGER.info('Acknowledging message %s', delivery_tag)
365         self._channel.basic_ack(delivery_tag)
366
367     def stop_consuming(self):
368         """Tell RabbitMQ that you would like to stop consuming by sending the
369         Basic.Cancel RPC command.
370
371         """
372         if self._channel:
373             LOGGER.info('Sending a Basic.Cancel RPC command to RabbitMQ')
374             self._channel.basic_cancel(self.on_cancelok, self._consumer_tag)
375
376     def on_cancelok(self, unused_frame):
377         """This method is invoked by pika when RabbitMQ acknowledges the
378         cancellation of a consumer. At this point we will close the channel.
379         This will invoke the on_channel_closed method once the channel has been
380         closed, which will in-turn close the connection.
381
382         :param pika.frame.Method unused_frame: The Basic.CancelOk frame
383
384         """
385         LOGGER.info('RabbitMQ acknowledged the cancellation of the consumer')
386         self.close_channel()
387
388     def close_channel(self):
389         """Call to close the channel with RabbitMQ cleanly by issuing the
390         Channel.Close RPC command.
391
392         """
393         LOGGER.info('Closing the channel')
394         self._channel.close()
395
396     def run(self):
397         """Run the example consumer by connecting to RabbitMQ and then
398         starting the IOLoop to block and allow the SelectConnection to operate.
399
400         """
401         try:
402             self._connection = self.connect()
403         except Exception as e:
404             LOGGER.error(message.dumpstrace())
405         self._connection.ioloop.start()
406
407     def stop(self):
408         """Cleanly shutdown the connection to RabbitMQ by stopping the consumer
409         with RabbitMQ. When RabbitMQ confirms the cancellation, on_cancelok
410         will be invoked by pika, which will then closing the channel and
411         connection. The IOLoop is started again because this method is invoked
412         when CTRL-C is pressed raising a KeyboardInterrupt exception. This
413         exception stops the IOLoop which needs to be running for pika to
414         communicate with RabbitMQ. All of the commands issued prior to starting
415         the IOLoop will be buffered but not processed.
416
417         """
418         LOGGER.info('Stopping')
419         self._closing = True
420         self.stop_consuming()
421         self._connection.ioloop.stop()
422         self.close_connection()
423         LOGGER.info('Stopped')
424
425     def close_connection(self):
426         """This method closes the connection to RabbitMQ."""
427         LOGGER.info('Closing connection')
428         self._connection.close()