import shutil
import subprocess
import time
-import uuid
-
-from snaps.config.flavor import FlavorConfig
-from snaps.config.network import NetworkConfig, SubnetConfig
-from snaps.config.project import ProjectConfig
-from snaps.config.user import UserConfig
-from snaps.openstack.create_flavor import OpenStackFlavor
-from snaps.openstack.tests import openstack_tests
-from snaps.openstack.utils import deploy_utils
+
+from six.moves import configparser
from xtesting.core import testcase
import yaml
-from functest.opnfv_tests.openstack.snaps import snaps_utils
+from functest.core import singlevm
from functest.opnfv_tests.openstack.tempest import conf_utils
from functest.utils import config
from functest.utils import env
-from functest.utils import functest_utils
LOGGER = logging.getLogger(__name__)
-class TempestCommon(testcase.TestCase):
+class TempestCommon(singlevm.VmReady1):
# pylint: disable=too-many-instance-attributes
"""TempestCommon testcases implementation class."""
TEMPEST_RESULTS_DIR = os.path.join(
getattr(config.CONF, 'dir_results'), 'tempest')
+ visibility = 'public'
+
def __init__(self, **kwargs):
super(TempestCommon, self).__init__(**kwargs)
- self.resources = TempestResourcesManager(**kwargs)
self.mode = ""
self.option = []
self.verifier_id = conf_utils.get_verifier_id()
self.raw_list = os.path.join(self.res_dir, 'test_raw_list.txt')
self.list = os.path.join(self.res_dir, 'test_list.txt')
self.conf_file = None
+ self.image_alt = None
+ self.flavor_alt = None
@staticmethod
def read_file(filename):
result['num_failures'] = int(new_line[2])
return result
+ @staticmethod
+ def backup_tempest_config(conf_file, res_dir):
+ """
+ Copy config file to tempest results directory
+ """
+ if not os.path.exists(res_dir):
+ os.makedirs(res_dir)
+ shutil.copyfile(conf_file,
+ os.path.join(res_dir, 'tempest.conf'))
+
def generate_test_list(self):
"""Generate test list based on the test mode."""
LOGGER.debug("Generating test case list...")
+ self.backup_tempest_config(self.conf_file, '/etc')
if self.mode == 'custom':
if os.path.isfile(conf_utils.TEMPEST_CUSTOM):
shutil.copyfile(
% conf_utils.TEMPEST_CUSTOM)
else:
if self.mode == 'smoke':
- testr_mode = r"'tempest\.(api|scenario).*\[.*\bsmoke\b.*\]'"
+ testr_mode = r"'^tempest\.(api|scenario).*\[.*\bsmoke\b.*\]$'"
elif self.mode == 'full':
testr_mode = r"'^tempest\.'"
else:
testr_mode = self.mode
- cmd = ("cd {0};"
- "testr list-tests {1} > {2};"
- "cd -;".format(self.verifier_repo_dir,
- testr_mode,
- self.list))
- functest_utils.execute_command(cmd)
+ cmd = "(cd {0}; stestr list {1} >{2} 2>/dev/null)".format(
+ self.verifier_repo_dir, testr_mode, self.list)
+ output = subprocess.check_output(cmd, shell=True)
+ LOGGER.info("%s\n%s", cmd, output)
+ os.remove('/etc/tempest.conf')
def apply_tempest_blacklist(self):
"""Exclude blacklisted test cases."""
for line in iter(proc.stdout.readline, b''):
if re.search(r"\} tempest\.", line):
LOGGER.info(line.replace('\n', ''))
- elif re.search('Starting verification', line):
- LOGGER.info(line.replace('\n', ''))
- first_pos = line.index("UUID=") + len("UUID=")
- last_pos = line.index(") for deployment")
- self.verification_id = line[first_pos:last_pos]
- LOGGER.debug('Verification UUID: %s', self.verification_id)
+ elif re.search(r'(?=\(UUID=(.*)\))', line):
+ self.verification_id = re.search(
+ r'(?=\(UUID=(.*)\))', line).group(1)
+ LOGGER.info('Verification UUID: %s', self.verification_id)
f_stdout.write(line)
proc.wait()
return
with open(os.path.join(self.res_dir,
- "tempest.log"), 'r') as logfile:
+ "tempest-error.log"), 'r') as logfile:
output = logfile.read()
success_testcases = []
- for match in re.findall(r'.*\{0\} (.*?)[. ]*success ', output):
+ for match in re.findall(r'.*\{\d{1,2}\} (.*?) \.{3} success ',
+ output):
success_testcases.append(match)
failed_testcases = []
- for match in re.findall(r'.*\{0\} (.*?)[. ]*fail ', output):
+ for match in re.findall(r'.*\{\d{1,2}\} (.*?) \.{3} fail',
+ output):
failed_testcases.append(match)
skipped_testcases = []
- for match in re.findall(r'.*\{0\} (.*?)[. ]*skip:', output):
+ for match in re.findall(r'.*\{\d{1,2}\} (.*?) \.{3} skip:',
+ output):
skipped_testcases.append(match)
- self.details = {"tests": stat['num_tests'],
- "failures": stat['num_failures'],
+ self.details = {"tests_number": stat['num_tests'],
+ "success_number": stat['num_success'],
+ "skipped_number": stat['num_skipped'],
+ "failures_number": stat['num_failures'],
"success": success_testcases,
"skipped": skipped_testcases,
- "errors": failed_testcases}
+ "failures": failed_testcases}
except Exception: # pylint: disable=broad-except
self.result = 0
"""
if not os.path.exists(self.res_dir):
os.makedirs(self.res_dir)
- resources = self.resources.create()
- compute_cnt = snaps_utils.get_active_compute_cnt(
- self.resources.os_creds)
+ compute_cnt = len(self.cloud.list_hypervisors())
+
+ self.image_alt = self.publish_image(
+ '{}-img_alt_{}'.format(self.case_name, self.guid))
+ self.flavor_alt = self.create_flavor_alt()
+ LOGGER.debug("flavor: %s", self.flavor_alt)
+
self.conf_file = conf_utils.configure_verifier(self.deployment_dir)
conf_utils.configure_tempest_update_params(
- self.conf_file, self.res_dir,
- network_name=resources.get("network_name"),
- image_id=resources.get("image_id"),
- flavor_id=resources.get("flavor_id"),
- compute_cnt=compute_cnt)
+ self.conf_file, network_name=self.network.id,
+ image_id=self.image.id,
+ flavor_id=self.flavor.id,
+ compute_cnt=compute_cnt,
+ image_alt_id=self.image_alt.id,
+ flavor_alt_id=self.flavor_alt.id)
+ self.backup_tempest_config(self.conf_file, self.res_dir)
def run(self, **kwargs):
self.start_time = time.time()
try:
- self.configure()
+ super(TempestCommon, self).run(**kwargs)
+ self.configure(**kwargs)
self.generate_test_list()
self.apply_tempest_blacklist()
self.run_verifier_tests()
except Exception: # pylint: disable=broad-except
LOGGER.exception('Error with run')
res = testcase.TestCase.EX_RUN_ERROR
- finally:
- self.resources.cleanup()
self.stop_time = time.time()
return res
+ def clean(self):
+ """
+ Cleanup all OpenStack objects. Should be called on completion.
+ """
+ super(TempestCommon, self).clean()
+ self.cloud.delete_image(self.image_alt)
+ self.orig_cloud.delete_flavor(self.flavor_alt.id)
+
class TempestSmokeSerial(TempestCommon):
"""Tempest smoke serial testcase implementation."""
if "case_name" not in kwargs:
kwargs["case_name"] = 'neutron_trunk'
TempestCommon.__init__(self, **kwargs)
- self.mode = "'neutron.tests.tempest.(api|scenario).test_trunk'"
+ self.mode = "'neutron_tempest_plugin.(api|scenario).test_trunk'"
self.res_dir = os.path.join(
getattr(config.CONF, 'dir_results'), 'neutron_trunk')
self.raw_list = os.path.join(self.res_dir, 'test_raw_list.txt')
def configure(self, **kwargs):
super(TempestNeutronTrunk, self).configure(**kwargs)
- rconfig = conf_utils.ConfigParser.RawConfigParser()
+ rconfig = configparser.RawConfigParser()
rconfig.read(self.conf_file)
rconfig.set('network-feature-enabled', 'api_extensions', 'all')
with open(self.conf_file, 'wb') as config_file:
rconfig.write(config_file)
+class TempestBarbican(TempestCommon):
+ """Tempest Barbican testcase implementation."""
+ def __init__(self, **kwargs):
+ if "case_name" not in kwargs:
+ kwargs["case_name"] = 'barbican'
+ TempestCommon.__init__(self, **kwargs)
+ self.mode = "'barbican_tempest_plugin.tests.(api|scenario)'"
+ self.res_dir = os.path.join(
+ getattr(config.CONF, 'dir_results'), 'barbican')
+ self.raw_list = os.path.join(self.res_dir, 'test_raw_list.txt')
+ self.list = os.path.join(self.res_dir, 'test_list.txt')
+
+
class TempestSmokeParallel(TempestCommon):
"""Tempest smoke parallel testcase implementation."""
def __init__(self, **kwargs):
TempestCommon.__init__(self, **kwargs)
self.mode = "defcore"
self.option = ["--concurrency", "1"]
-
-
-class TempestResourcesManager(object):
- """Tempest resource manager."""
- def __init__(self, **kwargs):
- self.os_creds = kwargs.get('os_creds') or snaps_utils.get_credentials()
- self.guid = '-' + str(uuid.uuid4())
- self.creators = list()
- self.cirros_image_config = getattr(
- config.CONF, 'snaps_images_cirros', None)
-
- def _create_project(self):
- """Create project for tests."""
- project_creator = deploy_utils.create_project(
- self.os_creds, ProjectConfig(
- name=getattr(
- config.CONF, 'tempest_identity_tenant_name') + self.guid,
- description=getattr(
- config.CONF, 'tempest_identity_tenant_description'),
- domain=self.os_creds.project_domain_name))
- if project_creator is None or project_creator.get_project() is None:
- raise Exception("Failed to create tenant")
- self.creators.append(project_creator)
- return project_creator.get_project().id
-
- def _create_user(self):
- """Create user for tests."""
- user_creator = deploy_utils.create_user(
- self.os_creds, UserConfig(
- name=getattr(
- config.CONF, 'tempest_identity_user_name') + self.guid,
- password=getattr(
- config.CONF, 'tempest_identity_user_password'),
- project_name=getattr(
- config.CONF, 'tempest_identity_tenant_name') + self.guid,
- domain_name=self.os_creds.user_domain_name))
- if user_creator is None or user_creator.get_user() is None:
- raise Exception("Failed to create user")
- self.creators.append(user_creator)
- return user_creator.get_user().id
-
- def _create_network(self, project_name):
- """Create network for tests."""
- tempest_network_type = None
- tempest_physical_network = None
- tempest_segmentation_id = None
-
- tempest_network_type = getattr(
- config.CONF, 'tempest_network_type', None)
- tempest_physical_network = getattr(
- config.CONF, 'tempest_physical_network', None)
- tempest_segmentation_id = getattr(
- config.CONF, 'tempest_segmentation_id', None)
- tempest_net_name = getattr(
- config.CONF, 'tempest_private_net_name') + self.guid
-
- network_creator = deploy_utils.create_network(
- self.os_creds, NetworkConfig(
- name=tempest_net_name,
- project_name=project_name,
- network_type=tempest_network_type,
- physical_network=tempest_physical_network,
- segmentation_id=tempest_segmentation_id,
- subnet_settings=[SubnetConfig(
- name=getattr(
- config.CONF,
- 'tempest_private_subnet_name') + self.guid,
- project_name=project_name,
- cidr=getattr(
- config.CONF, 'tempest_private_subnet_cidr'),
- dns_nameservers=[env.get('NAMESERVER')])]))
- if network_creator is None or network_creator.get_network() is None:
- raise Exception("Failed to create private network")
- self.creators.append(network_creator)
- return tempest_net_name
-
- def _create_image(self, name):
- """Create image for tests"""
- os_image_settings = openstack_tests.cirros_image_settings(
- name, public=True,
- image_metadata=self.cirros_image_config)
- image_creator = deploy_utils.create_image(
- self.os_creds, os_image_settings)
- if image_creator is None:
- raise Exception('Failed to create image')
- self.creators.append(image_creator)
- return image_creator.get_image().id
-
- def _create_flavor(self, name):
- """Create flavor for tests."""
- flavor_metadata = getattr(config.CONF, 'flavor_extra_specs', None)
- flavor_creator = OpenStackFlavor(
- self.os_creds, FlavorConfig(
- name=name,
- ram=getattr(config.CONF, 'openstack_flavor_ram'),
- disk=getattr(config.CONF, 'openstack_flavor_disk'),
- vcpus=getattr(config.CONF, 'openstack_flavor_vcpus'),
- metadata=flavor_metadata))
- flavor = flavor_creator.create()
- if flavor is None:
- raise Exception('Failed to create flavor')
- self.creators.append(flavor_creator)
- return flavor.id
-
- def create(self, create_project=False):
- """Create resources for Tempest test suite."""
- result = {
- 'tempest_net_name': None,
- 'image_id': None,
- 'image_id_alt': None,
- 'flavor_id': None,
- 'flavor_id_alt': None
- }
- project_name = None
-
- if create_project:
- LOGGER.debug("Creating project and user for Tempest suite")
- project_name = getattr(
- config.CONF, 'tempest_identity_tenant_name') + self.guid
- result['project_id'] = self._create_project()
- result['user_id'] = self._create_user()
- result['tenant_id'] = result['project_id'] # for compatibility
-
- LOGGER.debug("Creating private network for Tempest suite")
- result['tempest_net_name'] = self._create_network(project_name)
-
- LOGGER.debug("Creating two images for Tempest suite")
- image_name = getattr(config.CONF, 'openstack_image_name') + self.guid
- result['image_id'] = self._create_image(image_name)
- image_name = getattr(
- config.CONF, 'openstack_image_name_alt') + self.guid
- result['image_id_alt'] = self._create_image(image_name)
-
- LOGGER.info("Creating two flavors for Tempest suite")
- name = getattr(config.CONF, 'openstack_flavor_name') + self.guid
- result['flavor_id'] = self._create_flavor(name)
-
- name = getattr(
- config.CONF, 'openstack_flavor_name_alt') + self.guid
- result['flavor_id_alt'] = self._create_flavor(name)
-
- return result
-
- def cleanup(self):
- """
- Cleanup all OpenStack objects. Should be called on completion.
- """
- for creator in reversed(self.creators):
- try:
- creator.clean()
- except Exception as err: # pylint: disable=broad-except
- LOGGER.error('Unexpected error cleaning - %s', err)