Upload the contribution of vstf as bottleneck network framework.
[bottlenecks.git] / vstf / vstf / rpc_frame_work / rpc_producer.py
1 #!/usr/bin/env python
2 # coding=utf-8
3 import uuid
4 import json
5 import time
6 import exceptions
7 import logging
8
9 import pika
10 from vstf.common import message
11 from vstf.common import excepts
12 from vstf.rpc_frame_work import constant
13
14 LOG = logging.getLogger(__name__)
15
16
17 class RpcProxy(object):
18     def __init__(self, host,
19                  user='guest',
20                  passwd='guest',
21                  port='5672'):
22         """create a connection to rabbitmq,direct call and fan call supported.
23
24         """
25         # try to create connection of rabbitmq
26         self._channel = None
27         self._connection = None
28         self._queue = str(uuid.uuid4())
29         self._consume_tag = None
30
31         self.user = user
32         self.passwd = passwd
33         self.srv = host
34         self.port = port
35         self.url = 'amqp://' + self.user + ':' + self.passwd + '@' + self.srv + ':' + self.port + '/%2F'
36         try:
37             self.connect(host, self.setup_vstf_producer)
38         except Exception as e:
39             LOG.error("create connection failed. e:%(e)s", {'e': e})
40             raise e
41
42         self.response = None
43         self.corr_id = None
44
45     def connect(self, host, ok_callback):
46         """Create a Blocking connection to the rabbitmq-server
47         
48         :param str    host: the rabbitmq-server's host
49         :param obj    ok_callback: if connect success than do this function
50         
51         """
52         LOG.info("Connect to the server %s", host)
53         self._connection = pika.BlockingConnection(pika.URLParameters(self.url))
54         if self._connection:
55             ok_callback()
56
57     def setup_vstf_producer(self):
58         self.open_channel()
59         self.create_exchange(constant.exchange_d, constant.DIRECT)
60         self.bind_queues()
61         self.start_consumer()
62
63     def open_channel(self):
64         self._channel = self._connection.channel()
65         if self._channel:
66             self._channel.confirm_delivery()
67
68     def create_exchange(self, name, type):
69         LOG.info("Create %s exchange: %s", type, name)
70         self._channel.exchange_declare(exchange=name, type=type)
71
72     def bind_queues(self):
73         LOG.info("Declare queue %s and bind it to exchange %s",
74                  self._queue, constant.exchange_d)
75         self._channel.queue_declare(queue=self._queue, exclusive=True)
76         self._channel.queue_bind(exchange=constant.exchange_d, queue=self._queue)
77
78     def start_consumer(self):
79         LOG.info("Start response consumer")
80         self._consume_tag = self._channel.basic_consume(self.on_response,
81                                                         no_ack=True,
82                                                         queue=self._queue)
83
84     def stop_consuming(self):
85         """Tell RabbitMQ that you would like to stop consuming by sending the
86         Basic.Cancel RPC command.
87
88         """
89         if self._channel:
90             LOG.info('Sending a Basic.Cancel RPC command to RabbitMQ')
91             self._channel.basic_cancel(self._consume_tag)
92
93         self.close_channel()
94
95     def close_channel(self):
96         """Call to close the channel with RabbitMQ cleanly by issuing the
97         Channel.Close RPC command.
98
99         """
100         LOG.info('Closing the channel')
101         self._channel.close()
102
103     def close_connection(self):
104         """This method closes the connection to RabbitMQ."""
105         LOG.info('Closing connection')
106         self._connection.close()
107
108     def stop(self):
109         self.stop_consuming()
110         self.close_connection()
111
112     def on_response(self, ch, method, props, body):
113         """this func reciver the msg"""
114         self.response = None
115         if self.corr_id == props.correlation_id:
116             self.response = json.loads(body)
117             LOG.debug("Proxy producer reciver the msg: head:%(h)s, body:%(b)s",
118                       {'h': self.response.get('head'), 'b': self.response.get('body')})
119         else:
120             LOG.warn("Proxy producer Drop the msg "
121                      "because of the wrong correlation id, %s\n" % body)
122
123     def publish(self, target, corrid, body):
124         properties = pika.BasicProperties(reply_to=self._queue,
125                                           correlation_id=corrid)
126         LOG.debug("start to publish message to the exchange=%s, target=%s, msg=%s"
127                   , constant.exchange_d, target, body)
128         return self._channel.basic_publish(exchange=constant.exchange_d,
129                                            routing_key=target,
130                                            mandatory=True,
131                                            properties=properties,
132                                            body=message.encode(body))
133
134     def call(self, msg, target='agent', timeout=constant.TIMEOUT):
135         """send msg to agent by id, this func will wait ack until timeout
136         :msg the msg to be sent
137         :id agent's id
138         :timeout timeout of waiting response
139
140         """
141         self.response = None
142         queue = constant.queue_common + target
143         # the msg request and respone must be match by corr_id
144         self.corr_id = str(uuid.uuid4())
145         # same msg format 
146         msg = message.add_context(msg, corrid=self.corr_id)
147
148         # send msg to the queue
149         try:
150             ret = self.publish(queue, self.corr_id, msg)
151         except Exception as e:
152             LOG.error(message.dumpstrace())
153             raise excepts.ChannelDie
154
155         # if delivery msg failed. return error
156         # clean the msg in the queue
157         if not ret:
158             LOG.error("productor message delivery failed.")
159             return "Message can not be deliveryed, please check the connection of agent."
160
161         # wait for response
162         t_begin = time.time()
163         while self.response is None:
164             self._connection.process_data_events()
165             count = time.time() - t_begin
166             if count > timeout:
167                 LOG.error("Command timeout!")
168                 # flush the msg of the queue
169                 self._channel.queue_purge(queue=queue)
170                 # self.channel.basic_cancel()
171                 return False
172
173         msg_body = message.get_body(message.decode(self.response))
174
175         # deal with exceptions
176         if msg_body \
177                 and isinstance(msg_body, dict) \
178                 and msg_body.has_key('exception'):
179             ename = str(msg_body['exception'].get('name'))
180             if hasattr(exceptions, ename):
181                 e = getattr(exceptions, ename)()
182             else:
183                 class CallError(Exception):
184                     pass
185
186                 e = CallError()
187             e.message = str(msg_body['exception'].get('message'))
188             e.args = msg_body['exception'].get('args')
189             raise e
190         else:
191             return msg_body
192
193
194 class Server(object):
195     def __init__(self, host=None,
196                  user='guest',
197                  passwd='guest',
198                  port='5672'):
199         super(Server, self).__init__()
200         # Default use salt's master ip as rabbit rpc server ip
201         if host is None:
202             raise Exception("Can not create rpc proxy because of the None rabbitmq server address.")
203
204         self.host = host
205         self.port = port
206         self.user = user
207         self.passwd = passwd
208         try:
209             self.proxy = RpcProxy(host=host,
210                                   port=port,
211                                   user=user,
212                                   passwd=passwd)
213         except Exception as e:
214             raise e
215
216     def call(self, msg, msg_id, timeout=constant.TIMEOUT):
217         """when you add a server listen to the rabbit
218         you must appoint which queue to be listen.
219         :@queue the queue name.
220         """
221         retry = False
222
223         try:
224             ret = self.proxy.call(msg, target=msg_id, timeout=timeout)
225         except excepts.ChannelDie:
226             # this may be the proxy die, try to reconnect to the rabbit
227             del self.proxy
228             self.proxy = RpcProxy(host=self.host,
229                                   port=self.port,
230                                   user=self.user,
231                                   passwd=self.passwd)
232             if self.proxy is None:
233                 raise excepts.UnsolvableExit
234             retry = True
235
236         if retry:
237             # if retry happened except, throw to uplay
238             ret = self.proxy.call(msg, target=msg_id, timeout=timeout)
239
240         return ret
241
242     def cast(self, msg):
243         """when you want to send msg to all agent and no reply, use this func"""
244         LOG.warn("cast not support now.")
245
246     def make_msg(self, method, **kargs):
247         return {'method': method,
248                 'args': kargs}
249
250     def close(self):
251         self.proxy.stop()