-# Copyright 2015-2016 Intel Corporation.
+# Copyright 2015-2017 Intel Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
be expected that the user have access to the IxNetwork machine should
this trafficgen need to be debugged.
"""
-
import tkinter
import logging
import os
from collections import OrderedDict
from tools.pkt_gen import trafficgen
from conf import settings
+from conf import merge_spec
from core.results.results_constants import ResultsConstants
_ROOT_DIR = os.path.dirname(os.path.realpath(__file__))
for key in values:
value = values[key]
- # Not allowing derived dictionary types for now
- # pylint: disable=unidiomatic-typecheck
- if type(value) == dict:
+ if isinstance(value, dict):
_prefix = ' '.join([prefix, key]).strip()
for subkey in _build_set_cmds(value, _prefix):
yield subkey
continue
- # pylint: disable=unidiomatic-typecheck
# tcl doesn't recognise the strings "True" or "False", only "1"
# or "0". Special case to convert them
- if type(value) == bool:
+ if isinstance(value, bool):
value = str(int(value))
else:
value = str(value)
Currently only the RFC2544 tests are implemented.
"""
- if settings.getValue('TRAFFICGEN_IXNET_TCL_SCRIPT') == '':
- _script = os.path.join(os.path.dirname(__file__), 'ixnetrfc2544.tcl')
- else:
- _script = os.path.join(os.path.dirname(__file__),
- settings.getValue('TRAFFICGEN_IXNET_TCL_SCRIPT'))
- _tclsh = tkinter.Tcl()
- _cfg = None
- _logger = logging.getLogger(__name__)
- _params = None
- _bidir = None
+
+ def __init__(self):
+ """Initialize IXNET members
+ """
+ super().__init__()
+ self._tclsh = tkinter.Tcl()
+ self._script = None
+ self._cfg = None
+ self._logger = logging.getLogger(__name__)
+ self._params = None
+ self._bidir = None
def run_tcl(self, cmd):
"""Run a TCL script using the TCL interpreter found in ``tkinter``.
return output.split()
- def connect(self):
+ def configure(self):
"""Configure system for IxNetwork.
"""
+ self._script = os.path.join(settings.getValue('TRAFFICGEN_IXIA_3RD_PARTY'),
+ settings.getValue('TRAFFICGEN_IXNET_TCL_SCRIPT'))
self._cfg = {
'lib_path': settings.getValue('TRAFFICGEN_IXNET_LIB_PATH'),
# IxNetwork machine configuration
self._logger.debug('IXIA configuration configuration : %s', self._cfg)
- return self
+ def connect(self):
+ """Connect to IxNetwork - nothing to be done here
+ """
+ pass
def disconnect(self):
"""Disconnect from Ixia chassis.
def start_cont_traffic(self, traffic=None, duration=30):
"""Start transmission.
"""
+ self.configure()
self._bidir = traffic['bidir']
self._params = {}
'multipleStreams': traffic['multistream'],
'streamType': traffic['stream_type'],
'rfc2544TestType': 'throughput',
+ 'flowControl': "True" if traffic['flow_control'] else "False",
+ 'learningFrames': "True" if traffic['learning_frames'] else "False",
}
self._params['traffic'] = self.traffic_defaults.copy()
if traffic:
- self._params['traffic'] = trafficgen.merge_spec(
+ self._params['traffic'] = merge_spec(
self._params['traffic'], traffic)
self._cfg['bidir'] = self._bidir
"""
return self._wait_result()
- def send_rfc2544_throughput(self, traffic=None, trials=3, duration=20,
+ def send_rfc2544_throughput(self, traffic=None, tests=1, duration=20,
lossrate=0.0):
"""See ITrafficGenerator for description
"""
- self.start_rfc2544_throughput(traffic, trials, duration, lossrate)
+ self.start_rfc2544_throughput(traffic, tests, duration, lossrate)
return self.wait_rfc2544_throughput()
- def start_rfc2544_throughput(self, traffic=None, trials=3, duration=20,
+ def start_rfc2544_throughput(self, traffic=None, tests=1, duration=20,
lossrate=0.0):
"""Start transmission.
"""
+ self.configure()
self._bidir = traffic['bidir']
self._params = {}
self._params['config'] = {
'binary': True,
- 'trials': trials,
+ 'tests': tests,
'duration': duration,
'lossrate': lossrate,
'multipleStreams': traffic['multistream'],
'streamType': traffic['stream_type'],
'rfc2544TestType': 'throughput',
+ 'flowControl': "True" if traffic['flow_control'] else "False",
+ 'learningFrames': "True" if traffic['learning_frames'] else "False",
}
self._params['traffic'] = self.traffic_defaults.copy()
if traffic:
- self._params['traffic'] = trafficgen.merge_spec(
+ self._params['traffic'] = merge_spec(
self._params['traffic'], traffic)
self._cfg['bidir'] = self._bidir
# the results file
return parse_ixnet_rfc_results(parse_result_string(output[0]))
- def send_rfc2544_back2back(self, traffic=None, trials=50, duration=2,
+ def send_rfc2544_back2back(self, traffic=None, tests=1, duration=2,
lossrate=0.0):
"""See ITrafficGenerator for description
"""
# NOTE 2 seconds is the recommended duration for a back 2 back
# test in RFC2544. 50 trials is the recommended number from the
# RFC also.
- self.start_rfc2544_back2back(traffic, trials, duration, lossrate)
+ self.start_rfc2544_back2back(traffic, tests, duration, lossrate)
return self.wait_rfc2544_back2back()
- def start_rfc2544_back2back(self, traffic=None, trials=50, duration=2,
+ def start_rfc2544_back2back(self, traffic=None, tests=1, duration=2,
lossrate=0.0):
"""Start transmission.
"""
+ self.configure()
self._bidir = traffic['bidir']
self._params = {}
self._params['config'] = {
'binary': True,
- 'trials': trials,
+ 'tests': tests,
'duration': duration,
'lossrate': lossrate,
'multipleStreams': traffic['multistream'],
'streamType': traffic['stream_type'],
'rfc2544TestType': 'back2back',
+ 'flowControl': "True" if traffic['flow_control'] else "False",
+ 'learningFrames': "True" if traffic['learning_frames'] else "False",
}
self._params['traffic'] = self.traffic_defaults.copy()
if traffic:
- self._params['traffic'] = trafficgen.merge_spec(
+ self._params['traffic'] = merge_spec(
self._params['traffic'], traffic)
self._cfg['bidir'] = self._bidir
return parse_ixnet_rfc_results(parse_result_string(output[0]))
+ def send_burst_traffic(self, traffic=None, numpkts=100, duration=20):
+ return NotImplementedError('IxNet does not implement send_burst_traffic')
if __name__ == '__main__':
TRAFFIC = {