# http://www.apache.org/licenses/LICENSE-2.0
#
+import argparse
import datetime
import importlib
import os
import re
import sys
-import argparse
-
import functest.ci.generate_report as generate_report
import functest.ci.tier_builder as tb
-import functest.core.TestCasesBase as TestCasesBase
+import functest.core.testcase_base as testcase_base
+import functest.utils.functest_constants as ft_constants
import functest.utils.functest_logger as ft_logger
import functest.utils.functest_utils as ft_utils
import functest.utils.openstack_clean as os_clean
import functest.utils.openstack_snapshot as os_snapshot
import functest.utils.openstack_utils as os_utils
-
+from functest.utils.constants import CONST
parser = argparse.ArgumentParser()
parser.add_argument("-t", "--test", dest="test", action='store',
""" global variables """
-EXEC_SCRIPT = ("%s/functest/ci/exec_test.sh" % ft_utils.FUNCTEST_REPO)
-CLEAN_FLAG = True
-REPORT_FLAG = False
-EXECUTED_TEST_CASES = []
+EXEC_SCRIPT = ("%s/functest/ci/exec_test.sh" % CONST.dir_repo_functest)
# This will be the return code of this script. If any of the tests fails,
# this variable will change to -1
-OVERALL_RESULT = 0
+
+
+class GlobalVariables:
+ EXECUTED_TEST_CASES = []
+ OVERALL_RESULT = 0
+ CLEAN_FLAG = True
+ REPORT_FLAG = False
def print_separator(str, count=45):
def source_rc_file():
- rc_file = os.getenv('creds')
+ rc_file = CONST.openstack_creds
if not os.path.isfile(rc_file):
logger.error("RC file %s does not exist..." % rc_file)
sys.exit(1)
logger.debug("Sourcing the OpenStack RC file...")
- os_utils.source_credentials(rc_file)
+ creds = os_utils.source_credentials(rc_file)
+ for key, value in creds.iteritems():
+ if re.search("OS_", key):
+ if key == 'OS_AUTH_URL':
+ ft_constants.OS_AUTH_URL = value
+ CONST.OS_AUTH_URL = value
+ elif key == 'OS_USERNAME':
+ ft_constants.OS_USERNAME = value
+ CONST.OS_USERNAME = value
+ elif key == 'OS_TENANT_NAME':
+ ft_constants.OS_TENANT_NAME = value
+ CONST.OS_TENANT_NAME = value
+ elif key == 'OS_PASSWORD':
+ ft_constants.OS_PASSWORD = value
+ CONST.OS_PASSWORD = value
+ logger.debug("OS_AUTH_URL:%s" % CONST.OS_AUTH_URL)
+ logger.debug("OS_USERNAME:%s" % CONST.OS_USERNAME)
+ logger.debug("OS_TENANT_NAME:%s" % CONST.OS_TENANT_NAME)
+ logger.debug("OS_PASSWORD:%s" % CONST.OS_PASSWORD)
def generate_os_snapshot():
def update_test_info(test_name, result, duration):
- for test in EXECUTED_TEST_CASES:
+ for test in GlobalVariables.EXECUTED_TEST_CASES:
if test['test_name'] == test_name:
test.update({"result": result,
"duration": duration})
def run_test(test, tier_name):
- global OVERALL_RESULT, EXECUTED_TEST_CASES
result_str = "PASS"
start = datetime.datetime.now()
test_name = test.get_name()
logger.info("Running test case '%s'..." % test_name)
print_separator("=")
logger.debug("\n%s" % test)
+ source_rc_file()
- if CLEAN_FLAG:
+ if GlobalVariables.CLEAN_FLAG:
generate_os_snapshot()
flags = (" -t %s" % (test_name))
- if REPORT_FLAG:
+ if GlobalVariables.REPORT_FLAG:
flags += " -r"
- result = TestCasesBase.TestCasesBase.EX_RUN_ERROR
+ result = testcase_base.TestcaseBase.EX_RUN_ERROR
run_dict = get_run_dict_if_defined(test_name)
if run_dict:
try:
cls = getattr(module, run_dict['class'])
test_case = cls()
result = test_case.run()
- if result != TestCasesBase.TestCasesBase.EX_SKIP and REPORT_FLAG:
- test_case.push_to_db()
+ if result == testcase_base.TestcaseBase.EX_OK:
+ if GlobalVariables.REPORT_FLAG:
+ test_case.push_to_db()
+ result = test_case.check_criteria()
except ImportError:
logger.exception("Cannot import module {}".format(
run_dict['module']))
cmd, test_name))
result = ft_utils.execute_command(cmd)
- if CLEAN_FLAG:
+ if GlobalVariables.CLEAN_FLAG:
cleanup()
end = datetime.datetime.now()
duration = (end - start).seconds
"execution.")
# if it is a single test we don't print the whole results table
update_test_info(test_name, result_str, duration_str)
- generate_report.main(EXECUTED_TEST_CASES)
+ generate_report.main(GlobalVariables.EXECUTED_TEST_CASES)
logger.info("Execution exit value: %s" % OVERALL_RESULT)
sys.exit(OVERALL_RESULT)
def run_all(tiers):
- global EXECUTED_TEST_CASES
summary = ""
- BUILD_TAG = os.getenv('BUILD_TAG')
- if BUILD_TAG is not None and re.search("daily", BUILD_TAG) is not None:
- CI_LOOP = "daily"
- else:
- CI_LOOP = "weekly"
-
tiers_to_run = []
for tier in tiers.get_tiers():
if (len(tier.get_tests()) != 0 and
- re.search(CI_LOOP, tier.get_ci_loop()) is not None):
+ re.search(CONST.CI_LOOP, tier.get_ci_loop()) is not None):
tiers_to_run.append(tier)
summary += ("\n - %s:\n\t %s"
% (tier.get_name(),
tier.get_test_names()))
logger.info("Tests to be executed:%s" % summary)
- EXECUTED_TEST_CASES = generate_report.init(tiers_to_run)
+ GlobalVariables.EXECUTED_TEST_CASES = generate_report.init(tiers_to_run)
for tier in tiers_to_run:
run_tier(tier)
- generate_report.main(EXECUTED_TEST_CASES)
+ generate_report.main(GlobalVariables.EXECUTED_TEST_CASES)
def main():
- global CLEAN_FLAG
- global REPORT_FLAG
- CI_INSTALLER_TYPE = os.getenv('INSTALLER_TYPE')
- CI_SCENARIO = os.getenv('DEPLOY_SCENARIO')
+ CI_INSTALLER_TYPE = CONST.INSTALLER_TYPE
+ CI_SCENARIO = CONST.DEPLOY_SCENARIO
- file = ft_utils.get_testcases_file()
+ file = CONST.functest_testcases_yaml
_tiers = tb.TierBuilder(CI_INSTALLER_TYPE, CI_SCENARIO, file)
if args.noclean:
- CLEAN_FLAG = False
+ GlobalVariables.CLEAN_FLAG = False
if args.report:
- REPORT_FLAG = True
+ GlobalVariables.REPORT_FLAG = True
if args.test:
source_rc_file()
else:
run_all(_tiers)
- logger.info("Execution exit value: %s" % OVERALL_RESULT)
- sys.exit(OVERALL_RESULT)
+ logger.info("Execution exit value: %s" % GlobalVariables.OVERALL_RESULT)
+ sys.exit(GlobalVariables.OVERALL_RESULT)
+
if __name__ == '__main__':
main()