X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=nfvbench%2Ftraffic_gen%2Ftrex_gen.py;h=189c3e599aa98e22dc534b2ed5bf54543cb2f8e2;hb=057486b092e0a4bb1989121588eb5f8afdb8e1d3;hp=10937de2b2f4f78ec6dd9d0f6c69dcb6b0a1cd4b;hpb=e1887dcf96c21e96f259a94f5e6835d45b27dfdf;p=nfvbench.git diff --git a/nfvbench/traffic_gen/trex_gen.py b/nfvbench/traffic_gen/trex_gen.py index 10937de..189c3e5 100644 --- a/nfvbench/traffic_gen/trex_gen.py +++ b/nfvbench/traffic_gen/trex_gen.py @@ -20,17 +20,14 @@ import time import traceback from itertools import count +# pylint: disable=import-error +from scapy.contrib.mpls import MPLS # flake8: noqa +# pylint: enable=import-error from nfvbench.log import LOG from nfvbench.traffic_server import TRexTrafficServer from nfvbench.utils import cast_integer from nfvbench.utils import timeout from nfvbench.utils import TimeoutError -from traffic_base import AbstractTrafficGenerator -from traffic_base import TrafficGeneratorException -import traffic_utils as utils -from traffic_utils import IMIX_AVG_L2_FRAME_SIZE -from traffic_utils import IMIX_L2_SIZES -from traffic_utils import IMIX_RATIOS # pylint: disable=import-error from trex.common.services.trex_service_arp import ServiceARP @@ -57,9 +54,15 @@ from trex.stl.api import ThreeBytesField from trex.stl.api import UDP from trex.stl.api import XByteField - # pylint: enable=import-error +from .traffic_base import AbstractTrafficGenerator +from .traffic_base import TrafficGeneratorException +from . import traffic_utils as utils +from .traffic_utils import IMIX_AVG_L2_FRAME_SIZE +from .traffic_utils import IMIX_L2_SIZES +from .traffic_utils import IMIX_RATIOS + class VXLAN(Packet): """VxLAN class.""" @@ -245,11 +248,74 @@ class TRex(AbstractTrafficGenerator): else: latencies[port].min_usec = get_latency(lat['total_min']) latencies[port].avg_usec = get_latency(lat['average']) + # pick up the HDR histogram if present (otherwise will raise KeyError) + latencies[port].hdrh = lat['hdrh'] except KeyError: pass def __combine_latencies(self, in_stats, results, port_handle): - """Traverse TRex result dictionary and combines chosen latency stats.""" + """Traverse TRex result dictionary and combines chosen latency stats. + + example of latency dict returned by trex (2 chains): + 'latency': {256: {'err_cntrs': {'dropped': 0, + 'dup': 0, + 'out_of_order': 0, + 'seq_too_high': 0, + 'seq_too_low': 0}, + 'latency': {'average': 26.5, + 'hdrh': u'HISTFAAAAEx4nJNpmSgz...bFRgxi', + 'histogram': {20: 303, + 30: 320, + 40: 300, + 50: 73, + 60: 4, + 70: 1}, + 'jitter': 14, + 'last_max': 63, + 'total_max': 63, + 'total_min': 20}}, + 257: {'err_cntrs': {'dropped': 0, + 'dup': 0, + 'out_of_order': 0, + 'seq_too_high': 0, + 'seq_too_low': 0}, + 'latency': {'average': 29.75, + 'hdrh': u'HISTFAAAAEV4nJN...CALilDG0=', + 'histogram': {20: 261, + 30: 431, + 40: 3, + 50: 80, + 60: 225}, + 'jitter': 23, + 'last_max': 67, + 'total_max': 67, + 'total_min': 20}}, + 384: {'err_cntrs': {'dropped': 0, + 'dup': 0, + 'out_of_order': 0, + 'seq_too_high': 0, + 'seq_too_low': 0}, + 'latency': {'average': 18.0, + 'hdrh': u'HISTFAAAADR4nJNpm...MjCwDDxAZG', + 'histogram': {20: 987, 30: 14}, + 'jitter': 0, + 'last_max': 34, + 'total_max': 34, + 'total_min': 20}}, + 385: {'err_cntrs': {'dropped': 0, + 'dup': 0, + 'out_of_order': 0, + 'seq_too_high': 0, + 'seq_too_low': 0}, + 'latency': {'average': 19.0, + 'hdrh': u'HISTFAAAADR4nJNpm...NkYmJgDdagfK', + 'histogram': {20: 989, 30: 11}, + 'jitter': 0, + 'last_max': 38, + 'total_max': 38, + 'total_min': 20}}, + 'global': {'bad_hdr': 0, 'old_flow': 0}}, + """ total_max = 0 average = 0 total_min = float("inf") @@ -280,7 +346,7 @@ class TRex(AbstractTrafficGenerator): """ # Trex will add the FCS field, so we need to remove 4 bytes from the l2 frame size frame_size = int(l2frame_size) - 4 - + vm_param = [] if stream_cfg['vxlan'] is True: self._bind_vxlan() encap_level = '1' @@ -291,6 +357,26 @@ class TRex(AbstractTrafficGenerator): pkt_base /= UDP(sport=random.randint(1337, 32767), dport=4789) pkt_base /= VXLAN(vni=stream_cfg['net_vni']) pkt_base /= Ether(src=stream_cfg['mac_src'], dst=stream_cfg['mac_dst']) + # need to randomize the outer header UDP src port based on flow + vxlan_udp_src_fv = STLVmFlowVar( + name="vxlan_udp_src", + min_value=1337, + max_value=32767, + size=2, + op="random") + vm_param = [vxlan_udp_src_fv, + STLVmWrFlowVar(fv_name="vxlan_udp_src", pkt_offset="UDP.sport")] + elif stream_cfg['mpls'] is True: + encap_level = '0' + pkt_base = Ether(src=stream_cfg['vtep_src_mac'], dst=stream_cfg['vtep_dst_mac']) + if stream_cfg['vtep_vlan'] is not None: + pkt_base /= Dot1Q(vlan=stream_cfg['vtep_vlan']) + if stream_cfg['mpls_outer_label'] is not None: + pkt_base /= MPLS(label=stream_cfg['mpls_outer_label'], cos=1, s=0, ttl=255) + if stream_cfg['mpls_inner_label'] is not None: + pkt_base /= MPLS(label=stream_cfg['mpls_inner_label'], cos=1, s=1, ttl=255) + # Flow stats and MPLS labels randomization TBD + pkt_base /= Ether(src=stream_cfg['mac_src'], dst=stream_cfg['mac_dst']) else: encap_level = '0' pkt_base = Ether(src=stream_cfg['mac_src'], dst=stream_cfg['mac_dst']) @@ -301,19 +387,28 @@ class TRex(AbstractTrafficGenerator): udp_args = {} if stream_cfg['udp_src_port']: udp_args['sport'] = int(stream_cfg['udp_src_port']) + udp_args['sport_step'] = int(stream_cfg['udp_port_step']) + udp_args['sport_max'] = int(stream_cfg['udp_src_port_max']) if stream_cfg['udp_dst_port']: udp_args['dport'] = int(stream_cfg['udp_dst_port']) - pkt_base /= IP() / UDP(**udp_args) + udp_args['dport_step'] = int(stream_cfg['udp_port_step']) + udp_args['dport_max'] = int(stream_cfg['udp_dst_port_max']) + pkt_base /= IP(src=stream_cfg['ip_src_addr'], dst=stream_cfg['ip_dst_addr']) / \ + UDP(dport=udp_args['dport'], sport=udp_args['sport']) + if stream_cfg['ip_src_static'] is True: + src_max_ip_value = stream_cfg['ip_src_addr'] + else: + src_max_ip_value = stream_cfg['ip_src_addr_max'] if stream_cfg['ip_addrs_step'] == 'random': - src_fv = STLVmFlowVarRepeatableRandom( + src_fv_ip = STLVmFlowVarRepeatableRandom( name="ip_src", min_value=stream_cfg['ip_src_addr'], - max_value=stream_cfg['ip_src_addr_max'], + max_value=src_max_ip_value, size=4, seed=random.randint(0, 32767), limit=stream_cfg['ip_src_count']) - dst_fv = STLVmFlowVarRepeatableRandom( + dst_fv_ip = STLVmFlowVarRepeatableRandom( name="ip_dst", min_value=stream_cfg['ip_dst_addr'], max_value=stream_cfg['ip_dst_addr_max'], @@ -321,14 +416,14 @@ class TRex(AbstractTrafficGenerator): seed=random.randint(0, 32767), limit=stream_cfg['ip_dst_count']) else: - src_fv = STLVmFlowVar( + src_fv_ip = STLVmFlowVar( name="ip_src", min_value=stream_cfg['ip_src_addr'], - max_value=stream_cfg['ip_src_addr'], + max_value=src_max_ip_value, size=4, op="inc", step=stream_cfg['ip_addrs_step']) - dst_fv = STLVmFlowVar( + dst_fv_ip = STLVmFlowVar( name="ip_dst", min_value=stream_cfg['ip_dst_addr'], max_value=stream_cfg['ip_dst_addr_max'], @@ -336,20 +431,56 @@ class TRex(AbstractTrafficGenerator): op="inc", step=stream_cfg['ip_addrs_step']) + if stream_cfg['udp_port_step'] == 'random': + src_fv_port = STLVmFlowVarRepeatableRandom( + name="p_src", + min_value=udp_args['sport'], + max_value=udp_args['sport_max'], + size=2, + seed=random.randint(0, 32767), + limit=udp_args['udp_src_count']) + dst_fv_port = STLVmFlowVarRepeatableRandom( + name="p_dst", + min_value=udp_args['dport'], + max_value=udp_args['dport_max'], + size=2, + seed=random.randint(0, 32767), + limit=stream_cfg['udp_dst_count']) + else: + src_fv_port = STLVmFlowVar( + name="p_src", + min_value=udp_args['sport'], + max_value=udp_args['sport_max'], + size=2, + op="inc", + step=udp_args['sport_step']) + dst_fv_port = STLVmFlowVar( + name="p_dst", + min_value=udp_args['dport'], + max_value=udp_args['dport_max'], + size=2, + op="inc", + step=udp_args['dport_step']) vm_param = [ - src_fv, + src_fv_ip, STLVmWrFlowVar(fv_name="ip_src", pkt_offset="IP:{}.src".format(encap_level)), - dst_fv, + src_fv_port, + STLVmWrFlowVar(fv_name="p_src", pkt_offset="UDP:{}.sport".format(encap_level)), + dst_fv_ip, STLVmWrFlowVar(fv_name="ip_dst", pkt_offset="IP:{}.dst".format(encap_level)), + dst_fv_port, + STLVmWrFlowVar(fv_name="p_dst", pkt_offset="UDP:{}.dport".format(encap_level)), STLVmFixChecksumHw(l3_offset="IP:{}".format(encap_level), l4_offset="UDP:{}".format(encap_level), l4_type=CTRexVmInsFixHwCs.L4_TYPE_UDP) ] pad = max(0, frame_size - len(pkt_base)) * 'x' - return STLPktBuilder(pkt=pkt_base / pad, vm=STLScVmRaw(vm_param)) + return STLPktBuilder(pkt=pkt_base / pad, + vm=STLScVmRaw(vm_param, cache_size=int(self.config.cache_size))) - def generate_streams(self, port, chain_id, stream_cfg, l2frame, latency=True): + def generate_streams(self, port, chain_id, stream_cfg, l2frame, latency=True, + e2e=False): """Create a list of streams corresponding to a given chain and stream config. port: port where the streams originate (0 or 1) @@ -357,15 +488,30 @@ class TRex(AbstractTrafficGenerator): stream_cfg: stream configuration l2frame: L2 frame size (including 4-byte FCS) or 'IMIX' latency: if True also create a latency stream + e2e: True if performing "end to end" connectivity check """ streams = [] pg_id, lat_pg_id = self.get_pg_id(port, chain_id) + if self.config.no_flow_stats: + LOG.info("Traffic flow statistics are disabled.") if l2frame == 'IMIX': for ratio, l2_frame_size in zip(IMIX_RATIOS, IMIX_L2_SIZES): pkt = self._create_pkt(stream_cfg, l2_frame_size) - streams.append(STLStream(packet=pkt, - flow_stats=STLFlowStats(pg_id=pg_id), - mode=STLTXCont(pps=ratio))) + if e2e or stream_cfg['mpls']: + streams.append(STLStream(packet=pkt, + mode=STLTXCont(pps=ratio))) + else: + if stream_cfg['vxlan'] is True: + streams.append(STLStream(packet=pkt, + flow_stats=STLFlowStats(pg_id=pg_id, + vxlan=True) + if not self.config.no_flow_stats else None, + mode=STLTXCont(pps=ratio))) + else: + streams.append(STLStream(packet=pkt, + flow_stats=STLFlowStats(pg_id=pg_id) + if not self.config.no_flow_stats else None, + mode=STLTXCont(pps=ratio))) if latency: # for IMIX, the latency packets have the average IMIX packet size @@ -374,9 +520,23 @@ class TRex(AbstractTrafficGenerator): else: l2frame_size = int(l2frame) pkt = self._create_pkt(stream_cfg, l2frame_size) - streams.append(STLStream(packet=pkt, - flow_stats=STLFlowStats(pg_id=pg_id), - mode=STLTXCont())) + if e2e or stream_cfg['mpls']: + streams.append(STLStream(packet=pkt, + # Flow stats is disabled for MPLS now + # flow_stats=STLFlowStats(pg_id=pg_id), + mode=STLTXCont())) + else: + if stream_cfg['vxlan'] is True: + streams.append(STLStream(packet=pkt, + flow_stats=STLFlowStats(pg_id=pg_id, + vxlan=True) + if not self.config.no_flow_stats else None, + mode=STLTXCont())) + else: + streams.append(STLStream(packet=pkt, + flow_stats=STLFlowStats(pg_id=pg_id) + if not self.config.no_flow_stats else None, + mode=STLTXCont())) # for the latency stream, the minimum payload is 16 bytes even in case of vlan tagging # without vlan, the min l2 frame size is 64 # with vlan it is 68 @@ -385,9 +545,19 @@ class TRex(AbstractTrafficGenerator): pkt = self._create_pkt(stream_cfg, 68) if latency: - streams.append(STLStream(packet=pkt, - flow_stats=STLFlowLatencyStats(pg_id=lat_pg_id), - mode=STLTXCont(pps=self.LATENCY_PPS))) + if self.config.no_latency_stats: + LOG.info("Latency flow statistics are disabled.") + if stream_cfg['vxlan'] is True: + streams.append(STLStream(packet=pkt, + flow_stats=STLFlowLatencyStats(pg_id=lat_pg_id, + vxlan=True) + if not self.config.no_latency_stats else None, + mode=STLTXCont(pps=self.LATENCY_PPS))) + else: + streams.append(STLStream(packet=pkt, + flow_stats=STLFlowLatencyStats(pg_id=lat_pg_id) + if not self.config.no_latency_stats else None, + mode=STLTXCont(pps=self.LATENCY_PPS))) return streams @timeout(5) @@ -397,7 +567,7 @@ class TRex(AbstractTrafficGenerator): def __connect_after_start(self): # after start, Trex may take a bit of time to initialize # so we need to retry a few times - for it in xrange(self.config.generic_retry_count): + for it in range(self.config.generic_retry_count): try: time.sleep(1) self.client.connect() @@ -405,7 +575,7 @@ class TRex(AbstractTrafficGenerator): except Exception as ex: if it == (self.config.generic_retry_count - 1): raise - LOG.info("Retrying connection to TRex (%s)...", ex.message) + LOG.info("Retrying connection to TRex (%s)...", ex.msg) def connect(self): """Connect to the TRex server.""" @@ -417,30 +587,13 @@ class TRex(AbstractTrafficGenerator): async_port=self.generator_config.zmq_pub_port) try: self.__connect(self.client) + if server_ip == '127.0.0.1': + config_updated = self.__check_config() + if config_updated or self.config.restart: + self.__restart() except (TimeoutError, STLError) as e: if server_ip == '127.0.0.1': - try: - self.__start_server() - self.__connect_after_start() - except (TimeoutError, STLError) as e: - LOG.error('Cannot connect to TRex') - LOG.error(traceback.format_exc()) - logpath = '/tmp/trex.log' - if os.path.isfile(logpath): - # Wait for TRex to finish writing error message - last_size = 0 - for _ in xrange(self.config.generic_retry_count): - size = os.path.getsize(logpath) - if size == last_size: - # probably not writing anymore - break - last_size = size - time.sleep(1) - with open(logpath, 'r') as f: - message = f.read() - else: - message = e.message - raise TrafficGeneratorException(message) + self.__start_local_server() else: raise TrafficGeneratorException(e.message) @@ -475,10 +628,63 @@ class TRex(AbstractTrafficGenerator): (self.port_info[0]['speed'], self.port_info[1]['speed'])) + def __start_local_server(self): + try: + LOG.info("Starting TRex ...") + self.__start_server() + self.__connect_after_start() + except (TimeoutError, STLError) as e: + LOG.error('Cannot connect to TRex') + LOG.error(traceback.format_exc()) + logpath = '/tmp/trex.log' + if os.path.isfile(logpath): + # Wait for TRex to finish writing error message + last_size = 0 + for _ in range(self.config.generic_retry_count): + size = os.path.getsize(logpath) + if size == last_size: + # probably not writing anymore + break + last_size = size + time.sleep(1) + with open(logpath, 'r') as f: + message = f.read() + else: + message = e.message + raise TrafficGeneratorException(message) + def __start_server(self): server = TRexTrafficServer() server.run_server(self.generator_config) + def __check_config(self): + server = TRexTrafficServer() + return server.check_config_updated(self.generator_config) + + def __restart(self): + LOG.info("Restarting TRex ...") + self.__stop_server() + # Wait for server stopped + for _ in range(self.config.generic_retry_count): + time.sleep(1) + if not self.client.is_connected(): + LOG.info("TRex is stopped...") + break + self.__start_local_server() + + def __stop_server(self): + if self.generator_config.ip == '127.0.0.1': + ports = self.client.get_acquired_ports() + LOG.info('Release ports %s and stopping TRex...', ports) + try: + if ports: + self.client.release(ports=ports) + self.client.server_shutdown() + except STLError as e: + LOG.warning('Unable to stop TRex. Error: %s', e) + else: + LOG.info('Using remote TRex. Unable to stop TRex') + def resolve_arp(self): """Resolve all configured remote IP addresses. @@ -498,7 +704,7 @@ class TRex(AbstractTrafficGenerator): dst_macs = [None] * chain_count dst_macs_count = 0 # the index in the list is the chain id - if self.config.vxlan: + if self.config.vxlan or self.config.mpls: arps = [ ServiceARP(ctx, src_ip=device.vtep_src_ip, @@ -540,12 +746,12 @@ class TRex(AbstractTrafficGenerator): arp_dest_macs[port] = dst_macs LOG.info('ARP resolved successfully for port %s', port) break - else: - retry = attempt + 1 - LOG.info('Retrying ARP for: %s (retry %d/%d)', - unresolved, retry, self.config.generic_retry_count) - if retry < self.config.generic_retry_count: - time.sleep(self.config.generic_poll_sec) + + retry = attempt + 1 + LOG.info('Retrying ARP for: %s (retry %d/%d)', + unresolved, retry, self.config.generic_retry_count) + if retry < self.config.generic_retry_count: + time.sleep(self.config.generic_poll_sec) else: LOG.error('ARP timed out for port %s (resolved %d out of %d)', port, @@ -553,7 +759,10 @@ class TRex(AbstractTrafficGenerator): chain_count) break - self.client.set_service_mode(ports=self.port_handle, enabled=False) + # if the capture from the TRex console was started before the arp request step, + # it keeps 'service_mode' enabled, otherwise, it disables the 'service_mode' + if not self.config.service_mode: + self.client.set_service_mode(ports=self.port_handle, enabled=False) if len(arp_dest_macs) == len(self.port_handle): return arp_dest_macs return None @@ -581,7 +790,7 @@ class TRex(AbstractTrafficGenerator): return {'result': True} - def create_traffic(self, l2frame_size, rates, bidirectional, latency=True): + def create_traffic(self, l2frame_size, rates, bidirectional, latency=True, e2e=False): """Program all the streams in Trex server. l2frame_size: L2 frame size or IMIX @@ -589,6 +798,7 @@ class TRex(AbstractTrafficGenerator): each rate is a dict like {'rate_pps': '10kpps'} bidirectional: True if bidirectional latency: True if latency measurement is needed + e2e: True if performing "end to end" connectivity check """ r = self.__is_rate_enough(l2frame_size, rates, bidirectional, latency) if not r['result']: @@ -612,15 +822,21 @@ class TRex(AbstractTrafficGenerator): chain_id, fwd_stream_cfg, l2frame_size, - latency=latency)) + latency=latency, + e2e=e2e)) if len(self.rates) > 1: streamblock[1].extend(self.generate_streams(self.port_handle[1], chain_id, rev_stream_cfg, l2frame_size, - latency=bidirectional and latency)) + latency=bidirectional and latency, + e2e=e2e)) for port in self.port_handle: + if self.config.vxlan: + self.client.set_port_attr(ports=port, vxlan_fs=[4789]) + else: + self.client.set_port_attr(ports=port, vxlan_fs=None) self.client.add_streams(streamblock[port], ports=port) LOG.info('Created %d traffic streams for port %s.', len(streamblock[port]), port) @@ -673,8 +889,8 @@ class TRex(AbstractTrafficGenerator): bpf_filter = "ether dst %s or ether dst %s" % (src_mac_list[0], src_mac_list[1]) # ports must be set in service in order to enable capture self.client.set_service_mode(ports=self.port_handle) - self.capture_id = self.client.start_capture(rx_ports=self.port_handle, - bpf_filter=bpf_filter) + self.capture_id = self.client.start_capture \ + (rx_ports=self.port_handle, bpf_filter=bpf_filter) def fetch_capture_packets(self): """Fetch capture packets in capture mode.""" @@ -688,7 +904,10 @@ class TRex(AbstractTrafficGenerator): if self.capture_id: self.client.stop_capture(capture_id=self.capture_id['id']) self.capture_id = None - self.client.set_service_mode(ports=self.port_handle, enabled=False) + # if the capture from TRex console was started before the connectivity step, + # it keeps 'service_mode' enabled, otherwise, it disables the 'service_mode' + if not self.config.service_mode: + self.client.set_service_mode(ports=self.port_handle, enabled=False) def cleanup(self): """Cleanup Trex driver.""" @@ -699,3 +918,7 @@ class TRex(AbstractTrafficGenerator): except STLError: # TRex does not like a reset while in disconnected state pass + + def set_service_mode(self, enabled=True): + """Enable/disable the 'service_mode'.""" + self.client.set_service_mode(ports=self.port_handle, enabled=enabled)