Decode received data in byte type to str type
[doctor.git] / tests / inspector / sample.py
1 ##############################################################################
2 # Copyright (c) 2017 ZTE Corporation and others.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9 import collections
10 from flask import Flask
11 from flask import request
12 import json
13 import time
14 from threading import Thread
15 import requests
16
17 from identity_auth import get_identity_auth
18 from identity_auth import get_session
19 from os_clients import nova_client
20 from inspector.base import BaseInspector
21
22
23 class SampleInspector(BaseInspector):
24     event_type = 'compute.host.down'
25
26     def __init__(self, conf, log):
27         super(SampleInspector, self).__init__(conf, log)
28         self.inspector_url = self.get_inspector_url()
29         self.novaclients = list()
30         self._init_novaclients()
31         # Normally we use this client for non redundant API calls
32         self.nova = self.novaclients[0]
33
34         self.servers = collections.defaultdict(list)
35         self.hostnames = list()
36         self.app = None
37
38     def _init_novaclients(self):
39         self.NUMBER_OF_CLIENTS = self.conf.instance_count
40         auth = get_identity_auth(project=self.conf.doctor_project)
41         session = get_session(auth=auth)
42         for i in range(self.NUMBER_OF_CLIENTS):
43             self.novaclients.append(
44                 nova_client(self.conf.nova_version, session))
45
46     def _init_servers_list(self):
47         self.servers.clear()
48         opts = {'all_tenants': True}
49         servers = self.nova.servers.list(search_opts=opts)
50         for server in servers:
51             try:
52                 host = server.__dict__.get('OS-EXT-SRV-ATTR:host')
53                 self.servers[host].append(server)
54                 self.log.debug('get hostname=%s from server=%s' % (host, server))
55             except Exception as e:
56                 self.log.info('can not get hostname from server=%s' % server)
57
58     def get_inspector_url(self):
59         return 'http://%s:%s' % (self.conf.inspector.ip, self.conf.inspector.port)
60
61     def start(self):
62         self.log.info('sample inspector start......')
63         self._init_servers_list()
64         self.app = InspectorApp(self.conf.inspector.port, self, self.log)
65         self.app.start()
66
67     def stop(self):
68         self.log.info('sample inspector stop......')
69         if not self.app:
70             return
71         for hostname in self.hostnames:
72             self.nova.services.force_down(hostname, 'nova-compute', False)
73
74         headers = {
75             'Content-Type': 'application/json',
76             'Accept': 'application/json',
77         }
78         url = '%s%s' % (self.inspector_url, 'shutdown') \
79             if self.inspector_url.endswith('/') else \
80             '%s%s' % (self.inspector_url, '/shutdown')
81         requests.post(url, data='', headers=headers)
82
83     def handle_events(self, events):
84         for event in events:
85             hostname = event['details']['hostname']
86             event_type = event['type']
87             if event_type == self.event_type:
88                 self.hostnames.append(hostname)
89                 self.disable_compute_host(hostname)
90
91     def disable_compute_host(self, hostname):
92         threads = []
93         if len(self.servers[hostname]) > self.NUMBER_OF_CLIENTS:
94             # TODO(tojuvone): This could be enhanced in future with dynamic
95             # reuse of self.novaclients when all threads in use
96             self.log.error('%d servers in %s. Can handle only %d'%(
97                            self.servers[hostname], hostname, self.NUMBER_OF_CLIENTS))
98         for nova, server in zip(self.novaclients, self.servers[hostname]):
99             t = ThreadedResetState(nova, "error", server, self.log)
100             t.start()
101             threads.append(t)
102         for t in threads:
103             t.join()
104         self.nova.services.force_down(hostname, 'nova-compute', True)
105         self.log.info('doctor mark host(%s) down at %s' % (hostname, time.time()))
106
107
108 class ThreadedResetState(Thread):
109
110     def __init__(self, nova, state, server, log):
111         Thread.__init__(self)
112         self.nova = nova
113         self.state = state
114         self.server = server
115         self.log = log
116
117     def run(self):
118         self.nova.servers.reset_state(self.server, self.state)
119         self.log.info('doctor mark vm(%s) error at %s' % (self.server, time.time()))
120
121
122 class InspectorApp(Thread):
123
124     def __init__(self, port, inspector, log):
125         Thread.__init__(self)
126         self.port = port
127         self.inspector = inspector
128         self.log = log
129
130     def run(self):
131         app = Flask('inspector')
132
133         @app.route('/events', methods=['PUT'])
134         def event_posted():
135             self.log.info('event posted in sample inspector at %s' % time.time())
136             self.log.info('sample inspector = %s' % self.inspector)
137             self.log.info('sample inspector received data = %s' % request.data)
138             events = json.loads(request.data.decode('utf8'))
139             self.inspector.handle_events(events)
140             return "OK"
141
142         @app.route('/shutdown', methods=['POST'])
143         def shutdown():
144             self.log.info('shutdown inspector app server at %s' % time.time())
145             func = request.environ.get('werkzeug.server.shutdown')
146             if func is None:
147                 raise RuntimeError('Not running with the Werkzeug Server')
148             func()
149             return 'inspector app shutting down...'
150
151         app.run(host="0.0.0.0", port=self.port)