#!/usr/bin/env python

# Copyright (c) 2016 Ericsson AB and others.
#
# All rights reserved. This program and the accompanying materials
# are made available under the terms of the Apache License, Version 2.0
# which accompanies this distribution, and is available at
# http://www.apache.org/licenses/LICENSE-2.0

"""The entry point for running tests:
1) Parses xtesting/ci/testcases.yaml to determine which testcase(s) to run
2) Executes the common operations on every testcase (run, push results to
   db, ...)
3) Returns the right status code
"""

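# Illustrative CLI usage (a sketch; the console script name 'run_tests' and
# the tier/testcase names are assumptions for illustration only, they are not
# defined in this module):
#   run_tests -t all          # run every enabled tier/testcase
#   run_tests -t healthcheck  # run a single tier or testcase
#   run_tests -t first -r     # run one testcase and push results to the db
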
import argparse
import errno
import logging
import logging.config
import os
import re
import sys
import textwrap

import enum
import prettytable
from stevedore import driver
import yaml

from xtesting.ci import tier_builder
from xtesting.core import testcase
from xtesting.utils import config
from xtesting.utils import constants
from xtesting.utils import env

LOGGER = logging.getLogger('xtesting.ci.run_tests')


class Result(enum.Enum):
    """The overall result in enumerated type"""
    # pylint: disable=too-few-public-methods
    EX_OK = os.EX_OK
    EX_ERROR = -1


class BlockingTestFailed(Exception):
    """Exception when the blocking test fails"""


class RunTestsParser():
    """Parser to run tests"""
    # pylint: disable=too-few-public-methods

    def __init__(self):
        self.parser = argparse.ArgumentParser()
        self.parser.add_argument("-t", "--test", dest="test", action='store',
                                 help="Test case or tier (group of tests) "
                                 "to be executed. It will run all the tests "
                                 "if not specified.")
        self.parser.add_argument("-n", "--noclean", help="Do not clean "
                                 "OpenStack resources after running each "
                                 "test (default=false).",
                                 action="store_true")
        self.parser.add_argument("-r", "--report", help="Push results to "
                                 "database (default=false).",
                                 action="store_true")
        self.parser.add_argument("-p", "--push", help="Push artifacts to "
                                 "S3 repository (default=false).",
                                 action="store_true")

    def parse_args(self, argv=None):
        """Parse arguments.

        It can call sys.exit if arguments are incorrect.

        Returns:
            the arguments from cmdline
        """
        return vars(self.parser.parse_args(argv))


class Runner():
    """Runner class"""

    def __init__(self):
        self.executed_test_cases = {}
        self.overall_result = Result.EX_OK
        self.clean_flag = True
        self.report_flag = False
        self.push_flag = False
        self.tiers = tier_builder.TierBuilder(config.get_xtesting_config(
            constants.TESTCASE_DESCRIPTION,
            constants.TESTCASE_DESCRIPTION_DEFAULT))

    @staticmethod
    def source_envfile(rc_file=constants.ENV_FILE):
        """Source the env file passed as arg"""
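        # Illustrative env file content handled by the loop below (a sketch;
        # variable names and values are made up, not taken from any real
        # deployment). Lines such as
        #   export NODE_NAME="pod1"
        #   DEPLOY_SCENARIO=os-nosdn-nofeature-ha
        # are exported into os.environ, with surrounding quotes stripped.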
        if not os.path.isfile(rc_file):
            LOGGER.debug("No env file %s found", rc_file)
            return
        with open(rc_file, "r", encoding='utf-8') as rcfd:
            for line in rcfd:
                var = (line.rstrip('"\n').replace('export ', '').split(
                    "=") if re.search(r'(.*)=(.*)', line) else None)
                # The next two lines should be modified as soon as rc_file
                # conforms to common rules. Be aware that this could cause
                # issues if the value starts with a quote character.
                if var:
                    key = re.sub(r'^["\' ]*|[ \'"]*$', '', var[0])
                    value = re.sub(r'^["\' ]*|[ \'"]*$', '', "".join(var[1:]))
                    os.environ[key] = value
            rcfd.seek(0, 0)
            LOGGER.debug("Sourcing env file %s\n\n%s", rc_file, rcfd.read())

    @staticmethod
    def get_dict_by_test(testname):
        """Return the testcase dict matching testname in testcases.yaml"""
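        # Illustrative testcases.yaml shape this lookup assumes (a sketch;
        # the names and values are made-up examples, not the shipped
        # defaults):
        #   tiers:
        #     - name: healthcheck
        #       description: "First tier"
        #       testcases:
        #         - case_name: first
        #           project_name: xtesting
        #           run:
        #             name: 'first'
        #             args: {}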
        with open(config.get_xtesting_config(
                constants.TESTCASE_DESCRIPTION,
                constants.TESTCASE_DESCRIPTION_DEFAULT),
                encoding='utf-8') as tyaml:
            testcases_yaml = yaml.safe_load(tyaml)
        for dic_tier in testcases_yaml.get("tiers"):
            for dic_testcase in dic_tier['testcases']:
                if dic_testcase['case_name'] == testname:
                    return dic_testcase
        LOGGER.error(
            'Test case %s is not defined in testcases.yaml', testname)
        return None

    @staticmethod
    def get_run_dict(testname):
        """Obtain the 'run' block of the testcase from testcases.yaml"""
        try:
            dic_testcase = Runner.get_dict_by_test(testname)
            if not dic_testcase:
                LOGGER.error("Cannot get %s's config options", testname)
            elif 'run' in dic_testcase:
                return dic_testcase['run']
            return None
        except Exception:  # pylint: disable=broad-except
            LOGGER.exception("Cannot get %s's config options", testname)
            return None

    def run_test(self, test):
        """Run one test case"""
        if not test.is_enabled() or test.is_skipped():
            msg = prettytable.PrettyTable(
                header_style='upper', padding_width=5,
                field_names=['test case', 'project', 'duration',
                             'result'])
            msg.add_row([test.get_name(), test.get_project(), "00:00", "SKIP"])
            LOGGER.info("Test result:\n\n%s\n", msg)
            return testcase.TestCase.EX_TESTCASE_SKIPPED
        result = testcase.TestCase.EX_TESTCASE_FAILED
        run_dict = self.get_run_dict(test.get_name())
        if run_dict:
            try:
                LOGGER.info("Loading test case '%s'...", test.get_name())
                test_dict = Runner.get_dict_by_test(test.get_name())
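                # The testcase class is resolved as a stevedore driver
                # registered under the 'xtesting.testcase' entry point
                # namespace; run_dict['name'] selects the plugin. A plugin
                # package would typically declare it in its setup.cfg
                # (purely illustrative, hypothetical names):
                #   xtesting.testcase =
                #       mycase = mypackage.mymodule:MyTestCase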
                test_case = driver.DriverManager(
                    namespace='xtesting.testcase',
                    name=run_dict['name'],
                    invoke_on_load=True,
                    invoke_kwds=test_dict).driver
                self.executed_test_cases[test.get_name()] = test_case
                test_case.check_requirements()
                if test_case.is_skipped:
                    LOGGER.info("Skipping test case '%s'...", test.get_name())
                    LOGGER.info("Test result:\n\n%s\n", test_case)
                    return testcase.TestCase.EX_TESTCASE_SKIPPED
                LOGGER.info("Running test case '%s'...", test.get_name())
                try:
                    kwargs = run_dict['args']
                    test_case.run(**kwargs)
                except KeyError:
                    test_case.run()
                result = test_case.is_successful()
                LOGGER.info("Test result:\n\n%s\n", test_case)
                if self.clean_flag:
                    test_case.clean()
                if self.push_flag:
                    test_case.publish_artifacts()
                if self.report_flag:
                    test_case.push_to_db()
            except ImportError:
                LOGGER.exception("Cannot import module %s", run_dict['module'])
            except AttributeError:
                LOGGER.exception("Cannot get class %s", run_dict['class'])
            except Exception:  # pylint: disable=broad-except
                LOGGER.exception(
                    "\n\nPlease fix the testcase %s.\n"
                    "All exceptions should be caught by the testcase instead!"
                    "\n\n",
                    test.get_name())
        else:
            raise Exception("Cannot import the class for the test case.")
        return result

    def run_tier(self, tier):
        """Run one tier"""
        tests = tier.get_tests()
        if not tests:
            LOGGER.info("There are no supported test cases in this tier "
                        "for the given scenario")
            self.overall_result = Result.EX_ERROR
        else:
            for test in tests:
                self.run_test(test)
                test_case = self.executed_test_cases[test.get_name()]
                if test_case.is_successful() == test_case.EX_TESTCASE_FAILED:
                    LOGGER.error("The test case '%s' failed.", test.get_name())
                    self.overall_result = Result.EX_ERROR
                    if test.is_blocking():
                        raise BlockingTestFailed(
                            f"The test case {test.get_name()} "
                            "failed and is blocking")
        return self.overall_result

    def run_all(self):
        """Run all available testcases"""
        tiers_to_run = []
        msg = prettytable.PrettyTable(
            header_style='upper', padding_width=5,
            field_names=['tiers', 'description', 'testcases'])
        for tier in self.tiers.get_tiers():
            if tier.get_tests():
                tiers_to_run.append(tier)
                msg.add_row([tier.get_name(),
                             textwrap.fill(tier.description, width=40),
                             textwrap.fill(' '.join([str(x.get_name(
                                 )) for x in tier.get_tests()]), width=40)])
        LOGGER.info("TESTS TO BE EXECUTED:\n\n%s\n", msg)
        for tier in tiers_to_run:
            self.run_tier(tier)

    def main(self, **kwargs):  # pylint: disable=too-many-branches
        """Entry point of class Runner"""
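        # Illustrative programmatic call (a sketch; the keys mirror the CLI
        # flags defined by RunTestsParser, the values here are made up):
        #   Runner().main(test='all', noclean=False, report=False, push=False)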
        if 'noclean' in kwargs:
            self.clean_flag = not kwargs['noclean']
        if 'report' in kwargs:
            self.report_flag = kwargs['report']
        if 'push' in kwargs:
            self.push_flag = kwargs['push']
        try:
            LOGGER.info("Deployment description:\n\n%s\n", env.string())
            self.source_envfile()
            if 'test' in kwargs:
                LOGGER.debug("Test args: %s", kwargs['test'])
                if self.tiers.get_tier(kwargs['test']):
                    self.run_tier(self.tiers.get_tier(kwargs['test']))
                elif self.tiers.get_test(kwargs['test']):
                    result = self.run_test(
                        self.tiers.get_test(kwargs['test']))
                    if result == testcase.TestCase.EX_TESTCASE_FAILED:
                        LOGGER.error("The test case '%s' failed.",
                                     kwargs['test'])
                        self.overall_result = Result.EX_ERROR
                elif kwargs['test'] == "all":
                    self.run_all()
                else:
                    LOGGER.error("Unknown test case or tier '%s', or not "
                                 "supported by the given scenario '%s'.",
                                 kwargs['test'],
                                 env.get('DEPLOY_SCENARIO'))
                    LOGGER.debug("Available tiers are:\n\n%s",
                                 self.tiers)
                    return Result.EX_ERROR
            else:
                self.run_all()
        except BlockingTestFailed:
            pass
        except Exception:  # pylint: disable=broad-except
            LOGGER.exception("Failures when running testcase(s)")
            self.overall_result = Result.EX_ERROR
        if not self.tiers.get_test(kwargs['test']):
            self.summary(self.tiers.get_tier(kwargs['test']))
        LOGGER.info("Execution exit value: %s", self.overall_result)
        return self.overall_result

    def summary(self, tier=None):
        """Generate the xtesting report showing the overall results"""
        msg = prettytable.PrettyTable(
            header_style='upper', padding_width=5,
            field_names=['test case', 'project', 'tier',
                         'duration', 'result'])
        tiers = [tier] if tier else self.tiers.get_tiers()
        for each_tier in tiers:
            for test in each_tier.get_tests():
                try:
                    test_case = self.executed_test_cases[test.get_name()]
                except KeyError:
                    msg.add_row([test.get_name(), test.get_project(),
                                 each_tier.get_name(), "00:00", "SKIP"])
                else:
                    if test_case.is_skipped:
                        result = 'SKIP'
                    else:
                        result = (
                            'PASS' if test_case.is_successful() ==
                            test_case.EX_OK else 'FAIL')
                    msg.add_row(
                        [test_case.case_name, test_case.project_name,
                         self.tiers.get_tier_name(test_case.case_name),
                         test_case.get_duration(), result])
            for test in each_tier.get_skipped_test():
                msg.add_row([test.get_name(), test.get_project(),
                             each_tier.get_name(), "00:00", "SKIP"])
        LOGGER.info("Xtesting report:\n\n%s\n", msg)


def main():
    """Entry point"""
    try:
        os.makedirs(constants.RESULTS_DIR)
    except OSError as ex:
        if ex.errno != errno.EEXIST:
            print(f"Cannot create {constants.RESULTS_DIR}")
            return testcase.TestCase.EX_RUN_ERROR
    if env.get('DEBUG').lower() == 'true':
        logging.config.fileConfig(config.get_xtesting_config(
            'logging.debug.ini', constants.DEBUG_INI_PATH_DEFAULT))
    else:
        logging.config.fileConfig(config.get_xtesting_config(
            'logging.ini', constants.INI_PATH_DEFAULT))
    logging.captureWarnings(True)
    parser = RunTestsParser()
    args = parser.parse_args(sys.argv[1:])
    # Reset argv to prevent wrong usage by the underlying test framework,
    # e.g. pyats fails because it expects an argument to -p (publish to
    # database) when called via Robot.run()
    sys.argv = [sys.argv[0]]
    runner = Runner()
    return runner.main(**args).value
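
# No __main__ guard here: main() above is expected to be wired up by the
# packaging as a console script entry point, along the lines of the
# illustrative (assumed, not quoted from the project) setup.cfg snippet
# below; the generated wrapper passes its integer return value to sys.exit().
#   [entry_points]
#   console_scripts =
#       run_tests = xtesting.ci.run_tests:main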