Merge "Bugfix: fix the wrong path for security scan"
[functest.git] / functest / ci / run_tests.py
1 #!/usr/bin/python -u
2 #
3 # Author: Jose Lausuch (jose.lausuch@ericsson.com)
4 #
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10
11 import argparse
12 import datetime
13 import importlib
14 import os
15 import re
16 import sys
17
18 import functest.ci.generate_report as generate_report
19 import functest.ci.tier_builder as tb
20 import functest.core.testcase_base as testcase_base
21 import functest.utils.functest_constants as ft_constants
22 import functest.utils.functest_logger as ft_logger
23 import functest.utils.functest_utils as ft_utils
24 import functest.utils.openstack_clean as os_clean
25 import functest.utils.openstack_snapshot as os_snapshot
26 import functest.utils.openstack_utils as os_utils
27 from functest.utils.constants import CONST
28
29 parser = argparse.ArgumentParser()
30 parser.add_argument("-t", "--test", dest="test", action='store',
31                     help="Test case or tier (group of tests) to be executed. "
32                     "It will run all the test if not specified.")
33 parser.add_argument("-n", "--noclean", help="Do not clean OpenStack resources"
34                     " after running each test (default=false).",
35                     action="store_true")
36 parser.add_argument("-r", "--report", help="Push results to database "
37                     "(default=false).", action="store_true")
38 args = parser.parse_args()
39
40
41 """ logging configuration """
42 logger = ft_logger.Logger("run_tests").getLogger()
43
44
45 """ global variables """
46 EXEC_SCRIPT = ("%s/functest/ci/exec_test.sh" % CONST.dir_repo_functest)
47
48 # This will be the return code of this script. If any of the tests fails,
49 # this variable will change to -1
50
51
52 class GlobalVariables:
53     EXECUTED_TEST_CASES = []
54     OVERALL_RESULT = 0
55     CLEAN_FLAG = True
56     REPORT_FLAG = False
57
58
59 def print_separator(str, count=45):
60     line = ""
61     for i in range(0, count - 1):
62         line += str
63     logger.info("%s" % line)
64
65
66 def source_rc_file():
67     rc_file = CONST.openstack_creds
68     if not os.path.isfile(rc_file):
69         logger.error("RC file %s does not exist..." % rc_file)
70         sys.exit(1)
71     logger.debug("Sourcing the OpenStack RC file...")
72     creds = os_utils.source_credentials(rc_file)
73     for key, value in creds.iteritems():
74         if re.search("OS_", key):
75             if key == 'OS_AUTH_URL':
76                 ft_constants.OS_AUTH_URL = value
77                 CONST.OS_AUTH_URL = value
78             elif key == 'OS_USERNAME':
79                 ft_constants.OS_USERNAME = value
80                 CONST.OS_USERNAME = value
81             elif key == 'OS_TENANT_NAME':
82                 ft_constants.OS_TENANT_NAME = value
83                 CONST.OS_TENANT_NAME = value
84             elif key == 'OS_PASSWORD':
85                 ft_constants.OS_PASSWORD = value
86                 CONST.OS_PASSWORD = value
87     logger.debug("OS_AUTH_URL:%s" % CONST.OS_AUTH_URL)
88     logger.debug("OS_USERNAME:%s" % CONST.OS_USERNAME)
89     logger.debug("OS_TENANT_NAME:%s" % CONST.OS_TENANT_NAME)
90     logger.debug("OS_PASSWORD:%s" % CONST.OS_PASSWORD)
91
92
93 def generate_os_snapshot():
94     os_snapshot.main()
95
96
97 def cleanup():
98     os_clean.main()
99
100
101 def update_test_info(test_name, result, duration):
102     for test in GlobalVariables.EXECUTED_TEST_CASES:
103         if test['test_name'] == test_name:
104             test.update({"result": result,
105                          "duration": duration})
106
107
108 def get_run_dict_if_defined(testname):
109     try:
110         dict = ft_utils.get_dict_by_test(testname)
111         if not dict:
112             logger.error("Cannot get {}'s config options".format(testname))
113         elif 'run' in dict:
114             return dict['run']
115         return None
116     except Exception:
117         logger.exception("Cannot get {}'s config options".format(testname))
118         return None
119
120
121 def run_test(test, tier_name):
122     result_str = "PASS"
123     start = datetime.datetime.now()
124     test_name = test.get_name()
125     logger.info("\n")  # blank line
126     print_separator("=")
127     logger.info("Running test case '%s'..." % test_name)
128     print_separator("=")
129     logger.debug("\n%s" % test)
130     source_rc_file()
131
132     if GlobalVariables.CLEAN_FLAG:
133         generate_os_snapshot()
134
135     flags = (" -t %s" % (test_name))
136     if GlobalVariables.REPORT_FLAG:
137         flags += " -r"
138
139     result = testcase_base.TestcaseBase.EX_RUN_ERROR
140     run_dict = get_run_dict_if_defined(test_name)
141     if run_dict:
142         try:
143             module = importlib.import_module(run_dict['module'])
144             cls = getattr(module, run_dict['class'])
145             test_case = cls()
146             result = test_case.run()
147             if result == testcase_base.TestcaseBase.EX_OK:
148                 if GlobalVariables.REPORT_FLAG:
149                     test_case.push_to_db()
150                 result = test_case.check_criteria()
151         except ImportError:
152             logger.exception("Cannot import module {}".format(
153                 run_dict['module']))
154         except AttributeError:
155             logger.exception("Cannot get class {}".format(
156                 run_dict['class']))
157     else:
158         cmd = ("%s%s" % (EXEC_SCRIPT, flags))
159         logger.info("Executing command {} because {} "
160                     "doesn't implement the new framework".format(
161                         cmd, test_name))
162         result = ft_utils.execute_command(cmd)
163
164     if GlobalVariables.CLEAN_FLAG:
165         cleanup()
166     end = datetime.datetime.now()
167     duration = (end - start).seconds
168     duration_str = ("%02d:%02d" % divmod(duration, 60))
169     logger.info("Test execution time: %s" % duration_str)
170
171     if result != 0:
172         logger.error("The test case '%s' failed. " % test_name)
173         OVERALL_RESULT = -1
174         result_str = "FAIL"
175
176         if test.is_blocking():
177             if not args.test or args.test == "all":
178                 logger.info("This test case is blocking. Aborting overall "
179                             "execution.")
180                 # if it is a single test we don't print the whole results table
181                 update_test_info(test_name, result_str, duration_str)
182                 generate_report.main(GlobalVariables.EXECUTED_TEST_CASES)
183             logger.info("Execution exit value: %s" % OVERALL_RESULT)
184             sys.exit(OVERALL_RESULT)
185
186     update_test_info(test_name, result_str, duration_str)
187
188
189 def run_tier(tier):
190     tier_name = tier.get_name()
191     tests = tier.get_tests()
192     if tests is None or len(tests) == 0:
193         logger.info("There are no supported test cases in this tier "
194                     "for the given scenario")
195         return 0
196     logger.info("\n\n")  # blank line
197     print_separator("#")
198     logger.info("Running tier '%s'" % tier_name)
199     print_separator("#")
200     logger.debug("\n%s" % tier)
201     for test in tests:
202         run_test(test, tier_name)
203
204
205 def run_all(tiers):
206     summary = ""
207     tiers_to_run = []
208
209     for tier in tiers.get_tiers():
210         if (len(tier.get_tests()) != 0 and
211                 re.search(CONST.CI_LOOP, tier.get_ci_loop()) is not None):
212             tiers_to_run.append(tier)
213             summary += ("\n    - %s:\n\t   %s"
214                         % (tier.get_name(),
215                            tier.get_test_names()))
216
217     logger.info("Tests to be executed:%s" % summary)
218     GlobalVariables.EXECUTED_TEST_CASES = generate_report.init(tiers_to_run)
219     for tier in tiers_to_run:
220         run_tier(tier)
221
222     generate_report.main(GlobalVariables.EXECUTED_TEST_CASES)
223
224
225 def main():
226
227     CI_INSTALLER_TYPE = CONST.INSTALLER_TYPE
228     CI_SCENARIO = CONST.DEPLOY_SCENARIO
229
230     file = CONST.functest_testcases_yaml
231     _tiers = tb.TierBuilder(CI_INSTALLER_TYPE, CI_SCENARIO, file)
232
233     if args.noclean:
234         GlobalVariables.CLEAN_FLAG = False
235
236     if args.report:
237         GlobalVariables.REPORT_FLAG = True
238
239     if args.test:
240         source_rc_file()
241         if _tiers.get_tier(args.test):
242             run_tier(_tiers.get_tier(args.test))
243
244         elif _tiers.get_test(args.test):
245             run_test(_tiers.get_test(args.test), _tiers.get_tier(args.test))
246
247         elif args.test == "all":
248             run_all(_tiers)
249
250         else:
251             logger.error("Unknown test case or tier '%s', or not supported by "
252                          "the given scenario '%s'."
253                          % (args.test, CI_SCENARIO))
254             logger.debug("Available tiers are:\n\n%s"
255                          % _tiers)
256     else:
257         run_all(_tiers)
258
259     logger.info("Execution exit value: %s" % GlobalVariables.OVERALL_RESULT)
260     sys.exit(GlobalVariables.OVERALL_RESULT)
261
262
263 if __name__ == '__main__':
264     main()