support pep8 check
[doctor.git] / doctor_tests / inspector / sample.py
1 ##############################################################################
2 # Copyright (c) 2017 ZTE Corporation and others.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9 import collections
10 from flask import Flask
11 from flask import request
12 import json
13 import time
14 from threading import Thread
15 import requests
16
17 from doctor_tests.common import utils
18 from doctor_tests.identity_auth import get_identity_auth
19 from doctor_tests.identity_auth import get_session
20 from doctor_tests.os_clients import nova_client
21 from doctor_tests.os_clients import neutron_client
22 from doctor_tests.inspector.base import BaseInspector
23
24
25 class SampleInspector(BaseInspector):
26     event_type = 'compute.host.down'
27
28     def __init__(self, conf, log):
29         super(SampleInspector, self).__init__(conf, log)
30         self.inspector_url = self.get_inspector_url()
31         self.novaclients = list()
32         self._init_novaclients()
33         # Normally we use this client for non redundant API calls
34         self.nova = self.novaclients[0]
35
36         auth = get_identity_auth(project=self.conf.doctor_project)
37         session = get_session(auth=auth)
38         if self.conf.inspector.update_neutron_port_dp_status:
39             self.neutron = neutron_client(session)
40
41         self.servers = collections.defaultdict(list)
42         self.hostnames = list()
43         self.app = None
44
45     def _init_novaclients(self):
46         self.NUMBER_OF_CLIENTS = self.conf.instance_count
47         auth = get_identity_auth(project=self.conf.doctor_project)
48         session = get_session(auth=auth)
49         for i in range(self.NUMBER_OF_CLIENTS):
50             self.novaclients.append(
51                 nova_client(self.conf.nova_version, session))
52
53     def _init_servers_list(self):
54         self.servers.clear()
55         opts = {'all_tenants': True}
56         servers = self.nova.servers.list(search_opts=opts)
57         for server in servers:
58             try:
59                 host = server.__dict__.get('OS-EXT-SRV-ATTR:host')
60                 self.servers[host].append(server)
61                 self.log.debug('get hostname=%s from server=%s'
62                                % (host, server))
63             except Exception as e:
64                 self.log.info('can not get hostname from server=%s, error=%s'
65                               % (server, e))
66
67     def get_inspector_url(self):
68         return 'http://%s:%s/events' % (self.conf.inspector.ip,
69                                         self.conf.inspector.port)
70
71     def start(self):
72         self.log.info('sample inspector start......')
73         self._init_servers_list()
74         self.app = InspectorApp(self.conf.inspector.port, self, self.log)
75         self.app.start()
76
77     def stop(self):
78         self.log.info('sample inspector stop......')
79         if not self.app:
80             return
81         for hostname in self.hostnames:
82             self.nova.services.force_down(hostname, 'nova-compute', False)
83
84         headers = {
85             'Content-Type': 'application/json',
86             'Accept': 'application/json',
87         }
88         url = '%s%s' % (self.inspector_url, 'shutdown') \
89             if self.inspector_url.endswith('/') else \
90             '%s%s' % (self.inspector_url, '/shutdown')
91         requests.post(url, data='', headers=headers)
92
93     def handle_events(self, events):
94         for event in events:
95             hostname = event['details']['hostname']
96             event_type = event['type']
97             if event_type == self.event_type:
98                 self.hostnames.append(hostname)
99                 thr1 = self._disable_compute_host(hostname)
100                 thr2 = self._vms_reset_state('error', hostname)
101                 if self.conf.inspector.update_neutron_port_dp_status:
102                     thr3 = self._set_ports_data_plane_status('DOWN', hostname)
103                 thr1.join()
104                 thr2.join()
105                 if self.conf.inspector.update_neutron_port_dp_status:
106                     thr3.join()
107
108     @utils.run_async
109     def _disable_compute_host(self, hostname):
110         self.nova.services.force_down(hostname, 'nova-compute', True)
111         self.log.info('doctor mark host(%s) down at %s'
112                       % (hostname, time.time()))
113
114     @utils.run_async
115     def _vms_reset_state(self, state, hostname):
116
117         @utils.run_async
118         def _vm_reset_state(nova, server, state):
119             nova.servers.reset_state(server, state)
120             self.log.info('doctor mark vm(%s) error at %s'
121                           % (server, time.time()))
122
123         thrs = []
124         for nova, server in zip(self.novaclients, self.servers[hostname]):
125             t = _vm_reset_state(nova, server, state)
126             thrs.append(t)
127         for t in thrs:
128             t.join()
129
130     @utils.run_async
131     def _set_ports_data_plane_status(self, status, hostname):
132         body = {'data_plane_status': status}
133
134         @utils.run_async
135         def _set_port_data_plane_status(port_id):
136             self.neutron.update_port(port_id, body)
137             self.log.info('doctor set data plane status %s on port %s'
138                           % (status, port_id))
139
140         thrs = []
141         params = {'binding:host_id': hostname}
142         for port_id in self.neutron.list_ports(**params):
143             t = _set_port_data_plane_status(port_id)
144             thrs.append(t)
145         for t in thrs:
146             t.join()
147
148
149 class InspectorApp(Thread):
150
151     def __init__(self, port, inspector, log):
152         Thread.__init__(self)
153         self.port = port
154         self.inspector = inspector
155         self.log = log
156
157     def run(self):
158         app = Flask('inspector')
159
160         @app.route('/events', methods=['PUT'])
161         def event_posted():
162             self.log.info('event posted in sample inspector at %s'
163                           % time.time())
164             self.log.info('sample inspector = %s' % self.inspector)
165             self.log.info('sample inspector received data = %s'
166                           % request.data)
167             events = json.loads(request.data.decode('utf8'))
168             self.inspector.handle_events(events)
169             return "OK"
170
171         @app.route('/shutdown', methods=['POST'])
172         def shutdown():
173             self.log.info('shutdown inspector app server at %s' % time.time())
174             func = request.environ.get('werkzeug.server.shutdown')
175             if func is None:
176                 raise RuntimeError('Not running with the Werkzeug Server')
177             func()
178             return 'inspector app shutting down...'
179
180         app.run(host="0.0.0.0", port=self.port)