NFVBENCH-155 Add options to disable extra stats, latency stats and latency streams
[nfvbench.git] / nfvbench / traffic_client.py
index 4414710..6d870f6 100755 (executable)
@@ -23,7 +23,9 @@ from attrdict import AttrDict
 import bitmath
 from netaddr import IPNetwork
 # pylint: disable=import-error
-from trex_stl_lib.api import STLError
+from trex.stl.api import Ether
+from trex.stl.api import STLError
+from trex.stl.api import UDP
 # pylint: enable=import-error
 
 from log import LOG
@@ -44,12 +46,13 @@ class TrafficClientException(Exception):
 class TrafficRunner(object):
     """Serialize various steps required to run traffic."""
 
-    def __init__(self, client, duration_sec, interval_sec=0):
+    def __init__(self, client, duration_sec, interval_sec=0, service_mode=False):
         """Create a traffic runner."""
         self.client = client
         self.start_time = None
         self.duration_sec = duration_sec
         self.interval_sec = interval_sec
+        self.service_mode = service_mode
 
     def run(self):
         """Clear stats and instruct the traffic generator to start generating traffic."""
@@ -57,6 +60,10 @@ class TrafficRunner(object):
             return None
         LOG.info('Running traffic generator')
         self.client.gen.clear_stats()
+        # Debug use only : new '--service-mode' option available for the NFVBench command line.
+        # A read-only mode TRex console would be able to capture the generated traffic.
+        self.client.gen.set_service_mode(enabled=self.service_mode)
+        LOG.info('Service mode is %sabled', 'en' if self.service_mode else 'dis')
         self.client.gen.start_traffic()
         self.start_time = time.time()
         return self.poll_stats()
@@ -144,17 +151,26 @@ class Device(object):
     identified as port 0 or port 1.
     """
 
-    def __init__(self, port, generator_config, vtep_vlan=None):
+    def __init__(self, port, generator_config):
         """Create a new device for a given port."""
         self.generator_config = generator_config
         self.chain_count = generator_config.service_chain_count
         self.flow_count = generator_config.flow_count / 2
         self.port = port
         self.switch_port = generator_config.interfaces[port].get('switch_port', None)
-        self.vtep_vlan = vtep_vlan
+        self.vtep_vlan = None
+        self.vtep_src_mac = None
+        self.vxlan = False
         self.pci = generator_config.interfaces[port].pci
         self.mac = None
         self.dest_macs = None
+        self.vtep_dst_mac = None
+        self.vtep_dst_ip = None
+        if generator_config.vteps is None:
+            self.vtep_src_ip = None
+        else:
+            self.vtep_src_ip = generator_config.vteps[port]
+        self.vnis = None
         self.vlans = None
         self.ip_addrs = generator_config.ip_addrs[port]
         subnet = IPNetwork(self.ip_addrs)
@@ -181,15 +197,62 @@ class Device(object):
         """Get the peer device (device 0 -> device 1, or device 1 -> device 0)."""
         return self.generator_config.devices[1 - self.port]
 
+    def set_vtep_dst_mac(self, dest_macs):
+        """Set the list of dest MACs indexed by the chain id.
+
+        This is only called in 2 cases:
+        - VM macs discovered using openstack API
+        - dest MACs provisioned in config file
+        """
+        self.vtep_dst_mac = map(str, dest_macs)
+
     def set_dest_macs(self, dest_macs):
-        """Set the list of dest MACs indexed by the chain id."""
+        """Set the list of dest MACs indexed by the chain id.
+
+        This is only called in 2 cases:
+        - VM macs discovered using openstack API
+        - dest MACs provisioned in config file
+        """
         self.dest_macs = map(str, dest_macs)
 
+    def get_dest_macs(self):
+        """Get the list of dest macs for this device.
+
+        If set_dest_macs was never called, assumes l2-loopback and return
+        a list of peer mac (as many as chains but normally only 1 chain)
+        """
+        if self.dest_macs:
+            return self.dest_macs
+        # assume this is l2-loopback
+        return [self.get_peer_device().mac] * self.chain_count
+
     def set_vlans(self, vlans):
         """Set the list of vlans to use indexed by the chain id."""
         self.vlans = vlans
         LOG.info("Port %d: VLANs %s", self.port, self.vlans)
 
+    def set_vtep_vlan(self, vlan):
+        """Set the vtep vlan to use indexed by specific port."""
+        self.vtep_vlan = vlan
+        self.vxlan = True
+        self.vlan_tagging = None
+        LOG.info("Port %d: VTEP VLANs %s", self.port, self.vtep_vlan)
+
+    def set_vxlan_endpoints(self, src_ip, dst_ip):
+        self.vtep_dst_ip = dst_ip
+        self.vtep_src_ip = src_ip
+        LOG.info("Port %d: src_vtep %s, dst_vtep %s", self.port,
+                 self.vtep_src_ip, self.vtep_dst_ip)
+
+    def set_vxlans(self, vnis):
+        self.vnis = vnis
+        LOG.info("Port %d: VNIs %s", self.port, self.vnis)
+
+    def set_gw_ip(self, gateway_ip):
+        self.gw_ip_block = IpBlock(gateway_ip,
+                                   self.generator_config.gateway_ip_addrs_step,
+                                   self.chain_count)
+
     def get_gw_ip(self, chain_index):
         """Retrieve the IP address assigned for the gateway of a given chain."""
         return self.gw_ip_block.get_ip(chain_index)
@@ -211,16 +274,16 @@ class Device(object):
         peer = self.get_peer_device()
         self.ip_block.reset_reservation()
         peer.ip_block.reset_reservation()
+        dest_macs = self.get_dest_macs()
 
         for chain_idx in xrange(self.chain_count):
             src_ip_first, src_ip_last = self.ip_block.reserve_ip_range(cur_chain_flow_count)
             dst_ip_first, dst_ip_last = peer.ip_block.reserve_ip_range(cur_chain_flow_count)
 
-            dest_mac = self.dest_macs[chain_idx] if self.dest_macs else peer.mac
             configs.append({
                 'count': cur_chain_flow_count,
                 'mac_src': self.mac,
-                'mac_dst': dest_mac,
+                'mac_dst': dest_macs[chain_idx],
                 'ip_src_addr': src_ip_first,
                 'ip_src_addr_max': src_ip_last,
                 'ip_src_count': cur_chain_flow_count,
@@ -233,7 +296,14 @@ class Device(object):
                 'mac_discovery_gw': self.get_gw_ip(chain_idx),
                 'ip_src_tg_gw': self.tg_gw_ip_block.get_ip(chain_idx),
                 'ip_dst_tg_gw': peer.tg_gw_ip_block.get_ip(chain_idx),
-                'vlan_tag': self.vlans[chain_idx] if self.vlans else None
+                'vlan_tag': self.vlans[chain_idx] if self.vlans else None,
+                'vxlan': self.vxlan,
+                'vtep_vlan': self.vtep_vlan if self.vtep_vlan else None,
+                'vtep_src_mac': self.mac if self.vxlan is True else None,
+                'vtep_dst_mac': self.vtep_dst_mac if self.vxlan is True else None,
+                'vtep_dst_ip': self.vtep_dst_ip if self.vxlan is True else None,
+                'vtep_src_ip': self.vtep_src_ip if self.vxlan is True else None,
+                'net_vni': self.vnis[chain_idx] if self.vxlan is True else None
             })
             # after first chain, fall back to the flow count for all other chains
             cur_chain_flow_count = flows_per_chain
@@ -270,18 +340,28 @@ class GeneratorConfig(object):
         # copy over fields from the dict
         self.tool = gen_config.tool
         self.ip = gen_config.ip
-        self.cores = gen_config.get('cores', 1)
+        # overrides on config.cores and config.mbuf_factor
+        if config.cores:
+            self.cores = config.cores
+        else:
+            self.cores = gen_config.get('cores', 1)
+        self.mbuf_factor = config.mbuf_factor
+        self.mbuf_64 = config.mbuf_64
+        self.hdrh = not config.disable_hdrh
         if gen_config.intf_speed:
             # interface speed is overriden from config
             self.intf_speed = bitmath.parse_string(gen_config.intf_speed.replace('ps', '')).bits
         else:
             # interface speed is discovered/provided by the traffic generator
             self.intf_speed = 0
+        self.name = gen_config.name
+        self.zmq_pub_port = gen_config.get('zmq_pub_port', 4500)
+        self.zmq_rpc_port = gen_config.get('zmq_rpc_port', 4501)
+        self.limit_memory = gen_config.get('limit_memory', 1024)
         self.software_mode = gen_config.get('software_mode', False)
         self.interfaces = gen_config.interfaces
         if self.interfaces[0].port != 0 or self.interfaces[1].port != 1:
             raise TrafficClientException('Invalid port order/id in generator_profile.interfaces')
-
         self.service_chain = config.service_chain
         self.service_chain_count = config.service_chain_count
         self.flow_count = config.flow_count
@@ -296,6 +376,7 @@ class GeneratorConfig(object):
         self.gateway_ips = gen_config.gateway_ip_addrs
         self.udp_src_port = gen_config.udp_src_port
         self.udp_dst_port = gen_config.udp_dst_port
+        self.vteps = gen_config.get('vteps')
         self.devices = [Device(port, self) for port in [0, 1]]
         # This should normally always be [0, 1]
         self.ports = [device.port for device in self.devices]
@@ -322,12 +403,29 @@ class GeneratorConfig(object):
         port_index: the port for which dest macs must be set
         dest_macs: a list of dest MACs indexed by chain id
         """
-        if len(dest_macs) != self.config.service_chain_count:
+        if len(dest_macs) < self.config.service_chain_count:
             raise TrafficClientException('Dest MAC list %s must have %d entries' %
                                          (dest_macs, self.config.service_chain_count))
-        self.devices[port_index].set_dest_macs(dest_macs)
+        # only pass the first scc dest MACs
+        self.devices[port_index].set_dest_macs(dest_macs[:self.config.service_chain_count])
         LOG.info('Port %d: dst MAC %s', port_index, [str(mac) for mac in dest_macs])
 
+    def set_vtep_dest_macs(self, port_index, dest_macs):
+        """Set the list of dest MACs indexed by the chain id on given port.
+
+        port_index: the port for which dest macs must be set
+        dest_macs: a list of dest MACs indexed by chain id
+        """
+        if len(dest_macs) != self.config.service_chain_count:
+            raise TrafficClientException('Dest MAC list %s must have %d entries' %
+                                         (dest_macs, self.config.service_chain_count))
+        self.devices[port_index].set_vtep_dst_mac(dest_macs)
+        LOG.info('Port %d: vtep dst MAC %s', port_index, set([str(mac) for mac in dest_macs]))
+
+    def get_dest_macs(self):
+        """Return the list of dest macs indexed by port."""
+        return [dev.get_dest_macs() for dev in self.devices]
+
     def set_vlans(self, port_index, vlans):
         """Set the list of vlans to use indexed by the chain id on given port.
 
@@ -339,6 +437,26 @@ class GeneratorConfig(object):
                                          (vlans, self.config.service_chain_count))
         self.devices[port_index].set_vlans(vlans)
 
+    def set_vxlans(self, port_index, vxlans):
+        """Set the list of vxlans (VNIs) to use indexed by the chain id on given port.
+
+        port_index: the port for which VXLANs must be set
+        VXLANs: a  list of VNIs lists indexed by chain id
+        """
+        if len(vxlans) != self.config.service_chain_count:
+            raise TrafficClientException('VXLAN list %s must have %d entries' %
+                                         (vxlans, self.config.service_chain_count))
+        self.devices[port_index].set_vxlans(vxlans)
+
+    def set_vtep_vlan(self, port_index, vlan):
+        """Set the vtep vlan to use indexed by the chain id on given port.
+        port_index: the port for which VLAN must be set
+        """
+        self.devices[port_index].set_vtep_vlan(vlan)
+
+    def set_vxlan_endpoints(self, port_index, src_ip, dst_ip):
+        self.devices[port_index].set_vxlan_endpoints(src_ip, dst_ip)
+
     @staticmethod
     def __match_generator_profile(traffic_generator, generator_profile):
         gen_config = AttrDict(traffic_generator)
@@ -373,7 +491,8 @@ class TrafficClient(object):
         self.notifier = notifier
         self.interval_collector = None
         self.iteration_collector = None
-        self.runner = TrafficRunner(self, self.config.duration_sec, self.config.interval_sec)
+        self.runner = TrafficRunner(self, self.config.duration_sec, self.config.interval_sec,
+                                    self.config.service_mode)
         self.config.frame_sizes = self._get_frame_sizes()
         self.run_config = {
             'l2frame_size': None,
@@ -393,8 +512,8 @@ class TrafficClient(object):
     def _get_generator(self):
         tool = self.tool.lower()
         if tool == 'trex':
-            from traffic_gen import trex
-            return trex.TRex(self)
+            from traffic_gen import trex_gen
+            return trex_gen.TRex(self)
         if tool == 'dummy':
             from traffic_gen import dummy
             return dummy.DummyTG(self)
@@ -443,7 +562,6 @@ class TrafficClient(object):
 
     def setup(self):
         """Set up the traffic client."""
-        self.gen.set_mode()
         self.gen.clear_stats()
 
     def get_version(self):
@@ -472,44 +590,76 @@ class TrafficClient(object):
         LOG.info('Starting traffic generator to ensure end-to-end connectivity')
         # send 2pps on each chain and each direction
         rate_pps = {'rate_pps': str(self.config.service_chain_count * 2)}
-        self.gen.create_traffic('64', [rate_pps, rate_pps], bidirectional=True, latency=False)
-
+        self.gen.create_traffic('64', [rate_pps, rate_pps], bidirectional=True, latency=False,
+                                e2e=True)
         # ensures enough traffic is coming back
         retry_count = (self.config.check_traffic_time_sec +
                        self.config.generic_poll_sec - 1) / self.config.generic_poll_sec
-        mac_addresses = set()
 
         # we expect to see packets coming from 2 unique MAC per chain
-        unique_src_mac_count = self.config.service_chain_count * 2
+        # because there can be flooding in the case of shared net
+        # we must verify that packets from the right VMs are received
+        # and not just count unique src MAC
+        # create a dict of (port, chain) tuples indexed by dest mac
+        mac_map = {}
+        for port, dest_macs in enumerate(self.generator_config.get_dest_macs()):
+            for chain, mac in enumerate(dest_macs):
+                mac_map[mac] = (port, chain)
+        unique_src_mac_count = len(mac_map)
+        if self.config.vxlan and self.config.traffic_generator.vtep_vlan:
+            get_mac_id = lambda packet: packet['binary'][60:66]
+        elif self.config.vxlan:
+            get_mac_id = lambda packet: packet['binary'][56:62]
+        else:
+            get_mac_id = lambda packet: packet['binary'][6:12]
         for it in xrange(retry_count):
             self.gen.clear_stats()
             self.gen.start_traffic()
             self.gen.start_capture()
             LOG.info('Captured unique src mac %d/%d, capturing return packets (retry %d/%d)...',
-                     len(mac_addresses), unique_src_mac_count,
+                     unique_src_mac_count - len(mac_map), unique_src_mac_count,
                      it + 1, retry_count)
             if not self.skip_sleep():
                 time.sleep(self.config.generic_poll_sec)
             self.gen.stop_traffic()
             self.gen.fetch_capture_packets()
             self.gen.stop_capture()
-
             for packet in self.gen.packet_list:
-                src_mac = packet['binary'][6:12]
-                if src_mac not in mac_addresses:
-                    LOG.info('Received packet from mac: %s',
-                             ':'.join(["%02x" % ord(x) for x in src_mac]))
-                    mac_addresses.add(src_mac)
-
-                if len(mac_addresses) == unique_src_mac_count:
+                mac_id = get_mac_id(packet)
+                src_mac = ':'.join(["%02x" % ord(x) for x in mac_id])
+                if src_mac in mac_map and self.is_udp(packet):
+                    port, chain = mac_map[src_mac]
+                    LOG.info('Received packet from mac: %s (chain=%d, port=%d)',
+                             src_mac, chain, port)
+                    mac_map.pop(src_mac, None)
+
+                if not mac_map:
                     LOG.info('End-to-end connectivity established')
                     return
-
+            if self.config.l3_router and not self.config.no_arp:
+                # In case of L3 traffic mode, routers are not able to route traffic
+                # until VM interfaces are up and ARP requests are done
+                LOG.info('Waiting for loopback service completely started...')
+                LOG.info('Sending ARP request to assure end-to-end connectivity established')
+                self.ensure_arp_successful()
         raise TrafficClientException('End-to-end connectivity cannot be ensured')
 
+    def is_udp(self, packet):
+        pkt = Ether(packet['binary'])
+        return UDP in pkt
+
     def ensure_arp_successful(self):
         """Resolve all IP using ARP and throw an exception in case of failure."""
-        if not self.gen.resolve_arp():
+        dest_macs = self.gen.resolve_arp()
+        if dest_macs:
+            # all dest macs are discovered, saved them into the generator config
+            if self.config.vxlan:
+                self.generator_config.set_vtep_dest_macs(0, dest_macs[0])
+                self.generator_config.set_vtep_dest_macs(1, dest_macs[1])
+            else:
+                self.generator_config.set_dest_macs(0, dest_macs[0])
+                self.generator_config.set_dest_macs(1, dest_macs[1])
+        else:
             raise TrafficClientException('ARP cannot be resolved')
 
     def set_traffic(self, frame_size, bidirectional):
@@ -529,7 +679,10 @@ class TrafficClient(object):
                 self.run_config['rates'][idx] = {'rate_pps': self.__convert_rates(rate)['rate_pps']}
 
         self.gen.clear_streamblock()
-        self.gen.create_traffic(frame_size, self.run_config['rates'], bidirectional, latency=True)
+        if self.config.no_latency_streams:
+            LOG.info("Latency streams are disabled")
+        self.gen.create_traffic(frame_size, self.run_config['rates'], bidirectional,
+                                latency=not self.config.no_latency_streams)
 
     def _modify_load(self, load):
         self.current_total_rate = {'rate_percent': str(load)}
@@ -903,7 +1056,29 @@ class TrafficClient(object):
                 # interface stats for the pps because it could have been modified to contain
                 # additional interface stats
                 self.gen.get_stream_stats(stats, ifs, self.pps_list[chain_idx].latencies, chain_idx)
-
+            # special handling for vxlan
+            # in case of vxlan, flow stats are not available so all rx counters will be
+            # zeros when the total rx port counter is non zero.
+            # in that case,
+            for port in range(2):
+                total_rx = 0
+                for ifs in self.ifstats:
+                    total_rx += ifs[port].rx
+                if total_rx == 0:
+                    # check if the total port rx from Trex is also zero
+                    port_rx = stats[port]['rx']['total_pkts']
+                    if port_rx:
+                        # the total rx for all chains from port level stats is non zero
+                        # which means that the per-chain stats are not available
+                        if len(self.ifstats) == 1:
+                            # only one chain, simply report the port level rx to the chain rx stats
+                            self.ifstats[0][port].rx = port_rx
+                        else:
+                            for ifs in self.ifstats:
+                                # mark this data as unavailable
+                                ifs[port].rx = None
+                            # pitch in the total rx only in the last chain pps
+                            self.ifstats[-1][port].rx_total = port_rx
 
     @staticmethod
     def compare_tx_rates(required, actual):