Add parallel execution and shortcut notification to inspector design guideline
[doctor.git] / tests / inspector / sample.py
1 ##############################################################################
2 # Copyright (c) 2017 ZTE Corporation and others.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9 import collections
10 from flask import Flask
11 from flask import request
12 import json
13 import time
14 from threading import Thread
15 import requests
16
17 from common import utils
18 from identity_auth import get_identity_auth
19 from identity_auth import get_session
20 from os_clients import nova_client
21 from os_clients import neutron_client
22 from inspector.base import BaseInspector
23
24
25 class SampleInspector(BaseInspector):
26     event_type = 'compute.host.down'
27
28     def __init__(self, conf, log):
29         super(SampleInspector, self).__init__(conf, log)
30         self.inspector_url = self.get_inspector_url()
31         self.novaclients = list()
32         self._init_novaclients()
33         # Normally we use this client for non redundant API calls
34         self.nova = self.novaclients[0]
35
36         auth = get_identity_auth(project=self.conf.doctor_project)
37         session = get_session(auth=auth)
38         self.neutron = neutron_client(session)
39
40         self.servers = collections.defaultdict(list)
41         self.hostnames = list()
42         self.app = None
43
44     def _init_novaclients(self):
45         self.NUMBER_OF_CLIENTS = self.conf.instance_count
46         auth = get_identity_auth(project=self.conf.doctor_project)
47         session = get_session(auth=auth)
48         for i in range(self.NUMBER_OF_CLIENTS):
49             self.novaclients.append(
50                 nova_client(self.conf.nova_version, session))
51
52     def _init_servers_list(self):
53         self.servers.clear()
54         opts = {'all_tenants': True}
55         servers = self.nova.servers.list(search_opts=opts)
56         for server in servers:
57             try:
58                 host = server.__dict__.get('OS-EXT-SRV-ATTR:host')
59                 self.servers[host].append(server)
60                 self.log.debug('get hostname=%s from server=%s' % (host, server))
61             except Exception as e:
62                 self.log.info('can not get hostname from server=%s' % server)
63
64     def get_inspector_url(self):
65         return 'http://%s:%s' % (self.conf.inspector.ip, self.conf.inspector.port)
66
67     def start(self):
68         self.log.info('sample inspector start......')
69         self._init_servers_list()
70         self.app = InspectorApp(self.conf.inspector.port, self, self.log)
71         self.app.start()
72
73     def stop(self):
74         self.log.info('sample inspector stop......')
75         if not self.app:
76             return
77         for hostname in self.hostnames:
78             self.nova.services.force_down(hostname, 'nova-compute', False)
79
80         headers = {
81             'Content-Type': 'application/json',
82             'Accept': 'application/json',
83         }
84         url = '%s%s' % (self.inspector_url, 'shutdown') \
85             if self.inspector_url.endswith('/') else \
86             '%s%s' % (self.inspector_url, '/shutdown')
87         requests.post(url, data='', headers=headers)
88
89     def handle_events(self, events):
90         for event in events:
91             hostname = event['details']['hostname']
92             event_type = event['type']
93             if event_type == self.event_type:
94                 self.hostnames.append(hostname)
95                 thr1 = self._disable_compute_host(hostname)
96                 thr2 = self._vms_reset_state('error', hostname)
97                 thr3 = self._set_ports_data_plane_status('DOWN', hostname)
98                 thr1.join()
99                 thr2.join()
100                 thr3.join()
101
102     @utils.run_async
103     def _disable_compute_host(self, hostname):
104         self.nova.services.force_down(hostname, 'nova-compute', True)
105         self.log.info('doctor mark host(%s) down at %s' % (hostname, time.time()))
106
107     @utils.run_async
108     def _vms_reset_state(self, state, hostname):
109
110         @utils.run_async
111         def _vm_reset_state(nova, server, state):
112             nova.servers.reset_state(server, state)
113             self.log.info('doctor mark vm(%s) error at %s' % (server, time.time()))
114
115         thrs = []
116         for nova, server in zip(self.novaclients, self.servers[hostname]):
117             t = _vm_reset_state(nova, server, state)
118             thrs.append(t)
119         for t in thrs:
120             t.join()
121
122     @utils.run_async
123     def _set_ports_data_plane_status(self, status, hostname):
124         body = {'data_plane_status': status}
125
126         @utils.run_async
127         def _set_port_data_plane_status(port_id):
128             self.neutron.update_port(port_id, body)
129             self.log.info('doctor set data plane status %s on port %s' % (status, port_id))
130
131         thrs = []
132         params = {'binding:host_id': hostname}
133         for port_id in self.neutron.list_ports(**params):
134             t = _set_port_data_plane_status(port_id)
135             thrs.append(t)
136         for t in thrs:
137             t.join()
138
139
140 class InspectorApp(Thread):
141
142     def __init__(self, port, inspector, log):
143         Thread.__init__(self)
144         self.port = port
145         self.inspector = inspector
146         self.log = log
147
148     def run(self):
149         app = Flask('inspector')
150
151         @app.route('/events', methods=['PUT'])
152         def event_posted():
153             self.log.info('event posted in sample inspector at %s' % time.time())
154             self.log.info('sample inspector = %s' % self.inspector)
155             self.log.info('sample inspector received data = %s' % request.data)
156             events = json.loads(request.data.decode('utf8'))
157             self.inspector.handle_events(events)
158             return "OK"
159
160         @app.route('/shutdown', methods=['POST'])
161         def shutdown():
162             self.log.info('shutdown inspector app server at %s' % time.time())
163             func = request.environ.get('werkzeug.server.shutdown')
164             if func is None:
165                 raise RuntimeError('Not running with the Werkzeug Server')
166             func()
167             return 'inspector app shutting down...'
168
169         app.run(host="0.0.0.0", port=self.port)