self.srv = host
self.port = port
self.agent_id = agent_id
- self.url = 'amqp://' + self.user + ':' + self.passwd + '@' + self.srv + ':' + self.port + '/%2F'
+ self.url = 'amqp://' + self.user + ':' + self.passwd + \
+ '@' + self.srv + ':' + self.port + '/%2F'
# load the agent_funcs
try:
if self._closing:
self._connection.ioloop.stop()
else:
- LOGGER.warning('Connection closed, reopening in 2 seconds: (%s) %s',
- reply_code, reply_text)
+ LOGGER.warning(
+ 'Connection closed, reopening in 2 seconds: (%s) %s',
+ reply_code,
+ reply_text)
self._connection.add_timeout(2, self.reconnect)
def reconnect(self):
:param str|unicode exchange_name: The name of the exchange to declare
"""
- LOGGER.info('Declaring %s exchange %s', constant.DIRECT, constant.exchange_d)
+ LOGGER.info(
+ 'Declaring %s exchange %s',
+ constant.DIRECT,
+ constant.exchange_d)
self._channel.exchange_declare(self.on_direct_exchange_declareok,
constant.exchange_d,
constant.DIRECT)
'args': e.args}}
finally:
response = message.add_context(response, **head)
- LOGGER.debug("response the msg: head:%(h)s, body:%(b)s",
- {'h': response.get('head'), 'b': response.get('body')})
-
- respone_chanl.basic_publish(exchange=constant.exchange_d,
- routing_key=properties.reply_to,
- properties=pika.BasicProperties(correlation_id=properties.correlation_id),
- body=message.encode(response)
- )
+ LOGGER.debug("response the msg: head:%(h)s, body:%(b)s", {
+ 'h': response.get('head'), 'b': response.get('body')})
+
+ respone_chanl.basic_publish(
+ exchange=constant.exchange_d,
+ routing_key=properties.reply_to,
+ properties=pika.BasicProperties(
+ correlation_id=properties.correlation_id),
+ body=message.encode(response))
# no matter what happend, tell the mq-server to drop this msg.
self.acknowledge_message(basic_deliver.delivery_tag)