# which accompanies this distribution, and is available at
# http://www.apache.org/licenses/LICENSE-2.0
-import inspect
+"""Define the parent class of all VNF TestCases."""
+
import logging
import time
import functest.core.testcase as base
from functest.utils.constants import CONST
-import functest.utils.functest_utils as ft_utils
import functest.utils.openstack_utils as os_utils
+__author__ = ("Morgan Richomme <morgan.richomme@orange.com>, "
+ "Valentin Boucher <valentin.boucher@orange.com>")
+
+
+class VnfPreparationException(Exception):
+ """Raise when VNF preparation cannot be executed."""
+
+
+class OrchestratorDeploymentException(Exception):
+ """Raise when orchestrator cannot be deployed."""
+
+
+class VnfDeploymentException(Exception):
+ """Raise when VNF cannot be deployed."""
+
+
+class VnfTestException(Exception):
+ """Raise when VNF cannot be tested."""
+
class VnfOnBoarding(base.TestCase):
+ """Base model for VNF test cases."""
__logger = logging.getLogger(__name__)
def __init__(self, **kwargs):
super(VnfOnBoarding, self).__init__(**kwargs)
- self.repo = kwargs.get('repo', '')
- self.cmd = kwargs.get('cmd', '')
- self.details = {}
- self.result_dir = CONST.__getattribute__('dir_results')
- self.details_step_mapping = dict(
- deploy_orchestrator='orchestrator',
- deploy_vnf='vnf',
- test_vnf='test_vnf',
- prepare='prepare_env')
- self.details['prepare_env'] = {}
- self.details['orchestrator'] = {}
- self.details['vnf'] = {}
- self.details['test_vnf'] = {}
- self.images = {}
- try:
- self.tenant_name = CONST.__getattribute__(
- 'vnf_{}_tenant_name'.format(self.case_name))
- self.tenant_description = CONST.__getattribute__(
- 'vnf_{}_tenant_description'.format(self.case_name))
- except Exception:
- # raise Exception("Unknown VNF case=" + self.case_name)
- self.__logger.error("Unknown VNF case={}".format(self.case_name))
-
- try:
- self.images = CONST.__getattribute__(
- 'vnf_{}_tenant_images'.format(self.case_name))
- except Exception:
- self.__logger.warn("No tenant image defined for this VNF")
-
- def execute(self):
+ self.tenant_created = False
+ self.user_created = False
+ self.tenant_name = CONST.__getattribute__(
+ 'vnf_{}_tenant_name'.format(self.case_name))
+ self.tenant_description = CONST.__getattribute__(
+ 'vnf_{}_tenant_description'.format(self.case_name))
+
+ def run(self, **kwargs):
+ """
+ Run of the VNF test case:
+
+ * Deploy an orchestrator if needed (e.g. heat, cloudify, ONAP),
+ * Deploy the VNF,
+ * Perform tests on the VNF
+
+ A VNF test case is successfull when the 3 steps are PASS
+ If one of the step is FAIL, the test case is FAIL
+
+ Returns:
+ TestCase.EX_OK if result is 'PASS'.
+ TestCase.EX_TESTCASE_FAILED otherwise.
+ """
self.start_time = time.time()
- # Prepare the test (Create Tenant, User, ...)
+
try:
- self.__logger.info("Create VNF Onboarding environment")
self.prepare()
- except Exception:
- self.__logger.error("Error during VNF Onboarding environment"
- "creation", exc_info=True)
+ if (self.deploy_orchestrator() and
+ self.deploy_vnf() and
+ self.test_vnf()):
+ self.stop_time = time.time()
+ # Calculation with different weight depending on the steps TODO
+ self.result = 100
+ return base.TestCase.EX_OK
+ else:
+ self.result = 0
+ return base.TestCase.EX_TESTCASE_FAILED
+ except Exception: # pylint: disable=broad-except
+ self.__logger.exception("Exception on VNF testing")
return base.TestCase.EX_TESTCASE_FAILED
- # Deploy orchestrator
- try:
- self.__logger.info("Deploy orchestrator (if necessary)")
- orchestrator_ready_time = time.time()
- res_orchestrator = self.deploy_orchestrator()
- # orchestrator is not mandatory
- if res_orchestrator is not None:
- self.details['orchestrator']['status'] = (
- res_orchestrator['status'])
- self.details['orchestrator']['result'] = (
- res_orchestrator['result'])
- self.details['orchestrator']['duration'] = round(
- orchestrator_ready_time - self.start_time, 1)
- except Exception:
- self.__logger.warn("Problem with the Orchestrator", exc_info=True)
-
- # Deploy VNF
- try:
- self.__logger.info("Deploy VNF " + self.case_name)
- res_deploy_vnf = self.deploy_vnf()
- vnf_ready_time = time.time()
- self.details['vnf']['status'] = res_deploy_vnf['status']
- self.details['vnf']['result'] = res_deploy_vnf['result']
- self.details['vnf']['duration'] = round(
- vnf_ready_time - orchestrator_ready_time, 1)
- except Exception:
- self.__logger.error("Error during VNF deployment", exc_info=True)
- return base.TestCase.EX_TESTCASE_FAILED
+ def prepare(self):
+ """
+ Prepare the environment for VNF testing:
- # Test VNF
- try:
- self.__logger.info("Test VNF")
- res_test_vnf = self.test_vnf()
- test_vnf_done_time = time.time()
- self.details['test_vnf']['status'] = res_test_vnf['status']
- self.details['test_vnf']['result'] = res_test_vnf['result']
- self.details['test_vnf']['duration'] = round(
- test_vnf_done_time - vnf_ready_time, 1)
- except Exception:
- self.__logger.error("Error when running VNF tests", exc_info=True)
- return base.TestCase.EX_TESTCASE_FAILED
+ * Creation of a user,
+ * Creation of a tenant,
+ * Allocation admin role to the user on this tenant
- # Clean the system
- self.clean()
- self.stop_time = time.time()
+ Returns base.TestCase.EX_OK if preparation is successfull
- exit_code = self.parse_results()
- self.log_results()
- return exit_code
+ Raise VnfPreparationException in case of problem
+ """
+ try:
+ self.__logger.info("Prepare VNF: %s, description: %s",
+ self.tenant_name, self.tenant_description)
+ admin_creds = os_utils.get_credentials()
+ keystone_client = os_utils.get_keystone_client()
+ self.tenant_created = os_utils.get_or_create_tenant_for_vnf(
+ keystone_client, self.tenant_name, self.tenant_description)
+ self.user_created = os_utils.get_or_create_user_for_vnf(
+ keystone_client, self.tenant_name)
+ creds = admin_creds.copy()
+ creds.update({
+ "tenant": self.tenant_name,
+ "username": self.tenant_name,
+ "password": self.tenant_name
+ })
+ return base.TestCase.EX_OK
+ except Exception: # pylint: disable=broad-except
+ self.__logger.exception("Exception raised during VNF preparation")
+ raise VnfPreparationException
+
+ def deploy_orchestrator(self):
+ """
+ Deploy an orchestrator (optional).
+
+ If function overwritten
+ raise orchestratorDeploymentException if error during orchestrator
+ deployment
+ """
+ self.__logger.info("Deploy orchestrator (if necessary)")
+ return True
- # prepare state could consist in the creation of the resources
- # a dedicated user
- # a dedicated tenant
- # dedicated images
- def prepare(self):
- self.creds = os_utils.get_credentials()
- self.keystone_client = os_utils.get_keystone_client()
-
- self.__logger.info(
- "Prepare OpenStack plateform(create tenant and user)")
- admin_user_id = os_utils.get_user_id(self.keystone_client,
- self.creds['username'])
- if not admin_user_id:
- self.step_failure("Failed to get id of {0}".format(
- self.creds['username']))
-
- tenant_id = os_utils.get_tenant_id(self.keystone_client,
- self.tenant_name)
- if not tenant_id:
- tenant_id = os_utils.create_tenant(self.keystone_client,
- self.tenant_name,
- self.tenant_description)
- if not tenant_id:
- self.step_failure("Failed to get or create {0} tenant".format(
- self.tenant_name))
- roles_name = ["admin", "Admin"]
- role_id = ''
- for role_name in roles_name:
- if not role_id:
- role_id = os_utils.get_role_id(self.keystone_client,
- role_name)
-
- if not role_id:
- self.step_failure("Failed to get id for {0} role".format(
- role_name))
-
- if not os_utils.add_role_user(self.keystone_client, admin_user_id,
- role_id, tenant_id):
- self.step_failure("Failed to add {0} on tenant".format(
- self.creds['username']))
-
- user_id = os_utils.get_or_create_user(self.keystone_client,
- self.tenant_name,
- self.tenant_name,
- tenant_id)
- if not user_id:
- self.step_failure("Failed to get or create {0} user".format(
- self.tenant_name))
-
- os_utils.add_role_user(self.keystone_client, user_id,
- role_id, tenant_id)
-
- self.__logger.info("Update OpenStack creds informations")
- self.admin_creds = self.creds.copy()
- self.admin_creds.update({
- "tenant": self.tenant_name
- })
- self.neutron_client = os_utils.get_neutron_client(self.admin_creds)
- self.nova_client = os_utils.get_nova_client(self.admin_creds)
- self.creds.update({
- "tenant": self.tenant_name,
- "username": self.tenant_name,
- "password": self.tenant_name,
- })
-
- # orchestrator is not mandatory to deploy and test VNF
- def deploy_orchestrator(self, **kwargs):
- pass
-
- # TODO see how to use built-in exception from releng module
def deploy_vnf(self):
+ """
+ Deploy the VNF
+
+ This function MUST be implemented by vnf test cases.
+ The details section MAY be updated in the vnf test cases.
+
+ The deployment can be executed via a specific orchestrator
+ or using nuild-in orchestrators such as:
+
+ * heat, openbaton, cloudify (available on all scenario),
+ * open-o (on open-o scenarios)
+
+ Returns:
+ True if the VNF is properly deployed
+ False if the VNF is not deployed
+
+ Raise VnfDeploymentException if error during VNF deployment
+ """
self.__logger.error("VNF must be deployed")
- raise Exception("VNF not deployed")
+ raise VnfDeploymentException
def test_vnf(self):
+ """
+ Test the VNF
+
+ This function MUST be implemented by vnf test cases.
+ The details section MAY be updated in the vnf test cases.
+
+ Once a VNF is deployed, it is assumed that specific test suite can be
+ run to validate the VNF.
+ Please note that the same test suite can be used on several test case
+ (e.g. clearwater test suite can be used whatever the orchestrator used
+ for the deployment)
+
+ Returns:
+ True if VNF tests are PASS
+ False if test suite is FAIL
+
+ Raise VnfTestException if error during VNF test
+ """
self.__logger.error("VNF must be tested")
- raise Exception("VNF not tested")
+ raise VnfTestException
- # clean before openstack clean run
def clean(self):
- self.__logger.info("test cleaning")
+ """
+ Clean VNF test case.
- def parse_results(self):
- exit_code = self.EX_OK
- self.result = "PASS"
- self.__logger.info(self.details)
- # The 2 VNF steps must be OK to get a PASS result
- if (self.details['vnf']['status'] is not "PASS" or
- self.details['test_vnf']['status'] is not "PASS"):
- exit_code = self.EX_RUN_ERROR
- self.result = "FAIL"
- return exit_code
-
- def log_results(self):
- ft_utils.logger_test_results(self.project_name,
- self.case_name,
- self.result,
- self.details)
-
- def step_failure(self, error_msg):
- part = inspect.stack()[1][3]
- self.__logger.error("Step {0} failed: {1}".format(part, error_msg))
- try:
- step_name = self.details_step_mapping[part]
- part_info = self.details[step_name]
- except KeyError:
- self.details[part] = {}
- part_info = self.details[part]
- part_info['status'] = 'FAIL'
- part_info['result'] = error_msg
- raise Exception(error_msg)
+ It is up to the test providers to delete resources used for the tests.
+ By default we clean:
+
+ * the user,
+ * the tenant
+ """
+ self.__logger.info("test cleaning")
+ keystone_client = os_utils.get_keystone_client()
+ if self.tenant_created:
+ os_utils.delete_tenant(keystone_client, self.tenant_name)
+ if self.user_created:
+ os_utils.delete_user(keystone_client, self.tenant_name)
# http://www.apache.org/licenses/LICENSE-2.0
import logging
-import sys
-import argparse
-
-import functest.core.testcase as testcase
import functest.core.vnf as vnf
class AaaVnf(vnf.VnfOnBoarding):
+ """AAA VNF sample"""
logger = logging.getLogger(__name__)
def deploy_orchestrator(self):
self.logger.info("No VNFM needed to deploy a free radius here")
- return None
+ return True
-# TODO see how to use build in exception form releng module
def deploy_vnf(self):
self.logger.info("Freeradius VNF deployment")
- # TODO apt-get update + config tuning
- deploy_vnf = {}
- deploy_vnf['status'] = "PASS"
- deploy_vnf['result'] = {}
- return deploy_vnf
+ # find a way to deploy freeradius and tester (heat,manual, ..)
+ deploy_vnf = {'status': 'PASS', 'version': 'xxxx'}
+ self.details['deploy_vnf'] = deploy_vnf
+ return True
def test_vnf(self):
self.logger.info("Run test towards freeradius")
- # TODO: once the freeradius is deployed..make some tests
- test_vnf = {}
- test_vnf['status'] = "PASS"
- test_vnf['result'] = {}
- return test_vnf
-
- def main(self, **kwargs):
- self.logger.info("AAA VNF onboarding")
- self.execute()
- if self.result is "PASS":
- return self.EX_OK
- else:
- return self.EX_RUN_ERROR
-
- def run(self):
- kwargs = {}
- return self.main(**kwargs)
-
-
-if __name__ == '__main__':
- logging.basicConfig()
- parser = argparse.ArgumentParser()
- args = vars(parser.parse_args())
- aaa_vnf = AaaVnf()
- try:
- result = aaa_vnf.main(**args)
- if result != testcase.TestCase.EX_OK:
- sys.exit(result)
- if args['pushtodb']:
- sys.exit(aaa_vnf.push_to_db())
- except Exception:
- sys.exit(testcase.TestCase.EX_RUN_ERROR)
+ # once the freeradius is deployed..make some tests
+ test_vnf = {'status': 'PASS', 'version': 'xxxx'}
+ self.details['test_vnf'] = test_vnf
+ return True
from functest.utils.constants import CONST
import functest.utils.functest_utils as ft_utils
+__author__ = ("Valentin Boucher <valentin.boucher@orange.com>, "
+ "Helen Yao <helanyao@gmail.com>")
+
class ClearwaterOnBoardingBase(vnf.VnfOnBoarding):
+ """ vIMS clearwater base usable by several orchestrators"""
def __init__(self, **kwargs):
self.logger = logging.getLogger(__name__)
super(ClearwaterOnBoardingBase, self).__init__(**kwargs)
- self.case_dir = os.path.join(CONST.dir_functest_test, 'vnf', 'ims')
- self.data_dir = CONST.dir_ims_data
- self.result_dir = os.path.join(CONST.dir_results, self.case_name)
- self.test_dir = CONST.dir_repo_vims_test
+ self.case_dir = os.path.join(
+ CONST.__getattribute__('dir_functest_test'),
+ 'vnf',
+ 'ims')
+ self.data_dir = CONST.__getattribute__('dir_ims_data')
+ self.result_dir = os.path.join(CONST.__getattribute__('dir_results'),
+ self.case_name)
+ self.test_dir = CONST.__getattribute__('dir_repo_vims_test')
if not os.path.exists(self.data_dir):
os.makedirs(self.data_dir)
if rq.status_code != 201:
raise Exception('Failed to get cookie for Ellis')
cookies = rq.cookies
- self.logger.info('Cookies: %s' % cookies)
+ self.logger.info('Cookies: %s', cookies)
number_url = 'http://{0}/accounts/{1}/numbers'.format(
ellis_ip,
import functest.utils.functest_utils as ft_utils
import functest.utils.openstack_utils as os_utils
+__author__ = "Valentin Boucher <valentin.boucher@orange.com>"
+
class CloudifyIms(clearwater_ims_base.ClearwaterOnBoardingBase):
+ """Clearwater vIMS deployed with Cloudify Orchestrator Case"""
def __init__(self, **kwargs):
if "case_name" not in kwargs:
kwargs["case_name"] = "cloudify_ims"
super(CloudifyIms, self).__init__(**kwargs)
self.logger = logging.getLogger(__name__)
+ self.neutron_client = ''
+ self.glance_client = ''
+ self.keystone_client = ''
+ self.nova_client = ''
# Retrieve the configuration
try:
blueprint=get_config("cloudify.blueprint", config_file),
inputs=get_config("cloudify.inputs", config_file)
)
- self.logger.debug("Orchestrator configuration: %s" % self.orchestrator)
+ self.logger.debug("Orchestrator configuration: %s", self.orchestrator)
self.vnf = dict(
blueprint=get_config("clearwater.blueprint", config_file),
deployment_name=get_config("clearwater.deployment_name",
inputs=get_config("clearwater.inputs", config_file),
requirements=get_config("clearwater.requirements", config_file)
)
- self.logger.debug("VNF configuration: %s" % self.vnf)
+ self.logger.debug("VNF configuration: %s", self.vnf)
self.images = get_config("tenant_images", config_file)
- self.logger.info("Images needed for vIMS: %s" % self.images)
+ self.logger.info("Images needed for vIMS: %s", self.images)
- def deploy_orchestrator(self, **kwargs):
+ def deploy_orchestrator(self):
self.logger.info("Additional pre-configuration steps")
self.neutron_client = os_utils.get_neutron_client(self.admin_creds)
self.logger.info("Upload some OS images if it doesn't exist")
temp_dir = os.path.join(self.data_dir, "tmp/")
for image_name, image_url in self.images.iteritems():
- self.logger.info("image: %s, url: %s" % (image_name, image_url))
+ self.logger.info("image: %s, url: %s", image_name, image_url)
try:
image_id = os_utils.get_image_id(self.glance_client,
image_name)
- self.logger.debug("image_id: %s" % image_id)
+ self.logger.debug("image_id: %s", image_id)
except Exception:
- self.logger.error("Unexpected error: %s" % sys.exc_info()[0])
+ self.logger.error("Unexpected error: %s", sys.exc_info()[0])
if image_id == '':
- self.logger.info("""%s image doesn't exist on glance repository. Try
- downloading this image and upload on glance !""" % image_name)
- image_id = download_and_add_image_on_glance(self.glance_client,
- image_name,
- image_url,
- temp_dir)
- if image_id == '':
- self.step_failure(
- "Failed to find or upload required OS "
- "image for this deployment")
+ self.logger.info("""%s image does not exist on glance repo.
+ Try downloading this image
+ and upload on glance !""",
+ image_name)
+ image_id = os_utils.download_and_add_image_on_glance(
+ self.glance_client,
+ image_name,
+ image_url,
+ temp_dir)
# Need to extend quota
self.logger.info("Update security group quota for this tenant")
tenant_id = os_utils.get_tenant_id(self.keystone_client,
self.tenant_name)
- self.logger.debug("Tenant id found %s" % tenant_id)
+ self.logger.debug("Tenant id found %s", tenant_id)
if not os_utils.update_sg_quota(self.neutron_client,
tenant_id, 50, 100):
- self.step_failure("Failed to update security group quota" +
+ self.logger.error("Failed to update security group quota"
" for tenant " + self.tenant_name)
+
self.logger.debug("group quota extended")
# start the deployment of cloudify
public_auth_url = os_utils.get_endpoint('identity')
- self.logger.debug("CFY inputs: %s" % self.orchestrator['inputs'])
+ self.logger.debug("CFY inputs: %s", self.orchestrator['inputs'])
cfy = Orchestrator(self.data_dir, self.orchestrator['inputs'])
self.orchestrator['object'] = cfy
self.logger.debug("Orchestrator object created")
- self.logger.debug("Tenant name: %s" % self.tenant_name)
+ self.logger.debug("Tenant name: %s", self.tenant_name)
cfy.set_credentials(username=self.tenant_name,
password=self.tenant_name,
# orchestrator VM flavor
self.logger.info("Check Flavor is available, if not create one")
- self.logger.debug("Flavor details %s " %
+ self.logger.debug("Flavor details %s ",
self.orchestrator['requirements']['ram_min'])
flavor_exist, flavor_id = os_utils.get_or_create_flavor(
"m1.large",
'50',
'2',
public=True)
- self.logger.debug("Flavor id: %s" % flavor_id)
+ self.logger.debug("Flavor id: %s", flavor_id)
if not flavor_id:
self.logger.info("Available flavors are: ")
self.logger.info(self.nova_client.flavor.list())
- self.step_failure("Failed to find required flavor"
- "for this deployment")
+
cfy.set_flavor_id(flavor_id)
self.logger.debug("Flavor OK")
image_id = os_utils.get_image_id(
self.glance_client,
self.orchestrator['requirements']['os_image'])
- self.logger.debug("Orchestrator image id: %s" % image_id)
+ self.logger.debug("Orchestrator image id: %s", image_id)
if image_id == '':
self.logger.error("CFY image not found")
- self.step_failure("Failed to find required OS image"
- " for cloudify manager")
- else:
- self.step_failure("Failed to find required OS image"
- " for cloudify manager")
cfy.set_image_id(image_id)
self.logger.debug("Orchestrator image set")
self.logger.debug("Get External network")
ext_net = os_utils.get_external_net(self.neutron_client)
- self.logger.debug("External network: %s" % ext_net)
- if not ext_net:
- self.step_failure("Failed to get external network")
+ self.logger.debug("External network: %s", ext_net)
cfy.set_external_network_name(ext_net)
self.logger.debug("CFY External network set")
'30',
'1',
public=True)
- self.logger.debug("Flavor id: %s" % flavor_id)
+ self.logger.debug("Flavor id: %s", flavor_id)
if not flavor_id:
self.logger.info("Available flavors are: ")
self.logger.info(self.nova_client.flavor.list())
- self.step_failure("Failed to find required flavor"
- " for this deployment")
cw.set_flavor_id(flavor_id)
if 'os_image' in self.vnf['requirements'].keys():
image_id = os_utils.get_image_id(
self.glance_client, self.vnf['requirements']['os_image'])
- if image_id == '':
- self.step_failure("Failed to find required OS image"
- " for clearwater VMs")
- else:
- self.step_failure("Failed to find required OS image"
- " for clearwater VMs")
cw.set_image_id(image_id)
-
ext_net = os_utils.get_external_net(self.neutron_client)
- if not ext_net:
- self.step_failure("Failed to get external network")
-
cw.set_external_network_name(ext_net)
error = cw.deploy_vnf(self.vnf['blueprint'])
mgr_ip = os.popen(cmd).read()
mgr_ip = mgr_ip.splitlines()[0]
except Exception:
- self.step_failure("Unable to retrieve the IP of the "
- "cloudify manager server !")
+ self.logger.exception("Unable to retrieve the IP of the "
+ "cloudify manager server !")
self.logger.info('Cloudify Manager: %s', mgr_ip)
api_url = 'http://{0}/api/v2/deployments/{1}/outputs'.format(
if dns_ip != "":
vims_test_result = self.run_clearwater_live_test(
dns_ip=dns_ip,
- public_domain=self.inputs["public_domain"])
+ public_domain="") # self.inputs["public_domain"]
if vims_test_result != '':
return {'status': 'PASS', 'result': vims_test_result}
else:
self.orchestrator['object'].undeploy_manager()
super(CloudifyIms, self).clean()
- def main(self, **kwargs):
- self.logger.info("Cloudify IMS VNF onboarding test starting")
- self.execute()
- self.logger.info("Cloudify IMS VNF onboarding test executed")
- if self.result is "PASS":
- return self.EX_OK
- else:
- return self.EX_RUN_ERROR
-
- def run(self):
- kwargs = {}
- return self.main(**kwargs)
-
# ----------------------------------------------------------
#
raise ValueError("The parameter %s is not defined in"
" reporting.yaml" % parameter)
return value
-
-
-def download_and_add_image_on_glance(glance, image_name, image_url, data_dir):
- dest_path = data_dir
- if not os.path.exists(dest_path):
- os.makedirs(dest_path)
- file_name = image_url.rsplit('/')[-1]
- if not ft_utils.download_url(image_url, dest_path):
- return False
-
- image = os_utils.create_glance_image(
- glance, image_name, dest_path + file_name)
- if not image:
- return False
-
- return image
import requests
import functest.opnfv_tests.vnf.ims.clearwater_ims_base as clearwater_ims_base
+from functest.utils.constants import CONST
class OperaIms(clearwater_ims_base.ClearwaterOnBoardingBase):
kwargs["case_name"] = "opera_ims"
super(OperaIms, self).__init__(**kwargs)
self.logger = logging.getLogger(__name__)
- self.ellis_file = os.path.join(self.result_dir, 'ellis.info')
- self.live_test_file = os.path.join(self.result_dir,
- 'live_test_report.json')
+ self.ellis_file = os.path.join(
+ CONST.__getattribute__('dir_results'), 'ellis.info')
+ self.live_test_file = os.path.join(
+ CONST.__getattribute__('dir_results'), 'live_test_report.json')
try:
self.openo_msb_endpoint = os.environ['OPENO_MSB_ENDPOINT']
except KeyError:
import yaml
import functest.core.vnf as vnf
-import functest.utils.functest_utils as ft_utils
import functest.utils.openstack_utils as os_utils
from functest.utils.constants import CONST
from org.openbaton.cli.agents.agents import MainAgent
from org.openbaton.cli.errors.errors import NfvoException
+
+__author__ = "Pauls, Michael <michael.pauls@fokus.fraunhofer.de>"
# ----------------------------------------------------------
#
# UTILS
# -----------------------------------------------------------
-def get_config(parameter, file):
+def get_config(parameter, my_file):
"""
Returns the value of a given parameter in file.yaml
parameter must be given in string format with dots
value = value.get(element)
if value is None:
raise ValueError("The parameter %s is not defined in"
- " %s" % (parameter, file))
+ " %s" % (parameter, my_file))
return value
-def download_and_add_image_on_glance(glance, image_name,
- image_url, data_dir):
- dest_path = data_dir
- if not os.path.exists(dest_path):
- os.makedirs(dest_path)
- file_name = image_url.rsplit('/')[-1]
- if not ft_utils.download_url(image_url, dest_path):
- return False
- image = os_utils.create_glance_image(
- glance, image_name, dest_path + file_name)
- if not image:
- return False
- return image
-
-
def servertest(host, port):
args = socket.getaddrinfo(host, port, socket.AF_INET, socket.SOCK_STREAM)
for family, socktype, proto, canonname, sockaddr in args:
class ImsVnf(vnf.VnfOnBoarding):
+ """OpenIMS VNF deployed with openBaton orchestrator"""
def __init__(self, project='functest', case_name='orchestra_ims',
repo='', cmd=''):
super(ImsVnf, self).__init__(project, case_name, repo, cmd)
+ self.logger = logging.getLogger(__name__)
+ self.logger.info("Orchestra IMS VNF onboarding test starting")
self.ob_password = "openbaton"
self.ob_username = "admin"
self.ob_https = False
self.ob_port = "8080"
self.ob_ip = "localhost"
self.ob_instance_id = ""
- self.logger = logging.getLogger(__name__)
- self.case_dir = os.path.join(CONST.dir_functest_test, 'vnf/ims/')
- self.data_dir = CONST.dir_ims_data
- self.test_dir = CONST.dir_repo_vims_test
+ self.case_dir = os.path.join(
+ CONST.__getattribute__('dir_functest_test'),
+ 'vnf/ims/')
+ self.data_dir = CONST.__getattribute__('dir_ims_data')
+ self.test_dir = CONST.__getattribute__('dir_repo_vims_test')
self.ob_projectid = ""
self.keystone_client = os_utils.get_keystone_client()
self.ob_nsr_id = ""
self.userdata_file = get_config("openbaton.userdata.file",
config_file)
- def deploy_orchestrator(self, **kwargs):
+ def deploy_orchestrator(self):
self.logger.info("Additional pre-configuration steps")
nova_client = os_utils.get_nova_client()
neutron_client = os_utils.get_neutron_client()
self.logger.info("Upload some OS images if it doesn't exist")
temp_dir = os.path.join(self.data_dir, "tmp/")
for image_name, image_url in self.images.iteritems():
- self.logger.info("image: %s, url: %s" % (image_name, image_url))
+ self.logger.info("image: %s, url: %s", image_name, image_url)
try:
image_id = os_utils.get_image_id(glance_client,
image_name)
- self.logger.info("image_id: %s" % image_id)
+ self.logger.info("image_id: %s", image_id)
except BaseException:
- self.logger.error("Unexpected error: %s" % sys.exc_info()[0])
+ self.logger.error("Unexpected error: %s", sys.exc_info()[0])
if image_id == '':
- self.logger.info("""%s image doesn't exist on glance repository. Try
- downloading this image and upload on glance !""" % image_name)
- image_id = download_and_add_image_on_glance(glance_client,
- image_name,
- image_url,
- temp_dir)
+ self.logger.info("""%s image doesn't exist on glance
+ repository. Try downloading this image
+ and upload on glance !""" % image_name)
+ image_id = os_utils.download_and_add_image_on_glance(
+ glance_client,
+ image_name,
+ image_url,
+ temp_dir)
if image_id == '':
- self.step_failure(
- "Failed to find or upload required OS "
- "image for this deployment")
+ self.logger.error("Failed to find or upload required OS "
+ "image for this deployment")
+ return False
+
network_dic = os_utils.create_network_full(neutron_client,
"openbaton_mgmt",
"openbaton_mgmt_subnet",
if floatip is None:
self.logger.error("Cannot create floating IP.")
+ return False
userdata = "#!/bin/bash\n"
userdata += "echo \"Executing userdata...\"\n"
self.logger.info("flavor: m1.medium\n"
"image: %s\n"
"network_id: %s\n"
- "userdata: %s\n"
- % (self.imagename, network_id, userdata))
+ "userdata: %s\n",
+ self.imagename,
+ network_id,
+ userdata)
instance = os_utils.create_instance_and_wait_for_active(
"orchestra",
os_utils.add_secgroup_to_instance(nova_client,
self.ob_instance_id, sg_id)
- self.logger.info("Associating floating ip: '%s' to VM '%s' "
- % (floatip, "orchestra-openbaton"))
+ self.logger.info("Associating floating ip: '%s' to VM '%s' ",
+ floatip,
+ "orchestra-openbaton")
if not os_utils.add_floating_ip(nova_client, instance.id, floatip):
- self.step_failure("Cannot associate floating IP to VM.")
+ self.logger.error("Cannot associate floating IP to VM.")
+ return False
self.logger.info("Waiting for Open Baton NFVO to be up and running...")
x = 0
x += 1
if x == 100:
- self.step_failure("Open Baton is not started correctly")
+ self.logger.error("Open Baton is not started correctly")
self.ob_ip = floatip
self.ob_password = "openbaton"
self.details["orchestrator"] = {
'status': "PASS", 'result': "Deploy Open Baton NFVO: OK"}
self.logger.info("Deploy Open Baton NFVO: OK")
+ return True
def deploy_vnf(self):
self.logger.info("Starting vIMS Deployment...")
'20',
'1',
public=True)
- self.logger.debug("Flavor id: %s" % flavor_id)
+ self.logger.debug("Flavor id: %s", flavor_id)
self.logger.info("Getting project 'default'...")
project_agent = self.main_agent.get_agent("project", self.ob_projectid)
for p in json.loads(project_agent.find()):
if p.get("name") == "default":
self.ob_projectid = p.get("id")
- self.logger.info("Found project 'default': %s" % p)
+ self.logger.info("Found project 'default': %s", p)
break
- self.logger.debug("project id: %s" % self.ob_projectid)
+ self.logger.debug("project id: %s", self.ob_projectid)
if self.ob_projectid == "":
- self.step_failure("Default project id was not found!")
+ self.logger.error("Default project id was not found!")
creds = os_utils.get_credentials()
- self.logger.info("PoP creds: %s" % creds)
+ self.logger.info("PoP creds: %s", creds)
if os_utils.is_keystone_v3():
self.logger.info(
"Using v2 API of OpenStack... -> Using OS_TENANT_NAME")
project_id = creds.get("tenant_name")
- self.logger.debug("project id: %s" % project_id)
+ self.logger.debug("project id: %s", project_id)
vim_json = {
"name": "vim-instance",
}
}
- self.logger.debug("Registering VIM: %s" % vim_json)
+ self.logger.debug("Registering VIM: %s", vim_json)
self.main_agent.get_agent(
"vim",
nsd = {}
try:
- self.logger.info("sending: %s" % self.market_link)
+ self.logger.info("sending: %s", self.market_link)
nsd = market_agent.create(entity=self.market_link)
self.logger.info("Onboarded NSD: " + nsd.get("name"))
except NfvoException as e:
- self.step_failure(e.message)
+ self.logger.error(e.message)
nsr_agent = self.main_agent.get_agent("nsr",
project_id=self.ob_projectid)
nsd_id = nsd.get('id')
if nsd_id is None:
- self.step_failure("NSD not onboarded correctly")
+ self.logger.error("NSD not onboarded correctly")
try:
self.nsr = nsr_agent.create(nsd_id)
except NfvoException as e:
- self.step_failure(e.message)
+ self.logger.error(e.message)
if self.nsr.get('code') is not None:
self.logger.error(
- "vIMS cannot be deployed: %s -> %s" %
- (self.nsr.get('code'), self.nsr.get('message')))
- self.step_failure("vIMS cannot be deployed")
+ "vIMS cannot be deployed: %s -> %s",
+ self.nsr.get('code'),
+ self.nsr.get('message'))
+ self.logger.error("vIMS cannot be deployed")
i = 0
self.logger.info("Waiting for NSR to go to ACTIVE...")
"status") != 'ERROR':
i += 1
if i == 150:
- self.step_failure("After %s sec the NSR did not go to ACTIVE.."
- % 5 * i)
+ self.logger.error("INACTIVE NSR after %s sec..", 5 * i)
+
time.sleep(5)
self.nsr = json.loads(nsr_agent.find(self.nsr.get('id')))
else:
self.details["vnf"] = {'status': "FAIL", 'result': self.nsr}
self.logger.error(self.nsr)
- self.step_failure("Deploy VNF: ERROR")
+ self.logger.error("Deploy VNF: ERROR")
+ return False
+
self.ob_nsr_id = self.nsr.get("id")
self.logger.info(
"Sleep for 60s to ensure that all services are up and running...")
time.sleep(60)
- return self.details.get("vnf")
+ return True
def test_vnf(self):
# Adaptations probably needed
# code used for cloudify_ims
# ruby client on jumphost calling the vIMS on the SUT
self.logger.info(
- "Testing if %s works properly..." %
- self.nsr.get('name'))
+ "Testing if %s works properly...", self.nsr.get('name'))
for vnfr in self.nsr.get('vnfr'):
self.logger.info(
- "Checking ports %s of VNF %s" %
- (self.ims_conf.get(
- vnfr.get('name')).get('ports'),
- vnfr.get('name')))
+ "Checking ports %s of VNF %s",
+ self.ims_conf.get(vnfr.get('name')).get('ports'),
+ vnfr.get('name'))
for vdu in vnfr.get('vdu'):
for vnfci in vdu.get('vnfc_instance'):
self.logger.debug(
- "Checking ports of VNFC instance %s" %
+ "Checking ports of VNFC instance %s",
vnfci.get('hostname'))
for floatingIp in vnfci.get('floatingIps'):
self.logger.debug(
- "Testing %s:%s" %
- (vnfci.get('hostname'), floatingIp.get('ip')))
+ "Testing %s:%s",
+ vnfci.get('hostname'),
+ floatingIp.get('ip'))
for port in self.ims_conf.get(
vnfr.get('name')).get('ports'):
if servertest(floatingIp.get('ip'), port):
self.logger.info(
- "VNFC instance %s is reachable at %s:%s" %
- (vnfci.get('hostname'),
- floatingIp.get('ip'),
- port))
+ "VNFC instance %s is reachable at %s:%s",
+ vnfci.get('hostname'),
+ floatingIp.get('ip'),
+ port)
else:
self.logger.error(
"VNFC instance %s is not reachable "
- "at %s:%s" % (vnfci.get('hostname'),
- floatingIp.get('ip'), port))
+ "at %s:%s",
+ vnfci.get('hostname'),
+ floatingIp.get('ip'),
+ port)
self.details["test_vnf"] = {
'status': "FAIL", 'result': (
"Port %s of server %s -> %s is "
- "not reachable" %
- (port, vnfci.get('hostname'),
- floatingIp.get('ip')))}
- self.step_failure("Test VNF: ERROR")
+ "not reachable",
+ port,
+ vnfci.get('hostname'),
+ floatingIp.get('ip'))}
+ self.logger.error("Test VNF: ERROR")
+ return False
+
self.details["test_vnf"] = {
'status': "PASS",
'result': "All tests have been executed successfully"}
self.logger.info("Test VNF: OK")
- return self.details.get('test_vnf')
+ return True
def clean(self):
self.main_agent.get_agent(
time.sleep(5)
os_utils.delete_instance(nova_client=os_utils.get_nova_client(),
instance_id=self.ob_instance_id)
- # TODO question is the clean removing also the VM?
+ # question is the clean removing also the VM?
# I think so since is goinf to remove the tenant...
super(ImsVnf, self).clean()
-
- def main(self, **kwargs):
- self.logger.info("Orchestra IMS VNF onboarding test starting")
- self.execute()
- self.logger.info("Orchestra IMS VNF onboarding test executed")
- if self.result is "PASS":
- return self.EX_OK
- else:
- return self.EX_RUN_ERROR
-
- def run(self):
- kwargs = {}
- return self.main(**kwargs)
-
-
-if __name__ == '__main__':
- logging.basicConfig()
- test = ImsVnf()
- test.deploy_orchestrator()
- test.deploy_vnf()
- test.test_vnf()
- test.clean()
# pylint: disable=missing-docstring
import logging
-import os
import unittest
import mock
from functest.core import vnf
from functest.core import testcase
+from functest.utils import constants
class VnfBaseTesting(unittest.TestCase):
+ """The class testing VNF."""
+ # pylint: disable=missing-docstring,too-many-public-methods
+
+ tenant_name = 'test_tenant_name'
+ tenant_description = 'description'
def setUp(self):
- self.test = vnf.VnfOnBoarding(
- project='functest', case_name='aaa')
- self.test.project = "functest"
- self.test.start_time = "1"
- self.test.stop_time = "5"
- self.test.result = ""
- self.test.details = {
- "orchestrator": {"status": "PASS", "result": "", "duration": 20},
- "vnf": {"status": "PASS", "result": "", "duration": 15},
- "test_vnf": {"status": "FAIL", "result": "", "duration": 5}}
- self.test.keystone_client = 'test_client'
- self.test.tenant_name = 'test_tenant_name'
-
- def test_execute_deploy_vnf_fail(self):
+ constants.CONST.__setattr__("vnf_foo_tenant_name", self.tenant_name)
+ constants.CONST.__setattr__(
+ "vnf_foo_tenant_description", self.tenant_description)
+ self.test = vnf.VnfOnBoarding(project='functest', case_name='foo')
+
+ def test_run_deploy_vnf_exc(self):
with mock.patch.object(self.test, 'prepare'),\
mock.patch.object(self.test, 'deploy_orchestrator',
return_value=None), \
mock.patch.object(self.test, 'deploy_vnf',
side_effect=Exception):
- self.assertEqual(self.test.execute(),
+ self.assertEqual(self.test.run(),
testcase.TestCase.EX_TESTCASE_FAILED)
- def test_execute_test_vnf_fail(self):
+ def test_run_test_vnf_exc(self):
with mock.patch.object(self.test, 'prepare'),\
mock.patch.object(self.test, 'deploy_orchestrator',
return_value=None), \
mock.patch.object(self.test, 'deploy_vnf'), \
mock.patch.object(self.test, 'test_vnf',
side_effect=Exception):
- self.assertEqual(self.test.execute(),
+ self.assertEqual(self.test.run(),
testcase.TestCase.EX_TESTCASE_FAILED)
- @mock.patch('functest.core.vnf.os_utils.get_tenant_id',
- return_value='test_tenant_id')
- @mock.patch('functest.core.vnf.os_utils.delete_tenant',
- return_value=True)
- @mock.patch('functest.core.vnf.os_utils.get_user_id',
- return_value='test_user_id')
- @mock.patch('functest.core.vnf.os_utils.delete_user',
- return_value=True)
- def test_execute_default(self, *args):
+ def test_run_deploy_orch_ko(self):
with mock.patch.object(self.test, 'prepare'),\
mock.patch.object(self.test, 'deploy_orchestrator',
- return_value=None), \
- mock.patch.object(self.test, 'deploy_vnf'), \
- mock.patch.object(self.test, 'test_vnf'), \
- mock.patch.object(self.test, 'parse_results',
- return_value='ret_exit_code'), \
- mock.patch.object(self.test, 'log_results'):
- self.assertEqual(self.test.execute(),
- 'ret_exit_code')
-
- @mock.patch('functest.core.vnf.os_utils.get_credentials')
+ return_value=False), \
+ mock.patch.object(self.test, 'deploy_vnf',
+ return_value=True), \
+ mock.patch.object(self.test, 'test_vnf',
+ return_value=True):
+ self.assertEqual(self.test.run(),
+ testcase.TestCase.EX_TESTCASE_FAILED)
+
+ def test_run_vnf_deploy_ko(self):
+ with mock.patch.object(self.test, 'prepare'),\
+ mock.patch.object(self.test, 'deploy_orchestrator',
+ return_value=True), \
+ mock.patch.object(self.test, 'deploy_vnf',
+ return_value=False), \
+ mock.patch.object(self.test, 'test_vnf',
+ return_value=True):
+ self.assertEqual(self.test.run(),
+ testcase.TestCase.EX_TESTCASE_FAILED)
+
+ def test_run_vnf_test_ko(self):
+ with mock.patch.object(self.test, 'prepare'),\
+ mock.patch.object(self.test, 'deploy_orchestrator',
+ return_value=True), \
+ mock.patch.object(self.test, 'deploy_vnf',
+ return_value=True), \
+ mock.patch.object(self.test, 'test_vnf',
+ return_value=False):
+ self.assertEqual(self.test.run(),
+ testcase.TestCase.EX_TESTCASE_FAILED)
+
+ def test_run_default(self):
+ with mock.patch.object(self.test, 'prepare'),\
+ mock.patch.object(self.test, 'deploy_orchestrator',
+ return_value=True), \
+ mock.patch.object(self.test, 'deploy_vnf',
+ return_value=True), \
+ mock.patch.object(self.test, 'test_vnf',
+ return_value=True):
+ self.assertEqual(self.test.run(), testcase.TestCase.EX_OK)
+
+ def test_deploy_vnf_unimplemented(self):
+ with self.assertRaises(vnf.VnfDeploymentException):
+ self.test.deploy_vnf()
+
+ def test_test_vnf_unimplemented(self):
+ with self.assertRaises(vnf.VnfTestException):
+ self.test.test_vnf()
+
@mock.patch('functest.core.vnf.os_utils.get_keystone_client')
- @mock.patch('functest.core.vnf.os_utils.get_user_id', return_value='')
- def test_prepare_missing_userid(self, *args):
- with self.assertRaises(Exception):
- self.test.prepare()
+ @mock.patch('functest.core.vnf.os_utils.delete_user',
+ return_value=True)
+ def test_clean_user_set(self, *args):
+ self.test.user_created = True
+ self.test.clean()
+ args[0].assert_called_once_with(mock.ANY, self.tenant_name)
+ args[1].assert_called_once_with()
- @mock.patch('functest.core.vnf.os_utils.get_credentials')
@mock.patch('functest.core.vnf.os_utils.get_keystone_client')
- @mock.patch('functest.core.vnf.os_utils.get_user_id',
- return_value='test_roleid')
- @mock.patch('functest.core.vnf.os_utils.create_tenant',
- return_value='')
- def test_prepare_missing_tenantid(self, *args):
- with self.assertRaises(Exception):
- self.test.prepare()
+ @mock.patch('functest.core.vnf.os_utils.delete_user',
+ return_value=True)
+ def test_clean_user_unset(self, *args):
+ self.test.user_created = False
+ self.test.clean()
+ args[0].assert_not_called()
+ args[1].assert_called_once_with()
- @mock.patch('functest.core.vnf.os_utils.get_credentials')
@mock.patch('functest.core.vnf.os_utils.get_keystone_client')
- @mock.patch('functest.core.vnf.os_utils.get_user_id',
- return_value='test_roleid')
- @mock.patch('functest.core.vnf.os_utils.create_tenant',
- return_value='test_tenantid')
- @mock.patch('functest.core.vnf.os_utils.get_role_id',
- return_value='')
- def test_prepare_missing_roleid(self, *args):
- with self.assertRaises(Exception):
- self.test.prepare()
+ @mock.patch('functest.core.vnf.os_utils.delete_tenant',
+ return_value=True)
+ def test_clean_tenant_set(self, *args):
+ self.test.tenant_created = True
+ self.test.clean()
+ args[0].assert_called_once_with(mock.ANY, self.tenant_name)
+ args[1].assert_called_once_with()
- @mock.patch('functest.core.vnf.os_utils.get_credentials')
@mock.patch('functest.core.vnf.os_utils.get_keystone_client')
- @mock.patch('functest.core.vnf.os_utils.get_user_id',
- return_value='test_roleid')
- @mock.patch('functest.core.vnf.os_utils.create_tenant',
- return_value='test_tenantid')
- @mock.patch('functest.core.vnf.os_utils.get_role_id',
- return_value='test_roleid')
- @mock.patch('functest.core.vnf.os_utils.add_role_user',
- return_value='')
- def test_prepare_role_add_failure(self, *args):
- with self.assertRaises(Exception):
+ @mock.patch('functest.core.vnf.os_utils.delete_tenant',
+ return_value=True)
+ def test_clean_tenant_unset(self, *args):
+ self.test.tenant_created = False
+ self.test.clean()
+ args[0].assert_not_called()
+ args[1].assert_called_once_with()
+
+ def test_deploy_orch_unimplemented(self):
+ self.assertTrue(self.test.deploy_orchestrator())
+
+ @mock.patch('functest.core.vnf.os_utils.get_credentials',
+ return_value={'creds': 'test'})
+ @mock.patch('functest.core.vnf.os_utils.get_keystone_client',
+ return_value='test')
+ @mock.patch('functest.core.vnf.os_utils.get_or_create_tenant_for_vnf',
+ return_value=0)
+ @mock.patch('functest.core.vnf.os_utils.get_or_create_user_for_vnf',
+ return_value=0)
+ def test_prepare(self, *args):
+ self.assertEqual(self.test.prepare(),
+ testcase.TestCase.EX_OK)
+ args[0].assert_called_once_with('test', self.tenant_name)
+ args[1].assert_called_once_with(
+ 'test', self.tenant_name, self.tenant_description)
+ args[2].assert_called_once_with()
+ args[3].assert_called_once_with()
+
+ @mock.patch('functest.core.vnf.os_utils.get_credentials',
+ side_effect=Exception)
+ def test_prepare_admin_creds_ko(self, *args):
+ with self.assertRaises(vnf.VnfPreparationException):
self.test.prepare()
+ args[0].assert_called_once_with()
+
+ @mock.patch('functest.core.vnf.os_utils.get_credentials',
+ return_value='creds')
+ @mock.patch('functest.core.vnf.os_utils.get_keystone_client',
+ side_effect=Exception)
+ def test_prepare_keystone_client_ko(self, *args):
+ with self.assertRaises(vnf.VnfPreparationException):
+ self.test.prepare()
+ args[0].assert_called_once_with()
+ args[1].assert_called_once_with()
- @mock.patch('functest.core.vnf.os_utils.get_credentials')
+ @mock.patch('functest.core.vnf.os_utils.get_credentials',
+ return_value='creds')
@mock.patch('functest.core.vnf.os_utils.get_keystone_client')
- @mock.patch('functest.core.vnf.os_utils.get_user_id',
- return_value='test_roleid')
- @mock.patch('functest.core.vnf.os_utils.create_tenant',
- return_value='test_tenantid')
- @mock.patch('functest.core.vnf.os_utils.get_role_id',
- return_value='test_roleid')
- @mock.patch('functest.core.vnf.os_utils.add_role_user')
- @mock.patch('functest.core.vnf.os_utils.create_user',
- return_value='')
- def test_create_user_failure(self, *args):
- with self.assertRaises(Exception):
+ @mock.patch('functest.core.vnf.os_utils.get_or_create_tenant_for_vnf',
+ side_effect=Exception)
+ def test_prepare_tenant_creation_ko(self, *args):
+ with self.assertRaises(vnf.VnfPreparationException):
self.test.prepare()
+ args[0].assert_called_once_with(
+ mock.ANY, self.tenant_name, self.tenant_description)
+ args[1].assert_called_once_with()
+ args[2].assert_called_once_with()
- def test_log_results_default(self):
- with mock.patch('functest.core.vnf.'
- 'ft_utils.logger_test_results') \
- as mock_method:
- self.test.log_results()
- self.assertTrue(mock_method.called)
-
- def test_step_failures_default(self):
- with self.assertRaises(Exception):
- self.test.step_failure("error_msg")
-
- def test_deploy_vnf_unimplemented(self):
- with self.assertRaises(Exception) as context:
- self.test.deploy_vnf()
- self.assertIn('VNF not deployed', str(context.exception))
-
- def test_test_vnf_unimplemented(self):
- with self.assertRaises(Exception) as context:
- self.test.test_vnf()()
- self.assertIn('VNF not tested', str(context.exception))
-
- def test_parse_results_ex_ok(self):
- self.test.details['test_vnf']['status'] = 'PASS'
- self.assertEqual(self.test.parse_results(), os.EX_OK)
-
- def test_parse_results_ex_run_error(self):
- self.test.details['vnf']['status'] = 'FAIL'
- self.assertEqual(self.test.parse_results(), os.EX_SOFTWARE)
+ @mock.patch('functest.core.vnf.os_utils.get_credentials',
+ return_value='creds')
+ @mock.patch('functest.core.vnf.os_utils.get_keystone_client')
+ @mock.patch('functest.core.vnf.os_utils.get_or_create_tenant_for_vnf',
+ return_value=0)
+ @mock.patch('functest.core.vnf.os_utils.get_or_create_user_for_vnf',
+ side_effect=Exception)
+ def test_prepare_user_creation_ko(self, *args):
+ with self.assertRaises(vnf.VnfPreparationException):
+ self.test.prepare()
+ args[0].assert_called_once_with(mock.ANY, self.tenant_name)
+ args[1].assert_called_once_with(
+ mock.ANY, self.tenant_name, self.tenant_description)
+ args[2].assert_called_once_with()
+ args[3].assert_called_once_with()
if __name__ == "__main__":
None)
self.assertTrue(mock_logger_error.called)
+ def test_get_or_create_user_for_vnf_get(self):
+ with mock.patch('functest.utils.openstack_utils.'
+ 'get_user_id',
+ return_value='user_id'), \
+ mock.patch('functest.utils.openstack_utils.get_tenant_id',
+ return_value='tenant_id'):
+ self.assertFalse(openstack_utils.
+ get_or_create_user_for_vnf(self.keystone_client,
+ 'my_vnf'))
+
+ def test_get_or_create_user_for_vnf_create(self):
+ with mock.patch('functest.utils.openstack_utils.'
+ 'get_user_id',
+ return_value=None), \
+ mock.patch('functest.utils.openstack_utils.get_tenant_id',
+ return_value='tenant_id'):
+ self.assertTrue(openstack_utils.
+ get_or_create_user_for_vnf(self.keystone_client,
+ 'my_vnf'))
+
+ def test_get_or_create_user_for_vnf_error_get_user_id(self):
+ with mock.patch('functest.utils.openstack_utils.'
+ 'get_user_id',
+ side_effect=Exception):
+ self.assertRaises(Exception)
+
+ def test_get_or_create_user_for_vnf_error_get_tenant_id(self):
+ with mock.patch('functest.utils.openstack_utils.'
+ 'get_user_id',
+ return_value='user_id'), \
+ mock.patch('functest.utils.openstack_utils.get_tenant_id',
+ side_effect='Exception'):
+ self.assertRaises(Exception)
+
+ def test_get_or_create_tenant_for_vnf_get(self):
+ with mock.patch('functest.utils.openstack_utils.'
+ 'get_tenant_id',
+ return_value='tenant_id'):
+ self.assertFalse(
+ openstack_utils.get_or_create_tenant_for_vnf(
+ self.keystone_client, 'tenant_name', 'tenant_description'))
+
+ def test_get_or_create_tenant_for_vnf_create(self):
+ with mock.patch('functest.utils.openstack_utils.get_tenant_id',
+ return_value=None):
+ self.assertTrue(
+ openstack_utils.get_or_create_tenant_for_vnf(
+ self.keystone_client, 'tenant_name', 'tenant_description'))
+
+ def test_get_or_create_tenant_for_vnf_error_get_tenant_id(self):
+ with mock.patch('functest.utils.openstack_utils.'
+ 'get_tenant_id',
+ side_effect=Exception):
+ self.assertRaises(Exception)
+
+ def test_download_and_add_image_on_glance_image_creation_failure(self):
+ with mock.patch('functest.utils.openstack_utils.'
+ 'os.makedirs'), \
+ mock.patch('functest.utils.openstack_utils.'
+ 'ft_utils.download_url',
+ return_value=True), \
+ mock.patch('functest.utils.openstack_utils.'
+ 'create_glance_image',
+ return_value=''):
+ resp = openstack_utils.download_and_add_image_on_glance(
+ self.glance_client,
+ 'image_name',
+ 'http://url',
+ 'data_dir')
+ self.assertEqual(resp, False)
+
if __name__ == "__main__":
logging.disable(logging.CRITICAL)
mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.'
'os_utils.get_image_id',
return_value=''), \
- mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.'
+ mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.os_utils.'
'download_and_add_image_on_glance') as m, \
self.assertRaises(Exception) as context:
self.ims_vnf.deploy_orchestrator()
{'status': 'PASS',
'result': 'vims_test_result'})
- def test_download_and_add_image_on_glance_incorrect_url(self):
- with mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.'
- 'os.makedirs'), \
- mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.'
- 'ft_utils.download_url',
- return_value=False):
- resp = cloudify_ims.download_and_add_image_on_glance(self.
- glance_client,
- 'image_name',
- 'http://url',
- 'data_dir')
- self.assertEqual(resp, False)
-
- def test_download_and_add_image_on_glance_image_creation_failure(self):
- with mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.'
- 'os.makedirs'), \
- mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.'
- 'ft_utils.download_url',
- return_value=True), \
- mock.patch('functest.opnfv_tests.vnf.ims.cloudify_ims.'
- 'os_utils.create_glance_image',
- return_value=''):
- resp = cloudify_ims.download_and_add_image_on_glance(self.
- glance_client,
- 'image_name',
- 'http://url',
- 'data_dir')
- self.assertEqual(resp, False)
-
if __name__ == "__main__":
logging.disable(logging.CRITICAL)
'cookies': ""}
self.mock_post_200.configure_mock(**attrs)
- def test_create_ellis_number_failure(self):
- with mock.patch('functest.opnfv_tests.vnf.ims.'
- 'clearwater_ims_base.requests.post',
- return_value=self.mock_post_500), \
- self.assertRaises(Exception) as context:
- self.ims_vnf.create_ellis_number()
-
- msg = "Unable to create a number:"
- self.assertTrue(msg in context.exception)
-
- def _get_post_status(self, url, cookies='', data=''):
- ellis_url = "http://test_ellis_ip/session"
- if url == ellis_url:
- return self.mock_post_200
- return self.mock_post
-
-
if __name__ == "__main__":
logging.disable(logging.CRITICAL)
unittest.main(verbosity=2)
return heatclient.Client(get_heat_client_version(), session=sess)
+def download_and_add_image_on_glance(glance, image_name, image_url, data_dir):
+ dest_path = data_dir
+ if not os.path.exists(dest_path):
+ os.makedirs(dest_path)
+ file_name = image_url.rsplit('/')[-1]
+ if not ft_utils.download_url(image_url, dest_path):
+ return False
+
+ image = create_glance_image(
+ glance, image_name, dest_path + file_name)
+ if not image:
+ return False
+
+ return image
+
+
# *********************************************
# NOVA
# *********************************************
return tenant_id
+def get_or_create_tenant_for_vnf(keystone_client, tenant_name,
+ tenant_description):
+ """Get or Create a Tenant
+
+ Args:
+ keystone_client: keystone client reference
+ tenant_name: the name of the tenant
+ tenant_description: the description of the tenant
+
+ return False if tenant retrieved though get
+ return True if tenant created
+ raise Exception if error during processing
+ """
+ try:
+ tenant_id = get_tenant_id(keystone_client, tenant_name)
+ if not tenant_id:
+ tenant_id = create_tenant(keystone_client, tenant_name,
+ tenant_description)
+ return True
+ else:
+ return False
+ except:
+ raise Exception("Impossible to create a Tenant for the VNF {}".format(
+ tenant_name))
+
+
def create_user(keystone_client, user_name, user_password,
user_email, tenant_id):
try:
return user_id
+def get_or_create_user_for_vnf(keystone_client, vnf_ref):
+ """Get or Create user for VNF
+
+ Args:
+ keystone_client: keystone client reference
+ vnf_ref: VNF reference used as user name & password, tenant name
+
+ return False if user retrieved through get
+ return True if user created
+ raise Exception if error during processing
+ """
+ try:
+ user_id = get_user_id(keystone_client, vnf_ref)
+ tenant_id = get_tenant_id(keystone_client, vnf_ref)
+ if not user_id:
+ user_id = create_user(keystone_client, vnf_ref, vnf_ref,
+ "", tenant_id)
+ return True
+ else:
+ return False
+ add_role_user(keystone_client, user_id, 'admin', vnf_ref)
+ except:
+ raise Exception("Impossible to create a user for the VNF {}".format(
+ vnf_ref))
+
+
def add_role_user(keystone_client, user_id, role_id, tenant_id):
try:
if is_keystone_v3():
{[testenv]deps}
whitelist_externals = bash
modules =
- functest.core.feature
- functest.core.testcase
- functest.core.unit
+ functest.core
functest.opnfv_tests.sdn.odl
- functest.tests.unit.core.test_feature
- functest.tests.unit.core.test_testcase
- functest.tests.unit.core.test_unit
+ functest.tests.unit.core
functest.tests.unit.odl
functest.tests.unit.utils.test_decorators
functest.utils.decorators