-# Copyright 2015 Intel Corporation.
+# Copyright 2015-2017 Intel Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
* TRAFFICGEN_IXNET_LIB_PATH
IxNetwork libraries path
-* TRAFFICGEN_IXNET_HOST
- IxNetwork host IP address
* TRAFFICGEN_IXNET_PORT
IxNetwork host port number
* TRAFFICGEN_IXNET_USER
as the previous one
The following settings are also required. These can likely be shared
-an 'Ixia' traffic generator instance:
+with an 'Ixia' traffic generator instance:
* TRAFFICGEN_IXIA_HOST
IXIA chassis IP address
be expected that the user have access to the IxNetwork machine should
this trafficgen need to be debugged.
"""
-
import tkinter
import logging
import os
from collections import OrderedDict
from tools.pkt_gen import trafficgen
from conf import settings
+from conf import merge_spec
from core.results.results_constants import ResultsConstants
_ROOT_DIR = os.path.dirname(os.path.realpath(__file__))
for key in values:
value = values[key]
- # Not allowing derived dictionary types for now
- # pylint: disable=unidiomatic-typecheck
- if type(value) == dict:
+ if isinstance(value, dict):
_prefix = ' '.join([prefix, key]).strip()
for subkey in _build_set_cmds(value, _prefix):
yield subkey
continue
- # pylint: disable=unidiomatic-typecheck
# tcl doesn't recognise the strings "True" or "False", only "1"
# or "0". Special case to convert them
- if type(value) == bool:
+ if isinstance(value, bool):
value = str(int(value))
else:
value = str(value)
Currently only the RFC2544 tests are implemented.
"""
- _script = os.path.join(os.path.dirname(__file__), 'ixnetrfc2544.tcl')
- _tclsh = tkinter.Tcl()
- _cfg = None
- _logger = logging.getLogger(__name__)
- _params = None
- _bidir = None
+
+ def __init__(self):
+ """Initialize IXNET members
+ """
+ super().__init__()
+ self._tclsh = tkinter.Tcl()
+ self._script = None
+ self._cfg = None
+ self._logger = logging.getLogger(__name__)
+ self._params = None
+ self._bidir = None
def run_tcl(self, cmd):
"""Run a TCL script using the TCL interpreter found in ``tkinter``.
return output.split()
- def connect(self):
+ def configure(self):
"""Configure system for IxNetwork.
"""
+ self._script = os.path.join(settings.getValue('TRAFFICGEN_IXIA_3RD_PARTY'),
+ settings.getValue('TRAFFICGEN_IXNET_TCL_SCRIPT'))
self._cfg = {
'lib_path': settings.getValue('TRAFFICGEN_IXNET_LIB_PATH'),
# IxNetwork machine configuration
self._logger.debug('IXIA configuration configuration : %s', self._cfg)
- return self
+ def connect(self):
+ """Connect to IxNetwork - nothing to be done here
+ """
+ pass
def disconnect(self):
"""Disconnect from Ixia chassis.
"""
pass
- def send_cont_traffic(self, traffic=None, duration=30, multistream=False):
+ def send_cont_traffic(self, traffic=None, duration=30):
"""See ITrafficGenerator for description
"""
self.start_cont_traffic(traffic, duration)
def start_cont_traffic(self, traffic=None, duration=30):
"""Start transmission.
"""
+ self.configure()
self._bidir = traffic['bidir']
self._params = {}
'multipleStreams': traffic['multistream'],
'streamType': traffic['stream_type'],
'rfc2544TestType': 'throughput',
+ 'flowControl': "True" if traffic['flow_control'] else "False",
+ 'learningFrames': "True" if traffic['learning_frames'] else "False",
}
self._params['traffic'] = self.traffic_defaults.copy()
if traffic:
- self._params['traffic'] = trafficgen.merge_spec(
+ self._params['traffic'] = merge_spec(
self._params['traffic'], traffic)
self._cfg['bidir'] = self._bidir
"""
return self._wait_result()
- def send_rfc2544_throughput(self, traffic=None, trials=3, duration=20,
- lossrate=0.0, multistream=False):
+ def send_rfc2544_throughput(self, traffic=None, tests=1, duration=20,
+ lossrate=0.0):
"""See ITrafficGenerator for description
"""
- self.start_rfc2544_throughput(traffic, trials, duration, lossrate)
+ self.start_rfc2544_throughput(traffic, tests, duration, lossrate)
return self.wait_rfc2544_throughput()
- def start_rfc2544_throughput(self, traffic=None, trials=3, duration=20,
+ def start_rfc2544_throughput(self, traffic=None, tests=1, duration=20,
lossrate=0.0):
"""Start transmission.
"""
+ self.configure()
self._bidir = traffic['bidir']
self._params = {}
self._params['config'] = {
'binary': True,
- 'trials': trials,
+ 'tests': tests,
'duration': duration,
'lossrate': lossrate,
'multipleStreams': traffic['multistream'],
'streamType': traffic['stream_type'],
'rfc2544TestType': 'throughput',
+ 'flowControl': "True" if traffic['flow_control'] else "False",
+ 'learningFrames': "True" if traffic['learning_frames'] else "False",
}
self._params['traffic'] = self.traffic_defaults.copy()
if traffic:
- self._params['traffic'] = trafficgen.merge_spec(
+ self._params['traffic'] = merge_spec(
self._params['traffic'], traffic)
self._cfg['bidir'] = self._bidir
tx_mbps = 'Unknown'
if bool(results.get(ResultsConstants.THROUGHPUT_RX_FPS)) \
- == False:
+ is False:
prev_percent_rx = 0.0
else:
prev_percent_rx = \
results[ResultsConstants.THROUGHPUT_RX_MBPS] = row[6]
results[ResultsConstants.TX_RATE_PERCENT] = row[3]
results[ResultsConstants.THROUGHPUT_RX_PERCENT] = row[4]
+ results[ResultsConstants.FRAME_LOSS_PERCENT] = row[10]
results[ResultsConstants.MIN_LATENCY_NS] = row[11]
results[ResultsConstants.MAX_LATENCY_NS] = row[12]
results[ResultsConstants.AVG_LATENCY_NS] = row[13]
# the results file
return parse_ixnet_rfc_results(parse_result_string(output[0]))
- def send_rfc2544_back2back(self, traffic=None, trials=1, duration=20,
+ def send_rfc2544_back2back(self, traffic=None, tests=1, duration=2,
lossrate=0.0):
"""See ITrafficGenerator for description
"""
- self.start_rfc2544_back2back(traffic, trials, duration, lossrate)
+ # NOTE 2 seconds is the recommended duration for a back 2 back
+ # test in RFC2544. 50 trials is the recommended number from the
+ # RFC also.
+ self.start_rfc2544_back2back(traffic, tests, duration, lossrate)
return self.wait_rfc2544_back2back()
- def start_rfc2544_back2back(self, traffic=None, trials=1, duration=20,
+ def start_rfc2544_back2back(self, traffic=None, tests=1, duration=2,
lossrate=0.0):
"""Start transmission.
"""
+ self.configure()
self._bidir = traffic['bidir']
self._params = {}
self._params['config'] = {
'binary': True,
- 'trials': trials,
+ 'tests': tests,
'duration': duration,
'lossrate': lossrate,
'multipleStreams': traffic['multistream'],
'streamType': traffic['stream_type'],
'rfc2544TestType': 'back2back',
+ 'flowControl': "True" if traffic['flow_control'] else "False",
+ 'learningFrames': "True" if traffic['learning_frames'] else "False",
}
self._params['traffic'] = self.traffic_defaults.copy()
if traffic:
- self._params['traffic'] = trafficgen.merge_spec(
+ self._params['traffic'] = merge_spec(
self._params['traffic'], traffic)
self._cfg['bidir'] = self._bidir
"""
results = OrderedDict()
results[ResultsConstants.B2B_FRAMES] = 0
+ results[ResultsConstants.B2B_FRAME_LOSS_PERCENT] = 100
with open(path, 'r') as in_file:
reader = csv.reader(in_file, delimiter=',')
if int(row[12]) > \
int(results[ResultsConstants.B2B_FRAMES]):
results[ResultsConstants.B2B_FRAMES] = int(row[12])
+ results[ResultsConstants.B2B_FRAME_LOSS_PERCENT] = float(row[14])
return results
return parse_ixnet_rfc_results(parse_result_string(output[0]))
+ def send_burst_traffic(self, traffic=None, numpkts=100, duration=20):
+ return NotImplementedError('IxNet does not implement send_burst_traffic')
if __name__ == '__main__':
TRAFFIC = {