# License for the specific language governing permissions and limitations
# under the License.
+from datetime import datetime
+import re
+import socket
+import struct
+import time
+
from attrdict import AttrDict
import bitmath
-from datetime import datetime
-from log import LOG
from netaddr import IPNetwork
+# pylint: disable=import-error
+from trex_stl_lib.api import STLError
+# pylint: enable=import-error
+
+from log import LOG
from network import Interface
-import socket
from specs import ChainType
from stats_collector import IntervalCollector
from stats_collector import IterationCollector
-import struct
-import time
import traffic_gen.traffic_utils as utils
+from utils import cast_integer
class TrafficClientException(Exception):
class TrafficRunner(object):
-
def __init__(self, client, duration_sec, interval_sec=0):
self.client = client
self.start_time = None
def time_elapsed(self):
if self.is_running():
return time.time() - self.start_time
- else:
- return self.duration_sec
+ return self.duration_sec
def poll_stats(self):
if not self.is_running():
return None
+ if self.client.skip_sleep:
+ self.stop()
+ return self.client.get_stats()
time_elapsed = self.time_elapsed()
if time_elapsed > self.duration_sec:
self.stop()
return self.client.get_stats()
-class Device(object):
+class IpBlock(object):
+ def __init__(self, base_ip, step_ip, count_ip):
+ self.base_ip_int = Device.ip_to_int(base_ip)
+ self.step = Device.ip_to_int(step_ip)
+ self.max_available = count_ip
+ self.next_free = 0
+
+ def get_ip(self, index=0):
+ '''Return the IP address at given index
+ '''
+ if index < 0 or index >= self.max_available:
+ raise IndexError('Index out of bounds')
+ return Device.int_to_ip(self.base_ip_int + index * self.step)
+ def reserve_ip_range(self, count):
+ '''Reserve a range of count consecutive IP addresses spaced by step
+ '''
+ if self.next_free + count > self.max_available:
+ raise IndexError('No more IP addresses next free=%d max_available=%d requested=%d' %
+ (self.next_free,
+ self.max_available,
+ count))
+ first_ip = self.get_ip(self.next_free)
+ last_ip = self.get_ip(self.next_free + count - 1)
+ self.next_free += count
+ return (first_ip, last_ip)
+
+ def reset_reservation(self):
+ self.next_free = 0
+
+
+class Device(object):
def __init__(self, port, pci, switch_port=None, vtep_vlan=None, ip=None, tg_gateway_ip=None,
gateway_ip=None, ip_addrs_step=None, tg_gateway_ip_addrs_step=None,
- gateway_ip_addrs_step=None, chain_count=1, flow_count=1, vlan_tagging=False):
+ gateway_ip_addrs_step=None, udp_src_port=None, udp_dst_port=None,
+ dst_mac=None, chain_count=1, flow_count=1, vlan_tagging=False):
self.chain_count = chain_count
self.flow_count = flow_count
self.dst = None
self.vlan_tagging = vlan_tagging
self.pci = pci
self.mac = None
+ self.dst_mac = dst_mac
self.vm_mac_list = None
subnet = IPNetwork(ip)
self.ip = subnet.ip.format()
self.ip_addrs_step = ip_addrs_step
self.tg_gateway_ip_addrs_step = tg_gateway_ip_addrs_step
self.gateway_ip_addrs_step = gateway_ip_addrs_step
- self.ip_list = self.expand_ip(self.ip, self.ip_addrs_step, self.flow_count)
self.gateway_ip = gateway_ip
- self.gateway_ip_list = self.expand_ip(self.gateway_ip,
- self.gateway_ip_addrs_step,
- self.chain_count)
self.tg_gateway_ip = tg_gateway_ip
- self.tg_gateway_ip_list = self.expand_ip(self.tg_gateway_ip,
- self.tg_gateway_ip_addrs_step,
- self.chain_count)
+ self.ip_block = IpBlock(self.ip, ip_addrs_step, flow_count)
+ self.gw_ip_block = IpBlock(gateway_ip,
+ gateway_ip_addrs_step,
+ chain_count)
+ self.tg_gw_ip_block = IpBlock(tg_gateway_ip,
+ tg_gateway_ip_addrs_step,
+ chain_count)
+ self.udp_src_port = udp_src_port
+ self.udp_dst_port = udp_dst_port
def set_mac(self, mac):
if mac is None:
raise TrafficClientException('Trying to set VLAN tag as None')
self.vlan_tag = vlan_tag
+ def get_gw_ip(self, chain_index):
+ '''Retrieve the IP address assigned for the gateway of a given chain
+ '''
+ return self.gw_ip_block.get_ip(chain_index)
+
def get_stream_configs(self, service_chain):
configs = []
- flow_idx = 0
+ # exact flow count for each chain is calculated as follows:
+ # - all chains except the first will have the same flow count
+ # calculated as (total_flows + chain_count - 1) / chain_count
+ # - the first chain will have the remainder
+ # example 11 flows and 3 chains => 3, 4, 4
+ flows_per_chain = (self.flow_count + self.chain_count - 1) / self.chain_count
+ cur_chain_flow_count = self.flow_count - flows_per_chain * (self.chain_count - 1)
+
+ self.ip_block.reset_reservation()
+ self.dst.ip_block.reset_reservation()
+
for chain_idx in xrange(self.chain_count):
- current_flow_count = (self.flow_count - flow_idx) / (self.chain_count - chain_idx)
- max_idx = flow_idx + current_flow_count - 1
- ip_src_count = self.ip_to_int(self.ip_list[max_idx]) - \
- self.ip_to_int(self.ip_list[flow_idx]) + 1
- ip_dst_count = self.ip_to_int(self.dst.ip_list[max_idx]) - \
- self.ip_to_int(self.dst.ip_list[flow_idx]) + 1
+ src_ip_first, src_ip_last = self.ip_block.reserve_ip_range(cur_chain_flow_count)
+ dst_ip_first, dst_ip_last = self.dst.ip_block.reserve_ip_range(cur_chain_flow_count)
+
+ dst_mac = self.dst_mac[chain_idx] if self.dst_mac is not None else self.dst.mac
+ if not re.match("[0-9a-f]{2}([-:])[0-9a-f]{2}(\\1[0-9a-f]{2}){4}$", dst_mac.lower()):
+ raise TrafficClientException("Invalid MAC address '{mac}' specified in "
+ "mac_addrs_left/right".format(mac=dst_mac))
configs.append({
- 'count': current_flow_count,
+ 'count': cur_chain_flow_count,
'mac_src': self.mac,
- 'mac_dst': self.dst.mac if service_chain == ChainType.EXT
- else self.vm_mac_list[chain_idx],
- 'ip_src_addr': self.ip_list[flow_idx],
- 'ip_src_addr_max': self.ip_list[max_idx],
- 'ip_src_count': ip_src_count,
- 'ip_dst_addr': self.dst.ip_list[flow_idx],
- 'ip_dst_addr_max': self.dst.ip_list[max_idx],
- 'ip_dst_count': ip_dst_count,
+ 'mac_dst': dst_mac if service_chain == ChainType.EXT else self.vm_mac_list[
+ chain_idx],
+ 'ip_src_addr': src_ip_first,
+ 'ip_src_addr_max': src_ip_last,
+ 'ip_src_count': cur_chain_flow_count,
+ 'ip_dst_addr': dst_ip_first,
+ 'ip_dst_addr_max': dst_ip_last,
+ 'ip_dst_count': cur_chain_flow_count,
'ip_addrs_step': self.ip_addrs_step,
- 'mac_discovery_gw': self.gateway_ip_list[chain_idx],
- 'ip_src_tg_gw': self.tg_gateway_ip_list[chain_idx],
- 'ip_dst_tg_gw': self.dst.tg_gateway_ip_list[chain_idx],
+ 'udp_src_port': self.udp_src_port,
+ 'udp_dst_port': self.udp_dst_port,
+ 'mac_discovery_gw': self.get_gw_ip(chain_idx),
+ 'ip_src_tg_gw': self.tg_gw_ip_block.get_ip(chain_idx),
+ 'ip_dst_tg_gw': self.dst.tg_gw_ip_block.get_ip(chain_idx),
'vlan_tag': self.vlan_tag if self.vlan_tagging else None
})
+ # after first chain, fall back to the flow count for all other chains
+ cur_chain_flow_count = flows_per_chain
- flow_idx += current_flow_count
return configs
- @classmethod
- def expand_ip(cls, ip, step_ip, count):
- if step_ip == 'random':
- # Repeatable Random will used in the stream src/dst IP pairs, but we still need
- # to expand the IP based on the number of chains and flows configured. So we use
- # "0.0.0.1" as the step to have the exact IP flow ranges for every chain.
- step_ip = '0.0.0.1'
-
- step_ip_in_int = cls.ip_to_int(step_ip)
- subnet = IPNetwork(ip)
- ip_list = []
- for _ in xrange(count):
- ip_list.append(subnet.ip.format())
- subnet = subnet.next(step_ip_in_int)
- return ip_list
+ def ip_range_overlaps(self):
+ '''Check if this device ip range is overlapping with the dst device ip range
+ '''
+ src_base_ip = Device.ip_to_int(self.ip)
+ dst_base_ip = Device.ip_to_int(self.dst.ip)
+ src_last_ip = src_base_ip + self.flow_count - 1
+ dst_last_ip = dst_base_ip + self.flow_count - 1
+ return dst_last_ip >= src_base_ip and src_last_ip >= dst_base_ip
@staticmethod
def mac_to_int(mac):
def ip_to_int(addr):
return struct.unpack("!I", socket.inet_aton(addr))[0]
+ @staticmethod
+ def int_to_ip(nvalue):
+ return socket.inet_ntoa(struct.pack("!I", nvalue))
+
class RunningTrafficProfile(object):
"""Represents traffic configuration for currently running traffic profile."""
self.src_device = None
self.dst_device = None
self.vm_mac_list = None
+ self.mac_addrs_left = generator_config.mac_addrs_left
+ self.mac_addrs_right = generator_config.mac_addrs_right
self.__prep_interfaces(generator_config)
def to_json(self):
generator_config = AttrDict(traffic_generator)
generator_config.pop('default_profile')
generator_config.pop('generator_profile')
- matching_profile = filter(lambda profile: profile.name == generator_profile,
- traffic_generator.generator_profile)
+ matching_profile = [profile for profile in traffic_generator.generator_profile if
+ profile.name == generator_profile]
if len(matching_profile) != 1:
raise Exception('Traffic generator profile not found: ' + generator_profile)
'gateway_ip_addrs_step': self.gateway_ip_addrs_step,
'tg_gateway_ip': generator_config.tg_gateway_ip_addrs[0],
'tg_gateway_ip_addrs_step': self.tg_gateway_ip_addrs_step,
- 'vlan_tagging': self.vlan_tagging
+ 'udp_src_port': generator_config.udp_src_port,
+ 'udp_dst_port': generator_config.udp_dst_port,
+ 'vlan_tagging': self.vlan_tagging,
+ 'dst_mac': generator_config.mac_addrs_left
}
dst_config = {
'chain_count': self.service_chain_count,
'gateway_ip_addrs_step': self.gateway_ip_addrs_step,
'tg_gateway_ip': generator_config.tg_gateway_ip_addrs[1],
'tg_gateway_ip_addrs_step': self.tg_gateway_ip_addrs_step,
- 'vlan_tagging': self.vlan_tagging
+ 'udp_src_port': generator_config.udp_src_port,
+ 'udp_dst_port': generator_config.udp_dst_port,
+ 'vlan_tagging': self.vlan_tagging,
+ 'dst_mac': generator_config.mac_addrs_right
}
self.src_device = Device(**dict(src_config, **generator_config.interfaces[0]))
self.dst_device.set_destination(self.src_device)
if self.service_chain == ChainType.EXT and not self.no_arp \
- and not self.__are_unique(self.src_device.ip_list, self.dst_device.ip_list):
- raise Exception('Computed IP addresses are not unique, choose different base. '
- 'Start IPs: {start}. End IPs: {end}'
- .format(start=self.src_device.ip_list,
- end=self.dst_device.ip_list))
-
- def __are_unique(self, list1, list2):
- return set(list1).isdisjoint(set(list2))
+ and self.src_device.ip_range_overlaps():
+ raise Exception('Overlapping IP address ranges src=%s dst=%d flows=%d' %
+ self.src_device.ip,
+ self.dst_device.ip,
+ self.flow_count)
@property
def devices(self):
class TrafficGeneratorFactory(object):
-
def __init__(self, config):
self.config = config
elif tool == 'dummy':
from traffic_gen import dummy
return dummy.DummyTG(self.config)
- else:
- return None
+ return None
def list_generator_profile(self):
return [profile.name for profile in self.config.traffic_generator.generator_profile]
return RunningTrafficProfile(self.config, generator_profile)
def get_matching_profile(self, traffic_profile_name):
- matching_profile = filter(lambda profile: profile.name == traffic_profile_name,
- self.config.traffic_profile)
+ matching_profile = [profile for profile in self.config.traffic_profile if
+ profile.name == traffic_profile_name]
if len(matching_profile) > 1:
raise Exception('Multiple traffic profiles with the same name found.')
- elif len(matching_profile) == 0:
+ elif not matching_profile:
raise Exception('No traffic profile found.')
return matching_profile[0]
class TrafficClient(object):
-
PORTS = [0, 1]
- def __init__(self, config, notifier=None):
+ def __init__(self, config, notifier=None, skip_sleep=False):
generator_factory = TrafficGeneratorFactory(config)
self.gen = generator_factory.get_generator_client()
self.tool = generator_factory.get_tool()
'l2frame_size': None,
'duration_sec': self.config.duration_sec,
'bidirectional': True,
- 'rates': None
+ 'rates': [] # to avoid unsbuscriptable-obj warning
}
self.current_total_rate = {'rate_percent': '10'}
if self.config.single_run:
self.current_total_rate = utils.parse_rate_str(self.config.rate)
+ # UT with dummy TG can bypass all sleeps
+ self.skip_sleep = skip_sleep
def set_macs(self):
for mac, device in zip(self.gen.get_macs(), self.config.generator_config.devices):
Ensure traffic generator receives packets it has transmitted.
This ensures end to end connectivity and also waits until VMs are ready to forward packets.
- At this point all VMs are in active state, but forwarding does not have to work.
- Small amount of traffic is sent to every chain. Then total of sent and received packets
- is compared. If ratio between received and transmitted packets is higher than (N-1)/N,
- N being number of chains, traffic flows through every chain and real measurements can be
- performed.
+ VMs that are started and in active state may not pass traffic yet. It is imperative to make
+ sure that all VMs are passing traffic in both directions before starting any benchmarking.
+ To verify this, we need to send at a low frequency bi-directional packets and make sure
+ that we receive all packets back from all VMs. The number of flows is equal to 2 times
+ the number of chains (1 per direction) and we need to make sure we receive packets coming
+ from exactly 2 x chain count different source MAC addresses.
Example:
PVP chain (1 VM per chain)
N = 10 (number of chains)
- threshold = (N-1)/N = 9/10 = 0.9 (acceptable ratio ensuring working conditions)
- if total_received/total_sent > 0.9, traffic is flowing to more than 9 VMs meaning
- all 10 VMs are in operational state.
+ Flow count = 20 (number of flows)
+ If the number of unique source MAC addresses from received packets is 20 then
+ all 10 VMs 10 VMs are in operational state.
"""
LOG.info('Starting traffic generator to ensure end-to-end connectivity')
- rate_pps = {'rate_pps': str(self.config.service_chain_count * 100)}
+ rate_pps = {'rate_pps': str(self.config.service_chain_count * 1)}
self.gen.create_traffic('64', [rate_pps, rate_pps], bidirectional=True, latency=False)
# ensures enough traffic is coming back
- threshold = (self.config.service_chain_count - 1) / float(self.config.service_chain_count)
-
- for it in xrange(self.config.generic_retry_count):
+ retry_count = (self.config.check_traffic_time_sec +
+ self.config.generic_poll_sec - 1) / self.config.generic_poll_sec
+ mac_addresses = set()
+ ln = 0
+ for it in xrange(retry_count):
self.gen.clear_stats()
self.gen.start_traffic()
- LOG.info('Waiting for packets to be received back... ({} / {})'.format(it + 1,
- self.config.generic_retry_count))
- time.sleep(self.config.generic_poll_sec)
+ self.gen.start_capture()
+ LOG.info('Waiting for packets to be received back... (%d / %d)', it + 1, retry_count)
+ if not self.skip_sleep:
+ time.sleep(self.config.generic_poll_sec)
self.gen.stop_traffic()
- stats = self.gen.get_stats()
-
- # compute total sent and received traffic on both ports
- total_rx = 0
- total_tx = 0
- for port in self.PORTS:
- total_rx += float(stats[port]['rx'].get('total_pkts', 0))
- total_tx += float(stats[port]['tx'].get('total_pkts', 0))
-
- # how much of traffic came back
- ratio = total_rx / total_tx if total_tx else 0
-
- if ratio > threshold:
- self.gen.clear_stats()
- self.gen.clear_streamblock()
- LOG.info('End-to-end connectivity ensured')
- return
-
- time.sleep(self.config.generic_poll_sec)
+ self.gen.fetch_capture_packets()
+ self.gen.stop_capture()
+
+ for packet in self.gen.packet_list:
+ mac_addresses.add(packet['binary'][6:12])
+ if ln != len(mac_addresses):
+ ln = len(mac_addresses)
+ LOG.info('Flows passing traffic %d / %d', ln,
+ self.config.service_chain_count * 2)
+ if len(mac_addresses) == self.config.service_chain_count * 2:
+ LOG.info('End-to-end connectivity ensured')
+ return
+
+ if not self.skip_sleep:
+ time.sleep(self.config.generic_poll_sec)
raise TrafficClientException('End-to-end connectivity cannot be ensured')
unidir_reverse_pps = int(self.config.unidir_reverse_traffic_pps)
if unidir_reverse_pps > 0:
self.run_config['rates'].append({'rate_pps': str(unidir_reverse_pps)})
+ # Fix for [NFVBENCH-67], convert the rate string to PPS
+ for idx, rate in enumerate(self.run_config['rates']):
+ if 'rate_pps' not in rate:
+ self.run_config['rates'][idx] = {'rate_pps': self.__convert_rates(rate)['rate_pps']}
self.gen.clear_streamblock()
self.gen.create_traffic(frame_size, self.run_config['rates'], bidirectional, latency=True)
total_pkts = result['tx']['total_pkts']
if not total_pkts:
return float('inf')
- else:
- return float(dropped_pkts) / total_pkts * 100
+ return float(dropped_pkts) / total_pkts * 100
def get_stats(self):
stats = self.gen.get_stats()
retDict[port]['rx'][key] = int(stats[port]['rx'][key])
except ValueError:
retDict[port]['rx'][key] = 0
- retDict[port]['rx']['avg_delay_usec'] = float(stats[port]['rx']['avg_delay_usec'])
- retDict[port]['rx']['min_delay_usec'] = float(stats[port]['rx']['min_delay_usec'])
- retDict[port]['rx']['max_delay_usec'] = float(stats[port]['rx']['max_delay_usec'])
+ retDict[port]['rx']['avg_delay_usec'] = cast_integer(
+ stats[port]['rx']['avg_delay_usec'])
+ retDict[port]['rx']['min_delay_usec'] = cast_integer(
+ stats[port]['rx']['min_delay_usec'])
+ retDict[port]['rx']['max_delay_usec'] = cast_integer(
+ stats[port]['rx']['max_delay_usec'])
retDict[port]['drop_rate_percent'] = self.__get_dropped_rate(retDict[port])
ports = sorted(retDict.keys())
self.interval_collector.add_ndr_pdr(tag, last_stats)
def __format_output_stats(self, stats):
- for key in (self.PORTS + ['overall']):
+ for key in self.PORTS + ['overall']:
interface = stats[key]
stats[key] = {
'tx_pkts': interface['tx']['total_pkts'],
def __targets_found(self, rate, targets, results):
for tag, target in targets.iteritems():
- LOG.info('Found {} ({}) load: {}'.format(tag, target, rate))
+ LOG.info('Found %s (%s) load: %s', tag, target, rate)
self.__ndr_pdr_found(tag, rate)
results[tag]['timestamp_sec'] = time.time()
indicating the rate to send on each interface
right the right side of the range to search as a % of line rate
indicating the rate to send on each interface
- targets a dict of drop rates to search (0.1 = 0.1%), indexed by the DR name or "tag" ('ndr', 'pdr')
+ targets a dict of drop rates to search (0.1 = 0.1%), indexed by the DR name or "tag"
+ ('ndr', 'pdr')
results a dict to store results
'''
- if len(targets) == 0:
+ if not targets:
return
- LOG.info('Range search [{} .. {}] targets: {}'.format(left, right, targets))
+ LOG.info('Range search [%s .. %s] targets: %s', left, right, targets)
# Terminate search when gap is less than load epsilon
if right - left < self.config.measurement.load_epsilon:
# Obtain the average drop rate in for middle load
middle = (left + right) / 2.0
- stats, rates = self.__run_search_iteration(middle)
-
+ try:
+ stats, rates = self.__run_search_iteration(middle)
+ except STLError:
+ LOG.exception("Got exception from traffic generator during binary search")
+ self.__targets_found(left, targets, results)
+ return
# Split target dicts based on the avg drop rate
left_targets = {}
right_targets = {}
})
right_targets[tag] = target
else:
+ # initialize to 0 all fields of result for
+ # the worst case scenario of the binary search (if ndr/pdr is not found)
+ if tag not in results:
+ results[tag] = dict.fromkeys(rates, 0)
+ empty_stats = self.__format_output_stats(dict(stats))
+ for key in empty_stats:
+ if isinstance(empty_stats[key], dict):
+ empty_stats[key] = dict.fromkeys(empty_stats[key], 0)
+ else:
+ empty_stats[key] = 0
+ results[tag].update({
+ 'load_percent_per_direction': 0,
+ 'stats': empty_stats,
+ 'timestamp_sec': None
+ })
left_targets[tag] = target
# search lower half
# save reliable stats from whole iteration
self.iteration_collector.add(stats, current_traffic_config['direction-total']['rate_pps'])
- LOG.info('Average drop rate: {}'.format(stats['overall']['drop_rate_percent']))
+ LOG.info('Average drop rate: %f', stats['overall']['drop_rate_percent'])
return stats, current_traffic_config['direction-total']
if stats is None:
return
self.log_stats(stats)
- LOG.info('Drop rate: {}'.format(stats['overall']['drop_rate_percent']))
+ LOG.info('Drop rate: %f', stats['overall']['drop_rate_percent'])
yield stats
def cancel_traffic(self):
self.runner.stop()
- def get_interface(self, port_index):
+ def get_interface(self, port_index, stats):
port = self.gen.port_handle[port_index]
tx, rx = 0, 0
- if not self.config.no_traffic:
- stats = self.get_stats()
- if port in stats:
- tx, rx = int(stats[port]['tx']['total_pkts']), int(stats[port]['rx']['total_pkts'])
+ if stats and port in stats:
+ tx, rx = int(stats[port]['tx']['total_pkts']), int(stats[port]['rx']['total_pkts'])
return Interface('traffic-generator', self.tool.lower(), tx, rx)
def get_traffic_config(self):
config['direction-total'] = dict(config['direction-forward'])
config['direction-total'].update({
'rate_percent': load_total,
- 'rate_pps': pps_total,
+ 'rate_pps': cast_integer(pps_total),
'rate_bps': bps_total
})
for direction in ['orig', 'tx', 'rx']:
total[direction] = {}
for unit in ['rate_percent', 'rate_bps', 'rate_pps']:
- total[direction][unit] = sum(map(lambda x: float(x[direction][unit]), r.values()))
+ total[direction][unit] = sum([float(x[direction][unit]) for x in r.values()])
r['direction-total'] = total
return r