import os_client_config
import shade
+from tempest.lib.common.utils import data_utils
from xtesting.core import testcase
from functest.utils import config
from functest.utils import env
+from functest.utils import functest_utils
-class NewProject(object):
+class NewProject():
"""Ease creating new projects/users"""
# pylint: disable=too-many-instance-attributes
self.user = None
self.password = None
self.domain = None
- self.role = None
self.role_name = None
self.default_member = env.get('NEW_USER_ROLE')
"""Create projects/users"""
assert self.orig_cloud
assert self.case_name
- self.password = str(uuid.uuid4())
+ self.password = data_utils.rand_password().replace('%', '!')
+ self.__logger.debug("password: %s", self.password)
self.domain = self.orig_cloud.get_domain(
name_or_id=self.orig_cloud.auth.get(
"project_domain_name", "Default"))
raise Exception("Cannot detect {}".format(self.default_member))
except Exception: # pylint: disable=broad-except
self.__logger.info("Creating default role %s", self.default_member)
- self.role = self.orig_cloud.create_role(self.default_member)
- self.role_name = self.role.name
- self.__logger.debug("role: %s", self.role)
+ role = self.orig_cloud.create_role(self.default_member)
+ self.role_name = role.name
+ self.__logger.debug("role: %s", role)
self.orig_cloud.grant_role(
self.role_name, user=self.user.id, project=self.project.id,
domain=self.domain.id)
cloud_config=osconfig.get_one_cloud())
self.__logger.debug("new cloud %s", self.cloud.auth)
+ def get_environ(self):
+ "Get new environ"
+ environ = dict(
+ os.environ,
+ OS_USERNAME=self.user.name,
+ OS_PROJECT_NAME=self.project.name,
+ OS_PROJECT_ID=self.project.id,
+ OS_PASSWORD=self.password)
+ try:
+ del environ['OS_TENANT_NAME']
+ del environ['OS_TENANT_ID']
+ except Exception: # pylint: disable=broad-except
+ pass
+ return environ
+
def clean(self):
"""Remove projects/users"""
try:
self.orig_cloud.delete_user(self.user.id)
if self.project:
self.orig_cloud.delete_project(self.project.id)
- if self.role:
- self.orig_cloud.delete_role(self.role.id)
+ secgroups = self.orig_cloud.list_security_groups(
+ filters={'name': 'default',
+ 'project_id': self.project.id})
+ if secgroups:
+ sec_id = secgroups[0].id
+ self.orig_cloud.delete_security_group(sec_id)
except Exception: # pylint: disable=broad-except
self.__logger.exception("Cannot clean all resources")
__logger = logging.getLogger(__name__)
cidr = '192.168.120.0/24'
shared_network = False
+ allow_no_fip = False
def __init__(self, **kwargs):
if "case_name" not in kwargs:
kwargs["case_name"] = 'tenantnetwork1'
super(TenantNetwork1, self).__init__(**kwargs)
- self.res_dir = os.path.join(
- getattr(config.CONF, 'dir_results'), self.case_name)
+ self.dir_results = os.path.join(getattr(config.CONF, 'dir_results'))
+ self.res_dir = os.path.join(self.dir_results, self.case_name)
+ self.output_log_name = 'functest.log'
+ self.output_debug_log_name = 'functest.debug.log'
try:
cloud_config = os_client_config.get_config()
- self.cloud = shade.OpenStackCloud(cloud_config=cloud_config)
+ self.cloud = self.orig_cloud = shade.OpenStackCloud(
+ cloud_config=cloud_config)
except Exception: # pylint: disable=broad-except
- self.cloud = None
+ self.cloud = self.orig_cloud = None
self.ext_net = None
self.__logger.exception("Cannot connect to Cloud")
try:
self.ext_net = self.get_external_network(self.cloud)
except Exception: # pylint: disable=broad-except
+ self.ext_net = None
self.__logger.exception("Cannot get the external network")
self.guid = str(uuid.uuid4())
self.network = None
@staticmethod
def get_public_auth_url(cloud):
"""Get Keystone public endpoint"""
- keystone_id = cloud.search_services('keystone')[0].id
+ keystone_id = functest_utils.search_services(cloud, 'keystone')[0].id
endpoint = cloud.search_endpoints(
filters={'interface': 'public',
'service_id': keystone_id})[0].url
Raises: expection on error
"""
assert self.cloud
- assert self.ext_net
+ if not self.allow_no_fip:
+ assert self.ext_net
provider = {}
if hasattr(config.CONF, '{}_network_type'.format(self.case_name)):
provider["network_type"] = getattr(
if hasattr(config.CONF, '{}_segmentation_id'.format(self.case_name)):
provider["segmentation_id"] = getattr(
config.CONF, '{}_segmentation_id'.format(self.case_name))
- self.network = self.cloud.create_network(
+ domain = self.orig_cloud.get_domain(
+ name_or_id=self.orig_cloud.auth.get(
+ "project_domain_name", "Default"))
+ project = self.orig_cloud.get_project(
+ self.cloud.auth['project_name'],
+ domain_id=domain.id)
+ self.network = self.orig_cloud.create_network(
'{}-net_{}'.format(self.case_name, self.guid),
- provider=provider,
+ provider=provider, project_id=project.id,
shared=self.shared_network)
self.__logger.debug("network: %s", self.network)
self.router = self.cloud.create_router(
name='{}-router_{}'.format(self.case_name, self.guid),
- ext_gateway_net_id=self.ext_net.id)
+ ext_gateway_net_id=self.ext_net.id if self.ext_net else None)
self.__logger.debug("router: %s", self.router)
self.cloud.add_router_interface(self.router, subnet_id=self.subnet.id)