1 ##############################################################################
2 # Copyright (c) 2015 Huawei Technologies Co.,Ltd and others.
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
16 from vstf.common import message
17 from vstf.rpc_frame_work import constant
19 LOGGER = logging.getLogger(__name__)
22 class VstfConsumer(object):
23 """This is an example consumer that will handle unexpected interactions
24 with RabbitMQ such as channel and connection closures.
26 If RabbitMQ closes the connection, it will reopen it. You should
27 look at the output, as there are limited reasons why the connection may
28 be closed, which usually are tied to permission related issues or
31 If the channel is closed, it will indicate a problem with one of the
32 commands that were issued and that should surface in the output as well.
36 def __init__(self, agent,
42 """Create a new instance of the consumer class, passing in the AMQP
43 URL used to connect to RabbitMQ.
45 :param str user: The user name of RabbitMQ server
46 :param str passwd: The passwd of RabbitMQ server
47 :param str host: The ip of RabbitMQ server
48 :param str port: connection's port
51 self._connection = None
54 self._consumer_tag = None
59 self.agent_id = agent_id
60 self.url = 'amqp://' + self.user + ':' + self.passwd + \
61 '@' + self.srv + ':' + self.port + '/%2F'
63 # load the agent_funcs
65 self.agent_ops = stevedore.driver.DriverManager(
66 namespace="agent.plugins",
69 except Exception as e:
70 LOGGER.error(message.dumpstrace())
73 super(VstfConsumer, self).__init__()
76 """This method connects to RabbitMQ, returning the connection handle.
77 When the connection is established, the on_connection_open method
78 will be invoked by pika.
80 :rtype: pika.SelectConnection
83 LOGGER.info('Connecting to %s:%s', self.srv, self.port)
84 return pika.SelectConnection(pika.URLParameters(self.url),
85 self.on_connection_open,
86 stop_ioloop_on_close=False)
88 # return pika.SelectConnection(pika.ConnectionParameters(host="%s:%s" %(self.srv,self.port)),
89 # self.on_connection_open,
90 # stop_ioloop_on_close=False)
92 def on_connection_open(self, unused_connection):
93 """This method is called by pika once the connection to RabbitMQ has
94 been established. It passes the handle to the connection object in
95 case we need it, but in this case, we'll just mark it unused.
97 :type unused_connection: pika.SelectConnection
100 LOGGER.info('Connection opened')
101 self.add_on_connection_close_callback()
104 def add_on_connection_close_callback(self):
105 """This method adds an on close callback that will be invoked by pika
106 when RabbitMQ closes the connection to the publisher unexpectedly.
109 LOGGER.info('Adding connection close callback')
110 self._connection.add_on_close_callback(self.on_connection_closed)
112 def on_connection_closed(self, connection, reply_code, reply_text):
113 """This method is invoked by pika when the connection to RabbitMQ is
114 closed unexpectedly. Since it is unexpected, we will reconnect to
115 RabbitMQ if it disconnects.
117 :param pika.connection.Connection connection: The closed connection obj
118 :param int reply_code: The server provided reply_code if given
119 :param str reply_text: The server provided reply_text if given
124 self._connection.ioloop.stop()
127 'Connection closed, reopening in 2 seconds: (%s) %s',
130 self._connection.add_timeout(2, self.reconnect)
133 """Will be invoked by the IOLoop timer if the connection is
134 closed. See the on_connection_closed method.
137 # This is the old connection IOLoop instance, stop its ioloop
138 # Sometimes the broken connection may be exception
140 self._connection.ioloop.stop()
144 while not self._closing:
145 # Create a new connection
147 self._connection = self.connect()
149 LOGGER.error(message.dumpstrace())
154 # There is now a new connection, needs a new ioloop to run
155 self._connection.ioloop.start()
157 def open_channel(self):
158 """Open a new channel with RabbitMQ by issuing the Channel.Open RPC
159 command. When RabbitMQ responds that the channel is open, the
160 on_channel_open callback will be invoked by pika.
163 LOGGER.info('Creating a new channel')
164 self._connection.channel(on_open_callback=self.on_channel_open)
166 def on_channel_open(self, channel):
167 """This method is invoked by pika when the channel has been opened.
168 The channel object is passed in so we can make use of it.
170 Since the channel is now open, we'll declare the exchange to use.
172 :param pika.channel.Channel channel: The channel object
175 LOGGER.info('Channel opened')
176 self._channel = channel
177 self.add_on_channel_close_callback()
178 self.setup_exchanges()
180 def add_on_channel_close_callback(self):
181 """This method tells pika to call the on_channel_closed method if
182 RabbitMQ unexpectedly closes the channel.
185 LOGGER.info('Adding channel close callback')
186 self._channel.add_on_close_callback(self.on_channel_closed)
188 def on_channel_closed(self, channel, reply_code, reply_text):
189 """Invoked by pika when RabbitMQ unexpectedly closes the channel.
190 Channels are usually closed if you attempt to do something that
191 violates the protocol, such as re-declare an exchange or queue with
192 different parameters. In this case, we'll close the connection
193 to shutdown the object.
195 :param pika.channel.Channel: The closed channel
196 :param int reply_code: The numeric reason the channel was closed
197 :param str reply_text: The text reason the channel was closed
200 LOGGER.warning('Channel %i was closed: (%s) %s',
201 channel, reply_code, reply_text)
202 self._connection.close()
204 def setup_exchanges(self):
205 """Setup the exchange on RabbitMQ by invoking the Exchange.Declare RPC
206 command. When it is complete, the on_exchange_declareok method will
209 :param str|unicode exchange_name: The name of the exchange to declare
213 'Declaring %s exchange %s',
216 self._channel.exchange_declare(self.on_direct_exchange_declareok,
220 # LOGGER.info('Declaring %s exchange %s', constant.FAN, constant.fan_exchange)
221 # self._channel.exchange_declare(self.on_fan_exchange_declareok,
222 # constant.fan_exchange,
225 def on_fan_exchange_declareok(self, unused_frame):
226 """Invoked by pika when RabbitMQ has finished the Exchange.Declare RPC
229 :param pika.Frame.Method unused_frame: Exchange.DeclareOk response frame
232 LOGGER.info('Exchange declared')
235 def on_direct_exchange_declareok(self, unused_frame):
236 queue_name = constant.queue_common + self.agent_id
237 self.setup_queue(queue_name, self.on_direct_queue_declareok)
239 def setup_queue(self, queue_name, next_ops):
240 """Setup the queue on RabbitMQ by invoking the Queue.Declare RPC
241 command. When it is complete, the on_queue_declareok method will
244 :param str|unicode queue_name: The name of the queue to declare.
247 LOGGER.info('Declaring queue %s', queue_name)
248 self._channel.queue_declare(next_ops,
252 def on_direct_queue_declareok(self, method_frame):
253 """Method invoked by pika when the Queue.Declare RPC call made in
254 setup_queue has completed. In this method we will bind the queue
255 and exchange together with the routing key by issuing the Queue.Bind
256 RPC command. When this command is complete, the on_bindok method will
259 :param pika.frame.Method method_frame: The Queue.DeclareOk frame
262 queue_name = constant.queue_common + self.agent_id
263 LOGGER.info('Binding %s to %s with %s',
264 queue_name, constant.exchange_d, queue_name)
265 self._channel.queue_bind(self.on_bindok, queue_name,
266 constant.exchange_d, queue_name)
268 def on_bindok(self, unused_frame):
269 """Invoked by pika when the Queue.Bind method has completed. At this
270 point we will start consuming messages by calling start_consuming
271 which will invoke the needed RPC commands to start the process.
273 :param pika.frame.Method unused_frame: The Queue.BindOk response frame
276 LOGGER.info('Queue bound')
277 self.start_consuming()
279 def start_consuming(self):
280 """This method sets up the consumer by first calling
281 add_on_cancel_callback so that the object is notified if RabbitMQ
282 cancels the consumer. It then issues the Basic.Consume RPC command
283 which returns the consumer tag that is used to uniquely identify the
284 consumer with RabbitMQ. We keep the value to use it when we want to
285 cancel consuming. The on_message method is passed in as a callback pika
286 will invoke when a message is fully received.
289 queue_name = constant.queue_common + self.agent_id
290 LOGGER.info('Issuing consumer related RPC commands')
291 self.add_on_cancel_callback()
292 self._consumer_tag = self._channel.basic_consume(self.on_message,
295 def add_on_cancel_callback(self):
296 """Add a callback that will be invoked if RabbitMQ cancels the consumer
297 for some reason. If RabbitMQ does cancel the consumer,
298 on_consumer_cancelled will be invoked by pika.
301 LOGGER.info('Adding consumer cancellation callback')
302 self._channel.add_on_cancel_callback(self.on_consumer_cancelled)
304 def on_consumer_cancelled(self, method_frame):
305 """Invoked by pika when RabbitMQ sends a Basic.Cancel for a consumer
308 :param pika.frame.Method method_frame: The Basic.Cancel frame
311 LOGGER.info('Consumer was cancelled remotely, shutting down: %r',
314 self._channel.close()
316 def on_message(self, respone_chanl, basic_deliver, properties, body):
317 """Invoked by pika when a message is delivered from RabbitMQ. The
318 channel is passed for your convenience. The basic_deliver object that
319 is passed in carries the exchange, routing key, delivery tag and
320 a redelivered flag for the message. The properties passed in is an
321 instance of BasicProperties with the message properties and the body
322 is the message that was sent.
324 :param pika.channel.Channel unused_channel: The channel object
325 :param pika.Spec.Basic.Deliver: basic_deliver method
326 :param pika.Spec.BasicProperties: properties
327 :param str|unicode body: The message body
330 LOGGER.info('Received message # %s from %s: %s',
331 basic_deliver.delivery_tag, properties.app_id, body)
333 msg = message.decode(body)
334 head = message.get_context(msg)
335 main_body = message.get_body(msg)
337 LOGGER.debug("recive the msg: head:%(h)s, body:%(b)s",
341 func = getattr(self.agent_ops.driver, main_body.get('method'))
342 response = func(**main_body.get('args'))
343 except Exception as e:
344 LOGGER.error(message.dumpstrace())
345 LOGGER.error("request happend error")
346 response = {'exception': {'name': e.__class__.__name__,
347 'message': e.message,
350 response = message.add_context(response, **head)
351 LOGGER.debug("response the msg: head:%(h)s, body:%(b)s", {
352 'h': response.get('head'), 'b': response.get('body')})
354 respone_chanl.basic_publish(
355 exchange=constant.exchange_d,
356 routing_key=properties.reply_to,
357 properties=pika.BasicProperties(
358 correlation_id=properties.correlation_id),
359 body=message.encode(response))
360 # no matter what happend, tell the mq-server to drop this msg.
362 self.acknowledge_message(basic_deliver.delivery_tag)
364 def acknowledge_message(self, delivery_tag):
365 """Acknowledge the message delivery from RabbitMQ by sending a
366 Basic.Ack RPC method for the delivery tag.
368 :param int delivery_tag: The delivery tag from the Basic.Deliver frame
371 LOGGER.info('Acknowledging message %s', delivery_tag)
372 self._channel.basic_ack(delivery_tag)
374 def stop_consuming(self):
375 """Tell RabbitMQ that you would like to stop consuming by sending the
376 Basic.Cancel RPC command.
380 LOGGER.info('Sending a Basic.Cancel RPC command to RabbitMQ')
381 self._channel.basic_cancel(self.on_cancelok, self._consumer_tag)
383 def on_cancelok(self, unused_frame):
384 """This method is invoked by pika when RabbitMQ acknowledges the
385 cancellation of a consumer. At this point we will close the channel.
386 This will invoke the on_channel_closed method once the channel has been
387 closed, which will in-turn close the connection.
389 :param pika.frame.Method unused_frame: The Basic.CancelOk frame
392 LOGGER.info('RabbitMQ acknowledged the cancellation of the consumer')
395 def close_channel(self):
396 """Call to close the channel with RabbitMQ cleanly by issuing the
397 Channel.Close RPC command.
400 LOGGER.info('Closing the channel')
401 self._channel.close()
404 """Run the example consumer by connecting to RabbitMQ and then
405 starting the IOLoop to block and allow the SelectConnection to operate.
409 self._connection = self.connect()
410 except Exception as e:
411 LOGGER.error(message.dumpstrace())
412 self._connection.ioloop.start()
415 """Cleanly shutdown the connection to RabbitMQ by stopping the consumer
416 with RabbitMQ. When RabbitMQ confirms the cancellation, on_cancelok
417 will be invoked by pika, which will then closing the channel and
418 connection. The IOLoop is started again because this method is invoked
419 when CTRL-C is pressed raising a KeyboardInterrupt exception. This
420 exception stops the IOLoop which needs to be running for pika to
421 communicate with RabbitMQ. All of the commands issued prior to starting
422 the IOLoop will be buffered but not processed.
425 LOGGER.info('Stopping')
427 self.stop_consuming()
428 self._connection.ioloop.stop()
429 self.close_connection()
430 LOGGER.info('Stopped')
432 def close_connection(self):
433 """This method closes the connection to RabbitMQ."""
434 LOGGER.info('Closing connection')
435 self._connection.close()