Merge "Change PTL informatin in INFO"
[bottlenecks.git] / testsuites / vstf / vstf_scripts / vstf / rpc_frame_work / rpc_producer.py
1 ##############################################################################
2 # Copyright (c) 2015 Huawei Technologies Co.,Ltd and others.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9
10 import uuid
11 import json
12 import time
13 import exceptions
14 import logging
15
16 import pika
17 from vstf.common import message
18 from vstf.common import excepts
19 from vstf.rpc_frame_work import constant
20
21 LOG = logging.getLogger(__name__)
22
23
class RpcProxy(object):
    """RPC client (producer side) built on a pika ``BlockingConnection``.

    On construction the proxy connects to the rabbitmq server, declares the
    direct exchange ``constant.exchange_d``, declares an exclusive reply
    queue named after a fresh UUID, binds it to that exchange and registers
    :meth:`on_response` as its consumer.  Requests are sent with
    :meth:`call`, which blocks until the reply carrying the matching
    correlation id arrives or the timeout expires.
    """

    def __init__(self, host,
                 user='guest',
                 passwd='guest',
                 port='5672'):
        """Connect to the rabbitmq server and set up the reply consumer.

        :param str host: address of the rabbitmq server
        :param str user: user name placed in the AMQP URL
        :param str passwd: password placed in the AMQP URL
        :param port: server port; either a str or an int is accepted

        Any exception raised while connecting is logged and re-raised.
        """
        self._channel = None
        self._connection = None
        self._queue = str(uuid.uuid4())
        self._consume_tag = None

        # Reply-matching state is initialised before connecting so the
        # instance is fully formed by the time the consumer is registered.
        self.response = None
        self.corr_id = None

        self.user = user
        self.passwd = passwd
        self.srv = host
        self.port = port
        # str() lets callers pass the port as an int as well as a str.
        self.url = 'amqp://' + self.user + ':' + self.passwd + \
            '@' + self.srv + ':' + str(self.port) + '/%2F'
        try:
            self.connect(host, self.setup_vstf_producer)
        except Exception as e:
            LOG.error("create connection failed. e:%(e)s", {'e': e})
            # A bare raise keeps the original traceback.
            raise

    def connect(self, host, ok_callback):
        """Create a Blocking connection to the rabbitmq-server

        :param str    host: the rabbitmq-server's host (only used for
                            logging; the connection itself uses self.url)
        :param obj    ok_callback: called without arguments once the
                                   connection has been created

        """
        LOG.info("Connect to the server %s", host)
        self._connection = pika.BlockingConnection(
            pika.URLParameters(self.url))
        if self._connection:
            ok_callback()

    def setup_vstf_producer(self):
        """Open the channel, declare the exchange, bind the reply queue and
        start consuming replies, in that order."""
        self.open_channel()
        self.create_exchange(constant.exchange_d, constant.DIRECT)
        self.bind_queues()
        self.start_consumer()

    def open_channel(self):
        """Open a channel on the connection and enable delivery confirms."""
        self._channel = self._connection.channel()
        if self._channel:
            self._channel.confirm_delivery()

    def create_exchange(self, name, type):
        """Declare the exchange ``name`` of the given exchange ``type``."""
        LOG.info("Create %s exchange: %s", type, name)
        self._channel.exchange_declare(exchange=name, type=type)

    def bind_queues(self):
        """Declare the exclusive reply queue and bind it to the exchange."""
        LOG.info("Declare queue %s and bind it to exchange %s",
                 self._queue, constant.exchange_d)
        self._channel.queue_declare(queue=self._queue, exclusive=True)
        self._channel.queue_bind(
            exchange=constant.exchange_d,
            queue=self._queue)

    def start_consumer(self):
        """Register :meth:`on_response` as the no-ack consumer of the reply
        queue and remember the consumer tag for later cancellation."""
        LOG.info("Start response consumer")
        self._consume_tag = self._channel.basic_consume(self.on_response,
                                                        no_ack=True,
                                                        queue=self._queue)

    def stop_consuming(self):
        """Tell RabbitMQ that you would like to stop consuming by sending the
        Basic.Cancel RPC command, then close the channel.

        """
        if self._channel is not None:
            LOG.info('Sending a Basic.Cancel RPC command to RabbitMQ')
            self._channel.basic_cancel(self._consume_tag)

        self.close_channel()

    def close_channel(self):
        """Call to close the channel with RabbitMQ cleanly by issuing the
        Channel.Close RPC command.

        Does nothing when no channel has been opened.

        """
        if self._channel is None:
            return
        LOG.info('Closing the channel')
        self._channel.close()

    def close_connection(self):
        """This method closes the connection to RabbitMQ.

        Does nothing when no connection has been created.
        """
        if self._connection is None:
            return
        LOG.info('Closing connection')
        self._connection.close()

    def stop(self):
        """Cancel the consumer, close the channel and close the connection.

        The connection is closed even when cancelling the consumer or
        closing the channel raises.
        """
        try:
            self.stop_consuming()
        finally:
            self.close_connection()

    def on_response(self, ch, method, props, body):
        """Consumer callback: store the reply whose correlation id matches
        the pending request and drop every other message.

        A non-matching message leaves ``self.response`` untouched, so a
        stale reply handled after the expected one cannot erase it.
        """
        if self.corr_id == props.correlation_id:
            self.response = json.loads(body)
            LOG.debug("Proxy producer reciver the msg: head:%(h)s, body:%(b)s", {
                      'h': self.response.get('head'), 'b': self.response.get('body')})
        else:
            LOG.warning("Proxy producer Drop the msg "
                        "because of the wrong correlation id, %s\n", body)

    def publish(self, target, corrid, body):
        """Publish ``body`` to the direct exchange with routing key
        ``target``.

        The message carries this proxy's reply queue as ``reply_to`` and
        ``corrid`` as correlation id.  The result of ``basic_publish`` is
        returned unchanged.
        """
        properties = pika.BasicProperties(reply_to=self._queue,
                                          correlation_id=corrid)
        LOG.debug(
            "start to publish message to the exchange=%s, target=%s, msg=%s",
            constant.exchange_d,
            target,
            body)
        return self._channel.basic_publish(exchange=constant.exchange_d,
                                           routing_key=target,
                                           mandatory=True,
                                           properties=properties,
                                           body=message.encode(body))

    @staticmethod
    def _build_remote_error(info):
        """Turn the ``exception`` entry of a reply body into an exception.

        ``info`` is read with ``.get`` for the keys ``name``, ``message``
        and ``args``.  When ``name`` is an exception class of the
        ``exceptions`` module that class is instantiated; otherwise a local
        ``CallError`` is used.
        """
        ename = str(info.get('name'))
        exc_cls = getattr(exceptions, ename, None)
        # Only real exception classes qualify; the module also exposes
        # plain attributes such as __doc__ that must not be called.
        if not (isinstance(exc_cls, type)
                and issubclass(exc_cls, BaseException)):
            class CallError(Exception):
                pass

            exc_cls = CallError

        e = exc_cls()
        e.message = str(info.get('message'))
        args = info.get('args')
        if args is None:
            # Assigning None to BaseException.args raises TypeError.
            args = ()
        elif not isinstance(args, (list, tuple)):
            # Keep a scalar (e.g. a string) as one argument instead of
            # letting it be iterated element by element.
            args = (args,)
        e.args = args
        return e

    def call(self, msg, target='agent', timeout=constant.TIMEOUT):
        """Send ``msg`` to an agent and wait for its reply.

        :param msg: the msg to be sent
        :param target: suffix appended to ``constant.queue_common`` to form
                       the routing key of the destination queue
        :param timeout: seconds to wait for the response

        :returns: the body of the reply; an error string when the broker
                  reports that the message was not delivered; ``False``
                  when no reply arrived within ``timeout``
        :raises excepts.ChannelDie: when publishing raises any exception
        :raises Exception: the exception described by the ``exception``
                           entry of the reply body, if it has one

        """
        self.response = None
        queue = constant.queue_common + target
        # the msg request and respone must be match by corr_id
        self.corr_id = str(uuid.uuid4())
        # same msg format
        msg = message.add_context(msg, corrid=self.corr_id)

        # send msg to the queue
        try:
            ret = self.publish(queue, self.corr_id, msg)
        except Exception:
            LOG.error(message.dumpstrace())
            raise excepts.ChannelDie

        # if delivery msg failed. return error
        if not ret:
            LOG.error("productor message delivery failed.")
            return "Message can not be deliveryed, please check the connection of agent."

        # wait for response
        t_begin = time.time()
        while self.response is None:
            self._connection.process_data_events()
            if time.time() - t_begin > timeout:
                LOG.error("Command timeout!")
                # flush the msg of the queue
                self._channel.queue_purge(queue=queue)
                return False

        msg_body = message.get_body(message.decode(self.response))

        # deal with exceptions reported by the remote side
        if msg_body \
                and isinstance(msg_body, dict) \
                and 'exception' in msg_body:
            raise self._build_remote_error(msg_body['exception'])
        return msg_body
207
208
class Server(object):
    """Client-side facade that owns one ``RpcProxy`` and replaces it with a
    fresh one when a call fails with ``excepts.ChannelDie``."""

    def __init__(self, host=None,
                 user='guest',
                 passwd='guest',
                 port='5672'):
        """Remember the connection settings and create the proxy.

        :param host: address of the rabbitmq server; required
        :param user: user name handed to ``RpcProxy``
        :param passwd: password handed to ``RpcProxy``
        :param port: server port handed to ``RpcProxy``
        :raises Exception: when ``host`` is None; errors raised while
                           creating the proxy propagate unchanged
        """
        super(Server, self).__init__()
        # There is no fallback address: a missing host is an error.
        if host is None:
            raise Exception(
                "Can not create rpc proxy because of the None rabbitmq server address.")

        self.host = host
        self.port = port
        self.user = user
        self.passwd = passwd
        self.proxy = self._new_proxy()

    def _new_proxy(self):
        """Build an ``RpcProxy`` from the stored connection settings."""
        return RpcProxy(host=self.host,
                        port=self.port,
                        user=self.user,
                        passwd=self.passwd)

    def call(self, msg, msg_id, timeout=constant.TIMEOUT):
        """Send ``msg`` through the proxy and return the proxy's result.

        :param msg: the message to send
        :param msg_id: passed to the proxy as ``target``
        :param timeout: passed to the proxy as ``timeout``

        When the proxy raises ``excepts.ChannelDie`` a new proxy is created
        and the call is retried exactly once; any error from the reconnect
        or from the retry propagates to the caller.
        """
        try:
            return self.proxy.call(msg, target=msg_id, timeout=timeout)
        except excepts.ChannelDie:
            # this may be the proxy die, try to reconnect to the rabbit.
            # The attribute is rebound rather than deleted first, so
            # self.proxy still exists if the reconnect itself raises.
            LOG.warning("rpc channel died, reconnecting to %s", self.host)
            self.proxy = self._new_proxy()

        # The retry runs outside the except block; if it fails as well the
        # error is raised to the caller.
        return self.proxy.call(msg, target=msg_id, timeout=timeout)

    def cast(self, msg):
        """when you want to send msg to all agent and no reply, use this func

        Not implemented: only a warning is logged and ``msg`` is ignored.
        """
        LOG.warning("cast not support now.")

    def make_msg(self, method, **kargs):
        """Return the request dict ``{'method': method, 'args': kargs}``."""
        return {'method': method,
                'args': kargs}

    def close(self):
        """Stop the underlying proxy."""
        self.proxy.stop()