X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=doctor_tests%2Finspector%2Fsample.py;h=70156b20901c5552fd5ffaccb7a2af9112ff6583;hb=a0528f67abe01f0bb4be3565cfef4fd049afa1fa;hp=54328727f6d29425d6bd8986f2fb2c9910af9f29;hpb=171f4b42c7d6e179a987b98b46d8c9ae6f2fc36d;p=doctor.git diff --git a/doctor_tests/inspector/sample.py b/doctor_tests/inspector/sample.py index 54328727..70156b20 100644 --- a/doctor_tests/inspector/sample.py +++ b/doctor_tests/inspector/sample.py @@ -10,9 +10,11 @@ import collections from flask import Flask from flask import request import json +import oslo_messaging import time from threading import Thread import requests +import yaml from doctor_tests.common import utils from doctor_tests.identity_auth import get_identity_auth @@ -25,7 +27,7 @@ from doctor_tests.inspector.base import BaseInspector class SampleInspector(BaseInspector): event_type = 'compute.host.down' - def __init__(self, conf, log): + def __init__(self, conf, log, trasport_url): super(SampleInspector, self).__init__(conf, log) self.inspector_url = self.get_inspector_url() self.novaclients = list() @@ -42,6 +44,17 @@ class SampleInspector(BaseInspector): self.hostnames = list() self.app = None + try: + transport = oslo_messaging.get_notification_transport(self.conf, + trasport_url) + self.notif = oslo_messaging.Notifier(transport, + 'compute.instance.update', + driver='messaging', + topics=['notifications']) + self.notif = self.notif.prepare(publisher_id='sample') + except: + self.notif = None + def _init_novaclients(self): self.NUMBER_OF_CLIENTS = self.conf.instance_count auth = get_identity_auth(project=self.conf.doctor_project) @@ -53,17 +66,20 @@ class SampleInspector(BaseInspector): def _init_servers_list(self): self.servers.clear() opts = {'all_tenants': True} - servers = self.nova.servers.list(search_opts=opts) + servers = self.nova.servers.list(detailed=True, search_opts=opts) for server in servers: try: host = server.__dict__.get('OS-EXT-SRV-ATTR:host') self.servers[host].append(server) - self.log.debug('get hostname=%s from server=%s' % (host, server)) + self.log.debug('get hostname=%s from server=%s' + % (host, str(server.name))) except Exception as e: - self.log.info('can not get hostname from server=%s' % server) + self.log.info('can not get hostname from server=%s, error=%s' + % (server, e)) def get_inspector_url(self): - return 'http://%s:%s' % (self.conf.inspector.ip, self.conf.inspector.port) + return 'http://%s:%s/events' % (self.conf.inspector.ip, + self.conf.inspector.port) def start(self): self.log.info('sample inspector start......') @@ -93,19 +109,60 @@ class SampleInspector(BaseInspector): event_type = event['type'] if event_type == self.event_type: self.hostnames.append(hostname) + if self.notif is not None: + thr0 = self._send_notif(hostname) thr1 = self._disable_compute_host(hostname) thr2 = self._vms_reset_state('error', hostname) if self.conf.inspector.update_neutron_port_dp_status: thr3 = self._set_ports_data_plane_status('DOWN', hostname) + if self.notif is not None: + thr0.join() thr1.join() thr2.join() if self.conf.inspector.update_neutron_port_dp_status: thr3.join() + def _alarm_data_decoder(self, data): + if "[" in data or "{" in data: + # string to list or dict removing unicode + data = yaml.load(data.replace("u'", "'")) + return data + + def _alarm_traits_decoder(self, data): + return ({str(t[0]): self._alarm_data_decoder(str(t[2])) + for t in data['reason_data']['event']['traits']}) + + def maintenance(self, data): + try: + payload = self._alarm_traits_decoder(data) + except: + payload = ({t[0]: t[2] for t in + data['reason_data']['event']['traits']}) + self.log.error('cannot parse alarm data: %s' % payload) + raise Exception('sample inspector cannot parse alarm.' + 'Possibly trait data over 256 char') + self.log.info('sample inspector received data = %s' % payload) + + state = payload['state'] + host = payload['host'] + + if state == 'IN_MAINTENANCE': + self.log.info("sample inspector: disable %s automatic fault " + "management" % host) + elif state == 'MAINTENANCE_COMPLETE': + self.log.info("sample inspector: enable %s automatic fault " + "management" % host) + else: + raise("sample inspector couldn't handle state: %s" % state) + @utils.run_async def _disable_compute_host(self, hostname): self.nova.services.force_down(hostname, 'nova-compute', True) - self.log.info('doctor mark host(%s) down at %s' % (hostname, time.time())) + + hostdown_time = time.time() + self.host_down_time = hostdown_time + self.log.info('doctor mark host(%s) down at %s' + % (hostname, hostdown_time)) @utils.run_async def _vms_reset_state(self, state, hostname): @@ -113,7 +170,10 @@ class SampleInspector(BaseInspector): @utils.run_async def _vm_reset_state(nova, server, state): nova.servers.reset_state(server, state) - self.log.info('doctor mark vm(%s) error at %s' % (server, time.time())) + vmdown_time = time.time() + self.vm_down_time = vmdown_time + self.log.info('doctor mark vm(%s) %s at %s' + % (server, state, vmdown_time)) thrs = [] for nova, server in zip(self.novaclients, self.servers[hostname]): @@ -122,6 +182,26 @@ class SampleInspector(BaseInspector): for t in thrs: t.join() + @utils.run_async + def _send_notif(self, hostname): + + @utils.run_async + def _send_notif(server): + payload = dict(tenant_id=server.tenant_id, + instance_id=server.id, + state="error") + self.notif.info({'some': 'context'}, 'compute.instance.update', + payload) + self.log.info('doctor compute.instance.update vm(%s) error %s' + % (server, time.time())) + + thrs = [] + for server in self.servers[hostname]: + t = _send_notif(server) + thrs.append(t) + for t in thrs: + t.join() + @utils.run_async def _set_ports_data_plane_status(self, status, hostname): body = {'data_plane_status': status} @@ -129,7 +209,8 @@ class SampleInspector(BaseInspector): @utils.run_async def _set_port_data_plane_status(port_id): self.neutron.update_port(port_id, body) - self.log.info('doctor set data plane status %s on port %s' % (status, port_id)) + self.log.info('doctor set data plane status %s on port %s' + % (status, port_id)) thrs = [] params = {'binding:host_id': hostname} @@ -153,14 +234,21 @@ class InspectorApp(Thread): @app.route('/events', methods=['PUT']) def event_posted(): - self.log.info('event posted in sample inspector at %s' % time.time()) + self.log.info('event posted in sample inspector at %s' + % time.time()) self.log.info('sample inspector = %s' % self.inspector) - self.log.info('sample inspector received data = %s' % request.data) + self.log.info('sample inspector received data = %s' + % request.data) events = json.loads(request.data.decode('utf8')) self.inspector.handle_events(events) return "OK" - @app.route('/shutdown', methods=['POST']) + @app.route('/maintenance', methods=['POST']) + def maintenance(): + self.inspector.maintenance(request.json) + return "OK" + + @app.route('/events/shutdown', methods=['POST']) def shutdown(): self.log.info('shutdown inspector app server at %s' % time.time()) func = request.environ.get('werkzeug.server.shutdown')