X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=testcases%2Fintegration.py;h=f2a5fecf424dfe24c13751b4f7afa45b9809035b;hb=966cfeac1cc6d947cd204613a0aef5f7ecb7816c;hp=53ba17f42e8369e90b652d8e1186d4a66ab04e57;hpb=55db32610210f3163971557382e653be6667e333;p=vswitchperf.git

diff --git a/testcases/integration.py b/testcases/integration.py
index 53ba17f4..f2a5fecf 100644
--- a/testcases/integration.py
+++ b/testcases/integration.py
@@ -14,15 +14,10 @@
 """IntegrationTestCase class
 """
 
-import os
-import time
 import logging
 
-from testcases import TestCase
-from conf import settings as S
 from collections import OrderedDict
-
-CHECK_PREFIX = 'validate_'
+from testcases import TestCase
 
 class IntegrationTestCase(TestCase):
     """IntegrationTestCase class
@@ -34,140 +29,20 @@ class IntegrationTestCase(TestCase):
         self._type = 'integration'
         super(IntegrationTestCase, self).__init__(cfg)
         self._logger = logging.getLogger(__name__)
-        self._inttest = None
-
-    def report_status(self, label, status):
-        """ Log status of test step
-        """
-        self._logger.debug("%s ... %s", label, 'OK' if status else 'FAILED')
-
-    def run_initialize(self):
-        """ Prepare test execution environment
-        """
-        super(IntegrationTestCase, self).run_initialize()
-        self._inttest = {'status' : True, 'details' : ''}
-
-    def run(self):
-        """Run the test
-
-        All setup and teardown through controllers is included.
-        """
-        def eval_step_params(params, step_result):
-            """ Evaluates referrences to results from previous steps
-            """
-            def eval_param(param, STEP):
-                """ Helper function
-                """
-                if isinstance(param, str):
-                    tmp_param = ''
-                    # evaluate every #STEP reference inside parameter itself
-                    for chunk in param.split('#'):
-                        if chunk.startswith('STEP['):
-                            tmp_param = tmp_param + str(eval(chunk))
-                        else:
-                            tmp_param = tmp_param + chunk
-                    return tmp_param
-                elif isinstance(param, list) or isinstance(param, tuple):
-                    tmp_list = []
-                    for item in param:
-                        tmp_list.append(eval_param(item, STEP))
-                    return tmp_list
-                elif isinstance(param, dict):
-                    tmp_dict = {}
-                    for (key, value) in param.items():
-                        tmp_dict[key] = eval_param(value, STEP)
-                    return tmp_dict
-                else:
-                    return param
-
-            eval_params = []
-            # evaluate all parameters if needed
-            for param in params:
-                eval_params.append(eval_param(param, step_result))
-            return eval_params
-
-        # prepare test execution environment
-        self.run_initialize()
-
-        with self._vswitch_ctl, self._loadgen:
-            with self._vnf_ctl, self._collector:
-                if not self._vswitch_none:
-                    self._add_flows()
-
-                # run traffic generator if requested, otherwise wait for manual termination
-                if S.getValue('mode') == 'trafficgen-off':
-                    time.sleep(2)
-                    self._logger.debug("All is set. Please run traffic generator manually.")
-                    input(os.linesep + "Press Enter to terminate vswitchperf..." +
-                          os.linesep + os.linesep)
-                else:
-                    with self._traffic_ctl:
-                        if not self.test:
-                            self._traffic_ctl.send_traffic(self._traffic)
-                        else:
-                            # execute test based on TestSteps definition
-                            if self.test:
-                                step_result = [None] * len(self.test)
-                                for i, step in enumerate(self.test):
-                                    step_ok = False
-                                    if step[0] == 'vswitch':
-                                        test_object = self._vswitch_ctl.get_vswitch()
-                                    elif step[0] == 'trafficgen':
-                                        test_object = self._traffic_ctl
-                                    else:
-                                        self._logger.error("Unsupported test object %s", step[0])
-                                        self._inttest = {'status' : False, 'details' : ' '.join(step)}
-                                        self.report_status("Step '{}'".format(' '.join(step)), self._inttest['status'])
-                                        break
-
-                                    test_method = getattr(test_object, step[1])
-                                    test_method_check = getattr(test_object, CHECK_PREFIX + step[1])
-
-                                    step_params = []
-                                    if test_method and test_method_check and \
-                                        callable(test_method) and callable(test_method_check):
-
-                                        try:
-                                            step_params = eval_step_params(step[2:], step_result)
-                                            step_log = '{} {}'.format(' '.join(step[:2]), step_params)
-                                            step_result[i] = test_method(*step_params)
-                                            self._logger.debug("Step {} '{}' results '{}'".format(
-                                                i, step_log, step_result[i]))
-                                            time.sleep(2)
-                                            step_ok = test_method_check(step_result[i], *step_params)
-                                        except AssertionError:
-                                            self._inttest = {'status' : False, 'details' : step_log}
-                                            self._logger.error("Step {} raised assertion error".format(i))
-                                            break
-                                        except IndexError:
-                                            self._inttest = {'status' : False, 'details' : step_log}
-                                            self._logger.error("Step {} result index error {}".format(
-                                                i, ' '.join(step[2:])))
-                                            break
-
-                                    self.report_status("Step {} - '{}'".format(i, step_log), step_ok)
-                                    if not step_ok:
-                                        self._inttest = {'status' : False, 'details' : step_log}
-                                        break
-
-                # dump vswitch flows before they are affected by VNF termination
-                if not self._vswitch_none:
-                    self._vswitch_ctl.dump_vswitch_flows()
-
-        # tear down test execution environment and log results
-        self.run_finalize()
-
-        # report test results
-        self.run_report()
+        # enforce check of step result for step driven testcases
+        self._step_check = True
 
     def run_report(self):
         """ Report test results
         """
         if self.test:
             results = OrderedDict()
-            results['status'] = 'OK' if self._inttest['status'] else 'FAILED'
-            results['details'] = self._inttest['details']
-            TestCase._write_result_to_file([results], self._output_file)
-            self.report_status("Test '{}'".format(self.name), self._inttest['status'])
+            results['status'] = 'OK' if self._step_status['status'] else 'FAILED'
+            results['details'] = self._step_status['details']
+            TestCase.write_result_to_file([results], self._output_file)
+            self.step_report_status("Test '{}'".format(self.name), self._step_status['status'])
             # inform vsperf about testcase failure
-            if not self._inttest['status']:
+            if not self._step_status['status']:
                 raise Exception
+            else:
+                super(IntegrationTestCase, self).run_report()
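
For context on the step driven testcases this change prepares for: the removed eval_step_params()/eval_param() helpers expand '#STEP[i]...' references inside step parameters by evaluating them against the results of already executed steps. The snippet below is a standalone sketch of that expansion logic, not vswitchperf code; expand_step_refs and the sample data are illustrative names, and unlike the original (which calls bare eval() with the STEP list in scope) it passes the results list to eval() explicitly.

# Sketch of the '#STEP[i]' macro expansion performed by the removed
# eval_param() helper; illustrative names, not vswitchperf API.
def expand_step_refs(param, step_results):
    """Replace '#STEP[i]...' chunks in param with previous step results."""
    if isinstance(param, str):
        expanded = ''
        # every chunk introduced by '#' may reference a previous step result
        for chunk in param.split('#'):
            if chunk.startswith('STEP['):
                # e.g. "STEP[0]['bridge']" is evaluated against the results list
                expanded += str(eval(chunk, {'STEP': step_results}))
            else:
                expanded += chunk
        return expanded
    if isinstance(param, (list, tuple)):
        return [expand_step_refs(item, step_results) for item in param]
    if isinstance(param, dict):
        return {key: expand_step_refs(value, step_results)
                for key, value in param.items()}
    return param

# Example: a later step reuses a value returned by the first step.
results = [{'bridge': 'br0'}]
print(expand_step_refs("add port to #STEP[0]['bridge']", results))
# -> "add port to br0"

In the harness itself the results list is filled with the return value of each executed step, so a later step can reference e.g. a bridge or port created earlier.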
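
The step execution convention behind the new _step_check flag (which enforces checking of each step's result) is visible in the removed run() body: a step names a test object, a method and its parameters, the method is invoked, and its result is verified by a companion method carrying the 'validate_' prefix (the removed CHECK_PREFIX constant). A minimal sketch of that convention under assumed names; DummyVswitch and run_step are hypothetical, not vswitchperf API.

# Hypothetical stand-ins used only for this sketch.
class DummyVswitch(object):
    """Fake test object exposing a method and its 'validate_' companion."""
    def add_switch(self, name):
        return name

    def validate_add_switch(self, result, name):
        # the check method receives the step result followed by the original params
        return result == name

def run_step(objects, step):
    """Execute one step: ['object', 'method', param, ...] -> (result, ok)."""
    test_object = objects[step[0]]
    method = getattr(test_object, step[1])
    check = getattr(test_object, 'validate_' + step[1])
    result = method(*step[2:])
    return result, check(result, *step[2:])

print(run_step({'vswitch': DummyVswitch()}, ['vswitch', 'add_switch', 'br0']))
# -> ('br0', True)

In the removed implementation a step whose check returned False marked the whole testcase as FAILED, which run_report() above translates into the 'status' field of the result file.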