Merge "Change PTL informatin in INFO"
[bottlenecks.git] / testsuites / vstf / vstf_scripts / vstf / rpc_frame_work / rpc_consumer.py
1 ##############################################################################
2 # Copyright (c) 2015 Huawei Technologies Co.,Ltd and others.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9
10 import logging
11 import time
12
13 import stevedore
14
15 import pika
16 from vstf.common import message
17 from vstf.rpc_frame_work import constant
18
19 LOGGER = logging.getLogger(__name__)
20
21
22 class VstfConsumer(object):
23     """This is an example consumer that will handle unexpected interactions
24     with RabbitMQ such as channel and connection closures.
25
26     If RabbitMQ closes the connection, it will reopen it. You should
27     look at the output, as there are limited reasons why the connection may
28     be closed, which usually are tied to permission related issues or
29     socket timeouts.
30
31     If the channel is closed, it will indicate a problem with one of the
32     commands that were issued and that should surface in the output as well.
33
34     """
35
36     def __init__(self, agent,
37                  user='guest',
38                  passwd='guest',
39                  host='localhost',
40                  port='5672',
41                  agent_id="agent"):
42         """Create a new instance of the consumer class, passing in the AMQP
43         URL used to connect to RabbitMQ.
44
45         :param str    user: The user name of RabbitMQ server
46         :param str    passwd: The passwd of RabbitMQ server
47         :param str    host: The ip of RabbitMQ server
48         :param str    port: connection's port
49
50         """
51         self._connection = None
52         self._channel = None
53         self._closing = False
54         self._consumer_tag = None
55         self.user = user
56         self.passwd = passwd
57         self.srv = host
58         self.port = port
59         self.agent_id = agent_id
60         self.url = 'amqp://' + self.user + ':' + self.passwd + \
61             '@' + self.srv + ':' + self.port + '/%2F'
62
63         # load the agent_funcs
64         try:
65             self.agent_ops = stevedore.driver.DriverManager(
66                 namespace="agent.plugins",
67                 name=agent,
68                 invoke_on_load=True)
69         except Exception as e:
70             LOGGER.error(message.dumpstrace())
71             raise e
72
73         super(VstfConsumer, self).__init__()
74
75     def connect(self):
76         """This method connects to RabbitMQ, returning the connection handle.
77         When the connection is established, the on_connection_open method
78         will be invoked by pika.
79
80         :rtype: pika.SelectConnection
81
82         """
83         LOGGER.info('Connecting to %s:%s', self.srv, self.port)
84         return pika.SelectConnection(pika.URLParameters(self.url),
85                                      self.on_connection_open,
86                                      stop_ioloop_on_close=False)
87
88     #         return pika.SelectConnection(pika.ConnectionParameters(host="%s:%s" %(self.srv,self.port)),
89     #                                      self.on_connection_open,
90     #                                      stop_ioloop_on_close=False)
91
92     def on_connection_open(self, unused_connection):
93         """This method is called by pika once the connection to RabbitMQ has
94         been established. It passes the handle to the connection object in
95         case we need it, but in this case, we'll just mark it unused.
96
97         :type unused_connection: pika.SelectConnection
98
99         """
100         LOGGER.info('Connection opened')
101         self.add_on_connection_close_callback()
102         self.open_channel()
103
104     def add_on_connection_close_callback(self):
105         """This method adds an on close callback that will be invoked by pika
106         when RabbitMQ closes the connection to the publisher unexpectedly.
107
108         """
109         LOGGER.info('Adding connection close callback')
110         self._connection.add_on_close_callback(self.on_connection_closed)
111
112     def on_connection_closed(self, connection, reply_code, reply_text):
113         """This method is invoked by pika when the connection to RabbitMQ is
114         closed unexpectedly. Since it is unexpected, we will reconnect to
115         RabbitMQ if it disconnects.
116
117         :param pika.connection.Connection connection: The closed connection obj
118         :param int reply_code: The server provided reply_code if given
119         :param str reply_text: The server provided reply_text if given
120
121         """
122         self._channel = None
123         if self._closing:
124             self._connection.ioloop.stop()
125         else:
126             LOGGER.warning(
127                 'Connection closed, reopening in 2 seconds: (%s) %s',
128                 reply_code,
129                 reply_text)
130             self._connection.add_timeout(2, self.reconnect)
131
132     def reconnect(self):
133         """Will be invoked by the IOLoop timer if the connection is
134         closed. See the on_connection_closed method.
135
136         """
137         # This is the old connection IOLoop instance, stop its ioloop
138         # Sometimes the broken connection may be exception
139         try:
140             self._connection.ioloop.stop()
141         except Exception:
142             pass
143
144         while not self._closing:
145             # Create a new connection
146             try:
147                 self._connection = self.connect()
148             except Exception:
149                 LOGGER.error(message.dumpstrace())
150                 time.sleep(3)
151                 continue
152             break
153
154         # There is now a new connection, needs a new ioloop to run
155         self._connection.ioloop.start()
156
157     def open_channel(self):
158         """Open a new channel with RabbitMQ by issuing the Channel.Open RPC
159         command. When RabbitMQ responds that the channel is open, the
160         on_channel_open callback will be invoked by pika.
161
162         """
163         LOGGER.info('Creating a new channel')
164         self._connection.channel(on_open_callback=self.on_channel_open)
165
166     def on_channel_open(self, channel):
167         """This method is invoked by pika when the channel has been opened.
168         The channel object is passed in so we can make use of it.
169
170         Since the channel is now open, we'll declare the exchange to use.
171
172         :param pika.channel.Channel channel: The channel object
173
174         """
175         LOGGER.info('Channel opened')
176         self._channel = channel
177         self.add_on_channel_close_callback()
178         self.setup_exchanges()
179
180     def add_on_channel_close_callback(self):
181         """This method tells pika to call the on_channel_closed method if
182         RabbitMQ unexpectedly closes the channel.
183
184         """
185         LOGGER.info('Adding channel close callback')
186         self._channel.add_on_close_callback(self.on_channel_closed)
187
188     def on_channel_closed(self, channel, reply_code, reply_text):
189         """Invoked by pika when RabbitMQ unexpectedly closes the channel.
190         Channels are usually closed if you attempt to do something that
191         violates the protocol, such as re-declare an exchange or queue with
192         different parameters. In this case, we'll close the connection
193         to shutdown the object.
194
195         :param pika.channel.Channel: The closed channel
196         :param int reply_code: The numeric reason the channel was closed
197         :param str reply_text: The text reason the channel was closed
198
199         """
200         LOGGER.warning('Channel %i was closed: (%s) %s',
201                        channel, reply_code, reply_text)
202         self._connection.close()
203
204     def setup_exchanges(self):
205         """Setup the exchange on RabbitMQ by invoking the Exchange.Declare RPC
206         command. When it is complete, the on_exchange_declareok method will
207         be invoked by pika.
208
209         :param str|unicode exchange_name: The name of the exchange to declare
210
211         """
212         LOGGER.info(
213             'Declaring %s exchange %s',
214             constant.DIRECT,
215             constant.exchange_d)
216         self._channel.exchange_declare(self.on_direct_exchange_declareok,
217                                        constant.exchange_d,
218                                        constant.DIRECT)
219
220     #         LOGGER.info('Declaring %s exchange %s', constant.FAN, constant.fan_exchange)
221     #         self._channel.exchange_declare(self.on_fan_exchange_declareok,
222     #                                        constant.fan_exchange,
223     #                                        constant.FAN)
224
225     def on_fan_exchange_declareok(self, unused_frame):
226         """Invoked by pika when RabbitMQ has finished the Exchange.Declare RPC
227         command.
228
229         :param pika.Frame.Method unused_frame: Exchange.DeclareOk response frame
230
231         """
232         LOGGER.info('Exchange declared')
233         pass
234
235     def on_direct_exchange_declareok(self, unused_frame):
236         queue_name = constant.queue_common + self.agent_id
237         self.setup_queue(queue_name, self.on_direct_queue_declareok)
238
239     def setup_queue(self, queue_name, next_ops):
240         """Setup the queue on RabbitMQ by invoking the Queue.Declare RPC
241         command. When it is complete, the on_queue_declareok method will
242         be invoked by pika.
243
244         :param str|unicode queue_name: The name of the queue to declare.
245
246         """
247         LOGGER.info('Declaring queue %s', queue_name)
248         self._channel.queue_declare(next_ops,
249                                     queue=queue_name,
250                                     exclusive=True)
251
252     def on_direct_queue_declareok(self, method_frame):
253         """Method invoked by pika when the Queue.Declare RPC call made in
254         setup_queue has completed. In this method we will bind the queue
255         and exchange together with the routing key by issuing the Queue.Bind
256         RPC command. When this command is complete, the on_bindok method will
257         be invoked by pika.
258
259         :param pika.frame.Method method_frame: The Queue.DeclareOk frame
260
261         """
262         queue_name = constant.queue_common + self.agent_id
263         LOGGER.info('Binding %s to %s with %s',
264                     queue_name, constant.exchange_d, queue_name)
265         self._channel.queue_bind(self.on_bindok, queue_name,
266                                  constant.exchange_d, queue_name)
267
268     def on_bindok(self, unused_frame):
269         """Invoked by pika when the Queue.Bind method has completed. At this
270         point we will start consuming messages by calling start_consuming
271         which will invoke the needed RPC commands to start the process.
272
273         :param pika.frame.Method unused_frame: The Queue.BindOk response frame
274
275         """
276         LOGGER.info('Queue bound')
277         self.start_consuming()
278
279     def start_consuming(self):
280         """This method sets up the consumer by first calling
281         add_on_cancel_callback so that the object is notified if RabbitMQ
282         cancels the consumer. It then issues the Basic.Consume RPC command
283         which returns the consumer tag that is used to uniquely identify the
284         consumer with RabbitMQ. We keep the value to use it when we want to
285         cancel consuming. The on_message method is passed in as a callback pika
286         will invoke when a message is fully received.
287
288         """
289         queue_name = constant.queue_common + self.agent_id
290         LOGGER.info('Issuing consumer related RPC commands')
291         self.add_on_cancel_callback()
292         self._consumer_tag = self._channel.basic_consume(self.on_message,
293                                                          queue_name)
294
295     def add_on_cancel_callback(self):
296         """Add a callback that will be invoked if RabbitMQ cancels the consumer
297         for some reason. If RabbitMQ does cancel the consumer,
298         on_consumer_cancelled will be invoked by pika.
299
300         """
301         LOGGER.info('Adding consumer cancellation callback')
302         self._channel.add_on_cancel_callback(self.on_consumer_cancelled)
303
304     def on_consumer_cancelled(self, method_frame):
305         """Invoked by pika when RabbitMQ sends a Basic.Cancel for a consumer
306         receiving messages.
307
308         :param pika.frame.Method method_frame: The Basic.Cancel frame
309
310         """
311         LOGGER.info('Consumer was cancelled remotely, shutting down: %r',
312                     method_frame)
313         if self._channel:
314             self._channel.close()
315
316     def on_message(self, respone_chanl, basic_deliver, properties, body):
317         """Invoked by pika when a message is delivered from RabbitMQ. The
318         channel is passed for your convenience. The basic_deliver object that
319         is passed in carries the exchange, routing key, delivery tag and
320         a redelivered flag for the message. The properties passed in is an
321         instance of BasicProperties with the message properties and the body
322         is the message that was sent.
323
324         :param pika.channel.Channel unused_channel: The channel object
325         :param pika.Spec.Basic.Deliver: basic_deliver method
326         :param pika.Spec.BasicProperties: properties
327         :param str|unicode body: The message body
328
329         """
330         LOGGER.info('Received message # %s from %s: %s',
331                     basic_deliver.delivery_tag, properties.app_id, body)
332         try:
333             msg = message.decode(body)
334             head = message.get_context(msg)
335             main_body = message.get_body(msg)
336
337             LOGGER.debug("recive the msg: head:%(h)s, body:%(b)s",
338                          {'h': head,
339                           'b': main_body})
340
341             func = getattr(self.agent_ops.driver, main_body.get('method'))
342             response = func(**main_body.get('args'))
343         except Exception as e:
344             LOGGER.error(message.dumpstrace())
345             LOGGER.error("request happend error")
346             response = {'exception': {'name': e.__class__.__name__,
347                                       'message': e.message,
348                                       'args': e.args}}
349         finally:
350             response = message.add_context(response, **head)
351             LOGGER.debug("response the msg: head:%(h)s, body:%(b)s", {
352                          'h': response.get('head'), 'b': response.get('body')})
353
354             respone_chanl.basic_publish(
355                 exchange=constant.exchange_d,
356                 routing_key=properties.reply_to,
357                 properties=pika.BasicProperties(
358                     correlation_id=properties.correlation_id),
359                 body=message.encode(response))
360             # no matter what happend, tell the mq-server to drop this msg.
361
362         self.acknowledge_message(basic_deliver.delivery_tag)
363
364     def acknowledge_message(self, delivery_tag):
365         """Acknowledge the message delivery from RabbitMQ by sending a
366         Basic.Ack RPC method for the delivery tag.
367
368         :param int delivery_tag: The delivery tag from the Basic.Deliver frame
369
370         """
371         LOGGER.info('Acknowledging message %s', delivery_tag)
372         self._channel.basic_ack(delivery_tag)
373
374     def stop_consuming(self):
375         """Tell RabbitMQ that you would like to stop consuming by sending the
376         Basic.Cancel RPC command.
377
378         """
379         if self._channel:
380             LOGGER.info('Sending a Basic.Cancel RPC command to RabbitMQ')
381             self._channel.basic_cancel(self.on_cancelok, self._consumer_tag)
382
383     def on_cancelok(self, unused_frame):
384         """This method is invoked by pika when RabbitMQ acknowledges the
385         cancellation of a consumer. At this point we will close the channel.
386         This will invoke the on_channel_closed method once the channel has been
387         closed, which will in-turn close the connection.
388
389         :param pika.frame.Method unused_frame: The Basic.CancelOk frame
390
391         """
392         LOGGER.info('RabbitMQ acknowledged the cancellation of the consumer')
393         self.close_channel()
394
395     def close_channel(self):
396         """Call to close the channel with RabbitMQ cleanly by issuing the
397         Channel.Close RPC command.
398
399         """
400         LOGGER.info('Closing the channel')
401         self._channel.close()
402
403     def run(self):
404         """Run the example consumer by connecting to RabbitMQ and then
405         starting the IOLoop to block and allow the SelectConnection to operate.
406
407         """
408         try:
409             self._connection = self.connect()
410         except Exception as e:
411             LOGGER.error(message.dumpstrace())
412         self._connection.ioloop.start()
413
414     def stop(self):
415         """Cleanly shutdown the connection to RabbitMQ by stopping the consumer
416         with RabbitMQ. When RabbitMQ confirms the cancellation, on_cancelok
417         will be invoked by pika, which will then closing the channel and
418         connection. The IOLoop is started again because this method is invoked
419         when CTRL-C is pressed raising a KeyboardInterrupt exception. This
420         exception stops the IOLoop which needs to be running for pika to
421         communicate with RabbitMQ. All of the commands issued prior to starting
422         the IOLoop will be buffered but not processed.
423
424         """
425         LOGGER.info('Stopping')
426         self._closing = True
427         self.stop_consuming()
428         self._connection.ioloop.stop()
429         self.close_connection()
430         LOGGER.info('Stopped')
431
432     def close_connection(self):
433         """This method closes the connection to RabbitMQ."""
434         LOGGER.info('Closing connection')
435         self._connection.close()