Don't override env values via testcases.yaml if they exist
functest-xtesting.git: xtesting/ci/run_tests.py
#!/usr/bin/env python

# Copyright (c) 2016 Ericsson AB and others.
#
# All rights reserved. This program and the accompanying materials
# are made available under the terms of the Apache License, Version 2.0
# which accompanies this distribution, and is available at
# http://www.apache.org/licenses/LICENSE-2.0

10 """ The entry of running tests:
11 1) Parses xtesting/ci/testcases.yaml to check which testcase(s) to be run
12 2) Execute the common operations on every testcase (run, push results to db...)
13 3) Return the right status code
14 """

import argparse
import errno
import logging
import logging.config
import os
import re
import sys
import textwrap

import enum
import prettytable
from stevedore import driver
import yaml

from xtesting.ci import tier_builder
from xtesting.core import testcase
from xtesting.utils import config
from xtesting.utils import constants
from xtesting.utils import env

LOGGER = logging.getLogger('xtesting.ci.run_tests')


class Result(enum.Enum):
    """The overall result in enumerated type"""
    # pylint: disable=too-few-public-methods
    EX_OK = os.EX_OK
    EX_ERROR = -1


class BlockingTestFailed(Exception):
    """Exception when the blocking test fails"""


class RunTestsParser():
    """Parser to run tests"""
    # pylint: disable=too-few-public-methods

    def __init__(self):
        self.parser = argparse.ArgumentParser()
        self.parser.add_argument("-t", "--test", dest="test", action='store',
                                 help="Test case or tier (group of tests) "
                                 "to be executed. It will run all the tests "
                                 "if not specified.")
        self.parser.add_argument("-n", "--noclean", help="Do not clean "
                                 "OpenStack resources after running each "
                                 "test (default=false).",
                                 action="store_true")
        self.parser.add_argument("-r", "--report", help="Push results to "
                                 "database (default=false).",
                                 action="store_true")
        self.parser.add_argument("-p", "--push", help="Push artifacts to "
                                 "S3 repository (default=false).",
                                 action="store_true")
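        # Illustrative invocation only (assuming the installed console
        # script is named "run_tests"): run_tests -t all -r -p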

    def parse_args(self, argv=None):
        """Parse arguments.

        It can call sys.exit if arguments are incorrect.

        Returns:
            the arguments from cmdline
        """
        return vars(self.parser.parse_args(argv))


class Runner():
    """Runner class"""

    def __init__(self):
        self.executed_test_cases = {}
        self.overall_result = Result.EX_OK
        self.clean_flag = True
        self.report_flag = False
        self.push_flag = False
        self.tiers = tier_builder.TierBuilder(config.get_xtesting_config(
            constants.TESTCASE_DESCRIPTION,
            constants.TESTCASE_DESCRIPTION_DEFAULT))

    @staticmethod
    def source_envfile(rc_file=constants.ENV_FILE):
        """Source the env file passed as arg"""
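        # The file is expected to hold shell-style assignments, one per
        # line, e.g. (illustrative values only):
        #   export DEPLOY_SCENARIO=os-nosdn-nofeature-noha
        #   NODE_NAME="pod1"
        # Surrounding quotes are stripped and each pair is exported below.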
        if not os.path.isfile(rc_file):
            LOGGER.debug("No env file %s found", rc_file)
            return
        with open(rc_file, "r", encoding='utf-8') as rcfd:
            for line in rcfd:
                var = (line.rstrip('"\n').replace('export ', '').split(
                    "=") if re.search(r'(.*)=(.*)', line) else None)
                # The two next lines should be modified as soon as rc_file
                # conforms with common rules. Be aware that it could induce
                # issues if value starts with '
                if var:
                    key = re.sub(r'^["\' ]*|[ \'"]*$', '', var[0])
                    value = re.sub(r'^["\' ]*|[ \'"]*$', '', "".join(var[1:]))
                    os.environ[key] = value
            rcfd.seek(0, 0)
            LOGGER.debug("Sourcing env file %s\n\n%s", rc_file, rcfd.read())

    @staticmethod
    def get_dict_by_test(testname):
        # pylint: disable=missing-docstring
        with open(config.get_xtesting_config(
                constants.TESTCASE_DESCRIPTION,
                constants.TESTCASE_DESCRIPTION_DEFAULT),
                encoding='utf-8') as tyaml:
            testcases_yaml = yaml.safe_load(tyaml)
        for dic_tier in testcases_yaml.get("tiers"):
            for dic_testcase in dic_tier['testcases']:
                if dic_testcase['case_name'] == testname:
                    return dic_testcase
        LOGGER.error('Test case %s is not defined in testcases.yaml',
                     testname)
        return None

    @staticmethod
    def get_run_dict(testname):
        """Obtain the 'run' block of the testcase from testcases.yaml"""
        try:
            dic_testcase = Runner.get_dict_by_test(testname)
            if not dic_testcase:
                LOGGER.error("Cannot get %s's config options", testname)
            elif 'run' in dic_testcase:
                return dic_testcase['run']
            return None
        except Exception:  # pylint: disable=broad-except
            LOGGER.exception("Cannot get %s's config options", testname)
            return None

    def run_test(self, test):
        """Run one test case"""
        # pylint: disable=too-many-branches,broad-exception-raised
        if not test.is_enabled() or test.is_skipped():
            msg = prettytable.PrettyTable(
                header_style='upper', padding_width=5,
                field_names=['test case', 'project', 'duration',
                             'result'])
            msg.add_row([test.get_name(), test.get_project(), "00:00", "SKIP"])
            LOGGER.info("Test result:\n\n%s\n", msg)
            return testcase.TestCase.EX_TESTCASE_SKIPPED
        result = testcase.TestCase.EX_TESTCASE_FAILED
        run_dict = self.get_run_dict(test.get_name())
        if run_dict:
            try:
                LOGGER.info("Loading test case '%s'...", test.get_name())
                test_dict = Runner.get_dict_by_test(test.get_name())
                test_case = driver.DriverManager(
                    namespace='xtesting.testcase',
                    name=run_dict['name'],
                    invoke_on_load=True,
                    invoke_kwds=test_dict).driver
                self.executed_test_cases[test.get_name()] = test_case
                test_case.check_requirements()
                if test_case.is_skipped:
                    LOGGER.info("Skipping test case '%s'...", test.get_name())
                    LOGGER.info("Test result:\n\n%s\n", test_case)
                    return testcase.TestCase.EX_TESTCASE_SKIPPED
                if 'env' in run_dict:
                    for key, value in run_dict['env'].items():
                        if key not in os.environ:
                            LOGGER.info("Setting env for test case '%s'...",
                                        test.get_name())
                            os.environ[key] = str(value)
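                # As per the check above, values already set in the
                # environment win over 'env' entries from testcases.yaml;
                # e.g. (illustrative) a block such as
                #   run:
                #     env:
                #       SOME_KEY: 'some_value'
                # only exports SOME_KEY when it is not defined beforehand.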
                LOGGER.info("Running test case '%s'...", test.get_name())
                try:
                    kwargs = run_dict['args']
                    test_case.run(**kwargs)
                except KeyError:
                    test_case.run()
                result = test_case.is_successful()
                LOGGER.info("Test result:\n\n%s\n", test_case)
                if self.clean_flag:
                    test_case.clean()
                if self.push_flag:
                    test_case.publish_artifacts()
                if self.report_flag:
                    test_case.push_to_db()
            except ImportError:
                LOGGER.exception("Cannot import module %s", run_dict['module'])
            except AttributeError:
                LOGGER.exception("Cannot get class %s", run_dict['class'])
            except Exception:  # pylint: disable=broad-except
                LOGGER.exception(
                    "\n\nPlease fix the testcase %s.\n"
                    "All exceptions should be caught by the testcase instead!"
                    "\n\n",
                    test.get_name())
        else:
            raise Exception("Cannot import the class for the test case.")
        return result

    def run_tier(self, tier):
        """Run one tier"""
        tests = tier.get_tests()
        if not tests:
            LOGGER.info("There are no supported test cases in this tier "
                        "for the given scenario")
            self.overall_result = Result.EX_ERROR
        else:
            for test in tests:
                self.run_test(test)
                test_case = self.executed_test_cases[test.get_name()]
                if test_case.is_successful() == test_case.EX_TESTCASE_FAILED:
                    LOGGER.error("The test case '%s' failed.", test.get_name())
                    self.overall_result = Result.EX_ERROR
                    if test.is_blocking():
                        raise BlockingTestFailed(
                            f"The test case {test.get_name()} "
                            "failed and is blocking")
        return self.overall_result

    def run_all(self):
        """Run all available testcases"""
        tiers_to_run = []
        msg = prettytable.PrettyTable(
            header_style='upper', padding_width=5,
            field_names=['tiers', 'description', 'testcases'])
        for tier in self.tiers.get_tiers():
            if tier.get_tests():
                tiers_to_run.append(tier)
                msg.add_row([tier.get_name(),
                             textwrap.fill(tier.description, width=40),
                             textwrap.fill(' '.join([str(x.get_name(
                                 )) for x in tier.get_tests()]), width=40)])
        LOGGER.info("TESTS TO BE EXECUTED:\n\n%s\n", msg)
        for tier in tiers_to_run:
            self.run_tier(tier)

    def main(self, **kwargs):  # pylint: disable=too-many-branches
        """Entry point of class Runner"""
        if 'noclean' in kwargs:
            self.clean_flag = not kwargs['noclean']
        if 'report' in kwargs:
            self.report_flag = kwargs['report']
        if 'push' in kwargs:
            self.push_flag = kwargs['push']
        try:
            LOGGER.info("Deployment description:\n\n%s\n", env.string())
            self.source_envfile()
            if 'test' in kwargs:
                LOGGER.debug("Test args: %s", kwargs['test'])
                if self.tiers.get_tier(kwargs['test']):
                    self.run_tier(self.tiers.get_tier(kwargs['test']))
                elif self.tiers.get_test(kwargs['test']):
                    result = self.run_test(
                        self.tiers.get_test(kwargs['test']))
                    if result == testcase.TestCase.EX_TESTCASE_FAILED:
                        LOGGER.error("The test case '%s' failed.",
                                     kwargs['test'])
                        self.overall_result = Result.EX_ERROR
                elif kwargs['test'] == "all":
                    self.run_all()
                else:
                    LOGGER.error("Unknown test case or tier '%s', or not "
                                 "supported by the given scenario '%s'.",
                                 kwargs['test'],
                                 env.get('DEPLOY_SCENARIO'))
                    LOGGER.debug("Available tiers are:\n\n%s",
                                 self.tiers)
                    return Result.EX_ERROR
            else:
                self.run_all()
        except BlockingTestFailed:
            pass
        except Exception:  # pylint: disable=broad-except
            LOGGER.exception("Failures when running testcase(s)")
            self.overall_result = Result.EX_ERROR
        if not self.tiers.get_test(kwargs['test']):
            self.summary(self.tiers.get_tier(kwargs['test']))
        LOGGER.info("Execution exit value: %s", self.overall_result)
        return self.overall_result

    def summary(self, tier=None):
        """Generate the xtesting report showing the overall results"""
        msg = prettytable.PrettyTable(
            header_style='upper', padding_width=5,
            field_names=['test case', 'project', 'tier',
                         'duration', 'result'])
        tiers = [tier] if tier else self.tiers.get_tiers()
        for each_tier in tiers:
            for test in each_tier.get_tests():
                try:
                    test_case = self.executed_test_cases[test.get_name()]
                except KeyError:
                    msg.add_row([test.get_name(), test.get_project(),
                                 each_tier.get_name(), "00:00", "SKIP"])
                else:
                    if test_case.is_skipped:
                        result = 'SKIP'
                    else:
                        result = 'PASS' if (test_case.is_successful(
                            ) == test_case.EX_OK) else 'FAIL'
                    msg.add_row(
                        [test_case.case_name, test_case.project_name,
                         self.tiers.get_tier_name(test_case.case_name),
                         test_case.get_duration(), result])
            for test in each_tier.get_skipped_test():
                msg.add_row([test.get_name(), test.get_project(),
                             each_tier.get_name(), "00:00", "SKIP"])
        LOGGER.info("Xtesting report:\n\n%s\n", msg)


def main():
    """Entry point"""
    try:
        os.makedirs(constants.RESULTS_DIR)
    except OSError as ex:
        if ex.errno != errno.EEXIST:
            print(f"Cannot create {constants.RESULTS_DIR}")
            return testcase.TestCase.EX_RUN_ERROR
    if env.get('DEBUG').lower() == 'true':
        logging.config.fileConfig(config.get_xtesting_config(
            'logging.debug.ini', constants.DEBUG_INI_PATH_DEFAULT))
    else:
        logging.config.fileConfig(config.get_xtesting_config(
            'logging.ini', constants.INI_PATH_DEFAULT))
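    # The DEBUG switch above is read from the environment, e.g.
    # (illustrative) exporting DEBUG=true selects logging.debug.ini.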
    logging.captureWarnings(True)
    parser = RunTestsParser()
    args = parser.parse_args(sys.argv[1:])
    # Reset argv to prevent wrong usage by the underlying test framework
    # e.g. pyats fails by expecting an arg to -p (publish to database) when
    # called via Robot.run()
    sys.argv = [sys.argv[0]]
    runner = Runner()
    return runner.main(**args).value
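

# Illustrative sketch, only needed when this module is executed directly
# rather than through the console-script entry point:
if __name__ == "__main__":
    sys.exit(main())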