X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=functest%2Fci%2Frun_tests.py;h=93518de0b7c481b6b71d3c48f1ee417515828877;hb=ac816628995c1e017f12ba23435ae07d24ceecac;hp=7de1577d42aac48dee8ae3a6b39b6306bbb4976d;hpb=dc733c31177b0ffdc4c30b9c4801b765909f1c50;p=functest.git

diff --git a/functest/ci/run_tests.py b/functest/ci/run_tests.py
old mode 100644
new mode 100755
index 7de1577d4..8317df541
--- a/functest/ci/run_tests.py
+++ b/functest/ci/run_tests.py
@@ -8,258 +8,272 @@
 # http://www.apache.org/licenses/LICENSE-2.0
 #
-import datetime
+import argparse
+import enum
 import importlib
+import logging
+import logging.config
 import os
 import re
 import sys
-import argparse
 
+import prettytable
 
-import functest.ci.generate_report as generate_report
 import functest.ci.tier_builder as tb
-import functest.core.testcase_base as testcase_base
+import functest.core.testcase as testcase
-import functest.utils.functest_logger as ft_logger
 import functest.utils.functest_utils as ft_utils
-import functest.utils.functest_constants as ft_constants
 import functest.utils.openstack_clean as os_clean
 import functest.utils.openstack_snapshot as os_snapshot
 import functest.utils.openstack_utils as os_utils
-
-
-parser = argparse.ArgumentParser()
-parser.add_argument("-t", "--test", dest="test", action='store',
-                    help="Test case or tier (group of tests) to be executed. "
-                    "It will run all the test if not specified.")
-parser.add_argument("-n", "--noclean", help="Do not clean OpenStack resources"
-                    " after running each test (default=false).",
-                    action="store_true")
-parser.add_argument("-r", "--report", help="Push results to database "
-                    "(default=false).", action="store_true")
-args = parser.parse_args()
-
-
-""" logging configuration """
-logger = ft_logger.Logger("run_tests").getLogger()
-
-
-""" global variables """
-EXEC_SCRIPT = ("%s/functest/ci/exec_test.sh" % ft_constants.FUNCTEST_REPO_DIR)
-
-# This will be the return code of this script. If any of the tests fails,
-# this variable will change to -1
-
-
-class GlobalVariables:
-    EXECUTED_TEST_CASES = []
-    OVERALL_RESULT = 0
-    CLEAN_FLAG = True
-    REPORT_FLAG = False
-
-
-def print_separator(str, count=45):
-    line = ""
-    for i in range(0, count - 1):
-        line += str
-    logger.info("%s" % line)
-
-
-def source_rc_file():
-    rc_file = ft_constants.OPENSTACK_CREDS
-    if not os.path.isfile(rc_file):
-        logger.error("RC file %s does not exist..." % rc_file)
-        sys.exit(1)
-    logger.debug("Sourcing the OpenStack RC file...")
-    creds = os_utils.source_credentials(rc_file)
-    for key, value in creds.iteritems():
-        if re.search("OS_", key):
-            if key == 'OS_AUTH_URL':
-                ft_constants.OS_AUTH_URL = value
-            elif key == 'OS_USERNAME':
-                ft_constants.OS_USERNAME = value
-            elif key == 'OS_TENANT_NAME':
-                ft_constants.OS_TENANT_NAME = value
-            elif key == 'OS_PASSWORD':
-                ft_constants.OS_PASSWORD = value
-    logger.debug("OS_AUTH_URL:%s" % ft_constants.OS_AUTH_URL)
-    logger.debug("OS_USERNAME:%s" % ft_constants.OS_USERNAME)
-    logger.debug("OS_TENANT_NAME:%s" % ft_constants.OS_TENANT_NAME)
-    logger.debug("OS_PASSWORD:%s" % ft_constants.OS_PASSWORD)
-
-
-def generate_os_snapshot():
-    os_snapshot.main()
-
-
-def cleanup():
-    os_clean.main()
-
-
-def update_test_info(test_name, result, duration):
-    for test in GlobalVariables.EXECUTED_TEST_CASES:
-        if test['test_name'] == test_name:
-            test.update({"result": result,
-                         "duration": duration})
-
-
-def get_run_dict_if_defined(testname):
-    try:
-        dict = ft_utils.get_dict_by_test(testname)
-        if not dict:
-            logger.error("Cannot get {}'s config options".format(testname))
-        elif 'run' in dict:
-            return dict['run']
-        return None
-    except Exception:
-        logger.exception("Cannot get {}'s config options".format(testname))
-        return None
-
-
-def run_test(test, tier_name):
-    result_str = "PASS"
-    start = datetime.datetime.now()
-    test_name = test.get_name()
-    logger.info("\n")  # blank line
-    print_separator("=")
-    logger.info("Running test case '%s'..." % test_name)
-    print_separator("=")
-    logger.debug("\n%s" % test)
-
-    if GlobalVariables.CLEAN_FLAG:
-        generate_os_snapshot()
-
-    flags = (" -t %s" % (test_name))
-    if GlobalVariables.REPORT_FLAG:
-        flags += " -r"
-
-    result = testcase_base.TestcaseBase.EX_RUN_ERROR
-    run_dict = get_run_dict_if_defined(test_name)
-    if run_dict:
+from functest.utils.constants import CONST
+
+# __name__ cannot be used here
+logger = logging.getLogger('functest.ci.run_tests')
+
+
+class Result(enum.Enum):
+    EX_OK = os.EX_OK
+    EX_ERROR = -1
+
+
+class BlockingTestFailed(Exception):
+    pass
+
+
+class TestNotEnabled(Exception):
+    pass
+
+
+class RunTestsParser(object):
+
+    def __init__(self):
+        self.parser = argparse.ArgumentParser()
+        self.parser.add_argument("-t", "--test", dest="test", action='store',
+                                 help="Test case or tier (group of tests) "
+                                 "to be executed. It will run all the test "
+                                 "if not specified.")
+        self.parser.add_argument("-n", "--noclean", help="Do not clean "
+                                 "OpenStack resources after running each "
+                                 "test (default=false).",
+                                 action="store_true")
+        self.parser.add_argument("-r", "--report", help="Push results to "
+                                 "database (default=false).",
+                                 action="store_true")
+
+    def parse_args(self, argv=[]):
+        return vars(self.parser.parse_args(argv))
+
+
+class Runner(object):
+
+    def __init__(self):
+        self.executed_test_cases = []
+        self.overall_result = Result.EX_OK
+        self.clean_flag = True
+        self.report_flag = False
+
+    @staticmethod
+    def print_separator(str, count=45):
+        line = ""
+        for i in range(0, count - 1):
+            line += str
+        logger.info("%s" % line)
+
+    @staticmethod
+    def source_rc_file():
+        rc_file = CONST.__getattribute__('openstack_creds')
+        if not os.path.isfile(rc_file):
+            raise Exception("RC file %s does not exist..." % rc_file)
+        logger.debug("Sourcing the OpenStack RC file...")
+        os_utils.source_credentials(rc_file)
+        for key, value in os.environ.iteritems():
+            if re.search("OS_", key):
+                if key == 'OS_AUTH_URL':
+                    CONST.__setattr__('OS_AUTH_URL', value)
+                elif key == 'OS_USERNAME':
+                    CONST.__setattr__('OS_USERNAME', value)
+                elif key == 'OS_TENANT_NAME':
+                    CONST.__setattr__('OS_TENANT_NAME', value)
+                elif key == 'OS_PASSWORD':
+                    CONST.__setattr__('OS_PASSWORD', value)
+
+    @staticmethod
+    def generate_os_snapshot():
+        os_snapshot.main()
+
+    @staticmethod
+    def cleanup():
+        os_clean.main()
+
+    @staticmethod
+    def get_run_dict(testname):
         try:
-            module = importlib.import_module(run_dict['module'])
-            cls = getattr(module, run_dict['class'])
-            test_case = cls()
-            result = test_case.run()
-            if (result == testcase_base.TestCasesBase.EX_OK and
-                    GlobalVariables.REPORT_FLAG):
-                result = test_case.push_to_db()
-        except ImportError:
-            logger.exception("Cannot import module {}".format(
-                run_dict['module']))
-        except AttributeError:
-            logger.exception("Cannot get class {}".format(
-                run_dict['class']))
-    else:
-        cmd = ("%s%s" % (EXEC_SCRIPT, flags))
-        logger.info("Executing command {} because {} "
-                    "doesn't implement the new framework".format(
-                        cmd, test_name))
-        result = ft_utils.execute_command(cmd)
-
-    if GlobalVariables.CLEAN_FLAG:
-        cleanup()
-    end = datetime.datetime.now()
-    duration = (end - start).seconds
-    duration_str = ("%02d:%02d" % divmod(duration, 60))
-    logger.info("Test execution time: %s" % duration_str)
-
-    if result != 0:
-        logger.error("The test case '%s' failed. " % test_name)
-        OVERALL_RESULT = -1
-        result_str = "FAIL"
-
-        if test.is_blocking():
-            if not args.test or args.test == "all":
-                logger.info("This test case is blocking. Aborting overall "
-                            "execution.")
-            # if it is a single test we don't print the whole results table
-            update_test_info(test_name, result_str, duration_str)
-            generate_report.main(GlobalVariables.EXECUTED_TEST_CASES)
-            logger.info("Execution exit value: %s" % OVERALL_RESULT)
-            sys.exit(OVERALL_RESULT)
-
-    update_test_info(test_name, result_str, duration_str)
-
-
-def run_tier(tier):
-    tier_name = tier.get_name()
-    tests = tier.get_tests()
-    if tests is None or len(tests) == 0:
-        logger.info("There are no supported test cases in this tier "
-                    "for the given scenario")
-        return 0
-    logger.info("\n\n")  # blank line
-    print_separator("#")
-    logger.info("Running tier '%s'" % tier_name)
-    print_separator("#")
-    logger.debug("\n%s" % tier)
-    for test in tests:
-        run_test(test, tier_name)
-
-
-def run_all(tiers):
-    summary = ""
-    BUILD_TAG = ft_constants.CI_BUILD_TAG
-    if BUILD_TAG is not None and re.search("daily", BUILD_TAG) is not None:
-        CI_LOOP = "daily"
-    else:
-        CI_LOOP = "weekly"
-
-    tiers_to_run = []
-
-    for tier in tiers.get_tiers():
-        if (len(tier.get_tests()) != 0 and
-                re.search(CI_LOOP, tier.get_ci_loop()) is not None):
-            tiers_to_run.append(tier)
-            summary += ("\n - %s:\n\t %s"
-                        % (tier.get_name(),
-                           tier.get_test_names()))
-
-    logger.info("Tests to be executed:%s" % summary)
-    GlobalVariables.EXECUTED_TEST_CASES = generate_report.init(tiers_to_run)
-    for tier in tiers_to_run:
-        run_tier(tier)
-
-    generate_report.main(GlobalVariables.EXECUTED_TEST_CASES)
-
-
-def main():
-
-    CI_INSTALLER_TYPE = ft_constants.CI_INSTALLER_TYPE
-    CI_SCENARIO = ft_constants.CI_SCENARIO
-
-    file = ft_constants.FUNCTEST_TESTCASES_YAML
-    _tiers = tb.TierBuilder(CI_INSTALLER_TYPE, CI_SCENARIO, file)
-
-    if args.noclean:
-        GlobalVariables.CLEAN_FLAG = False
-
-    if args.report:
-        GlobalVariables.REPORT_FLAG = True
-
-    if args.test:
-        source_rc_file()
-        if _tiers.get_tier(args.test):
-            run_tier(_tiers.get_tier(args.test))
-
-        elif _tiers.get_test(args.test):
-            run_test(_tiers.get_test(args.test), _tiers.get_tier(args.test))
-
-        elif args.test == "all":
-            run_all(_tiers)
-
+            dict = ft_utils.get_dict_by_test(testname)
+            if not dict:
+                logger.error("Cannot get {}'s config options".format(testname))
+            elif 'run' in dict:
+                return dict['run']
+            return None
+        except Exception:
+            logger.exception("Cannot get {}'s config options".format(testname))
+            return None
+
+    def run_test(self, test, tier_name, testcases=None):
+        if not test.is_enabled():
+            raise TestNotEnabled(
+                "The test case {} is not enabled".format(test.get_name()))
+        logger.info("\n")  # blank line
+        self.print_separator("=")
+        logger.info("Running test case '%s'..." % test.get_name())
+        self.print_separator("=")
+        logger.debug("\n%s" % test)
+        self.source_rc_file()
+
+        if test.needs_clean() and self.clean_flag:
+            self.generate_os_snapshot()
+
+        flags = " -t %s" % test.get_name()
+        if self.report_flag:
+            flags += " -r"
+
+        result = testcase.TestCase.EX_RUN_ERROR
+        run_dict = self.get_run_dict(test.get_name())
+        if run_dict:
+            try:
+                module = importlib.import_module(run_dict['module'])
+                cls = getattr(module, run_dict['class'])
+                test_dict = ft_utils.get_dict_by_test(test.get_name())
+                test_case = cls(**test_dict)
+                self.executed_test_cases.append(test_case)
+                try:
+                    kwargs = run_dict['args']
+                    result = test_case.run(**kwargs)
+                except KeyError:
+                    result = test_case.run()
+                if result == testcase.TestCase.EX_OK:
+                    if self.report_flag:
+                        test_case.push_to_db()
+                    result = test_case.is_successful()
+                logger.info("Test result:\n\n%s\n", test_case)
+            except ImportError:
+                logger.exception("Cannot import module {}".format(
+                    run_dict['module']))
+            except AttributeError:
+                logger.exception("Cannot get class {}".format(
+                    run_dict['class']))
         else:
-            logger.error("Unknown test case or tier '%s', or not supported by "
-                         "the given scenario '%s'."
-                         % (args.test, CI_SCENARIO))
-            logger.debug("Available tiers are:\n\n%s"
-                         % _tiers)
-    else:
-        run_all(_tiers)
+            raise Exception("Cannot import the class for the test case.")
+
+        if test.needs_clean() and self.clean_flag:
+            self.cleanup()
+        if result != testcase.TestCase.EX_OK:
+            logger.error("The test case '%s' failed. " % test.get_name())
" % test.get_name()) + self.overall_result = Result.EX_ERROR + if test.is_blocking(): + raise BlockingTestFailed( + "The test case {} failed and is blocking".format( + test.get_name())) + + def run_tier(self, tier): + tier_name = tier.get_name() + tests = tier.get_tests() + if tests is None or len(tests) == 0: + logger.info("There are no supported test cases in this tier " + "for the given scenario") + return 0 + logger.info("\n\n") # blank line + self.print_separator("#") + logger.info("Running tier '%s'" % tier_name) + self.print_separator("#") + logger.debug("\n%s" % tier) + for test in tests: + self.run_test(test, tier_name) + + def run_all(self, tiers): + summary = "" + tiers_to_run = [] + + for tier in tiers.get_tiers(): + if (len(tier.get_tests()) != 0 and + re.search(CONST.__getattribute__('CI_LOOP'), + tier.get_ci_loop()) is not None): + tiers_to_run.append(tier) + summary += ("\n - %s:\n\t %s" + % (tier.get_name(), + tier.get_test_names())) + + logger.info("Tests to be executed:%s" % summary) + for tier in tiers_to_run: + self.run_tier(tier) + + def main(self, **kwargs): + _tiers = tb.TierBuilder( + CONST.__getattribute__('INSTALLER_TYPE'), + CONST.__getattribute__('DEPLOY_SCENARIO'), + CONST.__getattribute__("functest_testcases_yaml")) + + if kwargs['noclean']: + self.clean_flag = False + + if kwargs['report']: + self.report_flag = True - logger.info("Execution exit value: %s" % GlobalVariables.OVERALL_RESULT) - sys.exit(GlobalVariables.OVERALL_RESULT) + try: + if kwargs['test']: + self.source_rc_file() + logger.debug("Test args: %s", kwargs['test']) + if _tiers.get_tier(kwargs['test']): + self.run_tier(_tiers.get_tier(kwargs['test'])) + elif _tiers.get_test(kwargs['test']): + self.run_test(_tiers.get_test(kwargs['test']), + _tiers.get_tier_name(kwargs['test']), + kwargs['test']) + elif kwargs['test'] == "all": + self.run_all(_tiers) + else: + logger.error("Unknown test case or tier '%s', " + "or not supported by " + "the given scenario '%s'." + % (kwargs['test'], + CONST.__getattribute__('DEPLOY_SCENARIO'))) + logger.debug("Available tiers are:\n\n%s", + _tiers) + return Result.EX_ERROR + else: + self.run_all(_tiers) + except BlockingTestFailed: + pass + except Exception: + logger.exception("Failures when running testcase(s)") + self.overall_result = Result.EX_ERROR + + msg = prettytable.PrettyTable( + header_style='upper', padding_width=5, + field_names=['env var', 'value']) + for env_var in ['INSTALLER_TYPE', 'DEPLOY_SCENARIO', 'BUILD_TAG', + 'CI_LOOP']: + msg.add_row([env_var, CONST.__getattribute__(env_var)]) + logger.info("Deployment description: \n\n%s\n", msg) + + msg = prettytable.PrettyTable( + header_style='upper', padding_width=5, + field_names=['test case', 'project', 'tier', 'duration', 'result']) + for test_case in self.executed_test_cases: + result = 'PASS' if(test_case.is_successful( + ) == test_case.EX_OK) else 'FAIL' + msg.add_row([test_case.case_name, test_case.project_name, + _tiers.get_tier_name(test_case.case_name), + test_case.get_duration(), result]) + logger.info("FUNCTEST REPORT: \n\n%s\n", msg) + + logger.info("Execution exit value: %s" % self.overall_result) + return self.overall_result if __name__ == '__main__': - main() + logging.config.fileConfig( + CONST.__getattribute__('dir_functest_logging_cfg')) + parser = RunTestsParser() + args = parser.parse_args(sys.argv[1:]) + runner = Runner() + sys.exit(runner.main(**args).value)