X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=testcases%2Ftestcase.py;h=68b8aec4fd6d2590485f6a2d6e47eeba3c5764e6;hb=03c55c5804a60b19c182752d38d90a29eefdc84a;hp=55c940a463924488dbc05900c017592cc4f4ecc7;hpb=a09699652cdb1d68a333dcb9bffb2062d681f441;p=vswitchperf.git diff --git a/testcases/testcase.py b/testcases/testcase.py index 55c940a4..68b8aec4 100644 --- a/testcases/testcase.py +++ b/testcases/testcase.py @@ -1,4 +1,4 @@ -# Copyright 2015-2016 Intel Corporation. +# Copyright 2015-2018 Intel Corporation, Tieto and others. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -24,8 +24,9 @@ import re import time import subprocess +from datetime import datetime as dt from conf import settings as S -from conf import get_test_param +from conf import merge_spec import core.component_factory as component_factory from core.loader import Loader from core.results.results_constants import ResultsConstants @@ -35,24 +36,31 @@ from tools import functions from tools import namespace from tools import veth from tools.teststepstools import TestStepsTools -from tools.pkt_gen.trafficgen.trafficgenhelper import TRAFFIC_DEFAULTS +from tools.llc_management import rmd CHECK_PREFIX = 'validate_' +# pylint: disable=too-many-instance-attributes class TestCase(object): """TestCase base class In this basic form runs RFC2544 throughput test """ - def __init__(self, cfg): + # pylint: disable=too-many-statements + def __init__(self, test_cfg): """Pull out fields from test config - :param cfg: A dictionary of string-value pairs describing the test + :param test_cfg: A dictionary of string-value pairs describing the test configuration. Both the key and values strings use well-known values. :param results_dir: Where the csv formatted results are written. """ + # make a local copy of test configuration to avoid modification of + # original content used in vsperf main script + cfg = copy.deepcopy(test_cfg) + self._testcase_start_time = time.time() + self._testcase_stop_time = self._testcase_start_time self._hugepages_mounted = False self._traffic_ctl = None self._vnf_ctl = None @@ -61,28 +69,27 @@ class TestCase(object): self._loadgen = None self._output_file = None self._tc_results = None - self._settings_original = {} self._settings_paths_modified = False self._testcast_run_time = None + self._versions = [] # initialization of step driven specific members self._step_check = False # by default don't check result for step driven testcases self._step_vnf_list = {} self._step_result = [] + self._step_result_mapping = {} self._step_status = None + self._step_send_traffic = False # indication if send_traffic was called within test steps + self._vnf_list = [] self._testcase_run_time = None - # store all GUEST_ specific settings to keep original values before their expansion - for key in S.__dict__: - if key.startswith('GUEST_'): - self._settings_original[key] = S.getValue(key) - - self._update_settings('VSWITCH', cfg.get('vSwitch', S.getValue('VSWITCH'))) - self._update_settings('VNF', cfg.get('VNF', S.getValue('VNF'))) - self._update_settings('TRAFFICGEN', cfg.get('Trafficgen', S.getValue('TRAFFICGEN'))) + S.setValue('VSWITCH', cfg.get('vSwitch', S.getValue('VSWITCH'))) + S.setValue('VNF', cfg.get('VNF', S.getValue('VNF'))) + S.setValue('TRAFFICGEN', cfg.get('Trafficgen', S.getValue('TRAFFICGEN'))) + S.setValue('TUNNEL_TYPE', cfg.get('Tunnel Type', S.getValue('TUNNEL_TYPE'))) test_params = copy.deepcopy(S.getValue('TEST_PARAMS')) tc_test_params = cfg.get('Parameters', S.getValue('TEST_PARAMS')) - test_params.update(tc_test_params) - self._update_settings('TEST_PARAMS', test_params) + test_params = merge_spec(test_params, tc_test_params) + S.setValue('TEST_PARAMS', test_params) S.check_test_params() # override all redefined GUEST_ values to have them expanded correctly @@ -101,70 +108,43 @@ class TestCase(object): self.desc = cfg.get('Description', 'No description given.') self.test = cfg.get('TestSteps', None) - bidirectional = cfg.get('biDirectional', TRAFFIC_DEFAULTS['bidir']) - bidirectional = get_test_param('bidirectional', bidirectional) - if not isinstance(bidirectional, str): + # log testcase name and details + tmp_desc = functions.format_description(self.desc, 50) + self._logger.info('############################################################') + self._logger.info('# Test: %s', self.name) + self._logger.info('# Details: %s', tmp_desc[0]) + for i in range(1, len(tmp_desc)): + self._logger.info('# %s', tmp_desc[i]) + self._logger.info('############################################################') + + bidirectional = S.getValue('TRAFFIC')['bidir'] + if not isinstance(S.getValue('TRAFFIC')['bidir'], str): raise TypeError( - 'Bi-dir value must be of type string in testcase configuration') + 'Bi-dir value must be of type string') bidirectional = bidirectional.title() # Keep things consistent - traffic_type = cfg.get('Traffic Type', TRAFFIC_DEFAULTS['traffic_type']) - traffic_type = get_test_param('traffic_type', traffic_type) - - framerate = cfg.get('iLoad', TRAFFIC_DEFAULTS['frame_rate']) - framerate = get_test_param('iload', framerate) - self.deployment = cfg['Deployment'] self._frame_mod = cfg.get('Frame Modification', None) - self._tunnel_type = None - self._tunnel_operation = None - - if self.deployment == 'op2p': - self._tunnel_operation = cfg['Tunnel Operation'] - - if 'Tunnel Type' in cfg: - self._tunnel_type = cfg['Tunnel Type'] - self._tunnel_type = get_test_param('tunnel_type', - self._tunnel_type) - - # read configuration of streams; CLI parameter takes precedence to - # testcase definition - multistream = cfg.get('MultiStream', TRAFFIC_DEFAULTS['multistream']) - multistream = get_test_param('multistream', multistream) - stream_type = cfg.get('Stream Type', TRAFFIC_DEFAULTS['stream_type']) - stream_type = get_test_param('stream_type', stream_type) - pre_installed_flows = cfg.get('Pre-installed Flows', TRAFFIC_DEFAULTS['pre_installed_flows']) - pre_installed_flows = get_test_param('pre-installed_flows', pre_installed_flows) + self._tunnel_operation = cfg.get('Tunnel Operation', None) # check if test requires background load and which generator it uses self._load_cfg = cfg.get('Load', None) - if self._load_cfg and 'tool' in self._load_cfg: - self._loadgen = self._load_cfg['tool'] - else: - # background load is not requested, so use dummy implementation - self._loadgen = "Dummy" if self._frame_mod: self._frame_mod = self._frame_mod.lower() self._results_dir = S.getValue('RESULTS_PATH') # set traffic details, so they can be passed to vswitch and traffic ctls - self._traffic = copy.deepcopy(TRAFFIC_DEFAULTS) - self._traffic.update({'traffic_type': traffic_type, - 'flow_type': cfg.get('Flow Type', TRAFFIC_DEFAULTS['flow_type']), - 'bidir': bidirectional, - 'tunnel_type': self._tunnel_type, - 'multistream': int(multistream), - 'stream_type': stream_type, - 'pre_installed_flows' : pre_installed_flows, - 'frame_rate': int(framerate)}) + self._traffic = copy.deepcopy(S.getValue('TRAFFIC')) + self._traffic.update({'bidir': bidirectional}) # Packet Forwarding mode - self._vswitch_none = 'none' == S.getValue('VSWITCH').strip().lower() + self._vswitch_none = str(S.getValue('VSWITCH')).strip().lower() == 'none' # trafficgen configuration required for tests of tunneling protocols - if self.deployment == "op2p": + if self._tunnel_operation: + self._traffic.update({'tunnel_type': S.getValue('TUNNEL_TYPE')}) self._traffic['l2'].update({'srcmac': S.getValue('TRAFFICGEN_PORT1_MAC'), 'dstmac': @@ -176,10 +156,11 @@ class TestCase(object): S.getValue('TRAFFICGEN_PORT2_IP')}) if self._tunnel_operation == "decapsulation": - self._traffic['l2'] = S.getValue(self._tunnel_type.upper() + '_FRAME_L2') - self._traffic['l3'] = S.getValue(self._tunnel_type.upper() + '_FRAME_L3') - self._traffic['l4'] = S.getValue(self._tunnel_type.upper() + '_FRAME_L4') - elif len(S.getValue('NICS')) and \ + self._traffic['l2'].update(S.getValue(S.getValue('TUNNEL_TYPE').upper() + '_FRAME_L2')) + self._traffic['l3'].update(S.getValue(S.getValue('TUNNEL_TYPE').upper() + '_FRAME_L3')) + self._traffic['l4'].update(S.getValue(S.getValue('TUNNEL_TYPE').upper() + '_FRAME_L4')) + self._traffic['l2']['dstmac'] = S.getValue('NICS')[1]['mac'] + elif len(S.getValue('NICS')) >= 2 and \ (S.getValue('NICS')[0]['type'] == 'vf' or S.getValue('NICS')[1]['type'] == 'vf'): mac1 = S.getValue('NICS')[0]['mac'] @@ -189,17 +170,21 @@ class TestCase(object): else: self._logger.debug("MAC addresses can not be read") + self._traffic = functions.check_traffic(self._traffic) + # count how many VNFs are involved in TestSteps if self.test: for step in self.test: if step[0].startswith('vnf'): self._step_vnf_list[step[0]] = None + # if llc allocation is required, initialize it. + if S.getValue('LLC_ALLOCATION'): + self._rmd = rmd.CacheAllocator() + def run_initialize(self): """ Prepare test execution environment """ - self._logger.debug(self.name) - # mount hugepages if needed self._mount_hugepages() @@ -214,6 +199,8 @@ class TestCase(object): loader.get_vnf_class(), len(self._step_vnf_list)) + self._vnf_list = self._vnf_ctl.get_vnfs() + # verify enough hugepages are free to run the testcase if not self._check_for_enough_hugepages(): raise RuntimeError('Not enough hugepages free to run test.') @@ -259,14 +246,18 @@ class TestCase(object): loader.get_collector_class(), self._results_dir, self.name) self._loadgen = component_factory.create_loadgen( - self._loadgen, + loader.get_loadgen_class(), self._load_cfg) - self._output_file = os.path.join(self._results_dir, "result_" + self.name + - "_" + self.deployment + ".csv") + self._output_file = os.path.join(self._results_dir, "result_{}_{}_{}.csv".format( + str(S.getValue('_TEST_INDEX')), self.name, self.deployment)) self._step_status = {'status' : True, 'details' : ''} + # Perform LLC-allocations + if S.getValue('LLC_ALLOCATION'): + self._rmd.setup_llc_allocation() + self._logger.debug("Setup:") def run_finalize(self): @@ -275,16 +266,20 @@ class TestCase(object): # Stop all VNFs started by TestSteps in case that something went wrong self.step_stop_vnfs() + # Cleanup any LLC-allocations + if S.getValue('LLC_ALLOCATION'): + self._rmd.cleanup_llc_allocation() + + # Stop all processes executed by testcase + tasks.terminate_all_tasks(self._logger) + # umount hugepages if mounted self._umount_hugepages() - # restore original settings - S.load_from_dict(self._settings_original) - # cleanup any namespaces created if os.path.isdir('/tmp/namespaces'): namespace_list = os.listdir('/tmp/namespaces') - if len(namespace_list): + if namespace_list: self._logger.info('Cleaning up namespaces') for name in namespace_list: namespace.delete_namespace(name) @@ -292,7 +287,7 @@ class TestCase(object): # cleanup any veth ports created if os.path.isdir('/tmp/veth'): veth_list = os.listdir('/tmp/veth') - if len(veth_list): + if veth_list: self._logger.info('Cleaning up veth ports') for eth in veth_list: port1, port2 = eth.split('-') @@ -310,8 +305,31 @@ class TestCase(object): self._logger.debug("Traffic Results:") self._traffic_ctl.print_results() + if self._tc_results is None: self._tc_results = self._append_results(results) - TestCase.write_result_to_file(self._tc_results, self._output_file) + else: + # integration step driven tests have their status and possible + # failure details stored inside self._tc_results + results = self._append_results(results) + if len(self._tc_results) < len(results): + if len(self._tc_results) > 1: + raise RuntimeError('Testcase results do not match:' + 'results: %s\n' + 'trafficgen results: %s\n' % + self._tc_results, + results) + else: + tmp_results = copy.deepcopy(self._tc_results[0]) + self._tc_results = [] + for res in results: + tmp_res = copy.deepcopy(tmp_results) + tmp_res.update(res) + self._tc_results.append(tmp_res) + else: + for i, result in enumerate(results): + self._tc_results[i].update(result) + + TestCase.write_result_to_file(self._tc_results, self._output_file) def run(self): """Run the test @@ -322,18 +340,20 @@ class TestCase(object): self.run_initialize() try: - with self._vswitch_ctl, self._loadgen: - with self._vnf_ctl, self._collector: + with self._vswitch_ctl: + with self._vnf_ctl, self._collector, self._loadgen: if not self._vswitch_none: self._add_flows() + self._versions += self._vswitch_ctl.get_vswitch().get_version() + with self._traffic_ctl: # execute test based on TestSteps definition if needed... if self.step_run(): # ...and continue with traffic generation, but keep # in mind, that clean deployment does not configure # OVS nor executes the traffic - if self.deployment != 'clean': + if self.deployment != 'clean' and not self._step_send_traffic: self._traffic_ctl.send_traffic(self._traffic) # dump vswitch flows before they are affected by VNF termination @@ -347,26 +367,14 @@ class TestCase(object): # tear down test execution environment and log results self.run_finalize() + self._testcase_stop_time = time.time() self._testcase_run_time = time.strftime("%H:%M:%S", - time.gmtime(time.time() - + time.gmtime(self._testcase_stop_time - self._testcase_start_time)) - logging.info("Testcase execution time: " + self._testcase_run_time) + logging.info("Testcase execution time: %s", self._testcase_run_time) # report test results self.run_report() - def _update_settings(self, param, value): - """ Check value of given configuration parameter - In case that new value is different, then testcase - specific settings is updated and original value stored - - :param param: Name of parameter inside settings - :param value: Disired parameter value - """ - orig_value = S.getValue(param) - if orig_value != value: - self._settings_original[param] = copy.deepcopy(orig_value) - S.setValue(param, value) - def _append_results(self, results): """ Method appends mandatory Test Case results to list of dictionaries. @@ -379,16 +387,22 @@ class TestCase(object): for item in results: item[ResultsConstants.ID] = self.name item[ResultsConstants.DEPLOYMENT] = self.deployment + item[ResultsConstants.VSWITCH] = S.getValue('VSWITCH') item[ResultsConstants.TRAFFIC_TYPE] = self._traffic['l3']['proto'] item[ResultsConstants.TEST_RUN_TIME] = self._testcase_run_time + # convert timestamps to human readable format + item[ResultsConstants.TEST_START_TIME] = dt.fromtimestamp( + self._testcase_start_time).strftime('%Y-%m-%d %H:%M:%S') + item[ResultsConstants.TEST_STOP_TIME] = dt.fromtimestamp( + self._testcase_stop_time).strftime('%Y-%m-%d %H:%M:%S') if self._traffic['multistream']: item[ResultsConstants.SCAL_STREAM_COUNT] = self._traffic['multistream'] item[ResultsConstants.SCAL_STREAM_TYPE] = self._traffic['stream_type'] item[ResultsConstants.SCAL_PRE_INSTALLED_FLOWS] = self._traffic['pre_installed_flows'] if self._vnf_ctl.get_vnfs_number(): item[ResultsConstants.GUEST_LOOPBACK] = ' '.join(S.getValue('GUEST_LOOPBACK')) - if self._tunnel_type: - item[ResultsConstants.TUNNEL_TYPE] = self._tunnel_type + if self._tunnel_operation: + item[ResultsConstants.TUNNEL_TYPE] = S.getValue('TUNNEL_TYPE') return results def _copy_fwd_tools_for_all_guests(self, vm_count): @@ -451,10 +465,11 @@ class TestCase(object): def _mount_hugepages(self): """Mount hugepages if usage of DPDK or Qemu is detected """ + # pylint: disable=too-many-boolean-expressions # hugepages are needed by DPDK and Qemu if not self._hugepages_mounted and \ (self.deployment.count('v') or \ - S.getValue('VSWITCH').lower().count('dpdk') or \ + str(S.getValue('VSWITCH')).lower().count('dpdk') or \ self._vswitch_none or \ self.test and 'vnf' in [step[0][0:3] for step in self.test]): hugepages.mount_hugepages() @@ -480,26 +495,11 @@ class TestCase(object): # get hugepage amounts for each socket on dpdk sock0_mem, sock1_mem = 0, 0 - if S.getValue('VSWITCH').lower().count('dpdk'): - # the import below needs to remain here and not put into the module - # imports because of an exception due to settings not yet loaded - from vswitches import ovs_dpdk_vhost - if ovs_dpdk_vhost.OvsDpdkVhost.old_dpdk_config(): - match = re.search( - r'-socket-mem\s+(\d+),(\d+)', - ''.join(S.getValue('VSWITCHD_DPDK_ARGS'))) - if match: - sock0_mem, sock1_mem = (int(match.group(1)) * 1024 / hugepage_size, - int(match.group(2)) * 1024 / hugepage_size) - else: - logging.info( - 'Could not parse socket memory config in dpdk params.') - else: - sock0_mem, sock1_mem = ( - S.getValue( - 'VSWITCHD_DPDK_CONFIG')['dpdk-socket-mem'].split(',')) - sock0_mem, sock1_mem = (int(sock0_mem) * 1024 / hugepage_size, - int(sock1_mem) * 1024 / hugepage_size) + + if str(S.getValue('VSWITCH')).lower().count('dpdk'): + sock_mem = S.getValue('DPDK_SOCKET_MEM') + sock0_mem, sock1_mem = (int(sock_mem[0]) * 1024 / hugepage_size, + int(sock_mem[1]) * 1024 / hugepage_size) # If hugepages needed, verify the amounts are free if any([hugepages_needed, sock0_mem, sock1_mem]): @@ -528,8 +528,8 @@ class TestCase(object): else: result3 = True - logging.info('Need a total of {} total hugepages'.format( - hugepages_needed + sock1_mem + sock0_mem)) + logging.info('Need a total of %s total hugepages', + hugepages_needed + sock1_mem + sock0_mem) # The only drawback here is sometimes dpdk doesn't release # its hugepages on a test failure. This could cause a test @@ -554,7 +554,7 @@ class TestCase(object): """ with open(output, 'a') as csvfile: - logging.info("Write results to file: " + output) + logging.info("Write results to file: %s", output) fieldnames = TestCase._get_unique_keys(results) writer = csv.DictWriter(csvfile, fieldnames) @@ -584,7 +584,7 @@ class TestCase(object): """Add flows to the vswitch """ vswitch = self._vswitch_ctl.get_vswitch() - # TODO BOM 15-08-07 the frame mod code assumes that the + # NOTE BOM 15-08-07 the frame mod code assumes that the # physical ports are ports 1 & 2. The actual numbers # need to be retrived from the vSwitch and the metadata value # updated accordingly. @@ -650,7 +650,7 @@ class TestCase(object): 'goto_table:3']} vswitch.add_flow(bridge, flow) elif self._frame_mod == "ip_port": - # TODO BOM 15-08-27 The traffic generated is assumed + # NOTE BOM 15-08-27 The traffic generated is assumed # to be UDP (nw_proto 17d) which is the default case but # we will need to pick up the actual traffic params in use. flow = {'table':'2', 'priority':'1000', 'metadata':'2', @@ -682,43 +682,59 @@ class TestCase(object): if self._step_vnf_list[vnf]: self._step_vnf_list[vnf].stop() - @staticmethod - def step_eval_param(param, STEP): - # pylint: disable=invalid-name + def step_eval_param(self, param, step_result): """ Helper function for #STEP macro evaluation """ if isinstance(param, str): # evaluate every #STEP reference inside parameter itself - macros = re.findall(r'#STEP\[[\w\[\]\-\'\"]+\]', param) + macros = re.findall(r'(#STEP\[([\w\-:]+)\]((\[[\w\-\'\"]+\])*))', param) + if macros: for macro in macros: + if macro[1] in self._step_result_mapping: + key = self._step_result_mapping[macro[1]] + else: + key = macro[1] # pylint: disable=eval-used - tmp_val = str(eval(macro[1:])) - param = param.replace(macro, tmp_val) + tmp_val = str(eval('step_result[{}]{}'.format(key, macro[2]))) + param = param.replace(macro[0], tmp_val) + + # evaluate references to vsperf configuration options + macros = re.findall(r'\$(([\w\-]+)(\[[\w\[\]\-\'\"]+\])*)', param) + if macros: + for macro in macros: + # pylint: disable=eval-used + try: + tmp_val = str(eval("S.getValue('{}'){}".format(macro[1], macro[2]))) + param = param.replace('${}'.format(macro[0]), tmp_val) + # ignore that required option can't be evaluated + except (IndexError, KeyError, AttributeError): + self._logger.debug("Skipping %s as it isn't a configuration " + "parameter.", '${}'.format(macro[0])) return param - elif isinstance(param, list) or isinstance(param, tuple): + elif isinstance(param, (list, tuple)): tmp_list = [] for item in param: - tmp_list.append(TestCase.step_eval_param(item, STEP)) + tmp_list.append(self.step_eval_param(item, step_result)) return tmp_list elif isinstance(param, dict): tmp_dict = {} for (key, value) in param.items(): - tmp_dict[key] = TestCase.step_eval_param(value, STEP) + tmp_dict[key] = self.step_eval_param(value, step_result) return tmp_dict else: return param - @staticmethod - def step_eval_params(params, step_result): + def step_eval_params(self, params, step_result): """ Evaluates referrences to results from previous steps """ eval_params = [] # evaluate all parameters if needed for param in params: - eval_params.append(TestCase.step_eval_param(param, step_result)) + eval_params.append(self.step_eval_param(param, step_result)) return eval_params + # pylint: disable=too-many-locals, too-many-branches, too-many-statements def step_run(self): """ Execute actions specified by TestSteps list @@ -737,6 +753,30 @@ class TestCase(object): # run test step by step... for i, step in enumerate(self.test): step_ok = not self._step_check + step_check = self._step_check + regex = None + # configure step result mapping if step alias/label is detected + if step[0].startswith('#'): + key = step[0][1:] + if key.isdigit(): + raise RuntimeError('Step alias can\'t be an integer value {}'.format(key)) + if key in self._step_result_mapping: + raise RuntimeError('Step alias {} has been used already for step ' + '{}'.format(key, self._step_result_mapping[key])) + self._step_result_mapping[step[0][1:]] = i + step = step[1:] + + # store regex filter if it is specified + if isinstance(step[-1], str) and step[-1].startswith('|'): + # evalute macros and variables used in regex + regex = self.step_eval_params([step[-1][1:]], self._step_result[:i])[0] + step = step[:-1] + + # check if step verification should be suppressed + if step[0].startswith('!'): + step_check = False + step_ok = True + step[0] = step[0][1:] if step[0] == 'vswitch': test_object = self._vswitch_ctl.get_vswitch() elif step[0] == 'namespace': @@ -756,15 +796,42 @@ class TestCase(object): tmp_traffic = copy.deepcopy(self._traffic) tmp_traffic.update(step[2]) step[2] = tmp_traffic + # store indication that traffic has been sent + # so it is not sent again after the execution of teststeps + self._step_send_traffic = True elif step[0].startswith('vnf'): + # use vnf started within TestSteps if not self._step_vnf_list[step[0]]: # initialize new VM self._step_vnf_list[step[0]] = loader.get_vnf_class()() test_object = self._step_vnf_list[step[0]] + elif step[0].startswith('VNF'): + if step[1] in ('start', 'stop'): + raise RuntimeError("Cannot execute start() or stop() method of " + "VNF deployed automatically by scenario.") + # use vnf started by scenario deployment (e.g. pvp) + vnf_index = int(step[0][3:]) + try: + test_object = self._vnf_list[vnf_index] + except IndexError: + raise RuntimeError("VNF with index {} is not running.".format(vnf_index)) elif step[0] == 'wait': input(os.linesep + "Step {}: Press Enter to continue with " "the next step...".format(i) + os.linesep + os.linesep) continue + elif step[0] == 'sleep': + self._logger.debug("Sleep %s seconds", step[1]) + time.sleep(int(step[1])) + continue + elif step[0] == 'log': + test_object = self._logger + # there isn't a need for validation of log entry + step_check = False + step_ok = True + elif step[0] == 'pdb': + import pdb + pdb.set_trace() + continue else: self._logger.error("Unsupported test object %s", step[0]) self._step_status = {'status' : False, 'details' : ' '.join(step)} @@ -773,7 +840,7 @@ class TestCase(object): return False test_method = getattr(test_object, step[1]) - if self._step_check: + if step_check: test_method_check = getattr(test_object, CHECK_PREFIX + step[1]) else: test_method_check = None @@ -782,20 +849,26 @@ class TestCase(object): try: # eval parameters, but use only valid step_results # to support negative indexes - step_params = TestCase.step_eval_params(step[2:], self._step_result[:i]) + step_params = self.step_eval_params(step[2:], self._step_result[:i]) step_log = '{} {}'.format(' '.join(step[:2]), step_params) + step_log += ' filter "{}"'.format(regex) if regex else '' self._logger.debug("Step %s '%s' start", i, step_log) self._step_result[i] = test_method(*step_params) + if regex: + # apply regex to step output + self._step_result[i] = functions.filter_output( + self._step_result[i], regex) + self._logger.debug("Step %s '%s' results '%s'", i, step_log, self._step_result[i]) - time.sleep(5) - if self._step_check: + time.sleep(S.getValue('TEST_STEP_DELAY')) + if step_check: step_ok = test_method_check(self._step_result[i], *step_params) except (AssertionError, AttributeError, IndexError) as ex: step_ok = False self._logger.error("Step %s raised %s", i, type(ex).__name__) - if self._step_check: + if step_check: self.step_report_status("Step {} - '{}'".format(i, step_log), step_ok) if not step_ok: @@ -806,3 +879,36 @@ class TestCase(object): # all steps processed without any issue return True + + # + # get methods for TestCase members, which needs to be publicly available + # + def get_output_file(self): + """Return content of self._output_file member + """ + return self._output_file + + def get_desc(self): + """Return content of self.desc member + """ + return self.desc + + def get_versions(self): + """Return content of self.versions member + """ + return self._versions + + def get_traffic(self): + """Return content of self._traffic member + """ + return self._traffic + + def get_tc_results(self): + """Return content of self._tc_results member + """ + return self._tc_results + + def get_collector(self): + """Return content of self._collector member + """ + return self._collector