except AttributeError:
pass
return param
- elif isinstance(param, list) or isinstance(param, tuple):
+ elif isinstance(param, (list, tuple)):
tmp_list = []
for item in param:
tmp_list.append(self._eval_param(item))
if key not in self.__dict__ and key not in _EXTRA_TEST_PARAMS:
unknown_keys.append(key)
- if len(unknown_keys):
+ if unknown_keys:
raise RuntimeError('Test parameters contain unknown configuration '
'parameter(s): {}'.format(', '.join(unknown_keys)))
for vmindex in range(vm_number):
value = master_value_str.replace('#VMINDEX', str(vmindex))
for macro, args, param, _, step in re.findall(_PARSE_PATTERN, value):
- multi = int(step) if len(step) and int(step) else 1
+ multi = int(step) if step and int(step) else 1
if macro == '#EVAL':
# pylint: disable=eval-used
tmp_result = str(eval(param))
assert result == self.getValue(attr)
return True
- def validate_setValue(self, dummy_result, name, value):
+ def validate_setValue(self, _dummy_result, name, value):
"""Verifies, that value was correctly set
"""
assert value == self.__dict__[name]
return True
- def validate_resetValue(self, dummy_result, attr):
+ def validate_resetValue(self, _dummy_result, attr):
"""Verifies, that value was correctly reset
"""
return 'TEST_PARAMS' not in self.__dict__ or \
:param loadgen_cfg: Configuration for the loadgen
:return: A new ILoadGenerator class
"""
- # pylint: disable=too-many-function-args
return loadgen_class(loadgen_cfg)
def create_pktfwd(deployment, pktfwd_class):
if class_name in results:
logging.info(
- "Class found: " + class_name + ".")
+ "Class found: %s.", class_name)
return results.get(class_name)
return None
mod = imp.load_module(
modname, *imp.find_module(modname, [root]))
except ImportError:
- logging.error('Could not import file ' + filename)
+ logging.error('Could not import file %s', filename)
raise
mods.append((modname, mod))
self._pktfwd_class = pktfwd_class
self._pktfwd = pktfwd_class(guest=True if deployment == "pvp" and
settings.getValue('VNF') != "QemuPciPassthrough" else False)
- self._logger.debug('Creation using ' + str(self._pktfwd_class))
+ self._logger.debug('Creation using %s', str(self._pktfwd_class))
def setup(self):
"""Sets up the packet forwarder for p2p.
"""
- self._logger.debug('Setup using ' + str(self._pktfwd_class))
+ self._logger.debug('Setup using %s', str(self._pktfwd_class))
try:
self._pktfwd.start()
def setup_for_guest(self):
"""Sets up the packet forwarder for pvp.
"""
- self._logger.debug('Setup using ' + str(self._pktfwd_class))
+ self._logger.debug('Setup using %s', str(self._pktfwd_class))
try:
self._pktfwd.start_for_guest()
def stop(self):
"""Tears down the packet forwarder created in setup().
"""
- self._logger.debug('Stop using ' + str(self._pktfwd_class))
+ self._logger.debug('Stop using %s', str(self._pktfwd_class))
self._pktfwd.stop()
def __enter__(self):
:param traffic: A dictionary describing the traffic to send.
"""
- self._logger.debug('send_traffic with ' +
+ self._logger.debug('send_traffic with %s',
str(self._traffic_gen_class))
self.configure(traffic)
If this function requires more than one argument, all should be
should be passed using the args list and appropriately handled.
"""
- self._logger.debug('send_traffic_async with ' +
+ # pylint: disable=unused-argument
+ self._logger.debug('send_traffic_async with %s',
str(self._traffic_gen_class))
self.configure(traffic)
"""
counter = 0
for item in self._results:
- logging.info("Record: " + str(counter))
+ logging.info("Record: %s", str(counter))
counter += 1
for(key, value) in list(item.items()):
logging.info(" Key: " + str(key) +
"""
return self._results
- def validate_send_traffic(self, dummy_result, dummy_traffic):
+ def validate_send_traffic(self, _dummy_result, _dummy_traffic):
"""Verify that send traffic has succeeded
"""
if self._results:
tests=self._tests,
duration=self._duration)
self._traffic_started = True
- if len(function['args']) > 0:
+ if function['args']:
function['function'](function['args'])
else:
function['function']()
trials=self._trials,
duration=self._duration)
self._traffic_started = True
- if len(function['args']) > 0:
+ if function['args']:
function['function'](function['args'])
else:
function['function']()
def get_vnfs_number(self):
"""Returns a number of vnfs controlled by this controller.
"""
- self._logger.debug('get_vnfs_number ' + str(len(self._vnfs)) +
- ' VNF[s]')
+ self._logger.debug('get_vnfs_number %s VNF[s]', str(len(self._vnfs)))
return len(self._vnfs)
def start(self):
self._vswitch_class = vswitch_class
self._vswitch = vswitch_class()
self._deployment_scenario = "Clean"
- self._logger.debug('Creation using ' + str(self._vswitch_class))
+ self._logger.debug('Creation using %s', str(self._vswitch_class))
self._traffic = traffic.copy()
def setup(self):
"""Sets up the switch for Clean.
"""
- self._logger.debug('Setup using ' + str(self._vswitch_class))
+ self._logger.debug('Setup using %s', str(self._vswitch_class))
try:
self._vswitch.start()
def stop(self):
"""Tears down the switch created in setup().
"""
- self._logger.debug('Stop using ' + str(self._vswitch_class))
+ self._logger.debug('Stop using %s', str(self._vswitch_class))
self._vswitch.stop()
def __enter__(self):
self._deployment_scenario = "OP2P"
self._traffic = traffic.copy()
self._tunnel_operation = tunnel_operation
- self._logger.debug('Creation using ' + str(self._vswitch_class))
+ self._logger.debug('Creation using %s', str(self._vswitch_class))
def setup(self):
""" Sets up the switch for overlay P2P (tunnel encap or decap)
"""
- self._logger.debug('Setting up ' + str(self._tunnel_operation))
+ self._logger.debug('Setting up %s', str(self._tunnel_operation))
if self._tunnel_operation == "encapsulation":
self._setup_encap()
else:
Create 2 bridges br0 (integration bridge) and br-ext and a VXLAN port
for encapsulation.
"""
- self._logger.debug('Setup using ' + str(self._vswitch_class))
+ self._logger.debug('Setup using %s', str(self._vswitch_class))
try:
self._vswitch.start()
def _setup_decap(self):
""" Sets up the switch for overlay P2P decapsulation test
"""
- self._logger.debug('Setup using ' + str(self._vswitch_class))
+ self._logger.debug('Setup using %s', str(self._vswitch_class))
try:
self._vswitch.start()
def _setup_decap_vanilla(self):
""" Sets up the switch for overlay P2P decapsulation test
"""
- self._logger.debug('Setup decap vanilla ' + str(self._vswitch_class))
+ self._logger.debug('Setup decap vanilla %s', str(self._vswitch_class))
try:
self._vswitch.start()
def stop(self):
"""Tears down the switch created in setup().
"""
- self._logger.debug('Stop using ' + str(self._vswitch_class))
+ self._logger.debug('Stop using %s', str(self._vswitch_class))
self._vswitch.stop()
def __enter__(self):
self._vswitch_class = vswitch_class
self._vswitch = vswitch_class()
self._deployment_scenario = "P2P"
- self._logger.debug('Creation using ' + str(self._vswitch_class))
+ self._logger.debug('Creation using %s', str(self._vswitch_class))
self._traffic = traffic.copy()
def setup(self):
"""Sets up the switch for p2p.
"""
- self._logger.debug('Setup using ' + str(self._vswitch_class))
+ self._logger.debug('Setup using %s', str(self._vswitch_class))
try:
self._vswitch.start()
def stop(self):
"""Tears down the switch created in setup().
"""
- self._logger.debug('Stop using ' + str(self._vswitch_class))
+ self._logger.debug('Stop using %s', str(self._vswitch_class))
self._vswitch.stop()
def __enter__(self):
def get_ports_info(self):
"""See IVswitchController for description
"""
- self._logger.debug('get_ports_info using ' + str(self._vswitch_class))
+ self._logger.debug('get_ports_info using %s', str(self._vswitch_class))
return self._vswitch.get_ports(settings.getValue('VSWITCH_BRIDGE_NAME'))
def dump_vswitch_flows(self):
self.br_mod_ip1 = settings.getValue('TUNNEL_MODIFY_BRIDGE_IP1')
self.br_mod_ip2 = settings.getValue('TUNNEL_MODIFY_BRIDGE_IP2')
self.tunnel_type = settings.getValue('TUNNEL_TYPE')
- self._logger.debug('Creation using ' + str(self._vswitch_class))
+ self._logger.debug('Creation using %s', str(self._vswitch_class))
def setup(self):
""" Sets up the switch for VxLAN overlay PTUNP (tunnel encap or decap)
"""
self._logger.debug('Setting up phy-tun-phy tunneling scenario')
- if self.tunnel_type is 'vxlan':
+ if self.tunnel_type == 'vxlan':
self._setup_vxlan_encap_decap()
else:
self._logger.error("Only VxLAN is supported for now")
physical ports. Two more bridges br-mod1 and br-mod2 to mangle
and redirect the packets from one tunnel port to other.
"""
- self._logger.debug('Setup using ' + str(self._vswitch_class))
+ self._logger.debug('Setup using %s', str(self._vswitch_class))
try:
self._vswitch.start()
self._vswitch.add_switch(self.bridge_phy1)
def stop(self):
"""Tears down the switch created in setup().
"""
- self._logger.debug('Stop using ' + str(self._vswitch_class))
+ self._logger.debug('Stop using %s', str(self._vswitch_class))
self._vswitch.stop()
def __enter__(self):
def get_ports_info(self):
"""See IVswitchController for description
"""
- self._logger.debug('get_ports_info using ' + str(self._vswitch_class))
+ self._logger.debug('get_ports_info using %s', str(self._vswitch_class))
ports = self._vswitch.get_ports(self.bridge_phy1) +\
self._vswitch.get_ports(self.bridge_mod1) +\
self._vswitch.get_ports(self.bridge_phy2) +\
def dump_vswitch_flows(self):
"""See IVswitchController for description
"""
- self._logger.debug('dump_flows using ' + str(self._vswitch_class))
+ self._logger.debug('dump_flows using %s', str(self._vswitch_class))
self._vswitch.dump_flows(self.bridge_phy1)
self._vswitch.dump_flows(self.bridge_mod1)
self._vswitch.dump_flows(self.bridge_phy2)
self._traffic = traffic.copy()
self._bidir = True if self._traffic['bidir'] == 'True' else False
- self._logger.debug('Creation using ' + str(self._vswitch_class))
+ self._logger.debug('Creation using %s', str(self._vswitch_class))
self._bridge = settings.getValue('VSWITCH_BRIDGE_NAME')
def setup(self):
""" Sets up the switch for PXP
"""
- self._logger.debug('Setup using ' + str(self._vswitch_class))
+ self._logger.debug('Setup using %s', str(self._vswitch_class))
try:
self._vswitch.start()
def stop(self):
"""Tears down the switch created in setup().
"""
- self._logger.debug('Stop using ' + str(self._vswitch_class))
+ self._logger.debug('Stop using %s', str(self._vswitch_class))
self._vswitch.stop()
def _add_flow(self, flow, port1, port2, reverse_flow=False):
def get_ports_info(self):
"""See IVswitchController for description
"""
- self._logger.debug('get_ports_info using ' + str(self._vswitch_class))
+ self._logger.debug('get_ports_info using %s', str(self._vswitch_class))
return self._vswitch.get_ports(self._bridge)
def dump_vswitch_flows(self):
# --enable=similarities". If you want to run only the classes checker, but have
# no Warning level messages displayed, use"--disable=all --enable=classes
# --disable=W"
-disable=E1602,E1603,E1601,E1606,E1607,E1604,E1605,E1608,W0401,W1604,W1605,W1606,W1607,W1601,W1602,W1603,W1622,W1623,W1620,W1621,W1608,W1609,W1624,W1625,W1618,W1626,W1627,I0021,I0020,W0704,R0903,W1613,W1638,W1611,W1610,W1617,W1616,W1615,W1614,W1630,W1619,W1632,W1635,W1634,W1637,W1636,W1639,W1612,W1628,W1633,W1629,I0011,W1640
+disable=E1602,E1603,E1601,E1606,E1607,E1604,E1605,E1608,W0401,W1604,W1605,W1606,W1607,W1601,W1602,W1603,W1622,W1623,W1620,W1621,W1608,W1609,W1624,W1625,W1618,W1626,W1627,I0021,I0020,W0704,R0903,W1613,W1638,W1611,W1610,W1617,W1616,W1615,W1614,W1630,W1619,W1632,W1635,W1634,W1637,W1636,W1639,W1612,W1628,W1633,W1629,I0011,W1640,R1705
[REPORTS]
requests==2.8.1
netaddr==0.7.18
scapy-python3==0.18
-pylint==1.5.6
+pylint==1.8.2
pyzmq==14.5.0
distro
stcrestclient
keep Python package structure intact without extra requirements for
PYTHONPATH.
"""
-
def _bind_nics():
"""Bind NICs using the bind tool specified in the configuration.
"""
- if not len(_NICS_PCI):
+ if not _NICS_PCI:
_LOGGER.info('NICs are not configured - nothing to bind')
return
try:
def _unbind_nics():
"""Unbind NICs using the bind tool specified in the configuration.
"""
- if not len(_NICS_PCI):
+ if not _NICS_PCI:
_LOGGER.info('NICs are not configured - nothing to unbind')
return
try:
:return: None
"""
- self.logger.debug('delete datapath ' + dp_name)
+ self.logger.debug('delete datapath %s', dp_name)
self.run_dpctl(['del-dp', dp_name])
for rule in flow_src_list:
if rule in flow_dump_list:
flow_src_ctrl.remove(rule)
- return True if not len(flow_src_ctrl) else False
+ return True if not flow_src_ctrl else False
import logging
from collections import OrderedDict
-from testcases import TestCase
+from testcases.testcase import TestCase
class IntegrationTestCase(TestCase):
"""IntegrationTestCase class
import logging
-from testcases import TestCase
+from testcases.testcase import TestCase
from tools.report import report
class PerformanceTestCase(TestCase):
# cleanup any namespaces created
if os.path.isdir('/tmp/namespaces'):
namespace_list = os.listdir('/tmp/namespaces')
- if len(namespace_list):
+ if namespace_list:
self._logger.info('Cleaning up namespaces')
for name in namespace_list:
namespace.delete_namespace(name)
# cleanup any veth ports created
if os.path.isdir('/tmp/veth'):
veth_list = os.listdir('/tmp/veth')
- if len(veth_list):
+ if veth_list:
self._logger.info('Cleaning up veth ports')
for eth in veth_list:
port1, port2 = eth.split('-')
if len(self._tc_results) < len(results):
if len(self._tc_results) > 1:
raise RuntimeError('Testcase results do not match:'
- 'results: {}\n'
- 'trafficgen results: {}\n',
+ 'results: %s\n'
+ 'trafficgen results: %s\n' %
self._tc_results,
results)
else:
self._testcase_run_time = time.strftime("%H:%M:%S",
time.gmtime(self._testcase_stop_time -
self._testcase_start_time))
- logging.info("Testcase execution time: " + self._testcase_run_time)
+ logging.info("Testcase execution time: %s", self._testcase_run_time)
# report test results
self.run_report()
"""
with open(output, 'a') as csvfile:
- logging.info("Write results to file: " + output)
+ logging.info("Write results to file: %s", output)
fieldnames = TestCase._get_unique_keys(results)
writer = csv.DictWriter(csvfile, fieldnames)
self._logger.debug("Skipping %s as it isn't a configuration "
"parameter.", '${}'.format(macro[0]))
return param
- elif isinstance(param, list) or isinstance(param, tuple):
+ elif isinstance(param, (list, tuple)):
tmp_list = []
for item in param:
tmp_list.append(self.step_eval_param(item, step_result))
# initialize list with results
self._step_result = [None] * len(self.test)
- # We have to suppress pylint report, because test_object has to be set according
- # to the test step definition
- # pylint: disable=redefined-variable-type
# run test step by step...
for i, step in enumerate(self.test):
step_ok = not self._step_check
for label in YLABELS:
if any(r in sample for r in YLABELS[label]):
return label
-
+ return None
def plot_graphs(dict_of_arrays):
"""
plot_graphs(self.results)
proc_stats = get_results_to_print(self.results)
for process in proc_stats:
- logging.info("Process: " + '_'.join(process.split('_')[:-1]))
+ logging.info("Process: %s", '_'.join(process.split('_')[:-1]))
for(key, value) in proc_stats[process].items():
logging.info(" Statistic: " + str(key) +
", Value: " + str(value))
return self.parse_signed(part_len, data)
if sec_level == 2:
return self.parse_encrypted(part_len, data)
+ return None
def parse_signed(self, part_len, data):
"""
try:
name_parts = handler(sample)
if name_parts is None:
- return # treat None as "ignore sample"
+ return None # treat None as "ignore sample"
name = '.'.join(name_parts)
except (AttributeError, IndexError, MemoryError, RuntimeError):
LOG.exception("Exception in sample handler %s (%s):",
sample["plugin"], handler)
- return
+ return None
host = sample.get("host", "")
return (
host,
Check the value range
"""
if val is None:
- return
+ return None
try:
vmin, vmax = self.parser.types.type_ranges[stype][vname]
except KeyError:
if vmin is not None and val < vmin:
LOG.debug("Invalid value %s (<%s) for %s", val, vmin, vname)
LOG.debug("Last sample: %s", self.last_sample)
- return
+ return None
if vmax is not None and val > vmax:
LOG.debug("Invalid value %s (>%s) for %s", val, vmax, vname)
LOG.debug("Last sample: %s", self.last_sample)
- return
+ return None
return val
def calculate(self, host, name, vtype, val, time):
if vtype not in handlers:
LOG.error("Invalid value type %s for %s", vtype, name)
LOG.info("Last sample: %s", self.last_sample)
- return
+ return None
return handlers[vtype](host, name, val, time)
def _calc_counter(self, host, name, val, time):
key = (host, name)
if key not in self.prev_samples:
self.prev_samples[key] = (val, time)
- return
+ return None
pval, ptime = self.prev_samples[key]
self.prev_samples[key] = (val, time)
if time <= ptime:
LOG.error("Invalid COUNTER update for: %s:%s", key[0], key[1])
LOG.info("Last sample: %s", self.last_sample)
- return
+ return None
if val < pval:
# this is supposed to handle counter wrap around
# see https://collectd.org/wiki/index.php/Data_source
key = (host, name)
if key not in self.prev_samples:
self.prev_samples[key] = (val, time)
- return
+ return None
pval, ptime = self.prev_samples[key]
self.prev_samples[key] = (val, time)
if time <= ptime:
LOG.debug("Invalid DERIVE update for: %s:%s", key[0], key[1])
LOG.debug("Last sample: %s", self.last_sample)
- return
+ return None
return float(abs(val - pval)) / (time - ptime)
def _calc_absolute(self, host, name, val, time):
key = (host, name)
if key not in self.prev_samples:
self.prev_samples[key] = (val, time)
- return
+ return None
_, ptime = self.prev_samples[key]
self.prev_samples[key] = (val, time)
if time <= ptime:
LOG.error("Invalid ABSOLUTE update for: %s:%s", key[0], key[1])
LOG.info("Last sample: %s", self.last_sample)
- return
+ return None
return float(val) / (time - ptime)
into the file in directory with test results
"""
monitor = settings.getValue('PIDSTAT_MONITOR')
- self._logger.info('Statistics are requested for: ' + ', '.join(monitor))
+ self._logger.info('Statistics are requested for: %s', ', '.join(monitor))
pids = systeminfo.get_pids(monitor)
if pids:
with open(self._log, 'w') as logfile:
"""Logs collected statistics.
"""
for process in self._results:
- logging.info("Process: " + '_'.join(process.split('_')[:-1]))
+ logging.info("Process: %s", '_'.join(process.split('_')[:-1]))
for(key, value) in self._results[process].items():
logging.info(" Statistic: " + str(key) +
", Value: " + str(value))
# expand OS wildcards in paths if needed
if glob.has_magic(tmp_tool):
tmp_glob = glob.glob(tmp_tool)
- if len(tmp_glob) == 0:
+ if not tmp_glob:
raise RuntimeError('Path to the {} is not valid: {}.'.format(tool, tmp_tool))
elif len(tmp_glob) > 1:
raise RuntimeError('Path to the {} is ambiguous {}'.format(tool, tmp_glob))
'name': 'stress-ng'
}
_logger = logging.getLogger(__name__)
-
- def __init__(self, stress_config):
- super(StressNg, self).__init__(stress_config)
try:
os.makedirs(self._shared_dir)
except OSError as exp:
- raise OSError("Failed to create shared directory %s: %s",
+ raise OSError("Failed to create shared directory %s: %s" %
self._shared_dir, exp)
self.nics_nr = S.getValue('NN_NICS_NR')[self._number]
"""
Wrapper Class for Load-Generation through stressor-vm
"""
- # pylint: disable=unused-argument
- def __init__(self, config):
+ def __init__(self, _config):
self.qvm_list = []
for vmindex in range(int(S.getValue('NN_COUNT'))):
qvm = QemuVM(vmindex)
self._logger.info('Unable to get list of dependecies for module \'%s\'.', module)
# ...and try to continue, just for case that dependecies are already loaded
- if len(deps):
+ if deps:
return deps.split(',')
else:
return []
port, name), False)
-# pylint: disable=unused-argument
# pylint: disable=invalid-name
-def validate_add_ip_to_namespace_eth(result, port, name, ip_addr, cidr):
+def validate_add_ip_to_namespace_eth(_result, port, name, ip_addr, cidr):
"""
Validation function for integration testcases
"""
_LOGGER, 'Validating ip address in namespace...', False))
-def validate_assign_port_to_namespace(result, port, name, port_up=False):
+def validate_assign_port_to_namespace(_result, port, name, _port_up=False):
"""
Validation function for integration testcases
"""
_LOGGER, 'Validating port in namespace...'))
-def validate_create_namespace(result, name):
+def validate_create_namespace(_result, name):
"""
Validation function for integration testcases
"""
return name in get_system_namespace_list()
-def validate_delete_namespace(result, name):
+def validate_delete_namespace(_result, name):
"""
Validation function for integration testcases
"""
"""
mac_path = glob.glob(os.path.join(_PCI_DIR, _PCI_NET, '*', 'address').format(pci_handle))
# kernel driver is loaded and MAC can be read
- if len(mac_path) and os.path.isfile(mac_path[0]):
+ if mac_path and os.path.isfile(mac_path[0]):
with open(mac_path[0], 'r') as _file:
return _file.readline().rstrip('\n')
print(dev.send_cont_traffic(traffic=TRAFFIC))
print(dev.send_rfc2544_throughput(traffic=TRAFFIC))
print(dev.send_rfc2544_back2back(traffic=TRAFFIC))
+ # pylint: disable=no-member
print(dev.send_rfc(traffic=TRAFFIC))
return NotImplementedError(
'Ixia start back2back traffic not implemented')
- def send_rfc2544_back2back(self, traffic=None, duration=60,
- lossrate=0.0, tests=1):
+ def send_rfc2544_back2back(self, traffic=None, tests=1, duration=60,
+ lossrate=0.0):
return NotImplementedError(
'Ixia send back2back traffic not implemented')
next(reader)
for row in reader:
#Replace null entries added by Ixia with 0s.
- row = [entry if len(entry) > 0 else '0' for entry in row]
+ row = [entry if entry else '0' for entry in row]
# tx_fps and tx_mps cannot be reliably calculated
# as the DUT may be modifying the frame size
:param one_shot: No RFC 2544 binary search,
just packet flow at traffic specifics
"""
- logging.debug("traffic['frame_rate'] = " + \
+ logging.debug("traffic['frame_rate'] = %s", \
str(traffic['frame_rate']))
- logging.debug("traffic['multistream'] = " + \
+ logging.debug("traffic['multistream'] = %s", \
str(traffic['multistream']))
- logging.debug("traffic['stream_type'] = " + \
+ logging.debug("traffic['stream_type'] = %s", \
str(traffic['stream_type']))
- logging.debug("traffic['l2']['srcmac'] = " + \
+ logging.debug("traffic['l2']['srcmac'] = %s", \
str(traffic['l2']['srcmac']))
- logging.debug("traffic['l2']['dstmac'] = " + \
+ logging.debug("traffic['l2']['dstmac'] = %s", \
str(traffic['l2']['dstmac']))
- logging.debug("traffic['l3']['proto'] = " + \
+ logging.debug("traffic['l3']['proto'] = %s", \
str(traffic['l3']['proto']))
- logging.debug("traffic['l3']['srcip'] = " + \
+ logging.debug("traffic['l3']['srcip'] = %s", \
str(traffic['l3']['srcip']))
- logging.debug("traffic['l3']['dstip'] = " + \
+ logging.debug("traffic['l3']['dstip'] = %s", \
str(traffic['l3']['dstip']))
- logging.debug("traffic['l4']['srcport'] = " + \
+ logging.debug("traffic['l4']['srcport'] = %s", \
str(traffic['l4']['srcport']))
- logging.debug("traffic['l4']['dstport'] = " + \
+ logging.debug("traffic['l4']['dstport'] = %s", \
str(traffic['l4']['dstport']))
- logging.debug("traffic['vlan']['enabled'] = " + \
+ logging.debug("traffic['vlan']['enabled'] = %s", \
str(traffic['vlan']['enabled']))
- logging.debug("traffic['vlan']['id'] = " + \
+ logging.debug("traffic['vlan']['id'] = %s", \
str(traffic['vlan']['id']))
- logging.debug("traffic['vlan']['priority'] = " + \
+ logging.debug("traffic['vlan']['priority'] = %s", \
str(traffic['vlan']['priority']))
- logging.debug("traffic['vlan']['cfi'] = " + \
+ logging.debug("traffic['vlan']['cfi'] = %s", \
str(traffic['vlan']['cfi']))
logging.debug(traffic['l2']['framesize'])
(traffic['frame_rate'] / 100) * (self._moongen_line_speed / \
(8 * (traffic['l2']['framesize'] + 20)) / math.pow(10, 6)))
- logging.debug("startRate = " + start_rate)
+ logging.debug("startRate = %s", start_rate)
- out_file.write("startRate = " + \
+ out_file.write("startRate = %s" % \
start_rate + "\n")
out_file.write("}" + "\n")
return moongen_results
- def send_rfc2544_throughput(self, traffic=None, duration=20,
- lossrate=0.0, tests=1):
+ def send_rfc2544_throughput(self, traffic=None, tests=1, duration=20,
+ lossrate=0.0):
#
# Send traffic per RFC2544 throughput test specifications.
#
"""
self._logger.info('In moongen wait_rfc2544_throughput')
- def send_rfc2544_back2back(self, traffic=None, duration=60,
- lossrate=0.0, tests=1):
+ def send_rfc2544_back2back(self, traffic=None, tests=1, duration=60,
+ lossrate=0.0):
"""Send traffic per RFC2544 back2back test specifications.
Send packets at a fixed rate, using ``traffic``
row["ForwardingRate(fps)"])
return result
- # pylint: disable=unused-argument
- def send_rfc2889_forwarding(self, traffic=None, tests=1, duration=20):
+ def send_rfc2889_forwarding(self, traffic=None, tests=1, _duration=20):
"""
Send traffic per RFC2889 Forwarding test specifications.
"""
framesize = traffic['l2']['framesize']
args = get_rfc2889_common_settings(framesize, tests,
traffic['traffic_type'])
- if settings.getValue("TRAFFICGEN_STC_VERBOSE") is "True":
+ if settings.getValue("TRAFFICGEN_STC_VERBOSE") == "True":
args.append("--verbose")
verbose = True
self._logger.debug("Arguments used to call test: %s", args)
return self.get_rfc2889_forwarding_results(filec)
- def send_rfc2889_caching(self, traffic=None, tests=1, duration=20):
+ def send_rfc2889_caching(self, traffic=None, tests=1, _duration=20):
"""
Send as per RFC2889 Addr-Caching test specifications.
"""
custom_args = get_rfc2889_custom_settings()
args = common_args + custom_args
- if settings.getValue("TRAFFICGEN_STC_VERBOSE") is "True":
+ if settings.getValue("TRAFFICGEN_STC_VERBOSE") == "True":
args.append("--verbose")
verbose = True
self._logger.debug("Arguments used to call test: %s", args)
return self.get_rfc2889_addr_caching_results(filec)
- def send_rfc2889_learning(self, traffic=None, tests=1, duration=20):
+ def send_rfc2889_learning(self, traffic=None, tests=1, _duration=20):
"""
Send traffic per RFC2889 Addr-Learning test specifications.
"""
custom_args = get_rfc2889_custom_settings()
args = common_args + custom_args
- if settings.getValue("TRAFFICGEN_STC_VERBOSE") is "True":
+ if settings.getValue("TRAFFICGEN_STC_VERBOSE") == "True":
args.append("--verbose")
verbose = True
self._logger.debug("Arguments used to call test: %s", args)
custom, 1)
args = rfc2544_common_args + stc_common_args + rfc2544_custom_args
- if settings.getValue("TRAFFICGEN_STC_VERBOSE") is "True":
+ if settings.getValue("TRAFFICGEN_STC_VERBOSE") == "True":
args.append("--verbose")
verbose = True
self._logger.debug("Arguments used to call test: %s", args)
tests)
args = rfc2544_common_args + stc_common_args + rfc2544_custom_args
- if settings.getValue("TRAFFICGEN_STC_VERBOSE") is "True":
+ if settings.getValue("TRAFFICGEN_STC_VERBOSE") == "True":
args.append("--verbose")
verbose = True
self._logger.debug("Arguments used to call test: %s", args)
tests)
args = rfc2544_common_args + stc_common_args + rfc2544_custom_args
- if settings.getValue("TRAFFICGEN_STC_VERBOSE") is "True":
+ if settings.getValue("TRAFFICGEN_STC_VERBOSE") == "True":
args.append("--verbose")
verbose = True
self._logger.info("Arguments used to call test: %s", args)
}
with TestCenter() as dev:
print(dev.send_rfc2544_throughput(traffic=TRAFFIC))
+ # pylint: disable=no-member
print(dev.send_rfc2544_backtoback(traffic=TRAFFIC))
self.finished = threading.Event()
self.setDaemon(True)
_LOGGER.debug(
- 'Xena Socket keep alive thread initiated, interval ' +
- '{} seconds'.format(self.interval))
+ 'Xena Socket keep alive thread initiated, interval %s seconds', self.interval)
def stop(self):
""" Thread stop. See python thread docs for more info
statdict[entry_id] = self._pack_stats(param, 3)
elif param[1] == 'PR_TPLDS':
tid_list = self._pack_tplds_stats(param, 2)
- if len(tid_list):
+ if tid_list:
statdict['pr_tplds'] = tid_list
elif param[1] == 'PR_TPLDTRAFFIC':
if 'pr_tpldstraffic' in statdict:
enable the pairs topology
:return: None
"""
- # set duplex mode, this code is valid, pylint complaining with a
- # warning that many have complained about online.
- # pylint: disable=redefined-variable-type
-
try:
if self._params['traffic']['bidir'] == "True":
j_file = XenaJSONMesh()
'tests': tests,
}
i = 0
- # pylint: disable=no-member
for output_file in output_files:
template = template_env.get_template(_TEMPLATE_FILES[i])
output_text = template.render(template_vars)
return None
versions = re.findall(regex, output)
- if len(versions):
+ if versions:
return versions[0]
else:
return None
if not '16' in release:
tmp_ver[2] += line.rstrip('\n').split(' ')[2]
- if len(tmp_ver[0]):
+ if tmp_ver[0]:
app_version = '.'.join(tmp_ver)
app_git_tag = get_git_tag(S.getValue('TOOLS')['dpdk_src'])
elif app_name.lower().startswith('qemu'):
return True
@staticmethod
- def validate_Assert(result, dummy_condition):
+ def validate_Assert(result, _dummy_condition):
""" Validate evaluation of given `condition'
"""
return result
return eval(expression)
@staticmethod
- def validate_Eval(result, dummy_expression):
+ def validate_Eval(result, _dummy_expression):
""" Validate result of python `expression' evaluation
"""
return result is not None
return True
@staticmethod
- def validate_Exec_Python(result, dummy_code):
+ def validate_Exec_Python(result, _dummy_code):
""" Validate result of python `code' execution
"""
return result
return output
@staticmethod
- def validate_Exec_Shell(result, dummy_command, dummy_regex=None):
+ def validate_Exec_Shell(result, _dummy_command, _dummy_regex=None):
""" validate result of shell `command' execution
"""
return result is not None
return None
@staticmethod
- def validate_Exec_Shell_Background(result, dummy_command, dummy_regex=None):
+ def validate_Exec_Shell_Background(result, _dummy_command, _dummy_regex=None):
""" validate result of shell `command' execution on the background
"""
return result is not None
port, peer_port), False)
-# pylint: disable=unused-argument
-def validate_add_veth_port(result, port, peer_port):
+def validate_add_veth_port(_result, port, peer_port):
"""
Validation function for integration testcases
"""
return all([port in devs, peer_port in devs])
-def validate_bring_up_eth_port(result, eth_port, namespace=None):
+def validate_bring_up_eth_port(_result, eth_port, namespace=None):
"""
Validation function for integration testcases
"""
return True
-def validate_del_veth_port(result, port, peer_port):
+def validate_del_veth_port(_result, port, peer_port):
"""
Validation function for integration testcases
"""
This package contains an interface the VSPERF core uses for controlling
VNFs and VNF-specific implementation modules of this interface.
"""
-
This package contains an implementation of the interface the VSPERF core
uses for controlling VNFs using QEMU and DPDK's testpmd application.
"""
-
elif self._guest_loopback == 'linux_bridge':
self._configure_linux_bridge()
elif self._guest_loopback != 'clean':
- raise RuntimeError('Unsupported guest loopback method "%s" was specified.',
+ raise RuntimeError('Unsupported guest loopback method "%s" was specified.' %
self._guest_loopback)
def wait(self, prompt=None, timeout=30):
"""VNF interface and helpers.
"""
-from vnfs import *
+import vnfs
self.execute(cmd)
return self.wait(prompt=prompt, timeout=timeout)
- def validate_start(self, dummyresult):
+ def validate_start(self, _dummyresult):
""" Validate call of VNF start()
"""
if self._child and self._child.isalive():
return not self.validate_start(result)
@staticmethod
- def validate_execute_and_wait(result, dummy_cmd, dummy_timeout=30, dummy_prompt=''):
+ def validate_execute_and_wait(result, _dummy_cmd, _dummy_timeout=30, _dummy_prompt=''):
""" Validate command execution within VNF
"""
return len(result) > 0
e.g. --test-params "['x=z; y=(a,b)','x=z']"
"""
def __call__(self, parser, namespace, values, option_string=None):
-
if values[0] == '[':
input_list = ast.literal_eval(values)
parameter_list = []
""" Function will return a list of vSwitches detected in given ``rst_files``.
"""
vswitch_names = set()
- if len(rst_files):
+ if rst_files:
try:
output = subprocess.check_output(['grep', '-h', '^* vSwitch'] + rst_files).decode().splitlines()
for line in output:
if match:
vswitch_names.add(match.group(1))
- if len(vswitch_names):
+ if vswitch_names:
return list(vswitch_names)
except subprocess.CalledProcessError:
# check if there are any results in rst format
rst_results = glob.glob(os.path.join(path, 'result*rst'))
pkt_processors = get_vswitch_names(rst_results)
- if len(rst_results):
+ if rst_results:
try:
test_report = os.path.join(path, '{}_{}'.format('_'.join(pkt_processors), _TEMPLATE_RST['final']))
# create report caption directly - it is not worth to execute jinja machinery
sriov_nic.update({tmp_nic[0] : int(tmp_nic[1][2:])})
# sriov is required for some NICs
- if len(sriov_nic):
+ if sriov_nic:
for nic in sriov_nic:
# check if SRIOV is supported and enough virt interfaces are available
if not networkcard.is_sriov_supported(nic) \
if os.path.exists(results_path):
files_list = os.listdir(results_path)
if files_list == []:
- _LOGGER.info("Removing empty result directory: " + results_path)
+ _LOGGER.info("Removing empty result directory: %s", results_path)
shutil.rmtree(results_path)
except AttributeError:
# skip it if parameter doesn't exist
# create results directory
if not os.path.exists(results_path):
- _LOGGER.info("Creating result directory: " + results_path)
+ _LOGGER.info("Creating result directory: %s", results_path)
os.makedirs(results_path)
# pylint: disable=too-many-nested-blocks
if settings.getValue('mode') == 'trafficgen':
# Default - run all tests
selected_tests = testcases
- if not len(selected_tests):
+ if not selected_tests:
_LOGGER.error("No tests matched --tests option or positional args. Done.")
vsperf_finalize()
sys.exit(1)
- # run tests
- # Add pylint exception: Redefinition of test type from
- # testcases.integration.IntegrationTestCase to testcases.performance.PerformanceTestCase
- # pylint: disable=redefined-variable-type
suite = unittest.TestSuite()
settings_snapshot = copy.deepcopy(settings.__dict__)
This package contains an interface the VSPERF core uses for controlling
vSwitches and vSwitch-specific implementation modules of this interface.
"""
-
"""
if switch_name is None or remote_switch_name is None:
- return
+ return None
bridge = self._bridges[switch_name]
remote_bridge = self._bridges[remote_switch_name]
with open(self._ovsdb_pidfile_path, "r") as pidfile:
ovsdb_pid = pidfile.read().strip()
- self._logger.info("Killing ovsdb with pid: " + ovsdb_pid)
+ self._logger.info("Killing ovsdb with pid: %s", ovsdb_pid)
if ovsdb_pid:
tasks.terminate_task(ovsdb_pid, logger=self._logger)
#
# validate methods required for integration testcases
#
- def validate_add_switch(self, dummy_result, switch_name, dummy_params=None):
+ def validate_add_switch(self, _dummy_result, switch_name, _dummy_params=None):
"""Validate - Create a new logical switch with no ports
"""
bridge = self._bridges[switch_name]
# Method could be a function
# pylint: disable=no-self-use
- def validate_del_switch(self, dummy_result, switch_name):
+ def validate_del_switch(self, _dummy_result, switch_name):
"""Validate removal of switch
"""
bridge = OFBridge('tmp')
"""
return self.validate_add_phy_port(result, switch_name)
- def validate_del_port(self, dummy_result, switch_name, port_name):
+ def validate_del_port(self, _dummy_result, switch_name, port_name):
""" Validate that port_name was removed from bridge.
"""
bridge = self._bridges[switch_name]
assert 'Port "%s"' % port_name not in output[0]
return True
- def validate_add_flow(self, dummy_result, switch_name, flow, dummy_cache='off'):
+ def validate_add_flow(self, _dummy_result, switch_name, flow, _dummy_cache='off'):
""" Validate insertion of the flow into the switch
"""
return True
return False
- def validate_del_flow(self, dummy_result, switch_name, flow=None):
+ def validate_del_flow(self, _dummy_result, switch_name, flow=None):
""" Validate removal of the flow
"""
if not flow:
# what else we can do?
return True
- return not self.validate_add_flow(dummy_result, switch_name, flow)
+ return not self.validate_add_flow(_dummy_result, switch_name, flow)
- def validate_dump_flows(self, dummy_result, dummy_switch_name):
+ def validate_dump_flows(self, _dummy_result, _dummy_switch_name):
""" Validate call of flow dump
"""
return True
- def validate_disable_rstp(self, dummy_result, switch_name):
+ def validate_disable_rstp(self, _dummy_result, switch_name):
""" Validate rstp disable
"""
bridge = self._bridges[switch_name]
return 'rstp_enable : false' in ''.join(bridge.bridge_info())
- def validate_enable_rstp(self, dummy_result, switch_name):
+ def validate_enable_rstp(self, _dummy_result, switch_name):
""" Validate rstp enable
"""
bridge = self._bridges[switch_name]
return 'rstp_enable : true' in ''.join(bridge.bridge_info())
- def validate_disable_stp(self, dummy_result, switch_name):
+ def validate_disable_stp(self, _dummy_result, switch_name):
""" Validate stp disable
"""
bridge = self._bridges[switch_name]
return 'stp_enable : false' in ''.join(bridge.bridge_info())
- def validate_enable_stp(self, dummy_result, switch_name):
+ def validate_enable_stp(self, _dummy_result, switch_name):
""" Validate stp enable
"""
bridge = self._bridges[switch_name]
return 'stp_enable : true' in ''.join(bridge.bridge_info())
- def validate_restart(self, dummy_result):
+ def validate_restart(self, _dummy_result):
""" Validate restart
"""
return True
def add_switch(self, switch_name, dummy_params=None):
"""See IVswitch for general description
"""
+ # pylint: disable=unused-argument
if switch_name in self._switches:
self._logger.warning("switch %s already exists...", switch_name)
else:
"""See IVswitch for general description
:raises: RuntimeError
"""
+ # pylint: disable=unused-argument
# get list of physical interfaces with PCI addresses
vpp_nics = self._get_nic_info(key='Pci')
# check if there are any NICs left
def add_vport(self, dummy_switch_name):
"""See IVswitch for general description
"""
+ # pylint: disable=unused-argument
socket_name = S.getValue('TOOLS')['ovs_var_tmp'] + 'dpdkvhostuser' + str(len(self._virt_ports))
if S.getValue('VSWITCH_VHOSTUSER_SERVER_MODE'):
mode = ['server']
if bidir:
self.run_vppctl(['set', 'interface', 'l2', 'xconnect', port2, port1])
- def add_bridge(self, switch_name, port1, port2, dummy_bidir=False):
+ def add_bridge(self, switch_name, port1, port2, _dummy_bidir=False):
"""Add given ports to bridge ``switch_name``
"""
self.run_vppctl(['set', 'interface', 'l2', 'bridge', port1,
elif mode == 'bridge':
self.add_bridge(switch_name, port1, port2)
else:
- raise RuntimeError('VPP: Unsupported l2 connection mode detected %s', mode)
+ raise RuntimeError('VPP: Unsupported l2 connection mode detected %s' % mode)
def del_l2patch(self, port1, port2, bidir=False):
"""Remove l2patch connection between given ports
if bidir:
self.run_vppctl(['test', 'l2patch', 'rx', port2, 'tx', port1, 'del'])
- def del_xconnect(self, dummy_port1, dummy_port2, dummy_bidir=False):
+ def del_xconnect(self, _dummy_port1, _dummy_port2, _dummy_bidir=False):
"""Remove xconnect connection between given ports
"""
self._logger.warning('VPP: Removal of l2 xconnect is not implemented.')
- def del_bridge(self, dummy_switch_name, dummy_port1, dummy_port2):
+ def del_bridge(self, _dummy_switch_name, _dummy_port1, _dummy_port2):
"""Remove given ports from the bridge
"""
self._logger.warning('VPP: Removal of interfaces from bridge is not implemented.')
elif mode == 'bridge':
self.del_bridge(switch_name, port1, port2)
else:
- raise RuntimeError('VPP: Unsupported l2 connection mode detected %s', mode)
+ raise RuntimeError('VPP: Unsupported l2 connection mode detected %s' % mode)
def dump_l2patch(self):
"""Dump l2patch connections
elif mode == 'bridge':
self.dump_bridge(switch_name)
else:
- raise RuntimeError('VPP: Unsupported l2 connection mode detected %s', mode)
+ raise RuntimeError('VPP: Unsupported l2 connection mode detected %s' % mode)
def run_vppctl(self, args, check_error=False):
"""Run ``vppctl`` with supplied arguments.
#
# Validate methods
#
- def validate_add_switch(self, dummy_result, switch_name, dummy_params=None):
+ def validate_add_switch(self, _dummy_result, switch_name, _dummy_params=None):
"""Validate - Create a new logical switch with no ports
"""
return switch_name in self._switches
- def validate_del_switch(self, dummy_result, switch_name):
+ def validate_del_switch(self, _dummy_result, switch_name):
"""Validate removal of switch
"""
- return not self.validate_add_switch(dummy_result, switch_name)
+ return not self.validate_add_switch(_dummy_result, switch_name)
- def validate_add_phy_port(self, result, dummy_switch_name):
+ def validate_add_phy_port(self, result, _dummy_switch_name):
""" Validate that physical port was added to bridge.
"""
return result[0] in self._phy_ports
- def validate_add_vport(self, result, dummy_switch_name):
+ def validate_add_vport(self, result, _dummy_switch_name):
""" Validate that virtual port was added to bridge.
"""
return result[0] in self._virt_ports
- def validate_del_port(self, dummy_result, dummy_switch_name, port_name):
+ def validate_del_port(self, _dummy_result, _dummy_switch_name, port_name):
""" Validate that port_name was removed from bridge.
"""
return not (port_name in self._phy_ports or port_name in self._virt_ports)
# pylint: disable=no-self-use
- def validate_add_connection(self, dummy_result, dummy_switch_name, dummy_port1,
- dummy_port2, dummy_bidir=False):
+ def validate_add_connection(self, _dummy_result, _dummy_switch_name, _dummy_port1,
+ _dummy_port2, _dummy_bidir=False):
""" Validate that connection was added
"""
return True
- def validate_del_connection(self, dummy_result, dummy_switch_name, dummy_port1,
- dummy_port2, dummy_bidir=False):
+ def validate_del_connection(self, _dummy_result, _dummy_switch_name, _dummy_port1,
+ _dummy_port2, _dummy_bidir=False):
""" Validate that connection was deleted
"""
return True
- def validate_dump_connections(self, dummy_result, dummy_switch_name):
+ def validate_dump_connections(self, _dummy_result, _dummy_switch_name):
""" Validate dump connections call
"""
return True
- def validate_run_vppctl(self, result, dummy_args, dummy_check_error=False):
+ def validate_run_vppctl(self, result, _dummy_args, _dummy_check_error=False):
"""validate execution of ``vppctl`` with supplied arguments.
"""
# there shouldn't be any stderr