-
-
-class TempestResourcesManager(object):
- """Tempest resource manager."""
- def __init__(self, **kwargs):
- self.os_creds = kwargs.get('os_creds') or snaps_utils.get_credentials()
- self.guid = '-' + str(uuid.uuid4())
- self.creators = list()
- self.cirros_image_config = getattr(
- config.CONF, 'snaps_images_cirros', None)
-
- def _create_project(self):
- """Create project for tests."""
- project_creator = deploy_utils.create_project(
- self.os_creds, ProjectConfig(
- name=getattr(
- config.CONF, 'tempest_identity_tenant_name') + self.guid,
- description=getattr(
- config.CONF, 'tempest_identity_tenant_description'),
- domain=self.os_creds.project_domain_name))
- if project_creator is None or project_creator.get_project() is None:
- raise Exception("Failed to create tenant")
- self.creators.append(project_creator)
- return project_creator.get_project().id
-
- def _create_user(self):
- """Create user for tests."""
- user_creator = deploy_utils.create_user(
- self.os_creds, UserConfig(
- name=getattr(
- config.CONF, 'tempest_identity_user_name') + self.guid,
- password=getattr(
- config.CONF, 'tempest_identity_user_password'),
- project_name=getattr(
- config.CONF, 'tempest_identity_tenant_name') + self.guid,
- domain_name=self.os_creds.user_domain_name))
- if user_creator is None or user_creator.get_user() is None:
- raise Exception("Failed to create user")
- self.creators.append(user_creator)
- return user_creator.get_user().id
-
- def _create_network(self, project_name):
- """Create network for tests."""
- tempest_network_type = None
- tempest_physical_network = None
- tempest_segmentation_id = None
-
- tempest_network_type = getattr(
- config.CONF, 'tempest_network_type', None)
- tempest_physical_network = getattr(
- config.CONF, 'tempest_physical_network', None)
- tempest_segmentation_id = getattr(
- config.CONF, 'tempest_segmentation_id', None)
- tempest_net_name = getattr(
- config.CONF, 'tempest_private_net_name') + self.guid
-
- network_creator = deploy_utils.create_network(
- self.os_creds, NetworkConfig(
- name=tempest_net_name,
- project_name=project_name,
- network_type=tempest_network_type,
- physical_network=tempest_physical_network,
- segmentation_id=tempest_segmentation_id,
- subnet_settings=[SubnetConfig(
- name=getattr(
- config.CONF,
- 'tempest_private_subnet_name') + self.guid,
- project_name=project_name,
- cidr=getattr(
- config.CONF, 'tempest_private_subnet_cidr'),
- dns_nameservers=[env.get('NAMESERVER')])]))
- if network_creator is None or network_creator.get_network() is None:
- raise Exception("Failed to create private network")
- self.creators.append(network_creator)
- return tempest_net_name
-
- def _create_image(self, name):
- """Create image for tests"""
- os_image_settings = openstack_tests.cirros_image_settings(
- name, public=True,
- image_metadata=self.cirros_image_config)
- image_creator = deploy_utils.create_image(
- self.os_creds, os_image_settings)
- if image_creator is None:
- raise Exception('Failed to create image')
- self.creators.append(image_creator)
- return image_creator.get_image().id
-
- def _create_flavor(self, name):
- """Create flavor for tests."""
- flavor_metadata = getattr(config.CONF, 'flavor_extra_specs', None)
- flavor_creator = OpenStackFlavor(
- self.os_creds, FlavorConfig(
- name=name,
- ram=getattr(config.CONF, 'openstack_flavor_ram'),
- disk=getattr(config.CONF, 'openstack_flavor_disk'),
- vcpus=getattr(config.CONF, 'openstack_flavor_vcpus'),
- metadata=flavor_metadata))
- flavor = flavor_creator.create()
- if flavor is None:
- raise Exception('Failed to create flavor')
- self.creators.append(flavor_creator)
- return flavor.id
-
- def create(self, create_project=False):
- """Create resources for Tempest test suite."""
- result = {
- 'tempest_net_name': None,
- 'image_id': None,
- 'image_id_alt': None,
- 'flavor_id': None,
- 'flavor_id_alt': None
- }
- project_name = None
-
- if create_project:
- LOGGER.debug("Creating project and user for Tempest suite")
- project_name = getattr(
- config.CONF, 'tempest_identity_tenant_name') + self.guid
- result['project_id'] = self._create_project()
- result['user_id'] = self._create_user()
- result['tenant_id'] = result['project_id'] # for compatibility
-
- LOGGER.debug("Creating private network for Tempest suite")
- result['tempest_net_name'] = self._create_network(project_name)
-
- LOGGER.debug("Creating two images for Tempest suite")
- image_name = getattr(config.CONF, 'openstack_image_name') + self.guid
- result['image_id'] = self._create_image(image_name)
- image_name = getattr(
- config.CONF, 'openstack_image_name_alt') + self.guid
- result['image_id_alt'] = self._create_image(image_name)
-
- LOGGER.info("Creating two flavors for Tempest suite")
- name = getattr(config.CONF, 'openstack_flavor_name') + self.guid
- result['flavor_id'] = self._create_flavor(name)
-
- name = getattr(
- config.CONF, 'openstack_flavor_name_alt') + self.guid
- result['flavor_id_alt'] = self._create_flavor(name)
-
- return result
-
- def cleanup(self):
- """
- Cleanup all OpenStack objects. Should be called on completion.
- """
- for creator in reversed(self.creators):
- try:
- creator.clean()
- except Exception as err: # pylint: disable=broad-except
- LOGGER.error('Unexpected error cleaning - %s', err)