Add maintenance test code
[doctor.git] / doctor_tests / inspector / sample.py
1 ##############################################################################
2 # Copyright (c) 2017 ZTE Corporation and others.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9 import collections
10 from flask import Flask
11 from flask import request
12 import json
13 import time
14 from threading import Thread
15 import requests
16 import yaml
17
18 from doctor_tests.common import utils
19 from doctor_tests.identity_auth import get_identity_auth
20 from doctor_tests.identity_auth import get_session
21 from doctor_tests.os_clients import nova_client
22 from doctor_tests.os_clients import neutron_client
23 from doctor_tests.inspector.base import BaseInspector
24
25
26 class SampleInspector(BaseInspector):
27     event_type = 'compute.host.down'
28
29     def __init__(self, conf, log):
30         super(SampleInspector, self).__init__(conf, log)
31         self.inspector_url = self.get_inspector_url()
32         self.novaclients = list()
33         self._init_novaclients()
34         # Normally we use this client for non redundant API calls
35         self.nova = self.novaclients[0]
36
37         auth = get_identity_auth(project=self.conf.doctor_project)
38         session = get_session(auth=auth)
39         if self.conf.inspector.update_neutron_port_dp_status:
40             self.neutron = neutron_client(session)
41
42         self.servers = collections.defaultdict(list)
43         self.hostnames = list()
44         self.app = None
45
46     def _init_novaclients(self):
47         self.NUMBER_OF_CLIENTS = self.conf.instance_count
48         auth = get_identity_auth(project=self.conf.doctor_project)
49         session = get_session(auth=auth)
50         for i in range(self.NUMBER_OF_CLIENTS):
51             self.novaclients.append(
52                 nova_client(self.conf.nova_version, session))
53
54     def _init_servers_list(self):
55         self.servers.clear()
56         opts = {'all_tenants': True}
57         servers = self.nova.servers.list(search_opts=opts)
58         for server in servers:
59             try:
60                 host = server.__dict__.get('OS-EXT-SRV-ATTR:host')
61                 self.servers[host].append(server)
62                 self.log.debug('get hostname=%s from server=%s'
63                                % (host, server))
64             except Exception as e:
65                 self.log.info('can not get hostname from server=%s, error=%s'
66                               % (server, e))
67
68     def get_inspector_url(self):
69         return 'http://%s:%s/events' % (self.conf.inspector.ip,
70                                         self.conf.inspector.port)
71
72     def start(self):
73         self.log.info('sample inspector start......')
74         self._init_servers_list()
75         self.app = InspectorApp(self.conf.inspector.port, self, self.log)
76         self.app.start()
77
78     def stop(self):
79         self.log.info('sample inspector stop......')
80         if not self.app:
81             return
82         for hostname in self.hostnames:
83             self.nova.services.force_down(hostname, 'nova-compute', False)
84
85         headers = {
86             'Content-Type': 'application/json',
87             'Accept': 'application/json',
88         }
89         url = '%s%s' % (self.inspector_url, 'shutdown') \
90             if self.inspector_url.endswith('/') else \
91             '%s%s' % (self.inspector_url, '/shutdown')
92         requests.post(url, data='', headers=headers)
93
94     def handle_events(self, events):
95         for event in events:
96             hostname = event['details']['hostname']
97             event_type = event['type']
98             if event_type == self.event_type:
99                 self.hostnames.append(hostname)
100                 thr1 = self._disable_compute_host(hostname)
101                 thr2 = self._vms_reset_state('error', hostname)
102                 if self.conf.inspector.update_neutron_port_dp_status:
103                     thr3 = self._set_ports_data_plane_status('DOWN', hostname)
104                 thr1.join()
105                 thr2.join()
106                 if self.conf.inspector.update_neutron_port_dp_status:
107                     thr3.join()
108
109     def _alarm_data_decoder(self, data):
110         if "[" in data or "{" in data:
111             # string to list or dict removing unicode
112             data = yaml.load(data.replace("u'", "'"))
113         return data
114
115     def _alarm_traits_decoder(self, data):
116         return ({str(t[0]): self._alarm_data_decoder(str(t[2]))
117                 for t in data['reason_data']['event']['traits']})
118
119     def maintenance(self, data):
120         try:
121             payload = self._alarm_traits_decoder(data)
122         except:
123             payload = ({t[0]: t[2] for t in
124                        data['reason_data']['event']['traits']})
125             self.log.error('cannot parse alarm data: %s' % payload)
126             raise Exception('sample inspector cannot parse alarm.'
127                             'Possibly trait data over 256 char')
128         self.log.info('sample inspector received data = %s' % payload)
129
130         state = payload['state']
131         host = payload['host']
132
133         if state == 'IN_MAINTENANCE':
134             self.log.info("sample inspector: disable %s automatic fault "
135                           "management" % host)
136         elif state == 'MAINTENANCE_COMPLETE':
137             self.log.info("sample inspector: enable %s automatic fault "
138                           "management" % host)
139         else:
140             raise("sample inspector couldn't handle state: %s" % state)
141
142     @utils.run_async
143     def _disable_compute_host(self, hostname):
144         self.nova.services.force_down(hostname, 'nova-compute', True)
145
146         hostdown_time = time.time()
147         self.host_down_time = hostdown_time
148         self.log.info('doctor mark host(%s) down at %s'
149                       % (hostname, hostdown_time))
150
151     @utils.run_async
152     def _vms_reset_state(self, state, hostname):
153
154         @utils.run_async
155         def _vm_reset_state(nova, server, state):
156             nova.servers.reset_state(server, state)
157             vmdown_time = time.time()
158             self.vm_down_time = vmdown_time
159             self.log.info('doctor mark vm(%s) error at %s'
160                           % (server, vmdown_time))
161
162         thrs = []
163         for nova, server in zip(self.novaclients, self.servers[hostname]):
164             t = _vm_reset_state(nova, server, state)
165             thrs.append(t)
166         for t in thrs:
167             t.join()
168
169     @utils.run_async
170     def _set_ports_data_plane_status(self, status, hostname):
171         body = {'data_plane_status': status}
172
173         @utils.run_async
174         def _set_port_data_plane_status(port_id):
175             self.neutron.update_port(port_id, body)
176             self.log.info('doctor set data plane status %s on port %s'
177                           % (status, port_id))
178
179         thrs = []
180         params = {'binding:host_id': hostname}
181         for port_id in self.neutron.list_ports(**params):
182             t = _set_port_data_plane_status(port_id)
183             thrs.append(t)
184         for t in thrs:
185             t.join()
186
187
188 class InspectorApp(Thread):
189
190     def __init__(self, port, inspector, log):
191         Thread.__init__(self)
192         self.port = port
193         self.inspector = inspector
194         self.log = log
195
196     def run(self):
197         app = Flask('inspector')
198
199         @app.route('/events', methods=['PUT'])
200         def event_posted():
201             self.log.info('event posted in sample inspector at %s'
202                           % time.time())
203             self.log.info('sample inspector = %s' % self.inspector)
204             self.log.info('sample inspector received data = %s'
205                           % request.data)
206             events = json.loads(request.data.decode('utf8'))
207             self.inspector.handle_events(events)
208             return "OK"
209
210         @app.route('/maintenance', methods=['POST'])
211         def maintenance():
212             self.inspector.maintenance(request.json)
213             return "OK"
214
215         @app.route('/events/shutdown', methods=['POST'])
216         def shutdown():
217             self.log.info('shutdown inspector app server at %s' % time.time())
218             func = request.environ.get('werkzeug.server.shutdown')
219             if func is None:
220                 raise RuntimeError('Not running with the Werkzeug Server')
221             func()
222             return 'inspector app shutting down...'
223
224         app.run(host="0.0.0.0", port=self.port)