Add case_name as constructor arg
[functest.git] / functest / ci / run_tests.py
1 #!/usr/bin/python -u
2 #
3 # Author: Jose Lausuch (jose.lausuch@ericsson.com)
4 #
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10
11 import argparse
12 import datetime
13 import enum
14 import importlib
15 import os
16 import re
17 import sys
18
19 import functest.ci.generate_report as generate_report
20 import functest.ci.tier_builder as tb
21 import functest.core.testcase as testcase
22 import functest.utils.functest_logger as ft_logger
23 import functest.utils.functest_utils as ft_utils
24 import functest.utils.openstack_clean as os_clean
25 import functest.utils.openstack_snapshot as os_snapshot
26 import functest.utils.openstack_utils as os_utils
27 from functest.utils.constants import CONST
28
29
30 """ logging configuration """
31 logger = ft_logger.Logger("run_tests").getLogger()
32
33
34 class Result(enum.Enum):
35     EX_OK = os.EX_OK
36     EX_ERROR = -1
37
38
39 class BlockingTestFailed(Exception):
40     pass
41
42
43 class RunTestsParser(object):
44
45     def __init__(self):
46         self.parser = argparse.ArgumentParser()
47         self.parser.add_argument("-t", "--test", dest="test", action='store',
48                                  help="Test case or tier (group of tests) "
49                                  "to be executed. It will run all the test "
50                                  "if not specified.")
51         self.parser.add_argument("-n", "--noclean", help="Do not clean "
52                                  "OpenStack resources after running each "
53                                  "test (default=false).",
54                                  action="store_true")
55         self.parser.add_argument("-r", "--report", help="Push results to "
56                                  "database (default=false).",
57                                  action="store_true")
58
59     def parse_args(self, argv=[]):
60         return vars(self.parser.parse_args(argv))
61
62
63 class GlobalVariables:
64     EXECUTED_TEST_CASES = []
65     OVERALL_RESULT = Result.EX_OK
66     CLEAN_FLAG = True
67     REPORT_FLAG = False
68
69
70 def print_separator(str, count=45):
71     line = ""
72     for i in range(0, count - 1):
73         line += str
74     logger.info("%s" % line)
75
76
77 def source_rc_file():
78     rc_file = CONST.openstack_creds
79     if not os.path.isfile(rc_file):
80         raise Exception("RC file %s does not exist..." % rc_file)
81     logger.debug("Sourcing the OpenStack RC file...")
82     os_utils.source_credentials(rc_file)
83     for key, value in os.environ.iteritems():
84         if re.search("OS_", key):
85             if key == 'OS_AUTH_URL':
86                 CONST.OS_AUTH_URL = value
87             elif key == 'OS_USERNAME':
88                 CONST.OS_USERNAME = value
89             elif key == 'OS_TENANT_NAME':
90                 CONST.OS_TENANT_NAME = value
91             elif key == 'OS_PASSWORD':
92                 CONST.OS_PASSWORD = value
93
94
95 def generate_os_snapshot():
96     os_snapshot.main()
97
98
99 def cleanup():
100     os_clean.main()
101
102
103 def update_test_info(test_name, result, duration):
104     for test in GlobalVariables.EXECUTED_TEST_CASES:
105         if test['test_name'] == test_name:
106             test.update({"result": result,
107                          "duration": duration})
108
109
110 def get_run_dict(testname):
111     try:
112         dict = ft_utils.get_dict_by_test(testname)
113         if not dict:
114             logger.error("Cannot get {}'s config options".format(testname))
115         elif 'run' in dict:
116             return dict['run']
117         return None
118     except Exception:
119         logger.exception("Cannot get {}'s config options".format(testname))
120         return None
121
122
123 def run_test(test, tier_name, testcases=None):
124     result_str = "PASS"
125     start = datetime.datetime.now()
126     test_name = test.get_name()
127     logger.info("\n")  # blank line
128     print_separator("=")
129     logger.info("Running test case '%s'..." % test_name)
130     print_separator("=")
131     logger.debug("\n%s" % test)
132     source_rc_file()
133
134     if test.needs_clean() and GlobalVariables.CLEAN_FLAG:
135         generate_os_snapshot()
136
137     flags = (" -t %s" % (test_name))
138     if GlobalVariables.REPORT_FLAG:
139         flags += " -r"
140
141     result = testcase.TestCase.EX_RUN_ERROR
142     run_dict = get_run_dict(test_name)
143     if run_dict:
144         try:
145             module = importlib.import_module(run_dict['module'])
146             cls = getattr(module, run_dict['class'])
147             test_case = cls(case_name=test_name)
148
149             try:
150                 kwargs = run_dict['args']
151                 result = test_case.run(**kwargs)
152             except KeyError:
153                 result = test_case.run()
154             if result == testcase.TestCase.EX_OK:
155                 if GlobalVariables.REPORT_FLAG:
156                     test_case.push_to_db()
157                 result = test_case.check_criteria()
158         except ImportError:
159             logger.exception("Cannot import module {}".format(
160                 run_dict['module']))
161         except AttributeError:
162             logger.exception("Cannot get class {}".format(
163                 run_dict['class']))
164     else:
165         raise Exception("Cannot import the class for the test case.")
166
167     if test.needs_clean() and GlobalVariables.CLEAN_FLAG:
168         cleanup()
169
170     end = datetime.datetime.now()
171     duration = (end - start).seconds
172     duration_str = ("%02d:%02d" % divmod(duration, 60))
173     logger.info("Test execution time: %s" % duration_str)
174
175     if result != 0:
176         logger.error("The test case '%s' failed. " % test_name)
177         GlobalVariables.OVERALL_RESULT = Result.EX_ERROR
178         result_str = "FAIL"
179
180         if test.is_blocking():
181             if not testcases or testcases == "all":
182                 # if it is a single test we don't print the whole results table
183                 update_test_info(test_name, result_str, duration_str)
184                 generate_report.main(GlobalVariables.EXECUTED_TEST_CASES)
185             raise BlockingTestFailed("The test case {} failed and is blocking"
186                                      .format(test.get_name()))
187
188     update_test_info(test_name, result_str, duration_str)
189
190
191 def run_tier(tier):
192     tier_name = tier.get_name()
193     tests = tier.get_tests()
194     if tests is None or len(tests) == 0:
195         logger.info("There are no supported test cases in this tier "
196                     "for the given scenario")
197         return 0
198     logger.info("\n\n")  # blank line
199     print_separator("#")
200     logger.info("Running tier '%s'" % tier_name)
201     print_separator("#")
202     logger.debug("\n%s" % tier)
203     for test in tests:
204         run_test(test, tier_name)
205
206
207 def run_all(tiers):
208     summary = ""
209     tiers_to_run = []
210
211     for tier in tiers.get_tiers():
212         if (len(tier.get_tests()) != 0 and
213                 re.search(CONST.CI_LOOP, tier.get_ci_loop()) is not None):
214             tiers_to_run.append(tier)
215             summary += ("\n    - %s:\n\t   %s"
216                         % (tier.get_name(),
217                            tier.get_test_names()))
218
219     logger.info("Tests to be executed:%s" % summary)
220     GlobalVariables.EXECUTED_TEST_CASES = generate_report.init(tiers_to_run)
221     for tier in tiers_to_run:
222         run_tier(tier)
223
224     generate_report.main(GlobalVariables.EXECUTED_TEST_CASES)
225
226
227 def main(**kwargs):
228
229     CI_INSTALLER_TYPE = CONST.INSTALLER_TYPE
230     CI_SCENARIO = CONST.DEPLOY_SCENARIO
231
232     file = CONST.functest_testcases_yaml
233     _tiers = tb.TierBuilder(CI_INSTALLER_TYPE, CI_SCENARIO, file)
234
235     if kwargs['noclean']:
236         GlobalVariables.CLEAN_FLAG = False
237
238     if kwargs['report']:
239         GlobalVariables.REPORT_FLAG = True
240
241     try:
242         if kwargs['test']:
243             source_rc_file()
244             if _tiers.get_tier(kwargs['test']):
245                 GlobalVariables.EXECUTED_TEST_CASES = generate_report.init(
246                     [_tiers.get_tier(kwargs['test'])])
247                 run_tier(_tiers.get_tier(kwargs['test']))
248             elif _tiers.get_test(kwargs['test']):
249                 run_test(_tiers.get_test(kwargs['test']),
250                          _tiers.get_tier(kwargs['test']),
251                          kwargs['test'])
252             elif kwargs['test'] == "all":
253                 run_all(_tiers)
254             else:
255                 logger.error("Unknown test case or tier '%s', "
256                              "or not supported by "
257                              "the given scenario '%s'."
258                              % (kwargs['test'], CI_SCENARIO))
259                 logger.debug("Available tiers are:\n\n%s"
260                              % _tiers)
261                 return Result.EX_ERROR
262         else:
263             run_all(_tiers)
264     except Exception as e:
265         logger.error(e)
266         GlobalVariables.OVERALL_RESULT = Result.EX_ERROR
267     logger.info("Execution exit value: %s" % GlobalVariables.OVERALL_RESULT)
268     return GlobalVariables.OVERALL_RESULT
269
270
271 if __name__ == '__main__':
272     parser = RunTestsParser()
273     args = parser.parse_args(sys.argv[1:])
274     sys.exit(main(**args).value)