X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=nfvbench%2Ftraffic_client.py;h=6d870f63cf80ee90c4f50cf83733029383ba9864;hb=24314713446b6411cedce4329ab5ebfd6da678a2;hp=44147106c131f966bebfbd1c04693e02f4a9eb36;hpb=391dcf76fefb747888a3411ae3b8df7b1ad26685;p=nfvbench.git diff --git a/nfvbench/traffic_client.py b/nfvbench/traffic_client.py index 4414710..6d870f6 100755 --- a/nfvbench/traffic_client.py +++ b/nfvbench/traffic_client.py @@ -23,7 +23,9 @@ from attrdict import AttrDict import bitmath from netaddr import IPNetwork # pylint: disable=import-error -from trex_stl_lib.api import STLError +from trex.stl.api import Ether +from trex.stl.api import STLError +from trex.stl.api import UDP # pylint: enable=import-error from log import LOG @@ -44,12 +46,13 @@ class TrafficClientException(Exception): class TrafficRunner(object): """Serialize various steps required to run traffic.""" - def __init__(self, client, duration_sec, interval_sec=0): + def __init__(self, client, duration_sec, interval_sec=0, service_mode=False): """Create a traffic runner.""" self.client = client self.start_time = None self.duration_sec = duration_sec self.interval_sec = interval_sec + self.service_mode = service_mode def run(self): """Clear stats and instruct the traffic generator to start generating traffic.""" @@ -57,6 +60,10 @@ class TrafficRunner(object): return None LOG.info('Running traffic generator') self.client.gen.clear_stats() + # Debug use only : new '--service-mode' option available for the NFVBench command line. + # A read-only mode TRex console would be able to capture the generated traffic. + self.client.gen.set_service_mode(enabled=self.service_mode) + LOG.info('Service mode is %sabled', 'en' if self.service_mode else 'dis') self.client.gen.start_traffic() self.start_time = time.time() return self.poll_stats() @@ -144,17 +151,26 @@ class Device(object): identified as port 0 or port 1. """ - def __init__(self, port, generator_config, vtep_vlan=None): + def __init__(self, port, generator_config): """Create a new device for a given port.""" self.generator_config = generator_config self.chain_count = generator_config.service_chain_count self.flow_count = generator_config.flow_count / 2 self.port = port self.switch_port = generator_config.interfaces[port].get('switch_port', None) - self.vtep_vlan = vtep_vlan + self.vtep_vlan = None + self.vtep_src_mac = None + self.vxlan = False self.pci = generator_config.interfaces[port].pci self.mac = None self.dest_macs = None + self.vtep_dst_mac = None + self.vtep_dst_ip = None + if generator_config.vteps is None: + self.vtep_src_ip = None + else: + self.vtep_src_ip = generator_config.vteps[port] + self.vnis = None self.vlans = None self.ip_addrs = generator_config.ip_addrs[port] subnet = IPNetwork(self.ip_addrs) @@ -181,15 +197,62 @@ class Device(object): """Get the peer device (device 0 -> device 1, or device 1 -> device 0).""" return self.generator_config.devices[1 - self.port] + def set_vtep_dst_mac(self, dest_macs): + """Set the list of dest MACs indexed by the chain id. + + This is only called in 2 cases: + - VM macs discovered using openstack API + - dest MACs provisioned in config file + """ + self.vtep_dst_mac = map(str, dest_macs) + def set_dest_macs(self, dest_macs): - """Set the list of dest MACs indexed by the chain id.""" + """Set the list of dest MACs indexed by the chain id. + + This is only called in 2 cases: + - VM macs discovered using openstack API + - dest MACs provisioned in config file + """ self.dest_macs = map(str, dest_macs) + def get_dest_macs(self): + """Get the list of dest macs for this device. + + If set_dest_macs was never called, assumes l2-loopback and return + a list of peer mac (as many as chains but normally only 1 chain) + """ + if self.dest_macs: + return self.dest_macs + # assume this is l2-loopback + return [self.get_peer_device().mac] * self.chain_count + def set_vlans(self, vlans): """Set the list of vlans to use indexed by the chain id.""" self.vlans = vlans LOG.info("Port %d: VLANs %s", self.port, self.vlans) + def set_vtep_vlan(self, vlan): + """Set the vtep vlan to use indexed by specific port.""" + self.vtep_vlan = vlan + self.vxlan = True + self.vlan_tagging = None + LOG.info("Port %d: VTEP VLANs %s", self.port, self.vtep_vlan) + + def set_vxlan_endpoints(self, src_ip, dst_ip): + self.vtep_dst_ip = dst_ip + self.vtep_src_ip = src_ip + LOG.info("Port %d: src_vtep %s, dst_vtep %s", self.port, + self.vtep_src_ip, self.vtep_dst_ip) + + def set_vxlans(self, vnis): + self.vnis = vnis + LOG.info("Port %d: VNIs %s", self.port, self.vnis) + + def set_gw_ip(self, gateway_ip): + self.gw_ip_block = IpBlock(gateway_ip, + self.generator_config.gateway_ip_addrs_step, + self.chain_count) + def get_gw_ip(self, chain_index): """Retrieve the IP address assigned for the gateway of a given chain.""" return self.gw_ip_block.get_ip(chain_index) @@ -211,16 +274,16 @@ class Device(object): peer = self.get_peer_device() self.ip_block.reset_reservation() peer.ip_block.reset_reservation() + dest_macs = self.get_dest_macs() for chain_idx in xrange(self.chain_count): src_ip_first, src_ip_last = self.ip_block.reserve_ip_range(cur_chain_flow_count) dst_ip_first, dst_ip_last = peer.ip_block.reserve_ip_range(cur_chain_flow_count) - dest_mac = self.dest_macs[chain_idx] if self.dest_macs else peer.mac configs.append({ 'count': cur_chain_flow_count, 'mac_src': self.mac, - 'mac_dst': dest_mac, + 'mac_dst': dest_macs[chain_idx], 'ip_src_addr': src_ip_first, 'ip_src_addr_max': src_ip_last, 'ip_src_count': cur_chain_flow_count, @@ -233,7 +296,14 @@ class Device(object): 'mac_discovery_gw': self.get_gw_ip(chain_idx), 'ip_src_tg_gw': self.tg_gw_ip_block.get_ip(chain_idx), 'ip_dst_tg_gw': peer.tg_gw_ip_block.get_ip(chain_idx), - 'vlan_tag': self.vlans[chain_idx] if self.vlans else None + 'vlan_tag': self.vlans[chain_idx] if self.vlans else None, + 'vxlan': self.vxlan, + 'vtep_vlan': self.vtep_vlan if self.vtep_vlan else None, + 'vtep_src_mac': self.mac if self.vxlan is True else None, + 'vtep_dst_mac': self.vtep_dst_mac if self.vxlan is True else None, + 'vtep_dst_ip': self.vtep_dst_ip if self.vxlan is True else None, + 'vtep_src_ip': self.vtep_src_ip if self.vxlan is True else None, + 'net_vni': self.vnis[chain_idx] if self.vxlan is True else None }) # after first chain, fall back to the flow count for all other chains cur_chain_flow_count = flows_per_chain @@ -270,18 +340,28 @@ class GeneratorConfig(object): # copy over fields from the dict self.tool = gen_config.tool self.ip = gen_config.ip - self.cores = gen_config.get('cores', 1) + # overrides on config.cores and config.mbuf_factor + if config.cores: + self.cores = config.cores + else: + self.cores = gen_config.get('cores', 1) + self.mbuf_factor = config.mbuf_factor + self.mbuf_64 = config.mbuf_64 + self.hdrh = not config.disable_hdrh if gen_config.intf_speed: # interface speed is overriden from config self.intf_speed = bitmath.parse_string(gen_config.intf_speed.replace('ps', '')).bits else: # interface speed is discovered/provided by the traffic generator self.intf_speed = 0 + self.name = gen_config.name + self.zmq_pub_port = gen_config.get('zmq_pub_port', 4500) + self.zmq_rpc_port = gen_config.get('zmq_rpc_port', 4501) + self.limit_memory = gen_config.get('limit_memory', 1024) self.software_mode = gen_config.get('software_mode', False) self.interfaces = gen_config.interfaces if self.interfaces[0].port != 0 or self.interfaces[1].port != 1: raise TrafficClientException('Invalid port order/id in generator_profile.interfaces') - self.service_chain = config.service_chain self.service_chain_count = config.service_chain_count self.flow_count = config.flow_count @@ -296,6 +376,7 @@ class GeneratorConfig(object): self.gateway_ips = gen_config.gateway_ip_addrs self.udp_src_port = gen_config.udp_src_port self.udp_dst_port = gen_config.udp_dst_port + self.vteps = gen_config.get('vteps') self.devices = [Device(port, self) for port in [0, 1]] # This should normally always be [0, 1] self.ports = [device.port for device in self.devices] @@ -322,12 +403,29 @@ class GeneratorConfig(object): port_index: the port for which dest macs must be set dest_macs: a list of dest MACs indexed by chain id """ - if len(dest_macs) != self.config.service_chain_count: + if len(dest_macs) < self.config.service_chain_count: raise TrafficClientException('Dest MAC list %s must have %d entries' % (dest_macs, self.config.service_chain_count)) - self.devices[port_index].set_dest_macs(dest_macs) + # only pass the first scc dest MACs + self.devices[port_index].set_dest_macs(dest_macs[:self.config.service_chain_count]) LOG.info('Port %d: dst MAC %s', port_index, [str(mac) for mac in dest_macs]) + def set_vtep_dest_macs(self, port_index, dest_macs): + """Set the list of dest MACs indexed by the chain id on given port. + + port_index: the port for which dest macs must be set + dest_macs: a list of dest MACs indexed by chain id + """ + if len(dest_macs) != self.config.service_chain_count: + raise TrafficClientException('Dest MAC list %s must have %d entries' % + (dest_macs, self.config.service_chain_count)) + self.devices[port_index].set_vtep_dst_mac(dest_macs) + LOG.info('Port %d: vtep dst MAC %s', port_index, set([str(mac) for mac in dest_macs])) + + def get_dest_macs(self): + """Return the list of dest macs indexed by port.""" + return [dev.get_dest_macs() for dev in self.devices] + def set_vlans(self, port_index, vlans): """Set the list of vlans to use indexed by the chain id on given port. @@ -339,6 +437,26 @@ class GeneratorConfig(object): (vlans, self.config.service_chain_count)) self.devices[port_index].set_vlans(vlans) + def set_vxlans(self, port_index, vxlans): + """Set the list of vxlans (VNIs) to use indexed by the chain id on given port. + + port_index: the port for which VXLANs must be set + VXLANs: a list of VNIs lists indexed by chain id + """ + if len(vxlans) != self.config.service_chain_count: + raise TrafficClientException('VXLAN list %s must have %d entries' % + (vxlans, self.config.service_chain_count)) + self.devices[port_index].set_vxlans(vxlans) + + def set_vtep_vlan(self, port_index, vlan): + """Set the vtep vlan to use indexed by the chain id on given port. + port_index: the port for which VLAN must be set + """ + self.devices[port_index].set_vtep_vlan(vlan) + + def set_vxlan_endpoints(self, port_index, src_ip, dst_ip): + self.devices[port_index].set_vxlan_endpoints(src_ip, dst_ip) + @staticmethod def __match_generator_profile(traffic_generator, generator_profile): gen_config = AttrDict(traffic_generator) @@ -373,7 +491,8 @@ class TrafficClient(object): self.notifier = notifier self.interval_collector = None self.iteration_collector = None - self.runner = TrafficRunner(self, self.config.duration_sec, self.config.interval_sec) + self.runner = TrafficRunner(self, self.config.duration_sec, self.config.interval_sec, + self.config.service_mode) self.config.frame_sizes = self._get_frame_sizes() self.run_config = { 'l2frame_size': None, @@ -393,8 +512,8 @@ class TrafficClient(object): def _get_generator(self): tool = self.tool.lower() if tool == 'trex': - from traffic_gen import trex - return trex.TRex(self) + from traffic_gen import trex_gen + return trex_gen.TRex(self) if tool == 'dummy': from traffic_gen import dummy return dummy.DummyTG(self) @@ -443,7 +562,6 @@ class TrafficClient(object): def setup(self): """Set up the traffic client.""" - self.gen.set_mode() self.gen.clear_stats() def get_version(self): @@ -472,44 +590,76 @@ class TrafficClient(object): LOG.info('Starting traffic generator to ensure end-to-end connectivity') # send 2pps on each chain and each direction rate_pps = {'rate_pps': str(self.config.service_chain_count * 2)} - self.gen.create_traffic('64', [rate_pps, rate_pps], bidirectional=True, latency=False) - + self.gen.create_traffic('64', [rate_pps, rate_pps], bidirectional=True, latency=False, + e2e=True) # ensures enough traffic is coming back retry_count = (self.config.check_traffic_time_sec + self.config.generic_poll_sec - 1) / self.config.generic_poll_sec - mac_addresses = set() # we expect to see packets coming from 2 unique MAC per chain - unique_src_mac_count = self.config.service_chain_count * 2 + # because there can be flooding in the case of shared net + # we must verify that packets from the right VMs are received + # and not just count unique src MAC + # create a dict of (port, chain) tuples indexed by dest mac + mac_map = {} + for port, dest_macs in enumerate(self.generator_config.get_dest_macs()): + for chain, mac in enumerate(dest_macs): + mac_map[mac] = (port, chain) + unique_src_mac_count = len(mac_map) + if self.config.vxlan and self.config.traffic_generator.vtep_vlan: + get_mac_id = lambda packet: packet['binary'][60:66] + elif self.config.vxlan: + get_mac_id = lambda packet: packet['binary'][56:62] + else: + get_mac_id = lambda packet: packet['binary'][6:12] for it in xrange(retry_count): self.gen.clear_stats() self.gen.start_traffic() self.gen.start_capture() LOG.info('Captured unique src mac %d/%d, capturing return packets (retry %d/%d)...', - len(mac_addresses), unique_src_mac_count, + unique_src_mac_count - len(mac_map), unique_src_mac_count, it + 1, retry_count) if not self.skip_sleep(): time.sleep(self.config.generic_poll_sec) self.gen.stop_traffic() self.gen.fetch_capture_packets() self.gen.stop_capture() - for packet in self.gen.packet_list: - src_mac = packet['binary'][6:12] - if src_mac not in mac_addresses: - LOG.info('Received packet from mac: %s', - ':'.join(["%02x" % ord(x) for x in src_mac])) - mac_addresses.add(src_mac) - - if len(mac_addresses) == unique_src_mac_count: + mac_id = get_mac_id(packet) + src_mac = ':'.join(["%02x" % ord(x) for x in mac_id]) + if src_mac in mac_map and self.is_udp(packet): + port, chain = mac_map[src_mac] + LOG.info('Received packet from mac: %s (chain=%d, port=%d)', + src_mac, chain, port) + mac_map.pop(src_mac, None) + + if not mac_map: LOG.info('End-to-end connectivity established') return - + if self.config.l3_router and not self.config.no_arp: + # In case of L3 traffic mode, routers are not able to route traffic + # until VM interfaces are up and ARP requests are done + LOG.info('Waiting for loopback service completely started...') + LOG.info('Sending ARP request to assure end-to-end connectivity established') + self.ensure_arp_successful() raise TrafficClientException('End-to-end connectivity cannot be ensured') + def is_udp(self, packet): + pkt = Ether(packet['binary']) + return UDP in pkt + def ensure_arp_successful(self): """Resolve all IP using ARP and throw an exception in case of failure.""" - if not self.gen.resolve_arp(): + dest_macs = self.gen.resolve_arp() + if dest_macs: + # all dest macs are discovered, saved them into the generator config + if self.config.vxlan: + self.generator_config.set_vtep_dest_macs(0, dest_macs[0]) + self.generator_config.set_vtep_dest_macs(1, dest_macs[1]) + else: + self.generator_config.set_dest_macs(0, dest_macs[0]) + self.generator_config.set_dest_macs(1, dest_macs[1]) + else: raise TrafficClientException('ARP cannot be resolved') def set_traffic(self, frame_size, bidirectional): @@ -529,7 +679,10 @@ class TrafficClient(object): self.run_config['rates'][idx] = {'rate_pps': self.__convert_rates(rate)['rate_pps']} self.gen.clear_streamblock() - self.gen.create_traffic(frame_size, self.run_config['rates'], bidirectional, latency=True) + if self.config.no_latency_streams: + LOG.info("Latency streams are disabled") + self.gen.create_traffic(frame_size, self.run_config['rates'], bidirectional, + latency=not self.config.no_latency_streams) def _modify_load(self, load): self.current_total_rate = {'rate_percent': str(load)} @@ -903,7 +1056,29 @@ class TrafficClient(object): # interface stats for the pps because it could have been modified to contain # additional interface stats self.gen.get_stream_stats(stats, ifs, self.pps_list[chain_idx].latencies, chain_idx) - + # special handling for vxlan + # in case of vxlan, flow stats are not available so all rx counters will be + # zeros when the total rx port counter is non zero. + # in that case, + for port in range(2): + total_rx = 0 + for ifs in self.ifstats: + total_rx += ifs[port].rx + if total_rx == 0: + # check if the total port rx from Trex is also zero + port_rx = stats[port]['rx']['total_pkts'] + if port_rx: + # the total rx for all chains from port level stats is non zero + # which means that the per-chain stats are not available + if len(self.ifstats) == 1: + # only one chain, simply report the port level rx to the chain rx stats + self.ifstats[0][port].rx = port_rx + else: + for ifs in self.ifstats: + # mark this data as unavailable + ifs[port].rx = None + # pitch in the total rx only in the last chain pps + self.ifstats[-1][port].rx_total = port_rx @staticmethod def compare_tx_rates(required, actual):