refactor failure inject
[doctor.git] / tests / consumer / sample.py
1 ##############################################################################
2 # Copyright (c) 2017 ZTE Corporation and others.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9 from flask import Flask
10 from flask import request
11 import json
12 import time
13 from threading import Thread
14 import requests
15
16 from consumer.base import BaseConsumer
17
18
19 class SampleConsumer(BaseConsumer):
20
21     def __init__(self, conf, log):
22         super(SampleConsumer, self).__init__(conf, log)
23         self.app = None
24
25     def start(self):
26         self.log.info('sample consumer start......')
27         self.app = ConsumerApp(self.conf.consumer.port, self, self.log)
28         self.app.start()
29
30     def stop(self):
31         self.log.info('sample consumer stop......')
32         if not self.app:
33             return
34         headers = {
35             'Content-Type': 'application/json',
36             'Accept': 'application/json',
37         }
38         url = 'http://%s:%d/shutdown'\
39               % (self.conf.consumer.ip,
40                  self.conf.consumer.port)
41         requests.post(url, data='', headers=headers)
42
43
44 class ConsumerApp(Thread):
45
46     def __init__(self, port, consumer, log):
47         Thread.__init__(self)
48         self.port = port
49         self.consumer = consumer
50         self.log = log
51
52     def run(self):
53         app = Flask('consumer')
54
55         @app.route('/failure', methods=['POST'])
56         def event_posted():
57             self.log.info('doctor consumer notified at %s' % time.time())
58             self.log.info('sample consumer received data = %s' % request.data)
59             data = json.loads(request.data.decode('utf8'))
60             return 'OK'
61
62         @app.route('/shutdown', methods=['POST'])
63         def shutdown():
64             self.log.info('shutdown consumer app server at %s' % time.time())
65             func = request.environ.get('werkzeug.server.shutdown')
66             if func is None:
67                 raise RuntimeError('Not running with the Werkzeug Server')
68             func()
69             return 'consumer app shutting down...'
70
71         app.run(host="0.0.0.0", port=self.port)