Modify TestCase constructor attributes
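
This patch makes run_tests.py build every test case from its full definition
in the test case YAML file instead of instantiating it with no arguments, and
refactors the script around an argparse wrapper (RunTestsParser), a Result
enum and exceptions in place of module-level globals and sys.exit() calls.
A minimal sketch of the constructor change, using only names that appear in
the diff below:

    # before this patch
    test_case = cls()

    # after this patch: the dict describing the test case (as returned by
    # ft_utils.get_dict_by_test) is passed to the TestCase subclass
    # constructor as keyword arguments
    test_dict = ft_utils.get_dict_by_test(test_name)
    test_case = cls(**test_dict)
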
[functest.git] / functest/ci/run_tests.py
old mode 100644 (file)
new mode 100755 (executable)
index d6991f6..0ca73f3
@@ -8,51 +8,61 @@
 # http://www.apache.org/licenses/LICENSE-2.0
 #
 
+import argparse
 import datetime
+import enum
 import importlib
 import os
 import re
 import sys
 
-import argparse
-
 import functest.ci.generate_report as generate_report
 import functest.ci.tier_builder as tb
-import functest.core.testcase_base as testcase_base
+import functest.core.testcase as testcase
 import functest.utils.functest_logger as ft_logger
 import functest.utils.functest_utils as ft_utils
-import functest.utils.functest_constants as ft_constants
 import functest.utils.openstack_clean as os_clean
 import functest.utils.openstack_snapshot as os_snapshot
 import functest.utils.openstack_utils as os_utils
-
-
-parser = argparse.ArgumentParser()
-parser.add_argument("-t", "--test", dest="test", action='store',
-                    help="Test case or tier (group of tests) to be executed. "
-                    "It will run all the test if not specified.")
-parser.add_argument("-n", "--noclean", help="Do not clean OpenStack resources"
-                    " after running each test (default=false).",
-                    action="store_true")
-parser.add_argument("-r", "--report", help="Push results to database "
-                    "(default=false).", action="store_true")
-args = parser.parse_args()
+from functest.utils.constants import CONST
 
 
 """ logging configuration """
 logger = ft_logger.Logger("run_tests").getLogger()
 
 
-""" global variables """
-EXEC_SCRIPT = ("%s/functest/ci/exec_test.sh" % ft_constants.FUNCTEST_REPO_DIR)
+class Result(enum.Enum):
+    EX_OK = os.EX_OK
+    EX_ERROR = -1
+
 
-# This will be the return code of this script. If any of the tests fails,
-# this variable will change to -1
+class BlockingTestFailed(Exception):
+    pass
+
+
+class RunTestsParser(object):
+
+    def __init__(self):
+        self.parser = argparse.ArgumentParser()
+        self.parser.add_argument("-t", "--test", dest="test", action='store',
+                                 help="Test case or tier (group of tests) "
+                                 "to be executed. It will run all the test "
+                                 "if not specified.")
+        self.parser.add_argument("-n", "--noclean", help="Do not clean "
+                                 "OpenStack resources after running each "
+                                 "test (default=false).",
+                                 action="store_true")
+        self.parser.add_argument("-r", "--report", help="Push results to "
+                                 "database (default=false).",
+                                 action="store_true")
+
+    def parse_args(self, argv=[]):
+        return vars(self.parser.parse_args(argv))
 
 
 class GlobalVariables:
     EXECUTED_TEST_CASES = []
-    OVERALL_RESULT = 0
+    OVERALL_RESULT = Result.EX_OK
     CLEAN_FLAG = True
     REPORT_FLAG = False
 
@@ -65,26 +75,21 @@ def print_separator(str, count=45):
 
 
 def source_rc_file():
-    rc_file = ft_constants.OPENSTACK_CREDS
+    rc_file = CONST.openstack_creds
     if not os.path.isfile(rc_file):
-        logger.error("RC file %s does not exist..." % rc_file)
-        sys.exit(1)
+        raise Exception("RC file %s does not exist..." % rc_file)
     logger.debug("Sourcing the OpenStack RC file...")
-    creds = os_utils.source_credentials(rc_file)
-    for key, value in creds.iteritems():
+    os_utils.source_credentials(rc_file)
+    for key, value in os.environ.iteritems():
         if re.search("OS_", key):
             if key == 'OS_AUTH_URL':
-                ft_constants.OS_AUTH_URL = value
+                CONST.OS_AUTH_URL = value
             elif key == 'OS_USERNAME':
-                ft_constants.OS_USERNAME = value
+                CONST.OS_USERNAME = value
             elif key == 'OS_TENANT_NAME':
-                ft_constants.OS_TENANT_NAME = value
+                CONST.OS_TENANT_NAME = value
             elif key == 'OS_PASSWORD':
-                ft_constants.OS_PASSWORD = value
-    logger.debug("OS_AUTH_URL:%s" % ft_constants.OS_AUTH_URL)
-    logger.debug("OS_USERNAME:%s" % ft_constants.OS_USERNAME)
-    logger.debug("OS_TENANT_NAME:%s" % ft_constants.OS_TENANT_NAME)
-    logger.debug("OS_PASSWORD:%s" % ft_constants.OS_PASSWORD)
+                CONST.OS_PASSWORD = value
 
 
 def generate_os_snapshot():
@@ -102,7 +107,7 @@ def update_test_info(test_name, result, duration):
                          "duration": duration})
 
 
-def get_run_dict_if_defined(testname):
+def get_run_dict(testname):
     try:
         dict = ft_utils.get_dict_by_test(testname)
         if not dict:
@@ -115,7 +120,7 @@ def get_run_dict_if_defined(testname):
         return None
 
 
-def run_test(test, tier_name):
+def run_test(test, tier_name, testcases=None):
     result_str = "PASS"
     start = datetime.datetime.now()
     test_name = test.get_name()
@@ -126,24 +131,31 @@ def run_test(test, tier_name):
     logger.debug("\n%s" % test)
     source_rc_file()
 
-    if GlobalVariables.CLEAN_FLAG:
+    if test.needs_clean() and GlobalVariables.CLEAN_FLAG:
         generate_os_snapshot()
 
     flags = (" -t %s" % (test_name))
     if GlobalVariables.REPORT_FLAG:
         flags += " -r"
 
-    result = testcase_base.TestcaseBase.EX_RUN_ERROR
-    run_dict = get_run_dict_if_defined(test_name)
+    result = testcase.TestCase.EX_RUN_ERROR
+    run_dict = get_run_dict(test_name)
     if run_dict:
         try:
             module = importlib.import_module(run_dict['module'])
             cls = getattr(module, run_dict['class'])
-            test_case = cls()
-            result = test_case.run()
-            if (result == testcase_base.TestcaseBase.EX_OK and
-                    GlobalVariables.REPORT_FLAG):
-                test_case.push_to_db()
+            test_dict = ft_utils.get_dict_by_test(test_name)
+            test_case = cls(**test_dict)
+
+            try:
+                kwargs = run_dict['args']
+                result = test_case.run(**kwargs)
+            except KeyError:
+                result = test_case.run()
+            if result == testcase.TestCase.EX_OK:
+                if GlobalVariables.REPORT_FLAG:
+                    test_case.push_to_db()
+                result = test_case.check_criteria()
         except ImportError:
             logger.exception("Cannot import module {}".format(
                 run_dict['module']))
@@ -151,14 +163,11 @@ def run_test(test, tier_name):
             logger.exception("Cannot get class {}".format(
                 run_dict['class']))
     else:
-        cmd = ("%s%s" % (EXEC_SCRIPT, flags))
-        logger.info("Executing command {} because {} "
-                    "doesn't implement the new framework".format(
-                        cmd, test_name))
-        result = ft_utils.execute_command(cmd)
+        raise Exception("Cannot import the class for the test case.")
 
-    if GlobalVariables.CLEAN_FLAG:
+    if test.needs_clean() and GlobalVariables.CLEAN_FLAG:
         cleanup()
+
     end = datetime.datetime.now()
     duration = (end - start).seconds
     duration_str = ("%02d:%02d" % divmod(duration, 60))
@@ -166,18 +175,16 @@ def run_test(test, tier_name):
 
     if result != 0:
         logger.error("The test case '%s' failed. " % test_name)
-        OVERALL_RESULT = -1
+        GlobalVariables.OVERALL_RESULT = Result.EX_ERROR
         result_str = "FAIL"
 
         if test.is_blocking():
-            if not args.test or args.test == "all":
-                logger.info("This test case is blocking. Aborting overall "
-                            "execution.")
+            if not testcases or testcases == "all":
                 # if it is a single test we don't print the whole results table
                 update_test_info(test_name, result_str, duration_str)
                 generate_report.main(GlobalVariables.EXECUTED_TEST_CASES)
-            logger.info("Execution exit value: %s" % OVERALL_RESULT)
-            sys.exit(OVERALL_RESULT)
+            raise BlockingTestFailed("The test case {} failed and is blocking"
+                                     .format(test.get_name()))
 
     update_test_info(test_name, result_str, duration_str)
 
@@ -200,17 +207,11 @@ def run_tier(tier):
 
 def run_all(tiers):
     summary = ""
-    BUILD_TAG = ft_constants.CI_BUILD_TAG
-    if BUILD_TAG is not None and re.search("daily", BUILD_TAG) is not None:
-        CI_LOOP = "daily"
-    else:
-        CI_LOOP = "weekly"
-
     tiers_to_run = []
 
     for tier in tiers.get_tiers():
         if (len(tier.get_tests()) != 0 and
-                re.search(CI_LOOP, tier.get_ci_loop()) is not None):
+                re.search(CONST.CI_LOOP, tier.get_ci_loop()) is not None):
             tiers_to_run.append(tier)
             summary += ("\n    - %s:\n\t   %s"
                         % (tier.get_name(),
@@ -224,43 +225,51 @@ def run_all(tiers):
     generate_report.main(GlobalVariables.EXECUTED_TEST_CASES)
 
 
-def main():
+def main(**kwargs):
 
-    CI_INSTALLER_TYPE = ft_constants.CI_INSTALLER_TYPE
-    CI_SCENARIO = ft_constants.CI_SCENARIO
+    CI_INSTALLER_TYPE = CONST.INSTALLER_TYPE
+    CI_SCENARIO = CONST.DEPLOY_SCENARIO
 
-    file = ft_constants.FUNCTEST_TESTCASES_YAML
+    file = CONST.functest_testcases_yaml
     _tiers = tb.TierBuilder(CI_INSTALLER_TYPE, CI_SCENARIO, file)
 
-    if args.noclean:
+    if kwargs['noclean']:
         GlobalVariables.CLEAN_FLAG = False
 
-    if args.report:
+    if kwargs['report']:
         GlobalVariables.REPORT_FLAG = True
 
-    if args.test:
-        source_rc_file()
-        if _tiers.get_tier(args.test):
-            run_tier(_tiers.get_tier(args.test))
-
-        elif _tiers.get_test(args.test):
-            run_test(_tiers.get_test(args.test), _tiers.get_tier(args.test))
-
-        elif args.test == "all":
-            run_all(_tiers)
-
+    try:
+        if kwargs['test']:
+            source_rc_file()
+            if _tiers.get_tier(kwargs['test']):
+                GlobalVariables.EXECUTED_TEST_CASES = generate_report.init(
+                    [_tiers.get_tier(kwargs['test'])])
+                run_tier(_tiers.get_tier(kwargs['test']))
+            elif _tiers.get_test(kwargs['test']):
+                run_test(_tiers.get_test(kwargs['test']),
+                         _tiers.get_tier(kwargs['test']),
+                         kwargs['test'])
+            elif kwargs['test'] == "all":
+                run_all(_tiers)
+            else:
+                logger.error("Unknown test case or tier '%s', "
+                             "or not supported by "
+                             "the given scenario '%s'."
+                             % (kwargs['test'], CI_SCENARIO))
+                logger.debug("Available tiers are:\n\n%s"
+                             % _tiers)
+                return Result.EX_ERROR
         else:
-            logger.error("Unknown test case or tier '%s', or not supported by "
-                         "the given scenario '%s'."
-                         % (args.test, CI_SCENARIO))
-            logger.debug("Available tiers are:\n\n%s"
-                         % _tiers)
-    else:
-        run_all(_tiers)
-
+            run_all(_tiers)
+    except Exception as e:
+        logger.error(e)
+        GlobalVariables.OVERALL_RESULT = Result.EX_ERROR
     logger.info("Execution exit value: %s" % GlobalVariables.OVERALL_RESULT)
-    sys.exit(GlobalVariables.OVERALL_RESULT)
+    return GlobalVariables.OVERALL_RESULT
 
 
 if __name__ == '__main__':
-    main()
+    parser = RunTestsParser()
+    args = parser.parse_args(sys.argv[1:])
+    sys.exit(main(**args).value)
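
A rough usage sketch (not part of the patch) of the new entry point, mirroring
the __main__ block above; the test name "healthcheck" is only a placeholder:

    import sys

    from functest.ci import run_tests

    # parse_args() returns a plain dict, so it can be passed to main(**args)
    parser = run_tests.RunTestsParser()
    args = parser.parse_args(["--test", "healthcheck", "--noclean"])

    # main() no longer exits the interpreter; it returns a Result member
    result = run_tests.main(**args)
    sys.exit(result.value)  # os.EX_OK on success, -1 on error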