- self.MODE = "defcore"
- self.OPTION = ["--concurrency", "1"]
-
-
-class TempestResourcesManager(object):
-
- def __init__(self, **kwargs):
- self.os_creds = kwargs.get('os_creds') or snaps_utils.get_credentials()
- self.guid = '-' + str(uuid.uuid4())
- self.creators = list()
-
- if hasattr(CONST, 'snaps_images_cirros'):
- self.cirros_image_config = CONST.__getattribute__(
- 'snaps_images_cirros')
- else:
- self.cirros_image_config = None
-
- def _create_project(self):
- """Create project for tests."""
- project_creator = deploy_utils.create_project(
- self.os_creds, ProjectConfig(
- name=CONST.__getattribute__(
- 'tempest_identity_tenant_name') + self.guid,
- description=CONST.__getattribute__(
- 'tempest_identity_tenant_description')))
- if project_creator is None or project_creator.get_project() is None:
- raise Exception("Failed to create tenant")
- self.creators.append(project_creator)
- return project_creator.get_project().id
-
- def _create_user(self):
- """Create user for tests."""
- user_creator = deploy_utils.create_user(
- self.os_creds, UserConfig(
- name=CONST.__getattribute__(
- 'tempest_identity_user_name') + self.guid,
- password=CONST.__getattribute__(
- 'tempest_identity_user_password'),
- project_name=CONST.__getattribute__(
- 'tempest_identity_tenant_name') + self.guid))
- if user_creator is None or user_creator.get_user() is None:
- raise Exception("Failed to create user")
- self.creators.append(user_creator)
- return user_creator.get_user().id
-
- def _create_network(self, project_name):
- """Create network for tests."""
- tempest_network_type = None
- tempest_physical_network = None
- tempest_segmentation_id = None
-
- if hasattr(CONST, 'tempest_network_type'):
- tempest_network_type = CONST.__getattribute__(
- 'tempest_network_type')
- if hasattr(CONST, 'tempest_physical_network'):
- tempest_physical_network = CONST.__getattribute__(
- 'tempest_physical_network')
- if hasattr(CONST, 'tempest_segmentation_id'):
- tempest_segmentation_id = CONST.__getattribute__(
- 'tempest_segmentation_id')
-
- network_creator = deploy_utils.create_network(
- self.os_creds, NetworkConfig(
- name=CONST.__getattribute__(
- 'tempest_private_net_name') + self.guid,
- project_name=project_name,
- network_type=tempest_network_type,
- physical_network=tempest_physical_network,
- segmentation_id=tempest_segmentation_id,
- subnet_settings=[SubnetConfig(
- name=CONST.__getattribute__(
- 'tempest_private_subnet_name') + self.guid,
- project_name=project_name,
- cidr=CONST.__getattribute__(
- 'tempest_private_subnet_cidr'))]))
- if network_creator is None or network_creator.get_network() is None:
- raise Exception("Failed to create private network")
- self.creators.append(network_creator)
-
- def _create_image(self, name):
- """Create image for tests"""
- os_image_settings = openstack_tests.cirros_image_settings(
- name, public=True,
- image_metadata=self.cirros_image_config)
- image_creator = deploy_utils.create_image(
- self.os_creds, os_image_settings)
- if image_creator is None:
- raise Exception('Failed to create image')
- self.creators.append(image_creator)
- return image_creator.get_image().id
-
- def _create_flavor(self, name):
- """Create flavor for tests."""
- scenario = CONST.__getattribute__('DEPLOY_SCENARIO')
- flavor_metadata = None
- if 'ovs' in scenario or 'fdio' in scenario:
- flavor_metadata = create_flavor.MEM_PAGE_SIZE_LARGE
- flavor_creator = OpenStackFlavor(
- self.os_creds, FlavorConfig(
- name=name,
- ram=CONST.__getattribute__('openstack_flavor_ram'),
- disk=CONST.__getattribute__('openstack_flavor_disk'),
- vcpus=CONST.__getattribute__('openstack_flavor_vcpus'),
- metadata=flavor_metadata))
- flavor = flavor_creator.create()
- if flavor is None:
- raise Exception('Failed to create flavor')
- self.creators.append(flavor_creator)
- return flavor.id
-
- def create(self, use_custom_images=False, use_custom_flavors=False,
- create_project=False):
- """Create resources for Tempest test suite."""
- result = {
- 'image_id': None,
- 'image_id_alt': None,
- 'flavor_id': None,
- 'flavor_id_alt': None
- }
- project_name = None
-
- if create_project:
- logger.debug("Creating project and user for Tempest suite")
- project_name = CONST.__getattribute__(
- 'tempest_identity_tenant_name') + self.guid
- result['project_id'] = self._create_project()
- result['user_id'] = self._create_user()
- result['tenant_id'] = result['project_id'] # for compatibility
-
- logger.debug("Creating private network for Tempest suite")
- self._create_network(project_name)
-
- logger.debug("Creating image for Tempest suite")
- image_name = CONST.__getattribute__('openstack_image_name') + self.guid
- result['image_id'] = self._create_image(image_name)
-
- if use_custom_images:
- logger.debug("Creating 2nd image for Tempest suite")
- image_name = CONST.__getattribute__(
- 'openstack_image_name_alt') + self.guid
- result['image_id_alt'] = self._create_image(image_name)
-
- if (CONST.__getattribute__('tempest_use_custom_flavors') == 'True' or
- use_custom_flavors):
- logger.info("Creating flavor for Tempest suite")
- name = CONST.__getattribute__('openstack_flavor_name') + self.guid
- result['flavor_id'] = self._create_flavor(name)
-
- if use_custom_flavors:
- logger.info("Creating 2nd flavor for Tempest suite")
- scenario = CONST.__getattribute__('DEPLOY_SCENARIO')
- if 'ovs' in scenario or 'fdio' in scenario:
- CONST.__setattr__('openstack_flavor_ram', 1024)
- name = CONST.__getattribute__(
- 'openstack_flavor_name_alt') + self.guid
- result['flavor_id_alt'] = self._create_flavor(name)
-
- return result
-
- def cleanup(self):
- """
- Cleanup all OpenStack objects. Should be called on completion.
- """
- for creator in reversed(self.creators):
- try:
- creator.clean()
- except Exception as e:
- logger.error('Unexpected error cleaning - %s', e)