support pep8 check
[doctor.git] / doctor_tests / consumer / sample.py
1 ##############################################################################
2 # Copyright (c) 2017 ZTE Corporation and others.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9 from flask import Flask
10 from flask import request
11 import json
12 import time
13 from threading import Thread
14 import requests
15
16 from doctor_tests.consumer.base import BaseConsumer
17
18
class SampleConsumer(BaseConsumer):
    """Sample consumer backed by a small HTTP app running in its own thread.

    ``start()`` creates a ``ConsumerApp`` thread listening on
    ``conf.consumer.port``; ``stop()`` asks that app to shut itself down by
    POSTing to its ``/shutdown`` endpoint.
    """

    # Upper bound (seconds) for both the shutdown HTTP request and the wait
    # for the server thread to exit, so stop() can never block indefinitely.
    _SHUTDOWN_TIMEOUT = 10

    def __init__(self, conf, log):
        """Initialise the consumer; the app thread is created by start().

        :param conf: configuration object; ``conf.consumer.ip`` and
                     ``conf.consumer.port`` are read by this class
        :param log: logger exposing at least ``info()``
        """
        # presumably BaseConsumer stores these as self.conf / self.log,
        # which the methods below rely on -- verify against the base class
        super(SampleConsumer, self).__init__(conf, log)
        self.app = None

    def start(self):
        """Create the consumer app thread and start serving."""
        self.log.info('sample consumer start......')
        self.app = ConsumerApp(self.conf.consumer.port, self, self.log)
        self.app.start()

    def stop(self):
        """Ask the consumer app to shut down; a no-op if it is not running.

        :raises requests.exceptions.RequestException: if the shutdown
            request cannot be delivered or times out
        """
        self.log.info('sample consumer stop......')
        if not self.app:
            return
        headers = {
            'Content-Type': 'application/json',
            'Accept': 'application/json',
        }
        url = 'http://%s:%d/shutdown'\
              % (self.conf.consumer.ip,
                 self.conf.consumer.port)
        # Without a timeout, requests waits forever on an unresponsive server.
        requests.post(url, data='', headers=headers,
                      timeout=self._SHUTDOWN_TIMEOUT)
        # Give the server thread a bounded amount of time to finish so the
        # port is released before the caller moves on.
        self.app.join(self._SHUTDOWN_TIMEOUT)
        if self.app.is_alive():
            # Keep the handle so a later stop() can retry the shutdown.
            self.log.info('sample consumer app still running after %s seconds'
                          % self._SHUTDOWN_TIMEOUT)
        else:
            # Drop the handle so a repeated stop() does not POST to a
            # server that is already gone.
            self.app = None
42
43
class ConsumerApp(Thread):
    """Thread that serves the sample consumer's HTTP endpoints with Flask.

    Two POST routes are registered: ``/failure`` logs the JSON body it
    receives, and ``/shutdown`` stops the Werkzeug server hosting the app.
    """

    def __init__(self, port, consumer, log):
        """Remember the listening port, the owning consumer and the logger."""
        super(ConsumerApp, self).__init__()
        self.port, self.consumer, self.log = port, consumer, log

    def run(self):
        """Thread entry point: build the app and serve on all interfaces."""
        self._build_app().run(host="0.0.0.0", port=self.port)

    def _build_app(self):
        """Create the Flask application and attach its two POST routes."""
        flask_app = Flask('consumer')

        @flask_app.route('/failure', methods=['POST'])
        def event_posted():
            # Log the arrival time before any parsing is attempted.
            notified_at = time.time()
            self.log.info('doctor consumer notified at %s' % notified_at)
            raw_body = request.data.decode('utf8')
            payload = json.loads(raw_body)
            self.log.info('sample consumer received data = %s' % payload)
            return 'OK'

        @flask_app.route('/shutdown', methods=['POST'])
        def shutdown():
            self.log.info('shutdown consumer app server at %s' % time.time())
            # NOTE(review): this hook is only present in the WSGI environ
            # when served by Werkzeug, and newer Werkzeug releases appear to
            # have removed it -- confirm against the pinned version.
            stop_server = request.environ.get('werkzeug.server.shutdown')
            if stop_server is not None:
                stop_server()
                return 'consumer app shutting down...'
            raise RuntimeError('Not running with the Werkzeug Server')

        return flask_app