1 ##############################################################################
2 # Copyright (c) 2017 ZTE Corporation and others.
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
10 from flask import Flask
11 from flask import request
15 from threading import Thread
19 from doctor_tests.common import utils
20 from doctor_tests.identity_auth import get_identity_auth
21 from doctor_tests.identity_auth import get_session
22 from doctor_tests.os_clients import nova_client
23 from doctor_tests.os_clients import neutron_client
24 from doctor_tests.inspector.base import BaseInspector
class SampleInspector(BaseInspector):
    """Sample Doctor inspector reacting to ``compute.host.down`` events.

    ``start()`` indexes all servers by host and launches an HTTP listener
    (``InspectorApp``).  For every ``compute.host.down`` event received,
    ``handle_events()`` concurrently:

    * sends a ``compute.instance.update`` notification per affected server
      (only when an oslo.messaging notifier could be created),
    * forces the ``nova-compute`` service of the host down,
    * resets every affected server to ``error`` state, and
    * optionally sets the neutron data-plane status of the host's ports to
      ``DOWN`` (``conf.inspector.update_neutron_port_dp_status``).

    The ``_send_notif``/``_disable_compute_host``/``_vms_reset_state``/
    ``_set_ports_data_plane_status`` methods are wrapped by
    ``utils.run_async``; their return value is ``join()``-ed by the caller
    (presumably the started worker thread -- see doctor_tests.common.utils).
    """
    # Only events of this type trigger the recovery actions.
    event_type = 'compute.host.down'

    def __init__(self, conf, log, trasport_url):
        """Create the inspector and the OpenStack clients it uses.

        :param conf: Doctor configuration (reads ``doctor_project``,
            ``instance_count``, ``nova_version`` and the ``inspector``
            group).
        :param log: logger used for all messages.
        :param trasport_url: oslo.messaging transport URL for the
            instance-update notifier (name kept as-is for keyword callers).
        """
        super(SampleInspector, self).__init__(conf, log)
        self.inspector_url = self.get_inspector_url()
        self.novaclients = list()
        self._init_novaclients()
        # Normally we use this client for non redundant API calls
        self.nova = self.novaclients[0]

        auth = get_identity_auth(project=self.conf.doctor_project)
        session = get_session(auth=auth)
        if self.conf.inspector.update_neutron_port_dp_status:
            self.neutron = neutron_client(session)

        # host name -> servers on that host; filled by _init_servers_list()
        self.servers = collections.defaultdict(list)
        # hosts reported down; their nova-compute is un-forced in stop()
        self.hostnames = list()
        # HTTP listener thread, created by start()
        self.app = None

        try:
            transport = oslo_messaging.get_notification_transport(
                self.conf, trasport_url)
            notifier = oslo_messaging.Notifier(transport,
                                               'compute.instance.update',
                                               driver='messaging',
                                               topics=['notifications'])
            self.notif = notifier.prepare(publisher_id='sample')
        except Exception as e:
            # Notifications are optional: handle_events() skips them when
            # self.notif is None, so record why instead of failing.
            self.log.info('sample inspector cannot create notifier: %s' % e)
            self.notif = None

    def _init_novaclients(self):
        """Create ``conf.instance_count`` nova clients on a shared session.

        _vms_reset_state() pairs each client with one server and runs each
        pair in its own worker, so concurrent resets use distinct clients.
        """
        self.NUMBER_OF_CLIENTS = self.conf.instance_count
        auth = get_identity_auth(project=self.conf.doctor_project)
        session = get_session(auth=auth)
        for _ in range(self.NUMBER_OF_CLIENTS):
            self.novaclients.append(
                nova_client(self.conf.nova_version, session))

    def _init_servers_list(self):
        """(Re)build ``self.servers`` from the servers of all tenants."""
        # Start from scratch so a repeated start() does not duplicate servers.
        self.servers.clear()
        opts = {'all_tenants': True}
        servers = self.nova.servers.list(detailed=True, search_opts=opts)
        for server in servers:
            try:
                host = server.__dict__.get('OS-EXT-SRV-ATTR:host')
                self.servers[host].append(server)
                self.log.debug('get hostname=%s from server=%s'
                               % (host, server))
            except Exception as e:
                self.log.info('can not get hostname from server=%s, error=%s'
                              % (server, e))

    def get_inspector_url(self):
        """Return the URL events must be PUT to."""
        return 'http://%s:%s/events' % (self.conf.inspector.ip,
                                        self.conf.inspector.port)

    def start(self):
        """Index current servers and start the HTTP listener thread."""
        self.log.info('sample inspector start......')
        self._init_servers_list()
        self.app = InspectorApp(self.conf.inspector.port, self, self.log)
        self.app.start()

    def stop(self):
        """Un-force the hosts marked down and shut the HTTP listener down."""
        self.log.info('sample inspector stop......')
        if self.app is None:
            # start() never ran: no listener to stop, no host marked down.
            return
        for hostname in self.hostnames:
            self.nova.services.force_down(hostname, 'nova-compute', False)

        headers = {
            'Content-Type': 'application/json',
            'Accept': 'application/json',
        }
        # InspectorApp serves its shutdown hook at '<events url>/shutdown'.
        if self.inspector_url.endswith('/'):
            url = '%s%s' % (self.inspector_url, 'shutdown')
        else:
            url = '%s%s' % (self.inspector_url, '/shutdown')
        requests.post(url, data='', headers=headers)

    def handle_events(self, events):
        """Run the recovery actions for every host-down event.

        :param events: iterable of event dicts; each is read as
            ``event['type']`` and ``event['details']['hostname']``.

        The actions for one event run concurrently; this method waits for
        all of them before handling the next event.
        """
        for event in events:
            hostname = event['details']['hostname']
            event_type = event['type']
            if event_type != self.event_type:
                continue
            self.hostnames.append(hostname)
            workers = []
            if self.notif is not None:
                workers.append(self._send_notif(hostname))
            workers.append(self._disable_compute_host(hostname))
            workers.append(self._vms_reset_state('error', hostname))
            if self.conf.inspector.update_neutron_port_dp_status:
                workers.append(
                    self._set_ports_data_plane_status('DOWN', hostname))
            for worker in workers:
                worker.join()

    def _alarm_data_decoder(self, data):
        """Decode one alarm trait value given as a string.

        Strings containing ``[`` or ``{`` are treated as serialized lists or
        dicts (possibly with ``u'`` prefixes) and parsed; anything else is
        returned unchanged.
        """
        if "[" in data or "{" in data:
            # string to list or dict removing unicode.
            # safe_load: this data arrives over HTTP, so never let it build
            # arbitrary Python objects (yaml.load without a safe Loader can).
            data = yaml.safe_load(data.replace("u'", "'"))
        return data

    def _alarm_traits_decoder(self, data):
        """Map alarm trait names (item 0) to decoded values (item 2)."""
        return ({str(t[0]): self._alarm_data_decoder(str(t[2]))
                for t in data['reason_data']['event']['traits']})

    def maintenance(self, data):
        """Handle a maintenance alarm posted to ``/maintenance``.

        :param data: alarm body; traits are read from
            ``data['reason_data']['event']['traits']`` and must include
            ``state`` and ``host``.
        :raises Exception: if the traits cannot be decoded, or ``state`` is
            neither ``IN_MAINTENANCE`` nor ``MAINTENANCE_COMPLETE``.
        """
        try:
            payload = self._alarm_traits_decoder(data)
        except Exception:
            payload = ({t[0]: t[2] for t in
                       data['reason_data']['event']['traits']})
            self.log.error('cannot parse alarm data: %s' % payload)
            raise Exception('sample inspector cannot parse alarm. '
                            'Possibly trait data over 256 char')
        self.log.info('sample inspector received data = %s' % payload)

        state = payload['state']
        host = payload['host']

        if state == 'IN_MAINTENANCE':
            self.log.info("sample inspector: disable %s automatic fault "
                          "management" % host)
        elif state == 'MAINTENANCE_COMPLETE':
            self.log.info("sample inspector: enable %s automatic fault "
                          "management" % host)
        else:
            # Must raise an exception instance; raising a plain str is
            # itself a TypeError in Python 3.
            raise Exception("sample inspector couldn't handle state: %s"
                            % state)

    @utils.run_async
    def _disable_compute_host(self, hostname):
        """Force nova-compute on *hostname* down and record the time."""
        self.nova.services.force_down(hostname, 'nova-compute', True)

        hostdown_time = time.time()
        self.host_down_time = hostdown_time
        self.log.info('doctor mark host(%s) down at %s'
                      % (hostname, hostdown_time))

    @utils.run_async
    def _vms_reset_state(self, state, hostname):
        """Reset every indexed server on *hostname* to *state*.

        Each reset runs in its own worker with its own nova client; this
        call waits for all of them.
        """

        @utils.run_async
        def _vm_reset_state(nova, server, state):
            nova.servers.reset_state(server, state)
            vmdown_time = time.time()
            self.vm_down_time = vmdown_time
            self.log.info('doctor mark vm(%s) %s at %s'
                          % (server, state, vmdown_time))

        # NOTE(review): zip() stops at the shorter sequence, so servers on
        # this host beyond len(self.novaclients) are not reset. Confirm that
        # conf.instance_count always covers every server on the host.
        workers = [_vm_reset_state(nova, server, state)
                   for nova, server in zip(self.novaclients,
                                           self.servers[hostname])]
        for worker in workers:
            worker.join()

    @utils.run_async
    def _send_notif(self, hostname):
        """Send an ``error`` instance-update notification per server."""

        @utils.run_async
        def _send_server_notif(server):
            payload = dict(tenant_id=server.tenant_id,
                           instance_id=server.id,
                           state="error")
            self.notif.info({'some': 'context'}, 'compute.instance.update',
                            payload)
            self.log.info('doctor compute.instance.update vm(%s) error %s'
                          % (server, time.time()))

        workers = [_send_server_notif(server)
                   for server in self.servers[hostname]]
        for worker in workers:
            worker.join()

    @utils.run_async
    def _set_ports_data_plane_status(self, status, hostname):
        """Set ``data_plane_status`` on every port bound to *hostname*."""
        # The port API expects the attributes wrapped in a 'port' key.
        body = {'port': {'data_plane_status': status}}

        @utils.run_async
        def _set_port_data_plane_status(port_id):
            self.neutron.update_port(port_id, body)
            self.log.info('doctor set data plane status %s on port %s'
                          % (status, port_id))

        params = {'binding:host_id': hostname}
        # list_ports() returns {'ports': [<port dict>, ...]}; iterating that
        # dict directly would only yield the key 'ports', not port ids.
        ports = self.neutron.list_ports(**params).get('ports', [])
        workers = [_set_port_data_plane_status(port['id']) for port in ports]
        for worker in workers:
            worker.join()
class InspectorApp(Thread):
    """Background thread serving the sample inspector's HTTP API.

    Routes registered on the Flask app:

    * ``PUT /events`` -- JSON-encoded events, passed to
      ``inspector.handle_events``.
    * ``POST /maintenance`` -- JSON alarm body, passed to
      ``inspector.maintenance``.
    * ``POST /events/shutdown`` -- stops the Werkzeug development server.
    """

    def __init__(self, port, inspector, log):
        """
        :param port: TCP port to listen on (bound on all interfaces).
        :param inspector: object providing ``handle_events()`` and
            ``maintenance()``.
        :param log: logger used to trace incoming requests.
        """
        super(InspectorApp, self).__init__()
        self.port = port
        self.inspector = inspector
        self.log = log

    def _build_app(self):
        """Create the Flask application with all inspector routes."""
        flask_app = Flask('inspector')

        @flask_app.route('/events', methods=['PUT'])
        def event_posted():
            logger = self.log
            logger.info('event posted in sample inspector at %s'
                        % time.time())
            logger.info('sample inspector = %s' % self.inspector)
            raw_body = request.data
            logger.info('sample inspector received data = %s' % raw_body)
            decoded = json.loads(raw_body.decode('utf8'))
            self.inspector.handle_events(decoded)
            return "OK"

        @flask_app.route('/maintenance', methods=['POST'])
        def maintenance():
            alarm = request.json
            self.inspector.maintenance(alarm)
            return "OK"

        @flask_app.route('/events/shutdown', methods=['POST'])
        def shutdown():
            self.log.info('shutdown inspector app server at %s' % time.time())
            # NOTE(review): newer Werkzeug releases (2.1+) no longer provide
            # this environ hook, in which case this route raises
            # RuntimeError -- confirm the Werkzeug version in use.
            stop_server = request.environ.get('werkzeug.server.shutdown')
            if stop_server is None:
                raise RuntimeError('Not running with the Werkzeug Server')
            stop_server()
            return 'inspector app shutting down...'

        return flask_app

    def run(self):
        """Serve the inspector API; blocks until the server stops."""
        self._build_app().run(host="0.0.0.0", port=self.port)