Allow dynamically skipping testcases
functest-xtesting.git: xtesting/ci/run_tests.py
#!/usr/bin/env python

# Copyright (c) 2016 Ericsson AB and others.
#
# All rights reserved. This program and the accompanying materials
# are made available under the terms of the Apache License, Version 2.0
# which accompanies this distribution, and is available at
# http://www.apache.org/licenses/LICENSE-2.0

"""The entry point for running tests:
1) Parses xtesting/ci/testcases.yaml to check which testcase(s) to run
2) Executes the common operations on every testcase (run, push results to
   db...)
3) Returns the right status code
"""

import argparse
import errno
import importlib
import logging
import logging.config
import os
import re
import sys
import textwrap

import enum
import pkg_resources
import prettytable
import six
import yaml

from xtesting.ci import tier_builder
from xtesting.core import testcase
from xtesting.utils import constants
from xtesting.utils import env

LOGGER = logging.getLogger('xtesting.ci.run_tests')


class Result(enum.Enum):
    """The overall result in enumerated type"""
    # pylint: disable=too-few-public-methods
    EX_OK = os.EX_OK
    EX_ERROR = -1


class BlockingTestFailed(Exception):
    """Exception when the blocking test fails"""


class TestNotEnabled(Exception):
    """Exception when the test is not enabled"""


class RunTestsParser(object):
    """Parser to run tests"""
    # pylint: disable=too-few-public-methods

    def __init__(self):
        self.parser = argparse.ArgumentParser()
        self.parser.add_argument("-t", "--test", dest="test", action='store',
                                 help="Test case or tier (group of tests) "
                                 "to be executed. It will run all the tests "
                                 "if not specified.")
        self.parser.add_argument("-n", "--noclean", help="Do not clean "
                                 "OpenStack resources after running each "
                                 "test (default=false).",
                                 action="store_true")
        self.parser.add_argument("-r", "--report", help="Push results to "
                                 "database (default=false).",
                                 action="store_true")

    def parse_args(self, argv=None):
        """Parse arguments.

        It can call sys.exit if arguments are incorrect.

        Returns:
            the arguments from cmdline
        """
        return vars(self.parser.parse_args(argv))
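
# Typical CLI usage (illustrative tier/case names; assumes this module is
# wired as the "run_tests" console script):
#   run_tests -t all          # run every tier
#   run_tests -t smoke -r     # run the smoke tier and push results

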
class Runner(object):
    """Runner class"""

    def __init__(self):
        self.executed_test_cases = {}
        self.overall_result = Result.EX_OK
        self.clean_flag = True
        self.report_flag = False
        self.tiers = tier_builder.TierBuilder(
            pkg_resources.resource_filename('xtesting', 'ci/testcases.yaml'))

    @staticmethod
    def source_envfile(rc_file=constants.ENV_FILE):
        """Source the env file passed as arg"""
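        # An env file this method accepts could look like (illustrative
        # values; plain and "export"-prefixed assignments are both handled):
        #   export DEPLOY_SCENARIO=os-nosdn-nofeature-ha
        #   NODE_NAME="pod1"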
        if not os.path.isfile(rc_file):
            LOGGER.debug("No env file %s found", rc_file)
            return
        with open(rc_file, "r") as rcfd:
            for line in rcfd:
                # split on the first '=' only, so values containing '='
                # are preserved
                var = (line.rstrip('"\n').replace('export ', '').split(
                    "=", 1) if re.search(r'(.*)=(.*)', line) else None)
                # The next two lines should be modified as soon as rc_file
                # conforms with common rules. Be aware that it could induce
                # issues if the value starts with '
                if var:
                    key = re.sub(r'^["\' ]*|[ \'"]*$', '', var[0])
                    value = re.sub(r'^["\' ]*|[ \'"]*$', '', "".join(var[1:]))
                    os.environ[key] = value
            rcfd.seek(0, 0)
            LOGGER.info("Sourcing env file %s\n\n%s", rc_file, rcfd.read())

    @staticmethod
    def get_dict_by_test(testname):
        """Get the dict of the given testcase from testcases.yaml"""
        # pylint: disable=bad-continuation
        with open(pkg_resources.resource_filename(
                'xtesting', 'ci/testcases.yaml')) as tyaml:
            testcases_yaml = yaml.safe_load(tyaml)
        for dic_tier in testcases_yaml.get("tiers"):
            for dic_testcase in dic_tier['testcases']:
                if dic_testcase['case_name'] == testname:
                    return dic_testcase
        LOGGER.error('The test case %s is not defined in testcases.yaml',
                     testname)
        return None

    @staticmethod
    def get_run_dict(testname):
        """Obtain the 'run' block of the testcase from testcases.yaml"""
        try:
            dic_testcase = Runner.get_dict_by_test(testname)
            if not dic_testcase:
                LOGGER.error("Cannot get %s's config options", testname)
            elif 'run' in dic_testcase:
                return dic_testcase['run']
            return None
        except Exception:  # pylint: disable=broad-except
            LOGGER.exception("Cannot get %s's config options", testname)
            return None

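    # For a testcase following the sketch at the top of this file,
    # get_run_dict("first") would return something like:
    #   {'module': 'xtesting.samples.first', 'class': 'Test', 'args': {}}
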
    def run_test(self, test):
        """Run one test case"""
        if not test.is_enabled():
            raise TestNotEnabled(
                "The test case {} is not enabled".format(test.get_name()))
        result = testcase.TestCase.EX_RUN_ERROR
        run_dict = self.get_run_dict(test.get_name())
        if run_dict:
            try:
                module = importlib.import_module(run_dict['module'])
                cls = getattr(module, run_dict['class'])
                test_dict = Runner.get_dict_by_test(test.get_name())
                test_case = cls(**test_dict)
                self.executed_test_cases[test.get_name()] = test_case
                test_case.check_requirements()
                if test_case.is_skipped:
                    LOGGER.info("Skipping test case '%s'...", test.get_name())
                    return testcase.TestCase.EX_TESTCASE_SKIPPED
                LOGGER.info("Running test case '%s'...", test.get_name())
                # Do not catch KeyError around run(): a KeyError raised
                # inside run() must not trigger a silent second run without
                # args, so look up the optional 'args' block explicitly.
                kwargs = run_dict.get('args', {})
                test_case.run(**kwargs)
                if self.report_flag:
                    test_case.push_to_db()
                result = test_case.is_successful()
                LOGGER.info("Test result:\n\n%s\n", test_case)
                if self.clean_flag:
                    test_case.clean()
            except ImportError:
                LOGGER.exception("Cannot import module %s", run_dict['module'])
            except AttributeError:
                LOGGER.exception("Cannot get class %s", run_dict['class'])
        else:
            raise Exception(
                "Cannot get any run config for the test case {}".format(
                    test.get_name()))
        return result

    def run_tier(self, tier):
        """Run one tier"""
        tests = tier.get_tests()
        if not tests:
            LOGGER.info("There are no supported test cases in this tier "
                        "for the given scenario")
            self.overall_result = Result.EX_ERROR
        else:
            for test in tests:
                self.run_test(test)
                test_case = self.executed_test_cases[test.get_name()]
                if test_case.is_successful() == test_case.EX_TESTCASE_FAILED:
                    LOGGER.error("The test case '%s' failed.", test.get_name())
                    self.overall_result = Result.EX_ERROR
                    if test.is_blocking():
                        raise BlockingTestFailed(
                            "The test case {} failed and is blocking".format(
                                test.get_name()))
        return self.overall_result

    def run_all(self):
        """Run all available testcases"""
        tiers_to_run = []
        msg = prettytable.PrettyTable(
            header_style='upper', padding_width=5,
            field_names=['tiers', 'order', 'description', 'testcases'])
        for tier in self.tiers.get_tiers():
            if tier.get_tests():
                tiers_to_run.append(tier)
                msg.add_row([tier.get_name(), tier.get_order(),
                             textwrap.fill(tier.description, width=40),
                             textwrap.fill(
                                 ' '.join(str(x.get_name())
                                          for x in tier.get_tests()),
                                 width=40)])
        LOGGER.info("TESTS TO BE EXECUTED:\n\n%s\n", msg)
        for tier in tiers_to_run:
            self.run_tier(tier)

    def main(self, **kwargs):
        """Entry point of class Runner"""
        if 'noclean' in kwargs:
            self.clean_flag = not kwargs['noclean']
        if 'report' in kwargs:
            self.report_flag = kwargs['report']
        try:
            LOGGER.info("Deployment description:\n\n%s\n", env.string())
            self.source_envfile()
            # argparse always sets the 'test' key (None when -t is not
            # given), so test the value rather than the key presence;
            # otherwise the run_all() fallback below is unreachable.
            if kwargs.get('test'):
                LOGGER.debug("Test args: %s", kwargs['test'])
                if self.tiers.get_tier(kwargs['test']):
                    self.run_tier(self.tiers.get_tier(kwargs['test']))
                elif self.tiers.get_test(kwargs['test']):
                    result = self.run_test(
                        self.tiers.get_test(kwargs['test']))
                    if result != testcase.TestCase.EX_OK:
                        LOGGER.error("The test case '%s' failed.",
                                     kwargs['test'])
                        self.overall_result = Result.EX_ERROR
                elif kwargs['test'] == "all":
                    self.run_all()
                else:
                    LOGGER.error("Unknown test case or tier '%s', or not "
                                 "supported by the given scenario '%s'.",
                                 kwargs['test'],
                                 env.get('DEPLOY_SCENARIO'))
                    LOGGER.debug("Available tiers are:\n\n%s",
                                 self.tiers)
                    return Result.EX_ERROR
            else:
                self.run_all()
        except BlockingTestFailed:
            pass
        except Exception:  # pylint: disable=broad-except
            LOGGER.exception("Failures when running testcase(s)")
            self.overall_result = Result.EX_ERROR
        if not self.tiers.get_test(kwargs.get('test')):
            self.summary(self.tiers.get_tier(kwargs.get('test')))
        LOGGER.info("Execution exit value: %s", self.overall_result)
        return self.overall_result

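    # Programmatic usage of Runner (illustrative):
    #   result = Runner().main(test="all", noclean=False, report=False)
    #   # "result" is a Result member: Result.EX_OK or Result.EX_ERROR
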
    def summary(self, tier=None):
        """Generate the xtesting report showing the overall results"""
        msg = prettytable.PrettyTable(
            header_style='upper', padding_width=5,
            field_names=['test case', 'project', 'tier',
                         'duration', 'result'])
        tiers = [tier] if tier else self.tiers.get_tiers()
        for each_tier in tiers:
            for test in each_tier.get_tests():
                try:
                    test_case = self.executed_test_cases[test.get_name()]
                except KeyError:
                    msg.add_row([test.get_name(), test.get_project(),
                                 each_tier.get_name(), "00:00", "SKIP"])
                else:
                    if test_case.is_skipped:
                        result = 'SKIP'
                    elif test_case.is_successful() == test_case.EX_OK:
                        result = 'PASS'
                    else:
                        result = 'FAIL'
                    msg.add_row(
                        [test_case.case_name, test_case.project_name,
                         self.tiers.get_tier_name(test_case.case_name),
                         test_case.get_duration(), result])
            for test in each_tier.get_skipped_test():
                msg.add_row([test.get_name(), test.get_project(),
                             each_tier.get_name(), "00:00", "SKIP"])
        LOGGER.info("Xtesting report:\n\n%s\n", msg)


def main():
    """Entry point"""
    try:
        os.makedirs('/var/lib/xtesting/results/')
    except OSError as ex:
        if ex.errno != errno.EEXIST:
            six.print_("Cannot create /var/lib/xtesting/results/")
            return testcase.TestCase.EX_RUN_ERROR
    logging.config.fileConfig(pkg_resources.resource_filename(
        'xtesting', 'ci/logging.ini'))
    logging.captureWarnings(True)
    parser = RunTestsParser()
    args = parser.parse_args(sys.argv[1:])
    runner = Runner()
    return runner.main(**args).value
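
# This module is typically exposed as a console script (for instance
# "run_tests = xtesting.ci.run_tests:main" in setup.cfg; an assumption
# about packaging, not part of this file). The generated wrapper calls
# sys.exit(main()), so the integer returned above becomes the process
# exit status (note that EX_ERROR is -1, which a shell reports as 255).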