from flask import Flask
from flask import request
import json
+import oslo_messaging
import time
from threading import Thread
import requests
+import yaml
from doctor_tests.common import utils
from doctor_tests.identity_auth import get_identity_auth
class SampleInspector(BaseInspector):
event_type = 'compute.host.down'
- def __init__(self, conf, log):
+ def __init__(self, conf, log, trasport_url):
super(SampleInspector, self).__init__(conf, log)
self.inspector_url = self.get_inspector_url()
self.novaclients = list()
auth = get_identity_auth(project=self.conf.doctor_project)
session = get_session(auth=auth)
- self.neutron = neutron_client(session)
+ if self.conf.inspector.update_neutron_port_dp_status:
+ self.neutron = neutron_client(session)
self.servers = collections.defaultdict(list)
self.hostnames = list()
self.app = None
+ try:
+ transport = oslo_messaging.get_notification_transport(self.conf,
+ trasport_url)
+ self.notif = oslo_messaging.Notifier(transport,
+ 'compute.instance.update',
+ driver='messaging',
+ topics=['notifications'])
+ self.notif = self.notif.prepare(publisher_id='sample')
+ except:
+ self.notif = None
+
def _init_novaclients(self):
self.NUMBER_OF_CLIENTS = self.conf.instance_count
auth = get_identity_auth(project=self.conf.doctor_project)
def _init_servers_list(self):
self.servers.clear()
opts = {'all_tenants': True}
- servers = self.nova.servers.list(search_opts=opts)
+ servers = self.nova.servers.list(detailed=True, search_opts=opts)
for server in servers:
try:
host = server.__dict__.get('OS-EXT-SRV-ATTR:host')
self.servers[host].append(server)
- self.log.debug('get hostname=%s from server=%s' % (host, server))
+ self.log.debug('get hostname=%s from server=%s'
+ % (host, server))
except Exception as e:
- self.log.info('can not get hostname from server=%s' % server)
+ self.log.info('can not get hostname from server=%s, error=%s'
+ % (server, e))
def get_inspector_url(self):
- return 'http://%s:%s' % (self.conf.inspector.ip, self.conf.inspector.port)
+ return 'http://%s:%s/events' % (self.conf.inspector.ip,
+ self.conf.inspector.port)
def start(self):
self.log.info('sample inspector start......')
event_type = event['type']
if event_type == self.event_type:
self.hostnames.append(hostname)
+ if self.notif is not None:
+ thr0 = self._send_notif(hostname)
thr1 = self._disable_compute_host(hostname)
thr2 = self._vms_reset_state('error', hostname)
- thr3 = self._set_ports_data_plane_status('DOWN', hostname)
+ if self.conf.inspector.update_neutron_port_dp_status:
+ thr3 = self._set_ports_data_plane_status('DOWN', hostname)
+ if self.notif is not None:
+ thr0.join()
thr1.join()
thr2.join()
- thr3.join()
+ if self.conf.inspector.update_neutron_port_dp_status:
+ thr3.join()
+
+ def _alarm_data_decoder(self, data):
+ if "[" in data or "{" in data:
+ # string to list or dict removing unicode
+ data = yaml.load(data.replace("u'", "'"))
+ return data
+
+ def _alarm_traits_decoder(self, data):
+ return ({str(t[0]): self._alarm_data_decoder(str(t[2]))
+ for t in data['reason_data']['event']['traits']})
+
+ def maintenance(self, data):
+ try:
+ payload = self._alarm_traits_decoder(data)
+ except:
+ payload = ({t[0]: t[2] for t in
+ data['reason_data']['event']['traits']})
+ self.log.error('cannot parse alarm data: %s' % payload)
+ raise Exception('sample inspector cannot parse alarm.'
+ 'Possibly trait data over 256 char')
+ self.log.info('sample inspector received data = %s' % payload)
+
+ state = payload['state']
+ host = payload['host']
+
+ if state == 'IN_MAINTENANCE':
+ self.log.info("sample inspector: disable %s automatic fault "
+ "management" % host)
+ elif state == 'MAINTENANCE_COMPLETE':
+ self.log.info("sample inspector: enable %s automatic fault "
+ "management" % host)
+ else:
+ raise("sample inspector couldn't handle state: %s" % state)
@utils.run_async
def _disable_compute_host(self, hostname):
self.nova.services.force_down(hostname, 'nova-compute', True)
- self.log.info('doctor mark host(%s) down at %s' % (hostname, time.time()))
+
+ hostdown_time = time.time()
+ self.host_down_time = hostdown_time
+ self.log.info('doctor mark host(%s) down at %s'
+ % (hostname, hostdown_time))
@utils.run_async
def _vms_reset_state(self, state, hostname):
@utils.run_async
def _vm_reset_state(nova, server, state):
nova.servers.reset_state(server, state)
- self.log.info('doctor mark vm(%s) error at %s' % (server, time.time()))
+ vmdown_time = time.time()
+ self.vm_down_time = vmdown_time
+ self.log.info('doctor mark vm(%s) %s at %s'
+ % (server, state, vmdown_time))
thrs = []
for nova, server in zip(self.novaclients, self.servers[hostname]):
for t in thrs:
t.join()
+ @utils.run_async
+ def _send_notif(self, hostname):
+
+ @utils.run_async
+ def _send_notif(server):
+ payload = dict(tenant_id=server.tenant_id,
+ instance_id=server.id,
+ state="error")
+ self.notif.info({'some': 'context'}, 'compute.instance.update',
+ payload)
+ self.log.info('doctor compute.instance.update vm(%s) error %s'
+ % (server, time.time()))
+
+ thrs = []
+ for server in self.servers[hostname]:
+ t = _send_notif(server)
+ thrs.append(t)
+ for t in thrs:
+ t.join()
+
@utils.run_async
def _set_ports_data_plane_status(self, status, hostname):
body = {'data_plane_status': status}
@utils.run_async
def _set_port_data_plane_status(port_id):
self.neutron.update_port(port_id, body)
- self.log.info('doctor set data plane status %s on port %s' % (status, port_id))
+ self.log.info('doctor set data plane status %s on port %s'
+ % (status, port_id))
thrs = []
params = {'binding:host_id': hostname}
@app.route('/events', methods=['PUT'])
def event_posted():
- self.log.info('event posted in sample inspector at %s' % time.time())
+ self.log.info('event posted in sample inspector at %s'
+ % time.time())
self.log.info('sample inspector = %s' % self.inspector)
- self.log.info('sample inspector received data = %s' % request.data)
+ self.log.info('sample inspector received data = %s'
+ % request.data)
events = json.loads(request.data.decode('utf8'))
self.inspector.handle_events(events)
return "OK"
- @app.route('/shutdown', methods=['POST'])
+ @app.route('/maintenance', methods=['POST'])
+ def maintenance():
+ self.inspector.maintenance(request.json)
+ return "OK"
+
+ @app.route('/events/shutdown', methods=['POST'])
def shutdown():
self.log.info('shutdown inspector app server at %s' % time.time())
func = request.environ.get('werkzeug.server.shutdown')