from flask import Flask
from flask import request
import json
+import oslo_messaging
import time
from threading import Thread
import requests
+import yaml
from doctor_tests.common import utils
from doctor_tests.identity_auth import get_identity_auth
class SampleInspector(BaseInspector):
event_type = 'compute.host.down'
- def __init__(self, conf, log):
+ def __init__(self, conf, log, trasport_url):
super(SampleInspector, self).__init__(conf, log)
self.inspector_url = self.get_inspector_url()
self.novaclients = list()
self.hostnames = list()
self.app = None
+ try:
+ transport = oslo_messaging.get_notification_transport(self.conf,
+ trasport_url)
+ self.notif = oslo_messaging.Notifier(transport,
+ 'compute.instance.update',
+ driver='messaging',
+ topics=['notifications'])
+ self.notif = self.notif.prepare(publisher_id='sample')
+ except:
+ self.notif = None
+
def _init_novaclients(self):
self.NUMBER_OF_CLIENTS = self.conf.instance_count
auth = get_identity_auth(project=self.conf.doctor_project)
def _init_servers_list(self):
self.servers.clear()
opts = {'all_tenants': True}
- servers = self.nova.servers.list(search_opts=opts)
+ servers = self.nova.servers.list(detailed=True, search_opts=opts)
for server in servers:
try:
host = server.__dict__.get('OS-EXT-SRV-ATTR:host')
event_type = event['type']
if event_type == self.event_type:
self.hostnames.append(hostname)
+ if self.notif is not None:
+ thr0 = self._send_notif(hostname)
thr1 = self._disable_compute_host(hostname)
thr2 = self._vms_reset_state('error', hostname)
if self.conf.inspector.update_neutron_port_dp_status:
thr3 = self._set_ports_data_plane_status('DOWN', hostname)
+ if self.notif is not None:
+ thr0.join()
thr1.join()
thr2.join()
if self.conf.inspector.update_neutron_port_dp_status:
thr3.join()
+ def _alarm_data_decoder(self, data):
+ if "[" in data or "{" in data:
+ # string to list or dict removing unicode
+ data = yaml.load(data.replace("u'", "'"))
+ return data
+
+ def _alarm_traits_decoder(self, data):
+ return ({str(t[0]): self._alarm_data_decoder(str(t[2]))
+ for t in data['reason_data']['event']['traits']})
+
+ def maintenance(self, data):
+ try:
+ payload = self._alarm_traits_decoder(data)
+ except:
+ payload = ({t[0]: t[2] for t in
+ data['reason_data']['event']['traits']})
+ self.log.error('cannot parse alarm data: %s' % payload)
+ raise Exception('sample inspector cannot parse alarm.'
+ 'Possibly trait data over 256 char')
+ self.log.info('sample inspector received data = %s' % payload)
+
+ state = payload['state']
+ host = payload['host']
+
+ if state == 'IN_MAINTENANCE':
+ self.log.info("sample inspector: disable %s automatic fault "
+ "management" % host)
+ elif state == 'MAINTENANCE_COMPLETE':
+ self.log.info("sample inspector: enable %s automatic fault "
+ "management" % host)
+ else:
+ raise("sample inspector couldn't handle state: %s" % state)
+
@utils.run_async
def _disable_compute_host(self, hostname):
self.nova.services.force_down(hostname, 'nova-compute', True)
nova.servers.reset_state(server, state)
vmdown_time = time.time()
self.vm_down_time = vmdown_time
- self.log.info('doctor mark vm(%s) error at %s'
- % (server, vmdown_time))
+ self.log.info('doctor mark vm(%s) %s at %s'
+ % (server, state, vmdown_time))
thrs = []
for nova, server in zip(self.novaclients, self.servers[hostname]):
for t in thrs:
t.join()
+ @utils.run_async
+ def _send_notif(self, hostname):
+
+ @utils.run_async
+ def _send_notif(server):
+ payload = dict(tenant_id=server.tenant_id,
+ instance_id=server.id,
+ state="error")
+ self.notif.info({'some': 'context'}, 'compute.instance.update',
+ payload)
+ self.log.info('doctor compute.instance.update vm(%s) error %s'
+ % (server, time.time()))
+
+ thrs = []
+ for server in self.servers[hostname]:
+ t = _send_notif(server)
+ thrs.append(t)
+ for t in thrs:
+ t.join()
+
@utils.run_async
def _set_ports_data_plane_status(self, status, hostname):
body = {'data_plane_status': status}
self.inspector.handle_events(events)
return "OK"
+ @app.route('/maintenance', methods=['POST'])
+ def maintenance():
+ self.inspector.maintenance(request.json)
+ return "OK"
+
@app.route('/events/shutdown', methods=['POST'])
def shutdown():
self.log.info('shutdown inspector app server at %s' % time.time())