#!/usr/bin/env python

# Copyright (c) 2016 Ericsson AB and others.
#
# All rights reserved. This program and the accompanying materials
# are made available under the terms of the Apache License, Version 2.0
# which accompanies this distribution, and is available at
# http://www.apache.org/licenses/LICENSE-2.0

10 """ The entry of running tests:
11 1) Parses xtesting/ci/testcases.yaml to check which testcase(s) to be run
12 2) Execute the common operations on every testcase (run, push results to db...)
13 3) Return the right status code
14 """

import argparse
import enum
import errno
import logging
import logging.config
import os
import re
import sys
import textwrap

import pkg_resources
import prettytable
from stevedore import driver
import yaml

from xtesting.ci import tier_builder
from xtesting.core import testcase
from xtesting.utils import constants
from xtesting.utils import env

LOGGER = logging.getLogger('xtesting.ci.run_tests')


class Result(enum.Enum):
    """The overall result in enumerated type"""
    # pylint: disable=too-few-public-methods
    EX_OK = os.EX_OK
    EX_ERROR = -1


class BlockingTestFailed(Exception):
    """Exception when the blocking test fails"""


class RunTestsParser():
    """Parser to run tests"""
    # pylint: disable=too-few-public-methods

    def __init__(self):
        self.parser = argparse.ArgumentParser()
        self.parser.add_argument("-t", "--test", dest="test", action='store',
                                 help="Test case or tier (group of tests) "
                                 "to be executed. All tests are run if "
                                 "not specified.")
        self.parser.add_argument("-n", "--noclean", help="Do not clean "
                                 "OpenStack resources after running each "
                                 "test (default=false).",
                                 action="store_true")
        self.parser.add_argument("-r", "--report", help="Push results to "
                                 "database (default=false).",
                                 action="store_true")
        self.parser.add_argument("-p", "--push", help="Push artifacts to "
                                 "S3 repository (default=false).",
                                 action="store_true")

    def parse_args(self, argv=None):
        """Parse arguments.

        It can call sys.exit if arguments are incorrect.

        Returns:
            the arguments from cmdline
        """
        return vars(self.parser.parse_args(argv))


class Runner():
    """Runner class"""

    def __init__(self):
        self.executed_test_cases = {}
        self.overall_result = Result.EX_OK
        self.clean_flag = True
        self.report_flag = False
        self.push_flag = False
        self.tiers = tier_builder.TierBuilder(_get_xtesting_config(
            constants.TESTCASE_DESCRIPTION,
            constants.TESTCASE_DESCRIPTION_DEFAULT))

    @staticmethod
    def source_envfile(rc_file=constants.ENV_FILE):
        """Source the env file passed as arg"""
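        # Illustrative env file contents (assumption: plain KEY=value pairs,
        # optionally prefixed with "export", as handled by the parser below):
        #   export DEPLOY_SCENARIO=os-nosdn-nofeature-ha
        #   CI_LOOP="daily"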
        if not os.path.isfile(rc_file):
            LOGGER.debug("No env file %s found", rc_file)
            return
        with open(rc_file, "r", encoding='utf-8') as rcfd:
            for line in rcfd:
                var = (line.rstrip('"\n').replace('export ', '').split(
                    "=") if re.search(r'(.*)=(.*)', line) else None)
                # The next two lines should be modified as soon as rc_file
                # conforms with common rules. Be aware that this could cause
                # issues if a value starts with a quote.
                if var:
                    key = re.sub(r'^["\' ]*|[ \'"]*$', '', var[0])
                    value = re.sub(
                        r'^["\' ]*|[ \'"]*$', '', "=".join(var[1:]))
                    os.environ[key] = value
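            # Rewind so the whole sourced file can be logged below.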
            rcfd.seek(0, 0)
            LOGGER.debug("Sourcing env file %s\n\n%s", rc_file, rcfd.read())

    @staticmethod
    def get_dict_by_test(testname):
        """Return the testcase description defined in testcases.yaml"""
        with open(pkg_resources.resource_filename(
                'xtesting', 'ci/testcases.yaml'), encoding='utf-8') as tyaml:
            testcases_yaml = yaml.safe_load(tyaml)
        for dic_tier in testcases_yaml.get("tiers"):
            for dic_testcase in dic_tier['testcases']:
                if dic_testcase['case_name'] == testname:
                    return dic_testcase
        LOGGER.error('Test case %s is not defined in testcases.yaml',
                     testname)
        return None

    @staticmethod
    def get_run_dict(testname):
        """Obtain the 'run' block of the testcase from testcases.yaml"""
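        # Illustrative 'run' block in testcases.yaml (the driver name and
        # args below are examples, not the only valid values):
        #   run:
        #     name: 'robotframework'
        #     args:
        #       suites:
        #         - /path/to/suite.robot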
        try:
            dic_testcase = Runner.get_dict_by_test(testname)
            if not dic_testcase:
                LOGGER.error("Cannot get %s's config options", testname)
            elif 'run' in dic_testcase:
                return dic_testcase['run']
            return None
        except Exception:  # pylint: disable=broad-except
            LOGGER.exception("Cannot get %s's config options", testname)
            return None

    def run_test(self, test):
        """Run one test case"""
        if not test.is_enabled() or test.is_skipped():
            msg = prettytable.PrettyTable(
                header_style='upper', padding_width=5,
                field_names=['test case', 'project', 'duration',
                             'result'])
            msg.add_row([test.get_name(), test.get_project(), "00:00", "SKIP"])
            LOGGER.info("Test result:\n\n%s\n", msg)
            return testcase.TestCase.EX_TESTCASE_SKIPPED
        result = testcase.TestCase.EX_TESTCASE_FAILED
        run_dict = self.get_run_dict(test.get_name())
        if run_dict:
            try:
                LOGGER.info("Loading test case '%s'...", test.get_name())
                test_dict = Runner.get_dict_by_test(test.get_name())
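                # stevedore resolves run_dict['name'] against the
                # 'xtesting.testcase' entry-point namespace and, with
                # invoke_on_load=True, instantiates the matching class with
                # the whole testcase description as keyword arguments.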
                test_case = driver.DriverManager(
                    namespace='xtesting.testcase',
                    name=run_dict['name'],
                    invoke_on_load=True,
                    invoke_kwds=test_dict).driver
                self.executed_test_cases[test.get_name()] = test_case
                test_case.check_requirements()
                if test_case.is_skipped:
                    LOGGER.info("Skipping test case '%s'...", test.get_name())
                    LOGGER.info("Test result:\n\n%s\n", test_case)
                    return testcase.TestCase.EX_TESTCASE_SKIPPED
                LOGGER.info("Running test case '%s'...", test.get_name())
                try:
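                    # 'args' is optional in testcases.yaml; when the key is
                    # missing, the KeyError below falls back to a bare run().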
                    kwargs = run_dict['args']
                    test_case.run(**kwargs)
                except KeyError:
                    test_case.run()
                result = test_case.is_successful()
                LOGGER.info("Test result:\n\n%s\n", test_case)
                if self.clean_flag:
                    test_case.clean()
                if self.push_flag:
                    test_case.publish_artifacts()
                if self.report_flag:
                    test_case.push_to_db()
            except ImportError:
                LOGGER.exception(
                    "Cannot import the driver %s", run_dict['name'])
            except AttributeError:
                LOGGER.exception(
                    "Cannot get the class of the driver %s", run_dict['name'])
            except Exception:  # pylint: disable=broad-except
                LOGGER.exception(
                    "\n\nPlease fix the testcase %s.\n"
                    "All exceptions should be caught by the testcase instead!"
                    "\n\n",
                    test.get_name())
        else:
            raise Exception("Cannot get the 'run' block of the test case.")
        return result

    def run_tier(self, tier):
        """Run one tier"""
        tests = tier.get_tests()
        if not tests:
            LOGGER.info("There are no supported test cases in this tier "
                        "for the given scenario")
            self.overall_result = Result.EX_ERROR
        else:
            for test in tests:
                # Rely on the returned status so that a test case which
                # could not even be loaded is still flagged as failed.
                if (self.run_test(test)
                        == testcase.TestCase.EX_TESTCASE_FAILED):
                    LOGGER.error("The test case '%s' failed.", test.get_name())
                    self.overall_result = Result.EX_ERROR
                    if test.is_blocking():
                        raise BlockingTestFailed(
                            f"The test case {test.get_name()} "
                            "failed and is blocking")
        return self.overall_result

    def run_all(self):
        """Run all available testcases"""
        tiers_to_run = []
        msg = prettytable.PrettyTable(
            header_style='upper', padding_width=5,
            field_names=['tiers', 'description', 'testcases'])
        for tier in self.tiers.get_tiers():
            if tier.get_tests():
                tiers_to_run.append(tier)
                msg.add_row([tier.get_name(),
                             textwrap.fill(tier.description, width=40),
                             textwrap.fill(' '.join([str(x.get_name(
                                 )) for x in tier.get_tests()]), width=40)])
        LOGGER.info("TESTS TO BE EXECUTED:\n\n%s\n", msg)
        for tier in tiers_to_run:
            self.run_tier(tier)

    def main(self, **kwargs):  # pylint: disable=too-many-branches
        """Entry point of class Runner"""
        if 'noclean' in kwargs:
            self.clean_flag = not kwargs['noclean']
        if 'report' in kwargs:
            self.report_flag = kwargs['report']
        if 'push' in kwargs:
            self.push_flag = kwargs['push']
        try:
            LOGGER.info("Deployment description:\n\n%s\n", env.string())
            self.source_envfile()
            if kwargs.get('test'):
                LOGGER.debug("Test args: %s", kwargs['test'])
                if self.tiers.get_tier(kwargs['test']):
                    self.run_tier(self.tiers.get_tier(kwargs['test']))
                elif self.tiers.get_test(kwargs['test']):
                    result = self.run_test(
                        self.tiers.get_test(kwargs['test']))
                    if result == testcase.TestCase.EX_TESTCASE_FAILED:
                        LOGGER.error("The test case '%s' failed.",
                                     kwargs['test'])
                        self.overall_result = Result.EX_ERROR
                elif kwargs['test'] == "all":
                    self.run_all()
                else:
                    LOGGER.error("Unknown test case or tier '%s', or not "
                                 "supported by the given scenario '%s'.",
                                 kwargs['test'],
                                 env.get('DEPLOY_SCENARIO'))
                    LOGGER.debug("Available tiers are:\n\n%s",
                                 self.tiers)
                    return Result.EX_ERROR
            else:
                self.run_all()
        except BlockingTestFailed:
            pass
        except Exception:  # pylint: disable=broad-except
            LOGGER.exception("Failures when running testcase(s)")
            self.overall_result = Result.EX_ERROR
        if not self.tiers.get_test(kwargs.get('test')):
            self.summary(self.tiers.get_tier(kwargs.get('test')))
        LOGGER.info("Execution exit value: %s", self.overall_result)
        return self.overall_result

    def summary(self, tier=None):
        """Generate the xtesting report showing the overall results"""
        msg = prettytable.PrettyTable(
            header_style='upper', padding_width=5,
            field_names=['test case', 'project', 'tier',
                         'duration', 'result'])
        tiers = [tier] if tier else self.tiers.get_tiers()
        for each_tier in tiers:
            for test in each_tier.get_tests():
                try:
                    test_case = self.executed_test_cases[test.get_name()]
                except KeyError:
                    msg.add_row([test.get_name(), test.get_project(),
                                 each_tier.get_name(), "00:00", "SKIP"])
                else:
                    if test_case.is_skipped:
                        result = 'SKIP'
                    else:
                        result = ('PASS' if test_case.is_successful()
                                  == test_case.EX_OK else 'FAIL')
                    msg.add_row(
                        [test_case.case_name, test_case.project_name,
                         self.tiers.get_tier_name(test_case.case_name),
                         test_case.get_duration(), result])
            for test in each_tier.get_skipped_test():
                msg.add_row([test.get_name(), test.get_project(),
                             each_tier.get_name(), "00:00", "SKIP"])
        LOGGER.info("Xtesting report:\n\n%s\n", msg)


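# Assumption: constants.XTESTING_PATHES lists the user and system locations
# searched for configuration overrides (e.g. ~/.xtesting and /etc/xtesting).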
def _get_xtesting_config(filename, default):
    """Return the first config file found in XTESTING_PATHES, else default"""
    for path in constants.XTESTING_PATHES:
        abspath = os.path.abspath(os.path.expanduser(path))
        if os.path.isfile(os.path.join(abspath, filename)):
            return os.path.join(abspath, filename)
    return default


def main():
    """Entry point"""
    try:
        os.makedirs(constants.RESULTS_DIR)
    except OSError as ex:
        if ex.errno != errno.EEXIST:
            print(f"Cannot create {constants.RESULTS_DIR}")
            return testcase.TestCase.EX_RUN_ERROR
    if env.get('DEBUG').lower() == 'true':
        logging.config.fileConfig(_get_xtesting_config(
            'logging.debug.ini', constants.DEBUG_INI_PATH_DEFAULT))
    else:
        logging.config.fileConfig(_get_xtesting_config(
            'logging.ini', constants.INI_PATH_DEFAULT))
    logging.captureWarnings(True)
    parser = RunTestsParser()
    args = parser.parse_args(sys.argv[1:])
    # Reset argv to prevent wrong usage by the underlying test framework
    # e.g. pyats fails by expecting an arg to -p (publish to database) when
    # called via Robot.run()
    sys.argv = [sys.argv[0]]
    runner = Runner()
    return runner.main(**args).value