e.g. --test-params "['x=z; y=(a,b)','x=z']"
"""
def __call__(self, parser, namespace, values, option_string=None):
-
if values[0] == '[':
input_list = ast.literal_eval(values)
parameter_list = []
def configure_logging(level):
"""Configure logging.
"""
+ name, ext = os.path.splitext(settings.getValue('LOG_FILE_DEFAULT'))
+ rename_default = "{name}_{uid}{ex}".format(name=name,
+ uid=settings.getValue(
+ 'LOG_TIMESTAMP'),
+ ex=ext)
log_file_default = os.path.join(
- settings.getValue('LOG_DIR'), settings.getValue('LOG_FILE_DEFAULT'))
+ settings.getValue('LOG_DIR'), rename_default)
+ name, ext = os.path.splitext(settings.getValue('LOG_FILE_HOST_CMDS'))
+ rename_hostcmd = "{name}_{uid}{ex}".format(name=name,
+ uid=settings.getValue(
+ 'LOG_TIMESTAMP'),
+ ex=ext)
log_file_host_cmds = os.path.join(
- settings.getValue('LOG_DIR'), settings.getValue('LOG_FILE_HOST_CMDS'))
+ settings.getValue('LOG_DIR'), rename_hostcmd)
+ name, ext = os.path.splitext(settings.getValue('LOG_FILE_TRAFFIC_GEN'))
+ rename_traffic = "{name}_{uid}{ex}".format(name=name,
+ uid=settings.getValue(
+ 'LOG_TIMESTAMP'),
+ ex=ext)
log_file_traffic_gen = os.path.join(
- settings.getValue('LOG_DIR'),
- settings.getValue('LOG_FILE_TRAFFIC_GEN'))
+ settings.getValue('LOG_DIR'), rename_traffic)
+ metrics_file = (settings.getValue('LOG_FILE_INFRA_METRICS_PFX') +
+ settings.getValue('LOG_TIMESTAMP') + '.log')
+ log_file_infra_metrics = os.path.join(settings.getValue('LOG_DIR'),
+ metrics_file)
_LOGGER.setLevel(logging.DEBUG)
file_logger = logging.FileHandler(filename=log_file_default)
file_logger.setLevel(logging.DEBUG)
+ file_logger.setFormatter(logging.Formatter(
+ '%(asctime)s : %(message)s'))
_LOGGER.addHandler(file_logger)
class CommandFilter(logging.Filter):
def filter(self, record):
return record.getMessage().startswith(trafficgen.CMD_PREFIX)
+ class CollectdMetricsFilter(logging.Filter):
+ """Filter out strings beginning with 'COLLECTD' :'"""
+ def filter(self, record):
+ return record.getMessage().startswith('COLLECTD')
+
cmd_logger = logging.FileHandler(filename=log_file_host_cmds)
cmd_logger.setLevel(logging.DEBUG)
cmd_logger.addFilter(CommandFilter())
gen_logger.addFilter(TrafficGenCommandFilter())
_LOGGER.addHandler(gen_logger)
+ if settings.getValue('COLLECTOR') == 'Collectd':
+ met_logger = logging.FileHandler(filename=log_file_infra_metrics)
+ met_logger.setLevel(logging.DEBUG)
+ met_logger.addFilter(CollectdMetricsFilter())
+ _LOGGER.addHandler(met_logger)
+
def apply_filter(tests, tc_filter):
"""Allow a subset of tests to be conveniently selected
""" Function will return a list of vSwitches detected in given ``rst_files``.
"""
vswitch_names = set()
- if len(rst_files):
+ if rst_files:
try:
output = subprocess.check_output(['grep', '-h', '^* vSwitch'] + rst_files).decode().splitlines()
for line in output:
if match:
vswitch_names.add(match.group(1))
- if len(vswitch_names):
+ if vswitch_names:
return list(vswitch_names)
except subprocess.CalledProcessError:
# check if there are any results in rst format
rst_results = glob.glob(os.path.join(path, 'result*rst'))
pkt_processors = get_vswitch_names(rst_results)
- if len(rst_results):
+ if rst_results:
try:
test_report = os.path.join(path, '{}_{}'.format('_'.join(pkt_processors), _TEMPLATE_RST['final']))
# create report caption directly - it is not worth to execute jinja machinery
sriov_nic.update({tmp_nic[0] : int(tmp_nic[1][2:])})
# sriov is required for some NICs
- if len(sriov_nic):
+ if sriov_nic:
for nic in sriov_nic:
# check if SRIOV is supported and enough virt interfaces are available
if not networkcard.is_sriov_supported(nic) \
if os.path.exists(results_path):
files_list = os.listdir(results_path)
if files_list == []:
- _LOGGER.info("Removing empty result directory: " + results_path)
+ _LOGGER.info("Removing empty result directory: %s", results_path)
shutil.rmtree(results_path)
except AttributeError:
# skip it if parameter doesn't exist
settings.load_from_dir(os.path.join(_CURR_DIR, 'conf'))
+ # Define the timestamp to be used by logs and results
+ date = datetime.datetime.fromtimestamp(time.time())
+ timestamp = date.strftime('%Y-%m-%d_%H-%M-%S')
+ settings.setValue('LOG_TIMESTAMP', timestamp)
+
# Load non performance/integration tests
if args['integration']:
settings.load_from_dir(os.path.join(_CURR_DIR, 'conf/integration'))
settings.setValue('WHITELIST_NICS', list(nic['pci'] for nic in nic_list))
# generate results directory name
- date = datetime.datetime.fromtimestamp(time.time())
- results_dir = "results_" + date.strftime('%Y-%m-%d_%H-%M-%S')
+ # date = datetime.datetime.fromtimestamp(time.time())
+ results_dir = "results_" + timestamp
results_path = os.path.join(settings.getValue('LOG_DIR'), results_dir)
settings.setValue('RESULTS_PATH', results_path)
# create results directory
if not os.path.exists(results_path):
- _LOGGER.info("Creating result directory: " + results_path)
+ _LOGGER.info("Creating result directory: %s", results_path)
os.makedirs(results_path)
# pylint: disable=too-many-nested-blocks
if settings.getValue('mode') == 'trafficgen':
# Default - run all tests
selected_tests = testcases
- if not len(selected_tests):
+ if not selected_tests:
_LOGGER.error("No tests matched --tests option or positional args. Done.")
vsperf_finalize()
sys.exit(1)
- # run tests
- # Add pylint exception: Redefinition of test type from
- # testcases.integration.IntegrationTestCase to testcases.performance.PerformanceTestCase
- # pylint: disable=redefined-variable-type
suite = unittest.TestSuite()
settings_snapshot = copy.deepcopy(settings.__dict__)