X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=nfvbench%2Ftraffic_client.py;h=47af2651ab419f9a6b84d7a9a2bb7100d4ec8333;hb=8ecfd4c886507fe602398a8623e6044d40ea8090;hp=7542d0b96863faf0d6dfb12252ebce4f0d3bf003;hpb=34c79fa7279ead3180d4a0096e9bdeaef3907cf2;p=nfvbench.git diff --git a/nfvbench/traffic_client.py b/nfvbench/traffic_client.py old mode 100644 new mode 100755 index 7542d0b..47af265 --- a/nfvbench/traffic_client.py +++ b/nfvbench/traffic_client.py @@ -12,57 +12,90 @@ # License for the specific language governing permissions and limitations # under the License. -from attrdict import AttrDict -import bitmath -from datetime import datetime -from log import LOG -from netaddr import IPNetwork -from network import Interface +"""Interface to the traffic generator clients including NDR/PDR binary search.""" import socket -from specs import ChainType -from stats_collector import IntervalCollector -from stats_collector import IterationCollector import struct import time -import traffic_gen.traffic_utils as utils +import sys +from attrdict import AttrDict +import bitmath +from hdrh.histogram import HdrHistogram +from netaddr import IPNetwork +# pylint: disable=import-error +from trex.stl.api import Ether +from trex.stl.api import STLError +from trex.stl.api import UDP +# pylint: disable=wrong-import-order +from scapy.contrib.mpls import MPLS # flake8: noqa +# pylint: enable=wrong-import-order +# pylint: enable=import-error + +from .log import LOG +from .packet_stats import InterfaceStats +from .packet_stats import PacketPathStats +from .stats_collector import IntervalCollector +from .stats_collector import IterationCollector +from .traffic_gen import traffic_utils as utils +from .utils import cast_integer, find_max_size, find_tuples_equal_to_lcm_value, get_divisors, lcm class TrafficClientException(Exception): - pass - + """Generic traffic client exception.""" class TrafficRunner(object): + """Serialize various steps required to run traffic.""" - def __init__(self, client, duration_sec, interval_sec=0): + def __init__(self, client, duration_sec, interval_sec=0, service_mode=False): + """Create a traffic runner.""" self.client = client self.start_time = None self.duration_sec = duration_sec self.interval_sec = interval_sec + self.service_mode = service_mode def run(self): + """Clear stats and instruct the traffic generator to start generating traffic.""" + if self.is_running(): + return None LOG.info('Running traffic generator') self.client.gen.clear_stats() + # Debug use only: the service_mode flag may have been set in + # the configuration, in order to enable the 'service' mode + # in the trex generator, before starting the traffic (run). + # From this point, a T-rex console (launched in readonly mode) would + # then be able to capture the transmitted and/or received traffic. + self.client.gen.set_service_mode(enabled=self.service_mode) + LOG.info('Service mode is %sabled', 'en' if self.service_mode else 'dis') self.client.gen.start_traffic() self.start_time = time.time() return self.poll_stats() def stop(self): + """Stop the current run and instruct the traffic generator to stop traffic.""" if self.is_running(): self.start_time = None self.client.gen.stop_traffic() def is_running(self): + """Check if a run is still pending.""" return self.start_time is not None def time_elapsed(self): + """Return time elapsed since start of run.""" if self.is_running(): return time.time() - self.start_time - else: - return self.duration_sec + return self.duration_sec def poll_stats(self): + """Poll latest stats from the traffic generator at fixed interval - sleeps if necessary. + + return: latest stats or None if traffic is stopped + """ if not self.is_running(): return None + if self.client.skip_sleep(): + self.stop() + return self.client.get_stats() time_elapsed = self.time_elapsed() if time_elapsed > self.duration_sec: self.stop() @@ -80,381 +113,878 @@ class TrafficRunner(object): return self.client.get_stats() +class IpBlock(object): + """Manage a block of IP addresses.""" + + def __init__(self, base_ip, step_ip, count_ip): + """Create an IP block.""" + self.base_ip_int = Device.ip_to_int(base_ip) + if step_ip == 'random': + step_ip = '0.0.0.1' + self.step = Device.ip_to_int(step_ip) + self.max_available = count_ip + self.next_free = 0 + + def get_ip(self, index=0): + """Return the IP address at given index.""" + if index < 0 or index >= self.max_available: + raise IndexError('Index out of bounds: %d (max=%d)' % (index, self.max_available)) + return Device.int_to_ip(self.base_ip_int + index * self.step) + + def get_ip_from_chain_first_ip(self, first_ip, index=0): + """Return the IP address at given index starting from chain first ip.""" + if index < 0 or index >= self.max_available: + raise IndexError('Index out of bounds: %d (max=%d)' % (index, self.max_available)) + return Device.int_to_ip(first_ip + index * self.step) + + def reserve_ip_range(self, count): + """Reserve a range of count consecutive IP addresses spaced by step. + """ + if self.next_free + count > self.max_available: + raise IndexError('No more IP addresses next free=%d max_available=%d requested=%d' % + (self.next_free, + self.max_available, + count)) + first_ip = self.get_ip(self.next_free) + last_ip = self.get_ip(self.next_free + count - 1) + self.next_free += count + return (first_ip, last_ip) + + def reset_reservation(self): + """Reset all reservations and restart with a completely unused IP block.""" + self.next_free = 0 + + +class UdpPorts(object): + + def __init__(self, src_min, src_max, dst_min, dst_max, udp_src_size, udp_dst_size, step): + + self.src_min = int(src_min) + self.src_max = int(src_max) + self.dst_min = int(dst_min) + self.dst_max = int(dst_max) + self.udp_src_size = udp_src_size + self.udp_dst_size = udp_dst_size + self.step = step + + def get_src_max(self, index=0): + """Return the UDP src port at given index.""" + return int(self.src_min) + index * int(self.step) + + def get_dst_max(self, index=0): + """Return the UDP dst port at given index.""" + return int(self.dst_min) + index * int(self.step) + + class Device(object): + """Represent a port device and all information associated to it. + + In the curent version we only support 2 port devices for the traffic generator + identified as port 0 or port 1. + """ + + def __init__(self, port, generator_config): + """Create a new device for a given port.""" + self.generator_config = generator_config + self.chain_count = generator_config.service_chain_count + if generator_config.bidirectional: + self.flow_count = generator_config.flow_count / 2 + else: + self.flow_count = generator_config.flow_count - def __init__(self, port, pci, switch_port=None, vtep_vlan=None, ip=None, tg_gateway_ip=None, - gateway_ip=None, ip_addrs_step=None, tg_gateway_ip_addrs_step=None, - gateway_ip_addrs_step=None, udp_src_port=None, udp_dst_port=None, - chain_count=1, flow_count=1, vlan_tagging=False): - self.chain_count = chain_count - self.flow_count = flow_count - self.dst = None self.port = port - self.switch_port = switch_port - self.vtep_vlan = vtep_vlan - self.vlan_tag = None - self.vlan_tagging = vlan_tagging - self.pci = pci + self.switch_port = generator_config.interfaces[port].get('switch_port', None) + self.vtep_vlan = None + self.vtep_src_mac = None + self.vxlan = False + self.mpls = False + self.inner_labels = None + self.outer_labels = None + self.pci = generator_config.interfaces[port].pci self.mac = None - self.vm_mac_list = None - subnet = IPNetwork(ip) - self.ip = subnet.ip.format() - self.ip_prefixlen = subnet.prefixlen - self.ip_addrs_step = ip_addrs_step - self.tg_gateway_ip_addrs_step = tg_gateway_ip_addrs_step - self.gateway_ip_addrs_step = gateway_ip_addrs_step - self.ip_list = self.expand_ip(self.ip, self.ip_addrs_step, self.flow_count) - self.gateway_ip = gateway_ip - self.gateway_ip_list = self.expand_ip(self.gateway_ip, - self.gateway_ip_addrs_step, - self.chain_count) - self.tg_gateway_ip = tg_gateway_ip - self.tg_gateway_ip_list = self.expand_ip(self.tg_gateway_ip, - self.tg_gateway_ip_addrs_step, - self.chain_count) - self.udp_src_port = udp_src_port - self.udp_dst_port = udp_dst_port + self.dest_macs = None + self.vtep_dst_mac = None + self.vtep_dst_ip = None + if generator_config.vteps is None: + self.vtep_src_ip = None + else: + self.vtep_src_ip = generator_config.vteps[port] + self.vnis = None + self.vlans = None + self.ip_addrs = generator_config.ip_addrs[port] + self.ip_src_static = generator_config.ip_src_static + self.ip_addrs_step = generator_config.ip_addrs_step + if self.ip_addrs_step == 'random': + # Set step to 1 to calculate the IP range size (see check_range_size below) + step = '0.0.0.1' + else: + step = self.ip_addrs_step + self.ip_size = self.check_range_size(IPNetwork(self.ip_addrs).size, Device.ip_to_int(step)) + self.ip = str(IPNetwork(self.ip_addrs).network) + ip_addrs_left = generator_config.ip_addrs[0] + ip_addrs_right = generator_config.ip_addrs[1] + self.ip_addrs_size = { + 'left': self.check_range_size(IPNetwork(ip_addrs_left).size, Device.ip_to_int(step)), + 'right': self.check_range_size(IPNetwork(ip_addrs_right).size, Device.ip_to_int(step))} + udp_src_port = generator_config.gen_config.udp_src_port + if udp_src_port is None: + udp_src_port = 53 + udp_dst_port = generator_config.gen_config.udp_dst_port + if udp_dst_port is None: + udp_dst_port = 53 + src_max, src_min = self.define_udp_range(udp_src_port, 'udp_src_port') + dst_max, dst_min = self.define_udp_range(udp_dst_port, 'udp_dst_port') + if generator_config.gen_config.udp_port_step == 'random': + # Set step to 1 to calculate the UDP range size + udp_step = 1 + else: + udp_step = int(generator_config.gen_config.udp_port_step) + udp_src_size = self.check_range_size(int(src_max) - int(src_min) + 1, udp_step) + udp_dst_size = self.check_range_size(int(dst_max) - int(dst_min) + 1, udp_step) + lcm_port = lcm(udp_src_size, udp_dst_size) + if self.ip_src_static is True: + lcm_ip = lcm(1, min(self.ip_addrs_size['left'], self.ip_addrs_size['right'])) + else: + lcm_ip = lcm(self.ip_addrs_size['left'], self.ip_addrs_size['right']) + flow_max = lcm(lcm_port, lcm_ip) + if self.flow_count > flow_max: + raise TrafficClientException('Trying to set unachievable traffic (%d > %d)' % + (self.flow_count, flow_max)) + + self.udp_ports = UdpPorts(src_min, src_max, dst_min, dst_max, udp_src_size, udp_dst_size, + generator_config.gen_config.udp_port_step) + + self.ip_block = IpBlock(self.ip, step, self.ip_size) + + self.gw_ip_block = IpBlock(generator_config.gateway_ips[port], + generator_config.gateway_ip_addrs_step, + self.chain_count) + self.tg_gateway_ip_addrs = generator_config.tg_gateway_ip_addrs[port] + self.tg_gw_ip_block = IpBlock(self.tg_gateway_ip_addrs, + generator_config.tg_gateway_ip_addrs_step, + self.chain_count) + + def limit_ip_udp_ranges(self, peer_ip_size, cur_chain_flow_count): + # init to min value in case of no matching values found with lcm calculation + new_src_ip_size = 1 + new_peer_ip_size = 1 + new_src_udp_size = 1 + new_dst_udp_size = 1 + + if self.ip_src_static is True: + src_ip_size = 1 + else: + src_ip_size = self.ip_size + ip_src_divisors = list(get_divisors(src_ip_size)) + ip_dst_divisors = list(get_divisors(peer_ip_size)) + udp_src_divisors = list(get_divisors(self.udp_ports.udp_src_size)) + udp_dst_divisors = list(get_divisors(self.udp_ports.udp_dst_size)) + fc = int(cur_chain_flow_count) + tuples_ip = list(find_tuples_equal_to_lcm_value(ip_src_divisors, ip_dst_divisors, fc)) + tuples_udp = list(find_tuples_equal_to_lcm_value(udp_src_divisors, udp_dst_divisors, fc)) + + if tuples_ip: + new_src_ip_size = tuples_ip[-1][0] + new_peer_ip_size = tuples_ip[-1][1] + + if tuples_udp: + new_src_udp_size = tuples_udp[-1][0] + new_dst_udp_size = tuples_udp[-1][1] + + tuples_src = [] + tuples_dst = [] + if not tuples_ip and not tuples_udp: + # in case of not divisors in common matching LCM value (i.e. requested flow count) + # try to find an accurate UDP range to fit requested flow count + udp_src_int = range(self.udp_ports.src_min, self.udp_ports.src_max) + udp_dst_int = range(self.udp_ports.dst_min, self.udp_ports.dst_max) + tuples_src = list(find_tuples_equal_to_lcm_value(ip_src_divisors, udp_src_int, fc)) + tuples_dst = list(find_tuples_equal_to_lcm_value(ip_dst_divisors, udp_dst_int, fc)) + + if not tuples_src and not tuples_dst: + # iterate IP and UDP ranges to find a tuple that match flow count values + src_ip_range = range(1,src_ip_size) + dst_ip_range = range(1, peer_ip_size) + tuples_src = list(find_tuples_equal_to_lcm_value(src_ip_range, udp_src_int, fc)) + tuples_dst = list(find_tuples_equal_to_lcm_value(dst_ip_range, udp_dst_int, fc)) + + if tuples_src or tuples_dst: + if tuples_src: + new_src_ip_size = tuples_src[-1][0] + new_src_udp_size = tuples_src[-1][1] + if tuples_dst: + new_peer_ip_size = tuples_dst[-1][0] + new_dst_udp_size = tuples_dst[-1][1] + else: + if not tuples_ip: + if src_ip_size != 1: + if src_ip_size > fc: + new_src_ip_size = fc + else: + new_src_ip_size = find_max_size(src_ip_size, tuples_udp, fc) + if peer_ip_size != 1: + if peer_ip_size > fc: + new_peer_ip_size = fc + else: + new_peer_ip_size = find_max_size(peer_ip_size, tuples_udp, fc) + + if not tuples_udp: + if self.udp_ports.udp_src_size != 1: + if self.udp_ports.udp_src_size > fc: + new_src_udp_size = fc + else: + new_src_udp_size = find_max_size(self.udp_ports.udp_src_size, + tuples_ip, fc) + if self.udp_ports.udp_dst_size != 1: + if self.udp_ports.udp_dst_size > fc: + new_dst_udp_size = fc + else: + new_dst_udp_size = find_max_size(self.udp_ports.udp_dst_size, + tuples_ip, fc) + max_possible_flows = lcm(lcm(new_src_ip_size, new_peer_ip_size), + lcm(new_src_udp_size, new_dst_udp_size)) + + LOG.debug("IP dst size: %d", new_peer_ip_size) + LOG.debug("LCM IP: %d", lcm(new_src_ip_size, new_peer_ip_size)) + LOG.debug("LCM UDP: %d", lcm(new_src_udp_size, new_dst_udp_size)) + LOG.debug("Global LCM: %d", max_possible_flows) + LOG.debug("IP src size: %d, IP dst size: %d, UDP src size: %d, UDP dst size: %d", + new_src_ip_size, new_peer_ip_size, self.udp_ports.udp_src_size, + self.udp_ports.udp_dst_size) + if not max_possible_flows == cur_chain_flow_count: + if (self.ip_addrs_step != '0.0.0.1' or self.udp_ports.step != '1') and not ( + self.ip_addrs_step == 'random' and self.udp_ports.step == 'random'): + LOG.warning("Current values of ip_addrs_step and/or udp_port_step properties " + "do not allow to control an accurate flow count. " + "Values will be overridden as follows:") + if self.ip_addrs_step != '0.0.0.1': + LOG.info("ip_addrs_step='0.0.0.1' (previous value: ip_addrs_step='%s')", + self.ip_addrs_step) + self.ip_addrs_step = '0.0.0.1' + + if self.udp_ports.step != '1': + LOG.info("udp_port_step='1' (previous value: udp_port_step='%s')", + self.udp_ports.step) + self.udp_ports.step = '1' + # override config for not logging random step warning message in trex_gen.py + self.generator_config.gen_config.udp_port_step = self.udp_ports.step + else: + LOG.error("Current values of ip_addrs_step and udp_port_step properties " + "do not allow to control an accurate flow count.") + else: + src_ip_size = new_src_ip_size + peer_ip_size = new_peer_ip_size + self.udp_ports.udp_src_size = new_src_udp_size + self.udp_ports.udp_dst_size = new_dst_udp_size + return src_ip_size, peer_ip_size + + @staticmethod + def define_udp_range(udp_port, property_name): + if isinstance(udp_port, int): + min = udp_port + max = min + elif isinstance(udp_port, tuple): + min = udp_port[0] + max = udp_port[1] + else: + raise TrafficClientException('Invalid %s property value (53 or [\'53\',\'1024\'])' + % property_name) + return max, min + + + @staticmethod + def check_range_size(range_size, step): + """Check and set the available IPs or UDP ports, considering the step.""" + try: + if range_size % step == 0: + value = range_size // step + else: + value = range_size // step + 1 + return value + except ZeroDivisionError: + raise ZeroDivisionError("step can't be zero !") from ZeroDivisionError def set_mac(self, mac): + """Set the local MAC for this port device.""" if mac is None: raise TrafficClientException('Trying to set traffic generator MAC address as None') self.mac = mac - def set_destination(self, dst): - self.dst = dst + def get_peer_device(self): + """Get the peer device (device 0 -> device 1, or device 1 -> device 0).""" + return self.generator_config.devices[1 - self.port] - def set_vm_mac_list(self, vm_mac_list): - self.vm_mac_list = map(str, vm_mac_list) + def set_vtep_dst_mac(self, dest_macs): + """Set the list of dest MACs indexed by the chain id. - def set_vlan_tag(self, vlan_tag): - if self.vlan_tagging and vlan_tag is None: - raise TrafficClientException('Trying to set VLAN tag as None') - self.vlan_tag = vlan_tag + This is only called in 2 cases: + - VM macs discovered using openstack API + - dest MACs provisioned in config file + """ + self.vtep_dst_mac = list(map(str, dest_macs)) - def get_stream_configs(self, service_chain): + def set_dest_macs(self, dest_macs): + """Set the list of dest MACs indexed by the chain id. + + This is only called in 2 cases: + - VM macs discovered using openstack API + - dest MACs provisioned in config file + """ + self.dest_macs = list(map(str, dest_macs)) + + def get_dest_macs(self): + """Get the list of dest macs for this device. + + If set_dest_macs was never called, assumes l2-loopback and return + a list of peer mac (as many as chains but normally only 1 chain) + """ + if self.dest_macs: + return self.dest_macs + # assume this is l2-loopback + return [self.get_peer_device().mac] * self.chain_count + + def set_vlans(self, vlans): + """Set the list of vlans to use indexed by the chain id.""" + self.vlans = vlans + LOG.info("Port %d: VLANs %s", self.port, self.vlans) + + def set_vtep_vlan(self, vlan): + """Set the vtep vlan to use indexed by specific port.""" + self.vtep_vlan = vlan + self.vxlan = True + self.vlan_tagging = None + LOG.info("Port %d: VTEP VLANs %s", self.port, self.vtep_vlan) + + def set_vxlan_endpoints(self, src_ip, dst_ip): + self.vtep_dst_ip = dst_ip + self.vtep_src_ip = src_ip + LOG.info("Port %d: src_vtep %s, dst_vtep %s", self.port, + self.vtep_src_ip, self.vtep_dst_ip) + + def set_mpls_peers(self, src_ip, dst_ip): + self.mpls = True + self.vtep_dst_ip = dst_ip + self.vtep_src_ip = src_ip + LOG.info("Port %d: src_mpls_vtep %s, mpls_peer_ip %s", self.port, + self.vtep_src_ip, self.vtep_dst_ip) + + def set_vxlans(self, vnis): + self.vnis = vnis + LOG.info("Port %d: VNIs %s", self.port, self.vnis) + + def set_mpls_inner_labels(self, labels): + self.inner_labels = labels + LOG.info("Port %d: MPLS Inner Labels %s", self.port, self.inner_labels) + + def set_mpls_outer_labels(self, labels): + self.outer_labels = labels + LOG.info("Port %d: MPLS Outer Labels %s", self.port, self.outer_labels) + + def set_gw_ip(self, gateway_ip): + self.gw_ip_block = IpBlock(gateway_ip, + self.generator_config.gateway_ip_addrs_step, + self.chain_count) + + def get_gw_ip(self, chain_index): + """Retrieve the IP address assigned for the gateway of a given chain.""" + return self.gw_ip_block.get_ip(chain_index) + + def get_stream_configs(self): + """Get the stream config for a given chain on this device. + + Called by the traffic generator driver to program the traffic generator properly + before generating traffic + """ configs = [] - flow_idx = 0 - for chain_idx in xrange(self.chain_count): - current_flow_count = (self.flow_count - flow_idx) / (self.chain_count - chain_idx) - max_idx = flow_idx + current_flow_count - 1 - ip_src_count = self.ip_to_int(self.ip_list[max_idx]) - \ - self.ip_to_int(self.ip_list[flow_idx]) + 1 - ip_dst_count = self.ip_to_int(self.dst.ip_list[max_idx]) - \ - self.ip_to_int(self.dst.ip_list[flow_idx]) + 1 + # exact flow count for each chain is calculated as follows: + # - all chains except the first will have the same flow count + # calculated as (total_flows + chain_count - 1) / chain_count + # - the first chain will have the remainder + # example 11 flows and 3 chains => 3, 4, 4 + flows_per_chain = int((self.flow_count + self.chain_count - 1) / self.chain_count) + cur_chain_flow_count = int(self.flow_count - flows_per_chain * (self.chain_count - 1)) + + peer = self.get_peer_device() + self.ip_block.reset_reservation() + peer.ip_block.reset_reservation() + dest_macs = self.get_dest_macs() + + # limit ranges of UDP ports and IP to avoid overflow of the number of flows + peer_size = peer.ip_size // self.chain_count + + for chain_idx in range(self.chain_count): + src_ip_size, peer_ip_size = self.limit_ip_udp_ranges(peer_size, cur_chain_flow_count) + + src_ip_first, src_ip_last = self.ip_block.reserve_ip_range \ + (src_ip_size) + dst_ip_first, dst_ip_last = peer.ip_block.reserve_ip_range \ + (peer_ip_size) + + if self.ip_addrs_step != 'random': + src_ip_last = self.ip_block.get_ip_from_chain_first_ip( + Device.ip_to_int(src_ip_first), src_ip_size - 1) + dst_ip_last = peer.ip_block.get_ip_from_chain_first_ip( + Device.ip_to_int(dst_ip_first), peer_ip_size - 1) + if self.udp_ports.step != 'random': + self.udp_ports.src_max = self.udp_ports.get_src_max(self.udp_ports.udp_src_size - 1) + self.udp_ports.dst_max = self.udp_ports.get_dst_max(self.udp_ports.udp_dst_size - 1) + if self.ip_src_static: + src_ip_last = src_ip_first + + LOG.info("Port %d, chain %d: IP src range [%s,%s]", self.port, chain_idx, + src_ip_first, src_ip_last) + LOG.info("Port %d, chain %d: IP dst range [%s,%s]", self.port, chain_idx, + dst_ip_first, dst_ip_last) + LOG.info("Port %d, chain %d: UDP src range [%s,%s]", self.port, chain_idx, + self.udp_ports.src_min, self.udp_ports.src_max) + LOG.info("Port %d, chain %d: UDP dst range [%s,%s]", self.port, chain_idx, + self.udp_ports.dst_min, self.udp_ports.dst_max) configs.append({ - 'count': current_flow_count, + 'count': cur_chain_flow_count, 'mac_src': self.mac, - 'mac_dst': self.dst.mac if service_chain == ChainType.EXT - else self.vm_mac_list[chain_idx], - 'ip_src_addr': self.ip_list[flow_idx], - 'ip_src_addr_max': self.ip_list[max_idx], - 'ip_src_count': ip_src_count, - 'ip_dst_addr': self.dst.ip_list[flow_idx], - 'ip_dst_addr_max': self.dst.ip_list[max_idx], - 'ip_dst_count': ip_dst_count, + 'mac_dst': dest_macs[chain_idx], + 'ip_src_addr': src_ip_first, + 'ip_src_addr_max': src_ip_last, + 'ip_src_count': src_ip_size, + 'ip_dst_addr': dst_ip_first, + 'ip_dst_addr_max': dst_ip_last, + 'ip_dst_count': peer_ip_size, 'ip_addrs_step': self.ip_addrs_step, - 'udp_src_port': self.udp_src_port, - 'udp_dst_port': self.udp_dst_port, - 'mac_discovery_gw': self.gateway_ip_list[chain_idx], - 'ip_src_tg_gw': self.tg_gateway_ip_list[chain_idx], - 'ip_dst_tg_gw': self.dst.tg_gateway_ip_list[chain_idx], - 'vlan_tag': self.vlan_tag if self.vlan_tagging else None - }) + 'ip_src_static': self.ip_src_static, + 'udp_src_port': self.udp_ports.src_min, + 'udp_src_port_max': self.udp_ports.src_max, + 'udp_src_count': self.udp_ports.udp_src_size, + 'udp_dst_port': self.udp_ports.dst_min, + 'udp_dst_port_max': self.udp_ports.dst_max, + 'udp_dst_count': self.udp_ports.udp_dst_size, + 'udp_port_step': self.udp_ports.step, + 'mac_discovery_gw': self.get_gw_ip(chain_idx), + 'ip_src_tg_gw': self.tg_gw_ip_block.get_ip(chain_idx), + 'ip_dst_tg_gw': peer.tg_gw_ip_block.get_ip(chain_idx), + 'vlan_tag': self.vlans[chain_idx] if self.vlans else None, + 'vxlan': self.vxlan, + 'vtep_vlan': self.vtep_vlan if self.vtep_vlan else None, + 'vtep_src_mac': self.mac if (self.vxlan or self.mpls) else None, + 'vtep_dst_mac': self.vtep_dst_mac if (self.vxlan or self.mpls) else None, + 'vtep_dst_ip': self.vtep_dst_ip if self.vxlan is True else None, + 'vtep_src_ip': self.vtep_src_ip if self.vxlan is True else None, + 'net_vni': self.vnis[chain_idx] if self.vxlan is True else None, + 'mpls': self.mpls, + 'mpls_outer_label': self.outer_labels[chain_idx] if self.mpls is True else None, + 'mpls_inner_label': self.inner_labels[chain_idx] if self.mpls is True else None - flow_idx += current_flow_count + }) + # after first chain, fall back to the flow count for all other chains + cur_chain_flow_count = flows_per_chain return configs - @classmethod - def expand_ip(cls, ip, step_ip, count): - if step_ip == 'random': - # Repeatable Random will used in the stream src/dst IP pairs, but we still need - # to expand the IP based on the number of chains and flows configured. So we use - # "0.0.0.1" as the step to have the exact IP flow ranges for every chain. - step_ip = '0.0.0.1' - - step_ip_in_int = cls.ip_to_int(step_ip) - subnet = IPNetwork(ip) - ip_list = [] - for _ in xrange(count): - ip_list.append(subnet.ip.format()) - subnet = subnet.next(step_ip_in_int) - return ip_list - - @staticmethod - def mac_to_int(mac): - return int(mac.translate(None, ":.- "), 16) - - @staticmethod - def int_to_mac(i): - mac = format(i, 'x').zfill(12) - blocks = [mac[x:x + 2] for x in xrange(0, len(mac), 2)] - return ':'.join(blocks) - @staticmethod def ip_to_int(addr): + """Convert an IP address from string to numeric.""" return struct.unpack("!I", socket.inet_aton(addr))[0] + @staticmethod + def int_to_ip(nvalue): + """Convert an IP address from numeric to string.""" + return socket.inet_ntoa(struct.pack("!I", int(nvalue))) -class RunningTrafficProfile(object): + +class GeneratorConfig(object): """Represents traffic configuration for currently running traffic profile.""" DEFAULT_IP_STEP = '0.0.0.1' DEFAULT_SRC_DST_IP_STEP = '0.0.0.1' - def __init__(self, config, generator_profile): - generator_config = self.__match_generator_profile(config.traffic_generator, - generator_profile) - self.generator_config = generator_config + def __init__(self, config): + """Create a generator config.""" + self.config = config + # name of the generator profile (normally trex or dummy) + # pick the default one if not specified explicitly from cli options + if not config.generator_profile: + config.generator_profile = config.traffic_generator.default_profile + # pick up the profile dict based on the name + gen_config = self.__match_generator_profile(config.traffic_generator, + config.generator_profile) + self.gen_config = gen_config + # copy over fields from the dict + self.tool = gen_config.tool + self.ip = gen_config.ip + # overrides on config.cores and config.mbuf_factor + if config.cores: + self.cores = config.cores + else: + self.cores = gen_config.get('cores', 1) + # let's report the value actually used in the end + config.cores_used = self.cores + self.mbuf_factor = config.mbuf_factor + self.mbuf_64 = config.mbuf_64 + self.hdrh = not config.disable_hdrh + if config.intf_speed: + # interface speed is overriden from the command line + self.intf_speed = config.intf_speed + elif gen_config.intf_speed: + # interface speed is overriden from the generator config + self.intf_speed = gen_config.intf_speed + else: + self.intf_speed = "auto" + if self.intf_speed in ("auto", "0"): + # interface speed is discovered/provided by the traffic generator + self.intf_speed = 0 + else: + self.intf_speed = bitmath.parse_string(self.intf_speed.replace('ps', '')).bits + self.name = gen_config.name + self.zmq_pub_port = gen_config.get('zmq_pub_port', 4500) + self.zmq_rpc_port = gen_config.get('zmq_rpc_port', 4501) + self.limit_memory = gen_config.get('limit_memory', 1024) + self.software_mode = gen_config.get('software_mode', False) + self.interfaces = gen_config.interfaces + if self.interfaces[0].port != 0 or self.interfaces[1].port != 1: + raise TrafficClientException('Invalid port order/id in generator_profile.interfaces') self.service_chain = config.service_chain self.service_chain_count = config.service_chain_count self.flow_count = config.flow_count - self.host_name = generator_config.host_name - self.name = generator_config.name - self.tool = generator_config.tool - self.cores = generator_config.get('cores', 1) - self.ip_addrs_step = generator_config.ip_addrs_step or self.DEFAULT_SRC_DST_IP_STEP + self.host_name = gen_config.host_name + self.bidirectional = config.traffic.bidirectional + self.tg_gateway_ip_addrs = gen_config.tg_gateway_ip_addrs + self.ip_addrs = gen_config.ip_addrs + self.ip_addrs_step = gen_config.ip_addrs_step or self.DEFAULT_SRC_DST_IP_STEP self.tg_gateway_ip_addrs_step = \ - generator_config.tg_gateway_ip_addrs_step or self.DEFAULT_IP_STEP - self.gateway_ip_addrs_step = generator_config.gateway_ip_addrs_step or self.DEFAULT_IP_STEP - self.gateway_ips = generator_config.gateway_ip_addrs - self.ip = generator_config.ip - self.intf_speed = bitmath.parse_string(generator_config.intf_speed.replace('ps', '')).bits + gen_config.tg_gateway_ip_addrs_step or self.DEFAULT_IP_STEP + self.gateway_ip_addrs_step = gen_config.gateway_ip_addrs_step or self.DEFAULT_IP_STEP + self.gateway_ips = gen_config.gateway_ip_addrs + self.ip_src_static = gen_config.ip_src_static + self.vteps = gen_config.get('vteps') + self.devices = [Device(port, self) for port in [0, 1]] + # This should normally always be [0, 1] + self.ports = [device.port for device in self.devices] + + # check that pci is not empty + if not gen_config.interfaces[0].get('pci', None) or \ + not gen_config.interfaces[1].get('pci', None): + raise TrafficClientException("configuration interfaces pci fields cannot be empty") + + self.pcis = [tgif['pci'] for tgif in gen_config.interfaces] self.vlan_tagging = config.vlan_tagging - self.no_arp = config.no_arp - self.src_device = None - self.dst_device = None - self.vm_mac_list = None - self.__prep_interfaces(generator_config) - def to_json(self): - return dict(self.generator_config) - - def set_vm_mac_list(self, vm_mac_list): - self.src_device.set_vm_mac_list(vm_mac_list[0]) - self.dst_device.set_vm_mac_list(vm_mac_list[1]) + # needed for result/summarizer + config['tg-name'] = gen_config.name + config['tg-tool'] = self.tool - @staticmethod - def __match_generator_profile(traffic_generator, generator_profile): - generator_config = AttrDict(traffic_generator) - generator_config.pop('default_profile') - generator_config.pop('generator_profile') - matching_profile = filter(lambda profile: profile.name == generator_profile, - traffic_generator.generator_profile) - if len(matching_profile) != 1: - raise Exception('Traffic generator profile not found: ' + generator_profile) - - generator_config.update(matching_profile[0]) - - return generator_config - - def __prep_interfaces(self, generator_config): - src_config = { - 'chain_count': self.service_chain_count, - 'flow_count': self.flow_count / 2, - 'ip': generator_config.ip_addrs[0], - 'ip_addrs_step': self.ip_addrs_step, - 'gateway_ip': self.gateway_ips[0], - 'gateway_ip_addrs_step': self.gateway_ip_addrs_step, - 'tg_gateway_ip': generator_config.tg_gateway_ip_addrs[0], - 'tg_gateway_ip_addrs_step': self.tg_gateway_ip_addrs_step, - 'udp_src_port': generator_config.udp_src_port, - 'udp_dst_port': generator_config.udp_dst_port, - 'vlan_tagging': self.vlan_tagging - } - dst_config = { - 'chain_count': self.service_chain_count, - 'flow_count': self.flow_count / 2, - 'ip': generator_config.ip_addrs[1], - 'ip_addrs_step': self.ip_addrs_step, - 'gateway_ip': self.gateway_ips[1], - 'gateway_ip_addrs_step': self.gateway_ip_addrs_step, - 'tg_gateway_ip': generator_config.tg_gateway_ip_addrs[1], - 'tg_gateway_ip_addrs_step': self.tg_gateway_ip_addrs_step, - 'udp_src_port': generator_config.udp_src_port, - 'udp_dst_port': generator_config.udp_dst_port, - 'vlan_tagging': self.vlan_tagging - } - - self.src_device = Device(**dict(src_config, **generator_config.interfaces[0])) - self.dst_device = Device(**dict(dst_config, **generator_config.interfaces[1])) - self.src_device.set_destination(self.dst_device) - self.dst_device.set_destination(self.src_device) - - if self.service_chain == ChainType.EXT and not self.no_arp \ - and not self.__are_unique(self.src_device.ip_list, self.dst_device.ip_list): - raise Exception('Computed IP addresses are not unique, choose different base. ' - 'Start IPs: {start}. End IPs: {end}' - .format(start=self.src_device.ip_list, - end=self.dst_device.ip_list)) - - def __are_unique(self, list1, list2): - return set(list1).isdisjoint(set(list2)) - - @property - def devices(self): - return [self.src_device, self.dst_device] + def to_json(self): + """Get json form to display the content into the overall result dict.""" + return dict(self.gen_config) - @property - def vlans(self): - return [self.src_device.vtep_vlan, self.dst_device.vtep_vlan] + def set_dest_macs(self, port_index, dest_macs): + """Set the list of dest MACs indexed by the chain id on given port. - @property - def ports(self): - return [self.src_device.port, self.dst_device.port] + port_index: the port for which dest macs must be set + dest_macs: a list of dest MACs indexed by chain id + """ + if len(dest_macs) < self.config.service_chain_count: + raise TrafficClientException('Dest MAC list %s must have %d entries' % + (dest_macs, self.config.service_chain_count)) + # only pass the first scc dest MACs + self.devices[port_index].set_dest_macs(dest_macs[:self.config.service_chain_count]) + LOG.info('Port %d: dst MAC %s', port_index, [str(mac) for mac in dest_macs]) + + def set_vtep_dest_macs(self, port_index, dest_macs): + """Set the list of dest MACs indexed by the chain id on given port. + + port_index: the port for which dest macs must be set + dest_macs: a list of dest MACs indexed by chain id + """ + if len(dest_macs) != self.config.service_chain_count: + raise TrafficClientException('Dest MAC list %s must have %d entries' % + (dest_macs, self.config.service_chain_count)) + self.devices[port_index].set_vtep_dst_mac(dest_macs) + LOG.info('Port %d: vtep dst MAC %s', port_index, {str(mac) for mac in dest_macs}) - @property - def switch_ports(self): - return [self.src_device.switch_port, self.dst_device.switch_port] + def get_dest_macs(self): + """Return the list of dest macs indexed by port.""" + return [dev.get_dest_macs() for dev in self.devices] - @property - def pcis(self): - return [self.src_device.pci, self.dst_device.pci] + def set_vlans(self, port_index, vlans): + """Set the list of vlans to use indexed by the chain id on given port. + port_index: the port for which VLANs must be set + vlans: a list of vlan lists indexed by chain id + """ + if len(vlans) != self.config.service_chain_count: + raise TrafficClientException('VLAN list %s must have %d entries' % + (vlans, self.config.service_chain_count)) + self.devices[port_index].set_vlans(vlans) -class TrafficGeneratorFactory(object): + def set_vxlans(self, port_index, vxlans): + """Set the list of vxlans (VNIs) to use indexed by the chain id on given port. - def __init__(self, config): - self.config = config + port_index: the port for which VXLANs must be set + VXLANs: a list of VNIs lists indexed by chain id + """ + if len(vxlans) != self.config.service_chain_count: + raise TrafficClientException('VXLAN list %s must have %d entries' % + (vxlans, self.config.service_chain_count)) + self.devices[port_index].set_vxlans(vxlans) - def get_tool(self): - return self.config.generator_config.tool + def set_mpls_inner_labels(self, port_index, labels): + """Set the list of MPLS Labels to use indexed by the chain id on given port. - def get_generator_client(self): - tool = self.get_tool().lower() - if tool == 'trex': - from traffic_gen import trex - return trex.TRex(self.config) - elif tool == 'dummy': - from traffic_gen import dummy - return dummy.DummyTG(self.config) - else: - return None + port_index: the port for which Labels must be set + Labels: a list of Labels lists indexed by chain id + """ + if len(labels) != self.config.service_chain_count: + raise TrafficClientException('Inner MPLS list %s must have %d entries' % + (labels, self.config.service_chain_count)) + self.devices[port_index].set_mpls_inner_labels(labels) - def list_generator_profile(self): - return [profile.name for profile in self.config.traffic_generator.generator_profile] + def set_mpls_outer_labels(self, port_index, labels): + """Set the list of MPLS Labels to use indexed by the chain id on given port. - def get_generator_config(self, generator_profile): - return RunningTrafficProfile(self.config, generator_profile) + port_index: the port for which Labels must be set + Labels: a list of Labels lists indexed by chain id + """ + if len(labels) != self.config.service_chain_count: + raise TrafficClientException('Outer MPLS list %s must have %d entries' % + (labels, self.config.service_chain_count)) + self.devices[port_index].set_mpls_outer_labels(labels) + + def set_vtep_vlan(self, port_index, vlan): + """Set the vtep vlan to use indexed by the chain id on given port. + port_index: the port for which VLAN must be set + """ + self.devices[port_index].set_vtep_vlan(vlan) - def get_matching_profile(self, traffic_profile_name): - matching_profile = filter(lambda profile: profile.name == traffic_profile_name, - self.config.traffic_profile) + def set_vxlan_endpoints(self, port_index, src_ip, dst_ip): + self.devices[port_index].set_vxlan_endpoints(src_ip, dst_ip) - if len(matching_profile) > 1: - raise Exception('Multiple traffic profiles with the same name found.') - elif len(matching_profile) == 0: - raise Exception('No traffic profile found.') + def set_mpls_peers(self, port_index, src_ip, dst_ip): + self.devices[port_index].set_mpls_peers(src_ip, dst_ip) - return matching_profile[0] + @staticmethod + def __match_generator_profile(traffic_generator, generator_profile): + gen_config = AttrDict(traffic_generator) + gen_config.pop('default_profile') + gen_config.pop('generator_profile') + matching_profile = [profile for profile in traffic_generator.generator_profile if + profile.name == generator_profile] + if len(matching_profile) != 1: + raise Exception('Traffic generator profile not found: ' + generator_profile) - def get_frame_sizes(self, traffic_profile): - matching_profile = self.get_matching_profile(traffic_profile) - return matching_profile.l2frame_size + gen_config.update(matching_profile[0]) + return gen_config class TrafficClient(object): + """Traffic generator client with NDR/PDR binary seearch.""" PORTS = [0, 1] def __init__(self, config, notifier=None): - generator_factory = TrafficGeneratorFactory(config) - self.gen = generator_factory.get_generator_client() - self.tool = generator_factory.get_tool() + """Create a new TrafficClient instance. + + config: nfvbench config + notifier: notifier (optional) + + A new instance is created everytime the nfvbench config may have changed. + """ self.config = config + self.generator_config = GeneratorConfig(config) + self.tool = self.generator_config.tool + self.gen = self._get_generator() self.notifier = notifier self.interval_collector = None self.iteration_collector = None - self.runner = TrafficRunner(self, self.config.duration_sec, self.config.interval_sec) - if self.gen is None: - raise TrafficClientException('%s is not a supported traffic generator' % self.tool) - + self.runner = TrafficRunner(self, self.config.duration_sec, self.config.interval_sec, + self.config.service_mode) + self.config.frame_sizes = self._get_frame_sizes() self.run_config = { 'l2frame_size': None, 'duration_sec': self.config.duration_sec, 'bidirectional': True, - 'rates': None + 'rates': [] # to avoid unsbuscriptable-obj warning } self.current_total_rate = {'rate_percent': '10'} if self.config.single_run: self.current_total_rate = utils.parse_rate_str(self.config.rate) + self.ifstats = None + # Speed is either discovered when connecting to TG or set from config + # This variable is 0 if not yet discovered from TG or must be the speed of + # each interface in bits per second + self.intf_speed = self.generator_config.intf_speed + + def _get_generator(self): + tool = self.tool.lower() + if tool == 'trex': + from .traffic_gen import trex_gen + return trex_gen.TRex(self) + if tool == 'dummy': + from .traffic_gen import dummy + return dummy.DummyTG(self) + raise TrafficClientException('Unsupported generator tool name:' + self.tool) - def set_macs(self): - for mac, device in zip(self.gen.get_macs(), self.config.generator_config.devices): - device.set_mac(mac) + def skip_sleep(self): + """Skip all sleeps when doing unit testing with dummy TG. + + Must be overriden using mock.patch + """ + return False + + def _get_frame_sizes(self): + traffic_profile_name = self.config.traffic.profile + matching_profiles = [profile for profile in self.config.traffic_profile if + profile.name == traffic_profile_name] + if len(matching_profiles) > 1: + raise TrafficClientException('Multiple traffic profiles with name: ' + + traffic_profile_name) + if not matching_profiles: + raise TrafficClientException('Cannot find traffic profile: ' + traffic_profile_name) + return matching_profiles[0].l2frame_size def start_traffic_generator(self): - self.gen.init() + """Start the traffic generator process (traffic not started yet).""" self.gen.connect() + # pick up the interface speed if it is not set from config + intf_speeds = self.gen.get_port_speed_gbps() + # convert Gbps unit into bps + tg_if_speed = bitmath.parse_string(str(intf_speeds[0]) + 'Gb').bits + if self.intf_speed: + # interface speed is overriden from config + if self.intf_speed != tg_if_speed: + # Warn the user if the speed in the config is different + LOG.warning( + 'Interface speed provided (%g Gbps) is different from actual speed (%d Gbps)', + self.intf_speed / 1000000000.0, intf_speeds[0]) + else: + # interface speed not provisioned by config + self.intf_speed = tg_if_speed + # also update the speed in the tg config + self.generator_config.intf_speed = tg_if_speed + # let's report detected and actually used interface speed + self.config.intf_speed_detected = tg_if_speed + self.config.intf_speed_used = self.intf_speed + + # Save the traffic generator local MAC + for mac, device in zip(self.gen.get_macs(), self.generator_config.devices): + device.set_mac(mac) def setup(self): - self.gen.set_mode() - self.gen.config_interface() + """Set up the traffic client.""" self.gen.clear_stats() def get_version(self): + """Get the traffic generator version.""" return self.gen.get_version() def ensure_end_to_end(self): - """ - Ensure traffic generator receives packets it has transmitted. + """Ensure traffic generator receives packets it has transmitted. + This ensures end to end connectivity and also waits until VMs are ready to forward packets. - At this point all VMs are in active state, but forwarding does not have to work. - Small amount of traffic is sent to every chain. Then total of sent and received packets - is compared. If ratio between received and transmitted packets is higher than (N-1)/N, - N being number of chains, traffic flows through every chain and real measurements can be - performed. + VMs that are started and in active state may not pass traffic yet. It is imperative to make + sure that all VMs are passing traffic in both directions before starting any benchmarking. + To verify this, we need to send at a low frequency bi-directional packets and make sure + that we receive all packets back from all VMs. The number of flows is equal to 2 times + the number of chains (1 per direction) and we need to make sure we receive packets coming + from exactly 2 x chain count different source MAC addresses. Example: PVP chain (1 VM per chain) N = 10 (number of chains) - threshold = (N-1)/N = 9/10 = 0.9 (acceptable ratio ensuring working conditions) - if total_received/total_sent > 0.9, traffic is flowing to more than 9 VMs meaning - all 10 VMs are in operational state. + Flow count = 20 (number of flows) + If the number of unique source MAC addresses from received packets is 20 then + all 10 VMs 10 VMs are in operational state. """ LOG.info('Starting traffic generator to ensure end-to-end connectivity') - rate_pps = {'rate_pps': str(self.config.service_chain_count * 100)} - self.gen.create_traffic('64', [rate_pps, rate_pps], bidirectional=True, latency=False) - + # send 2pps on each chain and each direction + rate_pps = {'rate_pps': str(self.config.service_chain_count * 2)} + self.gen.create_traffic('64', [rate_pps, rate_pps], bidirectional=True, latency=False, + e2e=True) # ensures enough traffic is coming back - threshold = (self.config.service_chain_count - 1) / float(self.config.service_chain_count) - retry_count = (self.config.check_traffic_time_sec + - self.config.generic_poll_sec - 1) / self.config.generic_poll_sec - for it in xrange(retry_count): + retry_count = int((self.config.check_traffic_time_sec + + self.config.generic_poll_sec - 1) / self.config.generic_poll_sec) + + # we expect to see packets coming from 2 unique MAC per chain + # because there can be flooding in the case of shared net + # we must verify that packets from the right VMs are received + # and not just count unique src MAC + # create a dict of (port, chain) tuples indexed by dest mac + mac_map = {} + for port, dest_macs in enumerate(self.generator_config.get_dest_macs()): + for chain, mac in enumerate(dest_macs): + mac_map[mac] = (port, chain) + unique_src_mac_count = len(mac_map) + if self.config.vxlan and self.config.traffic_generator.vtep_vlan: + get_mac_id = lambda packet: packet['binary'][60:66] + elif self.config.vxlan: + get_mac_id = lambda packet: packet['binary'][56:62] + elif self.config.mpls: + get_mac_id = lambda packet: packet['binary'][24:30] + # mpls_transport_label = lambda packet: packet['binary'][14:18] + else: + get_mac_id = lambda packet: packet['binary'][6:12] + for it in range(retry_count): self.gen.clear_stats() self.gen.start_traffic() - LOG.info('Waiting for packets to be received back... ({} / {})'.format(it + 1, - retry_count)) - time.sleep(self.config.generic_poll_sec) + self.gen.start_capture() + LOG.info('Captured unique src mac %d/%d, capturing return packets (retry %d/%d)...', + unique_src_mac_count - len(mac_map), unique_src_mac_count, + it + 1, retry_count) + if not self.skip_sleep(): + time.sleep(self.config.generic_poll_sec) self.gen.stop_traffic() - stats = self.gen.get_stats() - - # compute total sent and received traffic on both ports - total_rx = 0 - total_tx = 0 - for port in self.PORTS: - total_rx += float(stats[port]['rx'].get('total_pkts', 0)) - total_tx += float(stats[port]['tx'].get('total_pkts', 0)) - - # how much of traffic came back - ratio = total_rx / total_tx if total_tx else 0 - - if ratio > threshold: - self.gen.clear_stats() - self.gen.clear_streamblock() - LOG.info('End-to-end connectivity ensured') - return + self.gen.fetch_capture_packets() + self.gen.stop_capture() + for packet in self.gen.packet_list: + mac_id = get_mac_id(packet).decode('latin-1') + src_mac = ':'.join(["%02x" % ord(x) for x in mac_id]) + if self.config.mpls: + if src_mac in mac_map and self.is_mpls(packet): + port, chain = mac_map[src_mac] + LOG.info('Received mpls packet from mac: %s (chain=%d, port=%d)', + src_mac, chain, port) + mac_map.pop(src_mac, None) + else: + if src_mac in mac_map and self.is_udp(packet): + port, chain = mac_map[src_mac] + LOG.info('Received udp packet from mac: %s (chain=%d, port=%d)', + src_mac, chain, port) + mac_map.pop(src_mac, None) + + if not mac_map: + LOG.info('End-to-end connectivity established') + return + if self.config.l3_router and not self.config.no_arp: + # In case of L3 traffic mode, routers are not able to route traffic + # until VM interfaces are up and ARP requests are done + LOG.info('Waiting for loopback service completely started...') + LOG.info('Sending ARP request to assure end-to-end connectivity established') + self.ensure_arp_successful() + raise TrafficClientException('End-to-end connectivity cannot be ensured') - time.sleep(self.config.generic_poll_sec) + def is_udp(self, packet): + pkt = Ether(packet['binary']) + return UDP in pkt - raise TrafficClientException('End-to-end connectivity cannot be ensured') + def is_mpls(self, packet): + pkt = Ether(packet['binary']) + return MPLS in pkt def ensure_arp_successful(self): - if not self.gen.resolve_arp(): + """Resolve all IP using ARP and throw an exception in case of failure.""" + dest_macs = self.gen.resolve_arp() + if dest_macs: + # all dest macs are discovered, saved them into the generator config + if self.config.vxlan or self.config.mpls: + self.generator_config.set_vtep_dest_macs(0, dest_macs[0]) + self.generator_config.set_vtep_dest_macs(1, dest_macs[1]) + else: + self.generator_config.set_dest_macs(0, dest_macs[0]) + self.generator_config.set_dest_macs(1, dest_macs[1]) + else: raise TrafficClientException('ARP cannot be resolved') def set_traffic(self, frame_size, bidirectional): + """Reconfigure the traffic generator for a new frame size.""" self.run_config['bidirectional'] = bidirectional self.run_config['l2frame_size'] = frame_size self.run_config['rates'] = [self.get_per_direction_rate()] @@ -464,11 +994,21 @@ class TrafficClient(object): unidir_reverse_pps = int(self.config.unidir_reverse_traffic_pps) if unidir_reverse_pps > 0: self.run_config['rates'].append({'rate_pps': str(unidir_reverse_pps)}) + # Fix for [NFVBENCH-67], convert the rate string to PPS + for idx, rate in enumerate(self.run_config['rates']): + if 'rate_pps' not in rate: + self.run_config['rates'][idx] = {'rate_pps': self.__convert_rates(rate)['rate_pps']} self.gen.clear_streamblock() - self.gen.create_traffic(frame_size, self.run_config['rates'], bidirectional, latency=True) - def modify_load(self, load): + if self.config.no_latency_streams: + LOG.info("Latency streams are disabled") + # in service mode, we must disable flow stats (e2e=True) + self.gen.create_traffic(frame_size, self.run_config['rates'], bidirectional, + latency=not self.config.no_latency_streams, + e2e=self.runner.service_mode) + + def _modify_load(self, load): self.current_total_rate = {'rate_percent': str(load)} rate_per_direction = self.get_per_direction_rate() @@ -479,6 +1019,7 @@ class TrafficClient(object): self.run_config['rates'][1] = rate_per_direction def get_ndr_and_pdr(self): + """Start the NDR/PDR iteration and return the results.""" dst = 'Bidirectional' if self.run_config['bidirectional'] else 'Unidirectional' targets = {} if self.config.ndr_run: @@ -518,32 +1059,41 @@ class TrafficClient(object): total_pkts = result['tx']['total_pkts'] if not total_pkts: return float('inf') - else: - return float(dropped_pkts) / total_pkts * 100 + return float(dropped_pkts) / total_pkts * 100 def get_stats(self): - stats = self.gen.get_stats() - retDict = {'total_tx_rate': stats['total_tx_rate']} - for port in self.PORTS: - retDict[port] = {'tx': {}, 'rx': {}} + """Collect final stats for previous run.""" + stats = self.gen.get_stats(self.ifstats) + retDict = {'total_tx_rate': stats['total_tx_rate'], + 'offered_tx_rate_bps': stats['offered_tx_rate_bps'], + 'theoretical_tx_rate_bps': stats['theoretical_tx_rate_bps'], + 'theoretical_tx_rate_pps': stats['theoretical_tx_rate_pps']} + + if self.config.periodic_gratuitous_arp: + retDict['garp_total_tx_rate'] = stats['garp_total_tx_rate'] tx_keys = ['total_pkts', 'total_pkt_bytes', 'pkt_rate', 'pkt_bit_rate'] rx_keys = tx_keys + ['dropped_pkts'] for port in self.PORTS: + port_stats = {'tx': {}, 'rx': {}} for key in tx_keys: - retDict[port]['tx'][key] = int(stats[port]['tx'][key]) + port_stats['tx'][key] = int(stats[port]['tx'][key]) for key in rx_keys: try: - retDict[port]['rx'][key] = int(stats[port]['rx'][key]) + port_stats['rx'][key] = int(stats[port]['rx'][key]) except ValueError: - retDict[port]['rx'][key] = 0 - retDict[port]['rx']['avg_delay_usec'] = int(stats[port]['rx']['avg_delay_usec']) - retDict[port]['rx']['min_delay_usec'] = int(stats[port]['rx']['min_delay_usec']) - retDict[port]['rx']['max_delay_usec'] = int(stats[port]['rx']['max_delay_usec']) - retDict[port]['drop_rate_percent'] = self.__get_dropped_rate(retDict[port]) - - ports = sorted(retDict.keys()) + port_stats['rx'][key] = 0 + port_stats['rx']['avg_delay_usec'] = cast_integer( + stats[port]['rx']['avg_delay_usec']) + port_stats['rx']['min_delay_usec'] = cast_integer( + stats[port]['rx']['min_delay_usec']) + port_stats['rx']['max_delay_usec'] = cast_integer( + stats[port]['rx']['max_delay_usec']) + port_stats['drop_rate_percent'] = self.__get_dropped_rate(port_stats) + retDict[str(port)] = port_stats + + ports = sorted(list(retDict.keys()), key=str) if self.run_config['bidirectional']: retDict['overall'] = {'tx': {}, 'rx': {}} for key in tx_keys: @@ -569,12 +1119,28 @@ class TrafficClient(object): else: retDict['overall'] = retDict[ports[0]] retDict['overall']['drop_rate_percent'] = self.__get_dropped_rate(retDict['overall']) + + if 'overall_hdrh' in stats: + retDict['overall']['hdrh'] = stats.get('overall_hdrh', None) + decoded_histogram = HdrHistogram.decode(retDict['overall']['hdrh']) + retDict['overall']['rx']['lat_percentile'] = {} + # override min max and avg from hdrh (only if histogram is valid) + if decoded_histogram.get_total_count() != 0: + retDict['overall']['rx']['min_delay_usec'] = decoded_histogram.get_min_value() + retDict['overall']['rx']['max_delay_usec'] = decoded_histogram.get_max_value() + retDict['overall']['rx']['avg_delay_usec'] = decoded_histogram.get_mean_value() + for percentile in self.config.lat_percentiles: + retDict['overall']['rx']['lat_percentile'][percentile] = \ + decoded_histogram.get_value_at_percentile(percentile) + else: + for percentile in self.config.lat_percentiles: + retDict['overall']['rx']['lat_percentile'][percentile] = 'n/a' return retDict def __convert_rates(self, rate): return utils.convert_rates(self.run_config['l2frame_size'], rate, - self.config.generator_config.intf_speed) + self.intf_speed) def __ndr_pdr_found(self, tag, load): rates = self.__convert_rates({'rate_percent': load}) @@ -583,7 +1149,8 @@ class TrafficClient(object): self.interval_collector.add_ndr_pdr(tag, last_stats) def __format_output_stats(self, stats): - for key in (self.PORTS + ['overall']): + for key in self.PORTS + ['overall']: + key = str(key) interface = stats[key] stats[key] = { 'tx_pkts': interface['tx']['total_pkts'], @@ -595,16 +1162,32 @@ class TrafficClient(object): 'min_delay_usec': interface['rx']['min_delay_usec'], } + if key == 'overall': + if 'hdrh' in interface: + stats[key]['hdrh'] = interface.get('hdrh', None) + decoded_histogram = HdrHistogram.decode(stats[key]['hdrh']) + stats[key]['lat_percentile'] = {} + # override min max and avg from hdrh (only if histogram is valid) + if decoded_histogram.get_total_count() != 0: + stats[key]['min_delay_usec'] = decoded_histogram.get_min_value() + stats[key]['max_delay_usec'] = decoded_histogram.get_max_value() + stats[key]['avg_delay_usec'] = decoded_histogram.get_mean_value() + for percentile in self.config.lat_percentiles: + stats[key]['lat_percentile'][percentile] = decoded_histogram.\ + get_value_at_percentile(percentile) + else: + for percentile in self.config.lat_percentiles: + stats[key]['lat_percentile'][percentile] = 'n/a' return stats def __targets_found(self, rate, targets, results): - for tag, target in targets.iteritems(): - LOG.info('Found {} ({}) load: {}'.format(tag, target, rate)) + for tag, target in list(targets.items()): + LOG.info('Found %s (%s) load: %s', tag, target, rate) self.__ndr_pdr_found(tag, rate) results[tag]['timestamp_sec'] = time.time() def __range_search(self, left, right, targets, results): - '''Perform a binary search for a list of targets inside a [left..right] range or rate + """Perform a binary search for a list of targets inside a [left..right] range or rate. left the left side of the range to search as a % the line rate (100 = 100% line rate) indicating the rate to send on each interface @@ -613,10 +1196,10 @@ class TrafficClient(object): targets a dict of drop rates to search (0.1 = 0.1%), indexed by the DR name or "tag" ('ndr', 'pdr') results a dict to store results - ''' - if len(targets) == 0: + """ + if not targets: return - LOG.info('Range search [{} .. {}] targets: {}'.format(left, right, targets)) + LOG.info('Range search [%s .. %s] targets: %s', left, right, targets) # Terminate search when gap is less than load epsilon if right - left < self.config.measurement.load_epsilon: @@ -625,12 +1208,16 @@ class TrafficClient(object): # Obtain the average drop rate in for middle load middle = (left + right) / 2.0 - stats, rates = self.__run_search_iteration(middle) - + try: + stats, rates = self.__run_search_iteration(middle) + except STLError: + LOG.exception("Got exception from traffic generator during binary search") + self.__targets_found(left, targets, results) + return # Split target dicts based on the avg drop rate left_targets = {} right_targets = {} - for tag, target in targets.iteritems(): + for tag, target in list(targets.items()): if stats['overall']['drop_rate_percent'] <= target: # record the best possible rate found for this target results[tag] = rates @@ -670,8 +1257,22 @@ class TrafficClient(object): self.__range_search(middle, right, right_targets, results) def __run_search_iteration(self, rate): - # set load - self.modify_load(rate) + """Run one iteration at the given rate level. + + rate: the rate to send on each port in percent (0 to 100) + """ + self._modify_load(rate) + + # There used to be a inconsistency in case of interface speed override. + # The emulated 'intf_speed' value is unknown to the T-Rex generator which + # refers to the detected line rate for converting relative traffic loads. + # Therefore, we need to convert actual rates here, in terms of packets/s. + + for idx, str_rate in enumerate(self.gen.rates): + if str_rate.endswith('%'): + float_rate = float(str_rate.replace('%', '').strip()) + pps_rate = self.__convert_rates({'rate_percent': float_rate})['rate_pps'] + self.gen.rates[idx] = str(pps_rate) + 'pps' # poll interval stats and collect them for stats in self.run_traffic(): @@ -679,11 +1280,13 @@ class TrafficClient(object): time_elapsed_ratio = self.runner.time_elapsed() / self.run_config['duration_sec'] if time_elapsed_ratio >= 1: self.cancel_traffic() + if not self.skip_sleep(): + time.sleep(self.config.pause_sec) self.interval_collector.reset() # get stats from the run stats = self.runner.client.get_stats() - current_traffic_config = self.get_traffic_config() + current_traffic_config = self._get_traffic_config() warning = self.compare_tx_rates(current_traffic_config['direction-total']['rate_pps'], stats['total_tx_rate']) if warning is not None: @@ -691,27 +1294,35 @@ class TrafficClient(object): # save reliable stats from whole iteration self.iteration_collector.add(stats, current_traffic_config['direction-total']['rate_pps']) - LOG.info('Average drop rate: {}'.format(stats['overall']['drop_rate_percent'])) - + LOG.info('Average drop rate: %f', stats['overall']['drop_rate_percent']) return stats, current_traffic_config['direction-total'] - @staticmethod - def log_stats(stats): - report = { - 'datetime': str(datetime.now()), - 'tx_packets': stats['overall']['tx']['total_pkts'], - 'rx_packets': stats['overall']['rx']['total_pkts'], - 'drop_packets': stats['overall']['rx']['dropped_pkts'], - 'drop_rate_percent': stats['overall']['drop_rate_percent'] - } - LOG.info('TX: %(tx_packets)d; ' - 'RX: %(rx_packets)d; ' - 'Dropped: %(drop_packets)d; ' - 'Drop rate: %(drop_rate_percent).4f%%', - report) + def log_stats(self, stats): + """Log estimated stats during run.""" + # Calculate a rolling drop rate based on differential to + # the previous reading + cur_tx = stats['overall']['tx']['total_pkts'] + cur_rx = stats['overall']['rx']['total_pkts'] + delta_tx = cur_tx - self.prev_tx + delta_rx = cur_rx - self.prev_rx + drops = delta_tx - delta_rx + if delta_tx == 0: + LOG.info("\x1b[1mConfiguration issue!\x1b[0m (no transmission)") + sys.exit(0) + drop_rate_pct = 100 * (delta_tx - delta_rx)/delta_tx + self.prev_tx = cur_tx + self.prev_rx = cur_rx + LOG.info('TX: %15s; RX: %15s; (Est.) Dropped: %12s; Drop rate: %8.4f%%', + format(cur_tx, ',d'), + format(cur_rx, ',d'), + format(drops, ',d'), + drop_rate_pct) def run_traffic(self): + """Start traffic and return intermediate stats for each interval.""" stats = self.runner.run() + self.prev_tx = 0 + self.prev_rx = 0 while self.runner.is_running: self.log_stats(stats) yield stats @@ -719,22 +1330,14 @@ class TrafficClient(object): if stats is None: return self.log_stats(stats) - LOG.info('Drop rate: {}'.format(stats['overall']['drop_rate_percent'])) + LOG.info('Drop rate: %f', stats['overall']['drop_rate_percent']) yield stats def cancel_traffic(self): + """Stop traffic.""" self.runner.stop() - def get_interface(self, port_index): - port = self.gen.port_handle[port_index] - tx, rx = 0, 0 - if not self.config.no_traffic: - stats = self.get_stats() - if port in stats: - tx, rx = int(stats[port]['tx']['total_pkts']), int(stats[port]['rx']['total_pkts']) - return Interface('traffic-generator', self.tool.lower(), tx, rx) - - def get_traffic_config(self): + def _get_traffic_config(self): config = {} load_total = 0.0 bps_total = 0.0 @@ -753,35 +1356,128 @@ class TrafficClient(object): config['direction-total'] = dict(config['direction-forward']) config['direction-total'].update({ 'rate_percent': load_total, - 'rate_pps': pps_total, + 'rate_pps': cast_integer(pps_total), 'rate_bps': bps_total }) return config def get_run_config(self, results): - """Returns configuration which was used for the last run.""" + """Return configuration which was used for the last run.""" r = {} + # because we want each direction to have the far end RX rates, + # use the far end index (1-idx) to retrieve the RX rates for idx, key in enumerate(["direction-forward", "direction-reverse"]): - tx_rate = results["stats"][idx]["tx"]["total_pkts"] / self.config.duration_sec - rx_rate = results["stats"][idx]["rx"]["total_pkts"] / self.config.duration_sec + tx_rate = results["stats"][str(idx)]["tx"]["total_pkts"] / self.config.duration_sec + rx_rate = results["stats"][str(1 - idx)]["rx"]["total_pkts"] / self.config.duration_sec + + orig_rate = self.run_config['rates'][idx] + if self.config.periodic_gratuitous_arp: + orig_rate['rate_pps'] = float( + orig_rate['rate_pps']) - self.config.gratuitous_arp_pps + r[key] = { - "orig": self.__convert_rates(self.run_config['rates'][idx]), + "orig": self.__convert_rates(orig_rate), "tx": self.__convert_rates({'rate_pps': tx_rate}), "rx": self.__convert_rates({'rate_pps': rx_rate}) } + if self.config.periodic_gratuitous_arp: + r['garp-direction-total'] = { + "orig": self.__convert_rates({'rate_pps': self.config.gratuitous_arp_pps * 2}), + "tx": self.__convert_rates({'rate_pps': results["stats"]["garp_total_tx_rate"]}), + "rx": self.__convert_rates({'rate_pps': 0}) + } + total = {} for direction in ['orig', 'tx', 'rx']: total[direction] = {} for unit in ['rate_percent', 'rate_bps', 'rate_pps']: - total[direction][unit] = sum(map(lambda x: float(x[direction][unit]), r.values())) + total[direction][unit] = sum([float(x[direction][unit]) for x in list(r.values())]) r['direction-total'] = total + return r + def insert_interface_stats(self, pps_list): + """Insert interface stats to a list of packet path stats. + + pps_list: a list of packet path stats instances indexed by chain index + + This function will insert the packet path stats for the traffic gen ports 0 and 1 + with itemized per chain tx/rx counters. + There will be as many packet path stats as chains. + Each packet path stats will have exactly 2 InterfaceStats for port 0 and port 1 + self.pps_list: + [ + PacketPathStats(InterfaceStats(chain 0, port 0), InterfaceStats(chain 0, port 1)), + PacketPathStats(InterfaceStats(chain 1, port 0), InterfaceStats(chain 1, port 1)), + ... + ] + """ + def get_if_stats(chain_idx): + return [InterfaceStats('p' + str(port), self.tool) + for port in range(2)] + # keep the list of list of interface stats indexed by the chain id + self.ifstats = [get_if_stats(chain_idx) + for chain_idx in range(self.config.service_chain_count)] + # note that we need to make a copy of the ifs list so that any modification in the + # list from pps will not change the list saved in self.ifstats + self.pps_list = [PacketPathStats(self.config, list(ifs)) for ifs in self.ifstats] + # insert the corresponding pps in the passed list + pps_list.extend(self.pps_list) + + def update_interface_stats(self, diff=False): + """Update all interface stats. + + diff: if False, simply refresh the interface stats values with latest values + if True, diff the interface stats with the latest values + Make sure that the interface stats inserted in insert_interface_stats() are updated + with proper values. + self.ifstats: + [ + [InterfaceStats(chain 0, port 0), InterfaceStats(chain 0, port 1)], + [InterfaceStats(chain 1, port 0), InterfaceStats(chain 1, port 1)], + ... + ] + """ + if diff: + stats = self.gen.get_stats(self.ifstats) + for chain_idx, ifs in enumerate(self.ifstats): + # each ifs has exactly 2 InterfaceStats and 2 Latency instances + # corresponding to the + # port 0 and port 1 for the given chain_idx + # Note that we cannot use self.pps_list[chain_idx].if_stats to pick the + # interface stats for the pps because it could have been modified to contain + # additional interface stats + self.gen.get_stream_stats(stats, ifs, self.pps_list[chain_idx].latencies, chain_idx) + # special handling for vxlan + # in case of vxlan, flow stats are not available so all rx counters will be + # zeros when the total rx port counter is non zero. + # in that case, + for port in range(2): + total_rx = 0 + for ifs in self.ifstats: + total_rx += ifs[port].rx + if total_rx == 0: + # check if the total port rx from Trex is also zero + port_rx = stats[port]['rx']['total_pkts'] + if port_rx: + # the total rx for all chains from port level stats is non zero + # which means that the per-chain stats are not available + if len(self.ifstats) == 1: + # only one chain, simply report the port level rx to the chain rx stats + self.ifstats[0][port].rx = port_rx + else: + for ifs in self.ifstats: + # mark this data as unavailable + ifs[port].rx = None + # pitch in the total rx only in the last chain pps + self.ifstats[-1][port].rx_total = port_rx + @staticmethod def compare_tx_rates(required, actual): + """Compare the actual TX rate to the required TX rate.""" threshold = 0.9 are_different = False try: @@ -800,6 +1496,7 @@ class TrafficClient(object): return None def get_per_direction_rate(self): + """Get the rate for each direction.""" divisor = 2 if self.run_config['bidirectional'] else 1 if 'rate_percent' in self.current_total_rate: # don't split rate if it's percentage @@ -808,6 +1505,7 @@ class TrafficClient(object): return utils.divide_rate(self.current_total_rate, divisor) def close(self): + """Close this instance.""" try: self.gen.stop_traffic() except Exception: