xtesting/ci/run_tests.py
#!/usr/bin/env python

# Copyright (c) 2016 Ericsson AB and others.
#
# All rights reserved. This program and the accompanying materials
# are made available under the terms of the Apache License, Version 2.0
# which accompanies this distribution, and is available at
# http://www.apache.org/licenses/LICENSE-2.0

"""The entry point for running tests:
1) Parses xtesting/ci/testcases.yaml to determine which testcase(s) to run
2) Executes the common operations on each testcase (run, push results to db...)
3) Returns the right status code
"""
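
# Typical invocations, assuming the 'run_tests' console script this
# module is expected to be exposed as (a sketch; names are illustrative):
#   run_tests -t all          # run every tier defined in testcases.yaml
#   run_tests -t <tier>       # run a single tier
#   run_tests -t <case> -r    # run one testcase and push its results to db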

import argparse
import errno
import logging
import logging.config
import os
import re
import sys
import textwrap

import enum
import prettytable
from stevedore import driver
import yaml

from xtesting.ci import tier_builder
from xtesting.core import testcase
from xtesting.utils import config
from xtesting.utils import constants
from xtesting.utils import env

LOGGER = logging.getLogger('xtesting.ci.run_tests')


class Result(enum.Enum):
    """The overall result in enumerated type"""
    # pylint: disable=too-few-public-methods
    EX_OK = os.EX_OK
    EX_ERROR = -1


class BlockingTestFailed(Exception):
    """Exception when the blocking test fails"""


class RunTestsParser():
    """Parser to run tests"""
    # pylint: disable=too-few-public-methods

    def __init__(self):
        self.parser = argparse.ArgumentParser()
        self.parser.add_argument("-t", "--test", dest="test", action='store',
                                 help="Test case or tier (group of tests) "
                                 "to be executed. It will run all the tests "
                                 "if not specified.")
        self.parser.add_argument("-n", "--noclean", help="Do not clean "
                                 "OpenStack resources after running each "
                                 "test (default=false).",
                                 action="store_true")
        self.parser.add_argument("-r", "--report", help="Push results to "
                                 "database (default=false).",
                                 action="store_true")
        self.parser.add_argument("-p", "--push", help="Push artifacts to "
                                 "S3 repository (default=false).",
                                 action="store_true")

    def parse_args(self, argv=None):
        """Parse arguments.

        It can call sys.exit if arguments are incorrect.

        Returns:
            the arguments from cmdline
        """
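        # The parsed options come back as a plain dict, e.g. (illustrative)
        # {'test': None, 'noclean': False, 'report': False, 'push': False}
        # when no arguments are given.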
        return vars(self.parser.parse_args(argv))


class Runner():
    """Runner class"""

    def __init__(self):
        self.executed_test_cases = {}
        self.overall_result = Result.EX_OK
        self.clean_flag = True
        self.report_flag = False
        self.push_flag = False
        self.tiers = tier_builder.TierBuilder(config.get_xtesting_config(
            constants.TESTCASE_DESCRIPTION,
            constants.TESTCASE_DESCRIPTION_DEFAULT))

    @staticmethod
    def source_envfile(rc_file=constants.ENV_FILE):
        """Source the env file passed as arg"""
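        # A hypothetical env file accepted here ('export ' prefixes and
        # surrounding quotes are stripped, each pair lands in os.environ):
        #   export DEPLOY_SCENARIO=os-nosdn-nofeature-ha
        #   INSTALLER_TYPE="fuel"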
        if not os.path.isfile(rc_file):
            LOGGER.debug("No env file %s found", rc_file)
            return
        with open(rc_file, "r", encoding='utf-8') as rcfd:
            for line in rcfd:
                var = (line.rstrip('"\n').replace('export ', '').split(
                    "=") if re.search(r'(.*)=(.*)', line) else None)
                # The next two lines should be modified as soon as rc_file
                # conforms with common rules. Be aware that it could cause
                # issues if a value starts with a quote.
                if var:
                    key = re.sub(r'^["\' ]*|[ \'"]*$', '', var[0])
                    value = re.sub(r'^["\' ]*|[ \'"]*$', '', "".join(var[1:]))
                    os.environ[key] = value
            rcfd.seek(0, 0)
            LOGGER.debug("Sourcing env file %s\n\n%s", rc_file, rcfd.read())

    @staticmethod
    def get_dict_by_test(testname):
        # pylint: disable=missing-docstring
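        # testcases.yaml is expected to look roughly like this (simplified
        # sketch; names are illustrative):
        #   tiers:
        #     - name: first
        #       description: ...
        #       testcases:
        #         - case_name: first
        #           project_name: xtesting
        #           run:
        #             name: first
        #             args: {...}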
        with open(config.get_xtesting_config(
                constants.TESTCASE_DESCRIPTION,
                constants.TESTCASE_DESCRIPTION_DEFAULT),
                encoding='utf-8') as tyaml:
            testcases_yaml = yaml.safe_load(tyaml)
        for dic_tier in testcases_yaml.get("tiers"):
            for dic_testcase in dic_tier['testcases']:
                if dic_testcase['case_name'] == testname:
                    return dic_testcase
        LOGGER.error('Testcase %s is not defined in testcases.yaml', testname)
        return None
    @staticmethod
    def get_run_dict(testname):
        """Obtain the 'run' block of the testcase from testcases.yaml"""
        try:
            dic_testcase = Runner.get_dict_by_test(testname)
            if not dic_testcase:
                LOGGER.error("Cannot get %s's config options", testname)
            elif 'run' in dic_testcase:
                return dic_testcase['run']
            return None
        except Exception:  # pylint: disable=broad-except
            LOGGER.exception("Cannot get %s's config options", testname)
            return None

    def run_test(self, test):
        """Run one test case"""
        # pylint: disable=too-many-branches,broad-exception-raised
        if not test.is_enabled() or test.is_skipped():
            msg = prettytable.PrettyTable(
                header_style='upper', padding_width=5,
                field_names=['test case', 'project', 'duration',
                             'result'])
            msg.add_row([test.get_name(), test.get_project(), "00:00", "SKIP"])
            LOGGER.info("Test result:\n\n%s\n", msg)
            return testcase.TestCase.EX_TESTCASE_SKIPPED
        result = testcase.TestCase.EX_TESTCASE_FAILED
        run_dict = self.get_run_dict(test.get_name())
        if run_dict:
            try:
                LOGGER.info("Loading test case '%s'...", test.get_name())
                test_dict = Runner.get_dict_by_test(test.get_name())
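                # stevedore resolves run_dict['name'] against the
                # 'xtesting.testcase' entry-point namespace (declared in the
                # package metadata of xtesting or of a plugin) and, with
                # invoke_on_load=True, instantiates the matching class with
                # the whole testcase dict as keyword arguments.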
                test_case = driver.DriverManager(
                    namespace='xtesting.testcase',
                    name=run_dict['name'],
                    invoke_on_load=True,
                    invoke_kwds=test_dict).driver
                self.executed_test_cases[test.get_name()] = test_case
                test_case.check_requirements()
                if test_case.is_skipped:
                    LOGGER.info("Skipping test case '%s'...", test.get_name())
                    LOGGER.info("Test result:\n\n%s\n", test_case)
                    return testcase.TestCase.EX_TESTCASE_SKIPPED
                if 'env' in run_dict:
                    LOGGER.info(
                        "Setting env for test case '%s'...", test.get_name())
                    for key, value in run_dict['env'].items():
                        os.environ[key] = str(value)
                LOGGER.info("Running test case '%s'...", test.get_name())
                try:
                    kwargs = run_dict['args']
                    test_case.run(**kwargs)
                except KeyError:
                    test_case.run()
                result = test_case.is_successful()
                LOGGER.info("Test result:\n\n%s\n", test_case)
                if self.clean_flag:
                    test_case.clean()
                if self.push_flag:
                    test_case.publish_artifacts()
                if self.report_flag:
                    test_case.push_to_db()
            except ImportError:
                LOGGER.exception("Cannot import module %s", run_dict['module'])
            except AttributeError:
                LOGGER.exception("Cannot get class %s", run_dict['class'])
            except Exception:  # pylint: disable=broad-except
                LOGGER.exception(
                    "\n\nPlease fix the testcase %s.\n"
                    "All exceptions should be caught by the testcase instead!"
                    "\n\n",
                    test.get_name())
        else:
            raise Exception("Cannot import the class for the test case.")
        return result

    def run_tier(self, tier):
        """Run one tier"""
        tests = tier.get_tests()
        if not tests:
            LOGGER.info("There are no supported test cases in this tier "
                        "for the given scenario")
            self.overall_result = Result.EX_ERROR
        else:
            for test in tests:
                self.run_test(test)
                test_case = self.executed_test_cases[test.get_name()]
                if test_case.is_successful() == test_case.EX_TESTCASE_FAILED:
                    LOGGER.error("The test case '%s' failed.", test.get_name())
                    self.overall_result = Result.EX_ERROR
                    if test.is_blocking():
                        raise BlockingTestFailed(
                            f"The test case {test.get_name()} "
                            "failed and is blocking")
        return self.overall_result

    def run_all(self):
        """Run all available testcases"""
        tiers_to_run = []
        msg = prettytable.PrettyTable(
            header_style='upper', padding_width=5,
            field_names=['tiers', 'description', 'testcases'])
        for tier in self.tiers.get_tiers():
            if tier.get_tests():
                tiers_to_run.append(tier)
                msg.add_row([tier.get_name(),
                             textwrap.fill(tier.description, width=40),
                             textwrap.fill(' '.join([str(x.get_name(
                                 )) for x in tier.get_tests()]), width=40)])
        LOGGER.info("TESTS TO BE EXECUTED:\n\n%s\n", msg)
        for tier in tiers_to_run:
            self.run_tier(tier)

    def main(self, **kwargs):  # pylint: disable=too-many-branches
        """Entry point of class Runner"""
        if 'noclean' in kwargs:
            self.clean_flag = not kwargs['noclean']
        if 'report' in kwargs:
            self.report_flag = kwargs['report']
        if 'push' in kwargs:
            self.push_flag = kwargs['push']
        try:
            LOGGER.info("Deployment description:\n\n%s\n", env.string())
            self.source_envfile()
            if 'test' in kwargs:
                LOGGER.debug("Test args: %s", kwargs['test'])
                if self.tiers.get_tier(kwargs['test']):
                    self.run_tier(self.tiers.get_tier(kwargs['test']))
                elif self.tiers.get_test(kwargs['test']):
                    result = self.run_test(
                        self.tiers.get_test(kwargs['test']))
                    if result == testcase.TestCase.EX_TESTCASE_FAILED:
                        LOGGER.error("The test case '%s' failed.",
                                     kwargs['test'])
                        self.overall_result = Result.EX_ERROR
                elif kwargs['test'] == "all":
                    self.run_all()
                else:
                    LOGGER.error("Unknown test case or tier '%s', or not "
                                 "supported by the given scenario '%s'.",
                                 kwargs['test'],
                                 env.get('DEPLOY_SCENARIO'))
                    LOGGER.debug("Available tiers are:\n\n%s",
                                 self.tiers)
                    return Result.EX_ERROR
            else:
                self.run_all()
        except BlockingTestFailed:
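            # overall_result was already set to EX_ERROR in run_tier();
            # swallowing the exception only stops the remaining tiers.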
            pass
        except Exception:  # pylint: disable=broad-except
            LOGGER.exception("Failures when running testcase(s)")
            self.overall_result = Result.EX_ERROR
        if not self.tiers.get_test(kwargs['test']):
            self.summary(self.tiers.get_tier(kwargs['test']))
        LOGGER.info("Execution exit value: %s", self.overall_result)
        return self.overall_result

    def summary(self, tier=None):
        """Generate the xtesting report showing the overall results"""
        msg = prettytable.PrettyTable(
            header_style='upper', padding_width=5,
            field_names=['test case', 'project', 'tier',
                         'duration', 'result'])
        tiers = [tier] if tier else self.tiers.get_tiers()
        for each_tier in tiers:
            for test in each_tier.get_tests():
                try:
                    test_case = self.executed_test_cases[test.get_name()]
                except KeyError:
                    msg.add_row([test.get_name(), test.get_project(),
                                 each_tier.get_name(), "00:00", "SKIP"])
                else:
                    if test_case.is_skipped:
                        result = 'SKIP'
                    else:
                        result = 'PASS' if (test_case.is_successful()
                                            == test_case.EX_OK) else 'FAIL'
                    msg.add_row(
                        [test_case.case_name, test_case.project_name,
                         self.tiers.get_tier_name(test_case.case_name),
                         test_case.get_duration(), result])
            for test in each_tier.get_skipped_test():
                msg.add_row([test.get_name(), test.get_project(),
                             each_tier.get_name(), "00:00", "SKIP"])
        LOGGER.info("Xtesting report:\n\n%s\n", msg)


def main():
    """Entry point"""
    try:
        os.makedirs(constants.RESULTS_DIR)
    except OSError as ex:
        if ex.errno != errno.EEXIST:
            print(f"Cannot create {constants.RESULTS_DIR}")
            return testcase.TestCase.EX_RUN_ERROR
    if env.get('DEBUG').lower() == 'true':
        logging.config.fileConfig(config.get_xtesting_config(
            'logging.debug.ini', constants.DEBUG_INI_PATH_DEFAULT))
    else:
        logging.config.fileConfig(config.get_xtesting_config(
            'logging.ini', constants.INI_PATH_DEFAULT))
    logging.captureWarnings(True)
    parser = RunTestsParser()
    args = parser.parse_args(sys.argv[1:])
    # Reset argv to prevent wrong usage by the underlying test framework
    # e.g. pyats fails by expecting an arg to -p (publish to database) when
    # called via Robot.run()
    sys.argv = [sys.argv[0]]
    runner = Runner()
    return runner.main(**args).value