From 642987dcca77cdf1ae551d824ab44272f3406f70 Mon Sep 17 00:00:00 2001 From: boucherv Date: Wed, 21 Jun 2017 16:37:56 +0200 Subject: [PATCH] [cloudify_ims] Support Cloudify 4.0 - Delete all shell commands to use cloudify python lib - Cloudify Manager installation with a packaged image - SNAPS integration - Adapt test_vnf unit tests - Initiate test cloudify_ims unit tests (to be completed) JIRA: FUNCTEST-838 Change-Id: Ia4b499d4155e6af5d37d6d5cf4310a5a9693c7ce Signed-off-by: boucherv Signed-off-by: Morgan Richomme --- functest/core/vnf.py | 34 +- functest/opnfv_tests/vnf/ims/clearwater.py | 66 --- .../opnfv_tests/vnf/ims/clearwater_ims_base.py | 13 +- functest/opnfv_tests/vnf/ims/cloudify_ims.py | 602 +++++++++++++------- functest/opnfv_tests/vnf/ims/cloudify_ims.yaml | 59 +- functest/opnfv_tests/vnf/ims/create_venv.sh | 44 -- .../opnfv_tests/vnf/ims/orchestrator_cloudify.py | 232 -------- functest/opnfv_tests/vnf/ims/requirements.pip | 1 - functest/tests/unit/core/test_vnf.py | 404 +++++++------ functest/tests/unit/vnf/ims/test_clearwater.py | 85 --- functest/tests/unit/vnf/ims/test_cloudify_ims.py | 630 ++++++--------------- .../unit/vnf/ims/test_orchestrator_cloudify.py | 176 ------ functest/utils/openstack_utils.py | 59 +- requirements.txt | 2 +- 14 files changed, 873 insertions(+), 1534 deletions(-) delete mode 100644 functest/opnfv_tests/vnf/ims/clearwater.py delete mode 100755 functest/opnfv_tests/vnf/ims/create_venv.sh delete mode 100644 functest/opnfv_tests/vnf/ims/orchestrator_cloudify.py delete mode 100644 functest/opnfv_tests/vnf/ims/requirements.pip delete mode 100644 functest/tests/unit/vnf/ims/test_clearwater.py delete mode 100644 functest/tests/unit/vnf/ims/test_orchestrator_cloudify.py diff --git a/functest/core/vnf.py b/functest/core/vnf.py index 0589b5d2a..a329212d8 100644 --- a/functest/core/vnf.py +++ b/functest/core/vnf.py @@ -43,12 +43,10 @@ class VnfOnBoarding(base.TestCase): def __init__(self, **kwargs): super(VnfOnBoarding, self).__init__(**kwargs) - self.tenant_created = False - self.user_created = False + self.exist_obj = {'tenant': False, 'user': False} self.tenant_name = CONST.__getattribute__( 'vnf_{}_tenant_name'.format(self.case_name)) - self.tenant_description = CONST.__getattribute__( - 'vnf_{}_tenant_description'.format(self.case_name)) + self.creds = {} def run(self, **kwargs): """ @@ -78,8 +76,10 @@ class VnfOnBoarding(base.TestCase): return base.TestCase.EX_OK else: self.result = 0 + self.stop_time = time.time() return base.TestCase.EX_TESTCASE_FAILED except Exception: # pylint: disable=broad-except + self.stop_time = time.time() self.__logger.exception("Exception on VNF testing") return base.TestCase.EX_TESTCASE_FAILED @@ -96,20 +96,24 @@ class VnfOnBoarding(base.TestCase): Raise VnfPreparationException in case of problem """ try: + tenant_description = CONST.__getattribute__( + 'vnf_{}_tenant_description'.format(self.case_name)) self.__logger.info("Prepare VNF: %s, description: %s", - self.tenant_name, self.tenant_description) - admin_creds = os_utils.get_credentials() + self.tenant_name, tenant_description) keystone_client = os_utils.get_keystone_client() - self.tenant_created = os_utils.get_or_create_tenant_for_vnf( - keystone_client, self.tenant_name, self.tenant_description) - self.user_created = os_utils.get_or_create_user_for_vnf( + self.exist_obj['tenant'] = ( + not os_utils.get_or_create_tenant_for_vnf( + keystone_client, + self.tenant_name, + tenant_description)) + self.exist_obj['user'] = not os_utils.get_or_create_user_for_vnf( keystone_client, self.tenant_name) - creds = admin_creds.copy() - creds.update({ + self.creds = { "tenant": self.tenant_name, "username": self.tenant_name, - "password": self.tenant_name - }) + "password": self.tenant_name, + "auth_url": os_utils.get_credentials()['auth_url'] + } return base.TestCase.EX_OK except Exception: # pylint: disable=broad-except self.__logger.exception("Exception raised during VNF preparation") @@ -182,7 +186,7 @@ class VnfOnBoarding(base.TestCase): """ self.__logger.info("test cleaning") keystone_client = os_utils.get_keystone_client() - if self.tenant_created: + if not self.exist_obj['tenant']: os_utils.delete_tenant(keystone_client, self.tenant_name) - if self.user_created: + if not self.exist_obj['user']: os_utils.delete_user(keystone_client, self.tenant_name) diff --git a/functest/opnfv_tests/vnf/ims/clearwater.py b/functest/opnfv_tests/vnf/ims/clearwater.py deleted file mode 100644 index 33ed352dc..000000000 --- a/functest/opnfv_tests/vnf/ims/clearwater.py +++ /dev/null @@ -1,66 +0,0 @@ -#!/usr/bin/env python -# coding: utf8 -####################################################################### -# -# Copyright (c) 2015 Orange -# valentin.boucher@orange.com -# -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 -######################################################################## - - -class Clearwater(object): - - def __init__(self, inputs={}, orchestrator=None, logger=None): - self.config = inputs - self.orchestrator = orchestrator - self.logger = logger - self.deploy = False - - def set_orchestrator(self, orchestrator): - self.orchestrator = orchestrator - - def set_flavor_id(self, flavor_id): - self.config['flavor_id'] = flavor_id - - def set_image_id(self, image_id): - self.config['image_id'] = image_id - - def set_agent_user(self, agent_user): - self.config['agent_user'] = agent_user - - def set_external_network_name(self, external_network_name): - self.config['external_network_name'] = external_network_name - - def set_public_domain(self, public_domain): - self.config['public_domain'] = public_domain - - def deploy_vnf(self, blueprint, bp_name='clearwater', - dep_name='clearwater-opnfv'): - if self.orchestrator: - self.dep_name = dep_name - error = self.orchestrator.download_upload_and_deploy_blueprint( - blueprint, self.config, bp_name, dep_name) - if error: - return error - - self.deploy = True - - else: - if self.logger: - self.logger.error("Cloudify manager is down or not provide...") - - def undeploy_vnf(self): - if self.orchestrator: - if self.deploy: - self.deploy = False - self.orchestrator.undeploy_deployment(self.dep_name) - else: - if self.logger: - self.logger.error("Clearwater isn't already deploy...") - else: - if self.logger: - self.logger.error("Cloudify manager is down or not provide...") diff --git a/functest/opnfv_tests/vnf/ims/clearwater_ims_base.py b/functest/opnfv_tests/vnf/ims/clearwater_ims_base.py index c1a35be31..25ddca216 100644 --- a/functest/opnfv_tests/vnf/ims/clearwater_ims_base.py +++ b/functest/opnfv_tests/vnf/ims/clearwater_ims_base.py @@ -11,6 +11,7 @@ import logging import os import pkg_resources import shutil +import time import requests @@ -71,7 +72,17 @@ class ClearwaterOnBoardingBase(vnf.VnfOnBoarding): ellis_ip, params['email']) self.logger.info('Create 1st calling number on Ellis') - number_res = self.create_ellis_number(number_url, cookies) + i = 24 + while rq.status_code != 200 and i > 0: + try: + number_res = self.create_ellis_number(number_url, cookies) + break + except: + if i == 1: + raise Exception("Unable to create a number") + self.logger.warn("Unable to create a number. Retry ..") + time.sleep(25) + i = i - 1 output_dict['number'] = number_res if two_numbers: diff --git a/functest/opnfv_tests/vnf/ims/cloudify_ims.py b/functest/opnfv_tests/vnf/ims/cloudify_ims.py index 7d7ce33ea..2dcce4084 100644 --- a/functest/opnfv_tests/vnf/ims/cloudify_ims.py +++ b/functest/opnfv_tests/vnf/ims/cloudify_ims.py @@ -9,33 +9,48 @@ import logging import os -import sys - -import requests +import time import yaml +from scp import SCPClient + +from cloudify_rest_client import CloudifyClient +from cloudify_rest_client.executions import Execution -from functest.opnfv_tests.vnf.ims.clearwater import Clearwater import functest.opnfv_tests.vnf.ims.clearwater_ims_base as clearwater_ims_base -from functest.opnfv_tests.vnf.ims.orchestrator_cloudify import Orchestrator from functest.utils.constants import CONST -import functest.utils.functest_utils as ft_utils import functest.utils.openstack_utils as os_utils +from snaps.openstack.os_credentials import OSCreds +from snaps.openstack.create_network import NetworkSettings, SubnetSettings, \ + OpenStackNetwork +from snaps.openstack.create_security_group import SecurityGroupSettings, \ + SecurityGroupRuleSettings,\ + Direction, Protocol, \ + OpenStackSecurityGroup +from snaps.openstack.create_router import RouterSettings, OpenStackRouter +from snaps.openstack.create_instance import VmInstanceSettings, \ + FloatingIpSettings, \ + OpenStackVmInstance +from snaps.openstack.create_flavor import FlavorSettings, OpenStackFlavor +from snaps.openstack.create_image import ImageSettings, OpenStackImage +from snaps.openstack.create_keypairs import KeypairSettings, OpenStackKeypair +from snaps.openstack.create_network import PortSettings + +from functest.opnfv_tests.openstack.snaps import snaps_utils + + __author__ = "Valentin Boucher " class CloudifyIms(clearwater_ims_base.ClearwaterOnBoardingBase): """Clearwater vIMS deployed with Cloudify Orchestrator Case""" + __logger = logging.getLogger(__name__) + def __init__(self, **kwargs): if "case_name" not in kwargs: kwargs["case_name"] = "cloudify_ims" super(CloudifyIms, self).__init__(**kwargs) - self.logger = logging.getLogger(__name__) - self.neutron_client = '' - self.glance_client = '' - self.keystone_client = '' - self.nova_client = '' # Retrieve the configuration try: @@ -44,200 +59,285 @@ class CloudifyIms(clearwater_ims_base.ClearwaterOnBoardingBase): except Exception: raise Exception("VNF config file not found") + self.snaps_creds = '' + self.created_object = [] + config_file = os.path.join(self.case_dir, self.config) self.orchestrator = dict( - requirements=get_config("cloudify.requirements", config_file), - blueprint=get_config("cloudify.blueprint", config_file), - inputs=get_config("cloudify.inputs", config_file) + requirements=get_config("orchestrator.requirements", config_file), ) - self.logger.debug("Orchestrator configuration: %s", self.orchestrator) + self.details['orchestrator'] = dict( + name=get_config("orchestrator.name", config_file), + version=get_config("orchestrator.version", config_file), + status='ERROR', + result='' + ) + self.__logger.debug("Orchestrator configuration %s", self.orchestrator) self.vnf = dict( - blueprint=get_config("clearwater.blueprint", config_file), - deployment_name=get_config("clearwater.deployment_name", - config_file), - inputs=get_config("clearwater.inputs", config_file), - requirements=get_config("clearwater.requirements", config_file) + descriptor=get_config("vnf.descriptor", config_file), + inputs=get_config("vnf.inputs", config_file), + requirements=get_config("vnf.requirements", config_file) + ) + self.details['vnf'] = dict( + descriptor_version=self.vnf['descriptor']['version'], + name=get_config("vnf.name", config_file), + version=get_config("vnf.version", config_file), ) - self.logger.debug("VNF configuration: %s", self.vnf) + self.__logger.debug("VNF configuration: %s", self.vnf) + self.details['test_vnf'] = dict( + name=get_config("vnf_test_suite.name", config_file), + version=get_config("vnf_test_suite.version", config_file) + ) self.images = get_config("tenant_images", config_file) - self.logger.info("Images needed for vIMS: %s", self.images) + self.__logger.info("Images needed for vIMS: %s", self.images) - def deploy_orchestrator(self): + def prepare(self): + super(CloudifyIms, self).prepare() + + self.__logger.info("Additional pre-configuration steps") - self.logger.info("Additional pre-configuration steps") - self.neutron_client = os_utils.get_neutron_client(self.admin_creds) - self.glance_client = os_utils.get_glance_client(self.admin_creds) - self.keystone_client = os_utils.get_keystone_client(self.admin_creds) - self.nova_client = os_utils.get_nova_client(self.admin_creds) + self.snaps_creds = OSCreds( + username=self.creds['username'], + password=self.creds['password'], + auth_url=self.creds['auth_url'], + project_name=self.creds['tenant'], + identity_api_version=int(os_utils.get_keystone_client_version())) # needs some images - self.logger.info("Upload some OS images if it doesn't exist") - temp_dir = os.path.join(self.data_dir, "tmp/") + self.__logger.info("Upload some OS images if it doesn't exist") for image_name, image_url in self.images.iteritems(): - self.logger.info("image: %s, url: %s", image_name, image_url) - try: - image_id = os_utils.get_image_id(self.glance_client, - image_name) - self.logger.debug("image_id: %s", image_id) - except Exception: - self.logger.error("Unexpected error: %s", sys.exc_info()[0]) - - if image_id == '': - self.logger.info("""%s image does not exist on glance repo. - Try downloading this image - and upload on glance !""", - image_name) - image_id = os_utils.download_and_add_image_on_glance( - self.glance_client, - image_name, - image_url, - temp_dir) - # Need to extend quota - self.logger.info("Update security group quota for this tenant") - tenant_id = os_utils.get_tenant_id(self.keystone_client, - self.tenant_name) - self.logger.debug("Tenant id found %s", tenant_id) - if not os_utils.update_sg_quota(self.neutron_client, - tenant_id, 50, 100): - self.logger.error("Failed to update security group quota" - " for tenant " + self.tenant_name) - - self.logger.debug("group quota extended") - - # start the deployment of cloudify + self.__logger.info("image: %s, url: %s", image_name, image_url) + if image_url and image_name: + image_creator = OpenStackImage( + self.snaps_creds, + ImageSettings(name=image_name, + image_user='cloud', + img_format='qcow2', + url=image_url)) + image_creator.create() + # self.created_object.append(image_creator) + + def deploy_orchestrator(self): + """ + Deploy Cloudify Manager + + network, security group, fip, VM creation + """ + # network creation + + start_time = time.time() + self.__logger.info("Creating keypair ...") + kp_file = os.path.join(self.data_dir, "cloudify_ims.pem") + keypair_settings = KeypairSettings(name='cloudify_ims_kp', + private_filepath=kp_file) + keypair_creator = OpenStackKeypair(self.snaps_creds, keypair_settings) + keypair_creator.create() + self.created_object.append(keypair_creator) + + self.__logger.info("Creating full network ...") + subnet_settings = SubnetSettings(name='cloudify_ims_subnet', + cidr='10.67.79.0/24') + network_settings = NetworkSettings(name='cloudify_ims_network', + subnet_settings=[subnet_settings]) + network_creator = OpenStackNetwork(self.snaps_creds, network_settings) + network_creator.create() + self.created_object.append(network_creator) + ext_net_name = snaps_utils.get_ext_net_name(self.snaps_creds) + router_creator = OpenStackRouter( + self.snaps_creds, + RouterSettings( + name='cloudify_ims_router', + external_gateway=ext_net_name, + internal_subnets=[subnet_settings.name])) + router_creator.create() + self.created_object.append(router_creator) + + # security group creation + self.__logger.info("Creating security group for cloudify manager vm") + sg_rules = list() + sg_rules.append( + SecurityGroupRuleSettings(sec_grp_name="sg-cloudify-manager", + direction=Direction.ingress, + protocol=Protocol.tcp, port_range_min=1, + port_range_max=65535)) + sg_rules.append( + SecurityGroupRuleSettings(sec_grp_name="sg-cloudify-manager", + direction=Direction.ingress, + protocol=Protocol.udp, port_range_min=1, + port_range_max=65535)) + + securit_group_creator = OpenStackSecurityGroup( + self.snaps_creds, + SecurityGroupSettings( + name="sg-cloudify-manager", + rule_settings=sg_rules)) + + securit_group_creator.create() + self.created_object.append(securit_group_creator) + + # orchestrator VM flavor + self.__logger.info("Get or create flavor for cloudify manager vm ...") + + flavor_settings = FlavorSettings( + name=self.orchestrator['requirements']['flavor']['name'], + ram=self.orchestrator['requirements']['flavor']['ram_min'], + disk=50, + vcpus=2) + flavor_creator = OpenStackFlavor(self.snaps_creds, flavor_settings) + flavor_creator.create() + self.created_object.append(flavor_creator) + image_settings = ImageSettings( + name=self.orchestrator['requirements']['os_image'], + image_user='centos', + exists=True) + + port_settings = PortSettings(name='cloudify_manager_port', + network_name=network_settings.name) + + manager_settings = VmInstanceSettings( + name='cloudify_manager', + flavor=flavor_settings.name, + port_settings=[port_settings], + security_group_names=[securit_group_creator.sec_grp_settings.name], + floating_ip_settings=[FloatingIpSettings( + name='cloudify_manager_fip', + port_name=port_settings.name, + router_name=router_creator.router_settings.name)]) + + manager_creator = OpenStackVmInstance(self.snaps_creds, + manager_settings, + image_settings, + keypair_settings) + + self.__logger.info("Creating cloudify manager VM") + manager_creator.create() + self.created_object.append(manager_creator) + public_auth_url = os_utils.get_endpoint('identity') - self.logger.debug("CFY inputs: %s", self.orchestrator['inputs']) - cfy = Orchestrator(self.data_dir, self.orchestrator['inputs']) - self.orchestrator['object'] = cfy - self.logger.debug("Orchestrator object created") + self.__logger.info("Set creds for cloudify manager") + cfy_creds = dict(keystone_username=self.tenant_name, + keystone_password=self.tenant_name, + keystone_tenant_name=self.tenant_name, + keystone_url=public_auth_url) - self.logger.debug("Tenant name: %s", self.tenant_name) + cfy_client = CloudifyClient(host=manager_creator.get_floating_ip().ip, + username='admin', + password='admin', + tenant='default_tenant') - cfy.set_credentials(username=self.tenant_name, - password=self.tenant_name, - tenant_name=self.tenant_name, - auth_url=public_auth_url) - self.logger.info("Credentials set in CFY") + self.orchestrator['object'] = cfy_client - # orchestrator VM flavor - self.logger.info("Check Flavor is available, if not create one") - self.logger.debug("Flavor details %s ", - self.orchestrator['requirements']['ram_min']) - flavor_exist, flavor_id = os_utils.get_or_create_flavor( - "m1.large", - self.orchestrator['requirements']['ram_min'], - '50', - '2', - public=True) - self.logger.debug("Flavor id: %s", flavor_id) - - if not flavor_id: - self.logger.info("Available flavors are: ") - self.logger.info(self.nova_client.flavor.list()) - - cfy.set_flavor_id(flavor_id) - self.logger.debug("Flavor OK") - - # orchestrator VM image - self.logger.debug("Orchestrator image") - if 'os_image' in self.orchestrator['requirements'].keys(): - image_id = os_utils.get_image_id( - self.glance_client, - self.orchestrator['requirements']['os_image']) - self.logger.debug("Orchestrator image id: %s", image_id) - if image_id == '': - self.logger.error("CFY image not found") - - cfy.set_image_id(image_id) - self.logger.debug("Orchestrator image set") - - self.logger.debug("Get External network") - ext_net = os_utils.get_external_net(self.neutron_client) - self.logger.debug("External network: %s", ext_net) - - cfy.set_external_network_name(ext_net) - self.logger.debug("CFY External network set") - - self.logger.debug("get resolvconf") - ns = ft_utils.get_resolvconf_ns() - if ns: - cfy.set_nameservers(ns) - self.logger.debug("Resolvconf set") - - self.logger.info("Prepare virtualenv for cloudify-cli") - venv_scrit_dir = os.path.join(self.case_dir, "create_venv.sh") - cmd = "bash {} {}".format(venv_scrit_dir, self.data_dir) - ft_utils.execute_command(cmd) - - cfy.download_manager_blueprint( - self.orchestrator['blueprint']['url'], - self.orchestrator['blueprint']['branch']) - - error = cfy.deploy_manager() - if error: - self.logger.error(error) - return {'status': 'FAIL', 'result': error} + self.__logger.info("Attemps running status of the Manager") + cfy_status = None + retry = 10 + while str(cfy_status) != 'running' and retry: + try: + cfy_status = cfy_client.manager.get_status()['status'] + except Exception: # pylint: disable=broad-except + self.__logger.warning("Cloudify Manager isn't " + + "up and running. Retrying ...") + retry = retry - 1 + time.sleep(30) + + if str(cfy_status) == 'running': + self.__logger.info("Cloudify Manager is up and running") else: - return {'status': 'PASS', 'result': ''} + raise Exception("Cloudify Manager isn't up and running") + + self.__logger.info("Put OpenStack creds in manager") + secrets_list = cfy_client.secrets.list() + for k, val in cfy_creds.iteritems(): + if not any(d.get('key', None) == k for d in secrets_list): + cfy_client.secrets.create(k, val) + else: + cfy_client.secrets.update(k, val) + + duration = time.time() - start_time + + self.__logger.info("Put private keypair in manager") + if manager_creator.vm_ssh_active(block=True): + ssh = manager_creator.ssh_client() + scp = SCPClient(ssh.get_transport()) + scp.put(kp_file, '~/') + cmd = "sudo cp ~/cloudify_ims.pem /etc/cloudify/" + ssh.exec_command(cmd) + cmd = "sudo chmod 444 /etc/cloudify/cloudify_ims.pem" + ssh.exec_command(cmd) + cmd = "sudo yum install -y gcc python-devel" + ssh.exec_command(cmd) + + self.details['orchestrator'].update(status='PASS', duration=duration) + + self.vnf['inputs'].update(dict( + external_network_name=ext_net_name, + network_name=network_settings.name + )) + return True def deploy_vnf(self): - cw = Clearwater(self.vnf['inputs'], self.orchestrator['object'], - self.logger) - self.vnf['object'] = cw - - self.logger.info("Collect flavor id for all clearwater vm") - flavor_exist, flavor_id = os_utils.get_or_create_flavor( - "m1.small", - self.vnf['requirements']['ram_min'], + """ + Deploy Clearwater IMS + """ + start_time = time.time() + + self.__logger.info("Upload VNFD") + cfy_client = self.orchestrator['object'] + descriptor = self.vnf['descriptor'] + cfy_client.blueprints.publish_archive(descriptor.get('url'), + descriptor.get('name'), + descriptor.get('file_name')) + + self.__logger.info("Get or create flavor for all clearwater vm") + self.exist_obj['flavor2'], flavor_id = os_utils.get_or_create_flavor( + self.vnf['requirements']['flavor']['name'], + self.vnf['requirements']['flavor']['ram_min'], '30', '1', public=True) - self.logger.debug("Flavor id: %s", flavor_id) - if not flavor_id: - self.logger.info("Available flavors are: ") - self.logger.info(self.nova_client.flavor.list()) - - cw.set_flavor_id(flavor_id) - - # VMs image - if 'os_image' in self.vnf['requirements'].keys(): - image_id = os_utils.get_image_id( - self.glance_client, self.vnf['requirements']['os_image']) - - cw.set_image_id(image_id) - ext_net = os_utils.get_external_net(self.neutron_client) - cw.set_external_network_name(ext_net) - - error = cw.deploy_vnf(self.vnf['blueprint']) - if error: - self.logger.error(error) - return {'status': 'FAIL', 'result': error} + + self.vnf['inputs'].update(dict( + flavor_id=flavor_id, + )) + + self.__logger.info("Create VNF Instance") + cfy_client.deployments.create(descriptor.get('name'), + descriptor.get('name'), + self.vnf.get('inputs')) + + wait_for_execution(cfy_client, + _get_deployment_environment_creation_execution( + cfy_client, descriptor.get('name')), + self.__logger, + timeout=600) + + self.__logger.info("Start the VNF Instance deployment") + execution = cfy_client.executions.start(descriptor.get('name'), + 'install') + # Show execution log + execution = wait_for_execution(cfy_client, execution, self.__logger) + + duration = time.time() - start_time + + self.__logger.info(execution) + if execution.status == 'terminated': + self.details['vnf'].update(status='PASS', duration=duration) + return True else: - return {'status': 'PASS', 'result': ''} + self.details['vnf'].update(status='FAIL', duration=duration) + return False def test_vnf(self): - script = "source {0}venv_cloudify/bin/activate; " - script += "cd {0}; " - script += "cfy status | grep -Eo \"([0-9]{{1,3}}\.){{3}}[0-9]{{1,3}}\"" - cmd = "/bin/bash -c '" + script.format(self.data_dir) + "'" + """ + Run test on clearwater ims instance + """ + start_time = time.time() - try: - self.logger.debug("Trying to get clearwater manager IP ... ") - mgr_ip = os.popen(cmd).read() - mgr_ip = mgr_ip.splitlines()[0] - except Exception: - self.logger.exception("Unable to retrieve the IP of the " - "cloudify manager server !") - - self.logger.info('Cloudify Manager: %s', mgr_ip) - api_url = 'http://{0}/api/v2/deployments/{1}/outputs'.format( - mgr_ip, self.vnf['deployment_name']) - dep_outputs = requests.get(api_url) - self.logger.info(api_url) - outputs = dep_outputs.json()['outputs'] - self.logger.info("Deployment outputs: %s", outputs) + cfy_client = self.orchestrator['object'] + + outputs = cfy_client.deployments.outputs.get( + self.vnf['descriptor'].get('name'))['outputs'] dns_ip = outputs['dns_ip'] ellis_ip = outputs['ellis_ip'] self.config_ellis(ellis_ip) @@ -245,15 +345,52 @@ class CloudifyIms(clearwater_ims_base.ClearwaterOnBoardingBase): if dns_ip != "": vims_test_result = self.run_clearwater_live_test( dns_ip=dns_ip, - public_domain="") # self.inputs["public_domain"] - if vims_test_result != '': - return {'status': 'PASS', 'result': vims_test_result} - else: - return {'status': 'FAIL', 'result': ''} + public_domain=self.vnf['inputs']["public_domain"]) + duration = time.time() - start_time + short_result = sig_test_format(vims_test_result) + self.__logger.info(short_result) + self.details['test_vnf'].update(status='PASS', + result=short_result, + full_result=vims_test_result, + duration=duration) + return True + else: + return False def clean(self): - self.vnf['object'].undeploy_vnf() - self.orchestrator['object'].undeploy_manager() + try: + cfy_client = self.orchestrator['object'] + dep_name = self.vnf['descriptor'].get('name') + # kill existing execution + self.__logger.info('Deleting the current deployment') + exec_list = cfy_client.executions.list(dep_name) + for execution in exec_list: + if execution['status'] == "started": + try: + cfy_client.executions.cancel(execution['id'], + force=True) + except: + self.__logger.warn("Can't cancel the current exec") + + execution = cfy_client.executions.start( + dep_name, + 'uninstall', + parameters=dict(ignore_failure=True), + force=True) + + wait_for_execution(cfy_client, execution, self.__logger) + cfy_client.deployments.delete(self.vnf['descriptor'].get('name')) + cfy_client.blueprints.delete(self.vnf['descriptor'].get('name')) + except: + self.__logger.warn("Some issue during the undeployment ..") + self.__logger.warn("Tenant clean continue ..") + + self.__logger.info('Remove the cloudify manager OS object ..') + for creator in reversed(self.created_object): + try: + creator.clean() + except Exception as e: + self.logger.error('Unexpected error cleaning - %s', e) super(CloudifyIms, self).clean() @@ -262,15 +399,15 @@ class CloudifyIms(clearwater_ims_base.ClearwaterOnBoardingBase): # YAML UTILS # # ----------------------------------------------------------- -def get_config(parameter, file): +def get_config(parameter, file_path): """ Returns the value of a given parameter in file.yaml parameter must be given in string format with dots Example: general.openstack.image_name """ - with open(file) as f: - file_yaml = yaml.safe_load(f) - f.close() + with open(file_path) as config_file: + file_yaml = yaml.safe_load(config_file) + config_file.close() value = file_yaml for element in parameter.split("."): value = value.get(element) @@ -278,3 +415,90 @@ def get_config(parameter, file): raise ValueError("The parameter %s is not defined in" " reporting.yaml" % parameter) return value + + +def wait_for_execution(client, execution, logger, timeout=2400, ): + """ + Wait for a workflow execution on Cloudify Manager + """ + # if execution already ended - return without waiting + if execution.status in Execution.END_STATES: + return execution + + if timeout is not None: + deadline = time.time() + timeout + + # Poll for execution status and execution logs, until execution ends + # and we receive an event of type in WORKFLOW_END_TYPES + offset = 0 + batch_size = 50 + event_list = [] + execution_ended = False + while True: + event_list = client.events.list( + execution_id=execution.id, + _offset=offset, + _size=batch_size, + include_logs=False, + sort='@timestamp').items + + offset = offset + len(event_list) + for event in event_list: + logger.debug(event.get('message')) + + if timeout is not None: + if time.time() > deadline: + raise RuntimeError( + 'execution of operation {0} for deployment {1} ' + 'timed out'.format(execution.workflow_id, + execution.deployment_id)) + else: + # update the remaining timeout + timeout = deadline - time.time() + + if not execution_ended: + execution = client.executions.get(execution.id) + execution_ended = execution.status in Execution.END_STATES + + if execution_ended: + break + + time.sleep(5) + + return execution + + +def _get_deployment_environment_creation_execution(client, deployment_id): + """ + Get the execution id of a env preparation + + network, security group, fip, VM creation + """ + executions = client.executions.list(deployment_id=deployment_id) + for execution in executions: + if execution.workflow_id == 'create_deployment_environment': + return execution + raise RuntimeError('Failed to get create_deployment_environment ' + 'workflow execution.' + 'Available executions: {0}'.format(executions)) + + +def sig_test_format(sig_test): + """ + Process the signaling result to have a short result + """ + nb_passed = 0 + nb_failures = 0 + nb_skipped = 0 + for data_test in sig_test: + if data_test['result'] == "Passed": + nb_passed += 1 + elif data_test['result'] == "Failed": + nb_failures += 1 + elif data_test['result'] == "Skipped": + nb_skipped += 1 + total_sig_test_result = {} + total_sig_test_result['passed'] = nb_passed + total_sig_test_result['failures'] = nb_failures + total_sig_test_result['skipped'] = nb_skipped + return total_sig_test_result diff --git a/functest/opnfv_tests/vnf/ims/cloudify_ims.yaml b/functest/opnfv_tests/vnf/ims/cloudify_ims.yaml index 74b9e9580..f1028ce73 100644 --- a/functest/opnfv_tests/vnf/ims/cloudify_ims.yaml +++ b/functest/opnfv_tests/vnf/ims/cloudify_ims.yaml @@ -1,39 +1,40 @@ tenant_images: ubuntu_14.04: http://cloud-images.ubuntu.com/trusty/current/trusty-server-cloudimg-amd64-disk1.img - centos_7: http://cloud.centos.org/centos/7/images/CentOS-7-x86_64-GenericCloud-1510.qcow2 -cloudify: - blueprint: - url: https://github.com/boucherv-orange/cloudify-manager-blueprints.git - branch: '3.3.1-build' + cloudify_manager_4.0: http://repository.cloudifysource.org/cloudify/4.0.1/sp-release/cloudify-manager-premium-4.0.1.qcow2 +orchestrator: + name: cloudify + version: '4.0' requirements: - ram_min: 4096 - os_image: centos_7 - inputs: - keystone_username: "" - keystone_password: "" - keystone_tenant_name: "" - keystone_url: "" - manager_public_key_name: 'manager-kp' - agent_public_key_name: 'agent-kp' - image_id: "" - flavor_id: "3" - external_network_name: "" - ssh_user: centos - agents_user: ubuntu -clearwater: - blueprint: + flavor: + name: m1.medium + ram_min: 4096 + os_image: 'cloudify_manager_4.0' +vnf: + name: clearwater + version: '107' + descriptor: file_name: openstack-blueprint.yaml name: clearwater-opnfv - destination_folder: opnfv-cloudify-clearwater - url: https://github.com/Orange-OpenSource/opnfv-cloudify-clearwater.git - branch: stable - deployment_name: clearwater-opnfv + url: https://github.com/Orange-OpenSource/opnfv-cloudify-clearwater/archive/master.zip + version: '122' requirements: - ram_min: 2048 - os_image: ubuntu_14.04 + flavor: + name: m1.medium + ram_min: 2048 inputs: - image_id: '' - flavor_id: '' + image_id: 'ubuntu_14.04' + flavor_id: 'm1.small' agent_user: ubuntu + key_pair_name: cloudify_ims_kp + private_key_path: '/etc/cloudify/cloudify_ims.pem' external_network_name: '' public_domain: clearwater.opnfv + release: repo122 + bono_cluster_size: 1 + sprout_cluster_size: 1 + vellum_cluster_size: 1 + dime_cluster_size: 1 + homer_cluster_size: 1 +vnf_test_suite: + name: clearwater-live-test + version: "1.0" diff --git a/functest/opnfv_tests/vnf/ims/create_venv.sh b/functest/opnfv_tests/vnf/ims/create_venv.sh deleted file mode 100755 index 575fd177c..000000000 --- a/functest/opnfv_tests/vnf/ims/create_venv.sh +++ /dev/null @@ -1,44 +0,0 @@ -#!/bin/bash -e - -# Script checks that venv exists. If it doesn't it will be created -# It requires python2.7 and virtualenv packages installed -# -# Copyright (c) 2015 Orange -# valentin.boucher@orange.com -# -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 - -BASEDIR=`dirname $0` -VENV_PATH=$1 -VENV_NAME="venv_cloudify" -function venv_install() { - if command -v virtualenv-2.7; then - virtualenv-2.7 $1 - elif command -v virtualenv2; then - virtualenv2 $1 - elif command -v virtualenv; then - virtualenv $1 - else - echo Cannot find virtualenv command. - return 1 - fi -} - -# exit when something goes wrong during venv install -set -e -if [ ! -d "$VENV_PATH/$VENV_NAME" ]; then - venv_install $VENV_PATH/$VENV_NAME - echo "Virtualenv" + $VENV_NAME + "created." -fi - -if [ ! -f "$VENV_PATH/$VENV_NAME/updated" -o $BASEDIR/requirements.pip -nt $VENV_PATH/$VENV_NAME/updated ]; then - source $VENV_PATH/$VENV_NAME/bin/activate - pip install -r $BASEDIR/requirements.pip - touch $VENV_PATH/$VENV_NAME/updated - echo "Requirements installed." - deactivate -fi -set +e diff --git a/functest/opnfv_tests/vnf/ims/orchestrator_cloudify.py b/functest/opnfv_tests/vnf/ims/orchestrator_cloudify.py deleted file mode 100644 index 0cdfcb3f1..000000000 --- a/functest/opnfv_tests/vnf/ims/orchestrator_cloudify.py +++ /dev/null @@ -1,232 +0,0 @@ -#!/usr/bin/env python -# coding: utf8 -####################################################################### -# -# Copyright (c) 2015 Orange -# valentin.boucher@orange.com -# -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 -######################################################################## - -import logging -import os -import shutil -import subprocess32 as subprocess -import yaml - -from git import Repo - - -class Orchestrator(object): - - def __init__(self, testcase_dir, inputs={}): - self.testcase_dir = testcase_dir - self.blueprint_dir = testcase_dir + 'cloudify-manager-blueprint/' - self.input_file = 'inputs.yaml' - self.manager_blueprint = False - self.config = inputs - self.logger = logging.getLogger(__name__) - self.manager_up = False - - def set_credentials(self, username, password, tenant_name, auth_url): - self.config['keystone_username'] = username - self.config['keystone_password'] = password - self.config['keystone_url'] = auth_url - self.config['keystone_tenant_name'] = tenant_name - - def set_flavor_id(self, flavor_id): - self.config['flavor_id'] = flavor_id - - def set_image_id(self, image_id): - self.config['image_id'] = image_id - - def set_external_network_name(self, external_network_name): - self.config['external_network_name'] = external_network_name - - def set_ssh_user(self, ssh_user): - self.config['ssh_user'] = ssh_user - - def set_nova_url(self, nova_url): - self.config['nova_url'] = nova_url - - def set_neutron_url(self, neutron_url): - self.config['neutron_url'] = neutron_url - - def set_nameservers(self, nameservers): - if 0 < len(nameservers): - self.config['dns_subnet_1'] = nameservers[0] - - def download_manager_blueprint(self, manager_blueprint_url, - manager_blueprint_branch): - if self.manager_blueprint: - self.logger.info( - "cloudify manager server blueprint is " - "already downloaded !") - else: - self.logger.info( - "Downloading the cloudify manager server blueprint") - download_result = self._download_blueprints( - manager_blueprint_url, - manager_blueprint_branch, - self.blueprint_dir) - - if not download_result: - self.logger.error("Failed to download manager blueprint") - exit(-1) - else: - self.manager_blueprint = True - - def manager_up(self): - return self.manager_up - - def deploy_manager(self): - if self.manager_blueprint: - self.logger.info("Writing the inputs file") - with open(self.blueprint_dir + "inputs.yaml", "w") as f: - f.write(yaml.dump(self.config, default_style='"')) - f.close() - - # Ensure no ssh key file already exists - key_files = ["/.ssh/cloudify-manager-kp.pem", - "/.ssh/cloudify-agent-kp.pem"] - home = os.path.expanduser("~") - - for key_file in key_files: - if os.path.isfile(home + key_file): - os.remove(home + key_file) - - self.logger.info("Launching the cloudify-manager deployment") - script = "set -e; " - script += ("source " + self.testcase_dir + - "venv_cloudify/bin/activate; ") - script += "cd " + self.testcase_dir + "; " - script += "cfy init -r; " - script += "cd cloudify-manager-blueprint; " - script += ("cfy local create-requirements -o requirements.txt " + - "-p openstack-manager-blueprint.yaml; ") - script += "pip install -r requirements.txt; " - script += ("cfy bootstrap --install-plugins " + - "-p openstack-manager-blueprint.yaml -i inputs.yaml; ") - cmd = "/bin/bash -c '" + script + "'" - error = execute_command(cmd, self.logger) - if error: - self.logger.error("Failed to deploy cloudify-manager") - return error - - self.logger.info("Cloudify-manager server is UP !") - - self.manager_up = True - - def undeploy_manager(self): - self.logger.info("Launching the cloudify-manager undeployment") - - self.manager_up = False - - script = "source " + self.testcase_dir + "venv_cloudify/bin/activate; " - script += "cd " + self.testcase_dir + "; " - script += "cfy teardown -f --ignore-deployments; " - cmd = "/bin/bash -c '" + script + "'" - execute_command(cmd, self.logger) - - self.logger.info( - "Cloudify-manager server has been successfully removed!") - - def download_upload_and_deploy_blueprint(self, blueprint, config, - bp_name, dep_name): - self.logger.info("Downloading the {0} blueprint".format( - blueprint['file_name'])) - destination_folder = os.path.join(self.testcase_dir, - blueprint['destination_folder']) - download_result = self._download_blueprints(blueprint['url'], - blueprint['branch'], - destination_folder) - - if not download_result: - self.logger.error( - "Failed to download blueprint {0}". - format(blueprint['file_name'])) - exit(-1) - - self.logger.info("Writing the inputs file") - - with open(self.testcase_dir + blueprint['destination_folder'] + - "/inputs.yaml", "w") as f: - f.write(yaml.dump(config, default_style='"')) - f.close() - - self.logger.info("Launching the {0} deployment".format(bp_name)) - script = "source " + self.testcase_dir + "venv_cloudify/bin/activate; " - script += ("cd " + self.testcase_dir + - blueprint['destination_folder'] + "; ") - script += ("cfy blueprints upload -b " + - bp_name + " -p openstack-blueprint.yaml; ") - script += ("cfy deployments create -b " + bp_name + - " -d " + dep_name + " --inputs inputs.yaml; ") - script += ("cfy executions start -w install -d " + - dep_name + " --timeout 1800; ") - - cmd = "/bin/bash -c '" + script + "'" - error = execute_command(cmd, self.logger, 2000) - if error: - self.logger.error("Failed to deploy blueprint") - return error - self.logger.info("The deployment of {0} is ended".format(dep_name)) - - def undeploy_deployment(self, dep_name): - self.logger.info("Launching the {0} undeployment".format(dep_name)) - script = "source " + self.testcase_dir + "venv_cloudify/bin/activate; " - script += "cd " + self.testcase_dir + "; " - script += ("cfy executions start -w uninstall -d " + dep_name + - " --timeout 1800 ; ") - script += "cfy deployments delete -d " + dep_name + "; " - - cmd = "/bin/bash -c '" + script + "'" - try: - execute_command(cmd, self.logger) - except: - self.logger.error("Clearwater undeployment failed") - - def _download_blueprints(self, blueprint_url, branch, dest_path): - if os.path.exists(dest_path): - shutil.rmtree(dest_path) - try: - Repo.clone_from(blueprint_url, dest_path, branch=branch) - return True - except: - return False - - -def execute_command(cmd, logger, timeout=1800): - """ - Execute Linux command - """ - if logger: - logger.debug('Executing command : {}'.format(cmd)) - timeout_exception = False - output_file = "output.txt" - f = open(output_file, 'w+') - try: - p = subprocess.call(cmd, shell=True, stdout=f, - stderr=subprocess.STDOUT, timeout=timeout) - except subprocess.TimeoutExpired: - timeout_exception = True - if logger: - logger.error("TIMEOUT when executing command %s" % cmd) - pass - - f.close() - f = open(output_file, 'r') - result = f.read() - if result != "" and logger: - logger.debug(result) - if p == 0: - return False - else: - if logger and not timeout_exception: - logger.error("Error when executing command %s" % cmd) - f = open(output_file, 'r') - lines = f.readlines() - return lines[-5:] diff --git a/functest/opnfv_tests/vnf/ims/requirements.pip b/functest/opnfv_tests/vnf/ims/requirements.pip deleted file mode 100644 index ab26f6e02..000000000 --- a/functest/opnfv_tests/vnf/ims/requirements.pip +++ /dev/null @@ -1 +0,0 @@ -cloudify==3.3.1 \ No newline at end of file diff --git a/functest/tests/unit/core/test_vnf.py b/functest/tests/unit/core/test_vnf.py index f061c4096..403c452fb 100644 --- a/functest/tests/unit/core/test_vnf.py +++ b/functest/tests/unit/core/test_vnf.py @@ -1,210 +1,194 @@ -#!/usr/bin/env python - -# Copyright (c) 2016 Orange and others. -# -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 - -# pylint: disable=missing-docstring - -import logging -import unittest - -import mock - -from functest.core import vnf -from functest.core import testcase -from functest.utils import constants - - -class VnfBaseTesting(unittest.TestCase): - """The class testing VNF.""" - # pylint: disable=missing-docstring,too-many-public-methods - - tenant_name = 'test_tenant_name' - tenant_description = 'description' - - def setUp(self): - constants.CONST.__setattr__("vnf_foo_tenant_name", self.tenant_name) - constants.CONST.__setattr__( - "vnf_foo_tenant_description", self.tenant_description) - self.test = vnf.VnfOnBoarding(project='functest', case_name='foo') - - def test_run_deploy_vnf_exc(self): - with mock.patch.object(self.test, 'prepare'),\ - mock.patch.object(self.test, 'deploy_orchestrator', - return_value=None), \ - mock.patch.object(self.test, 'deploy_vnf', - side_effect=Exception): - self.assertEqual(self.test.run(), - testcase.TestCase.EX_TESTCASE_FAILED) - - def test_run_test_vnf_exc(self): - with mock.patch.object(self.test, 'prepare'),\ - mock.patch.object(self.test, 'deploy_orchestrator', - return_value=None), \ - mock.patch.object(self.test, 'deploy_vnf'), \ - mock.patch.object(self.test, 'test_vnf', - side_effect=Exception): - self.assertEqual(self.test.run(), - testcase.TestCase.EX_TESTCASE_FAILED) - - def test_run_deploy_orch_ko(self): - with mock.patch.object(self.test, 'prepare'),\ - mock.patch.object(self.test, 'deploy_orchestrator', - return_value=False), \ - mock.patch.object(self.test, 'deploy_vnf', - return_value=True), \ - mock.patch.object(self.test, 'test_vnf', - return_value=True): - self.assertEqual(self.test.run(), - testcase.TestCase.EX_TESTCASE_FAILED) - - def test_run_vnf_deploy_ko(self): - with mock.patch.object(self.test, 'prepare'),\ - mock.patch.object(self.test, 'deploy_orchestrator', - return_value=True), \ - mock.patch.object(self.test, 'deploy_vnf', - return_value=False), \ - mock.patch.object(self.test, 'test_vnf', - return_value=True): - self.assertEqual(self.test.run(), - testcase.TestCase.EX_TESTCASE_FAILED) - - def test_run_vnf_test_ko(self): - with mock.patch.object(self.test, 'prepare'),\ - mock.patch.object(self.test, 'deploy_orchestrator', - return_value=True), \ - mock.patch.object(self.test, 'deploy_vnf', - return_value=True), \ - mock.patch.object(self.test, 'test_vnf', - return_value=False): - self.assertEqual(self.test.run(), - testcase.TestCase.EX_TESTCASE_FAILED) - - def test_run_default(self): - with mock.patch.object(self.test, 'prepare'),\ - mock.patch.object(self.test, 'deploy_orchestrator', - return_value=True), \ - mock.patch.object(self.test, 'deploy_vnf', - return_value=True), \ - mock.patch.object(self.test, 'test_vnf', - return_value=True): - self.assertEqual(self.test.run(), testcase.TestCase.EX_OK) - - def test_deploy_vnf_unimplemented(self): - with self.assertRaises(vnf.VnfDeploymentException): - self.test.deploy_vnf() - - def test_test_vnf_unimplemented(self): - with self.assertRaises(vnf.VnfTestException): - self.test.test_vnf() - - @mock.patch('functest.core.vnf.os_utils.get_keystone_client') - @mock.patch('functest.core.vnf.os_utils.delete_user', - return_value=True) - def test_clean_user_set(self, *args): - self.test.user_created = True - self.test.clean() - args[0].assert_called_once_with(mock.ANY, self.tenant_name) - args[1].assert_called_once_with() - - @mock.patch('functest.core.vnf.os_utils.get_keystone_client') - @mock.patch('functest.core.vnf.os_utils.delete_user', - return_value=True) - def test_clean_user_unset(self, *args): - self.test.user_created = False - self.test.clean() - args[0].assert_not_called() - args[1].assert_called_once_with() - - @mock.patch('functest.core.vnf.os_utils.get_keystone_client') - @mock.patch('functest.core.vnf.os_utils.delete_tenant', - return_value=True) - def test_clean_tenant_set(self, *args): - self.test.tenant_created = True - self.test.clean() - args[0].assert_called_once_with(mock.ANY, self.tenant_name) - args[1].assert_called_once_with() - - @mock.patch('functest.core.vnf.os_utils.get_keystone_client') - @mock.patch('functest.core.vnf.os_utils.delete_tenant', - return_value=True) - def test_clean_tenant_unset(self, *args): - self.test.tenant_created = False - self.test.clean() - args[0].assert_not_called() - args[1].assert_called_once_with() - - def test_deploy_orch_unimplemented(self): - self.assertTrue(self.test.deploy_orchestrator()) - - @mock.patch('functest.core.vnf.os_utils.get_credentials', - return_value={'creds': 'test'}) - @mock.patch('functest.core.vnf.os_utils.get_keystone_client', - return_value='test') - @mock.patch('functest.core.vnf.os_utils.get_or_create_tenant_for_vnf', - return_value=0) - @mock.patch('functest.core.vnf.os_utils.get_or_create_user_for_vnf', - return_value=0) - def test_prepare(self, *args): - self.assertEqual(self.test.prepare(), - testcase.TestCase.EX_OK) - args[0].assert_called_once_with('test', self.tenant_name) - args[1].assert_called_once_with( - 'test', self.tenant_name, self.tenant_description) - args[2].assert_called_once_with() - args[3].assert_called_once_with() - - @mock.patch('functest.core.vnf.os_utils.get_credentials', - side_effect=Exception) - def test_prepare_admin_creds_ko(self, *args): - with self.assertRaises(vnf.VnfPreparationException): - self.test.prepare() - args[0].assert_called_once_with() - - @mock.patch('functest.core.vnf.os_utils.get_credentials', - return_value='creds') - @mock.patch('functest.core.vnf.os_utils.get_keystone_client', - side_effect=Exception) - def test_prepare_keystone_client_ko(self, *args): - with self.assertRaises(vnf.VnfPreparationException): - self.test.prepare() - args[0].assert_called_once_with() - args[1].assert_called_once_with() - - @mock.patch('functest.core.vnf.os_utils.get_credentials', - return_value='creds') - @mock.patch('functest.core.vnf.os_utils.get_keystone_client') - @mock.patch('functest.core.vnf.os_utils.get_or_create_tenant_for_vnf', - side_effect=Exception) - def test_prepare_tenant_creation_ko(self, *args): - with self.assertRaises(vnf.VnfPreparationException): - self.test.prepare() - args[0].assert_called_once_with( - mock.ANY, self.tenant_name, self.tenant_description) - args[1].assert_called_once_with() - args[2].assert_called_once_with() - - @mock.patch('functest.core.vnf.os_utils.get_credentials', - return_value='creds') - @mock.patch('functest.core.vnf.os_utils.get_keystone_client') - @mock.patch('functest.core.vnf.os_utils.get_or_create_tenant_for_vnf', - return_value=0) - @mock.patch('functest.core.vnf.os_utils.get_or_create_user_for_vnf', - side_effect=Exception) - def test_prepare_user_creation_ko(self, *args): - with self.assertRaises(vnf.VnfPreparationException): - self.test.prepare() - args[0].assert_called_once_with(mock.ANY, self.tenant_name) - args[1].assert_called_once_with( - mock.ANY, self.tenant_name, self.tenant_description) - args[2].assert_called_once_with() - args[3].assert_called_once_with() - - -if __name__ == "__main__": - logging.disable(logging.CRITICAL) - unittest.main(verbosity=2) +#!/usr/bin/env python + +# Copyright (c) 2016 Orange and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 + +# pylint: disable=missing-docstring + +import logging +import unittest + +import mock + +from functest.core import vnf +from functest.core import testcase +from functest.utils import constants + + +class VnfBaseTesting(unittest.TestCase): + """The class testing VNF.""" + # pylint: disable=missing-docstring,too-many-public-methods + + tenant_name = 'test_tenant_name' + tenant_description = 'description' + + def setUp(self): + constants.CONST.__setattr__("vnf_foo_tenant_name", self.tenant_name) + constants.CONST.__setattr__( + "vnf_foo_tenant_description", self.tenant_description) + self.test = vnf.VnfOnBoarding(project='functest', case_name='foo') + + def test_run_deploy_vnf_exc(self): + with mock.patch.object(self.test, 'prepare'),\ + mock.patch.object(self.test, 'deploy_orchestrator', + return_value=None), \ + mock.patch.object(self.test, 'deploy_vnf', + side_effect=Exception): + self.assertEqual(self.test.run(), + testcase.TestCase.EX_TESTCASE_FAILED) + + def test_run_test_vnf_exc(self): + with mock.patch.object(self.test, 'prepare'),\ + mock.patch.object(self.test, 'deploy_orchestrator', + return_value=None), \ + mock.patch.object(self.test, 'deploy_vnf'), \ + mock.patch.object(self.test, 'test_vnf', + side_effect=Exception): + self.assertEqual(self.test.run(), + testcase.TestCase.EX_TESTCASE_FAILED) + + def test_run_deploy_orch_ko(self): + with mock.patch.object(self.test, 'prepare'),\ + mock.patch.object(self.test, 'deploy_orchestrator', + return_value=False), \ + mock.patch.object(self.test, 'deploy_vnf', + return_value=True), \ + mock.patch.object(self.test, 'test_vnf', + return_value=True): + self.assertEqual(self.test.run(), + testcase.TestCase.EX_TESTCASE_FAILED) + + def test_run_vnf_deploy_ko(self): + with mock.patch.object(self.test, 'prepare'),\ + mock.patch.object(self.test, 'deploy_orchestrator', + return_value=True), \ + mock.patch.object(self.test, 'deploy_vnf', + return_value=False), \ + mock.patch.object(self.test, 'test_vnf', + return_value=True): + self.assertEqual(self.test.run(), + testcase.TestCase.EX_TESTCASE_FAILED) + + def test_run_vnf_test_ko(self): + with mock.patch.object(self.test, 'prepare'),\ + mock.patch.object(self.test, 'deploy_orchestrator', + return_value=True), \ + mock.patch.object(self.test, 'deploy_vnf', + return_value=True), \ + mock.patch.object(self.test, 'test_vnf', + return_value=False): + self.assertEqual(self.test.run(), + testcase.TestCase.EX_TESTCASE_FAILED) + + def test_run_default(self): + with mock.patch.object(self.test, 'prepare'),\ + mock.patch.object(self.test, 'deploy_orchestrator', + return_value=True), \ + mock.patch.object(self.test, 'deploy_vnf', + return_value=True), \ + mock.patch.object(self.test, 'test_vnf', + return_value=True): + self.assertEqual(self.test.run(), testcase.TestCase.EX_OK) + + def test_deploy_vnf_unimplemented(self): + with self.assertRaises(vnf.VnfDeploymentException): + self.test.deploy_vnf() + + def test_test_vnf_unimplemented(self): + with self.assertRaises(vnf.VnfTestException): + self.test.test_vnf() + + @mock.patch('functest.core.vnf.os_utils.get_keystone_client') + @mock.patch('functest.core.vnf.os_utils.delete_user', + return_value=True) + def test_clean_user_already_exist(self, *args): + self.test.exist_obj['user'] = True + self.test.clean() + args[0].assert_not_called() + args[1].assert_called_once_with() + + @mock.patch('functest.core.vnf.os_utils.get_keystone_client') + @mock.patch('functest.core.vnf.os_utils.delete_user', + return_value=True) + def test_clean_user_created(self, *args): + self.test.exist_obj['user'] = False + self.test.clean() + args[0].assert_called_once_with(mock.ANY, self.tenant_name) + args[1].assert_called_once_with() + + @mock.patch('functest.core.vnf.os_utils.get_keystone_client') + @mock.patch('functest.core.vnf.os_utils.delete_tenant', + return_value=True) + def test_clean_tenant_already_exist(self, *args): + self.test.exist_obj['tenant'] = True + self.test.clean() + args[0].assert_not_called() + args[1].assert_called_once_with() + + @mock.patch('functest.core.vnf.os_utils.get_keystone_client') + @mock.patch('functest.core.vnf.os_utils.delete_tenant', + return_value=True) + def test_clean_tenant_created(self, *args): + self.test.exist_obj['tenant'] = False + self.test.clean() + args[0].assert_called_once_with(mock.ANY, self.tenant_name) + args[1].assert_called_once_with() + + def test_deploy_orch_unimplemented(self): + self.assertTrue(self.test.deploy_orchestrator()) + + @mock.patch('functest.core.vnf.os_utils.get_keystone_client', + return_value='test') + @mock.patch('functest.core.vnf.os_utils.get_or_create_tenant_for_vnf', + return_value=True) + @mock.patch('functest.core.vnf.os_utils.get_or_create_user_for_vnf', + return_value=True) + @mock.patch('functest.core.vnf.os_utils.get_credentials', + return_value={'auth_url': 'test'}) + def test_prepare(self, *args): + self.assertEqual(self.test.prepare(), + testcase.TestCase.EX_OK) + args[0].assert_called_once_with() + args[1].assert_called_once_with('test', self.tenant_name) + args[2].assert_called_once_with( + 'test', self.tenant_name, self.tenant_description) + args[3].assert_called_once_with() + + @mock.patch('functest.core.vnf.os_utils.get_keystone_client', + side_effect=Exception) + def test_prepare_keystone_client_ko(self, *args): + with self.assertRaises(vnf.VnfPreparationException): + self.test.prepare() + args[0].assert_called_once_with() + + @mock.patch('functest.core.vnf.os_utils.get_keystone_client') + @mock.patch('functest.core.vnf.os_utils.get_or_create_tenant_for_vnf', + side_effect=Exception) + def test_prepare_tenant_creation_ko(self, *args): + with self.assertRaises(vnf.VnfPreparationException): + self.test.prepare() + args[0].assert_called_once_with( + mock.ANY, self.tenant_name, self.tenant_description) + args[1].assert_called_once_with() + + @mock.patch('functest.core.vnf.os_utils.get_keystone_client') + @mock.patch('functest.core.vnf.os_utils.get_or_create_tenant_for_vnf', + return_value=0) + @mock.patch('functest.core.vnf.os_utils.get_or_create_user_for_vnf', + side_effect=Exception) + def test_prepare_user_creation_ko(self, *args): + with self.assertRaises(vnf.VnfPreparationException): + self.test.prepare() + args[0].assert_called_once_with(mock.ANY, self.tenant_name) + args[1].assert_called_once_with( + mock.ANY, self.tenant_name, self.tenant_description) + args[2].assert_called_once_with() + + +if __name__ == "__main__": + logging.disable(logging.CRITICAL) + unittest.main(verbosity=2) diff --git a/functest/tests/unit/vnf/ims/test_clearwater.py b/functest/tests/unit/vnf/ims/test_clearwater.py deleted file mode 100644 index bc31c33e0..000000000 --- a/functest/tests/unit/vnf/ims/test_clearwater.py +++ /dev/null @@ -1,85 +0,0 @@ -#!/usr/bin/env python - -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 - -import logging -import unittest - -import mock - -from functest.opnfv_tests.vnf.ims import clearwater -from functest.opnfv_tests.vnf.ims import orchestrator_cloudify - - -class ClearwaterTesting(unittest.TestCase): - - def setUp(self): - self.clearwater = clearwater.Clearwater() - self.orchestrator = orchestrator_cloudify.Orchestrator('test_dir') - self.clearwater.orchestrator = self.orchestrator - self.clearwater.dep_name = 'test_dep_name' - self.bp = {'file_name': 'test_file', - 'destination_folder': 'test_folder', - 'url': 'test_url', - 'branch': 'test_branch'} - - def test_deploy_vnf_blueprint_download_failed(self): - with mock.patch.object(self.clearwater.orchestrator, - 'download_upload_and_deploy_blueprint', - return_value='error'): - self.assertEqual(self.clearwater.deploy_vnf(self.bp), - 'error') - - def test_deploy_vnf_blueprint_download_passed(self): - with mock.patch.object(self.clearwater.orchestrator, - 'download_upload_and_deploy_blueprint', - return_value=''): - self.clearwater.deploy_vnf(self.bp) - self.assertEqual(self.clearwater.deploy, True) - - def test_undeploy_vnf_deployment_passed(self): - with mock.patch.object(self.clearwater.orchestrator, - 'undeploy_deployment'): - self.clearwater.deploy = True - self.clearwater.undeploy_vnf() - self.assertEqual(self.clearwater.deploy, False) - - def test_undeploy_vnf_deployment_with_undeploy(self): - with mock.patch.object(self.clearwater.orchestrator, - 'undeploy_deployment') as m: - self.clearwater.deploy = False - self.clearwater.undeploy_vnf(), - self.assertEqual(self.clearwater.deploy, False) - self.assertFalse(m.called) - - self.clearwater.orchestrator = None - self.clearwater.deploy = True - self.clearwater.undeploy_vnf(), - self.assertEqual(self.clearwater.deploy, True) - - self.clearwater.deploy = False - self.clearwater.undeploy_vnf(), - self.assertEqual(self.clearwater.deploy, False) - - def test_set_methods(self): - self.clearwater.set_orchestrator(self.orchestrator) - self.assertTrue(self.clearwater.orchestrator, self.orchestrator) - self.clearwater.set_flavor_id('test_flavor_id') - self.assertTrue(self.clearwater.config['flavor_id'], 'test_flavor_id') - self.clearwater.set_image_id('test_image_id') - self.assertTrue(self.clearwater.config['image_id'], 'test_image_id') - self.clearwater.set_agent_user('test_user') - self.assertTrue(self.clearwater.config['agent_user'], 'test_user') - self.clearwater.set_external_network_name('test_network') - self.assertTrue(self.clearwater.config['external_network_name'], - 'test_network') - self.clearwater.set_public_domain('test_domain') - self.assertTrue(self.clearwater.config['public_domain'], - 'test_domain') - -if __name__ == "__main__": - logging.disable(logging.CRITICAL) - unittest.main(verbosity=2) diff --git a/functest/tests/unit/vnf/ims/test_cloudify_ims.py b/functest/tests/unit/vnf/ims/test_cloudify_ims.py index 2156a122f..3a135d18e 100644 --- a/functest/tests/unit/vnf/ims/test_cloudify_ims.py +++ b/functest/tests/unit/vnf/ims/test_cloudify_ims.py @@ -1,464 +1,166 @@ -#!/usr/bin/env python - -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 - -import logging -import unittest - -import mock - -from functest.opnfv_tests.vnf.ims import cloudify_ims - - -class CloudifyImsTesting(unittest.TestCase): - - def setUp(self): - with mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'os.makedirs'), \ - mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'get_config', return_value='config_value'): - self.ims_vnf = cloudify_ims.CloudifyIms() - self.neutron_client = mock.Mock() - self.glance_client = mock.Mock() - self.keystone_client = mock.Mock() - self.nova_client = mock.Mock() - self.orchestrator = {'requirements': {'ram_min': 2, - 'os_image': 'test_os_image'}, - 'blueprint': {'url': 'test_url', - 'branch': 'test_branch'}, - 'inputs': {'public_domain': 'test_domain'}, - 'object': 'test_object', - 'deployment_name': 'test_deployment_name'} - self.ims_vnf.orchestrator = self.orchestrator - self.ims_vnf.images = {'test_image': 'test_url'} - self.ims_vnf.vnf = self.orchestrator - self.ims_vnf.tenant_name = 'test_tenant' - self.ims_vnf.inputs = {'public_domain': 'test_domain'} - self.ims_vnf.glance_client = self.glance_client - self.ims_vnf.neutron_client = self.neutron_client - self.ims_vnf.keystone_client = self.keystone_client - self.ims_vnf.nova_client = self.nova_client - self.ims_vnf.admin_creds = 'test_creds' - - self.mock_post = mock.Mock() - attrs = {'status_code': 201, - 'cookies': ""} - self.mock_post.configure_mock(**attrs) - - self.mock_post_200 = mock.Mock() - attrs = {'status_code': 200, - 'cookies': ""} - self.mock_post_200.configure_mock(**attrs) - - def test_deploy_orchestrator_missing_image(self): - with mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'os_utils.get_neutron_client', - return_value=self.neutron_client), \ - mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'os_utils.get_glance_client', - return_value=self.glance_client), \ - mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'os_utils.get_keystone_client', - return_value=self.keystone_client), \ - mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'os_utils.get_nova_client', - return_value=self.nova_client), \ - mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'os_utils.get_image_id', - return_value=''), \ - mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.os_utils.' - 'download_and_add_image_on_glance') as m, \ - self.assertRaises(Exception) as context: - self.ims_vnf.deploy_orchestrator() - self.assertTrue(m.called) - msg = "Failed to find or upload required OS " - msg += "image for this deployment" - self.assertTrue(msg in context.exception) - - def test_deploy_orchestrator_extend_quota_fail(self): - with mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'os_utils.get_neutron_client', - return_value=self.neutron_client), \ - mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'os_utils.get_glance_client', - return_value=self.glance_client), \ - mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'os_utils.get_keystone_client', - return_value=self.keystone_client), \ - mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'os_utils.get_nova_client', - return_value=self.nova_client), \ - mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'os_utils.get_image_id', - return_value='image_id'), \ - mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'os_utils.get_tenant_id', - return_value='tenant_id'), \ - mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'os_utils.update_sg_quota', - return_value=False), \ - self.assertRaises(Exception) as context: - self.ims_vnf.deploy_orchestrator() - msg = "Failed to update security group quota" - msg += " for tenant test_tenant" - self.assertTrue(msg in context.exception) - - def _get_image_id(self, client, name): - if name == 'test_image': - return 'image_id' - else: - return '' - - def test_deploy_orchestrator_missing_flavor(self): - with mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'os_utils.get_neutron_client', - return_value=self.neutron_client), \ - mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'os_utils.get_glance_client', - return_value=self.glance_client), \ - mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'os_utils.get_keystone_client', - return_value=self.keystone_client), \ - mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'os_utils.get_nova_client', - return_value=self.nova_client), \ - mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'os_utils.get_image_id', - side_effect=self._get_image_id), \ - mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'os_utils.get_tenant_id', - return_value='tenant_id'), \ - mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'os_utils.update_sg_quota', - return_value=True), \ - mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'os_utils.get_endpoint', - return_value='public_auth_url'), \ - mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'Orchestrator', return_value=mock.Mock()) as m, \ - mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'os_utils.get_or_create_flavor', - return_value=(False, '')), \ - self.assertRaises(Exception) as context: - self.ims_vnf.deploy_orchestrator() - self.assertTrue(m.set_credentials.called) - msg = "Failed to find required flavorfor this deployment" - self.assertTrue(msg in context.exception) - - def test_deploy_orchestrator_missing_os_image(self): - with mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'os_utils.get_neutron_client', - return_value=self.neutron_client), \ - mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'os_utils.get_glance_client', - return_value=self.glance_client), \ - mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'os_utils.get_keystone_client', - return_value=self.keystone_client), \ - mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'os_utils.get_nova_client', - return_value=self.nova_client), \ - mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'os_utils.get_image_id', - side_effect=self._get_image_id), \ - mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'os_utils.get_tenant_id', - return_value='tenant_id'), \ - mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'os_utils.update_sg_quota', - return_value=True), \ - mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'os_utils.get_endpoint', - return_value='public_auth_url'), \ - mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'Orchestrator', return_value=mock.Mock()) as m, \ - mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'os_utils.get_or_create_flavor', - return_value=(True, 'flavor_id')), \ - self.assertRaises(Exception) as context: - self.ims_vnf.deploy_orchestrator() - self.assertTrue(m.set_credentials.called) - self.assertTrue(m.set_flavor_id.called) - msg = "Failed to find required OS image for cloudify manager" - self.assertTrue(msg in context.exception) - - def test_deploy_orchestrator_get_ext_network_fail(self): - with mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'os_utils.get_neutron_client', - return_value=self.neutron_client), \ - mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'os_utils.get_glance_client', - return_value=self.glance_client), \ - mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'os_utils.get_keystone_client', - return_value=self.keystone_client), \ - mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'os_utils.get_nova_client', - return_value=self.nova_client), \ - mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'os_utils.get_image_id', - return_value='image_id'), \ - mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'os_utils.get_tenant_id', - return_value='tenant_id'), \ - mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'os_utils.update_sg_quota', - return_value=True), \ - mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'os_utils.get_endpoint', - return_value='public_auth_url'), \ - mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'Orchestrator', return_value=mock.Mock()) as m, \ - mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'os_utils.get_or_create_flavor', - return_value=(True, 'flavor_id')), \ - mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'os_utils.get_external_net', - return_value=''), \ - self.assertRaises(Exception) as context: - self.ims_vnf.deploy_orchestrator() - self.assertTrue(m.set_credentials.called) - self.assertTrue(m.set_flavor_id.called) - self.assertTrue(m.set_image_id.called) - msg = "Failed to get external network" - self.assertTrue(msg in context.exception) - - def test_deploy_orchestrator_with_error(self): - with mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'os_utils.get_neutron_client', - return_value=self.neutron_client), \ - mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'os_utils.get_glance_client', - return_value=self.glance_client), \ - mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'os_utils.get_keystone_client', - return_value=self.keystone_client), \ - mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'os_utils.get_nova_client', - return_value=self.nova_client), \ - mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'os_utils.get_image_id', - return_value='image_id'), \ - mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'os_utils.get_tenant_id', - return_value='tenant_id'), \ - mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'os_utils.update_sg_quota', - return_value=True), \ - mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'os_utils.get_endpoint', - return_value='public_auth_url'), \ - mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'Orchestrator') as m, \ - mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'os_utils.get_or_create_flavor', - return_value=(True, 'flavor_id')), \ - mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'os_utils.get_external_net', - return_value='ext_net'), \ - mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'ft_utils.get_resolvconf_ns', - return_value=True), \ - mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'ft_utils.execute_command'): - mock_obj = mock.Mock() - attrs = {'deploy_manager.return_value': 'error'} - mock_obj.configure_mock(**attrs) - - m.return_value = mock_obj - - self.assertEqual(self.ims_vnf.deploy_orchestrator(), - {'status': 'FAIL', 'result': 'error'}) - - def test_deploy_orchestrator_default(self): - with mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'os_utils.get_neutron_client', - return_value=self.neutron_client), \ - mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'os_utils.get_glance_client', - return_value=self.glance_client), \ - mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'os_utils.get_keystone_client', - return_value=self.keystone_client), \ - mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'os_utils.get_nova_client', - return_value=self.nova_client), \ - mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'os_utils.get_image_id', - return_value='image_id'), \ - mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'os_utils.get_tenant_id', - return_value='tenant_id'), \ - mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'os_utils.update_sg_quota', - return_value=True), \ - mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'os_utils.get_endpoint', - return_value='public_auth_url'), \ - mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'Orchestrator') as m, \ - mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'os_utils.get_or_create_flavor', - return_value=(True, 'flavor_id')), \ - mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'os_utils.get_external_net', - return_value='ext_net'), \ - mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'ft_utils.get_resolvconf_ns', - return_value=True), \ - mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'ft_utils.execute_command'): - mock_obj = mock.Mock() - attrs = {'deploy_manager.return_value': ''} - mock_obj.configure_mock(**attrs) - - m.return_value = mock_obj - - self.assertEqual(self.ims_vnf.deploy_orchestrator(), - {'status': 'PASS', 'result': ''}) - - def test_deploy_vnf_missing_flavor(self): - with mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'Clearwater', return_value=mock.Mock()), \ - mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'os_utils.get_or_create_flavor', - return_value=(False, '')), \ - self.assertRaises(Exception) as context: - self.ims_vnf.deploy_vnf() - msg = "Failed to find required flavor for this deployment" - self.assertTrue(msg in context.exception) - - def test_deploy_vnf_missing_os_image(self): - with mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'Clearwater', return_value=mock.Mock()) as m, \ - mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'os_utils.get_or_create_flavor', - return_value=(True, 'test_flavor')), \ - mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'os_utils.get_image_id', - return_value=''), \ - self.assertRaises(Exception) as context: - self.ims_vnf.deploy_vnf() - msg = "Failed to find required OS image" - msg += " for clearwater VMs" - self.assertTrue(msg in context.exception) - self.assertTrue(m.set_flavor_id.called) - - def test_deploy_vnf_missing_get_ext_net(self): - with mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'Clearwater', return_value=mock.Mock()) as m, \ - mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'os_utils.get_or_create_flavor', - return_value=(True, 'test_flavor')), \ - mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'os_utils.get_image_id', - return_value='image_id'), \ - mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'os_utils.get_external_net', - return_value=''), \ - self.assertRaises(Exception) as context: - self.ims_vnf.deploy_vnf() - msg = "Failed to get external network" - self.assertTrue(msg in context.exception) - self.assertTrue(m.set_flavor_id.called) - self.assertTrue(m.set_image_id.called) - - def test_deploy_vnf_with_error(self): - with mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'Clearwater') as m, \ - mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'os_utils.get_or_create_flavor', - return_value=(True, 'test_flavor')), \ - mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'os_utils.get_image_id', - return_value='image_id'), \ - mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'os_utils.get_external_net', - return_value='ext_net'): - mock_obj = mock.Mock() - attrs = {'deploy_vnf.return_value': 'error'} - mock_obj.configure_mock(**attrs) - - m.return_value = mock_obj - - self.assertEqual(self.ims_vnf.deploy_vnf(), - {'status': 'FAIL', 'result': 'error'}) - - def test_deploy_vnf_default(self): - with mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'Clearwater') as m, \ - mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'os_utils.get_or_create_flavor', - return_value=(True, 'test_flavor')), \ - mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'os_utils.get_image_id', - return_value='image_id'), \ - mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'os_utils.get_external_net', - return_value='ext_net'): - mock_obj = mock.Mock() - attrs = {'deploy_vnf.return_value': ''} - mock_obj.configure_mock(**attrs) - - m.return_value = mock_obj - - self.assertEqual(self.ims_vnf.deploy_vnf(), - {'status': 'PASS', 'result': ''}) - - def test_test_vnf_ip_retrieval_failure(self): - with mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'os.popen', side_effect=Exception), \ - self.assertRaises(Exception) as context: - msg = "Unable to retrieve the IP of the " - msg += "cloudify manager server !" - self.ims_vnf.test_vnf() - self.assertTrue(msg in context.exception) - - def test_test_vnf_fail(self): - with mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'os.popen'), \ - mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'requests.get') as mock_get, \ - mock.patch.object(self.ims_vnf, 'config_ellis'), \ - mock.patch.object(self.ims_vnf, - 'run_clearwater_live_test') as clearwater_obj: - clearwater_obj.return_value = '' - - mock_obj2 = mock.Mock() - attrs = {'json.return_value': {'outputs': - {'dns_ip': 'test_dns_ip', - 'ellis_ip': 'test_ellis_ip'}}} - mock_obj2.configure_mock(**attrs) - mock_get.return_value = mock_obj2 - - self.assertEqual(self.ims_vnf.test_vnf(), - {'status': 'FAIL', 'result': ''}) - - def test_test_vnf_pass(self): - with mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'os.popen'), \ - mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' - 'requests.get') as mock_get, \ - mock.patch.object(self.ims_vnf, 'config_ellis'), \ - mock.patch.object(self.ims_vnf, - 'run_clearwater_live_test') as clearwater_obj: - clearwater_obj.return_value = 'vims_test_result' - - mock_obj2 = mock.Mock() - attrs = {'json.return_value': {'outputs': - {'dns_ip': 'test_dns_ip', - 'ellis_ip': 'test_ellis_ip'}}} - mock_obj2.configure_mock(**attrs) - mock_get.return_value = mock_obj2 - - self.assertEqual(self.ims_vnf.test_vnf(), - {'status': 'PASS', - 'result': 'vims_test_result'}) - - -if __name__ == "__main__": - logging.disable(logging.CRITICAL) - unittest.main(verbosity=2) +#!/usr/bin/env python + +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 + +import logging +import unittest + +import mock + +from functest.core import vnf +from functest.opnfv_tests.vnf.ims import cloudify_ims + + +class CloudifyImsTesting(unittest.TestCase): + + def setUp(self): + + self.tenant = 'cloudify_ims' + self.creds = {'username': 'user', + 'password': 'pwd'} + self.orchestrator = {'name': 'cloudify', + 'version': '4.0', + 'object': 'foo', + 'requirements': {'flavor': {'name': 'm1.medium', + 'ram_min': 4096}, + 'os_image': 'manager_4.0'}} + + self.vnf = {'name': 'clearwater', + 'descriptor': {'version': '108', + 'file_name': 'openstack-blueprint.yaml', + 'name': 'clearwater-opnfv', + 'url': 'https://foo', + 'requirements': {'flavor': + {'name': 'm1.medium', + 'ram_min': 2048}}}} + + with mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' + 'os.makedirs'), \ + mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.' + 'get_config', return_value={ + 'tenant_images': 'foo', + 'orchestrator': self.orchestrator, + 'vnf': self.vnf, + 'vnf_test_suite': '', + 'version': 'whatever'}): + + self.ims_vnf = cloudify_ims.CloudifyIms() + + self.images = {'image1': 'url1', + 'image2': 'url2'} + self.details = {'orchestrator': {'status': 'PASS', 'duration': 120}, + 'vnf': {}, + 'test_vnf': {}} + + @mock.patch('functest.core.vnf.os_utils.get_keystone_client', + return_value='test') + @mock.patch('functest.core.vnf.os_utils.get_or_create_tenant_for_vnf', + return_value=True) + @mock.patch('functest.core.vnf.os_utils.get_or_create_user_for_vnf', + return_value=True) + @mock.patch('functest.core.vnf.os_utils.get_credentials', + return_value={'auth_url': 'test/v1'}) + @mock.patch('snaps.openstack.create_image.OpenStackImage.create') + def test_prepare_default(self, *args): + self.assertIsNone(self.ims_vnf.prepare()) + args[4].assert_called_once_with() + + @mock.patch('functest.core.vnf.os_utils.get_keystone_client', + return_value='test') + @mock.patch('functest.core.vnf.os_utils.get_or_create_tenant_for_vnf', + return_value=True) + @mock.patch('functest.core.vnf.os_utils.get_or_create_user_for_vnf', + return_value=True) + @mock.patch('functest.core.vnf.os_utils.get_credentials', + return_value={'auth_url': 'test/no_v'}) + @mock.patch('snaps.openstack.create_image.OpenStackImage.create') + def test_prepare_bad_auth_url(self, *args): + with self.assertRaises(Exception): + self.ims_vnf.prepare() + args[0].assert_not_called() + + def test_prepare_missing_param(self): + with self.assertRaises(vnf.VnfPreparationException): + self.ims_vnf.prepare() + + @mock.patch('functest.core.vnf.os_utils.get_keystone_client', + side_effect=Exception) + def test_prepare_keystone_exception(self, *args): + with self.assertRaises(vnf.VnfPreparationException): + self.ims_vnf.prepare() + args[0].assert_called_once_with() + + @mock.patch('functest.core.vnf.os_utils.get_keystone_client', + return_value='test') + @mock.patch('functest.core.vnf.os_utils.get_or_create_tenant_for_vnf', + side_effect=Exception) + def test_prepare_tenant_exception(self, *args): + with self.assertRaises(vnf.VnfPreparationException): + self.ims_vnf.prepare() + args[1].assert_called_once_with() + + @mock.patch('functest.core.vnf.os_utils.get_keystone_client', + return_value='test') + @mock.patch('functest.core.vnf.os_utils.get_or_create_tenant_for_vnf', + return_value=True) + @mock.patch('functest.core.vnf.os_utils.get_or_create_user_for_vnf', + side_effect=Exception) + def test_prepare_user_exception(self, *args): + with self.assertRaises(vnf.VnfPreparationException): + self.ims_vnf.prepare() + args[2].assert_called_once_with() + + @mock.patch('functest.core.vnf.os_utils.get_keystone_client', + return_value='test') + @mock.patch('functest.core.vnf.os_utils.get_or_create_tenant_for_vnf', + return_value=True) + @mock.patch('functest.core.vnf.os_utils.get_or_create_user_for_vnf', + return_value=True) + @mock.patch('functest.core.vnf.os_utils.get_credentials', + side_effect=Exception) + def test_prepare_credentials_exception(self, *args): + with self.assertRaises(vnf.VnfPreparationException): + self.ims_vnf.prepare() + args[0].assert_called_once_with() + + # @mock.patch('snaps.openstack.create_keypairs.OpenStackKeypair', + # side_effect=Exception) + # def test_deploy_orchestrator_keypair_exception(self, *args): + # with self.assertRaises(vnf.OrchestratorDeploymentException): + # self.ims_vnf.deploy_orchestrator() + + # def test_deploy_orchestrator_network_creation_fail(self): + # def test_deploy_orchestrator_floatting_ip_creation_fail(self): + # def test_deploy_orchestrator_flavor_fail(self): + # def test_deploy_orchestrator_get_image_id_fail(self): + # def test_deploy_orchestrator_create_instance_fail(self): + # def test_deploy_orchestrator_secgroup_fail(self): + # def test_deploy_orchestrator_add_floating_ip_fail(self): + # def test_deploy_orchestrator_get_endpoint_fail(self): + # def test_deploy_orchestrator_initiate CloudifyClient_fail(self): + # def test_deploy_orchestrator_get_status_fail(self): + # + + # def test_deploy_vnf(self): + # def test_deploy_vnf_publish_fail(self): + # def test_deploy_vnf_get_flavor_fail(self): + # def test_deploy_vnf_get_external_net_fail(self): + # def test_deploy_vnf_deployment_create_fail(self): + # def test_deploy_vnf_start_fail(self): + # + # def test_test_vnf(self): + # def test_test_vnf_deployment_get_fail(self): + # def test_test_vnf_run_live_test_fail(self): + # + # def test_clean(self): + # def test_clean_execution_start_fail(self): + # def test_clean_deployment_delete_fail(self): + # def test_clean_blueprint_delete_fail(self): + + +if __name__ == "__main__": + logging.disable(logging.CRITICAL) + unittest.main(verbosity=2) diff --git a/functest/tests/unit/vnf/ims/test_orchestrator_cloudify.py b/functest/tests/unit/vnf/ims/test_orchestrator_cloudify.py deleted file mode 100644 index 570646644..000000000 --- a/functest/tests/unit/vnf/ims/test_orchestrator_cloudify.py +++ /dev/null @@ -1,176 +0,0 @@ -#!/usr/bin/env python - -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 - -import logging -import subprocess32 as subprocess -import unittest - -import mock - -from functest.opnfv_tests.vnf.ims import orchestrator_cloudify - - -class ImsVnfTesting(unittest.TestCase): - - def setUp(self): - self.orchestrator = orchestrator_cloudify.Orchestrator('test_dir') - self.bp = {'file_name': 'test_file', - 'destination_folder': 'test_folder', - 'url': 'test_url', - 'branch': 'test_branch'} - - def test_download_manager_blueprint_download_blueprint_failed(self): - self.orchestrator.manager_blueprint = False - with mock.patch.object(self.orchestrator, '_download_blueprints', - return_value=False), \ - mock.patch('functest.opnfv_tests.vnf.ims.orchestrator_cloudify.' - 'exit') as mock_exit: - self.orchestrator.download_manager_blueprint('test_url', - 'test_branch') - mock_exit.assert_any_call(-1) - - def test_download_manager_blueprint_download_blueprint_passed(self): - self.orchestrator.manager_blueprint = False - with mock.patch.object(self.orchestrator, '_download_blueprints', - return_value=True): - self.orchestrator.download_manager_blueprint('test_url', - 'test_branch') - self.assertEqual(self.orchestrator.manager_blueprint, - True) - - def test_deploy_manager_failed(self): - self.orchestrator.manager_blueprint = True - with mock.patch('__builtin__.open', mock.mock_open()), \ - mock.patch('functest.opnfv_tests.vnf.ims.orchestrator_cloudify.' - 'os.remove'), \ - mock.patch('functest.opnfv_tests.vnf.ims.orchestrator_cloudify.' - 'execute_command', return_value='error'): - self.assertEqual(self.orchestrator.deploy_manager(), - 'error') - self.assertEqual(self.orchestrator.manager_up, - False) - - def test_deploy_manager_passed(self): - self.orchestrator.manager_blueprint = True - with mock.patch('__builtin__.open', mock.mock_open()), \ - mock.patch('functest.opnfv_tests.vnf.ims.orchestrator_cloudify.' - 'os.remove'), \ - mock.patch('functest.opnfv_tests.vnf.ims.orchestrator_cloudify.' - 'execute_command', return_value=''): - self.orchestrator.deploy_manager() - self.assertEqual(self.orchestrator.manager_up, - True) - - def test_undeploy_manager_passed(self): - with mock.patch('functest.opnfv_tests.vnf.ims.orchestrator_cloudify.' - 'execute_command', return_value=''): - self.orchestrator.deploy_manager() - self.assertEqual(self.orchestrator.manager_up, - False) - - def test_dwnld_upload_and_depl_blueprint_dwnld_blueprint_failed(self): - with mock.patch.object(self.orchestrator, '_download_blueprints', - return_value=False), \ - mock.patch('functest.opnfv_tests.vnf.ims.orchestrator_cloudify.' - 'exit', side_effect=Exception) as mock_exit, \ - self.assertRaises(Exception): - self.orchestrator.download_upload_and_deploy_blueprint(self.bp, - 'cfig', - 'bpn', - 'dpn') - mock_exit.assert_any_call(-1) - - def test_dwnld_upload_and_depl_blueprint_failed(self): - with mock.patch.object(self.orchestrator, '_download_blueprints', - return_value=True), \ - mock.patch('__builtin__.open', mock.mock_open()), \ - mock.patch('functest.opnfv_tests.vnf.ims.orchestrator_cloudify.' - 'execute_command', return_value='error'): - r = self.orchestrator.download_upload_and_deploy_blueprint(self.bp, - 'cfig', - 'bpn', - 'dpn') - self.assertEqual(r, 'error') - - def test__download_blueprints_failed(self): - with mock.patch('functest.opnfv_tests.vnf.ims.orchestrator_cloudify.' - 'shutil.rmtree'), \ - mock.patch('functest.opnfv_tests.vnf.ims.orchestrator_cloudify.' - 'Repo.clone_from', side_effect=Exception): - self.assertEqual(self.orchestrator._download_blueprints('bp_url', - 'branch', - 'dest'), - False) - - def test__download_blueprints_passed(self): - with mock.patch('functest.opnfv_tests.vnf.ims.orchestrator_cloudify.' - 'shutil.rmtree'), \ - mock.patch('functest.opnfv_tests.vnf.ims.orchestrator_cloudify.' - 'Repo.clone_from'): - self.assertEqual(self.orchestrator._download_blueprints('bp_url', - 'branch', - 'dest'), - True) - - def test_execute_command_failed(self): - with mock.patch('__builtin__.open', - mock.mock_open(read_data='test_data\n')): - subprocess.call = mock.create_autospec(subprocess.call, - return_value=0) - mock_log = mock.Mock() - cmd = 'test_cmd -e test_env bash_script' - ret = orchestrator_cloudify.execute_command(cmd, mock_log, - timeout=100) - self.assertEqual(ret, False) - - def test_execute_command_default(self): - with mock.patch('__builtin__.open', - mock.mock_open(read_data='test_data\n')): - subprocess.call = mock. \ - create_autospec(subprocess.call, - return_value=subprocess.TimeoutExpired) - mock_log = mock.Mock() - cmd = 'test_cmd -e test_env bash_script' - ret = orchestrator_cloudify.execute_command(cmd, mock_log, - timeout=100) - self.assertEqual(ret, ['test_data\n']) - - def test_set_methods(self): - self.orchestrator.set_credentials('test_username', 'test_password', - 'test_tenant_name', 'test_auth_url') - self.assertTrue(self.orchestrator.config['keystone_username'], - 'test_username') - self.assertTrue(self.orchestrator.config['keystone_password'], - 'test_password') - self.assertTrue(self.orchestrator.config['keystone_url'], - 'test_auth_url') - self.assertTrue(self.orchestrator.config['keystone_tenant_name'], - 'test_tenant_name') - self.orchestrator.set_flavor_id('test_flavor_id') - self.assertTrue(self.orchestrator.config['flavor_id'], - 'test_flavor_id') - self.orchestrator.set_image_id('test_image_id') - self.assertTrue(self.orchestrator.config['image_id'], 'test_image_id') - self.orchestrator.set_external_network_name('test_network') - self.assertTrue(self.orchestrator.config['external_network_name'], - 'test_network') - self.orchestrator.set_ssh_user('test_user') - self.assertTrue(self.orchestrator.config['ssh_user'], - 'test_user') - self.orchestrator.set_nova_url('test_nova_url') - self.assertTrue(self.orchestrator.config['nova_url'], - 'test_nova_url') - self.orchestrator.set_neutron_url('test_neutron_url') - self.assertTrue(self.orchestrator.config['neutron_url'], - 'test_neutron_url') - self.orchestrator.set_nameservers(['test_subnet']) - self.assertTrue(self.orchestrator.config['dns_subnet_1'], - 'test_subnet') - -if __name__ == "__main__": - logging.disable(logging.CRITICAL) - unittest.main(verbosity=2) diff --git a/functest/utils/openstack_utils.py b/functest/utils/openstack_utils.py index 0ab630583..f8719bf0c 100644 --- a/functest/utils/openstack_utils.py +++ b/functest/utils/openstack_utils.py @@ -279,19 +279,27 @@ def get_heat_client(other_creds={}): def download_and_add_image_on_glance(glance, image_name, image_url, data_dir): - dest_path = data_dir - if not os.path.exists(dest_path): - os.makedirs(dest_path) - file_name = image_url.rsplit('/')[-1] - if not ft_utils.download_url(image_url, dest_path): - return False - - image = create_glance_image( - glance, image_name, dest_path + file_name) - if not image: - return False + try: + dest_path = data_dir + if not os.path.exists(dest_path): + os.makedirs(dest_path) + file_name = image_url.rsplit('/')[-1] + if not ft_utils.download_url(image_url, dest_path): + return False + except Exception: + raise Exception("Impossible to download image from {}".format( + image_url)) - return image + try: + image = create_glance_image( + glance, image_name, dest_path + file_name) + if not image: + return False + else: + return image + except Exception: + raise Exception("Impossible to put image {} in glance".format( + image_name)) # ********************************************* @@ -417,7 +425,7 @@ def get_or_create_flavor(flavor_name, ram, disk, vcpus, public=True): flavor_id = create_flavor( nova_client, flavor_name, ram, disk, vcpus, public=public) if not flavor_id: - logger.error("Failed to create flavor '%s'..." % (flavor_name)) + raise Exception("Failed to create flavor '%s'..." % (flavor_name)) else: logger.debug("Flavor '%s' with ID=%s created successfully." % (flavor_name, flavor_id)) @@ -736,9 +744,12 @@ def create_neutron_net(neutron_client, name): return None -def create_neutron_subnet(neutron_client, name, cidr, net_id): +def create_neutron_subnet(neutron_client, name, cidr, net_id, + dns=['8.8.8.8', '8.8.4.4']): json_body = {'subnets': [{'name': name, 'cidr': cidr, - 'ip_version': 4, 'network_id': net_id}]} + 'ip_version': 4, 'network_id': net_id, + 'dns_nameservers': dns}]} + try: subnet = neutron_client.create_subnet(body=json_body) return subnet['subnets'][0]['id'] @@ -889,7 +900,8 @@ def create_network_full(neutron_client, net_name, subnet_name, router_name, - cidr): + cidr, + dns=['8.8.8.8', '8.8.4.4']): # Check if the network already exists network_id = get_network_id(neutron_client, net_name) @@ -909,7 +921,7 @@ def create_network_full(neutron_client, logger.debug("Network '%s' created successfully" % network_id) logger.debug('Creating Subnet....') subnet_id = create_neutron_subnet(neutron_client, subnet_name, - cidr, network_id) + cidr, network_id, dns) if not subnet_id: return None @@ -1462,13 +1474,18 @@ def get_or_create_user_for_vnf(keystone_client, vnf_ref): try: user_id = get_user_id(keystone_client, vnf_ref) tenant_id = get_tenant_id(keystone_client, vnf_ref) + created = False if not user_id: user_id = create_user(keystone_client, vnf_ref, vnf_ref, "", tenant_id) - return True - else: - return False - add_role_user(keystone_client, user_id, 'admin', vnf_ref) + created = True + try: + role_id = get_role_id(keystone_client, 'admin') + tenant_id = get_tenant_id(keystone_client, vnf_ref) + add_role_user(keystone_client, user_id, role_id, tenant_id) + except: + logger.warn("Cannot associate user to role admin on tenant") + return created except: raise Exception("Impossible to create a user for the VNF {}".format( vnf_ref)) diff --git a/requirements.txt b/requirements.txt index b87807937..a7a1ac17f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -16,11 +16,11 @@ robotframework-httplibrary==0.4.2 robotframework-requests==0.4.7 robotframework-sshlibrary==2.1.3;python_version=='2.7' scp==0.10.2 -subprocess32;python_version=='2.7' dnspython>=1.14.0;python_version=='2.7' # http://www.dnspython.org/LICENSE dnspython3!=1.13.0,!=1.14.0,>=1.12.0;python_version>='3.0' # http://www.dnspython.org/LICENSE click==6.6 openbaton-cli==2.2.1-beta7 +cloudify_rest_client==4.0 mock>=2.0 # BSD iniparse==0.4 PrettyTable<0.8,>=0.7.1 # BSD -- 2.16.6