1 ##############################################################################
2 # Copyright (c) 2015 Huawei Technologies Co.,Ltd and others.
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
16 from vstf.common import message
17 from vstf.rpc_frame_work import constant
19 LOGGER = logging.getLogger(__name__)
22 class VstfConsumer(object):
23 """This is an example consumer that will handle unexpected interactions
24 with RabbitMQ such as channel and connection closures.
26 If RabbitMQ closes the connection, it will reopen it. You should
27 look at the output, as there are limited reasons why the connection may
28 be closed, which usually are tied to permission related issues or
31 If the channel is closed, it will indicate a problem with one of the
32 commands that were issued and that should surface in the output as well.
36 def __init__(self, agent,
42 """Create a new instance of the consumer class, passing in the AMQP
43 URL used to connect to RabbitMQ.
45 :param str user: The user name of RabbitMQ server
46 :param str passwd: The passwd of RabbitMQ server
47 :param str host: The ip of RabbitMQ server
48 :param str port: connection's port
51 self._connection = None
54 self._consumer_tag = None
59 self.agent_id = agent_id
60 self.url = 'amqp://' + self.user + ':' + self.passwd + '@' + self.srv + ':' + self.port + '/%2F'
62 # load the agent_funcs
64 self.agent_ops = stevedore.driver.DriverManager(
65 namespace="agent.plugins",
68 except Exception as e:
69 LOGGER.error(message.dumpstrace())
72 super(VstfConsumer, self).__init__()
75 """This method connects to RabbitMQ, returning the connection handle.
76 When the connection is established, the on_connection_open method
77 will be invoked by pika.
79 :rtype: pika.SelectConnection
82 LOGGER.info('Connecting to %s:%s', self.srv, self.port)
83 return pika.SelectConnection(pika.URLParameters(self.url),
84 self.on_connection_open,
85 stop_ioloop_on_close=False)
87 # return pika.SelectConnection(pika.ConnectionParameters(host="%s:%s" %(self.srv,self.port)),
88 # self.on_connection_open,
89 # stop_ioloop_on_close=False)
91 def on_connection_open(self, unused_connection):
92 """This method is called by pika once the connection to RabbitMQ has
93 been established. It passes the handle to the connection object in
94 case we need it, but in this case, we'll just mark it unused.
96 :type unused_connection: pika.SelectConnection
99 LOGGER.info('Connection opened')
100 self.add_on_connection_close_callback()
def add_on_connection_close_callback(self):
    """Register the handler pika should run when the connection closes.

    ``on_connection_closed`` is attached to the current connection so the
    consumer is notified if RabbitMQ closes the connection unexpectedly.

    """
    LOGGER.info('Adding connection close callback')
    close_handler = self.on_connection_closed
    self._connection.add_on_close_callback(close_handler)
111 def on_connection_closed(self, connection, reply_code, reply_text):
112 """This method is invoked by pika when the connection to RabbitMQ is
113 closed unexpectedly. Since it is unexpected, we will reconnect to
114 RabbitMQ if it disconnects.
116 :param pika.connection.Connection connection: The closed connection obj
117 :param int reply_code: The server provided reply_code if given
118 :param str reply_text: The server provided reply_text if given
123 self._connection.ioloop.stop()
125 LOGGER.warning('Connection closed, reopening in 2 seconds: (%s) %s',
126 reply_code, reply_text)
127 self._connection.add_timeout(2, self.reconnect)
130 """Will be invoked by the IOLoop timer if the connection is
131 closed. See the on_connection_closed method.
134 # This is the old connection IOLoop instance, stop its ioloop
135 # Sometimes the broken connection may be exception
137 self._connection.ioloop.stop()
141 while not self._closing:
142 # Create a new connection
144 self._connection = self.connect()
146 LOGGER.error(message.dumpstrace())
151 # There is now a new connection, needs a new ioloop to run
152 self._connection.ioloop.start()
def open_channel(self):
    """Request a new channel on the current RabbitMQ connection.

    The request is issued through the connection object, with
    ``on_channel_open`` registered as the callback to run once RabbitMQ
    reports that the channel is open.

    """
    LOGGER.info('Creating a new channel')
    connection = self._connection
    connection.channel(on_open_callback=self.on_channel_open)
def on_channel_open(self, channel):
    """Handle the notification that the requested channel is open.

    The channel is kept on the instance, a close callback is attached to
    it, and the exchange declaration is started.

    :param pika.channel.Channel channel: The channel object

    """
    LOGGER.info('Channel opened')
    # Store the channel first: both follow-up steps go through
    # self._channel.
    self._channel = channel
    self.add_on_channel_close_callback()
    self.setup_exchanges()
def add_on_channel_close_callback(self):
    """Ask pika to call ``on_channel_closed`` when the channel is closed.

    This covers the case where RabbitMQ closes the channel unexpectedly.

    """
    LOGGER.info('Adding channel close callback')
    close_handler = self.on_channel_closed
    self._channel.add_on_close_callback(close_handler)
def on_channel_closed(self, channel, reply_code, reply_text):
    """React to RabbitMQ closing the channel unexpectedly.

    A channel is usually closed after a protocol violation, for example
    re-declaring an exchange or queue with different parameters. The
    closure is logged and the connection is closed as well, which shuts
    the object down.

    :param pika.channel.Channel channel: The closed channel
    :param int reply_code: The numeric reason the channel was closed
    :param str reply_text: The text reason the channel was closed

    """
    closure_details = (channel, reply_code, reply_text)
    LOGGER.warning('Channel %i was closed: (%s) %s', *closure_details)
    self._connection.close()
201 def setup_exchanges(self):
202 """Setup the exchange on RabbitMQ by invoking the Exchange.Declare RPC
203 command. When it is complete, the on_exchange_declareok method will
206 :param str|unicode exchange_name: The name of the exchange to declare
209 LOGGER.info('Declaring %s exchange %s', constant.DIRECT, constant.exchange_d)
210 self._channel.exchange_declare(self.on_direct_exchange_declareok,
214 # LOGGER.info('Declaring %s exchange %s', constant.FAN, constant.fan_exchange)
215 # self._channel.exchange_declare(self.on_fan_exchange_declareok,
216 # constant.fan_exchange,
219 def on_fan_exchange_declareok(self, unused_frame):
220 """Invoked by pika when RabbitMQ has finished the Exchange.Declare RPC
223 :param pika.Frame.Method unused_frame: Exchange.DeclareOk response frame
226 LOGGER.info('Exchange declared')
def on_direct_exchange_declareok(self, unused_frame):
    """Continue the setup once the direct exchange has been declared.

    This is the callback that ``setup_exchanges`` hands to
    ``exchange_declare``. It declares this agent's queue, whose name is
    ``constant.queue_common`` followed by the agent id.

    :param unused_frame: The response frame passed to the callback; it
        is not used here.

    """
    agent_queue = constant.queue_common + self.agent_id
    next_step = self.on_direct_queue_declareok
    self.setup_queue(agent_queue, next_step)
233 def setup_queue(self, queue_name, next_ops):
234 """Setup the queue on RabbitMQ by invoking the Queue.Declare RPC
235 command. When it is complete, the on_queue_declareok method will
238 :param str|unicode queue_name: The name of the queue to declare.
241 LOGGER.info('Declaring queue %s', queue_name)
242 self._channel.queue_declare(next_ops,
def on_direct_queue_declareok(self, method_frame):
    """Bind the agent queue to the direct exchange.

    Runs as the callback for the queue declaration started in
    ``setup_queue``. The queue name is also used as the routing key of
    the binding, and ``on_bindok`` is registered as the callback for the
    Queue.Bind command.

    :param pika.frame.Method method_frame: The Queue.DeclareOk frame

    """
    agent_queue = constant.queue_common + self.agent_id
    exchange = constant.exchange_d
    # The queue name is logged twice because it doubles as the routing key.
    LOGGER.info('Binding %s to %s with %s',
                agent_queue, exchange, agent_queue)
    self._channel.queue_bind(self.on_bindok, agent_queue,
                             exchange, agent_queue)
def on_bindok(self, unused_frame):
    """Start consuming once the Queue.Bind command has completed.

    With the queue bound, control is handed to ``start_consuming``.

    :param pika.frame.Method unused_frame: The Queue.BindOk response frame

    """
    LOGGER.info('Queue bound')
    self.start_consuming()
273 def start_consuming(self):
274 """This method sets up the consumer by first calling
275 add_on_cancel_callback so that the object is notified if RabbitMQ
276 cancels the consumer. It then issues the Basic.Consume RPC command
277 which returns the consumer tag that is used to uniquely identify the
278 consumer with RabbitMQ. We keep the value to use it when we want to
279 cancel consuming. The on_message method is passed in as a callback pika
280 will invoke when a message is fully received.
283 queue_name = constant.queue_common + self.agent_id
284 LOGGER.info('Issuing consumer related RPC commands')
285 self.add_on_cancel_callback()
286 self._consumer_tag = self._channel.basic_consume(self.on_message,
def add_on_cancel_callback(self):
    """Register ``on_consumer_cancelled`` as the channel's cancel callback.

    The callback is invoked if RabbitMQ cancels the consumer for some
    reason.

    """
    LOGGER.info('Adding consumer cancellation callback')
    cancel_handler = self.on_consumer_cancelled
    self._channel.add_on_cancel_callback(cancel_handler)
298 def on_consumer_cancelled(self, method_frame):
299 """Invoked by pika when RabbitMQ sends a Basic.Cancel for a consumer
302 :param pika.frame.Method method_frame: The Basic.Cancel frame
305 LOGGER.info('Consumer was cancelled remotely, shutting down: %r',
308 self._channel.close()
310 def on_message(self, respone_chanl, basic_deliver, properties, body):
311 """Invoked by pika when a message is delivered from RabbitMQ. The
312 channel is passed for your convenience. The basic_deliver object that
313 is passed in carries the exchange, routing key, delivery tag and
314 a redelivered flag for the message. The properties passed in is an
315 instance of BasicProperties with the message properties and the body
316 is the message that was sent.
318 :param pika.channel.Channel unused_channel: The channel object
319 :param pika.Spec.Basic.Deliver: basic_deliver method
320 :param pika.Spec.BasicProperties: properties
321 :param str|unicode body: The message body
324 LOGGER.info('Received message # %s from %s: %s',
325 basic_deliver.delivery_tag, properties.app_id, body)
327 msg = message.decode(body)
328 head = message.get_context(msg)
329 main_body = message.get_body(msg)
331 LOGGER.debug("recive the msg: head:%(h)s, body:%(b)s",
335 func = getattr(self.agent_ops.driver, main_body.get('method'))
336 response = func(**main_body.get('args'))
337 except Exception as e:
338 LOGGER.error(message.dumpstrace())
339 LOGGER.error("request happend error")
340 response = {'exception': {'name': e.__class__.__name__,
341 'message': e.message,
344 response = message.add_context(response, **head)
345 LOGGER.debug("response the msg: head:%(h)s, body:%(b)s",
346 {'h': response.get('head'), 'b': response.get('body')})
348 respone_chanl.basic_publish(exchange=constant.exchange_d,
349 routing_key=properties.reply_to,
350 properties=pika.BasicProperties(correlation_id=properties.correlation_id),
351 body=message.encode(response)
353 # no matter what happend, tell the mq-server to drop this msg.
355 self.acknowledge_message(basic_deliver.delivery_tag)
def acknowledge_message(self, delivery_tag):
    """Send a Basic.Ack to RabbitMQ for the given delivery.

    :param int delivery_tag: The delivery tag from the Basic.Deliver frame

    """
    LOGGER.info('Acknowledging message %s', delivery_tag)
    channel = self._channel
    channel.basic_ack(delivery_tag)
367 def stop_consuming(self):
368 """Tell RabbitMQ that you would like to stop consuming by sending the
369 Basic.Cancel RPC command.
373 LOGGER.info('Sending a Basic.Cancel RPC command to RabbitMQ')
374 self._channel.basic_cancel(self.on_cancelok, self._consumer_tag)
376 def on_cancelok(self, unused_frame):
377 """This method is invoked by pika when RabbitMQ acknowledges the
378 cancellation of a consumer. At this point we will close the channel.
379 This will invoke the on_channel_closed method once the channel has been
380 closed, which will in-turn close the connection.
382 :param pika.frame.Method unused_frame: The Basic.CancelOk frame
385 LOGGER.info('RabbitMQ acknowledged the cancellation of the consumer')
def close_channel(self):
    """Close the channel cleanly by issuing the Channel.Close RPC command.

    """
    LOGGER.info('Closing the channel')
    channel = self._channel
    channel.close()
397 """Run the example consumer by connecting to RabbitMQ and then
398 starting the IOLoop to block and allow the SelectConnection to operate.
402 self._connection = self.connect()
403 except Exception as e:
404 LOGGER.error(message.dumpstrace())
405 self._connection.ioloop.start()
408 """Cleanly shutdown the connection to RabbitMQ by stopping the consumer
409 with RabbitMQ. When RabbitMQ confirms the cancellation, on_cancelok
410 will be invoked by pika, which will then closing the channel and
411 connection. The IOLoop is started again because this method is invoked
412 when CTRL-C is pressed raising a KeyboardInterrupt exception. This
413 exception stops the IOLoop which needs to be running for pika to
414 communicate with RabbitMQ. All of the commands issued prior to starting
415 the IOLoop will be buffered but not processed.
418 LOGGER.info('Stopping')
420 self.stop_consuming()
421 self._connection.ioloop.stop()
422 self.close_connection()
423 LOGGER.info('Stopped')
def close_connection(self):
    """Close the connection to RabbitMQ held by this consumer."""
    LOGGER.info('Closing connection')
    connection = self._connection
    connection.close()