functest/ci/run_tests.py
#!/usr/bin/env python

# Copyright (c) 2016 Ericsson AB and others.
#
# All rights reserved. This program and the accompanying materials
# are made available under the terms of the Apache License, Version 2.0
# which accompanies this distribution, and is available at
# http://www.apache.org/licenses/LICENSE-2.0

""" The entry point for running tests:
1) Parse functest/ci/testcases.yaml to check which testcase(s) to run
2) Execute the common operations on every testcase (run, push results to db...)
3) Return the right status code
"""
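
# Typical invocations (a sketch based on the argparse options defined below;
# the console-script name 'run_tests' is an assumption about the packaging):
#   run_tests -t healthcheck       # run a single tier or test case
#   run_tests -t all -r            # run everything and push results to the db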

import argparse
import importlib
import logging
import logging.config
import os
import re
import sys
import textwrap
import pkg_resources

import enum
import prettytable
import yaml

import functest.ci.tier_builder as tb
import functest.core.testcase as testcase

LOGGER = logging.getLogger('functest.ci.run_tests')
ENV_FILE = "/home/opnfv/functest/conf/env_file"


class Result(enum.Enum):
    """The overall result in enumerated type"""
    # pylint: disable=too-few-public-methods
    EX_OK = os.EX_OK
    EX_ERROR = -1


class BlockingTestFailed(Exception):
    """Exception when the blocking test fails"""
    pass


class TestNotEnabled(Exception):
    """Exception when the test is not enabled"""
    pass


class RunTestsParser(object):
    """Parser to run tests"""
    # pylint: disable=too-few-public-methods

    def __init__(self):
        self.parser = argparse.ArgumentParser()
        self.parser.add_argument("-t", "--test", dest="test", action='store',
                                 help="Test case or tier (group of tests) "
                                 "to be executed. It will run all the tests "
                                 "if not specified.")
        self.parser.add_argument("-n", "--noclean", help="Do not clean "
                                 "OpenStack resources after running each "
                                 "test (default=false).",
                                 action="store_true")
        self.parser.add_argument("-r", "--report", help="Push results to "
                                 "database (default=false).",
                                 action="store_true")

    def parse_args(self, argv=None):
        """Parse arguments.

        It can call sys.exit if arguments are incorrect.

        Returns:
            the arguments from cmdline
        """
        return vars(self.parser.parse_args(argv))


class Runner(object):
    """Runner class"""

    def __init__(self):
        self.executed_test_cases = {}
        self.overall_result = Result.EX_OK
        self.clean_flag = True
        self.report_flag = False
        self.tiers = tb.TierBuilder(
            os.environ.get('INSTALLER_TYPE', None),
            os.environ.get('DEPLOY_SCENARIO', None),
            pkg_resources.resource_filename('functest', 'ci/testcases.yaml'))
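        # Tier/testcase selection depends on the environment, e.g. (values
        # below are only illustrative):
        #   INSTALLER_TYPE=fuel DEPLOY_SCENARIO=os-nosdn-nofeature-ha
        #   CI_LOOP=daily (read later by run_all())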

    @staticmethod
    def source_envfile(rc_file=ENV_FILE):
        """Source the env file passed as arg"""
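        # The env file is expected to hold shell-style exports, one per line,
        # e.g. (hypothetical values):
        #   export OS_AUTH_URL='http://192.168.0.1:5000/v3'
        #   export OS_USERNAME=admin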
        if not os.path.isfile(rc_file):
            LOGGER.debug("No env file %s found", rc_file)
            return
        with open(rc_file, "r") as rcfd:
            LOGGER.info("Sourcing env file %s", rc_file)
            for line in rcfd:
                var = (line.rstrip('"\n').replace('export ', '').split(
                    "=") if re.search(r'(.*)=(.*)', line) else None)
                # The next two lines should be modified as soon as rc_file
                # conforms with common rules. Be aware that it could cause
                # issues if a value starts with '
                if var:
                    key = re.sub(r'^["\' ]*|[ \'"]*$', '', var[0])
                    value = re.sub(r'^["\' ]*|[ \'"]*$', '', "".join(var[1:]))
                    os.environ[key] = value

    @staticmethod
    def get_dict_by_test(testname):
        """Return the dict of the given testcase from testcases.yaml"""
        # pylint: disable=bad-continuation
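        # Expected testcases.yaml layout (a sketch; the names and values below
        # are placeholders, only the keys match what this module reads):
        #   tiers:
        #     - testcases:
        #         - case_name: some_testcase
        #           run:
        #             module: 'some.package.module'
        #             class: 'SomeTestCase'
        #             args: {}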
        with open(pkg_resources.resource_filename(
                'functest', 'ci/testcases.yaml')) as tyaml:
            testcases_yaml = yaml.safe_load(tyaml)
        for dic_tier in testcases_yaml.get("tiers"):
            for dic_testcase in dic_tier['testcases']:
                if dic_testcase['case_name'] == testname:
                    return dic_testcase
        LOGGER.error('The test case %s is not defined in testcases.yaml',
                     testname)
        return None

    @staticmethod
    def get_run_dict(testname):
        """Obtain the 'run' block of the testcase from testcases.yaml"""
        try:
            dic_testcase = Runner.get_dict_by_test(testname)
            if not dic_testcase:
                LOGGER.error("Cannot get %s's config options", testname)
            elif 'run' in dic_testcase:
                return dic_testcase['run']
            return None
        except Exception:  # pylint: disable=broad-except
            LOGGER.exception("Cannot get %s's config options", testname)
            return None

    def run_test(self, test):
        """Run one test case"""
        if not test.is_enabled():
            raise TestNotEnabled(
                "The test case {} is not enabled".format(test.get_name()))
        LOGGER.info("Running test case '%s'...", test.get_name())
        result = testcase.TestCase.EX_RUN_ERROR
        run_dict = self.get_run_dict(test.get_name())
        if run_dict:
            try:
                module = importlib.import_module(run_dict['module'])
                cls = getattr(module, run_dict['class'])
                test_dict = Runner.get_dict_by_test(test.get_name())
                test_case = cls(**test_dict)
                self.executed_test_cases[test.get_name()] = test_case
                try:
                    kwargs = run_dict['args']
                    test_case.run(**kwargs)
                except KeyError:
                    test_case.run()
                if self.report_flag:
                    test_case.push_to_db()
                if test.get_project() == "functest":
                    result = test_case.is_successful()
                else:
                    result = testcase.TestCase.EX_OK
                LOGGER.info("Test result:\n\n%s\n", test_case)
                if self.clean_flag:
                    test_case.clean()
            except ImportError:
                LOGGER.exception("Cannot import module %s", run_dict['module'])
            except AttributeError:
                LOGGER.exception("Cannot get class %s", run_dict['class'])
        else:
            raise Exception("Cannot get the run block of the test case.")
        return result

    def run_tier(self, tier):
        """Run one tier"""
        tier_name = tier.get_name()
        tests = tier.get_tests()
        if not tests:
            LOGGER.info("There are no supported test cases in this tier "
                        "for the given scenario")
            self.overall_result = Result.EX_ERROR
        else:
            LOGGER.info("Running tier '%s'", tier_name)
            for test in tests:
                self.run_test(test)
                test_case = self.executed_test_cases[test.get_name()]
                if test_case.is_successful() != testcase.TestCase.EX_OK:
                    LOGGER.error("The test case '%s' failed.", test.get_name())
                    if test.get_project() == "functest":
                        self.overall_result = Result.EX_ERROR
                    if test.is_blocking():
                        raise BlockingTestFailed(
                            "The test case {} failed and is blocking".format(
                                test.get_name()))
        return self.overall_result

    def run_all(self):
        """Run all available testcases"""
        tiers_to_run = []
        msg = prettytable.PrettyTable(
            header_style='upper', padding_width=5,
            field_names=['tiers', 'order', 'CI Loop', 'description',
                         'testcases'])
        for tier in self.tiers.get_tiers():
            ci_loop = os.environ.get('CI_LOOP', None)
            if (tier.get_tests() and ci_loop and
                    re.search(ci_loop, tier.get_ci_loop()) is not None):
                tiers_to_run.append(tier)
                msg.add_row([tier.get_name(), tier.get_order(),
                             tier.get_ci_loop(),
                             textwrap.fill(tier.description, width=40),
                             textwrap.fill(
                                 ' '.join([str(x.get_name())
                                           for x in tier.get_tests()]),
                                 width=40)])
        LOGGER.info("TESTS TO BE EXECUTED:\n\n%s\n", msg)
        for tier in tiers_to_run:
            self.run_tier(tier)

    def main(self, **kwargs):
        """Entry point of class Runner"""
        if 'noclean' in kwargs:
            self.clean_flag = not kwargs['noclean']
        if 'report' in kwargs:
            self.report_flag = kwargs['report']
        try:
            if 'test' in kwargs:
                LOGGER.debug("Sourcing the credential file...")
                self.source_envfile()

                LOGGER.debug("Test args: %s", kwargs['test'])
                if self.tiers.get_tier(kwargs['test']):
                    self.run_tier(self.tiers.get_tier(kwargs['test']))
                elif self.tiers.get_test(kwargs['test']):
                    result = self.run_test(
                        self.tiers.get_test(kwargs['test']))
                    if result != testcase.TestCase.EX_OK:
                        LOGGER.error("The test case '%s' failed.",
                                     kwargs['test'])
                        self.overall_result = Result.EX_ERROR
                elif kwargs['test'] == "all":
                    self.run_all()
                else:
                    LOGGER.error("Unknown test case or tier '%s', or not "
                                 "supported by the given scenario '%s'.",
                                 kwargs['test'],
                                 os.environ.get('DEPLOY_SCENARIO', ""))
                    LOGGER.debug("Available tiers are:\n\n%s",
                                 self.tiers)
                    return Result.EX_ERROR
            else:
                self.run_all()
        except BlockingTestFailed:
            pass
        except Exception:  # pylint: disable=broad-except
            LOGGER.exception("Failures when running testcase(s)")
            self.overall_result = Result.EX_ERROR
        if not self.tiers.get_test(kwargs.get('test')):
            self.summary(self.tiers.get_tier(kwargs.get('test')))
        LOGGER.info("Execution exit value: %s", self.overall_result)
        return self.overall_result

    def summary(self, tier=None):
        """Generate the functest report showing the overall results"""
        msg = prettytable.PrettyTable(
            header_style='upper', padding_width=5,
            field_names=['env var', 'value'])
        for env_var in ['INSTALLER_TYPE', 'DEPLOY_SCENARIO', 'BUILD_TAG',
                        'CI_LOOP']:
            msg.add_row([env_var, os.environ.get(env_var, "")])
        LOGGER.info("Deployment description:\n\n%s\n", msg)
        msg = prettytable.PrettyTable(
            header_style='upper', padding_width=5,
            field_names=['test case', 'project', 'tier',
                         'duration', 'result'])
        tiers = [tier] if tier else self.tiers.get_tiers()
        for each_tier in tiers:
            for test in each_tier.get_tests():
                try:
                    test_case = self.executed_test_cases[test.get_name()]
                except KeyError:
                    msg.add_row([test.get_name(), test.get_project(),
                                 each_tier.get_name(), "00:00", "SKIP"])
                else:
                    result = 'PASS' if (test_case.is_successful()
                                        == test_case.EX_OK) else 'FAIL'
                    msg.add_row(
                        [test_case.case_name, test_case.project_name,
                         self.tiers.get_tier_name(test_case.case_name),
                         test_case.get_duration(), result])
            for test in each_tier.get_skipped_test():
                msg.add_row([test.get_name(), test.get_project(),
                             each_tier.get_name(), "00:00", "SKIP"])
        LOGGER.info("FUNCTEST REPORT:\n\n%s\n", msg)


def main():
    """Entry point"""
    logging.config.fileConfig(pkg_resources.resource_filename(
        'functest', 'ci/logging.ini'))
    logging.captureWarnings(True)
    parser = RunTestsParser()
    args = parser.parse_args(sys.argv[1:])
    runner = Runner()
    return runner.main(**args).value
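
# A hedged packaging note (an assumption, not part of this file): main() is
# expected to be wired up as a console script, e.g. in a pbr-style setup.cfg:
#   [entry_points]
#   console_scripts =
#       run_tests = functest.ci.run_tests:main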