JIRA: BOTTLENECKS-29
[bottlenecks.git] / vstf / vstf / rpc_frame_work / rpc_producer.py
1 ##############################################################################
2 # Copyright (c) 2015 Huawei Technologies Co.,Ltd and others.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9
10 import uuid
11 import json
12 import time
13 import exceptions
14 import logging
15
16 import pika
17 from vstf.common import message
18 from vstf.common import excepts
19 from vstf.rpc_frame_work import constant
20
21 LOG = logging.getLogger(__name__)
22
23
24 class RpcProxy(object):
25     def __init__(self, host,
26                  user='guest',
27                  passwd='guest',
28                  port='5672'):
29         """create a connection to rabbitmq,direct call and fan call supported.
30
31         """
32         # try to create connection of rabbitmq
33         self._channel = None
34         self._connection = None
35         self._queue = str(uuid.uuid4())
36         self._consume_tag = None
37
38         self.user = user
39         self.passwd = passwd
40         self.srv = host
41         self.port = port
42         self.url = 'amqp://' + self.user + ':' + self.passwd + '@' + self.srv + ':' + self.port + '/%2F'
43         try:
44             self.connect(host, self.setup_vstf_producer)
45         except Exception as e:
46             LOG.error("create connection failed. e:%(e)s", {'e': e})
47             raise e
48
49         self.response = None
50         self.corr_id = None
51
52     def connect(self, host, ok_callback):
53         """Create a Blocking connection to the rabbitmq-server
54         
55         :param str    host: the rabbitmq-server's host
56         :param obj    ok_callback: if connect success than do this function
57         
58         """
59         LOG.info("Connect to the server %s", host)
60         self._connection = pika.BlockingConnection(pika.URLParameters(self.url))
61         if self._connection:
62             ok_callback()
63
64     def setup_vstf_producer(self):
65         self.open_channel()
66         self.create_exchange(constant.exchange_d, constant.DIRECT)
67         self.bind_queues()
68         self.start_consumer()
69
70     def open_channel(self):
71         self._channel = self._connection.channel()
72         if self._channel:
73             self._channel.confirm_delivery()
74
75     def create_exchange(self, name, type):
76         LOG.info("Create %s exchange: %s", type, name)
77         self._channel.exchange_declare(exchange=name, type=type)
78
79     def bind_queues(self):
80         LOG.info("Declare queue %s and bind it to exchange %s",
81                  self._queue, constant.exchange_d)
82         self._channel.queue_declare(queue=self._queue, exclusive=True)
83         self._channel.queue_bind(exchange=constant.exchange_d, queue=self._queue)
84
85     def start_consumer(self):
86         LOG.info("Start response consumer")
87         self._consume_tag = self._channel.basic_consume(self.on_response,
88                                                         no_ack=True,
89                                                         queue=self._queue)
90
91     def stop_consuming(self):
92         """Tell RabbitMQ that you would like to stop consuming by sending the
93         Basic.Cancel RPC command.
94
95         """
96         if self._channel:
97             LOG.info('Sending a Basic.Cancel RPC command to RabbitMQ')
98             self._channel.basic_cancel(self._consume_tag)
99
100         self.close_channel()
101
102     def close_channel(self):
103         """Call to close the channel with RabbitMQ cleanly by issuing the
104         Channel.Close RPC command.
105
106         """
107         LOG.info('Closing the channel')
108         self._channel.close()
109
110     def close_connection(self):
111         """This method closes the connection to RabbitMQ."""
112         LOG.info('Closing connection')
113         self._connection.close()
114
115     def stop(self):
116         self.stop_consuming()
117         self.close_connection()
118
119     def on_response(self, ch, method, props, body):
120         """this func reciver the msg"""
121         self.response = None
122         if self.corr_id == props.correlation_id:
123             self.response = json.loads(body)
124             LOG.debug("Proxy producer reciver the msg: head:%(h)s, body:%(b)s",
125                       {'h': self.response.get('head'), 'b': self.response.get('body')})
126         else:
127             LOG.warn("Proxy producer Drop the msg "
128                      "because of the wrong correlation id, %s\n" % body)
129
130     def publish(self, target, corrid, body):
131         properties = pika.BasicProperties(reply_to=self._queue,
132                                           correlation_id=corrid)
133         LOG.debug("start to publish message to the exchange=%s, target=%s, msg=%s"
134                   , constant.exchange_d, target, body)
135         return self._channel.basic_publish(exchange=constant.exchange_d,
136                                            routing_key=target,
137                                            mandatory=True,
138                                            properties=properties,
139                                            body=message.encode(body))
140
141     def call(self, msg, target='agent', timeout=constant.TIMEOUT):
142         """send msg to agent by id, this func will wait ack until timeout
143         :msg the msg to be sent
144         :id agent's id
145         :timeout timeout of waiting response
146
147         """
148         self.response = None
149         queue = constant.queue_common + target
150         # the msg request and respone must be match by corr_id
151         self.corr_id = str(uuid.uuid4())
152         # same msg format 
153         msg = message.add_context(msg, corrid=self.corr_id)
154
155         # send msg to the queue
156         try:
157             ret = self.publish(queue, self.corr_id, msg)
158         except Exception as e:
159             LOG.error(message.dumpstrace())
160             raise excepts.ChannelDie
161
162         # if delivery msg failed. return error
163         # clean the msg in the queue
164         if not ret:
165             LOG.error("productor message delivery failed.")
166             return "Message can not be deliveryed, please check the connection of agent."
167
168         # wait for response
169         t_begin = time.time()
170         while self.response is None:
171             self._connection.process_data_events()
172             count = time.time() - t_begin
173             if count > timeout:
174                 LOG.error("Command timeout!")
175                 # flush the msg of the queue
176                 self._channel.queue_purge(queue=queue)
177                 # self.channel.basic_cancel()
178                 return False
179
180         msg_body = message.get_body(message.decode(self.response))
181
182         # deal with exceptions
183         if msg_body \
184                 and isinstance(msg_body, dict) \
185                 and msg_body.has_key('exception'):
186             ename = str(msg_body['exception'].get('name'))
187             if hasattr(exceptions, ename):
188                 e = getattr(exceptions, ename)()
189             else:
190                 class CallError(Exception):
191                     pass
192
193                 e = CallError()
194             e.message = str(msg_body['exception'].get('message'))
195             e.args = msg_body['exception'].get('args')
196             raise e
197         else:
198             return msg_body
199
200
201 class Server(object):
202     def __init__(self, host=None,
203                  user='guest',
204                  passwd='guest',
205                  port='5672'):
206         super(Server, self).__init__()
207         # Default use salt's master ip as rabbit rpc server ip
208         if host is None:
209             raise Exception("Can not create rpc proxy because of the None rabbitmq server address.")
210
211         self.host = host
212         self.port = port
213         self.user = user
214         self.passwd = passwd
215         try:
216             self.proxy = RpcProxy(host=host,
217                                   port=port,
218                                   user=user,
219                                   passwd=passwd)
220         except Exception as e:
221             raise e
222
223     def call(self, msg, msg_id, timeout=constant.TIMEOUT):
224         """when you add a server listen to the rabbit
225         you must appoint which queue to be listen.
226         :@queue the queue name.
227         """
228         retry = False
229
230         try:
231             ret = self.proxy.call(msg, target=msg_id, timeout=timeout)
232         except excepts.ChannelDie:
233             # this may be the proxy die, try to reconnect to the rabbit
234             del self.proxy
235             self.proxy = RpcProxy(host=self.host,
236                                   port=self.port,
237                                   user=self.user,
238                                   passwd=self.passwd)
239             if self.proxy is None:
240                 raise excepts.UnsolvableExit
241             retry = True
242
243         if retry:
244             # if retry happened except, throw to uplay
245             ret = self.proxy.call(msg, target=msg_id, timeout=timeout)
246
247         return ret
248
249     def cast(self, msg):
250         """when you want to send msg to all agent and no reply, use this func"""
251         LOG.warn("cast not support now.")
252
253     def make_msg(self, method, **kargs):
254         return {'method': method,
255                 'args': kargs}
256
257     def close(self):
258         self.proxy.stop()