DevStack support
[doctor.git] / doctor_tests / inspector / sample.py
1 ##############################################################################
2 # Copyright (c) 2017 ZTE Corporation and others.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9 import collections
10 from flask import Flask
11 from flask import request
12 import json
13 import oslo_messaging
14 import time
15 from threading import Thread
16 import requests
17 import yaml
18
19 from doctor_tests.common import utils
20 from doctor_tests.identity_auth import get_identity_auth
21 from doctor_tests.identity_auth import get_session
22 from doctor_tests.os_clients import nova_client
23 from doctor_tests.os_clients import neutron_client
24 from doctor_tests.inspector.base import BaseInspector
25
26
27 class SampleInspector(BaseInspector):
28     event_type = 'compute.host.down'
29
30     def __init__(self, conf, log, trasport_url):
31         super(SampleInspector, self).__init__(conf, log)
32         self.inspector_url = self.get_inspector_url()
33         self.novaclients = list()
34         self._init_novaclients()
35         # Normally we use this client for non redundant API calls
36         self.nova = self.novaclients[0]
37
38         auth = get_identity_auth(project=self.conf.doctor_project)
39         session = get_session(auth=auth)
40         if self.conf.inspector.update_neutron_port_dp_status:
41             self.neutron = neutron_client(session)
42
43         self.servers = collections.defaultdict(list)
44         self.hostnames = list()
45         self.app = None
46
47         try:
48             transport = oslo_messaging.get_notification_transport(self.conf,
49                                                                   trasport_url)
50             self.notif = oslo_messaging.Notifier(transport,
51                                                  'compute.instance.update',
52                                                  driver='messaging',
53                                                  topics=['notifications'])
54             self.notif = self.notif.prepare(publisher_id='sample')
55         except Exception:
56             self.notif = None
57
58     def _init_novaclients(self):
59         self.NUMBER_OF_CLIENTS = self.conf.instance_count
60         auth = get_identity_auth(project=self.conf.doctor_project)
61         session = get_session(auth=auth)
62         for i in range(self.NUMBER_OF_CLIENTS):
63             self.novaclients.append(
64                 nova_client(self.conf.nova_version, session))
65
66     def _init_servers_list(self):
67         self.servers.clear()
68         opts = {'all_tenants': True}
69         servers = self.nova.servers.list(detailed=True, search_opts=opts)
70         for server in servers:
71             try:
72                 host = server.__dict__.get('OS-EXT-SRV-ATTR:host')
73                 self.servers[host].append(server)
74                 self.log.debug('get hostname=%s from server=%s'
75                                % (host, str(server.name)))
76             except Exception as e:
77                 self.log.info('can not get hostname from server=%s, error=%s'
78                               % (server, e))
79
80     def get_inspector_url(self):
81         return 'http://%s:%s/events' % (self.conf.inspector.ip,
82                                         self.conf.inspector.port)
83
84     def start(self):
85         self.log.info('sample inspector start......')
86         self._init_servers_list()
87         self.app = InspectorApp(self.conf.inspector.port, self, self.log)
88         self.app.start()
89
90     def stop(self):
91         self.log.info('sample inspector stop......')
92         if not self.app:
93             return
94         for hostname in self.hostnames:
95             self.nova.services.force_down(hostname, 'nova-compute', False)
96
97         headers = {
98             'Content-Type': 'application/json',
99             'Accept': 'application/json',
100         }
101         url = '%s%s' % (self.inspector_url, 'shutdown') \
102             if self.inspector_url.endswith('/') else \
103             '%s%s' % (self.inspector_url, '/shutdown')
104         requests.post(url, data='', headers=headers)
105
106     def handle_events(self, events):
107         for event in events:
108             hostname = event['details']['hostname']
109             event_type = event['type']
110             if event_type == self.event_type:
111                 self.hostnames.append(hostname)
112                 if self.notif is not None:
113                     thr0 = self._send_notif(hostname)
114                 thr1 = self._disable_compute_host(hostname)
115                 thr2 = self._vms_reset_state('error', hostname)
116                 if self.conf.inspector.update_neutron_port_dp_status:
117                     thr3 = self._set_ports_data_plane_status('DOWN', hostname)
118                 if self.notif is not None:
119                     thr0.join()
120                 thr1.join()
121                 thr2.join()
122                 if self.conf.inspector.update_neutron_port_dp_status:
123                     thr3.join()
124
125     def _alarm_data_decoder(self, data):
126         if "[" in data or "{" in data:
127             # string to list or dict removing unicode
128             data = yaml.load(data.replace("u'", "'"))
129         return data
130
131     def _alarm_traits_decoder(self, data):
132         return ({str(t[0]): self._alarm_data_decoder(str(t[2]))
133                 for t in data['reason_data']['event']['traits']})
134
135     def maintenance(self, data):
136         try:
137             payload = self._alarm_traits_decoder(data)
138         except Exception:
139             payload = ({t[0]: t[2] for t in
140                        data['reason_data']['event']['traits']})
141             self.log.error('cannot parse alarm data: %s' % payload)
142             raise Exception('sample inspector cannot parse alarm.'
143                             'Possibly trait data over 256 char')
144         self.log.info('sample inspector received data = %s' % payload)
145
146         state = payload['state']
147         host = payload['host']
148
149         if state == 'IN_MAINTENANCE':
150             self.log.info("sample inspector: disable %s automatic fault "
151                           "management" % host)
152         elif state == 'MAINTENANCE_COMPLETE':
153             self.log.info("sample inspector: enable %s automatic fault "
154                           "management" % host)
155         else:
156             raise("sample inspector couldn't handle state: %s" % state)
157
158     @utils.run_async
159     def _disable_compute_host(self, hostname):
160         self.nova.services.force_down(hostname, 'nova-compute', True)
161
162         hostdown_time = time.time()
163         self.host_down_time = hostdown_time
164         self.log.info('doctor mark host(%s) down at %s'
165                       % (hostname, hostdown_time))
166
167     @utils.run_async
168     def _vms_reset_state(self, state, hostname):
169
170         @utils.run_async
171         def _vm_reset_state(nova, server, state):
172             nova.servers.reset_state(server, state)
173             vmdown_time = time.time()
174             self.vm_down_time = vmdown_time
175             self.log.info('doctor mark vm(%s) %s at %s'
176                           % (server, state, vmdown_time))
177
178         thrs = []
179         for nova, server in zip(self.novaclients, self.servers[hostname]):
180             t = _vm_reset_state(nova, server, state)
181             thrs.append(t)
182         for t in thrs:
183             t.join()
184
185     @utils.run_async
186     def _send_notif(self, hostname):
187
188         @utils.run_async
189         def _send_notif(server):
190             payload = dict(tenant_id=server.tenant_id,
191                            instance_id=server.id,
192                            state="error")
193             self.notif.info({'some': 'context'}, 'compute.instance.update',
194                             payload)
195             self.log.info('doctor compute.instance.update vm(%s) error %s'
196                           % (server, time.time()))
197
198         thrs = []
199         for server in self.servers[hostname]:
200             t = _send_notif(server)
201             thrs.append(t)
202         for t in thrs:
203             t.join()
204
205     @utils.run_async
206     def _set_ports_data_plane_status(self, status, hostname):
207         body = {'data_plane_status': status}
208
209         @utils.run_async
210         def _set_port_data_plane_status(port_id):
211             self.neutron.update_port(port_id, body)
212             self.log.info('doctor set data plane status %s on port %s'
213                           % (status, port_id))
214
215         thrs = []
216         params = {'binding:host_id': hostname}
217         for port_id in self.neutron.list_ports(**params):
218             t = _set_port_data_plane_status(port_id)
219             thrs.append(t)
220         for t in thrs:
221             t.join()
222
223
class InspectorApp(Thread):
    """Thread that serves the sample inspector's HTTP API with Flask."""

    def __init__(self, port, inspector, log):
        """Remember the port to listen on, the inspector and the logger."""
        super(InspectorApp, self).__init__()
        self.log = log
        self.inspector = inspector
        self.port = port

    def run(self):
        """Register the routes and serve them on all interfaces.

        Routes: PUT /events, POST /maintenance and POST /events/shutdown.
        """
        app = Flask('inspector')

        @app.route('/events', methods=['PUT'])
        def event_posted():
            # Forward the posted JSON events to the inspector.
            received_at = time.time()
            self.log.info('event posted in sample inspector at %s'
                          % received_at)
            self.log.info('sample inspector = %s' % self.inspector)
            self.log.info('sample inspector received data = %s'
                          % request.data)
            body = request.data.decode('utf8')
            self.inspector.handle_events(json.loads(body))
            return "OK"

        @app.route('/maintenance', methods=['POST'])
        def maintenance():
            # Hand the JSON body of the alarm to the inspector.
            alarm = request.json
            self.inspector.maintenance(alarm)
            return "OK"

        @app.route('/events/shutdown', methods=['POST'])
        def shutdown():
            # Stop the server through the hook found in the WSGI environ.
            self.log.info('shutdown inspector app server at %s' % time.time())
            stop_server = request.environ.get('werkzeug.server.shutdown')
            if stop_server is None:
                raise RuntimeError('Not running with the Werkzeug Server')
            stop_server()
            return 'inspector app shutting down...'

        app.run(host="0.0.0.0", port=self.port)