-# Copyright 2015-2016 Intel Corporation.
+# Copyright 2015-2018 Intel Corporation, Tieto and others.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
import time
import subprocess
+from datetime import datetime as dt
from conf import settings as S
-from conf import get_test_param
+from conf import merge_spec
import core.component_factory as component_factory
from core.loader import Loader
from core.results.results_constants import ResultsConstants
from tools import namespace
from tools import veth
from tools.teststepstools import TestStepsTools
-from tools.pkt_gen.trafficgen.trafficgenhelper import TRAFFIC_DEFAULTS
+from tools.llc_management import rmd
CHECK_PREFIX = 'validate_'
+# pylint: disable=too-many-instance-attributes
class TestCase(object):
"""TestCase base class
In this basic form runs RFC2544 throughput test
"""
- def __init__(self, cfg):
+ # pylint: disable=too-many-statements
+ def __init__(self, test_cfg):
"""Pull out fields from test config
- :param cfg: A dictionary of string-value pairs describing the test
+ :param test_cfg: A dictionary of string-value pairs describing the test
configuration. Both the key and values strings use well-known
values.
:param results_dir: Where the csv formatted results are written.
"""
+ # make a local copy of test configuration to avoid modification of
+ # original content used in vsperf main script
+ cfg = copy.deepcopy(test_cfg)
+
self._testcase_start_time = time.time()
+ self._testcase_stop_time = self._testcase_start_time
self._hugepages_mounted = False
self._traffic_ctl = None
self._vnf_ctl = None
self._loadgen = None
self._output_file = None
self._tc_results = None
- self._settings_original = {}
self._settings_paths_modified = False
self._testcast_run_time = None
+ self._versions = []
# initialization of step driven specific members
self._step_check = False # by default don't check result for step driven testcases
self._step_vnf_list = {}
self._step_result = []
+ self._step_result_mapping = {}
self._step_status = None
+ self._step_send_traffic = False # indication if send_traffic was called within test steps
+ self._vnf_list = []
self._testcase_run_time = None
- # store all GUEST_ specific settings to keep original values before their expansion
- for key in S.__dict__:
- if key.startswith('GUEST_'):
- self._settings_original[key] = S.getValue(key)
-
- self._update_settings('VSWITCH', cfg.get('vSwitch', S.getValue('VSWITCH')))
- self._update_settings('VNF', cfg.get('VNF', S.getValue('VNF')))
- self._update_settings('TRAFFICGEN', cfg.get('Trafficgen', S.getValue('TRAFFICGEN')))
+ S.setValue('VSWITCH', cfg.get('vSwitch', S.getValue('VSWITCH')))
+ S.setValue('VNF', cfg.get('VNF', S.getValue('VNF')))
+ S.setValue('TRAFFICGEN', cfg.get('Trafficgen', S.getValue('TRAFFICGEN')))
+ S.setValue('TUNNEL_TYPE', cfg.get('Tunnel Type', S.getValue('TUNNEL_TYPE')))
test_params = copy.deepcopy(S.getValue('TEST_PARAMS'))
tc_test_params = cfg.get('Parameters', S.getValue('TEST_PARAMS'))
- test_params.update(tc_test_params)
- self._update_settings('TEST_PARAMS', test_params)
+ test_params = merge_spec(test_params, tc_test_params)
+ S.setValue('TEST_PARAMS', test_params)
S.check_test_params()
# override all redefined GUEST_ values to have them expanded correctly
self.desc = cfg.get('Description', 'No description given.')
self.test = cfg.get('TestSteps', None)
- bidirectional = cfg.get('biDirectional', TRAFFIC_DEFAULTS['bidir'])
- bidirectional = get_test_param('bidirectional', bidirectional)
- if not isinstance(bidirectional, str):
+ # log testcase name and details
+ tmp_desc = functions.format_description(self.desc, 50)
+ self._logger.info('############################################################')
+ self._logger.info('# Test: %s', self.name)
+ self._logger.info('# Details: %s', tmp_desc[0])
+ for i in range(1, len(tmp_desc)):
+ self._logger.info('# %s', tmp_desc[i])
+ self._logger.info('############################################################')
+
+ bidirectional = S.getValue('TRAFFIC')['bidir']
+ if not isinstance(S.getValue('TRAFFIC')['bidir'], str):
raise TypeError(
- 'Bi-dir value must be of type string in testcase configuration')
+ 'Bi-dir value must be of type string')
bidirectional = bidirectional.title() # Keep things consistent
- traffic_type = cfg.get('Traffic Type', TRAFFIC_DEFAULTS['traffic_type'])
- traffic_type = get_test_param('traffic_type', traffic_type)
-
- framerate = cfg.get('iLoad', TRAFFIC_DEFAULTS['frame_rate'])
- framerate = get_test_param('iload', framerate)
-
self.deployment = cfg['Deployment']
self._frame_mod = cfg.get('Frame Modification', None)
- self._tunnel_type = None
- self._tunnel_operation = None
-
- if self.deployment == 'op2p':
- self._tunnel_operation = cfg['Tunnel Operation']
-
- if 'Tunnel Type' in cfg:
- self._tunnel_type = cfg['Tunnel Type']
- self._tunnel_type = get_test_param('tunnel_type',
- self._tunnel_type)
-
- # read configuration of streams; CLI parameter takes precedence to
- # testcase definition
- multistream = cfg.get('MultiStream', TRAFFIC_DEFAULTS['multistream'])
- multistream = get_test_param('multistream', multistream)
- stream_type = cfg.get('Stream Type', TRAFFIC_DEFAULTS['stream_type'])
- stream_type = get_test_param('stream_type', stream_type)
- pre_installed_flows = cfg.get('Pre-installed Flows', TRAFFIC_DEFAULTS['pre_installed_flows'])
- pre_installed_flows = get_test_param('pre-installed_flows', pre_installed_flows)
+ self._tunnel_operation = cfg.get('Tunnel Operation', None)
# check if test requires background load and which generator it uses
self._load_cfg = cfg.get('Load', None)
- if self._load_cfg and 'tool' in self._load_cfg:
- self._loadgen = self._load_cfg['tool']
- else:
- # background load is not requested, so use dummy implementation
- self._loadgen = "Dummy"
if self._frame_mod:
self._frame_mod = self._frame_mod.lower()
self._results_dir = S.getValue('RESULTS_PATH')
# set traffic details, so they can be passed to vswitch and traffic ctls
- self._traffic = copy.deepcopy(TRAFFIC_DEFAULTS)
- self._traffic.update({'traffic_type': traffic_type,
- 'flow_type': cfg.get('Flow Type', TRAFFIC_DEFAULTS['flow_type']),
- 'bidir': bidirectional,
- 'tunnel_type': self._tunnel_type,
- 'multistream': int(multistream),
- 'stream_type': stream_type,
- 'pre_installed_flows' : pre_installed_flows,
- 'frame_rate': int(framerate)})
+ self._traffic = copy.deepcopy(S.getValue('TRAFFIC'))
+ self._traffic.update({'bidir': bidirectional})
# Packet Forwarding mode
- self._vswitch_none = 'none' == S.getValue('VSWITCH').strip().lower()
+ self._vswitch_none = str(S.getValue('VSWITCH')).strip().lower() == 'none'
# trafficgen configuration required for tests of tunneling protocols
- if self.deployment == "op2p":
+ if self._tunnel_operation:
+ self._traffic.update({'tunnel_type': S.getValue('TUNNEL_TYPE')})
self._traffic['l2'].update({'srcmac':
S.getValue('TRAFFICGEN_PORT1_MAC'),
'dstmac':
S.getValue('TRAFFICGEN_PORT2_IP')})
if self._tunnel_operation == "decapsulation":
- self._traffic['l2'] = S.getValue(self._tunnel_type.upper() + '_FRAME_L2')
- self._traffic['l3'] = S.getValue(self._tunnel_type.upper() + '_FRAME_L3')
- self._traffic['l4'] = S.getValue(self._tunnel_type.upper() + '_FRAME_L4')
- elif len(S.getValue('NICS')) and \
+ self._traffic['l2'].update(S.getValue(S.getValue('TUNNEL_TYPE').upper() + '_FRAME_L2'))
+ self._traffic['l3'].update(S.getValue(S.getValue('TUNNEL_TYPE').upper() + '_FRAME_L3'))
+ self._traffic['l4'].update(S.getValue(S.getValue('TUNNEL_TYPE').upper() + '_FRAME_L4'))
+ self._traffic['l2']['dstmac'] = S.getValue('NICS')[1]['mac']
+ elif len(S.getValue('NICS')) >= 2 and \
(S.getValue('NICS')[0]['type'] == 'vf' or
S.getValue('NICS')[1]['type'] == 'vf'):
mac1 = S.getValue('NICS')[0]['mac']
else:
self._logger.debug("MAC addresses can not be read")
+ self._traffic = functions.check_traffic(self._traffic)
+
# count how many VNFs are involved in TestSteps
if self.test:
for step in self.test:
if step[0].startswith('vnf'):
self._step_vnf_list[step[0]] = None
+ # if llc allocation is required, initialize it.
+ if S.getValue('LLC_ALLOCATION'):
+ self._rmd = rmd.CacheAllocator()
+
def run_initialize(self):
""" Prepare test execution environment
"""
- self._logger.debug(self.name)
-
# mount hugepages if needed
self._mount_hugepages()
loader.get_vnf_class(),
len(self._step_vnf_list))
+ self._vnf_list = self._vnf_ctl.get_vnfs()
+
# verify enough hugepages are free to run the testcase
if not self._check_for_enough_hugepages():
raise RuntimeError('Not enough hugepages free to run test.')
loader.get_collector_class(),
self._results_dir, self.name)
self._loadgen = component_factory.create_loadgen(
- self._loadgen,
+ loader.get_loadgen_class(),
self._load_cfg)
- self._output_file = os.path.join(self._results_dir, "result_" + self.name +
- "_" + self.deployment + ".csv")
+ self._output_file = os.path.join(self._results_dir, "result_{}_{}_{}.csv".format(
+ str(S.getValue('_TEST_INDEX')), self.name, self.deployment))
self._step_status = {'status' : True, 'details' : ''}
+ # Perform LLC-allocations
+ if S.getValue('LLC_ALLOCATION'):
+ self._rmd.setup_llc_allocation()
+
self._logger.debug("Setup:")
def run_finalize(self):
# Stop all VNFs started by TestSteps in case that something went wrong
self.step_stop_vnfs()
+ # Cleanup any LLC-allocations
+ if S.getValue('LLC_ALLOCATION'):
+ self._rmd.cleanup_llc_allocation()
+
+ # Stop all processes executed by testcase
+ tasks.terminate_all_tasks(self._logger)
+
# umount hugepages if mounted
self._umount_hugepages()
- # restore original settings
- S.load_from_dict(self._settings_original)
-
# cleanup any namespaces created
if os.path.isdir('/tmp/namespaces'):
namespace_list = os.listdir('/tmp/namespaces')
- if len(namespace_list):
+ if namespace_list:
self._logger.info('Cleaning up namespaces')
for name in namespace_list:
namespace.delete_namespace(name)
# cleanup any veth ports created
if os.path.isdir('/tmp/veth'):
veth_list = os.listdir('/tmp/veth')
- if len(veth_list):
+ if veth_list:
self._logger.info('Cleaning up veth ports')
for eth in veth_list:
port1, port2 = eth.split('-')
self._logger.debug("Traffic Results:")
self._traffic_ctl.print_results()
+ if self._tc_results is None:
self._tc_results = self._append_results(results)
- TestCase.write_result_to_file(self._tc_results, self._output_file)
+ else:
+ # integration step driven tests have their status and possible
+ # failure details stored inside self._tc_results
+ results = self._append_results(results)
+ if len(self._tc_results) < len(results):
+ if len(self._tc_results) > 1:
+ raise RuntimeError('Testcase results do not match:'
+ 'results: %s\n'
+ 'trafficgen results: %s\n' %
+ self._tc_results,
+ results)
+ else:
+ tmp_results = copy.deepcopy(self._tc_results[0])
+ self._tc_results = []
+ for res in results:
+ tmp_res = copy.deepcopy(tmp_results)
+ tmp_res.update(res)
+ self._tc_results.append(tmp_res)
+ else:
+ for i, result in enumerate(results):
+ self._tc_results[i].update(result)
+
+ TestCase.write_result_to_file(self._tc_results, self._output_file)
def run(self):
"""Run the test
self.run_initialize()
try:
- with self._vswitch_ctl, self._loadgen:
- with self._vnf_ctl, self._collector:
+ with self._vswitch_ctl:
+ with self._vnf_ctl, self._collector, self._loadgen:
if not self._vswitch_none:
self._add_flows()
+ self._versions += self._vswitch_ctl.get_vswitch().get_version()
+
with self._traffic_ctl:
# execute test based on TestSteps definition if needed...
if self.step_run():
# ...and continue with traffic generation, but keep
# in mind, that clean deployment does not configure
# OVS nor executes the traffic
- if self.deployment != 'clean':
+ if self.deployment != 'clean' and not self._step_send_traffic:
self._traffic_ctl.send_traffic(self._traffic)
# dump vswitch flows before they are affected by VNF termination
# tear down test execution environment and log results
self.run_finalize()
+ self._testcase_stop_time = time.time()
self._testcase_run_time = time.strftime("%H:%M:%S",
- time.gmtime(time.time() -
+ time.gmtime(self._testcase_stop_time -
self._testcase_start_time))
- logging.info("Testcase execution time: " + self._testcase_run_time)
+ logging.info("Testcase execution time: %s", self._testcase_run_time)
# report test results
self.run_report()
- def _update_settings(self, param, value):
- """ Check value of given configuration parameter
- In case that new value is different, then testcase
- specific settings is updated and original value stored
-
- :param param: Name of parameter inside settings
- :param value: Disired parameter value
- """
- orig_value = S.getValue(param)
- if orig_value != value:
- self._settings_original[param] = copy.deepcopy(orig_value)
- S.setValue(param, value)
-
def _append_results(self, results):
"""
Method appends mandatory Test Case results to list of dictionaries.
for item in results:
item[ResultsConstants.ID] = self.name
item[ResultsConstants.DEPLOYMENT] = self.deployment
+ item[ResultsConstants.VSWITCH] = S.getValue('VSWITCH')
item[ResultsConstants.TRAFFIC_TYPE] = self._traffic['l3']['proto']
item[ResultsConstants.TEST_RUN_TIME] = self._testcase_run_time
+ # convert timestamps to human readable format
+ item[ResultsConstants.TEST_START_TIME] = dt.fromtimestamp(
+ self._testcase_start_time).strftime('%Y-%m-%d %H:%M:%S')
+ item[ResultsConstants.TEST_STOP_TIME] = dt.fromtimestamp(
+ self._testcase_stop_time).strftime('%Y-%m-%d %H:%M:%S')
if self._traffic['multistream']:
item[ResultsConstants.SCAL_STREAM_COUNT] = self._traffic['multistream']
item[ResultsConstants.SCAL_STREAM_TYPE] = self._traffic['stream_type']
item[ResultsConstants.SCAL_PRE_INSTALLED_FLOWS] = self._traffic['pre_installed_flows']
if self._vnf_ctl.get_vnfs_number():
item[ResultsConstants.GUEST_LOOPBACK] = ' '.join(S.getValue('GUEST_LOOPBACK'))
- if self._tunnel_type:
- item[ResultsConstants.TUNNEL_TYPE] = self._tunnel_type
+ if self._tunnel_operation:
+ item[ResultsConstants.TUNNEL_TYPE] = S.getValue('TUNNEL_TYPE')
return results
def _copy_fwd_tools_for_all_guests(self, vm_count):
def _mount_hugepages(self):
"""Mount hugepages if usage of DPDK or Qemu is detected
"""
+ # pylint: disable=too-many-boolean-expressions
# hugepages are needed by DPDK and Qemu
if not self._hugepages_mounted and \
(self.deployment.count('v') or \
- S.getValue('VSWITCH').lower().count('dpdk') or \
+ str(S.getValue('VSWITCH')).lower().count('dpdk') or \
self._vswitch_none or \
self.test and 'vnf' in [step[0][0:3] for step in self.test]):
hugepages.mount_hugepages()
# get hugepage amounts for each socket on dpdk
sock0_mem, sock1_mem = 0, 0
- if S.getValue('VSWITCH').lower().count('dpdk'):
- # the import below needs to remain here and not put into the module
- # imports because of an exception due to settings not yet loaded
- from vswitches import ovs_dpdk_vhost
- if ovs_dpdk_vhost.OvsDpdkVhost.old_dpdk_config():
- match = re.search(
- r'-socket-mem\s+(\d+),(\d+)',
- ''.join(S.getValue('VSWITCHD_DPDK_ARGS')))
- if match:
- sock0_mem, sock1_mem = (int(match.group(1)) * 1024 / hugepage_size,
- int(match.group(2)) * 1024 / hugepage_size)
- else:
- logging.info(
- 'Could not parse socket memory config in dpdk params.')
- else:
- sock0_mem, sock1_mem = (
- S.getValue(
- 'VSWITCHD_DPDK_CONFIG')['dpdk-socket-mem'].split(','))
- sock0_mem, sock1_mem = (int(sock0_mem) * 1024 / hugepage_size,
- int(sock1_mem) * 1024 / hugepage_size)
+
+ if str(S.getValue('VSWITCH')).lower().count('dpdk'):
+ sock_mem = S.getValue('DPDK_SOCKET_MEM')
+ sock0_mem, sock1_mem = (int(sock_mem[0]) * 1024 / hugepage_size,
+ int(sock_mem[1]) * 1024 / hugepage_size)
# If hugepages needed, verify the amounts are free
if any([hugepages_needed, sock0_mem, sock1_mem]):
else:
result3 = True
- logging.info('Need a total of {} total hugepages'.format(
- hugepages_needed + sock1_mem + sock0_mem))
+ logging.info('Need a total of %s total hugepages',
+ hugepages_needed + sock1_mem + sock0_mem)
# The only drawback here is sometimes dpdk doesn't release
# its hugepages on a test failure. This could cause a test
"""
with open(output, 'a') as csvfile:
- logging.info("Write results to file: " + output)
+ logging.info("Write results to file: %s", output)
fieldnames = TestCase._get_unique_keys(results)
writer = csv.DictWriter(csvfile, fieldnames)
"""Add flows to the vswitch
"""
vswitch = self._vswitch_ctl.get_vswitch()
- # TODO BOM 15-08-07 the frame mod code assumes that the
+ # NOTE BOM 15-08-07 the frame mod code assumes that the
# physical ports are ports 1 & 2. The actual numbers
# need to be retrived from the vSwitch and the metadata value
# updated accordingly.
'goto_table:3']}
vswitch.add_flow(bridge, flow)
elif self._frame_mod == "ip_port":
- # TODO BOM 15-08-27 The traffic generated is assumed
+ # NOTE BOM 15-08-27 The traffic generated is assumed
# to be UDP (nw_proto 17d) which is the default case but
# we will need to pick up the actual traffic params in use.
flow = {'table':'2', 'priority':'1000', 'metadata':'2',
""" Stop all VNFs started by TestSteps
"""
for vnf in self._step_vnf_list:
- self._step_vnf_list[vnf].stop()
+ if self._step_vnf_list[vnf]:
+ self._step_vnf_list[vnf].stop()
- @staticmethod
- def step_eval_param(param, STEP):
- # pylint: disable=invalid-name
+ def step_eval_param(self, param, step_result):
""" Helper function for #STEP macro evaluation
"""
if isinstance(param, str):
# evaluate every #STEP reference inside parameter itself
- macros = re.findall(r'#STEP\[[\w\[\]\-\'\"]+\]', param)
+ macros = re.findall(r'(#STEP\[([\w\-:]+)\]((\[[\w\-\'\"]+\])*))', param)
+
if macros:
for macro in macros:
+ if macro[1] in self._step_result_mapping:
+ key = self._step_result_mapping[macro[1]]
+ else:
+ key = macro[1]
# pylint: disable=eval-used
- tmp_val = str(eval(macro[1:]))
- param = param.replace(macro, tmp_val)
+ tmp_val = str(eval('step_result[{}]{}'.format(key, macro[2])))
+ param = param.replace(macro[0], tmp_val)
+
+ # evaluate references to vsperf configuration options
+ macros = re.findall(r'\$(([\w\-]+)(\[[\w\[\]\-\'\"]+\])*)', param)
+ if macros:
+ for macro in macros:
+ # pylint: disable=eval-used
+ try:
+ tmp_val = str(eval("S.getValue('{}'){}".format(macro[1], macro[2])))
+ param = param.replace('${}'.format(macro[0]), tmp_val)
+ # ignore that required option can't be evaluated
+ except (IndexError, KeyError, AttributeError):
+ self._logger.debug("Skipping %s as it isn't a configuration "
+ "parameter.", '${}'.format(macro[0]))
return param
- elif isinstance(param, list) or isinstance(param, tuple):
+ elif isinstance(param, (list, tuple)):
tmp_list = []
for item in param:
- tmp_list.append(TestCase.step_eval_param(item, STEP))
+ tmp_list.append(self.step_eval_param(item, step_result))
return tmp_list
elif isinstance(param, dict):
tmp_dict = {}
for (key, value) in param.items():
- tmp_dict[key] = TestCase.step_eval_param(value, STEP)
+ tmp_dict[key] = self.step_eval_param(value, step_result)
return tmp_dict
else:
return param
- @staticmethod
- def step_eval_params(params, step_result):
+ def step_eval_params(self, params, step_result):
""" Evaluates referrences to results from previous steps
"""
eval_params = []
# evaluate all parameters if needed
for param in params:
- eval_params.append(TestCase.step_eval_param(param, step_result))
+ eval_params.append(self.step_eval_param(param, step_result))
return eval_params
+ # pylint: disable=too-many-locals, too-many-branches, too-many-statements
def step_run(self):
""" Execute actions specified by TestSteps list
# run test step by step...
for i, step in enumerate(self.test):
step_ok = not self._step_check
+ step_check = self._step_check
+ regex = None
+ # configure step result mapping if step alias/label is detected
+ if step[0].startswith('#'):
+ key = step[0][1:]
+ if key.isdigit():
+ raise RuntimeError('Step alias can\'t be an integer value {}'.format(key))
+ if key in self._step_result_mapping:
+ raise RuntimeError('Step alias {} has been used already for step '
+ '{}'.format(key, self._step_result_mapping[key]))
+ self._step_result_mapping[step[0][1:]] = i
+ step = step[1:]
+
+ # store regex filter if it is specified
+ if isinstance(step[-1], str) and step[-1].startswith('|'):
+ # evalute macros and variables used in regex
+ regex = self.step_eval_params([step[-1][1:]], self._step_result[:i])[0]
+ step = step[:-1]
+
+ # check if step verification should be suppressed
+ if step[0].startswith('!'):
+ step_check = False
+ step_ok = True
+ step[0] = step[0][1:]
if step[0] == 'vswitch':
test_object = self._vswitch_ctl.get_vswitch()
elif step[0] == 'namespace':
tmp_traffic = copy.deepcopy(self._traffic)
tmp_traffic.update(step[2])
step[2] = tmp_traffic
+ # store indication that traffic has been sent
+ # so it is not sent again after the execution of teststeps
+ self._step_send_traffic = True
elif step[0].startswith('vnf'):
+ # use vnf started within TestSteps
if not self._step_vnf_list[step[0]]:
# initialize new VM
self._step_vnf_list[step[0]] = loader.get_vnf_class()()
test_object = self._step_vnf_list[step[0]]
+ elif step[0].startswith('VNF'):
+ if step[1] in ('start', 'stop'):
+ raise RuntimeError("Cannot execute start() or stop() method of "
+ "VNF deployed automatically by scenario.")
+ # use vnf started by scenario deployment (e.g. pvp)
+ vnf_index = int(step[0][3:])
+ try:
+ test_object = self._vnf_list[vnf_index]
+ except IndexError:
+ raise RuntimeError("VNF with index {} is not running.".format(vnf_index))
elif step[0] == 'wait':
input(os.linesep + "Step {}: Press Enter to continue with "
"the next step...".format(i) + os.linesep + os.linesep)
continue
+ elif step[0] == 'sleep':
+ self._logger.debug("Sleep %s seconds", step[1])
+ time.sleep(int(step[1]))
+ continue
+ elif step[0] == 'log':
+ test_object = self._logger
+ # there isn't a need for validation of log entry
+ step_check = False
+ step_ok = True
+ elif step[0] == 'pdb':
+ import pdb
+ pdb.set_trace()
+ continue
else:
self._logger.error("Unsupported test object %s", step[0])
self._step_status = {'status' : False, 'details' : ' '.join(step)}
return False
test_method = getattr(test_object, step[1])
- if self._step_check:
+ if step_check:
test_method_check = getattr(test_object, CHECK_PREFIX + step[1])
else:
test_method_check = None
try:
# eval parameters, but use only valid step_results
# to support negative indexes
- step_params = TestCase.step_eval_params(step[2:], self._step_result[:i])
+ step_params = self.step_eval_params(step[2:], self._step_result[:i])
step_log = '{} {}'.format(' '.join(step[:2]), step_params)
+ step_log += ' filter "{}"'.format(regex) if regex else ''
self._logger.debug("Step %s '%s' start", i, step_log)
self._step_result[i] = test_method(*step_params)
+ if regex:
+ # apply regex to step output
+ self._step_result[i] = functions.filter_output(
+ self._step_result[i], regex)
+
self._logger.debug("Step %s '%s' results '%s'", i,
step_log, self._step_result[i])
- time.sleep(5)
- if self._step_check:
+ time.sleep(S.getValue('TEST_STEP_DELAY'))
+ if step_check:
step_ok = test_method_check(self._step_result[i], *step_params)
except (AssertionError, AttributeError, IndexError) as ex:
step_ok = False
self._logger.error("Step %s raised %s", i, type(ex).__name__)
- if self._step_check:
+ if step_check:
self.step_report_status("Step {} - '{}'".format(i, step_log), step_ok)
if not step_ok:
# all steps processed without any issue
return True
+
+ #
+ # get methods for TestCase members, which needs to be publicly available
+ #
+ def get_output_file(self):
+ """Return content of self._output_file member
+ """
+ return self._output_file
+
+ def get_desc(self):
+ """Return content of self.desc member
+ """
+ return self.desc
+
+ def get_versions(self):
+ """Return content of self.versions member
+ """
+ return self._versions
+
+ def get_traffic(self):
+ """Return content of self._traffic member
+ """
+ return self._traffic
+
+ def get_tc_results(self):
+ """Return content of self._tc_results member
+ """
+ return self._tc_results
+
+ def get_collector(self):
+ """Return content of self._collector member
+ """
+ return self._collector