import shutil
import subprocess
import time
-import uuid
-
-from snaps.config.flavor import FlavorConfig
-from snaps.config.network import NetworkConfig, SubnetConfig
-from snaps.config.project import ProjectConfig
-from snaps.config.user import UserConfig
-from snaps.openstack.create_flavor import OpenStackFlavor
-from snaps.openstack.tests import openstack_tests
-from snaps.openstack.utils import deploy_utils
+
+from six.moves import configparser
from xtesting.core import testcase
import yaml
-from functest.opnfv_tests.openstack.snaps import snaps_utils
+from functest.core import singlevm
from functest.opnfv_tests.openstack.tempest import conf_utils
from functest.utils import config
from functest.utils import env
+from functest.utils import functest_utils
LOGGER = logging.getLogger(__name__)
-class TempestCommon(testcase.TestCase):
+class TempestCommon(singlevm.VmReady2):
# pylint: disable=too-many-instance-attributes
"""TempestCommon testcases implementation class."""
- TEMPEST_RESULTS_DIR = os.path.join(
- getattr(config.CONF, 'dir_results'), 'tempest')
+ visibility = 'public'
+ filename_alt = '/home/opnfv/functest/images/cirros-0.4.0-x86_64-disk.img'
def __init__(self, **kwargs):
+ if "case_name" not in kwargs:
+ kwargs["case_name"] = 'tempest'
super(TempestCommon, self).__init__(**kwargs)
- self.resources = TempestResourcesManager(**kwargs)
- self.mode = ""
- self.option = []
- self.verifier_id = conf_utils.get_verifier_id()
- self.verifier_repo_dir = conf_utils.get_verifier_repo_dir(
- self.verifier_id)
- self.deployment_id = conf_utils.get_verifier_deployment_id()
- self.deployment_dir = conf_utils.get_verifier_deployment_dir(
- self.verifier_id, self.deployment_id)
+ assert self.orig_cloud
+ assert self.cloud
+ assert self.project
+ if self.orig_cloud.get_role("admin"):
+ self.role_name = "admin"
+ elif self.orig_cloud.get_role("Admin"):
+ self.role_name = "Admin"
+ else:
+ raise Exception("Cannot detect neither admin nor Admin")
+ self.orig_cloud.grant_role(
+ self.role_name, user=self.project.user.id,
+ project=self.project.project.id,
+ domain=self.project.domain.id)
+ self.orig_cloud.grant_role(
+ self.role_name, user=self.project.user.id,
+ domain=self.project.domain.id)
+ self.deployment_id = None
+ self.verifier_id = None
+ self.verifier_repo_dir = None
+ self.deployment_dir = None
self.verification_id = None
- self.res_dir = TempestCommon.TEMPEST_RESULTS_DIR
+ self.res_dir = os.path.join(
+ getattr(config.CONF, 'dir_results'), self.case_name)
self.raw_list = os.path.join(self.res_dir, 'test_raw_list.txt')
self.list = os.path.join(self.res_dir, 'test_list.txt')
self.conf_file = None
+ self.image_alt = None
+ self.flavor_alt = None
+ self.services = []
+ try:
+ self.services = kwargs['run']['args']['services']
+ except Exception: # pylint: disable=broad-except
+ pass
+ self.neutron_extensions = []
+ try:
+ self.neutron_extensions = kwargs['run']['args'][
+ 'neutron_extensions']
+ except Exception: # pylint: disable=broad-except
+ pass
+
+ def create_network_resources(self):
+ pass
+
+ def check_services(self):
+ """Check the mandatory services."""
+ for service in self.services:
+ try:
+ self.cloud.search_services(service)[0]
+ except Exception: # pylint: disable=broad-except
+ self.is_skipped = True
+ break
+
+ def check_extensions(self):
+ """Check the mandatory network extensions."""
+ extensions = self.cloud.get_network_extensions()
+ for network_extension in self.neutron_extensions:
+ if network_extension not in extensions:
+ LOGGER.warning(
+ "Cannot find Neutron extension: %s", network_extension)
+ self.is_skipped = True
+ break
+
+ def check_requirements(self):
+ self.check_services()
+ self.check_extensions()
+ if self.is_skipped:
+ self.project.clean()
@staticmethod
def read_file(filename):
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
for line in proc.stdout:
+ LOGGER.info(line.rstrip())
new_line = line.replace(' ', '').split('|')
if 'Tests' in new_line:
break
- LOGGER.info(line)
if 'Testscount' in new_line:
result['num_tests'] = int(new_line[2])
elif 'Success' in new_line:
result['num_failures'] = int(new_line[2])
return result
- def generate_test_list(self):
+ @staticmethod
+ def backup_tempest_config(conf_file, res_dir):
+ """
+ Copy config file to tempest results directory
+ """
+ if not os.path.exists(res_dir):
+ os.makedirs(res_dir)
+ shutil.copyfile(conf_file,
+ os.path.join(res_dir, 'tempest.conf'))
+
+ def generate_test_list(self, **kwargs):
"""Generate test list based on the test mode."""
LOGGER.debug("Generating test case list...")
- if self.mode == 'custom':
+ self.backup_tempest_config(self.conf_file, '/etc')
+ if kwargs.get('mode') == 'custom':
if os.path.isfile(conf_utils.TEMPEST_CUSTOM):
shutil.copyfile(
conf_utils.TEMPEST_CUSTOM, self.list)
raise Exception("Tempest test list file %s NOT found."
% conf_utils.TEMPEST_CUSTOM)
else:
- if self.mode == 'smoke':
- testr_mode = r"'^tempest\.(api|scenario).*\[.*\bsmoke\b.*\]$'"
- elif self.mode == 'full':
- testr_mode = r"'^tempest\.'"
- else:
- testr_mode = self.mode
- cmd = "(cd {0}; testr list-tests {1} >{2} 2>/dev/null)".format(
+ testr_mode = kwargs.get(
+ 'mode', r'^tempest\.(api|scenario).*\[.*\bsmoke\b.*\]$')
+ cmd = "(cd {0}; stestr list '{1}' >{2} 2>/dev/null)".format(
self.verifier_repo_dir, testr_mode, self.list)
output = subprocess.check_output(cmd, shell=True)
LOGGER.info("%s\n%s", cmd, output)
+ os.remove('/etc/tempest.conf')
def apply_tempest_blacklist(self):
"""Exclude blacklisted test cases."""
result_file = open(self.list, 'w')
black_tests = []
try:
- installer_type = env.get('INSTALLER_TYPE')
deploy_scenario = env.get('DEPLOY_SCENARIO')
- if bool(installer_type) * bool(deploy_scenario):
- # if INSTALLER_TYPE and DEPLOY_SCENARIO are set we read the
- # file
+ if bool(deploy_scenario):
+ # if DEPLOY_SCENARIO is set we read the file
black_list_file = open(conf_utils.TEMPEST_BLACKLIST)
black_list_yaml = yaml.safe_load(black_list_file)
black_list_file.close()
for item in black_list_yaml:
scenarios = item['scenarios']
- installers = item['installers']
- if (deploy_scenario in scenarios and
- installer_type in installers):
+ if deploy_scenario in scenarios:
tests = item['tests']
for test in tests:
black_tests.append(test)
result_file.write(str(cases_line) + '\n')
result_file.close()
- def run_verifier_tests(self):
+ def run_verifier_tests(self, **kwargs):
"""Execute tempest test cases."""
cmd = ["rally", "verify", "start", "--load-list",
self.list]
- cmd.extend(self.option)
+ cmd.extend(kwargs.get('option', []))
LOGGER.info("Starting Tempest test suite: '%s'.", cmd)
f_stdout = open(
os.path.join(self.res_dir, "tempest.log"), 'w+')
- f_stderr = open(
- os.path.join(self.res_dir,
- "tempest-error.log"), 'w+')
proc = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
- stderr=f_stderr,
+ stderr=subprocess.STDOUT,
bufsize=1)
with proc.stdout:
for line in iter(proc.stdout.readline, b''):
if re.search(r"\} tempest\.", line):
- LOGGER.info(line.replace('\n', ''))
- elif re.search('Starting verification', line):
- LOGGER.info(line.replace('\n', ''))
- first_pos = line.index("UUID=") + len("UUID=")
- last_pos = line.index(") for deployment")
- self.verification_id = line[first_pos:last_pos]
- LOGGER.debug('Verification UUID: %s', self.verification_id)
+ LOGGER.info(line.rstrip())
+ elif re.search(r'(?=\(UUID=(.*)\))', line):
+ self.verification_id = re.search(
+ r'(?=\(UUID=(.*)\))', line).group(1)
f_stdout.write(line)
proc.wait()
-
f_stdout.close()
- f_stderr.close()
if self.verification_id is None:
raise Exception('Verification UUID not found')
+ LOGGER.info('Verification UUID: %s', self.verification_id)
+
+ shutil.copy(
+ "{}/tempest.log".format(self.deployment_dir),
+ "{}/tempest.debug.log".format(self.res_dir))
def parse_verifier_result(self):
"""Parse and save test results."""
output = logfile.read()
success_testcases = []
- for match in re.findall(r'.*\{0\} (.*?)[. ]*success ', output):
+ for match in re.findall(r'.*\{\d{1,2}\} (.*?) \.{3} success ',
+ output):
success_testcases.append(match)
failed_testcases = []
- for match in re.findall(r'.*\{0\} (.*?)[. ]*fail', output):
+ for match in re.findall(r'.*\{\d{1,2}\} (.*?) \.{3} fail',
+ output):
failed_testcases.append(match)
skipped_testcases = []
- for match in re.findall(r'.*\{0\} (.*?)[. ]*skip:', output):
+ for match in re.findall(r'.*\{\d{1,2}\} (.*?) \.{3} skip:',
+ output):
skipped_testcases.append(match)
self.details = {"tests_number": stat['num_tests'],
html_file = os.path.join(self.res_dir,
"tempest-report.html")
cmd = ["rally", "verify", "report", "--type", "html", "--uuid",
- self.verification_id, "--to", html_file]
- subprocess.Popen(cmd, stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT)
+ self.verification_id, "--to", str(html_file)]
+ subprocess.check_output(cmd)
+
+ def update_rally_regex(self, rally_conf='/etc/rally/rally.conf'):
+ """Set image name as tempest img_name_regex"""
+ rconfig = configparser.RawConfigParser()
+ rconfig.read(rally_conf)
+ if not rconfig.has_section('openstack'):
+ rconfig.add_section('openstack')
+ rconfig.set('openstack', 'img_name_regex', '^{}$'.format(
+ self.image.name))
+ with open(rally_conf, 'wb') as config_file:
+ rconfig.write(config_file)
+
+ def update_default_role(self, rally_conf='/etc/rally/rally.conf'):
+ """Detect and update the default role if required"""
+ role = self.get_default_role(self.cloud)
+ if not role:
+ return
+ rconfig = configparser.RawConfigParser()
+ rconfig.read(rally_conf)
+ if not rconfig.has_section('openstack'):
+ rconfig.add_section('openstack')
+ rconfig.set('openstack', 'swift_operator_role', role.name)
+ with open(rally_conf, 'wb') as config_file:
+ rconfig.write(config_file)
+
+ def update_rally_logs(self, rally_conf='/etc/rally/rally.conf'):
+ """Print rally logs in res dir"""
+ if not os.path.exists(self.res_dir):
+ os.makedirs(self.res_dir)
+ rconfig = configparser.RawConfigParser()
+ rconfig.read(rally_conf)
+ rconfig.set('DEFAULT', 'debug', True)
+ rconfig.set('DEFAULT', 'use_stderr', False)
+ rconfig.set('DEFAULT', 'log-file', 'rally.log')
+ rconfig.set('DEFAULT', 'log_dir', self.res_dir)
+ with open(rally_conf, 'wb') as config_file:
+ rconfig.write(config_file)
+
+ @staticmethod
+ def clean_rally_conf(rally_conf='/etc/rally/rally.conf'):
+ """Clean Rally config"""
+ rconfig = configparser.RawConfigParser()
+ rconfig.read(rally_conf)
+ if rconfig.has_option('openstack', 'img_name_regex'):
+ rconfig.remove_option('openstack', 'img_name_regex')
+ if rconfig.has_option('openstack', 'swift_operator_role'):
+ rconfig.remove_option('openstack', 'swift_operator_role')
+ if rconfig.has_option('DEFAULT', 'use_stderr'):
+ rconfig.remove_option('DEFAULT', 'use_stderr')
+ if rconfig.has_option('DEFAULT', 'debug'):
+ rconfig.remove_option('DEFAULT', 'debug')
+ if rconfig.has_option('DEFAULT', 'log-file'):
+ rconfig.remove_option('DEFAULT', 'log-file')
+ if rconfig.has_option('DEFAULT', 'log_dir'):
+ rconfig.remove_option('DEFAULT', 'log_dir')
+ with open(rally_conf, 'wb') as config_file:
+ rconfig.write(config_file)
+
+ def update_scenario_section(self):
+ """Update scenario section in tempest.conf"""
+ rconfig = configparser.RawConfigParser()
+ rconfig.read(self.conf_file)
+ filename = getattr(
+ config.CONF, '{}_image'.format(self.case_name), self.filename)
+ if not rconfig.has_section('scenario'):
+ rconfig.add_section('scenario')
+ rconfig.set('scenario', 'img_file', os.path.basename(filename))
+ rconfig.set('scenario', 'img_dir', os.path.dirname(filename))
+ rconfig.set('scenario', 'img_disk_format', getattr(
+ config.CONF, '{}_image_format'.format(self.case_name),
+ self.image_format))
+ extra_properties = self.extra_properties.copy()
+ if env.get('IMAGE_PROPERTIES'):
+ extra_properties.update(
+ functest_utils.convert_ini_to_dict(
+ env.get('IMAGE_PROPERTIES')))
+ extra_properties.update(
+ getattr(config.CONF, '{}_extra_properties'.format(
+ self.case_name), {}))
+ rconfig.set(
+ 'scenario', 'img_properties',
+ functest_utils.convert_dict_to_ini(extra_properties))
+ with open(self.conf_file, 'wb') as config_file:
+ rconfig.write(config_file)
def configure(self, **kwargs): # pylint: disable=unused-argument
"""
"""
if not os.path.exists(self.res_dir):
os.makedirs(self.res_dir)
- resources = self.resources.create()
- compute_cnt = snaps_utils.get_active_compute_cnt(
- self.resources.os_creds)
+ environ = dict(
+ os.environ,
+ OS_USERNAME=self.project.user.name,
+ OS_PROJECT_NAME=self.project.project.name,
+ OS_PROJECT_ID=self.project.project.id,
+ OS_PASSWORD=self.project.password)
+ try:
+ del environ['OS_TENANT_NAME']
+ del environ['OS_TENANT_ID']
+ except Exception: # pylint: disable=broad-except
+ pass
+ self.deployment_id = conf_utils.create_rally_deployment(
+ environ=environ)
+ if not self.deployment_id:
+ raise Exception("Deployment create failed")
+ self.verifier_id = conf_utils.create_verifier()
+ if not self.verifier_id:
+ raise Exception("Verifier create failed")
+ self.verifier_repo_dir = conf_utils.get_verifier_repo_dir(
+ self.verifier_id)
+ self.deployment_dir = conf_utils.get_verifier_deployment_dir(
+ self.verifier_id, self.deployment_id)
+
+ compute_cnt = len(self.orig_cloud.list_hypervisors())
+
+ self.image_alt = self.publish_image_alt()
+ self.flavor_alt = self.create_flavor_alt()
+ LOGGER.debug("flavor: %s", self.flavor_alt)
+
self.conf_file = conf_utils.configure_verifier(self.deployment_dir)
+ if not self.conf_file:
+ raise Exception("Tempest verifier configuring failed")
conf_utils.configure_tempest_update_params(
- self.conf_file, self.res_dir,
- network_name=resources.get("network_name"),
- image_id=resources.get("image_id"),
- flavor_id=resources.get("flavor_id"),
- compute_cnt=compute_cnt)
+ self.conf_file,
+ image_id=self.image.id,
+ flavor_id=self.flavor.id,
+ compute_cnt=compute_cnt,
+ image_alt_id=self.image_alt.id,
+ flavor_alt_id=self.flavor_alt.id,
+ admin_role_name=self.role_name, cidr=self.cidr,
+ domain_id=self.project.domain.id)
+ self.update_scenario_section()
+ self.backup_tempest_config(self.conf_file, self.res_dir)
def run(self, **kwargs):
self.start_time = time.time()
try:
- self.configure()
- self.generate_test_list()
+ assert super(TempestCommon, self).run(
+ **kwargs) == testcase.TestCase.EX_OK
+ if not os.path.exists(self.res_dir):
+ os.makedirs(self.res_dir)
+ self.update_rally_regex()
+ self.update_default_role()
+ self.update_rally_logs()
+ shutil.copy("/etc/rally/rally.conf", self.res_dir)
+ self.configure(**kwargs)
+ self.generate_test_list(**kwargs)
self.apply_tempest_blacklist()
- self.run_verifier_tests()
+ self.run_verifier_tests(**kwargs)
self.parse_verifier_result()
self.generate_report()
res = testcase.TestCase.EX_OK
except Exception: # pylint: disable=broad-except
LOGGER.exception('Error with run')
+ self.result = 0
res = testcase.TestCase.EX_RUN_ERROR
- finally:
- self.resources.cleanup()
self.stop_time = time.time()
return res
-
-class TempestSmokeSerial(TempestCommon):
- """Tempest smoke serial testcase implementation."""
- def __init__(self, **kwargs):
- if "case_name" not in kwargs:
- kwargs["case_name"] = 'tempest_smoke_serial'
- TempestCommon.__init__(self, **kwargs)
- self.mode = "smoke"
- self.option = ["--concurrency", "1"]
-
-
-class TempestNeutronTrunk(TempestCommon):
- """Tempest neutron trunk testcase implementation."""
- def __init__(self, **kwargs):
- if "case_name" not in kwargs:
- kwargs["case_name"] = 'neutron_trunk'
- TempestCommon.__init__(self, **kwargs)
- self.mode = "'neutron.tests.tempest.(api|scenario).test_trunk'"
- self.res_dir = os.path.join(
- getattr(config.CONF, 'dir_results'), 'neutron_trunk')
- self.raw_list = os.path.join(self.res_dir, 'test_raw_list.txt')
- self.list = os.path.join(self.res_dir, 'test_list.txt')
-
- def configure(self, **kwargs):
- super(TempestNeutronTrunk, self).configure(**kwargs)
- rconfig = conf_utils.ConfigParser.RawConfigParser()
- rconfig.read(self.conf_file)
- rconfig.set('network-feature-enabled', 'api_extensions', 'all')
- with open(self.conf_file, 'wb') as config_file:
- rconfig.write(config_file)
-
-
-class TempestSmokeParallel(TempestCommon):
- """Tempest smoke parallel testcase implementation."""
- def __init__(self, **kwargs):
- if "case_name" not in kwargs:
- kwargs["case_name"] = 'tempest_smoke_parallel'
- TempestCommon.__init__(self, **kwargs)
- self.mode = "smoke"
-
-
-class TempestFullParallel(TempestCommon):
- """Tempest full parallel testcase implementation."""
- def __init__(self, **kwargs):
- if "case_name" not in kwargs:
- kwargs["case_name"] = 'tempest_full_parallel'
- TempestCommon.__init__(self, **kwargs)
- self.mode = "full"
-
-
-class TempestCustom(TempestCommon):
- """Tempest custom testcase implementation."""
- def __init__(self, **kwargs):
- if "case_name" not in kwargs:
- kwargs["case_name"] = 'tempest_custom'
- TempestCommon.__init__(self, **kwargs)
- self.mode = "custom"
- self.option = ["--concurrency", "1"]
-
-
-class TempestDefcore(TempestCommon):
- """Tempest Defcore testcase implementation."""
- def __init__(self, **kwargs):
- if "case_name" not in kwargs:
- kwargs["case_name"] = 'tempest_defcore'
- TempestCommon.__init__(self, **kwargs)
- self.mode = "defcore"
- self.option = ["--concurrency", "1"]
-
-
-class TempestResourcesManager(object):
- """Tempest resource manager."""
- def __init__(self, **kwargs):
- self.os_creds = kwargs.get('os_creds') or snaps_utils.get_credentials()
- self.guid = '-' + str(uuid.uuid4())
- self.creators = list()
- self.cirros_image_config = getattr(
- config.CONF, 'snaps_images_cirros', None)
-
- def _create_project(self):
- """Create project for tests."""
- project_creator = deploy_utils.create_project(
- self.os_creds, ProjectConfig(
- name=getattr(
- config.CONF, 'tempest_identity_tenant_name') + self.guid,
- description=getattr(
- config.CONF, 'tempest_identity_tenant_description'),
- domain=self.os_creds.project_domain_name))
- if project_creator is None or project_creator.get_project() is None:
- raise Exception("Failed to create tenant")
- self.creators.append(project_creator)
- return project_creator.get_project().id
-
- def _create_user(self):
- """Create user for tests."""
- user_creator = deploy_utils.create_user(
- self.os_creds, UserConfig(
- name=getattr(
- config.CONF, 'tempest_identity_user_name') + self.guid,
- password=getattr(
- config.CONF, 'tempest_identity_user_password'),
- project_name=getattr(
- config.CONF, 'tempest_identity_tenant_name') + self.guid,
- domain_name=self.os_creds.user_domain_name))
- if user_creator is None or user_creator.get_user() is None:
- raise Exception("Failed to create user")
- self.creators.append(user_creator)
- return user_creator.get_user().id
-
- def _create_network(self, project_name):
- """Create network for tests."""
- tempest_network_type = None
- tempest_physical_network = None
- tempest_segmentation_id = None
-
- tempest_network_type = getattr(
- config.CONF, 'tempest_network_type', None)
- tempest_physical_network = getattr(
- config.CONF, 'tempest_physical_network', None)
- tempest_segmentation_id = getattr(
- config.CONF, 'tempest_segmentation_id', None)
- tempest_net_name = getattr(
- config.CONF, 'tempest_private_net_name') + self.guid
-
- network_creator = deploy_utils.create_network(
- self.os_creds, NetworkConfig(
- name=tempest_net_name,
- project_name=project_name,
- network_type=tempest_network_type,
- physical_network=tempest_physical_network,
- segmentation_id=tempest_segmentation_id,
- subnet_settings=[SubnetConfig(
- name=getattr(
- config.CONF,
- 'tempest_private_subnet_name') + self.guid,
- project_name=project_name,
- cidr=getattr(
- config.CONF, 'tempest_private_subnet_cidr'),
- dns_nameservers=[env.get('NAMESERVER')])]))
- if network_creator is None or network_creator.get_network() is None:
- raise Exception("Failed to create private network")
- self.creators.append(network_creator)
- return tempest_net_name
-
- def _create_image(self, name):
- """Create image for tests"""
- os_image_settings = openstack_tests.cirros_image_settings(
- name, public=True,
- image_metadata=self.cirros_image_config)
- image_creator = deploy_utils.create_image(
- self.os_creds, os_image_settings)
- if image_creator is None:
- raise Exception('Failed to create image')
- self.creators.append(image_creator)
- return image_creator.get_image().id
-
- def _create_flavor(self, name):
- """Create flavor for tests."""
- flavor_metadata = getattr(config.CONF, 'flavor_extra_specs', None)
- flavor_creator = OpenStackFlavor(
- self.os_creds, FlavorConfig(
- name=name,
- ram=getattr(config.CONF, 'openstack_flavor_ram'),
- disk=getattr(config.CONF, 'openstack_flavor_disk'),
- vcpus=getattr(config.CONF, 'openstack_flavor_vcpus'),
- metadata=flavor_metadata))
- flavor = flavor_creator.create()
- if flavor is None:
- raise Exception('Failed to create flavor')
- self.creators.append(flavor_creator)
- return flavor.id
-
- def create(self, create_project=False):
- """Create resources for Tempest test suite."""
- result = {
- 'tempest_net_name': None,
- 'image_id': None,
- 'image_id_alt': None,
- 'flavor_id': None,
- 'flavor_id_alt': None
- }
- project_name = None
-
- if create_project:
- LOGGER.debug("Creating project and user for Tempest suite")
- project_name = getattr(
- config.CONF, 'tempest_identity_tenant_name') + self.guid
- result['project_id'] = self._create_project()
- result['user_id'] = self._create_user()
- result['tenant_id'] = result['project_id'] # for compatibility
-
- LOGGER.debug("Creating private network for Tempest suite")
- result['tempest_net_name'] = self._create_network(project_name)
-
- LOGGER.debug("Creating two images for Tempest suite")
- image_name = getattr(config.CONF, 'openstack_image_name') + self.guid
- result['image_id'] = self._create_image(image_name)
- image_name = getattr(
- config.CONF, 'openstack_image_name_alt') + self.guid
- result['image_id_alt'] = self._create_image(image_name)
-
- LOGGER.info("Creating two flavors for Tempest suite")
- name = getattr(config.CONF, 'openstack_flavor_name') + self.guid
- result['flavor_id'] = self._create_flavor(name)
-
- name = getattr(
- config.CONF, 'openstack_flavor_name_alt') + self.guid
- result['flavor_id_alt'] = self._create_flavor(name)
-
- return result
-
- def cleanup(self):
+ def clean(self):
"""
Cleanup all OpenStack objects. Should be called on completion.
"""
- for creator in reversed(self.creators):
- try:
- creator.clean()
- except Exception as err: # pylint: disable=broad-except
- LOGGER.error('Unexpected error cleaning - %s', err)
+ self.clean_rally_conf()
+ if self.image_alt:
+ self.cloud.delete_image(self.image_alt)
+ if self.flavor_alt:
+ self.orig_cloud.delete_flavor(self.flavor_alt.id)
+ super(TempestCommon, self).clean()