1 ##############################################################################
2 # Copyright (c) 2015 Huawei Technologies Co.,Ltd and others.
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
17 from vstf.common import message
18 from vstf.common import excepts
19 from vstf.rpc_frame_work import constant
21 LOG = logging.getLogger(__name__)
24 class RpcProxy(object):
25 def __init__(self, host,
29 """create a connection to rabbitmq,direct call and fan call supported.
32 # try to create connection of rabbitmq
34 self._connection = None
35 self._queue = str(uuid.uuid4())
36 self._consume_tag = None
42 self.url = 'amqp://' + self.user + ':' + self.passwd + '@' + self.srv + ':' + self.port + '/%2F'
44 self.connect(host, self.setup_vstf_producer)
45 except Exception as e:
46 LOG.error("create connection failed. e:%(e)s", {'e': e})
52 def connect(self, host, ok_callback):
53 """Create a Blocking connection to the rabbitmq-server
55 :param str host: the rabbitmq-server's host
56 :param obj ok_callback: if connect success than do this function
59 LOG.info("Connect to the server %s", host)
60 self._connection = pika.BlockingConnection(pika.URLParameters(self.url))
64 def setup_vstf_producer(self):
66 self.create_exchange(constant.exchange_d, constant.DIRECT)
70 def open_channel(self):
71 self._channel = self._connection.channel()
73 self._channel.confirm_delivery()
def create_exchange(self, name, type):
    """Declare an exchange on this proxy's channel.

    :param name: exchange name, forwarded as ``exchange``
    :param type: exchange type, forwarded as ``type``
    """
    LOG.info("Create %s exchange: %s", type, name)
    # Collect the keyword arguments first so the declare call reads as one step.
    declare_kwargs = {'exchange': name, 'type': type}
    self._channel.exchange_declare(**declare_kwargs)
def bind_queues(self):
    """Declare this proxy's queue as exclusive and bind it to ``constant.exchange_d``."""
    queue_name = self._queue
    exchange_name = constant.exchange_d
    LOG.info("Declare queue %s and bind it to exchange %s",
             queue_name, exchange_name)
    channel = self._channel
    # The queue must exist before it can be bound, so declare comes first.
    channel.queue_declare(queue=queue_name, exclusive=True)
    channel.queue_bind(exchange=exchange_name, queue=queue_name)
85 def start_consumer(self):
86 LOG.info("Start response consumer")
87 self._consume_tag = self._channel.basic_consume(self.on_response,
91 def stop_consuming(self):
92 """Tell RabbitMQ that you would like to stop consuming by sending the
93 Basic.Cancel RPC command.
97 LOG.info('Sending a Basic.Cancel RPC command to RabbitMQ')
98 self._channel.basic_cancel(self._consume_tag)
def close_channel(self):
    """Close the channel held in ``self._channel``.

    Closes the channel with RabbitMQ cleanly by issuing the
    Channel.Close RPC command.
    """
    LOG.info('Closing the channel')
    channel = self._channel
    channel.close()
def close_connection(self):
    """Log the shutdown, then close the RabbitMQ connection in ``self._connection``."""
    LOG.info('Closing connection')
    connection = self._connection
    connection.close()
116 self.stop_consuming()
117 self.close_connection()
119 def on_response(self, ch, method, props, body):
120 """this func reciver the msg"""
122 if self.corr_id == props.correlation_id:
123 self.response = json.loads(body)
124 LOG.debug("Proxy producer reciver the msg: head:%(h)s, body:%(b)s",
125 {'h': self.response.get('head'), 'b': self.response.get('body')})
127 LOG.warn("Proxy producer Drop the msg "
128 "because of the wrong correlation id, %s\n" % body)
130 def publish(self, target, corrid, body):
131 properties = pika.BasicProperties(reply_to=self._queue,
132 correlation_id=corrid)
133 LOG.debug("start to publish message to the exchange=%s, target=%s, msg=%s"
134 , constant.exchange_d, target, body)
135 return self._channel.basic_publish(exchange=constant.exchange_d,
138 properties=properties,
139 body=message.encode(body))
141 def call(self, msg, target='agent', timeout=constant.TIMEOUT):
142 """send msg to agent by id, this func will wait ack until timeout
143 :msg the msg to be sent
145 :timeout timeout of waiting response
149 queue = constant.queue_common + target
150 # the request and its response must be matched by corr_id
151 self.corr_id = str(uuid.uuid4())
153 msg = message.add_context(msg, corrid=self.corr_id)
155 # send msg to the queue
157 ret = self.publish(queue, self.corr_id, msg)
158 except Exception as e:
159 LOG.error(message.dumpstrace())
160 raise excepts.ChannelDie
162 # if delivery msg failed. return error
163 # clean the msg in the queue
165 LOG.error("productor message delivery failed.")
166 return "Message can not be deliveryed, please check the connection of agent."
169 t_begin = time.time()
170 while self.response is None:
171 self._connection.process_data_events()
172 count = time.time() - t_begin
174 LOG.error("Command timeout!")
175 # flush the msg of the queue
176 self._channel.queue_purge(queue=queue)
177 # self.channel.basic_cancel()
180 msg_body = message.get_body(message.decode(self.response))
182 # deal with exceptions
184 and isinstance(msg_body, dict) \
185 and msg_body.has_key('exception'):
186 ename = str(msg_body['exception'].get('name'))
187 if hasattr(exceptions, ename):
188 e = getattr(exceptions, ename)()
190 class CallError(Exception):
194 e.message = str(msg_body['exception'].get('message'))
195 e.args = msg_body['exception'].get('args')
201 class Server(object):
202 def __init__(self, host=None,
206 super(Server, self).__init__()
207 # Default use salt's master ip as rabbit rpc server ip
209 raise Exception("Can not create rpc proxy because of the None rabbitmq server address.")
216 self.proxy = RpcProxy(host=host,
220 except Exception as e:
223 def call(self, msg, msg_id, timeout=constant.TIMEOUT):
224 """when you add a server listen to the rabbit
225 you must appoint which queue to be listen.
226 :@queue the queue name.
231 ret = self.proxy.call(msg, target=msg_id, timeout=timeout)
232 except excepts.ChannelDie:
233 # the proxy may have died; try to reconnect to rabbitmq
235 self.proxy = RpcProxy(host=self.host,
239 if self.proxy is None:
240 raise excepts.UnsolvableExit
244 # if the retry also raises an exception, let it propagate to the caller
245 ret = self.proxy.call(msg, target=msg_id, timeout=timeout)
250 """when you want to send msg to all agent and no reply, use this func"""
251 LOG.warn("cast not support now.")
253 def make_msg(self, method, **kargs):
254 return {'method': method,