f7aacfd6b859b4ddc44f24aece3d9e9f88ddbd15
[bottlenecks.git] / vstf / vstf / rpc_frame_work / rpc_consumer.py
1 #!/usr/bin/env python
2 # coding=utf-8
3 import logging
4 import time
5
6 import stevedore
7
8 import pika
9 from vstf.common import message
10 from vstf.rpc_frame_work import constant
11
12 LOGGER = logging.getLogger(__name__)
13
14
15 class VstfConsumer(object):
16     """This is an example consumer that will handle unexpected interactions
17     with RabbitMQ such as channel and connection closures.
18
19     If RabbitMQ closes the connection, it will reopen it. You should
20     look at the output, as there are limited reasons why the connection may
21     be closed, which usually are tied to permission related issues or
22     socket timeouts.
23
24     If the channel is closed, it will indicate a problem with one of the
25     commands that were issued and that should surface in the output as well.
26
27     """
28
29     def __init__(self, agent,
30                  user='guest',
31                  passwd='guest',
32                  host='localhost',
33                  port='5672',
34                  agent_id="agent"):
35         """Create a new instance of the consumer class, passing in the AMQP
36         URL used to connect to RabbitMQ.
37
38         :param str    user: The user name of RabbitMQ server
39         :param str    passwd: The passwd of RabbitMQ server
40         :param str    host: The ip of RabbitMQ server
41         :param str    port: connection's port
42
43         """
44         self._connection = None
45         self._channel = None
46         self._closing = False
47         self._consumer_tag = None
48         self.user = user
49         self.passwd = passwd
50         self.srv = host
51         self.port = port
52         self.agent_id = agent_id
53         self.url = 'amqp://' + self.user + ':' + self.passwd + '@' + self.srv + ':' + self.port + '/%2F'
54
55         # load the agent_funcs
56         try:
57             self.agent_ops = stevedore.driver.DriverManager(
58                 namespace="agent.plugins",
59                 name=agent,
60                 invoke_on_load=True)
61         except Exception as e:
62             LOGGER.error(message.dumpstrace())
63             raise e
64
65         super(VstfConsumer, self).__init__()
66
67     def connect(self):
68         """This method connects to RabbitMQ, returning the connection handle.
69         When the connection is established, the on_connection_open method
70         will be invoked by pika.
71
72         :rtype: pika.SelectConnection
73
74         """
75         LOGGER.info('Connecting to %s:%s', self.srv, self.port)
76         return pika.SelectConnection(pika.URLParameters(self.url),
77                                      self.on_connection_open,
78                                      stop_ioloop_on_close=False)
79
80     #         return pika.SelectConnection(pika.ConnectionParameters(host="%s:%s" %(self.srv,self.port)),
81     #                                      self.on_connection_open,
82     #                                      stop_ioloop_on_close=False)
83
84     def on_connection_open(self, unused_connection):
85         """This method is called by pika once the connection to RabbitMQ has
86         been established. It passes the handle to the connection object in
87         case we need it, but in this case, we'll just mark it unused.
88
89         :type unused_connection: pika.SelectConnection
90
91         """
92         LOGGER.info('Connection opened')
93         self.add_on_connection_close_callback()
94         self.open_channel()
95
96     def add_on_connection_close_callback(self):
97         """This method adds an on close callback that will be invoked by pika
98         when RabbitMQ closes the connection to the publisher unexpectedly.
99
100         """
101         LOGGER.info('Adding connection close callback')
102         self._connection.add_on_close_callback(self.on_connection_closed)
103
104     def on_connection_closed(self, connection, reply_code, reply_text):
105         """This method is invoked by pika when the connection to RabbitMQ is
106         closed unexpectedly. Since it is unexpected, we will reconnect to
107         RabbitMQ if it disconnects.
108
109         :param pika.connection.Connection connection: The closed connection obj
110         :param int reply_code: The server provided reply_code if given
111         :param str reply_text: The server provided reply_text if given
112
113         """
114         self._channel = None
115         if self._closing:
116             self._connection.ioloop.stop()
117         else:
118             LOGGER.warning('Connection closed, reopening in 2 seconds: (%s) %s',
119                            reply_code, reply_text)
120             self._connection.add_timeout(2, self.reconnect)
121
122     def reconnect(self):
123         """Will be invoked by the IOLoop timer if the connection is
124         closed. See the on_connection_closed method.
125
126         """
127         # This is the old connection IOLoop instance, stop its ioloop
128         # Sometimes the broken connection may be exception
129         try:
130             self._connection.ioloop.stop()
131         except Exception:
132             pass
133
134         while not self._closing:
135             # Create a new connection
136             try:
137                 self._connection = self.connect()
138             except Exception:
139                 LOGGER.error(message.dumpstrace())
140                 time.sleep(3)
141                 continue
142             break
143
144         # There is now a new connection, needs a new ioloop to run
145         self._connection.ioloop.start()
146
147     def open_channel(self):
148         """Open a new channel with RabbitMQ by issuing the Channel.Open RPC
149         command. When RabbitMQ responds that the channel is open, the
150         on_channel_open callback will be invoked by pika.
151
152         """
153         LOGGER.info('Creating a new channel')
154         self._connection.channel(on_open_callback=self.on_channel_open)
155
156     def on_channel_open(self, channel):
157         """This method is invoked by pika when the channel has been opened.
158         The channel object is passed in so we can make use of it.
159
160         Since the channel is now open, we'll declare the exchange to use.
161
162         :param pika.channel.Channel channel: The channel object
163
164         """
165         LOGGER.info('Channel opened')
166         self._channel = channel
167         self.add_on_channel_close_callback()
168         self.setup_exchanges()
169
170     def add_on_channel_close_callback(self):
171         """This method tells pika to call the on_channel_closed method if
172         RabbitMQ unexpectedly closes the channel.
173
174         """
175         LOGGER.info('Adding channel close callback')
176         self._channel.add_on_close_callback(self.on_channel_closed)
177
178     def on_channel_closed(self, channel, reply_code, reply_text):
179         """Invoked by pika when RabbitMQ unexpectedly closes the channel.
180         Channels are usually closed if you attempt to do something that
181         violates the protocol, such as re-declare an exchange or queue with
182         different parameters. In this case, we'll close the connection
183         to shutdown the object.
184
185         :param pika.channel.Channel: The closed channel
186         :param int reply_code: The numeric reason the channel was closed
187         :param str reply_text: The text reason the channel was closed
188
189         """
190         LOGGER.warning('Channel %i was closed: (%s) %s',
191                        channel, reply_code, reply_text)
192         self._connection.close()
193
194     def setup_exchanges(self):
195         """Setup the exchange on RabbitMQ by invoking the Exchange.Declare RPC
196         command. When it is complete, the on_exchange_declareok method will
197         be invoked by pika.
198
199         :param str|unicode exchange_name: The name of the exchange to declare
200
201         """
202         LOGGER.info('Declaring %s exchange %s', constant.DIRECT, constant.exchange_d)
203         self._channel.exchange_declare(self.on_direct_exchange_declareok,
204                                        constant.exchange_d,
205                                        constant.DIRECT)
206
207     #         LOGGER.info('Declaring %s exchange %s', constant.FAN, constant.fan_exchange)
208     #         self._channel.exchange_declare(self.on_fan_exchange_declareok,
209     #                                        constant.fan_exchange,
210     #                                        constant.FAN)
211
212     def on_fan_exchange_declareok(self, unused_frame):
213         """Invoked by pika when RabbitMQ has finished the Exchange.Declare RPC
214         command.
215
216         :param pika.Frame.Method unused_frame: Exchange.DeclareOk response frame
217
218         """
219         LOGGER.info('Exchange declared')
220         pass
221
222     def on_direct_exchange_declareok(self, unused_frame):
223         queue_name = constant.queue_common + self.agent_id
224         self.setup_queue(queue_name, self.on_direct_queue_declareok)
225
226     def setup_queue(self, queue_name, next_ops):
227         """Setup the queue on RabbitMQ by invoking the Queue.Declare RPC
228         command. When it is complete, the on_queue_declareok method will
229         be invoked by pika.
230
231         :param str|unicode queue_name: The name of the queue to declare.
232
233         """
234         LOGGER.info('Declaring queue %s', queue_name)
235         self._channel.queue_declare(next_ops,
236                                     queue=queue_name,
237                                     exclusive=True)
238
239     def on_direct_queue_declareok(self, method_frame):
240         """Method invoked by pika when the Queue.Declare RPC call made in
241         setup_queue has completed. In this method we will bind the queue
242         and exchange together with the routing key by issuing the Queue.Bind
243         RPC command. When this command is complete, the on_bindok method will
244         be invoked by pika.
245
246         :param pika.frame.Method method_frame: The Queue.DeclareOk frame
247
248         """
249         queue_name = constant.queue_common + self.agent_id
250         LOGGER.info('Binding %s to %s with %s',
251                     queue_name, constant.exchange_d, queue_name)
252         self._channel.queue_bind(self.on_bindok, queue_name,
253                                  constant.exchange_d, queue_name)
254
255     def on_bindok(self, unused_frame):
256         """Invoked by pika when the Queue.Bind method has completed. At this
257         point we will start consuming messages by calling start_consuming
258         which will invoke the needed RPC commands to start the process.
259
260         :param pika.frame.Method unused_frame: The Queue.BindOk response frame
261
262         """
263         LOGGER.info('Queue bound')
264         self.start_consuming()
265
266     def start_consuming(self):
267         """This method sets up the consumer by first calling
268         add_on_cancel_callback so that the object is notified if RabbitMQ
269         cancels the consumer. It then issues the Basic.Consume RPC command
270         which returns the consumer tag that is used to uniquely identify the
271         consumer with RabbitMQ. We keep the value to use it when we want to
272         cancel consuming. The on_message method is passed in as a callback pika
273         will invoke when a message is fully received.
274
275         """
276         queue_name = constant.queue_common + self.agent_id
277         LOGGER.info('Issuing consumer related RPC commands')
278         self.add_on_cancel_callback()
279         self._consumer_tag = self._channel.basic_consume(self.on_message,
280                                                          queue_name)
281
282     def add_on_cancel_callback(self):
283         """Add a callback that will be invoked if RabbitMQ cancels the consumer
284         for some reason. If RabbitMQ does cancel the consumer,
285         on_consumer_cancelled will be invoked by pika.
286
287         """
288         LOGGER.info('Adding consumer cancellation callback')
289         self._channel.add_on_cancel_callback(self.on_consumer_cancelled)
290
291     def on_consumer_cancelled(self, method_frame):
292         """Invoked by pika when RabbitMQ sends a Basic.Cancel for a consumer
293         receiving messages.
294
295         :param pika.frame.Method method_frame: The Basic.Cancel frame
296
297         """
298         LOGGER.info('Consumer was cancelled remotely, shutting down: %r',
299                     method_frame)
300         if self._channel:
301             self._channel.close()
302
303     def on_message(self, respone_chanl, basic_deliver, properties, body):
304         """Invoked by pika when a message is delivered from RabbitMQ. The
305         channel is passed for your convenience. The basic_deliver object that
306         is passed in carries the exchange, routing key, delivery tag and
307         a redelivered flag for the message. The properties passed in is an
308         instance of BasicProperties with the message properties and the body
309         is the message that was sent.
310
311         :param pika.channel.Channel unused_channel: The channel object
312         :param pika.Spec.Basic.Deliver: basic_deliver method
313         :param pika.Spec.BasicProperties: properties
314         :param str|unicode body: The message body
315
316         """
317         LOGGER.info('Received message # %s from %s: %s',
318                     basic_deliver.delivery_tag, properties.app_id, body)
319         try:
320             msg = message.decode(body)
321             head = message.get_context(msg)
322             main_body = message.get_body(msg)
323
324             LOGGER.debug("recive the msg: head:%(h)s, body:%(b)s",
325                          {'h': head,
326                           'b': main_body})
327
328             func = getattr(self.agent_ops.driver, main_body.get('method'))
329             response = func(**main_body.get('args'))
330         except Exception as e:
331             LOGGER.error(message.dumpstrace())
332             LOGGER.error("request happend error")
333             response = {'exception': {'name': e.__class__.__name__,
334                                       'message': e.message,
335                                       'args': e.args}}
336         finally:
337             response = message.add_context(response, **head)
338             LOGGER.debug("response the msg: head:%(h)s, body:%(b)s",
339                          {'h': response.get('head'), 'b': response.get('body')})
340
341             respone_chanl.basic_publish(exchange=constant.exchange_d,
342                                         routing_key=properties.reply_to,
343                                         properties=pika.BasicProperties(correlation_id=properties.correlation_id),
344                                         body=message.encode(response)
345                                         )
346             # no matter what happend, tell the mq-server to drop this msg.
347
348         self.acknowledge_message(basic_deliver.delivery_tag)
349
350     def acknowledge_message(self, delivery_tag):
351         """Acknowledge the message delivery from RabbitMQ by sending a
352         Basic.Ack RPC method for the delivery tag.
353
354         :param int delivery_tag: The delivery tag from the Basic.Deliver frame
355
356         """
357         LOGGER.info('Acknowledging message %s', delivery_tag)
358         self._channel.basic_ack(delivery_tag)
359
360     def stop_consuming(self):
361         """Tell RabbitMQ that you would like to stop consuming by sending the
362         Basic.Cancel RPC command.
363
364         """
365         if self._channel:
366             LOGGER.info('Sending a Basic.Cancel RPC command to RabbitMQ')
367             self._channel.basic_cancel(self.on_cancelok, self._consumer_tag)
368
369     def on_cancelok(self, unused_frame):
370         """This method is invoked by pika when RabbitMQ acknowledges the
371         cancellation of a consumer. At this point we will close the channel.
372         This will invoke the on_channel_closed method once the channel has been
373         closed, which will in-turn close the connection.
374
375         :param pika.frame.Method unused_frame: The Basic.CancelOk frame
376
377         """
378         LOGGER.info('RabbitMQ acknowledged the cancellation of the consumer')
379         self.close_channel()
380
381     def close_channel(self):
382         """Call to close the channel with RabbitMQ cleanly by issuing the
383         Channel.Close RPC command.
384
385         """
386         LOGGER.info('Closing the channel')
387         self._channel.close()
388
389     def run(self):
390         """Run the example consumer by connecting to RabbitMQ and then
391         starting the IOLoop to block and allow the SelectConnection to operate.
392
393         """
394         try:
395             self._connection = self.connect()
396         except Exception as e:
397             LOGGER.error(message.dumpstrace())
398         self._connection.ioloop.start()
399
400     def stop(self):
401         """Cleanly shutdown the connection to RabbitMQ by stopping the consumer
402         with RabbitMQ. When RabbitMQ confirms the cancellation, on_cancelok
403         will be invoked by pika, which will then closing the channel and
404         connection. The IOLoop is started again because this method is invoked
405         when CTRL-C is pressed raising a KeyboardInterrupt exception. This
406         exception stops the IOLoop which needs to be running for pika to
407         communicate with RabbitMQ. All of the commands issued prior to starting
408         the IOLoop will be buffered but not processed.
409
410         """
411         LOGGER.info('Stopping')
412         self._closing = True
413         self.stop_consuming()
414         self._connection.ioloop.stop()
415         self.close_connection()
416         LOGGER.info('Stopped')
417
418     def close_connection(self):
419         """This method closes the connection to RabbitMQ."""
420         LOGGER.info('Closing connection')
421         self._connection.close()