New VNFM supporting ETSI changes 05/69405/8
authorTomi Juvonen <tomi.juvonen@nokia.com>
Thu, 19 Dec 2019 11:12:32 +0000 (13:12 +0200)
committerTomi Juvonen <tomi.juvonen@nokia.com>
Mon, 3 Feb 2020 09:57:44 +0000 (11:57 +0200)
JIRA: DOCTOR-137

Signed-off-by: Tomi Juvonen <tomi.juvonen@nokia.com>
Change-Id: I6e632b24efb6728c171dd93979e3b1de5333251b

doctor_tests/app_manager/__init__.py
doctor_tests/app_manager/vnfm.py [new file with mode: 0644]
doctor_tests/installer/common/set_fenix.sh
doctor_tests/main.py
doctor_tests/scenario/maintenance.py
tox.ini

index 717d658..c2f7591 100644 (file)
@@ -8,12 +8,13 @@
 ##############################################################################
 from oslo_config import cfg
 from oslo_utils import importutils
+import os
 
 
 OPTS = [
     cfg.StrOpt('type',
-               default='sample',
-               choices=['sample'],
+               default=os.environ.get('APP_MANAGER_TYPE', 'sample'),
+               choices=['sample', 'vnfm'],
                help='the component of doctor app manager',
                required=True),
     cfg.StrOpt('ip',
@@ -28,7 +29,8 @@ OPTS = [
 
 
 _app_manager_name_class_mapping = {
-    'sample': 'doctor_tests.app_manager.sample.SampleAppManager'
+    'sample': 'doctor_tests.app_manager.sample.SampleAppManager',
+    'vnfm': 'doctor_tests.app_manager.vnfm.VNFM',
 }
 
 
diff --git a/doctor_tests/app_manager/vnfm.py b/doctor_tests/app_manager/vnfm.py
new file mode 100644 (file)
index 0000000..7a3c9ad
--- /dev/null
@@ -0,0 +1,431 @@
+##############################################################################
+# Copyright (c) 2018 Nokia Corporation and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+from flask import Flask
+from flask import request
+import json
+import requests
+from threading import Thread
+import time
+import uuid
+import yaml
+
+from doctor_tests.app_manager.base import BaseAppManager
+from doctor_tests.identity_auth import get_identity_auth
+from doctor_tests.identity_auth import get_session
+from doctor_tests.os_clients import neutron_client
+from doctor_tests.os_clients import nova_client
+from doctor_tests.os_clients import keystone_client
+
+
+class VNFM(BaseAppManager):
+
+    def __init__(self, stack, conf, log):
+        super(VNFM, self).__init__(conf, log)
+        self.stack = stack
+        self.app = None
+
+    def start(self):
+        self.log.info('VNFM start......')
+        self.app = VNFManager(self.stack, self.conf, self, self.log)
+        self.app.start()
+
+    def stop(self):
+        self.log.info('VNFM stop......')
+        if not self.app:
+            return
+        self.app.delete_constraints()
+        headers = {
+            'Content-Type': 'application/json',
+            'Accept': 'application/json',
+        }
+        url = 'http://%s:%d/shutdown'\
+              % (self.conf.app_manager.ip,
+                 self.conf.app_manager.port)
+        requests.post(url, data='', headers=headers)
+
+
+class VNFManager(Thread):
+
+    def __init__(self, stack, conf, app_manager, log):
+        Thread.__init__(self)
+        self.stack = stack
+        self.conf = conf
+        self.port = self.conf.app_manager.port
+        self.app_manager = app_manager
+        self.log = log
+        self.intance_ids = None
+        self.auth = get_identity_auth(project=self.conf.doctor_project)
+        self.session = get_session(auth=self.auth)
+        self.keystone = keystone_client(
+            self.conf.keystone_version, self.session)
+        self.nova = nova_client(self.conf.nova_version,
+                                self.session)
+        self.neutron = neutron_client(session=self.session)
+        self.headers = {
+            'Content-Type': 'application/json',
+            'Accept': 'application/json'}
+        if self.conf.admin_tool.type == 'fenix':
+            self.headers['X-Auth-Token'] = self.session.get_token()
+        self.orig_number_of_instances = self.number_of_instances()
+        # List of instances
+        self.ha_instances = []
+        self.nonha_instances = []
+        # Different instance_id specific constraints {instanse_id: {},...}
+        self.instance_constraints = None
+        # Update existing instances to instance lists
+        self.update_instances()
+        nonha_instances = len(self.nonha_instances)
+        if nonha_instances < 7:
+            self.scale = 2
+            self.max_impacted = 2
+        else:
+            self.scale = int((nonha_instances) / 2)
+            self.max_impacted = self.scale - 1
+        self.log.info('Init nonha_instances: %s scale: %s: max_impacted %s' %
+                      (nonha_instances, self.scale, self.max_impacted))
+        # Different instance groups constraints dict
+        self.ha_group = None
+        self.nonha_group = None
+        # Floating IP used in HA instance
+        self.floating_ip = None
+        # VNF project_id
+        self.project_id = None
+        # HA instance_id that is active / has floating IP
+        self.active_instance_id = self.active_instance_id()
+
+        services = self.keystone.services.list()
+        for service in services:
+            if service.type == 'maintenance':
+                self.log.info('maintenance service: %s:%s type %s'
+                              % (service.name, service.id, service.type))
+                maint_id = service.id
+        self.maint_endpoint = [ep.url for ep in self.keystone.endpoints.list()
+                               if ep.service_id == maint_id and
+                               ep.interface == 'public'][0]
+        self.log.info('maintenance endpoint: %s' % self.maint_endpoint)
+
+        self.update_constraints()
+
+    def delete_remote_instance_constraints(self, instance_id):
+        url = "%s/instance/%s" % (self.maint_endpoint, instance_id)
+        self.log.info('DELETE: %s' % url)
+        ret = requests.delete(url, data=None, headers=self.headers)
+        if ret.status_code != 200 and ret.status_code != 204:
+            raise Exception(ret.text)
+
+    def update_remote_instance_constraints(self, instance):
+        url = "%s/instance/%s" % (self.maint_endpoint, instance["instance_id"])
+        self.log.info('PUT: %s' % url)
+        ret = requests.put(url, data=json.dumps(instance),
+                           headers=self.headers)
+        if ret.status_code != 200 and ret.status_code != 204:
+            raise Exception(ret.text)
+
+    def delete_remote_group_constraints(self, instance_group):
+        url = "%s/instance_group/%s" % (self.maint_endpoint,
+                                        instance_group["group_id"])
+        self.log.info('DELETE: %s' % url)
+        ret = requests.delete(url, data=None, headers=self.headers)
+        if ret.status_code != 200 and ret.status_code != 204:
+            raise Exception(ret.text)
+
+    def update_remote_group_constraints(self, instance_group):
+        url = "%s/instance_group/%s" % (self.maint_endpoint,
+                                        instance_group["group_id"])
+        self.log.info('PUT: %s' % url)
+        ret = requests.put(url, data=json.dumps(instance_group),
+                           headers=self.headers)
+        if ret.status_code != 200 and ret.status_code != 204:
+            raise Exception(ret.text)
+
+    def delete_constraints(self):
+        if self.conf.admin_tool.type == 'fenix':
+            self.headers['X-Auth-Token'] = self.session.get_token()
+        for instance_id in self.instance_constraints:
+            self.delete_remote_instance_constraints(instance_id)
+        self.delete_remote_group_constraints(self.nonha_group)
+        self.delete_remote_group_constraints(self.ha_group)
+
+    def update_constraints(self):
+        self.log.info('Update constraints')
+        if self.project_id is None:
+            self.project_id = self.keystone.projects.list(
+                name=self.conf.doctor_project)[0].id
+        if self.nonha_group is None:
+            # Nova does not support groupping instances that do not belong to
+            # anti-affinity server_groups. Anyhow all instances need groupping
+            self.nonha_group = {
+                "group_id": str(uuid.uuid4()),
+                "project_id": self.project_id,
+                "group_name": "doctor_nonha_app_group",
+                "anti_affinity_group": False,
+                "max_instances_per_host": 0,
+                "max_impacted_members": self.max_impacted,
+                "recovery_time": 2,
+                "resource_mitigation": True}
+            self.log.info('create doctor_nonha_app_group constraints: %s'
+                          % self.nonha_group)
+            self.update_remote_group_constraints(self.nonha_group)
+        if self.ha_group is None:
+            group_id = [sg.id for sg in self.nova.server_groups.list()
+                        if sg.name == "doctor_ha_app_group"][0]
+            self.ha_group = {
+                "group_id": group_id,
+                "project_id": self.project_id,
+                "group_name": "doctor_ha_app_group",
+                "anti_affinity_group": True,
+                "max_instances_per_host": 1,
+                "max_impacted_members": 1,
+                "recovery_time": 4,
+                "resource_mitigation": True}
+            self.log.info('create doctor_ha_app_group constraints: %s'
+                          % self.nonha_group)
+            self.update_remote_group_constraints(self.ha_group)
+        instance_constraints = {}
+        for ha_instance in self.ha_instances:
+            instance = {
+                "instance_id": ha_instance.id,
+                "project_id": self.project_id,
+                "group_id": self.ha_group["group_id"],
+                "instance_name": ha_instance.name,
+                "max_interruption_time": 120,
+                "migration_type": "MIGRATION",
+                "resource_mitigation": True,
+                "lead_time": 40}
+            self.log.info('create ha instance constraints: %s'
+                          % instance)
+            instance_constraints[ha_instance.id] = instance
+        for nonha_instance in self.nonha_instances:
+            instance = {
+                "instance_id": nonha_instance.id,
+                "project_id": self.project_id,
+                "group_id": self.nonha_group["group_id"],
+                "instance_name": nonha_instance.name,
+                "max_interruption_time": 120,
+                "migration_type": "MIGRATION",
+                "resource_mitigation": True,
+                "lead_time": 40}
+            self.log.info('create nonha instance constraints: %s'
+                          % instance)
+            instance_constraints[nonha_instance.id] = instance
+        if not self.instance_constraints:
+            # Initial instance constraints
+            self.log.info('create initial instances constraints...')
+            for instance in [instance_constraints[i] for i
+                             in instance_constraints]:
+                self.update_remote_instance_constraints(instance)
+            self.instance_constraints = instance_constraints.copy()
+        else:
+            self.log.info('check instances constraints changes...')
+            added = [i for i in instance_constraints.keys()
+                     if i not in self.instance_constraints]
+            deleted = [i for i in self.instance_constraints.keys()
+                       if i not in instance_constraints]
+            modified = [i for i in instance_constraints.keys()
+                        if (i not in added and i not in deleted and
+                            instance_constraints[i] !=
+                            self.instance_constraints[i])]
+            for instance_id in deleted:
+                self.delete_remote_instance_constraints(instance_id)
+            updated = added + modified
+            for instance in [instance_constraints[i] in i in updated]:
+                self.update_remote_instance_constraints(instance)
+            if updated or deleted:
+                # Some instance constraints have changed
+                self.instance_constraints = instance_constraints.copy()
+
+    def active_instance_id(self):
+        # Need rertry as it takes time after heat template done before
+        # Floating IP in place
+        retry = 5
+        while retry > 0:
+            for instance in self.ha_instances:
+                network_interfaces = next(iter(instance.addresses.values()))
+                for network_interface in network_interfaces:
+                    _type = network_interface.get('OS-EXT-IPS:type')
+                    if _type == "floating":
+                        if not self.floating_ip:
+                            self.floating_ip = network_interface.get('addr')
+                        self.log.debug('active_instance: %s %s' %
+                                       (instance.name, instance.id))
+                        return instance.id
+            time.sleep(2)
+            self.update_instances()
+            retry -= 1
+        raise Exception("No active instance found")
+
+    def switch_over_ha_instance(self):
+        for instance in self.ha_instances:
+            if instance.id != self.active_instance_id:
+                self.log.info('Switch over to: %s %s' % (instance.name,
+                                                         instance.id))
+                # Deprecated, need to use neutron instead
+                # instance.add_floating_ip(self.floating_ip)
+                port = self.neutron.list_ports(device_id=instance.id)['ports'][0]['id']  # noqa
+                floating_id = self.neutron.list_floatingips(floating_ip_address=self.floating_ip)['floatingips'][0]['id']  # noqa
+                self.neutron.update_floatingip(floating_id, {'floatingip': {'port_id': port}})  # noqa
+                # Have to update ha_instances as floating_ip changed
+                self.update_instances()
+                self.active_instance_id = instance.id
+                break
+
+    def get_instance_ids(self):
+        ret = list()
+        for instance in self.nova.servers.list(detailed=False):
+            ret.append(instance.id)
+        return ret
+
+    def update_instances(self):
+        instances = self.nova.servers.list(detailed=True)
+        self.ha_instances = [i for i in instances
+                             if "doctor_ha_app_" in i.name]
+        self.nonha_instances = [i for i in instances
+                                if "doctor_nonha_app_" in i.name]
+
+    def _alarm_data_decoder(self, data):
+        if "[" in data or "{" in data:
+            # string to list or dict removing unicode
+            data = yaml.load(data.replace("u'", "'"))
+        return data
+
+    def _alarm_traits_decoder(self, data):
+        return ({str(t[0]): self._alarm_data_decoder(str(t[2]))
+                for t in data['reason_data']['event']['traits']})
+
+    def get_session_instance_ids(self, url, session_id):
+        ret = requests.get(url, data=None, headers=self.headers)
+        if ret.status_code != 200:
+            raise Exception(ret.text)
+        self.log.info('get_instance_ids %s' % ret.json())
+        return ret.json()['instance_ids']
+
+    def scale_instances(self, number_of_instances):
+        number_of_instances_before = self.number_of_instances()
+
+        parameters = self.stack.parameters
+        parameters['nonha_intances'] += number_of_instances
+        self.stack.update(self.stack.stack_name,
+                          self.stack.stack_id,
+                          self.stack.template,
+                          parameters=parameters,
+                          files=self.stack.files)
+
+        number_of_instances_after = self.number_of_instances()
+        if (number_of_instances_before + number_of_instances !=
+           number_of_instances_after):
+            self.log.error('scale_instances with: %d from: %d ends up to: %d'
+                           % (number_of_instances, number_of_instances_before,
+                              number_of_instances_after))
+            raise Exception('scale_instances failed')
+
+        self.log.info('scaled instances from %d to %d' %
+                      (number_of_instances_before,
+                       number_of_instances_after))
+
+    def number_of_instances(self):
+        return len(self.nova.servers.list(detailed=False))
+
+    def run(self):
+        app = Flask('VNFM')
+
+        @app.route('/maintenance', methods=['POST'])
+        def maintenance_alarm():
+            data = json.loads(request.data.decode('utf8'))
+            try:
+                payload = self._alarm_traits_decoder(data)
+            except Exception:
+                payload = ({t[0]: t[2] for t in
+                           data['reason_data']['event']['traits']})
+                self.log.error('cannot parse alarm data: %s' % payload)
+                raise Exception('VNFM cannot parse alarm.'
+                                'Possibly trait data over 256 char')
+
+            self.log.info('VNFM received data = %s' % payload)
+
+            state = payload['state']
+            reply_state = None
+            reply = dict()
+
+            self.log.info('VNFM state: %s' % state)
+
+            if state == 'MAINTENANCE':
+                instance_ids = (self.get_session_instance_ids(
+                                payload['instance_ids'],
+                                payload['session_id']))
+                reply['instance_ids'] = instance_ids
+                reply_state = 'ACK_MAINTENANCE'
+
+            elif state == 'SCALE_IN':
+                # scale down "self.scale" instances that is VCPUS equaling
+                # at least a single compute node
+                self.scale_instances(-self.scale)
+                reply['instance_ids'] = self.get_instance_ids()
+                reply_state = 'ACK_SCALE_IN'
+
+            elif state == 'MAINTENANCE_COMPLETE':
+                # possibly need to upscale
+                number_of_instances = self.number_of_instances()
+                if self.orig_number_of_instances > number_of_instances:
+                    scale_instances = (self.orig_number_of_instances -
+                                       number_of_instances)
+                    self.scale_instances(scale_instances)
+                reply_state = 'ACK_MAINTENANCE_COMPLETE'
+
+            elif state == 'PREPARE_MAINTENANCE':
+                # TBD from contraints
+                if "MIGRATE" not in payload['allowed_actions']:
+                    raise Exception('MIGRATE not supported')
+                instance_ids = payload['instance_ids'][0]
+                self.log.info('VNFM got instance: %s' % instance_ids)
+                if instance_ids == self.active_instance_id:
+                    self.switch_over_ha_instance()
+                # optional also in contraints
+                reply['instance_action'] = "MIGRATE"
+                reply_state = 'ACK_PREPARE_MAINTENANCE'
+
+            elif state == 'PLANNED_MAINTENANCE':
+                # TBD from contraints
+                if "MIGRATE" not in payload['allowed_actions']:
+                    raise Exception('MIGRATE not supported')
+                instance_ids = payload['instance_ids'][0]
+                self.log.info('VNFM got instance: %s' % instance_ids)
+                if instance_ids == self.active_instance_id:
+                    self.switch_over_ha_instance()
+                # optional also in contraints
+                reply['instance_action'] = "MIGRATE"
+                reply_state = 'ACK_PLANNED_MAINTENANCE'
+
+            elif state == 'INSTANCE_ACTION_DONE':
+                # TBD was action done in allowed window
+                self.log.info('%s' % payload['instance_ids'])
+            else:
+                raise Exception('VNFM received event with'
+                                ' unknown state %s' % state)
+
+            if reply_state:
+                if self.conf.admin_tool.type == 'fenix':
+                    self.headers['X-Auth-Token'] = self.session.get_token()
+                reply['session_id'] = payload['session_id']
+                reply['state'] = reply_state
+                url = payload['reply_url']
+                self.log.info('VNFM reply: %s' % reply)
+                requests.put(url, data=json.dumps(reply), headers=self.headers)
+
+            return 'OK'
+
+        @app.route('/shutdown', methods=['POST'])
+        def shutdown():
+            self.log.info('shutdown VNFM server at %s' % time.time())
+            func = request.environ.get('werkzeug.server.shutdown')
+            if func is None:
+                raise RuntimeError('Not running with the Werkzeug Server')
+            func()
+            return 'VNFM shutting down...'
+
+        app.run(host="0.0.0.0", port=self.port)
index aac376c..68bb4a6 100644 (file)
@@ -75,6 +75,11 @@ echo "password = $OS_PASSWORD" >> fenix-api.conf
 echo "username = $OS_USERNAME" >> fenix-api.conf
 echo "cafile = /opt/stack/data/ca-bundle.pem" >> fenix-api.conf
 
+openstack service list | grep maintenance | {
+openstack service create --name fenix --enable maintenance
+openstack endpoint create --region $OS_REGION_NAME --enable fenix public http://localhost:12347/v1
+}
+
 # Mysql pw
 # MYSQLPW=`cat /var/lib/config-data/mysql/etc/puppet/hieradata/service_configs.json | grep mysql | grep root_password | awk -F": " '{print $2}' | awk -F"\"" '{print $2}'`
 MYSQLPW=root
index 3dea89d..7573fae 100644 (file)
@@ -65,9 +65,8 @@ class DoctorTest(object):
                 self.fault_management.setup()
 
                 # wait for aodh alarms are updated in caches for event
-                # evaluator,
-                # sleep time should be larger than event_alarm_cache_ttl
-                # (default 60)
+                # evaluator,sleep time should be larger than
+                # event_alarm_cache_ttl (default 60)
                 # (tojuvone) Fraser currently needs 120
                 time.sleep(120)
 
index f5b9b51..e6cdccc 100644 (file)
@@ -162,8 +162,12 @@ class Maintenance(object):
 
             data = {'state': 'MAINTENANCE',
                     'maintenance_at': maintenance_at,
-                    'metadata': {'openstack_version': 'Rocky'},
-                    'workflow': 'default'}
+                    'metadata': {'openstack_version': 'Train'}}
+
+            if self.conf.app_manager.type == 'vnfm':
+                data['workflow'] = 'vnf'
+            else:
+                data['workflow'] = 'default'
 
             if self.conf.admin_tool.type == 'sample':
                 data['hosts'] = maintenance_hosts
diff --git a/tox.ini b/tox.ini
index 30feecf..2937c32 100644 (file)
--- a/tox.ini
+++ b/tox.ini
@@ -32,6 +32,7 @@ passenv =
     ADMIN_TOOL_TYPE
     TEST_CASE
     SSH_KEY
+    APP_MANAGER_TYPE
 changedir = {toxinidir}/doctor_tests
 commands = doctor-test
            /usr/bin/find {toxinidir} -type f -name '*.py[co]' -delete -o -type d -name __pycache__ -delete