Upload the contribution of vstf as bottleneck network framework.
[bottlenecks.git] / vstf / vstf / rpc_frame_work / rpc_consumer.py
diff --git a/vstf/vstf/rpc_frame_work/rpc_consumer.py b/vstf/vstf/rpc_frame_work/rpc_consumer.py
new file mode 100755 (executable)
index 0000000..f7aacfd
--- /dev/null
@@ -0,0 +1,421 @@
+#!/usr/bin/env python
+# coding=utf-8
+import logging
+import time
+
+import stevedore
+
+import pika
+from vstf.common import message
+from vstf.rpc_frame_work import constant
+
+LOGGER = logging.getLogger(__name__)
+
+
+class VstfConsumer(object):
+    """This is an example consumer that will handle unexpected interactions
+    with RabbitMQ such as channel and connection closures.
+
+    If RabbitMQ closes the connection, it will reopen it. You should
+    look at the output, as there are limited reasons why the connection may
+    be closed, which usually are tied to permission related issues or
+    socket timeouts.
+
+    If the channel is closed, it will indicate a problem with one of the
+    commands that were issued and that should surface in the output as well.
+
+    """
+
+    def __init__(self, agent,
+                 user='guest',
+                 passwd='guest',
+                 host='localhost',
+                 port='5672',
+                 agent_id="agent"):
+        """Create a new instance of the consumer class, passing in the AMQP
+        URL used to connect to RabbitMQ.
+
+        :param str    user: The user name of RabbitMQ server
+        :param str    passwd: The passwd of RabbitMQ server
+        :param str    host: The ip of RabbitMQ server
+        :param str    port: connection's port
+
+        """
+        self._connection = None
+        self._channel = None
+        self._closing = False
+        self._consumer_tag = None
+        self.user = user
+        self.passwd = passwd
+        self.srv = host
+        self.port = port
+        self.agent_id = agent_id
+        self.url = 'amqp://' + self.user + ':' + self.passwd + '@' + self.srv + ':' + self.port + '/%2F'
+
+        # load the agent_funcs
+        try:
+            self.agent_ops = stevedore.driver.DriverManager(
+                namespace="agent.plugins",
+                name=agent,
+                invoke_on_load=True)
+        except Exception as e:
+            LOGGER.error(message.dumpstrace())
+            raise e
+
+        super(VstfConsumer, self).__init__()
+
+    def connect(self):
+        """This method connects to RabbitMQ, returning the connection handle.
+        When the connection is established, the on_connection_open method
+        will be invoked by pika.
+
+        :rtype: pika.SelectConnection
+
+        """
+        LOGGER.info('Connecting to %s:%s', self.srv, self.port)
+        return pika.SelectConnection(pika.URLParameters(self.url),
+                                     self.on_connection_open,
+                                     stop_ioloop_on_close=False)
+
+    #         return pika.SelectConnection(pika.ConnectionParameters(host="%s:%s" %(self.srv,self.port)),
+    #                                      self.on_connection_open,
+    #                                      stop_ioloop_on_close=False)
+
+    def on_connection_open(self, unused_connection):
+        """This method is called by pika once the connection to RabbitMQ has
+        been established. It passes the handle to the connection object in
+        case we need it, but in this case, we'll just mark it unused.
+
+        :type unused_connection: pika.SelectConnection
+
+        """
+        LOGGER.info('Connection opened')
+        self.add_on_connection_close_callback()
+        self.open_channel()
+
+    def add_on_connection_close_callback(self):
+        """This method adds an on close callback that will be invoked by pika
+        when RabbitMQ closes the connection to the publisher unexpectedly.
+
+        """
+        LOGGER.info('Adding connection close callback')
+        self._connection.add_on_close_callback(self.on_connection_closed)
+
+    def on_connection_closed(self, connection, reply_code, reply_text):
+        """This method is invoked by pika when the connection to RabbitMQ is
+        closed unexpectedly. Since it is unexpected, we will reconnect to
+        RabbitMQ if it disconnects.
+
+        :param pika.connection.Connection connection: The closed connection obj
+        :param int reply_code: The server provided reply_code if given
+        :param str reply_text: The server provided reply_text if given
+
+        """
+        self._channel = None
+        if self._closing:
+            self._connection.ioloop.stop()
+        else:
+            LOGGER.warning('Connection closed, reopening in 2 seconds: (%s) %s',
+                           reply_code, reply_text)
+            self._connection.add_timeout(2, self.reconnect)
+
+    def reconnect(self):
+        """Will be invoked by the IOLoop timer if the connection is
+        closed. See the on_connection_closed method.
+
+        """
+        # This is the old connection IOLoop instance, stop its ioloop
+        # Sometimes the broken connection may be exception
+        try:
+            self._connection.ioloop.stop()
+        except Exception:
+            pass
+
+        while not self._closing:
+            # Create a new connection
+            try:
+                self._connection = self.connect()
+            except Exception:
+                LOGGER.error(message.dumpstrace())
+                time.sleep(3)
+                continue
+            break
+
+        # There is now a new connection, needs a new ioloop to run
+        self._connection.ioloop.start()
+
+    def open_channel(self):
+        """Open a new channel with RabbitMQ by issuing the Channel.Open RPC
+        command. When RabbitMQ responds that the channel is open, the
+        on_channel_open callback will be invoked by pika.
+
+        """
+        LOGGER.info('Creating a new channel')
+        self._connection.channel(on_open_callback=self.on_channel_open)
+
+    def on_channel_open(self, channel):
+        """This method is invoked by pika when the channel has been opened.
+        The channel object is passed in so we can make use of it.
+
+        Since the channel is now open, we'll declare the exchange to use.
+
+        :param pika.channel.Channel channel: The channel object
+
+        """
+        LOGGER.info('Channel opened')
+        self._channel = channel
+        self.add_on_channel_close_callback()
+        self.setup_exchanges()
+
+    def add_on_channel_close_callback(self):
+        """This method tells pika to call the on_channel_closed method if
+        RabbitMQ unexpectedly closes the channel.
+
+        """
+        LOGGER.info('Adding channel close callback')
+        self._channel.add_on_close_callback(self.on_channel_closed)
+
+    def on_channel_closed(self, channel, reply_code, reply_text):
+        """Invoked by pika when RabbitMQ unexpectedly closes the channel.
+        Channels are usually closed if you attempt to do something that
+        violates the protocol, such as re-declare an exchange or queue with
+        different parameters. In this case, we'll close the connection
+        to shutdown the object.
+
+        :param pika.channel.Channel: The closed channel
+        :param int reply_code: The numeric reason the channel was closed
+        :param str reply_text: The text reason the channel was closed
+
+        """
+        LOGGER.warning('Channel %i was closed: (%s) %s',
+                       channel, reply_code, reply_text)
+        self._connection.close()
+
+    def setup_exchanges(self):
+        """Setup the exchange on RabbitMQ by invoking the Exchange.Declare RPC
+        command. When it is complete, the on_exchange_declareok method will
+        be invoked by pika.
+
+        :param str|unicode exchange_name: The name of the exchange to declare
+
+        """
+        LOGGER.info('Declaring %s exchange %s', constant.DIRECT, constant.exchange_d)
+        self._channel.exchange_declare(self.on_direct_exchange_declareok,
+                                       constant.exchange_d,
+                                       constant.DIRECT)
+
+    #         LOGGER.info('Declaring %s exchange %s', constant.FAN, constant.fan_exchange)
+    #         self._channel.exchange_declare(self.on_fan_exchange_declareok,
+    #                                        constant.fan_exchange,
+    #                                        constant.FAN)
+
+    def on_fan_exchange_declareok(self, unused_frame):
+        """Invoked by pika when RabbitMQ has finished the Exchange.Declare RPC
+        command.
+
+        :param pika.Frame.Method unused_frame: Exchange.DeclareOk response frame
+
+        """
+        LOGGER.info('Exchange declared')
+        pass
+
+    def on_direct_exchange_declareok(self, unused_frame):
+        queue_name = constant.queue_common + self.agent_id
+        self.setup_queue(queue_name, self.on_direct_queue_declareok)
+
+    def setup_queue(self, queue_name, next_ops):
+        """Setup the queue on RabbitMQ by invoking the Queue.Declare RPC
+        command. When it is complete, the on_queue_declareok method will
+        be invoked by pika.
+
+        :param str|unicode queue_name: The name of the queue to declare.
+
+        """
+        LOGGER.info('Declaring queue %s', queue_name)
+        self._channel.queue_declare(next_ops,
+                                    queue=queue_name,
+                                    exclusive=True)
+
+    def on_direct_queue_declareok(self, method_frame):
+        """Method invoked by pika when the Queue.Declare RPC call made in
+        setup_queue has completed. In this method we will bind the queue
+        and exchange together with the routing key by issuing the Queue.Bind
+        RPC command. When this command is complete, the on_bindok method will
+        be invoked by pika.
+
+        :param pika.frame.Method method_frame: The Queue.DeclareOk frame
+
+        """
+        queue_name = constant.queue_common + self.agent_id
+        LOGGER.info('Binding %s to %s with %s',
+                    queue_name, constant.exchange_d, queue_name)
+        self._channel.queue_bind(self.on_bindok, queue_name,
+                                 constant.exchange_d, queue_name)
+
+    def on_bindok(self, unused_frame):
+        """Invoked by pika when the Queue.Bind method has completed. At this
+        point we will start consuming messages by calling start_consuming
+        which will invoke the needed RPC commands to start the process.
+
+        :param pika.frame.Method unused_frame: The Queue.BindOk response frame
+
+        """
+        LOGGER.info('Queue bound')
+        self.start_consuming()
+
+    def start_consuming(self):
+        """This method sets up the consumer by first calling
+        add_on_cancel_callback so that the object is notified if RabbitMQ
+        cancels the consumer. It then issues the Basic.Consume RPC command
+        which returns the consumer tag that is used to uniquely identify the
+        consumer with RabbitMQ. We keep the value to use it when we want to
+        cancel consuming. The on_message method is passed in as a callback pika
+        will invoke when a message is fully received.
+
+        """
+        queue_name = constant.queue_common + self.agent_id
+        LOGGER.info('Issuing consumer related RPC commands')
+        self.add_on_cancel_callback()
+        self._consumer_tag = self._channel.basic_consume(self.on_message,
+                                                         queue_name)
+
+    def add_on_cancel_callback(self):
+        """Add a callback that will be invoked if RabbitMQ cancels the consumer
+        for some reason. If RabbitMQ does cancel the consumer,
+        on_consumer_cancelled will be invoked by pika.
+
+        """
+        LOGGER.info('Adding consumer cancellation callback')
+        self._channel.add_on_cancel_callback(self.on_consumer_cancelled)
+
+    def on_consumer_cancelled(self, method_frame):
+        """Invoked by pika when RabbitMQ sends a Basic.Cancel for a consumer
+        receiving messages.
+
+        :param pika.frame.Method method_frame: The Basic.Cancel frame
+
+        """
+        LOGGER.info('Consumer was cancelled remotely, shutting down: %r',
+                    method_frame)
+        if self._channel:
+            self._channel.close()
+
+    def on_message(self, respone_chanl, basic_deliver, properties, body):
+        """Invoked by pika when a message is delivered from RabbitMQ. The
+        channel is passed for your convenience. The basic_deliver object that
+        is passed in carries the exchange, routing key, delivery tag and
+        a redelivered flag for the message. The properties passed in is an
+        instance of BasicProperties with the message properties and the body
+        is the message that was sent.
+
+        :param pika.channel.Channel unused_channel: The channel object
+        :param pika.Spec.Basic.Deliver: basic_deliver method
+        :param pika.Spec.BasicProperties: properties
+        :param str|unicode body: The message body
+
+        """
+        LOGGER.info('Received message # %s from %s: %s',
+                    basic_deliver.delivery_tag, properties.app_id, body)
+        try:
+            msg = message.decode(body)
+            head = message.get_context(msg)
+            main_body = message.get_body(msg)
+
+            LOGGER.debug("recive the msg: head:%(h)s, body:%(b)s",
+                         {'h': head,
+                          'b': main_body})
+
+            func = getattr(self.agent_ops.driver, main_body.get('method'))
+            response = func(**main_body.get('args'))
+        except Exception as e:
+            LOGGER.error(message.dumpstrace())
+            LOGGER.error("request happend error")
+            response = {'exception': {'name': e.__class__.__name__,
+                                      'message': e.message,
+                                      'args': e.args}}
+        finally:
+            response = message.add_context(response, **head)
+            LOGGER.debug("response the msg: head:%(h)s, body:%(b)s",
+                         {'h': response.get('head'), 'b': response.get('body')})
+
+            respone_chanl.basic_publish(exchange=constant.exchange_d,
+                                        routing_key=properties.reply_to,
+                                        properties=pika.BasicProperties(correlation_id=properties.correlation_id),
+                                        body=message.encode(response)
+                                        )
+            # no matter what happend, tell the mq-server to drop this msg.
+
+        self.acknowledge_message(basic_deliver.delivery_tag)
+
+    def acknowledge_message(self, delivery_tag):
+        """Acknowledge the message delivery from RabbitMQ by sending a
+        Basic.Ack RPC method for the delivery tag.
+
+        :param int delivery_tag: The delivery tag from the Basic.Deliver frame
+
+        """
+        LOGGER.info('Acknowledging message %s', delivery_tag)
+        self._channel.basic_ack(delivery_tag)
+
+    def stop_consuming(self):
+        """Tell RabbitMQ that you would like to stop consuming by sending the
+        Basic.Cancel RPC command.
+
+        """
+        if self._channel:
+            LOGGER.info('Sending a Basic.Cancel RPC command to RabbitMQ')
+            self._channel.basic_cancel(self.on_cancelok, self._consumer_tag)
+
+    def on_cancelok(self, unused_frame):
+        """This method is invoked by pika when RabbitMQ acknowledges the
+        cancellation of a consumer. At this point we will close the channel.
+        This will invoke the on_channel_closed method once the channel has been
+        closed, which will in-turn close the connection.
+
+        :param pika.frame.Method unused_frame: The Basic.CancelOk frame
+
+        """
+        LOGGER.info('RabbitMQ acknowledged the cancellation of the consumer')
+        self.close_channel()
+
+    def close_channel(self):
+        """Call to close the channel with RabbitMQ cleanly by issuing the
+        Channel.Close RPC command.
+
+        """
+        LOGGER.info('Closing the channel')
+        self._channel.close()
+
+    def run(self):
+        """Run the example consumer by connecting to RabbitMQ and then
+        starting the IOLoop to block and allow the SelectConnection to operate.
+
+        """
+        try:
+            self._connection = self.connect()
+        except Exception as e:
+            LOGGER.error(message.dumpstrace())
+        self._connection.ioloop.start()
+
+    def stop(self):
+        """Cleanly shutdown the connection to RabbitMQ by stopping the consumer
+        with RabbitMQ. When RabbitMQ confirms the cancellation, on_cancelok
+        will be invoked by pika, which will then closing the channel and
+        connection. The IOLoop is started again because this method is invoked
+        when CTRL-C is pressed raising a KeyboardInterrupt exception. This
+        exception stops the IOLoop which needs to be running for pika to
+        communicate with RabbitMQ. All of the commands issued prior to starting
+        the IOLoop will be buffered but not processed.
+
+        """
+        LOGGER.info('Stopping')
+        self._closing = True
+        self.stop_consuming()
+        self._connection.ioloop.stop()
+        self.close_connection()
+        LOGGER.info('Stopped')
+
+    def close_connection(self):
+        """This method closes the connection to RabbitMQ."""
+        LOGGER.info('Closing connection')
+        self._connection.close()