1 # Copyright 2015-2016 Intel Corporation.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """IntegrationTestCase class
22 from testcases import TestCase
23 from conf import settings as S
24 from collections import OrderedDict
25 from tools import namespace
26 from tools import veth
27 from core.loader import Loader
# Prefix that, prepended to a test step's method name, gives the name of the
# companion method looked up on the same test object; that companion is
# called with the step's result and parameters to decide whether the step
# passed.
CHECK_PREFIX = 'validate_'
32 class IntegrationTestCase(TestCase):
33 """IntegrationTestCase class
def __init__(self, cfg):
    """Create an integration testcase from its configuration.

    :param cfg: testcase configuration, handed unchanged to the parent
        class constructor
    """
    # The type tag is assigned before the parent constructor runs; this
    # statement order is kept exactly as it was.
    self._type = 'integration'
    super().__init__(cfg)
    self._logger = logging.getLogger(__name__)
def report_status(self, label, status):
    """Write the outcome of a test step to the testcase logger.

    :param label: text identifying what is being reported
    :param status: a truthy value is logged as 'OK', a falsy one as 'FAILED'
    """
    outcome = 'OK' if status else 'FAILED'
    self._logger.info("%s ... %s", label, outcome)
def run_initialize(self):
    """Prepare the test execution environment.

    Runs the parent class initialization and then resets the integration
    result record to a passing status with empty details.
    """
    super().run_initialize()
    self._inttest = {
        'status': True,
        'details': '',
    }
58 All setup and teardown through controllers is included.
def eval_step_params(params, step_result):
    """Evaluate references to results from previous steps.

    Every parameter is processed by ``eval_param`` with ``step_result``
    made available under the name ``STEP``.

    :param params: iterable of step parameters (strings, lists, tuples,
        dicts or any other value)
    :param step_result: list of results of the steps, indexed by step number
    :returns: list with the evaluated parameters, in the original order
    """
    # NOTE(review): the reviewed listing did not show the initialisation,
    # ``else`` and ``return`` statements of this block; they are
    # reconstructed from the visible logic - confirm against the full file.

    def eval_param(param, STEP):  # pylint: disable=invalid-name
        """Evaluate ``#STEP[...]`` references inside a single parameter.

        * str: the text is split on '#'; every chunk starting with
          ``STEP[`` is evaluated and replaced by ``str()`` of the result,
          all other chunks are kept as they are. Note that the '#'
          separators themselves are never put back.
        * list/tuple: every item is processed recursively; a list is
          returned in both cases.
        * dict: every value is processed recursively, keys are untouched.
        * anything else is returned unchanged.

        The second argument must keep the name ``STEP`` because the
        evaluated chunks refer to it by that name.
        """
        if isinstance(param, str):
            pieces = []
            # evaluate every #STEP reference inside parameter itself
            for chunk in param.split('#'):
                if chunk.startswith('STEP['):
                    # NOTE(security): eval() executes arbitrary Python taken
                    # from the test definition; it must never be fed with
                    # untrusted input. It is deliberately called here (and
                    # not inside a comprehension) so that ``STEP`` is a
                    # local name of the evaluating frame.
                    pieces.append(str(eval(chunk)))  # pylint: disable=eval-used
                else:
                    pieces.append(chunk)
            return ''.join(pieces)
        if isinstance(param, (list, tuple)):
            return [eval_param(item, STEP) for item in param]
        if isinstance(param, dict):
            return {key: eval_param(value, STEP)
                    for key, value in param.items()}
        return param

    # evaluate all parameters if needed
    return [eval_param(param, step_result) for param in params]
94 # prepare test execution environment
98 with self._vswitch_ctl, self._loadgen:
99 with self._vnf_ctl, self._collector:
100 if not self._vswitch_none:
103 # run traffic generator if requested, otherwise wait for manual termination
104 if S.getValue('mode') == 'trafficgen-off':
106 self._logger.debug("All is set. Please run traffic generator manually.")
107 input(os.linesep + "Press Enter to terminate vswitchperf..." + os.linesep + os.linesep)
109 with self._traffic_ctl:
111 self._traffic_ctl.send_traffic(self._traffic)
115 # execute test based on TestSteps definition
117 # initialize list with results
118 step_result = [None] * len(self.test)
120 # count how many VNFs are involved in the test
121 for step in self.test:
122 if step[0].startswith('vnf'):
123 vnf_list[step[0]] = None
125 # check/expand GUEST configuration and copy data to shares
127 S.check_vm_settings(len(vnf_list))
128 self._copy_fwd_tools_for_all_guests(len(vnf_list))
130 # run test step by step...
131 for i, step in enumerate(self.test):
133 if step[0] == 'vswitch':
134 test_object = self._vswitch_ctl.get_vswitch()
135 elif step[0] == 'namespace':
136 test_object = namespace
137 elif step[0] == 'veth':
139 elif step[0] == 'trafficgen':
140 test_object = self._traffic_ctl
141 # in case of send_traffic method, ensure that specified
142 # traffic values are merged with existing self._traffic
143 if step[1] == 'send_traffic':
144 tmp_traffic = copy.deepcopy(self._traffic)
145 tmp_traffic.update(step[2])
146 step[2] = tmp_traffic
147 elif step[0].startswith('vnf'):
148 if not vnf_list[step[0]]:
150 vnf_list[step[0]] = loader.get_vnf_class()()
151 test_object = vnf_list[step[0]]
153 self._logger.error("Unsupported test object %s", step[0])
154 self._inttest = {'status' : False, 'details' : ' '.join(step)}
155 self.report_status("Step '{}'".format(' '.join(step)), self._inttest['status'])
158 test_method = getattr(test_object, step[1])
159 test_method_check = getattr(test_object, CHECK_PREFIX + step[1])
162 if test_method and test_method_check and \
163 callable(test_method) and callable(test_method_check):
166 step_params = eval_step_params(step[2:], step_result)
167 step_log = '{} {}'.format(' '.join(step[:2]), step_params)
168 step_result[i] = test_method(*step_params)
169 self._logger.debug("Step %s '%s' results '%s'", i,
170 step_log, step_result[i])
172 step_ok = test_method_check(step_result[i], *step_params)
173 except AssertionError:
174 self._inttest = {'status' : False, 'details' : step_log}
175 self._logger.error("Step %s raised assertion error", i)
176 # stop vnfs in case of error
181 self._inttest = {'status' : False, 'details' : step_log}
182 self._logger.error("Step %s result index error %s", i,
184 # stop vnfs in case of error
189 self.report_status("Step {} - '{}'".format(i, step_log), step_ok)
191 self._inttest = {'status' : False, 'details' : step_log}
192 # stop vnfs in case of error
197 # dump vswitch flows before they are affected by VNF termination
198 if not self._vswitch_none:
199 self._vswitch_ctl.dump_vswitch_flows()
201 # tear down test execution environment and log results
204 # report test results
207 def run_report(self):
208 """ Report test results
211 results = OrderedDict()
212 results['status'] = 'OK' if self._inttest['status'] else 'FAILED'
213 results['details'] = self._inttest['details']
214 TestCase.write_result_to_file([results], self._output_file)
215 self.report_status("Test '{}'".format(self.name), self._inttest['status'])
216 # inform vsperf about testcase failure
217 if not self._inttest['status']: