Merge "Added Unit Tests for ci/prepare_env"
[functest.git] / functest / ci / run_tests.py
1 #!/usr/bin/python -u
2 #
3 # Author: Jose Lausuch (jose.lausuch@ericsson.com)
4 #
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10
11 import argparse
12 import datetime
13 import enum
14 import importlib
15 import os
16 import re
17 import sys
18
19 import functest.ci.generate_report as generate_report
20 import functest.ci.tier_builder as tb
21 import functest.core.testcase_base as testcase_base
22 import functest.utils.functest_constants as ft_constants
23 import functest.utils.functest_logger as ft_logger
24 import functest.utils.functest_utils as ft_utils
25 import functest.utils.openstack_clean as os_clean
26 import functest.utils.openstack_snapshot as os_snapshot
27 import functest.utils.openstack_utils as os_utils
28 from functest.utils.constants import CONST
29
30
31 """ logging configuration """
32 logger = ft_logger.Logger("run_tests").getLogger()
33
34
35 class Result(enum.Enum):
36     EX_OK = os.EX_OK
37     EX_ERROR = -1
38
39
40 class BlockingTestFailed(Exception):
41     pass
42
43
44 class RunTestsParser(object):
45
46     def __init__(self):
47         self.parser = argparse.ArgumentParser()
48         self.parser.add_argument("-t", "--test", dest="test", action='store',
49                                  help="Test case or tier (group of tests) "
50                                  "to be executed. It will run all the test "
51                                  "if not specified.")
52         self.parser.add_argument("-n", "--noclean", help="Do not clean "
53                                  "OpenStack resources after running each "
54                                  "test (default=false).",
55                                  action="store_true")
56         self.parser.add_argument("-r", "--report", help="Push results to "
57                                  "database (default=false).",
58                                  action="store_true")
59
60     def parse_args(self, argv=[]):
61         return vars(self.parser.parse_args(argv))
62
63
64 class GlobalVariables:
65     EXECUTED_TEST_CASES = []
66     OVERALL_RESULT = Result.EX_OK
67     CLEAN_FLAG = True
68     REPORT_FLAG = False
69
70
71 def print_separator(str, count=45):
72     line = ""
73     for i in range(0, count - 1):
74         line += str
75     logger.info("%s" % line)
76
77
78 def source_rc_file():
79     rc_file = CONST.openstack_creds
80     if not os.path.isfile(rc_file):
81         raise Exception("RC file %s does not exist..." % rc_file)
82     logger.debug("Sourcing the OpenStack RC file...")
83     os_utils.source_credentials(rc_file)
84     for key, value in os.environ.iteritems():
85         if re.search("OS_", key):
86             if key == 'OS_AUTH_URL':
87                 ft_constants.OS_AUTH_URL = value
88                 CONST.OS_AUTH_URL = value
89             elif key == 'OS_USERNAME':
90                 ft_constants.OS_USERNAME = value
91                 CONST.OS_USERNAME = value
92             elif key == 'OS_TENANT_NAME':
93                 ft_constants.OS_TENANT_NAME = value
94                 CONST.OS_TENANT_NAME = value
95             elif key == 'OS_PASSWORD':
96                 ft_constants.OS_PASSWORD = value
97                 CONST.OS_PASSWORD = value
98
99
100 def generate_os_snapshot():
101     os_snapshot.main()
102
103
104 def cleanup():
105     os_clean.main()
106
107
108 def update_test_info(test_name, result, duration):
109     for test in GlobalVariables.EXECUTED_TEST_CASES:
110         if test['test_name'] == test_name:
111             test.update({"result": result,
112                          "duration": duration})
113
114
115 def get_run_dict(testname):
116     try:
117         dict = ft_utils.get_dict_by_test(testname)
118         if not dict:
119             logger.error("Cannot get {}'s config options".format(testname))
120         elif 'run' in dict:
121             return dict['run']
122         return None
123     except Exception:
124         logger.exception("Cannot get {}'s config options".format(testname))
125         return None
126
127
128 def run_test(test, tier_name, testcases=None):
129     result_str = "PASS"
130     start = datetime.datetime.now()
131     test_name = test.get_name()
132     logger.info("\n")  # blank line
133     print_separator("=")
134     logger.info("Running test case '%s'..." % test_name)
135     print_separator("=")
136     logger.debug("\n%s" % test)
137     source_rc_file()
138
139     if test.needs_clean() and GlobalVariables.CLEAN_FLAG:
140         generate_os_snapshot()
141
142     flags = (" -t %s" % (test_name))
143     if GlobalVariables.REPORT_FLAG:
144         flags += " -r"
145
146     result = testcase_base.TestcaseBase.EX_RUN_ERROR
147     run_dict = get_run_dict(test_name)
148     if run_dict:
149         try:
150             module = importlib.import_module(run_dict['module'])
151             cls = getattr(module, run_dict['class'])
152             test_case = cls()
153
154             try:
155                 kwargs = run_dict['args']
156                 result = test_case.run(**kwargs)
157             except KeyError:
158                 result = test_case.run()
159             if result == testcase_base.TestcaseBase.EX_OK:
160                 if GlobalVariables.REPORT_FLAG:
161                     test_case.push_to_db()
162                 result = test_case.check_criteria()
163         except ImportError:
164             logger.exception("Cannot import module {}".format(
165                 run_dict['module']))
166         except AttributeError:
167             logger.exception("Cannot get class {}".format(
168                 run_dict['class']))
169     else:
170         raise Exception("Cannot import the class for the test case.")
171
172     if test.needs_clean() and GlobalVariables.CLEAN_FLAG:
173         cleanup()
174
175     end = datetime.datetime.now()
176     duration = (end - start).seconds
177     duration_str = ("%02d:%02d" % divmod(duration, 60))
178     logger.info("Test execution time: %s" % duration_str)
179
180     if result != 0:
181         logger.error("The test case '%s' failed. " % test_name)
182         GlobalVariables.OVERALL_RESULT = Result.EX_ERROR
183         result_str = "FAIL"
184
185         if test.is_blocking():
186             if not testcases or testcases == "all":
187                 # if it is a single test we don't print the whole results table
188                 update_test_info(test_name, result_str, duration_str)
189                 generate_report.main(GlobalVariables.EXECUTED_TEST_CASES)
190             raise BlockingTestFailed("The test case {} failed and is blocking"
191                                      .format(test.get_name()))
192
193     update_test_info(test_name, result_str, duration_str)
194
195
196 def run_tier(tier):
197     tier_name = tier.get_name()
198     tests = tier.get_tests()
199     if tests is None or len(tests) == 0:
200         logger.info("There are no supported test cases in this tier "
201                     "for the given scenario")
202         return 0
203     logger.info("\n\n")  # blank line
204     print_separator("#")
205     logger.info("Running tier '%s'" % tier_name)
206     print_separator("#")
207     logger.debug("\n%s" % tier)
208     for test in tests:
209         run_test(test, tier_name)
210
211
212 def run_all(tiers):
213     summary = ""
214     tiers_to_run = []
215
216     for tier in tiers.get_tiers():
217         if (len(tier.get_tests()) != 0 and
218                 re.search(CONST.CI_LOOP, tier.get_ci_loop()) is not None):
219             tiers_to_run.append(tier)
220             summary += ("\n    - %s:\n\t   %s"
221                         % (tier.get_name(),
222                            tier.get_test_names()))
223
224     logger.info("Tests to be executed:%s" % summary)
225     GlobalVariables.EXECUTED_TEST_CASES = generate_report.init(tiers_to_run)
226     for tier in tiers_to_run:
227         run_tier(tier)
228
229     generate_report.main(GlobalVariables.EXECUTED_TEST_CASES)
230
231
232 def main(**kwargs):
233
234     CI_INSTALLER_TYPE = CONST.INSTALLER_TYPE
235     CI_SCENARIO = CONST.DEPLOY_SCENARIO
236
237     file = CONST.functest_testcases_yaml
238     _tiers = tb.TierBuilder(CI_INSTALLER_TYPE, CI_SCENARIO, file)
239
240     if kwargs['noclean']:
241         GlobalVariables.CLEAN_FLAG = False
242
243     if kwargs['report']:
244         GlobalVariables.REPORT_FLAG = True
245
246     try:
247         if kwargs['test']:
248             source_rc_file()
249             if _tiers.get_tier(kwargs['test']):
250                 GlobalVariables.EXECUTED_TEST_CASES = generate_report.init(
251                     [_tiers.get_tier(kwargs['test'])])
252                 run_tier(_tiers.get_tier(kwargs['test']))
253             elif _tiers.get_test(kwargs['test']):
254                 run_test(_tiers.get_test(kwargs['test']),
255                          _tiers.get_tier(kwargs['test']),
256                          kwargs['test'])
257             elif kwargs['test'] == "all":
258                 run_all(_tiers)
259             else:
260                 logger.error("Unknown test case or tier '%s', "
261                              "or not supported by "
262                              "the given scenario '%s'."
263                              % (kwargs['test'], CI_SCENARIO))
264                 logger.debug("Available tiers are:\n\n%s"
265                              % _tiers)
266                 return Result.EX_ERROR
267         else:
268             run_all(_tiers)
269     except Exception as e:
270         logger.error(e)
271         GlobalVariables.OVERALL_RESULT = Result.EX_ERROR
272     logger.info("Execution exit value: %s" % GlobalVariables.OVERALL_RESULT)
273     return GlobalVariables.OVERALL_RESULT
274
275
276 if __name__ == '__main__':
277     parser = RunTestsParser()
278     args = parser.parse_args(sys.argv[1:])
279     sys.exit(main(**args).value)