import traceback
from itertools import count
+# pylint: disable=import-error
+from scapy.contrib.mpls import MPLS # flake8: noqa
+# pylint: enable=import-error
from nfvbench.log import LOG
from nfvbench.traffic_server import TRexTrafficServer
from nfvbench.utils import cast_integer
from nfvbench.utils import timeout
from nfvbench.utils import TimeoutError
-from traffic_base import AbstractTrafficGenerator
-from traffic_base import TrafficGeneratorException
-import traffic_utils as utils
-from traffic_utils import IMIX_AVG_L2_FRAME_SIZE
-from traffic_utils import IMIX_L2_SIZES
-from traffic_utils import IMIX_RATIOS
# pylint: disable=import-error
from trex.common.services.trex_service_arp import ServiceARP
from trex.stl.api import UDP
from trex.stl.api import XByteField
-
# pylint: enable=import-error
+from .traffic_base import AbstractTrafficGenerator
+from .traffic_base import TrafficGeneratorException
+from . import traffic_utils as utils
+from .traffic_utils import IMIX_AVG_L2_FRAME_SIZE
+from .traffic_utils import IMIX_L2_SIZES
+from .traffic_utils import IMIX_RATIOS
+
class VXLAN(Packet):
"""VxLAN class."""
else:
latencies[port].min_usec = get_latency(lat['total_min'])
latencies[port].avg_usec = get_latency(lat['average'])
+ # pick up the HDR histogram if present (otherwise will raise KeyError)
+ latencies[port].hdrh = lat['hdrh']
except KeyError:
pass
def __combine_latencies(self, in_stats, results, port_handle):
- """Traverse TRex result dictionary and combines chosen latency stats."""
+ """Traverse TRex result dictionary and combines chosen latency stats.
+
+ example of latency dict returned by trex (2 chains):
+ 'latency': {256: {'err_cntrs': {'dropped': 0,
+ 'dup': 0,
+ 'out_of_order': 0,
+ 'seq_too_high': 0,
+ 'seq_too_low': 0},
+ 'latency': {'average': 26.5,
+ 'hdrh': u'HISTFAAAAEx4nJNpmSgz...bFRgxi',
+ 'histogram': {20: 303,
+ 30: 320,
+ 40: 300,
+ 50: 73,
+ 60: 4,
+ 70: 1},
+ 'jitter': 14,
+ 'last_max': 63,
+ 'total_max': 63,
+ 'total_min': 20}},
+ 257: {'err_cntrs': {'dropped': 0,
+ 'dup': 0,
+ 'out_of_order': 0,
+ 'seq_too_high': 0,
+ 'seq_too_low': 0},
+ 'latency': {'average': 29.75,
+ 'hdrh': u'HISTFAAAAEV4nJN...CALilDG0=',
+ 'histogram': {20: 261,
+ 30: 431,
+ 40: 3,
+ 50: 80,
+ 60: 225},
+ 'jitter': 23,
+ 'last_max': 67,
+ 'total_max': 67,
+ 'total_min': 20}},
+ 384: {'err_cntrs': {'dropped': 0,
+ 'dup': 0,
+ 'out_of_order': 0,
+ 'seq_too_high': 0,
+ 'seq_too_low': 0},
+ 'latency': {'average': 18.0,
+ 'hdrh': u'HISTFAAAADR4nJNpm...MjCwDDxAZG',
+ 'histogram': {20: 987, 30: 14},
+ 'jitter': 0,
+ 'last_max': 34,
+ 'total_max': 34,
+ 'total_min': 20}},
+ 385: {'err_cntrs': {'dropped': 0,
+ 'dup': 0,
+ 'out_of_order': 0,
+ 'seq_too_high': 0,
+ 'seq_too_low': 0},
+ 'latency': {'average': 19.0,
+ 'hdrh': u'HISTFAAAADR4nJNpm...NkYmJgDdagfK',
+ 'histogram': {20: 989, 30: 11},
+ 'jitter': 0,
+ 'last_max': 38,
+ 'total_max': 38,
+ 'total_min': 20}},
+ 'global': {'bad_hdr': 0, 'old_flow': 0}},
+ """
total_max = 0
average = 0
total_min = float("inf")
"""
# Trex will add the FCS field, so we need to remove 4 bytes from the l2 frame size
frame_size = int(l2frame_size) - 4
-
+ vm_param = []
if stream_cfg['vxlan'] is True:
self._bind_vxlan()
encap_level = '1'
pkt_base /= UDP(sport=random.randint(1337, 32767), dport=4789)
pkt_base /= VXLAN(vni=stream_cfg['net_vni'])
pkt_base /= Ether(src=stream_cfg['mac_src'], dst=stream_cfg['mac_dst'])
+ # need to randomize the outer header UDP src port based on flow
+ vxlan_udp_src_fv = STLVmFlowVar(
+ name="vxlan_udp_src",
+ min_value=1337,
+ max_value=32767,
+ size=2,
+ op="random")
+ vm_param = [vxlan_udp_src_fv,
+ STLVmWrFlowVar(fv_name="vxlan_udp_src", pkt_offset="UDP.sport")]
+ elif stream_cfg['mpls'] is True:
+ encap_level = '0'
+ pkt_base = Ether(src=stream_cfg['vtep_src_mac'], dst=stream_cfg['vtep_dst_mac'])
+ if stream_cfg['vtep_vlan'] is not None:
+ pkt_base /= Dot1Q(vlan=stream_cfg['vtep_vlan'])
+ if stream_cfg['mpls_outer_label'] is not None:
+ pkt_base /= MPLS(label=stream_cfg['mpls_outer_label'], cos=1, s=0, ttl=255)
+ if stream_cfg['mpls_inner_label'] is not None:
+ pkt_base /= MPLS(label=stream_cfg['mpls_inner_label'], cos=1, s=1, ttl=255)
+ # Flow stats and MPLS labels randomization TBD
+ pkt_base /= Ether(src=stream_cfg['mac_src'], dst=stream_cfg['mac_dst'])
else:
encap_level = '0'
pkt_base = Ether(src=stream_cfg['mac_src'], dst=stream_cfg['mac_dst'])
op="inc",
step=stream_cfg['ip_addrs_step'])
- vm_param = [
+ vm_param.extend([
src_fv,
STLVmWrFlowVar(fv_name="ip_src", pkt_offset="IP:{}.src".format(encap_level)),
dst_fv,
- STLVmWrFlowVar(fv_name="ip_dst", pkt_offset="IP:{}.dst".format(encap_level)),
- STLVmFixChecksumHw(l3_offset="IP:{}".format(encap_level),
- l4_offset="UDP:{}".format(encap_level),
- l4_type=CTRexVmInsFixHwCs.L4_TYPE_UDP)
- ]
+ STLVmWrFlowVar(fv_name="ip_dst", pkt_offset="IP:{}.dst".format(encap_level))
+ ])
+
+ for encap in range(int(encap_level), -1, -1):
+ # Fixing the checksums for all encap levels
+ vm_param.append(STLVmFixChecksumHw(l3_offset="IP:{}".format(encap),
+ l4_offset="UDP:{}".format(encap),
+ l4_type=CTRexVmInsFixHwCs.L4_TYPE_UDP))
pad = max(0, frame_size - len(pkt_base)) * 'x'
- return STLPktBuilder(pkt=pkt_base / pad, vm=STLScVmRaw(vm_param))
+ return STLPktBuilder(pkt=pkt_base / pad,
+ vm=STLScVmRaw(vm_param, cache_size=int(self.config.cache_size)))
- def generate_streams(self, port, chain_id, stream_cfg, l2frame, latency=True):
+ def generate_streams(self, port, chain_id, stream_cfg, l2frame, latency=True,
+ e2e=False):
"""Create a list of streams corresponding to a given chain and stream config.
port: port where the streams originate (0 or 1)
stream_cfg: stream configuration
l2frame: L2 frame size (including 4-byte FCS) or 'IMIX'
latency: if True also create a latency stream
+ e2e: True if performing "end to end" connectivity check
"""
streams = []
pg_id, lat_pg_id = self.get_pg_id(port, chain_id)
+ if self.config.no_flow_stats:
+ LOG.info("Traffic flow statistics are disabled.")
if l2frame == 'IMIX':
for ratio, l2_frame_size in zip(IMIX_RATIOS, IMIX_L2_SIZES):
pkt = self._create_pkt(stream_cfg, l2_frame_size)
- streams.append(STLStream(packet=pkt,
- flow_stats=STLFlowStats(pg_id=pg_id),
- mode=STLTXCont(pps=ratio)))
+ if e2e or stream_cfg['mpls']:
+ streams.append(STLStream(packet=pkt,
+ mode=STLTXCont(pps=ratio)))
+ else:
+ if stream_cfg['vxlan'] is True:
+ streams.append(STLStream(packet=pkt,
+ flow_stats=STLFlowStats(pg_id=pg_id,
+ vxlan=True)
+ if not self.config.no_flow_stats else None,
+ mode=STLTXCont(pps=ratio)))
+ else:
+ streams.append(STLStream(packet=pkt,
+ flow_stats=STLFlowStats(pg_id=pg_id)
+ if not self.config.no_flow_stats else None,
+ mode=STLTXCont(pps=ratio)))
if latency:
# for IMIX, the latency packets have the average IMIX packet size
else:
l2frame_size = int(l2frame)
pkt = self._create_pkt(stream_cfg, l2frame_size)
- streams.append(STLStream(packet=pkt,
- flow_stats=STLFlowStats(pg_id=pg_id),
- mode=STLTXCont()))
+ if e2e or stream_cfg['mpls']:
+ streams.append(STLStream(packet=pkt,
+ # Flow stats is disabled for MPLS now
+ # flow_stats=STLFlowStats(pg_id=pg_id),
+ mode=STLTXCont()))
+ else:
+ if stream_cfg['vxlan'] is True:
+ streams.append(STLStream(packet=pkt,
+ flow_stats=STLFlowStats(pg_id=pg_id,
+ vxlan=True)
+ if not self.config.no_flow_stats else None,
+ mode=STLTXCont()))
+ else:
+ streams.append(STLStream(packet=pkt,
+ flow_stats=STLFlowStats(pg_id=pg_id)
+ if not self.config.no_flow_stats else None,
+ mode=STLTXCont()))
# for the latency stream, the minimum payload is 16 bytes even in case of vlan tagging
# without vlan, the min l2 frame size is 64
# with vlan it is 68
pkt = self._create_pkt(stream_cfg, 68)
if latency:
- streams.append(STLStream(packet=pkt,
- flow_stats=STLFlowLatencyStats(pg_id=lat_pg_id),
- mode=STLTXCont(pps=self.LATENCY_PPS)))
+ if self.config.no_latency_stats:
+ LOG.info("Latency flow statistics are disabled.")
+ if stream_cfg['vxlan'] is True:
+ streams.append(STLStream(packet=pkt,
+ flow_stats=STLFlowLatencyStats(pg_id=lat_pg_id,
+ vxlan=True)
+ if not self.config.no_latency_stats else None,
+ mode=STLTXCont(pps=self.LATENCY_PPS)))
+ else:
+ streams.append(STLStream(packet=pkt,
+ flow_stats=STLFlowLatencyStats(pg_id=lat_pg_id)
+ if not self.config.no_latency_stats else None,
+ mode=STLTXCont(pps=self.LATENCY_PPS)))
return streams
@timeout(5)
def __connect_after_start(self):
# after start, Trex may take a bit of time to initialize
# so we need to retry a few times
- for it in xrange(self.config.generic_retry_count):
+ for it in range(self.config.generic_retry_count):
try:
time.sleep(1)
self.client.connect()
except Exception as ex:
if it == (self.config.generic_retry_count - 1):
raise
- LOG.info("Retrying connection to TRex (%s)...", ex.message)
+ LOG.info("Retrying connection to TRex (%s)...", ex.msg)
def connect(self):
"""Connect to the TRex server."""
async_port=self.generator_config.zmq_pub_port)
try:
self.__connect(self.client)
+ if server_ip == '127.0.0.1':
+ config_updated = self.__check_config()
+ if config_updated or self.config.restart:
+ self.__restart()
except (TimeoutError, STLError) as e:
if server_ip == '127.0.0.1':
- try:
- self.__start_server()
- self.__connect_after_start()
- except (TimeoutError, STLError) as e:
- LOG.error('Cannot connect to TRex')
- LOG.error(traceback.format_exc())
- logpath = '/tmp/trex.log'
- if os.path.isfile(logpath):
- # Wait for TRex to finish writing error message
- last_size = 0
- for _ in xrange(self.config.generic_retry_count):
- size = os.path.getsize(logpath)
- if size == last_size:
- # probably not writing anymore
- break
- last_size = size
- time.sleep(1)
- with open(logpath, 'r') as f:
- message = f.read()
- else:
- message = e.message
- raise TrafficGeneratorException(message)
+ self.__start_local_server()
else:
raise TrafficGeneratorException(e.message)
(self.port_info[0]['speed'],
self.port_info[1]['speed']))
+ def __start_local_server(self):
+ try:
+ LOG.info("Starting TRex ...")
+ self.__start_server()
+ self.__connect_after_start()
+ except (TimeoutError, STLError) as e:
+ LOG.error('Cannot connect to TRex')
+ LOG.error(traceback.format_exc())
+ logpath = '/tmp/trex.log'
+ if os.path.isfile(logpath):
+ # Wait for TRex to finish writing error message
+ last_size = 0
+ for _ in range(self.config.generic_retry_count):
+ size = os.path.getsize(logpath)
+ if size == last_size:
+ # probably not writing anymore
+ break
+ last_size = size
+ time.sleep(1)
+ with open(logpath, 'r') as f:
+ message = f.read()
+ else:
+ message = e.message
+ raise TrafficGeneratorException(message)
+
def __start_server(self):
server = TRexTrafficServer()
server.run_server(self.generator_config)
+ def __check_config(self):
+ server = TRexTrafficServer()
+ return server.check_config_updated(self.generator_config)
+
+ def __restart(self):
+ LOG.info("Restarting TRex ...")
+ self.__stop_server()
+ # Wait for server stopped
+ for _ in range(self.config.generic_retry_count):
+ time.sleep(1)
+ if not self.client.is_connected():
+ LOG.info("TRex is stopped...")
+ break
+ self.__start_local_server()
+
+ def __stop_server(self):
+ if self.generator_config.ip == '127.0.0.1':
+ ports = self.client.get_acquired_ports()
+ LOG.info('Release ports %s and stopping TRex...', ports)
+ try:
+ if ports:
+ self.client.release(ports=ports)
+ self.client.server_shutdown()
+ except STLError as e:
+ LOG.warning('Unable to stop TRex. Error: %s', e)
+ else:
+ LOG.info('Using remote TRex. Unable to stop TRex')
+
def resolve_arp(self):
"""Resolve all configured remote IP addresses.
dst_macs = [None] * chain_count
dst_macs_count = 0
# the index in the list is the chain id
- if self.config.vxlan:
+ if self.config.vxlan or self.config.mpls:
arps = [
ServiceARP(ctx,
src_ip=device.vtep_src_ip,
arp_dest_macs[port] = dst_macs
LOG.info('ARP resolved successfully for port %s', port)
break
- else:
- retry = attempt + 1
- LOG.info('Retrying ARP for: %s (retry %d/%d)',
- unresolved, retry, self.config.generic_retry_count)
- if retry < self.config.generic_retry_count:
- time.sleep(self.config.generic_poll_sec)
+
+ retry = attempt + 1
+ LOG.info('Retrying ARP for: %s (retry %d/%d)',
+ unresolved, retry, self.config.generic_retry_count)
+ if retry < self.config.generic_retry_count:
+ time.sleep(self.config.generic_poll_sec)
else:
LOG.error('ARP timed out for port %s (resolved %d out of %d)',
port,
chain_count)
break
- self.client.set_service_mode(ports=self.port_handle, enabled=False)
+ # if the capture from the TRex console was started before the arp request step,
+ # it keeps 'service_mode' enabled, otherwise, it disables the 'service_mode'
+ if not self.config.service_mode:
+ self.client.set_service_mode(ports=self.port_handle, enabled=False)
if len(arp_dest_macs) == len(self.port_handle):
return arp_dest_macs
return None
return {'result': True}
- def create_traffic(self, l2frame_size, rates, bidirectional, latency=True):
+ def create_traffic(self, l2frame_size, rates, bidirectional, latency=True, e2e=False):
"""Program all the streams in Trex server.
l2frame_size: L2 frame size or IMIX
each rate is a dict like {'rate_pps': '10kpps'}
bidirectional: True if bidirectional
latency: True if latency measurement is needed
+ e2e: True if performing "end to end" connectivity check
"""
r = self.__is_rate_enough(l2frame_size, rates, bidirectional, latency)
if not r['result']:
chain_id,
fwd_stream_cfg,
l2frame_size,
- latency=latency))
+ latency=latency,
+ e2e=e2e))
if len(self.rates) > 1:
streamblock[1].extend(self.generate_streams(self.port_handle[1],
chain_id,
rev_stream_cfg,
l2frame_size,
- latency=bidirectional and latency))
+ latency=bidirectional and latency,
+ e2e=e2e))
for port in self.port_handle:
+ if self.config.vxlan:
+ self.client.set_port_attr(ports=port, vxlan_fs=[4789])
+ else:
+ self.client.set_port_attr(ports=port, vxlan_fs=None)
self.client.add_streams(streamblock[port], ports=port)
LOG.info('Created %d traffic streams for port %s.', len(streamblock[port]), port)
if self.capture_id:
self.client.stop_capture(capture_id=self.capture_id['id'])
self.capture_id = None
- self.client.set_service_mode(ports=self.port_handle, enabled=False)
+ # if the capture from TRex console was started before the connectivity step,
+ # it keeps 'service_mode' enabled, otherwise, it disables the 'service_mode'
+ if not self.config.service_mode:
+ self.client.set_service_mode(ports=self.port_handle, enabled=False)
def cleanup(self):
"""Cleanup Trex driver."""
except STLError:
# TRex does not like a reset while in disconnected state
pass
+
+ def set_service_mode(self, enabled=True):
+ """Enable/disable the 'service_mode'."""
+ self.client.set_service_mode(ports=self.port_handle, enabled=enabled)