"""Ease deploying tenant networks
-It offers a simple way to create all tenant network ressources required by a
+It offers a simple way to create all tenant network resources required by a
testcase (including all Functest ones):
+
- TenantNetwork1 selects the user and the project set as env vars
- - TenantNetwork2 creates a user and project to isolate the same ressources
+ - TenantNetwork2 creates a user and project to isolate the same resources
This classes could be reused by more complexed scenarios (Single VM)
"""
import os_client_config
import shade
+from tempest.lib.common.utils import data_utils
from xtesting.core import testcase
from functest.utils import config
from functest.utils import env
+from functest.utils import functest_utils
-class NewProject(object):
+class NewProject():
"""Ease creating new projects/users"""
# pylint: disable=too-many-instance-attributes
self.user = None
self.password = None
self.domain = None
- self.role = None
self.role_name = None
self.default_member = env.get('NEW_USER_ROLE')
"""Create projects/users"""
assert self.orig_cloud
assert self.case_name
- self.password = str(uuid.uuid4())
+ self.password = data_utils.rand_password().replace('%', '!')
+ self.__logger.debug("password: %s", self.password)
self.domain = self.orig_cloud.get_domain(
name_or_id=self.orig_cloud.auth.get(
"project_domain_name", "Default"))
self.project = self.orig_cloud.create_project(
- name='{}-project_{}'.format(self.case_name, self.guid),
- description="Created by OPNFV Functest: {}".format(
- self.case_name),
+ name=f'{self.case_name[:18]}-project_{self.guid}',
+ description=f"Created by OPNFV Functest: {self.case_name}",
domain_id=self.domain.id)
self.__logger.debug("project: %s", self.project)
self.user = self.orig_cloud.create_user(
- name='{}-user_{}'.format(self.case_name, self.guid),
+ name=f'{self.case_name}-user_{self.guid}',
password=self.password,
domain_id=self.domain.id)
self.__logger.debug("user: %s", self.user)
elif self.orig_cloud.get_role(self.default_member.lower()):
self.role_name = self.default_member.lower()
else:
- raise Exception("Cannot detect {}".format(self.default_member))
+ raise Exception(f"Cannot detect {self.default_member}")
except Exception: # pylint: disable=broad-except
self.__logger.info("Creating default role %s", self.default_member)
- self.role = self.orig_cloud.create_role(self.default_member)
- self.role_name = self.role.name
- self.__logger.debug("role: %s", self.role)
+ role = self.orig_cloud.create_role(self.default_member)
+ self.role_name = role.name
+ self.__logger.debug("role: %s", role)
self.orig_cloud.grant_role(
self.role_name, user=self.user.id, project=self.project.id,
domain=self.domain.id)
cloud_config=osconfig.get_one_cloud())
self.__logger.debug("new cloud %s", self.cloud.auth)
+ def get_environ(self):
+ "Get new environ"
+ environ = dict(
+ os.environ,
+ OS_USERNAME=self.user.name,
+ OS_PROJECT_NAME=self.project.name,
+ OS_PROJECT_ID=self.project.id,
+ OS_PASSWORD=self.password)
+ try:
+ del environ['OS_TENANT_NAME']
+ del environ['OS_TENANT_ID']
+ except Exception: # pylint: disable=broad-except
+ pass
+ return environ
+
def clean(self):
"""Remove projects/users"""
try:
self.orig_cloud.delete_user(self.user.id)
if self.project:
self.orig_cloud.delete_project(self.project.id)
- if self.role:
- self.orig_cloud.delete_role(self.role.id)
+ secgroups = self.orig_cloud.list_security_groups(
+ filters={'name': 'default',
+ 'project_id': self.project.id})
+ if secgroups:
+ sec_id = secgroups[0].id
+ self.orig_cloud.delete_security_group(sec_id)
except Exception: # pylint: disable=broad-except
- self.__logger.exception("Cannot clean all ressources")
+ self.__logger.exception("Cannot clean all resources")
class TenantNetwork1(testcase.TestCase):
# pylint: disable=too-many-instance-attributes
"""Create a tenant network (scenario1)
- It creates and configures all tenant network ressources required by
+ It creates and configures all tenant network resources required by
advanced testcases (subnet, network and router).
It ensures that all testcases inheriting from TenantNetwork1 could work
def __init__(self, **kwargs):
if "case_name" not in kwargs:
kwargs["case_name"] = 'tenantnetwork1'
- super(TenantNetwork1, self).__init__(**kwargs)
- self.res_dir = os.path.join(
- getattr(config.CONF, 'dir_results'), self.case_name)
+ super().__init__(**kwargs)
+ self.dir_results = os.path.join(getattr(config.CONF, 'dir_results'))
+ self.res_dir = os.path.join(self.dir_results, self.case_name)
+ self.output_log_name = 'functest.log'
+ self.output_debug_log_name = 'functest.debug.log'
+ self.ext_net = None
try:
cloud_config = os_client_config.get_config()
- self.cloud = shade.OpenStackCloud(cloud_config=cloud_config)
+ self.cloud = self.orig_cloud = shade.OpenStackCloud(
+ cloud_config=cloud_config)
except Exception: # pylint: disable=broad-except
- self.cloud = None
- self.ext_net = None
+ self.cloud = self.orig_cloud = None
self.__logger.exception("Cannot connect to Cloud")
- try:
- self.ext_net = self.get_external_network(self.cloud)
- except Exception: # pylint: disable=broad-except
- self.__logger.exception("Cannot get the external network")
+ if env.get('NO_TENANT_NETWORK').lower() != 'true':
+ try:
+ self.ext_net = self.get_external_network(self.cloud)
+ except Exception: # pylint: disable=broad-except
+ self.__logger.exception("Cannot get the external network")
self.guid = str(uuid.uuid4())
self.network = None
self.subnet = None
@staticmethod
def get_public_auth_url(cloud):
"""Get Keystone public endpoint"""
- keystone_id = cloud.search_services('keystone')[0].id
+ keystone_id = functest_utils.search_services(cloud, 'keystone')[0].id
endpoint = cloud.search_endpoints(
filters={'interface': 'public',
'service_id': keystone_id})[0].url
return endpoint
- def _create_network_resources(self):
+ def create_network_resources(self):
+ """Create all tenant network resources
+
+ It creates a router which gateway is the external network detected.
+ The new subnet is attached to that router.
+
+ Raises: expection on error
+ """
assert self.cloud
- assert self.ext_net
+ if env.get('NO_TENANT_NETWORK').lower() != 'true':
+ assert self.ext_net
provider = {}
- if hasattr(config.CONF, '{}_network_type'.format(self.case_name)):
+ if hasattr(config.CONF, f'{self.case_name}_network_type'):
provider["network_type"] = getattr(
- config.CONF, '{}_network_type'.format(self.case_name))
- if hasattr(config.CONF, '{}_physical_network'.format(self.case_name)):
+ config.CONF, f'{self.case_name}_network_type')
+ if hasattr(config.CONF, f'{self.case_name}_physical_network'):
provider["physical_network"] = getattr(
- config.CONF, '{}_physical_network'.format(self.case_name))
- if hasattr(config.CONF, '{}_segmentation_id'.format(self.case_name)):
+ config.CONF, f'{self.case_name}_physical_network')
+ if hasattr(config.CONF, f'{self.case_name}_segmentation_id'):
provider["segmentation_id"] = getattr(
- config.CONF, '{}_segmentation_id'.format(self.case_name))
- self.network = self.cloud.create_network(
- '{}-net_{}'.format(self.case_name, self.guid),
- provider=provider,
+ config.CONF, f'{self.case_name}_segmentation_id')
+ domain = self.orig_cloud.get_domain(
+ name_or_id=self.orig_cloud.auth.get(
+ "project_domain_name", "Default"))
+ project = self.orig_cloud.get_project(
+ self.cloud.auth['project_name'],
+ domain_id=domain.id)
+ self.network = self.orig_cloud.create_network(
+ f'{self.case_name}-net_{self.guid}',
+ provider=provider, project_id=project.id,
shared=self.shared_network)
self.__logger.debug("network: %s", self.network)
self.subnet = self.cloud.create_subnet(
self.network.id,
- subnet_name='{}-subnet_{}'.format(self.case_name, self.guid),
+ subnet_name=f'{self.case_name}-subnet_{self.guid}',
cidr=getattr(
- config.CONF, '{}_private_subnet_cidr'.format(self.case_name),
+ config.CONF, f'{self.case_name}_private_subnet_cidr',
self.cidr),
enable_dhcp=True,
dns_nameservers=[env.get('NAMESERVER')])
self.__logger.debug("subnet: %s", self.subnet)
self.router = self.cloud.create_router(
- name='{}-router_{}'.format(self.case_name, self.guid),
- ext_gateway_net_id=self.ext_net.id)
+ name=f'{self.case_name}-router_{self.guid}',
+ ext_gateway_net_id=self.ext_net.id if self.ext_net else None)
self.__logger.debug("router: %s", self.router)
self.cloud.add_router_interface(self.router, subnet_id=self.subnet.id)
try:
assert self.cloud
self.start_time = time.time()
- self._create_network_resources()
+ if env.get('NO_TENANT_NETWORK').lower() != 'true':
+ self.create_network_resources()
self.result = 100
status = testcase.TestCase.EX_OK
except Exception: # pylint: disable=broad-except
if self.network:
self.cloud.delete_network(self.network.id)
except Exception: # pylint: disable=broad-except
- self.__logger.exception("cannot clean all ressources")
+ self.__logger.exception("cannot clean all resources")
class TenantNetwork2(TenantNetwork1):
"""Create a tenant network (scenario2)
It creates new user/project before creating and configuring all tenant
- network ressources required by a testcase (subnet, network and router).
+ network resources required by a testcase (subnet, network and router).
It ensures that all testcases inheriting from TenantNetwork2 could work
without network specific configurations (or at least read the same config
def __init__(self, **kwargs):
if "case_name" not in kwargs:
kwargs["case_name"] = 'tenantnetwork2'
- super(TenantNetwork2, self).__init__(**kwargs)
+ super().__init__(**kwargs)
try:
assert self.cloud
self.project = NewProject(
def clean(self):
try:
- super(TenantNetwork2, self).clean()
+ super().clean()
assert self.project
self.project.clean()
except Exception: # pylint: disable=broad-except
- self.__logger.exception("Cannot clean all ressources")
+ self.__logger.exception("Cannot clean all resources")