1 # Copyright 2016 Cisco Systems, Inc. All rights reserved.
3 # Licensed under the Apache License, Version 2.0 (the "License"); you may
4 # not use this file except in compliance with the License. You may obtain
5 # a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 # License for the specific language governing permissions and limitations
15 """Interface to the traffic generator clients including NDR/PDR binary search."""
17 from datetime import datetime
22 from attrdict import AttrDict
24 from netaddr import IPNetwork
25 # pylint: disable=import-error
26 from trex_stl_lib.api import STLError
27 # pylint: enable=import-error
30 from packet_stats import InterfaceStats
31 from packet_stats import PacketPathStats
32 from stats_collector import IntervalCollector
33 from stats_collector import IterationCollector
34 import traffic_gen.traffic_utils as utils
35 from utils import cast_integer
class TrafficClientException(Exception):
    """Error raised by the traffic client layer.

    Raised by the classes of this module when the generator configuration is
    invalid or when the traffic generator cannot do what is requested
    (for example unresolved ARP or missing end-to-end connectivity).
    """
44 class TrafficRunner(object):
45 """Serialize various steps required to run traffic."""
47 def __init__(self, client, duration_sec, interval_sec=0):
48 """Create a traffic runner."""
50 self.start_time = None
51 self.duration_sec = duration_sec
52 self.interval_sec = interval_sec
55 """Clear stats and instruct the traffic generator to start generating traffic."""
58 LOG.info('Running traffic generator')
59 self.client.gen.clear_stats()
60 self.client.gen.start_traffic()
61 self.start_time = time.time()
62 return self.poll_stats()
65 """Stop the current run and instruct the traffic generator to stop traffic."""
67 self.start_time = None
68 self.client.gen.stop_traffic()
71 """Check if a run is still pending."""
72 return self.start_time is not None
def time_elapsed(self):
    """Return time elapsed since start of run.

    return: the number of seconds since the run was started or, when no run
            is in progress, the configured duration of a run
    """
    if self.is_running():
        return time.time() - self.start_time
    # not running: start_time is None so report the full duration instead
    return self.duration_sec
81 """Poll latest stats from the traffic generator at fixed interval - sleeps if necessary.
83 return: latest stats or None if traffic is stopped
85 if not self.is_running():
87 if self.client.skip_sleep():
89 return self.client.get_stats()
90 time_elapsed = self.time_elapsed()
91 if time_elapsed > self.duration_sec:
94 time_left = self.duration_sec - time_elapsed
95 if self.interval_sec > 0.0:
96 if time_left <= self.interval_sec:
100 time.sleep(self.interval_sec)
102 time.sleep(self.duration_sec)
104 return self.client.get_stats()
class IpBlock(object):
    """Manage a block of IP addresses.

    A block is defined by a base address, a step between 2 consecutive addresses
    and a number of addresses. Addresses can be read by index with get_ip() or
    handed out sequentially with reserve_ip_range().
    """

    def __init__(self, base_ip, step_ip, count_ip):
        """Create an IP block.

        base_ip: first IP address of the block (string)
        step_ip: increment between 2 consecutive addresses (string, e.g. '0.0.0.1')
        count_ip: number of addresses available in the block
        """
        self.base_ip_int = Device.ip_to_int(base_ip)
        self.step = Device.ip_to_int(step_ip)
        self.max_available = count_ip
        # index of the first address that has not been reserved yet
        self.next_free = 0

    def get_ip(self, index=0):
        """Return the IP address at given index.

        index: position of the address in the block (0 = base address)
        raise: IndexError if index is negative or not below the block size
        """
        if index < 0 or index >= self.max_available:
            raise IndexError('Index out of bounds: %d (max=%d)' % (index, self.max_available))
        return Device.int_to_ip(self.base_ip_int + index * self.step)

    def reserve_ip_range(self, count):
        """Reserve a range of count consecutive IP addresses spaced by step.

        count: number of addresses to reserve
        return: a tuple (first IP, last IP) of the reserved range
        raise: IndexError if fewer than count addresses are left in the block
        """
        if self.next_free + count > self.max_available:
            raise IndexError('No more IP addresses next free=%d max_available=%d requested=%d' %
                             (self.next_free,
                              self.max_available,
                              count))
        first_ip = self.get_ip(self.next_free)
        last_ip = self.get_ip(self.next_free + count - 1)
        self.next_free += count
        return (first_ip, last_ip)

    def reset_reservation(self):
        """Reset all reservations and restart with a completely unused IP block."""
        self.next_free = 0
140 class Device(object):
141 """Represent a port device and all information associated to it.
143 In the current version we only support 2 port devices for the traffic generator
144 identified as port 0 or port 1.
147 def __init__(self, port, generator_config, vtep_vlan=None):
148 """Create a new device for a given port."""
149 self.generator_config = generator_config
150 self.chain_count = generator_config.service_chain_count
151 self.flow_count = generator_config.flow_count / 2
153 self.switch_port = generator_config.interfaces[port].get('switch_port', None)
154 self.vtep_vlan = vtep_vlan
155 self.pci = generator_config.interfaces[port].pci
157 self.dest_macs = None
159 self.ip_addrs = generator_config.ip_addrs[port]
160 subnet = IPNetwork(self.ip_addrs)
161 self.ip = subnet.ip.format()
162 self.ip_addrs_step = generator_config.ip_addrs_step
163 self.ip_block = IpBlock(self.ip, self.ip_addrs_step, self.flow_count)
164 self.gw_ip_block = IpBlock(generator_config.gateway_ips[port],
165 generator_config.gateway_ip_addrs_step,
167 self.tg_gateway_ip_addrs = generator_config.tg_gateway_ip_addrs[port]
168 self.tg_gw_ip_block = IpBlock(self.tg_gateway_ip_addrs,
169 generator_config.tg_gateway_ip_addrs_step,
171 self.udp_src_port = generator_config.udp_src_port
172 self.udp_dst_port = generator_config.udp_dst_port
def set_mac(self, mac):
    """Set the local MAC for this port device.

    mac: the MAC address of the traffic generator port
    raise: TrafficClientException if mac is None
    """
    # only reject a missing MAC, any other value is stored as is
    if mac is None:
        raise TrafficClientException('Trying to set traffic generator MAC address as None')
    self.mac = mac
def get_peer_device(self):
    """Return the device attached to the other traffic generator port.

    Ports are numbered 0 and 1, so the peer of port p is port 1 - p.
    """
    all_devices = self.generator_config.devices
    peer_port = 1 - self.port
    return all_devices[peer_port]
def set_dest_macs(self, dest_macs):
    """Set the list of dest MACs indexed by the chain id.

    This is only called in 2 cases:
    - VM macs discovered using openstack API
    - dest MACs provisioned in config file

    dest_macs: an iterable of MAC addresses, each one is converted to a string
    """
    # build a real list rather than relying on map(): on python 3 map() returns
    # a lazy iterator that is always true and cannot be indexed by chain id
    self.dest_macs = [str(mac) for mac in dest_macs]
def get_dest_macs(self):
    """Get the list of dest macs for this device.

    If set_dest_macs was never called, assumes l2-loopback and return
    a list of peer mac (as many as chains but normally only 1 chain)

    return: a list of dest MACs indexed by chain id
    """
    if self.dest_macs:
        return self.dest_macs
    # assume this is l2-loopback
    return [self.get_peer_device().mac] * self.chain_count
def set_vlans(self, vlans):
    """Set the list of vlans to use indexed by the chain id.

    vlans: the vlans to use on this port, indexed by chain id
    """
    # store the vlans first: the log line below reads them back from self.vlans
    self.vlans = vlans
    LOG.info("Port %d: VLANs %s", self.port, self.vlans)
def get_gw_ip(self, chain_index):
    """Return the gateway IP address reserved for a chain.

    chain_index: index of the chain, used as index in the gateway IP block
    """
    gateway_block = self.gw_ip_block
    return gateway_block.get_ip(chain_index)
213 def get_stream_configs(self):
214 """Get the stream config for a given chain on this device.
216 Called by the traffic generator driver to program the traffic generator properly
217 before generating traffic
220 # exact flow count for each chain is calculated as follows:
221 # - all chains except the first will have the same flow count
222 # calculated as (total_flows + chain_count - 1) / chain_count
223 # - the first chain will have the remainder
224 # example 11 flows and 3 chains => 3, 4, 4
225 flows_per_chain = (self.flow_count + self.chain_count - 1) / self.chain_count
226 cur_chain_flow_count = self.flow_count - flows_per_chain * (self.chain_count - 1)
227 peer = self.get_peer_device()
228 self.ip_block.reset_reservation()
229 peer.ip_block.reset_reservation()
230 dest_macs = self.get_dest_macs()
232 for chain_idx in xrange(self.chain_count):
233 src_ip_first, src_ip_last = self.ip_block.reserve_ip_range(cur_chain_flow_count)
234 dst_ip_first, dst_ip_last = peer.ip_block.reserve_ip_range(cur_chain_flow_count)
237 'count': cur_chain_flow_count,
239 'mac_dst': dest_macs[chain_idx],
240 'ip_src_addr': src_ip_first,
241 'ip_src_addr_max': src_ip_last,
242 'ip_src_count': cur_chain_flow_count,
243 'ip_dst_addr': dst_ip_first,
244 'ip_dst_addr_max': dst_ip_last,
245 'ip_dst_count': cur_chain_flow_count,
246 'ip_addrs_step': self.ip_addrs_step,
247 'udp_src_port': self.udp_src_port,
248 'udp_dst_port': self.udp_dst_port,
249 'mac_discovery_gw': self.get_gw_ip(chain_idx),
250 'ip_src_tg_gw': self.tg_gw_ip_block.get_ip(chain_idx),
251 'ip_dst_tg_gw': peer.tg_gw_ip_block.get_ip(chain_idx),
252 'vlan_tag': self.vlans[chain_idx] if self.vlans else None
254 # after first chain, fall back to the flow count for all other chains
255 cur_chain_flow_count = flows_per_chain
@staticmethod
def ip_to_int(addr):
    """Convert an IP address from string to numeric.

    addr: IPv4 address in dotted notation (e.g. '10.0.0.1')
    return: the address as an unsigned 32-bit integer
    """
    # "!I" = network byte order unsigned int, matching the 4 packed bytes of inet_aton
    return struct.unpack("!I", socket.inet_aton(addr))[0]
@staticmethod
def int_to_ip(nvalue):
    """Convert an IP address from numeric to string.

    nvalue: the address as an unsigned 32-bit integer
    return: the IPv4 address in dotted notation
    """
    # "!I" = network byte order unsigned int, the 4 bytes expected by inet_ntoa
    return socket.inet_ntoa(struct.pack("!I", nvalue))
269 class GeneratorConfig(object):
270 """Represents traffic configuration for currently running traffic profile."""
272 DEFAULT_IP_STEP = '0.0.0.1'
273 DEFAULT_SRC_DST_IP_STEP = '0.0.0.1'
275 def __init__(self, config):
276 """Create a generator config."""
278 # name of the generator profile (normally trex or dummy)
279 # pick the default one if not specified explicitly from cli options
280 if not config.generator_profile:
281 config.generator_profile = config.traffic_generator.default_profile
282 # pick up the profile dict based on the name
283 gen_config = self.__match_generator_profile(config.traffic_generator,
284 config.generator_profile)
285 self.gen_config = gen_config
286 # copy over fields from the dict
287 self.tool = gen_config.tool
288 self.ip = gen_config.ip
289 self.cores = gen_config.get('cores', 1)
290 if gen_config.intf_speed:
291 # interface speed is overriden from config
292 self.intf_speed = bitmath.parse_string(gen_config.intf_speed.replace('ps', '')).bits
294 # interface speed is discovered/provided by the traffic generator
296 self.software_mode = gen_config.get('software_mode', False)
297 self.interfaces = gen_config.interfaces
298 if self.interfaces[0].port != 0 or self.interfaces[1].port != 1:
299 raise TrafficClientException('Invalid port order/id in generator_profile.interfaces')
301 self.service_chain = config.service_chain
302 self.service_chain_count = config.service_chain_count
303 self.flow_count = config.flow_count
304 self.host_name = gen_config.host_name
306 self.tg_gateway_ip_addrs = gen_config.tg_gateway_ip_addrs
307 self.ip_addrs = gen_config.ip_addrs
308 self.ip_addrs_step = gen_config.ip_addrs_step or self.DEFAULT_SRC_DST_IP_STEP
309 self.tg_gateway_ip_addrs_step = \
310 gen_config.tg_gateway_ip_addrs_step or self.DEFAULT_IP_STEP
311 self.gateway_ip_addrs_step = gen_config.gateway_ip_addrs_step or self.DEFAULT_IP_STEP
312 self.gateway_ips = gen_config.gateway_ip_addrs
313 self.udp_src_port = gen_config.udp_src_port
314 self.udp_dst_port = gen_config.udp_dst_port
315 self.devices = [Device(port, self) for port in [0, 1]]
316 # This should normally always be [0, 1]
317 self.ports = [device.port for device in self.devices]
319 # check that pci is not empty
320 if not gen_config.interfaces[0].get('pci', None) or \
321 not gen_config.interfaces[1].get('pci', None):
322 raise TrafficClientException("configuration interfaces pci fields cannot be empty")
324 self.pcis = [tgif['pci'] for tgif in gen_config.interfaces]
325 self.vlan_tagging = config.vlan_tagging
327 # needed for result/summarizer
328 config['tg-name'] = gen_config.name
329 config['tg-tool'] = self.tool
332 """Get json form to display the content into the overall result dict."""
333 return dict(self.gen_config)
def set_dest_macs(self, port_index, dest_macs):
    """Store the dest MACs of one port after checking there is one per chain.

    port_index: the port for which dest macs must be set
    dest_macs: a list of dest MACs indexed by chain id
    raise: TrafficClientException if the list size differs from the service chain count
    """
    chain_count = self.config.service_chain_count
    if len(dest_macs) != chain_count:
        raise TrafficClientException('Dest MAC list %s must have %d entries' %
                                     (dest_macs, chain_count))
    device = self.devices[port_index]
    device.set_dest_macs(dest_macs)
    printable_macs = [str(mac) for mac in dest_macs]
    LOG.info('Port %d: dst MAC %s', port_index, printable_macs)
def get_dest_macs(self):
    """Collect the dest MAC list of every device.

    return: a list indexed by port, each entry being what the device of
            that port returns from its own get_dest_macs()
    """
    macs_per_port = []
    for device in self.devices:
        macs_per_port.append(device.get_dest_macs())
    return macs_per_port
def set_vlans(self, port_index, vlans):
    """Store the vlans of one port after checking there is one entry per chain.

    port_index: the port for which VLANs must be set
    vlans: a list of vlan lists indexed by chain id
    raise: TrafficClientException if the list size differs from the service chain count
    """
    chain_count = self.config.service_chain_count
    if len(vlans) != chain_count:
        raise TrafficClientException('VLAN list %s must have %d entries' %
                                     (vlans, chain_count))
    device = self.devices[port_index]
    device.set_vlans(vlans)
@staticmethod
def __match_generator_profile(traffic_generator, generator_profile):
    """Build the generator config for the generator profile with the given name.

    traffic_generator: the traffic generator section of the config, it must hold
                       the 'default_profile' and 'generator_profile' entries
    generator_profile: name of the generator profile to select
    return: an AttrDict with the fields of traffic_generator (minus 'default_profile'
            and 'generator_profile') updated with the fields of the matching profile
    raise: Exception if there is not exactly one profile with that name
    """
    gen_config = AttrDict(traffic_generator)
    # the list of profiles and the default name are not part of the result
    gen_config.pop('default_profile')
    gen_config.pop('generator_profile')
    matching_profile = [profile for profile in traffic_generator.generator_profile if
                        profile.name == generator_profile]
    if len(matching_profile) != 1:
        raise Exception('Traffic generator profile not found: ' + generator_profile)

    gen_config.update(matching_profile[0])
    # hand the merged config back to the caller
    return gen_config
376 class TrafficClient(object):
377 """Traffic generator client with NDR/PDR binary search."""
381 def __init__(self, config, notifier=None):
382 """Create a new TrafficClient instance.
384 config: nfvbench config
385 notifier: notifier (optional)
387 A new instance is created everytime the nfvbench config may have changed.
390 self.generator_config = GeneratorConfig(config)
391 self.tool = self.generator_config.tool
392 self.gen = self._get_generator()
393 self.notifier = notifier
394 self.interval_collector = None
395 self.iteration_collector = None
396 self.runner = TrafficRunner(self, self.config.duration_sec, self.config.interval_sec)
397 self.config.frame_sizes = self._get_frame_sizes()
399 'l2frame_size': None,
400 'duration_sec': self.config.duration_sec,
401 'bidirectional': True,
402 'rates': [] # to avoid unsbuscriptable-obj warning
404 self.current_total_rate = {'rate_percent': '10'}
405 if self.config.single_run:
406 self.current_total_rate = utils.parse_rate_str(self.config.rate)
408 # Speed is either discovered when connecting to TG or set from config
409 # This variable is 0 if not yet discovered from TG or must be the speed of
410 # each interface in bits per second
411 self.intf_speed = self.generator_config.intf_speed
413 def _get_generator(self):
414 tool = self.tool.lower()
416 from traffic_gen import trex
417 return trex.TRex(self)
419 from traffic_gen import dummy
420 return dummy.DummyTG(self)
421 raise TrafficClientException('Unsupported generator tool name:' + self.tool)
def skip_sleep(self):
    """Skip all sleeps when doing unit testing with dummy TG.

    Must be overriden using mock.patch

    return: False, meaning that callers must really sleep
    """
    return False
430 def _get_frame_sizes(self):
431 traffic_profile_name = self.config.traffic.profile
432 matching_profiles = [profile for profile in self.config.traffic_profile if
433 profile.name == traffic_profile_name]
434 if len(matching_profiles) > 1:
435 raise TrafficClientException('Multiple traffic profiles with name: ' +
436 traffic_profile_name)
437 elif not matching_profiles:
438 raise TrafficClientException('Cannot find traffic profile: ' + traffic_profile_name)
439 return matching_profiles[0].l2frame_size
441 def start_traffic_generator(self):
442 """Start the traffic generator process (traffic not started yet)."""
444 # pick up the interface speed if it is not set from config
445 intf_speeds = self.gen.get_port_speed_gbps()
446 # convert Gbps unit into bps
447 tg_if_speed = bitmath.parse_string(str(intf_speeds[0]) + 'Gb').bits
449 # interface speed is overriden from config
450 if self.intf_speed != tg_if_speed:
451 # Warn the user if the speed in the config is different
452 LOG.warning('Interface speed provided is different from actual speed (%d Gbps)',
455 # interface speed not provisioned by config
456 self.intf_speed = tg_if_speed
457 # also update the speed in the tg config
458 self.generator_config.intf_speed = tg_if_speed
460 # Save the traffic generator local MAC
461 for mac, device in zip(self.gen.get_macs(), self.generator_config.devices):
465 """Set up the traffic client."""
466 self.gen.clear_stats()
def get_version(self):
    """Return the version reported by the traffic generator driver."""
    generator = self.gen
    return generator.get_version()
472 def ensure_end_to_end(self):
473 """Ensure traffic generator receives packets it has transmitted.
475 This ensures end to end connectivity and also waits until VMs are ready to forward packets.
477 VMs that are started and in active state may not pass traffic yet. It is imperative to make
478 sure that all VMs are passing traffic in both directions before starting any benchmarking.
479 To verify this, we need to send at a low frequency bi-directional packets and make sure
480 that we receive all packets back from all VMs. The number of flows is equal to 2 times
481 the number of chains (1 per direction) and we need to make sure we receive packets coming
482 from exactly 2 x chain count different source MAC addresses.
485 PVP chain (1 VM per chain)
486 N = 10 (number of chains)
487 Flow count = 20 (number of flows)
488 If the number of unique source MAC addresses from received packets is 20 then
489 all 10 VMs are in operational state.
491 LOG.info('Starting traffic generator to ensure end-to-end connectivity')
492 # send 2pps on each chain and each direction
493 rate_pps = {'rate_pps': str(self.config.service_chain_count * 2)}
494 self.gen.create_traffic('64', [rate_pps, rate_pps], bidirectional=True, latency=False)
496 # ensures enough traffic is coming back
497 retry_count = (self.config.check_traffic_time_sec +
498 self.config.generic_poll_sec - 1) / self.config.generic_poll_sec
500 # we expect to see packets coming from 2 unique MAC per chain
501 # because there can be flooding in the case of shared net
502 # we must verify that packets from the right VMs are received
503 # and not just count unique src MAC
504 # create a dict of (port, chain) tuples indexed by dest mac
506 for port, dest_macs in enumerate(self.generator_config.get_dest_macs()):
507 for chain, mac in enumerate(dest_macs):
508 mac_map[mac] = (port, chain)
509 unique_src_mac_count = len(mac_map)
510 for it in xrange(retry_count):
511 self.gen.clear_stats()
512 self.gen.start_traffic()
513 self.gen.start_capture()
514 LOG.info('Captured unique src mac %d/%d, capturing return packets (retry %d/%d)...',
515 unique_src_mac_count - len(mac_map), unique_src_mac_count,
517 if not self.skip_sleep():
518 time.sleep(self.config.generic_poll_sec)
519 self.gen.stop_traffic()
520 self.gen.fetch_capture_packets()
521 self.gen.stop_capture()
523 for packet in self.gen.packet_list:
524 src_mac = packet['binary'][6:12]
525 src_mac = ':'.join(["%02x" % ord(x) for x in src_mac])
526 if src_mac in mac_map:
527 port, chain = mac_map[src_mac]
528 LOG.info('Received packet from mac: %s (chain=%d, port=%d)',
529 src_mac, chain, port)
530 mac_map.pop(src_mac, None)
533 LOG.info('End-to-end connectivity established')
536 raise TrafficClientException('End-to-end connectivity cannot be ensured')
def ensure_arp_successful(self):
    """Resolve all IP using ARP and throw an exception in case of failure.

    raise: TrafficClientException if the traffic generator returns no dest MAC
    """
    dest_macs = self.gen.resolve_arp()
    if not dest_macs:
        raise TrafficClientException('ARP cannot be resolved')
    # all dest macs are discovered, save them into the generator config
    # (dest_macs is indexed by port)
    self.generator_config.set_dest_macs(0, dest_macs[0])
    self.generator_config.set_dest_macs(1, dest_macs[1])
def set_traffic(self, frame_size, bidirectional):
    """Reconfigure the traffic generator for a new frame size.

    frame_size: the L2 frame size to use for the next runs
    bidirectional: True to send the per direction rate in both directions,
                   False to send it in the forward direction only (plus the
                   optional low rate reverse traffic set in the config)
    """
    self.run_config['bidirectional'] = bidirectional
    self.run_config['l2frame_size'] = frame_size
    self.run_config['rates'] = [self.get_per_direction_rate()]
    if bidirectional:
        self.run_config['rates'].append(self.get_per_direction_rate())
    else:
        # unidirectional: only add a reverse rate if one is requested in the config
        unidir_reverse_pps = int(self.config.unidir_reverse_traffic_pps)
        if unidir_reverse_pps > 0:
            self.run_config['rates'].append({'rate_pps': str(unidir_reverse_pps)})
    # Fix for [NFVBENCH-67], convert the rate string to PPS
    for idx, rate in enumerate(self.run_config['rates']):
        if 'rate_pps' not in rate:
            self.run_config['rates'][idx] = {'rate_pps': self.__convert_rates(rate)['rate_pps']}

    self.gen.clear_streamblock()
    self.gen.create_traffic(frame_size, self.run_config['rates'], bidirectional, latency=True)
567 def _modify_load(self, load):
568 self.current_total_rate = {'rate_percent': str(load)}
569 rate_per_direction = self.get_per_direction_rate()
571 self.gen.modify_rate(rate_per_direction, False)
572 self.run_config['rates'][0] = rate_per_direction
573 if self.run_config['bidirectional']:
574 self.gen.modify_rate(rate_per_direction, True)
575 self.run_config['rates'][1] = rate_per_direction
577 def get_ndr_and_pdr(self):
578 """Start the NDR/PDR iteration and return the results."""
579 dst = 'Bidirectional' if self.run_config['bidirectional'] else 'Unidirectional'
581 if self.config.ndr_run:
582 LOG.info('*** Searching NDR for %s (%s)...', self.run_config['l2frame_size'], dst)
583 targets['ndr'] = self.config.measurement.NDR
584 if self.config.pdr_run:
585 LOG.info('*** Searching PDR for %s (%s)...', self.run_config['l2frame_size'], dst)
586 targets['pdr'] = self.config.measurement.PDR
588 self.run_config['start_time'] = time.time()
589 self.interval_collector = IntervalCollector(self.run_config['start_time'])
590 self.interval_collector.attach_notifier(self.notifier)
591 self.iteration_collector = IterationCollector(self.run_config['start_time'])
593 self.__range_search(0.0, 200.0, targets, results)
595 results['iteration_stats'] = {
596 'ndr_pdr': self.iteration_collector.get()
599 if self.config.ndr_run:
600 LOG.info('NDR load: %s', results['ndr']['rate_percent'])
601 results['ndr']['time_taken_sec'] = \
602 results['ndr']['timestamp_sec'] - self.run_config['start_time']
603 if self.config.pdr_run:
604 LOG.info('PDR load: %s', results['pdr']['rate_percent'])
605 results['pdr']['time_taken_sec'] = \
606 results['pdr']['timestamp_sec'] - results['ndr']['timestamp_sec']
608 LOG.info('PDR load: %s', results['pdr']['rate_percent'])
609 results['pdr']['time_taken_sec'] = \
610 results['pdr']['timestamp_sec'] - self.run_config['start_time']
613 def __get_dropped_rate(self, result):
614 dropped_pkts = result['rx']['dropped_pkts']
615 total_pkts = result['tx']['total_pkts']
618 return float(dropped_pkts) / total_pkts * 100
621 """Collect final stats for previous run."""
622 stats = self.gen.get_stats()
623 retDict = {'total_tx_rate': stats['total_tx_rate']}
624 for port in self.PORTS:
625 retDict[port] = {'tx': {}, 'rx': {}}
627 tx_keys = ['total_pkts', 'total_pkt_bytes', 'pkt_rate', 'pkt_bit_rate']
628 rx_keys = tx_keys + ['dropped_pkts']
630 for port in self.PORTS:
632 retDict[port]['tx'][key] = int(stats[port]['tx'][key])
635 retDict[port]['rx'][key] = int(stats[port]['rx'][key])
637 retDict[port]['rx'][key] = 0
638 retDict[port]['rx']['avg_delay_usec'] = cast_integer(
639 stats[port]['rx']['avg_delay_usec'])
640 retDict[port]['rx']['min_delay_usec'] = cast_integer(
641 stats[port]['rx']['min_delay_usec'])
642 retDict[port]['rx']['max_delay_usec'] = cast_integer(
643 stats[port]['rx']['max_delay_usec'])
644 retDict[port]['drop_rate_percent'] = self.__get_dropped_rate(retDict[port])
646 ports = sorted(retDict.keys())
647 if self.run_config['bidirectional']:
648 retDict['overall'] = {'tx': {}, 'rx': {}}
650 retDict['overall']['tx'][key] = \
651 retDict[ports[0]]['tx'][key] + retDict[ports[1]]['tx'][key]
653 retDict['overall']['rx'][key] = \
654 retDict[ports[0]]['rx'][key] + retDict[ports[1]]['rx'][key]
655 total_pkts = [retDict[ports[0]]['rx']['total_pkts'],
656 retDict[ports[1]]['rx']['total_pkts']]
657 avg_delays = [retDict[ports[0]]['rx']['avg_delay_usec'],
658 retDict[ports[1]]['rx']['avg_delay_usec']]
659 max_delays = [retDict[ports[0]]['rx']['max_delay_usec'],
660 retDict[ports[1]]['rx']['max_delay_usec']]
661 min_delays = [retDict[ports[0]]['rx']['min_delay_usec'],
662 retDict[ports[1]]['rx']['min_delay_usec']]
663 retDict['overall']['rx']['avg_delay_usec'] = utils.weighted_avg(total_pkts, avg_delays)
664 retDict['overall']['rx']['min_delay_usec'] = min(min_delays)
665 retDict['overall']['rx']['max_delay_usec'] = max(max_delays)
666 for key in ['pkt_bit_rate', 'pkt_rate']:
667 for dirc in ['tx', 'rx']:
668 retDict['overall'][dirc][key] /= 2.0
670 retDict['overall'] = retDict[ports[0]]
671 retDict['overall']['drop_rate_percent'] = self.__get_dropped_rate(retDict['overall'])
674 def __convert_rates(self, rate):
675 return utils.convert_rates(self.run_config['l2frame_size'],
679 def __ndr_pdr_found(self, tag, load):
680 rates = self.__convert_rates({'rate_percent': load})
681 self.iteration_collector.add_ndr_pdr(tag, rates['rate_pps'])
682 last_stats = self.iteration_collector.peek()
683 self.interval_collector.add_ndr_pdr(tag, last_stats)
685 def __format_output_stats(self, stats):
686 for key in self.PORTS + ['overall']:
687 interface = stats[key]
689 'tx_pkts': interface['tx']['total_pkts'],
690 'rx_pkts': interface['rx']['total_pkts'],
691 'drop_percentage': interface['drop_rate_percent'],
692 'drop_pct': interface['rx']['dropped_pkts'],
693 'avg_delay_usec': interface['rx']['avg_delay_usec'],
694 'max_delay_usec': interface['rx']['max_delay_usec'],
695 'min_delay_usec': interface['rx']['min_delay_usec'],
def __targets_found(self, rate, targets, results):
    """Record that the given rate is the final answer for all the given targets.

    rate: the load in percent found for these targets
    targets: a dict of drop rates indexed by tag
    results: a dict of results indexed by tag, updated in place with the time
             at which the target was found; every tag of targets must already
             have an entry in it
    """
    # items() works on python 2 and python 3 (iteritems() is python 2 only)
    for tag, target in targets.items():
        LOG.info('Found %s (%s) load: %s', tag, target, rate)
        self.__ndr_pdr_found(tag, rate)
        results[tag]['timestamp_sec'] = time.time()
706 def __range_search(self, left, right, targets, results):
707 """Perform a binary search for a list of targets inside a [left..right] range or rate.
709 left the left side of the range to search as a % the line rate (100 = 100% line rate)
710 indicating the rate to send on each interface
711 right the right side of the range to search as a % of line rate
712 indicating the rate to send on each interface
713 targets a dict of drop rates to search (0.1 = 0.1%), indexed by the DR name or "tag"
715 results a dict to store results
719 LOG.info('Range search [%s .. %s] targets: %s', left, right, targets)
721 # Terminate search when gap is less than load epsilon
722 if right - left < self.config.measurement.load_epsilon:
723 self.__targets_found(left, targets, results)
726 # Obtain the average drop rate in for middle load
727 middle = (left + right) / 2.0
729 stats, rates = self.__run_search_iteration(middle)
731 LOG.exception("Got exception from traffic generator during binary search")
732 self.__targets_found(left, targets, results)
734 # Split target dicts based on the avg drop rate
737 for tag, target in targets.iteritems():
738 if stats['overall']['drop_rate_percent'] <= target:
739 # record the best possible rate found for this target
741 results[tag].update({
742 'load_percent_per_direction': middle,
743 'stats': self.__format_output_stats(dict(stats)),
744 'timestamp_sec': None
746 right_targets[tag] = target
748 # initialize to 0 all fields of result for
749 # the worst case scenario of the binary search (if ndr/pdr is not found)
750 if tag not in results:
751 results[tag] = dict.fromkeys(rates, 0)
752 empty_stats = self.__format_output_stats(dict(stats))
753 for key in empty_stats:
754 if isinstance(empty_stats[key], dict):
755 empty_stats[key] = dict.fromkeys(empty_stats[key], 0)
758 results[tag].update({
759 'load_percent_per_direction': 0,
760 'stats': empty_stats,
761 'timestamp_sec': None
763 left_targets[tag] = target
766 self.__range_search(left, middle, left_targets, results)
768 # search upper half only if the upper rate does not exceed
769 # 100%, this only happens when the first search at 100%
770 # yields a DR that is < target DR
772 self.__targets_found(100, right_targets, results)
774 self.__range_search(middle, right, right_targets, results)
776 def __run_search_iteration(self, rate):
777 """Run one iteration at the given rate level.
779 rate: the rate to send on each port in percent (0 to 100)
781 self._modify_load(rate)
783 # poll interval stats and collect them
784 for stats in self.run_traffic():
785 self.interval_collector.add(stats)
786 time_elapsed_ratio = self.runner.time_elapsed() / self.run_config['duration_sec']
787 if time_elapsed_ratio >= 1:
788 self.cancel_traffic()
789 if not self.skip_sleep():
790 time.sleep(self.config.pause_sec)
791 self.interval_collector.reset()
793 # get stats from the run
794 stats = self.runner.client.get_stats()
795 current_traffic_config = self._get_traffic_config()
796 warning = self.compare_tx_rates(current_traffic_config['direction-total']['rate_pps'],
797 stats['total_tx_rate'])
798 if warning is not None:
799 stats['warning'] = warning
801 # save reliable stats from whole iteration
802 self.iteration_collector.add(stats, current_traffic_config['direction-total']['rate_pps'])
803 LOG.info('Average drop rate: %f', stats['overall']['drop_rate_percent'])
804 return stats, current_traffic_config['direction-total']
807 def log_stats(stats):
808 """Log estimated stats during run."""
810 'datetime': str(datetime.now()),
811 'tx_packets': stats['overall']['tx']['total_pkts'],
812 'rx_packets': stats['overall']['rx']['total_pkts'],
813 'drop_packets': stats['overall']['rx']['dropped_pkts'],
814 'drop_rate_percent': stats['overall']['drop_rate_percent']
816 LOG.info('TX: %(tx_packets)d; '
817 'RX: %(rx_packets)d; '
818 'Est. Dropped: %(drop_packets)d; '
819 'Est. Drop rate: %(drop_rate_percent).4f%%',
822 def run_traffic(self):
823 """Start traffic and return intermediate stats for each interval."""
824 stats = self.runner.run()
825 while self.runner.is_running:
826 self.log_stats(stats)
828 stats = self.runner.poll_stats()
831 self.log_stats(stats)
832 LOG.info('Drop rate: %f', stats['overall']['drop_rate_percent'])
835 def cancel_traffic(self):
839 def _get_traffic_config(self):
844 for idx, rate in enumerate(self.run_config['rates']):
845 key = 'direction-forward' if idx == 0 else 'direction-reverse'
847 'l2frame_size': self.run_config['l2frame_size'],
848 'duration_sec': self.run_config['duration_sec']
850 config[key].update(rate)
851 config[key].update(self.__convert_rates(rate))
852 load_total += float(config[key]['rate_percent'])
853 bps_total += float(config[key]['rate_bps'])
854 pps_total += float(config[key]['rate_pps'])
855 config['direction-total'] = dict(config['direction-forward'])
856 config['direction-total'].update({
857 'rate_percent': load_total,
858 'rate_pps': cast_integer(pps_total),
859 'rate_bps': bps_total
864 def get_run_config(self, results):
865 """Return configuration which was used for the last run."""
867 # because we want each direction to have the far end RX rates,
868 # use the far end index (1-idx) to retrieve the RX rates
869 for idx, key in enumerate(["direction-forward", "direction-reverse"]):
870 tx_rate = results["stats"][idx]["tx"]["total_pkts"] / self.config.duration_sec
871 rx_rate = results["stats"][1 - idx]["rx"]["total_pkts"] / self.config.duration_sec
873 "orig": self.__convert_rates(self.run_config['rates'][idx]),
874 "tx": self.__convert_rates({'rate_pps': tx_rate}),
875 "rx": self.__convert_rates({'rate_pps': rx_rate})
879 for direction in ['orig', 'tx', 'rx']:
880 total[direction] = {}
881 for unit in ['rate_percent', 'rate_bps', 'rate_pps']:
882 total[direction][unit] = sum([float(x[direction][unit]) for x in r.values()])
884 r['direction-total'] = total
def insert_interface_stats(self, pps_list):
    """Create the traffic generator interface stats and add them to pps_list.

    pps_list: a list of packet path stats instances indexed by chain index,
              extended in place with one new PacketPathStats per chain

    One PacketPathStats is created per chain, each built from exactly
    2 InterfaceStats (port 0 and port 1):
        PacketPathStats(InterfaceStats(chain 0, port 0), InterfaceStats(chain 0, port 1)),
        PacketPathStats(InterfaceStats(chain 1, port 0), InterfaceStats(chain 1, port 1)),
        ...
    The interface stats are also kept in self.ifstats and the packet path stats
    in self.pps_list.
    """
    chain_count = self.config.service_chain_count
    # keep the list of list of interface stats indexed by the chain id
    self.ifstats = [[InterfaceStats('p' + str(port), self.tool) for port in range(2)]
                    for _ in range(chain_count)]
    # each pps gets its own copy of the interface stats list so that any
    # modification done through the pps does not change self.ifstats
    self.pps_list = [PacketPathStats(list(chain_ifstats)) for chain_ifstats in self.ifstats]
    # insert the corresponding pps in the passed list
    pps_list.extend(self.pps_list)
915 def update_interface_stats(self, diff=False):
916 """Update all interface stats.
918 diff: if False, simply refresh the interface stats values with latest values
919 if True, diff the interface stats with the latest values
920 Make sure that the interface stats inserted in insert_interface_stats() are updated
924 [InterfaceStats(chain 0, port 0), InterfaceStats(chain 0, port 1)],
925 [InterfaceStats(chain 1, port 0), InterfaceStats(chain 1, port 1)],
930 stats = self.gen.get_stats()
931 for chain_idx, ifs in enumerate(self.ifstats):
932 # each ifs has exactly 2 InterfaceStats and 2 Latency instances
933 # corresponding to the
934 # port 0 and port 1 for the given chain_idx
935 # Note that we cannot use self.pps_list[chain_idx].if_stats to pick the
936 # interface stats for the pps because it could have been modified to contain
937 # additional interface stats
938 self.gen.get_stream_stats(stats, ifs, self.pps_list[chain_idx].latencies, chain_idx)
942 def compare_tx_rates(required, actual):
943 """Compare the actual TX rate to the required TX rate."""
945 are_different = False
947 if float(actual) / required < threshold:
949 except ZeroDivisionError:
953 msg = "WARNING: There is a significant difference between requested TX rate ({r}) " \
954 "and actual TX rate ({a}). The traffic generator may not have sufficient CPU " \
955 "to achieve the requested TX rate.".format(r=required, a=actual)
def get_per_direction_rate(self):
    """Get the rate for each direction.

    return: the current total rate divided by the number of active directions,
            except for a rate in percent which is not divided
    """
    divisor = 2 if self.run_config['bidirectional'] else 1
    if 'rate_percent' in self.current_total_rate:
        # don't split rate if it's percentage
        divisor = 1
    return utils.divide_rate(self.current_total_rate, divisor)
971 """Close this instance."""
973 self.gen.stop_traffic()
976 self.gen.clear_stats()