1 # Copyright 2015-2016 Intel Corporation.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """IntegrationTestCase class
21 from testcases import TestCase
22 from conf import settings as S
23 from collections import OrderedDict
# Prefix prepended to the method name of a test step to find the companion
# method that validates the step's result (step method 'foo' is checked by
# 'validate_foo' looked up on the same test object).
CHECK_PREFIX = 'validate_'
27 class IntegrationTestCase(TestCase):
28 """IntegrationTestCase class
def __init__(self, cfg):
    """ Set up an integration testcase from its configuration

    The testcase type is recorded as 'integration' before the parent
    initializer runs with ``cfg``; afterwards a logger named after this
    module is attached to the instance.

    :param cfg: Testcase configuration handed over to the parent class.
    """
    # the type is set ahead of the parent initializer, as in the original order
    self._type = 'integration'
    parent = super(IntegrationTestCase, self)
    parent.__init__(cfg)
    self._logger = logging.getLogger(__name__)
def report_status(self, label, status):
    """ Write the outcome of a test step to the debug log

    :param label: Text identifying the step (or test) being reported.
    :param status: Truthy value is logged as 'OK', falsy one as 'FAILED'.
    """
    if status:
        outcome = 'OK'
    else:
        outcome = 'FAILED'
    self._logger.debug("%s ... %s", label, outcome)
def run_initialize(self):
    """ Initialize the test run and reset the integration result

    Runs the initialization of the parent class and then stores a fresh
    result record: status True with empty details.
    """
    parent = super(IntegrationTestCase, self)
    parent.run_initialize()
    # every run starts from a passing state; failing steps overwrite it
    fresh_result = {'status': True, 'details': ''}
    self._inttest = fresh_result
53 All setup and teardown through controllers is included.
def eval_step_params(params, step_result):
    """ Evaluates references to results from previous steps

    Every string found in ``params`` (directly, or nested inside lists,
    tuples and dict values) is split on '#'. Each chunk starting with
    'STEP[' is evaluated as a Python expression with ``STEP`` bound to
    ``step_result`` and replaced by the string form of the outcome; all
    other chunks are kept verbatim and the pieces are concatenated again.

    :param params: Iterable of step parameters from the test definition.
    :param step_result: Sequence with results of the test steps; it is
        what a ``STEP[...]`` reference indexes into.
    :returns: List with one evaluated entry per item of ``params``. Lists
        and tuples both come back as lists, dicts keep their keys, values
        of any other type are passed through untouched.
    """
    def eval_param(param, STEP):
        """ Recursively substitute STEP references inside one parameter

        ``STEP`` must keep exactly this name, because the evaluated
        chunks refer to it.
        """
        # pylint: disable=invalid-name,eval-used
        if isinstance(param, str):
            pieces = []
            # evaluate every #STEP reference inside parameter itself
            for chunk in param.split('#'):
                if chunk.startswith('STEP['):
                    # NOTE(review): eval() executes the whole chunk as
                    # Python code; test definitions must come from a
                    # trusted source.
                    pieces.append(str(eval(chunk)))
                else:
                    pieces.append(chunk)
            return ''.join(pieces)
        if isinstance(param, (list, tuple)):
            return [eval_param(item, STEP) for item in param]
        if isinstance(param, dict):
            return {key: eval_param(value, STEP)
                    for (key, value) in param.items()}
        return param

    # evaluate all parameters if needed
    return [eval_param(param, step_result) for param in params]
89 # prepare test execution environment
92 with self._vswitch_ctl, self._loadgen:
93 with self._vnf_ctl, self._collector:
94 if not self._vswitch_none:
97 # run traffic generator if requested, otherwise wait for manual termination
98 if S.getValue('mode') == 'trafficgen-off':
100 self._logger.debug("All is set. Please run traffic generator manually.")
101 input(os.linesep + "Press Enter to terminate vswitchperf..." + os.linesep + os.linesep)
103 with self._traffic_ctl:
105 self._traffic_ctl.send_traffic(self._traffic)
107 # execute test based on TestSteps definition
109 step_result = [None] * len(self.test)
110 for i, step in enumerate(self.test):
112 if step[0] == 'vswitch':
113 test_object = self._vswitch_ctl.get_vswitch()
114 elif step[0] == 'trafficgen':
115 test_object = self._traffic_ctl
117 self._logger.error("Unsupported test object %s", step[0])
118 self._inttest = {'status' : False, 'details' : ' '.join(step)}
119 self.report_status("Step '{}'".format(' '.join(step)), self._inttest['status'])
122 test_method = getattr(test_object, step[1])
123 test_method_check = getattr(test_object, CHECK_PREFIX + step[1])
126 if test_method and test_method_check and \
127 callable(test_method) and callable(test_method_check):
130 step_params = eval_step_params(step[2:], step_result)
131 step_log = '{} {}'.format(' '.join(step[:2]), step_params)
132 step_result[i] = test_method(*step_params)
133 self._logger.debug("Step {} '{}' results '{}'".format(
134 i, step_log, step_result[i]))
136 step_ok = test_method_check(step_result[i], *step_params)
137 except AssertionError:
138 self._inttest = {'status' : False, 'details' : step_log}
139 self._logger.error("Step {} raised assertion error".format(i))
142 self._inttest = {'status' : False, 'details' : step_log}
143 self._logger.error("Step {} result index error {}".format(
144 i, ' '.join(step[2:])))
147 self.report_status("Step {} - '{}'".format(i, step_log), step_ok)
149 self._inttest = {'status' : False, 'details' : step_log}
152 # dump vswitch flows before they are affected by VNF termination
153 if not self._vswitch_none:
154 self._vswitch_ctl.dump_vswitch_flows()
156 # tear down test execution environment and log results
159 # report test results
162 def run_report(self):
163 """ Report test results
166 results = OrderedDict()
167 results['status'] = 'OK' if self._inttest['status'] else 'FAILED'
168 results['details'] = self._inttest['details']
169 TestCase._write_result_to_file([results], self._output_file)
170 self.report_status("Test '{}'".format(self.name), self._inttest['status'])
171 # inform vsperf about testcase failure
172 if not self._inttest['status']: