X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=nfvbench%2Ftraffic_client.py;h=2ce118cea555031eec19c93e6b8b1c3f88f51ebd;hb=refs%2Fheads%2Fstable%2Ffraser;hp=ec63944d3df9ea2e293eef3acd72c8e3aa495db1;hpb=97b452affb0e99816ad503a5f79b01f38b93059a;p=nfvbench.git diff --git a/nfvbench/traffic_client.py b/nfvbench/traffic_client.py old mode 100644 new mode 100755 index ec63944..2ce118c --- a/nfvbench/traffic_client.py +++ b/nfvbench/traffic_client.py @@ -12,18 +12,24 @@ # License for the specific language governing permissions and limitations # under the License. +from datetime import datetime +import re +import socket +import struct +import time + from attrdict import AttrDict import bitmath -from datetime import datetime -from log import LOG from netaddr import IPNetwork +# pylint: disable=import-error +from trex_stl_lib.api import STLError +# pylint: enable=import-error + +from log import LOG from network import Interface -import socket from specs import ChainType from stats_collector import IntervalCollector from stats_collector import IterationCollector -import struct -import time import traffic_gen.traffic_utils as utils from utils import cast_integer @@ -33,7 +39,6 @@ class TrafficClientException(Exception): class TrafficRunner(object): - def __init__(self, client, duration_sec, interval_sec=0): self.client = client self.start_time = None @@ -58,12 +63,14 @@ class TrafficRunner(object): def time_elapsed(self): if self.is_running(): return time.time() - self.start_time - else: - return self.duration_sec + return self.duration_sec def poll_stats(self): if not self.is_running(): return None + if self.client.skip_sleep: + self.stop() + return self.client.get_stats() time_elapsed = self.time_elapsed() if time_elapsed > self.duration_sec: self.stop() @@ -81,12 +88,42 @@ class TrafficRunner(object): return self.client.get_stats() -class Device(object): +class IpBlock(object): + def __init__(self, base_ip, step_ip, count_ip): + self.base_ip_int = Device.ip_to_int(base_ip) + self.step = Device.ip_to_int(step_ip) + self.max_available = count_ip + self.next_free = 0 + + def get_ip(self, index=0): + '''Return the IP address at given index + ''' + if index < 0 or index >= self.max_available: + raise IndexError('Index out of bounds') + return Device.int_to_ip(self.base_ip_int + index * self.step) + def reserve_ip_range(self, count): + '''Reserve a range of count consecutive IP addresses spaced by step + ''' + if self.next_free + count > self.max_available: + raise IndexError('No more IP addresses next free=%d max_available=%d requested=%d' % + (self.next_free, + self.max_available, + count)) + first_ip = self.get_ip(self.next_free) + last_ip = self.get_ip(self.next_free + count - 1) + self.next_free += count + return (first_ip, last_ip) + + def reset_reservation(self): + self.next_free = 0 + + +class Device(object): def __init__(self, port, pci, switch_port=None, vtep_vlan=None, ip=None, tg_gateway_ip=None, gateway_ip=None, ip_addrs_step=None, tg_gateway_ip_addrs_step=None, gateway_ip_addrs_step=None, udp_src_port=None, udp_dst_port=None, - chain_count=1, flow_count=1, vlan_tagging=False): + dst_mac=None, chain_count=1, flow_count=1, vlan_tagging=False): self.chain_count = chain_count self.flow_count = flow_count self.dst = None @@ -97,6 +134,7 @@ class Device(object): self.vlan_tagging = vlan_tagging self.pci = pci self.mac = None + self.dst_mac = dst_mac self.vm_mac_list = None subnet = IPNetwork(ip) self.ip = subnet.ip.format() @@ -104,15 +142,15 @@ class Device(object): self.ip_addrs_step = ip_addrs_step self.tg_gateway_ip_addrs_step = tg_gateway_ip_addrs_step self.gateway_ip_addrs_step = gateway_ip_addrs_step - self.ip_list = self.expand_ip(self.ip, self.ip_addrs_step, self.flow_count) self.gateway_ip = gateway_ip - self.gateway_ip_list = self.expand_ip(self.gateway_ip, - self.gateway_ip_addrs_step, - self.chain_count) self.tg_gateway_ip = tg_gateway_ip - self.tg_gateway_ip_list = self.expand_ip(self.tg_gateway_ip, - self.tg_gateway_ip_addrs_step, - self.chain_count) + self.ip_block = IpBlock(self.ip, ip_addrs_step, flow_count) + self.gw_ip_block = IpBlock(gateway_ip, + gateway_ip_addrs_step, + chain_count) + self.tg_gw_ip_block = IpBlock(tg_gateway_ip, + tg_gateway_ip_addrs_step, + chain_count) self.udp_src_port = udp_src_port self.udp_dst_port = udp_dst_port @@ -132,55 +170,65 @@ class Device(object): raise TrafficClientException('Trying to set VLAN tag as None') self.vlan_tag = vlan_tag + def get_gw_ip(self, chain_index): + '''Retrieve the IP address assigned for the gateway of a given chain + ''' + return self.gw_ip_block.get_ip(chain_index) + def get_stream_configs(self, service_chain): configs = [] - flow_idx = 0 + # exact flow count for each chain is calculated as follows: + # - all chains except the first will have the same flow count + # calculated as (total_flows + chain_count - 1) / chain_count + # - the first chain will have the remainder + # example 11 flows and 3 chains => 3, 4, 4 + flows_per_chain = (self.flow_count + self.chain_count - 1) / self.chain_count + cur_chain_flow_count = self.flow_count - flows_per_chain * (self.chain_count - 1) + + self.ip_block.reset_reservation() + self.dst.ip_block.reset_reservation() + for chain_idx in xrange(self.chain_count): - current_flow_count = (self.flow_count - flow_idx) / (self.chain_count - chain_idx) - max_idx = flow_idx + current_flow_count - 1 - ip_src_count = self.ip_to_int(self.ip_list[max_idx]) - \ - self.ip_to_int(self.ip_list[flow_idx]) + 1 - ip_dst_count = self.ip_to_int(self.dst.ip_list[max_idx]) - \ - self.ip_to_int(self.dst.ip_list[flow_idx]) + 1 + src_ip_first, src_ip_last = self.ip_block.reserve_ip_range(cur_chain_flow_count) + dst_ip_first, dst_ip_last = self.dst.ip_block.reserve_ip_range(cur_chain_flow_count) + + dst_mac = self.dst_mac[chain_idx] if self.dst_mac is not None else self.dst.mac + if not re.match("[0-9a-f]{2}([-:])[0-9a-f]{2}(\\1[0-9a-f]{2}){4}$", dst_mac.lower()): + raise TrafficClientException("Invalid MAC address '{mac}' specified in " + "mac_addrs_left/right".format(mac=dst_mac)) configs.append({ - 'count': current_flow_count, + 'count': cur_chain_flow_count, 'mac_src': self.mac, - 'mac_dst': self.dst.mac if service_chain == ChainType.EXT - else self.vm_mac_list[chain_idx], - 'ip_src_addr': self.ip_list[flow_idx], - 'ip_src_addr_max': self.ip_list[max_idx], - 'ip_src_count': ip_src_count, - 'ip_dst_addr': self.dst.ip_list[flow_idx], - 'ip_dst_addr_max': self.dst.ip_list[max_idx], - 'ip_dst_count': ip_dst_count, + 'mac_dst': dst_mac if service_chain == ChainType.EXT else self.vm_mac_list[ + chain_idx], + 'ip_src_addr': src_ip_first, + 'ip_src_addr_max': src_ip_last, + 'ip_src_count': cur_chain_flow_count, + 'ip_dst_addr': dst_ip_first, + 'ip_dst_addr_max': dst_ip_last, + 'ip_dst_count': cur_chain_flow_count, 'ip_addrs_step': self.ip_addrs_step, 'udp_src_port': self.udp_src_port, 'udp_dst_port': self.udp_dst_port, - 'mac_discovery_gw': self.gateway_ip_list[chain_idx], - 'ip_src_tg_gw': self.tg_gateway_ip_list[chain_idx], - 'ip_dst_tg_gw': self.dst.tg_gateway_ip_list[chain_idx], + 'mac_discovery_gw': self.get_gw_ip(chain_idx), + 'ip_src_tg_gw': self.tg_gw_ip_block.get_ip(chain_idx), + 'ip_dst_tg_gw': self.dst.tg_gw_ip_block.get_ip(chain_idx), 'vlan_tag': self.vlan_tag if self.vlan_tagging else None }) + # after first chain, fall back to the flow count for all other chains + cur_chain_flow_count = flows_per_chain - flow_idx += current_flow_count return configs - @classmethod - def expand_ip(cls, ip, step_ip, count): - if step_ip == 'random': - # Repeatable Random will used in the stream src/dst IP pairs, but we still need - # to expand the IP based on the number of chains and flows configured. So we use - # "0.0.0.1" as the step to have the exact IP flow ranges for every chain. - step_ip = '0.0.0.1' - - step_ip_in_int = cls.ip_to_int(step_ip) - subnet = IPNetwork(ip) - ip_list = [] - for _ in xrange(count): - ip_list.append(subnet.ip.format()) - subnet = subnet.next(step_ip_in_int) - return ip_list + def ip_range_overlaps(self): + '''Check if this device ip range is overlapping with the dst device ip range + ''' + src_base_ip = Device.ip_to_int(self.ip) + dst_base_ip = Device.ip_to_int(self.dst.ip) + src_last_ip = src_base_ip + self.flow_count - 1 + dst_last_ip = dst_base_ip + self.flow_count - 1 + return dst_last_ip >= src_base_ip and src_last_ip >= dst_base_ip @staticmethod def mac_to_int(mac): @@ -196,6 +244,10 @@ class Device(object): def ip_to_int(addr): return struct.unpack("!I", socket.inet_aton(addr))[0] + @staticmethod + def int_to_ip(nvalue): + return socket.inet_ntoa(struct.pack("!I", nvalue)) + class RunningTrafficProfile(object): """Represents traffic configuration for currently running traffic profile.""" @@ -226,6 +278,8 @@ class RunningTrafficProfile(object): self.src_device = None self.dst_device = None self.vm_mac_list = None + self.mac_addrs_left = generator_config.mac_addrs_left + self.mac_addrs_right = generator_config.mac_addrs_right self.__prep_interfaces(generator_config) def to_json(self): @@ -240,8 +294,8 @@ class RunningTrafficProfile(object): generator_config = AttrDict(traffic_generator) generator_config.pop('default_profile') generator_config.pop('generator_profile') - matching_profile = filter(lambda profile: profile.name == generator_profile, - traffic_generator.generator_profile) + matching_profile = [profile for profile in traffic_generator.generator_profile if + profile.name == generator_profile] if len(matching_profile) != 1: raise Exception('Traffic generator profile not found: ' + generator_profile) @@ -261,7 +315,8 @@ class RunningTrafficProfile(object): 'tg_gateway_ip_addrs_step': self.tg_gateway_ip_addrs_step, 'udp_src_port': generator_config.udp_src_port, 'udp_dst_port': generator_config.udp_dst_port, - 'vlan_tagging': self.vlan_tagging + 'vlan_tagging': self.vlan_tagging, + 'dst_mac': generator_config.mac_addrs_left } dst_config = { 'chain_count': self.service_chain_count, @@ -274,7 +329,8 @@ class RunningTrafficProfile(object): 'tg_gateway_ip_addrs_step': self.tg_gateway_ip_addrs_step, 'udp_src_port': generator_config.udp_src_port, 'udp_dst_port': generator_config.udp_dst_port, - 'vlan_tagging': self.vlan_tagging + 'vlan_tagging': self.vlan_tagging, + 'dst_mac': generator_config.mac_addrs_right } self.src_device = Device(**dict(src_config, **generator_config.interfaces[0])) @@ -283,14 +339,11 @@ class RunningTrafficProfile(object): self.dst_device.set_destination(self.src_device) if self.service_chain == ChainType.EXT and not self.no_arp \ - and not self.__are_unique(self.src_device.ip_list, self.dst_device.ip_list): - raise Exception('Computed IP addresses are not unique, choose different base. ' - 'Start IPs: {start}. End IPs: {end}' - .format(start=self.src_device.ip_list, - end=self.dst_device.ip_list)) - - def __are_unique(self, list1, list2): - return set(list1).isdisjoint(set(list2)) + and self.src_device.ip_range_overlaps(): + raise Exception('Overlapping IP address ranges src=%s dst=%d flows=%d' % + self.src_device.ip, + self.dst_device.ip, + self.flow_count) @property def devices(self): @@ -314,7 +367,6 @@ class RunningTrafficProfile(object): class TrafficGeneratorFactory(object): - def __init__(self, config): self.config = config @@ -329,8 +381,7 @@ class TrafficGeneratorFactory(object): elif tool == 'dummy': from traffic_gen import dummy return dummy.DummyTG(self.config) - else: - return None + return None def list_generator_profile(self): return [profile.name for profile in self.config.traffic_generator.generator_profile] @@ -339,12 +390,12 @@ class TrafficGeneratorFactory(object): return RunningTrafficProfile(self.config, generator_profile) def get_matching_profile(self, traffic_profile_name): - matching_profile = filter(lambda profile: profile.name == traffic_profile_name, - self.config.traffic_profile) + matching_profile = [profile for profile in self.config.traffic_profile if + profile.name == traffic_profile_name] if len(matching_profile) > 1: raise Exception('Multiple traffic profiles with the same name found.') - elif len(matching_profile) == 0: + elif not matching_profile: raise Exception('No traffic profile found.') return matching_profile[0] @@ -355,10 +406,9 @@ class TrafficGeneratorFactory(object): class TrafficClient(object): - PORTS = [0, 1] - def __init__(self, config, notifier=None): + def __init__(self, config, notifier=None, skip_sleep=False): generator_factory = TrafficGeneratorFactory(config) self.gen = generator_factory.get_generator_client() self.tool = generator_factory.get_tool() @@ -374,11 +424,13 @@ class TrafficClient(object): 'l2frame_size': None, 'duration_sec': self.config.duration_sec, 'bidirectional': True, - 'rates': None + 'rates': [] # to avoid unsbuscriptable-obj warning } self.current_total_rate = {'rate_percent': '10'} if self.config.single_run: self.current_total_rate = utils.parse_rate_str(self.config.rate) + # UT with dummy TG can bypass all sleeps + self.skip_sleep = skip_sleep def set_macs(self): for mac, device in zip(self.gen.get_macs(), self.config.generator_config.devices): @@ -401,53 +453,52 @@ class TrafficClient(object): Ensure traffic generator receives packets it has transmitted. This ensures end to end connectivity and also waits until VMs are ready to forward packets. - At this point all VMs are in active state, but forwarding does not have to work. - Small amount of traffic is sent to every chain. Then total of sent and received packets - is compared. If ratio between received and transmitted packets is higher than (N-1)/N, - N being number of chains, traffic flows through every chain and real measurements can be - performed. + VMs that are started and in active state may not pass traffic yet. It is imperative to make + sure that all VMs are passing traffic in both directions before starting any benchmarking. + To verify this, we need to send at a low frequency bi-directional packets and make sure + that we receive all packets back from all VMs. The number of flows is equal to 2 times + the number of chains (1 per direction) and we need to make sure we receive packets coming + from exactly 2 x chain count different source MAC addresses. Example: PVP chain (1 VM per chain) N = 10 (number of chains) - threshold = (N-1)/N = 9/10 = 0.9 (acceptable ratio ensuring working conditions) - if total_received/total_sent > 0.9, traffic is flowing to more than 9 VMs meaning - all 10 VMs are in operational state. + Flow count = 20 (number of flows) + If the number of unique source MAC addresses from received packets is 20 then + all 10 VMs 10 VMs are in operational state. """ LOG.info('Starting traffic generator to ensure end-to-end connectivity') - rate_pps = {'rate_pps': str(self.config.service_chain_count * 100)} + rate_pps = {'rate_pps': str(self.config.service_chain_count * 1)} self.gen.create_traffic('64', [rate_pps, rate_pps], bidirectional=True, latency=False) # ensures enough traffic is coming back - threshold = (self.config.service_chain_count - 1) / float(self.config.service_chain_count) retry_count = (self.config.check_traffic_time_sec + self.config.generic_poll_sec - 1) / self.config.generic_poll_sec + mac_addresses = set() + ln = 0 for it in xrange(retry_count): self.gen.clear_stats() self.gen.start_traffic() - LOG.info('Waiting for packets to be received back... ({} / {})'.format(it + 1, - retry_count)) - time.sleep(self.config.generic_poll_sec) + self.gen.start_capture() + LOG.info('Waiting for packets to be received back... (%d / %d)', it + 1, retry_count) + if not self.skip_sleep: + time.sleep(self.config.generic_poll_sec) self.gen.stop_traffic() - stats = self.gen.get_stats() - - # compute total sent and received traffic on both ports - total_rx = 0 - total_tx = 0 - for port in self.PORTS: - total_rx += float(stats[port]['rx'].get('total_pkts', 0)) - total_tx += float(stats[port]['tx'].get('total_pkts', 0)) - - # how much of traffic came back - ratio = total_rx / total_tx if total_tx else 0 - - if ratio > threshold: - self.gen.clear_stats() - self.gen.clear_streamblock() - LOG.info('End-to-end connectivity ensured') - return - - time.sleep(self.config.generic_poll_sec) + self.gen.fetch_capture_packets() + self.gen.stop_capture() + + for packet in self.gen.packet_list: + mac_addresses.add(packet['binary'][6:12]) + if ln != len(mac_addresses): + ln = len(mac_addresses) + LOG.info('Flows passing traffic %d / %d', ln, + self.config.service_chain_count * 2) + if len(mac_addresses) == self.config.service_chain_count * 2: + LOG.info('End-to-end connectivity ensured') + return + + if not self.skip_sleep: + time.sleep(self.config.generic_poll_sec) raise TrafficClientException('End-to-end connectivity cannot be ensured') @@ -465,6 +516,10 @@ class TrafficClient(object): unidir_reverse_pps = int(self.config.unidir_reverse_traffic_pps) if unidir_reverse_pps > 0: self.run_config['rates'].append({'rate_pps': str(unidir_reverse_pps)}) + # Fix for [NFVBENCH-67], convert the rate string to PPS + for idx, rate in enumerate(self.run_config['rates']): + if 'rate_pps' not in rate: + self.run_config['rates'][idx] = {'rate_pps': self.__convert_rates(rate)['rate_pps']} self.gen.clear_streamblock() self.gen.create_traffic(frame_size, self.run_config['rates'], bidirectional, latency=True) @@ -519,8 +574,7 @@ class TrafficClient(object): total_pkts = result['tx']['total_pkts'] if not total_pkts: return float('inf') - else: - return float(dropped_pkts) / total_pkts * 100 + return float(dropped_pkts) / total_pkts * 100 def get_stats(self): stats = self.gen.get_stats() @@ -587,7 +641,7 @@ class TrafficClient(object): self.interval_collector.add_ndr_pdr(tag, last_stats) def __format_output_stats(self, stats): - for key in (self.PORTS + ['overall']): + for key in self.PORTS + ['overall']: interface = stats[key] stats[key] = { 'tx_pkts': interface['tx']['total_pkts'], @@ -603,7 +657,7 @@ class TrafficClient(object): def __targets_found(self, rate, targets, results): for tag, target in targets.iteritems(): - LOG.info('Found {} ({}) load: {}'.format(tag, target, rate)) + LOG.info('Found %s (%s) load: %s', tag, target, rate) self.__ndr_pdr_found(tag, rate) results[tag]['timestamp_sec'] = time.time() @@ -618,9 +672,9 @@ class TrafficClient(object): ('ndr', 'pdr') results a dict to store results ''' - if len(targets) == 0: + if not targets: return - LOG.info('Range search [{} .. {}] targets: {}'.format(left, right, targets)) + LOG.info('Range search [%s .. %s] targets: %s', left, right, targets) # Terminate search when gap is less than load epsilon if right - left < self.config.measurement.load_epsilon: @@ -629,8 +683,12 @@ class TrafficClient(object): # Obtain the average drop rate in for middle load middle = (left + right) / 2.0 - stats, rates = self.__run_search_iteration(middle) - + try: + stats, rates = self.__run_search_iteration(middle) + except STLError: + LOG.exception("Got exception from traffic generator during binary search") + self.__targets_found(left, targets, results) + return # Split target dicts based on the avg drop rate left_targets = {} right_targets = {} @@ -695,7 +753,7 @@ class TrafficClient(object): # save reliable stats from whole iteration self.iteration_collector.add(stats, current_traffic_config['direction-total']['rate_pps']) - LOG.info('Average drop rate: {}'.format(stats['overall']['drop_rate_percent'])) + LOG.info('Average drop rate: %f', stats['overall']['drop_rate_percent']) return stats, current_traffic_config['direction-total'] @@ -723,19 +781,17 @@ class TrafficClient(object): if stats is None: return self.log_stats(stats) - LOG.info('Drop rate: {}'.format(stats['overall']['drop_rate_percent'])) + LOG.info('Drop rate: %f', stats['overall']['drop_rate_percent']) yield stats def cancel_traffic(self): self.runner.stop() - def get_interface(self, port_index): + def get_interface(self, port_index, stats): port = self.gen.port_handle[port_index] tx, rx = 0, 0 - if not self.config.no_traffic: - stats = self.get_stats() - if port in stats: - tx, rx = int(stats[port]['tx']['total_pkts']), int(stats[port]['rx']['total_pkts']) + if stats and port in stats: + tx, rx = int(stats[port]['tx']['total_pkts']), int(stats[port]['rx']['total_pkts']) return Interface('traffic-generator', self.tool.lower(), tx, rx) def get_traffic_config(self): @@ -779,7 +835,7 @@ class TrafficClient(object): for direction in ['orig', 'tx', 'rx']: total[direction] = {} for unit in ['rate_percent', 'rate_bps', 'rate_pps']: - total[direction][unit] = sum(map(lambda x: float(x[direction][unit]), r.values())) + total[direction][unit] = sum([float(x[direction][unit]) for x in r.values()]) r['direction-total'] = total return r