1 # Copyright 2016 Cisco Systems, Inc. All rights reserved.
3 # Licensed under the Apache License, Version 2.0 (the "License"); you may
4 # not use this file except in compliance with the License. You may obtain
5 # a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 # License for the specific language governing permissions and limitations
15 """Interface to the traffic generator clients including NDR/PDR binary search."""
17 from datetime import datetime
22 from attrdict import AttrDict
24 from netaddr import IPNetwork
25 # pylint: disable=import-error
26 from trex_stl_lib.api import STLError
27 # pylint: enable=import-error
30 from packet_stats import InterfaceStats
31 from packet_stats import PacketPathStats
32 from stats_collector import IntervalCollector
33 from stats_collector import IterationCollector
34 import traffic_gen.traffic_utils as utils
35 from utils import cast_integer
38 class TrafficClientException(Exception):
39 """Generic traffic client exception."""
44 class TrafficRunner(object):
45 """Serialize various steps required to run traffic."""
47 def __init__(self, client, duration_sec, interval_sec=0):
48 """Create a traffic runner."""
50 self.start_time = None
51 self.duration_sec = duration_sec
52 self.interval_sec = interval_sec
55 """Clear stats and instruct the traffic generator to start generating traffic."""
58 LOG.info('Running traffic generator')
59 self.client.gen.clear_stats()
60 self.client.gen.start_traffic()
61 self.start_time = time.time()
62 return self.poll_stats()
65 """Stop the current run and instruct the traffic generator to stop traffic."""
67 self.start_time = None
68 self.client.gen.stop_traffic()
71 """Check if a run is still pending."""
72 return self.start_time is not None
74 def time_elapsed(self):
75 """Return time elapsed since start of run."""
77 return time.time() - self.start_time
78 return self.duration_sec
81 """Poll latest stats from the traffic generator at fixed interval - sleeps if necessary.
83 return: latest stats or None if traffic is stopped
85 if not self.is_running():
87 if self.client.skip_sleep():
89 return self.client.get_stats()
90 time_elapsed = self.time_elapsed()
91 if time_elapsed > self.duration_sec:
94 time_left = self.duration_sec - time_elapsed
95 if self.interval_sec > 0.0:
96 if time_left <= self.interval_sec:
100 time.sleep(self.interval_sec)
102 time.sleep(self.duration_sec)
104 return self.client.get_stats()
107 class IpBlock(object):
108 """Manage a block of IP addresses."""
110 def __init__(self, base_ip, step_ip, count_ip):
111 """Create an IP block."""
112 self.base_ip_int = Device.ip_to_int(base_ip)
113 self.step = Device.ip_to_int(step_ip)
114 self.max_available = count_ip
117 def get_ip(self, index=0):
118 """Return the IP address at given index."""
119 if index < 0 or index >= self.max_available:
120 raise IndexError('Index out of bounds: %d (max=%d)' % (index, self.max_available))
121 return Device.int_to_ip(self.base_ip_int + index * self.step)
123 def reserve_ip_range(self, count):
124 """Reserve a range of count consecutive IP addresses spaced by step."""
125 if self.next_free + count > self.max_available:
126 raise IndexError('No more IP addresses next free=%d max_available=%d requested=%d' %
130 first_ip = self.get_ip(self.next_free)
131 last_ip = self.get_ip(self.next_free + count - 1)
132 self.next_free += count
133 return (first_ip, last_ip)
135 def reset_reservation(self):
136 """Reset all reservations and restart with a completely unused IP block."""
140 class Device(object):
141 """Represent a port device and all information associated to it.
143 In the curent version we only support 2 port devices for the traffic generator
144 identified as port 0 or port 1.
147 def __init__(self, port, generator_config):
148 """Create a new device for a given port."""
149 self.generator_config = generator_config
150 self.chain_count = generator_config.service_chain_count
151 self.flow_count = generator_config.flow_count / 2
153 self.switch_port = generator_config.interfaces[port].get('switch_port', None)
154 self.vtep_vlan = None
155 self.vtep_src_mac = None
157 self.pci = generator_config.interfaces[port].pci
159 self.dest_macs = None
160 self.vtep_dst_mac = None
161 self.vtep_dst_ip = None
162 if generator_config.vteps is None:
163 self.vtep_src_ip = None
165 self.vtep_src_ip = generator_config.vteps[port]
168 self.ip_addrs = generator_config.ip_addrs[port]
169 subnet = IPNetwork(self.ip_addrs)
170 self.ip = subnet.ip.format()
171 self.ip_addrs_step = generator_config.ip_addrs_step
172 self.ip_block = IpBlock(self.ip, self.ip_addrs_step, self.flow_count)
173 self.gw_ip_block = IpBlock(generator_config.gateway_ips[port],
174 generator_config.gateway_ip_addrs_step,
176 self.tg_gateway_ip_addrs = generator_config.tg_gateway_ip_addrs[port]
177 self.tg_gw_ip_block = IpBlock(self.tg_gateway_ip_addrs,
178 generator_config.tg_gateway_ip_addrs_step,
180 self.udp_src_port = generator_config.udp_src_port
181 self.udp_dst_port = generator_config.udp_dst_port
183 def set_mac(self, mac):
184 """Set the local MAC for this port device."""
186 raise TrafficClientException('Trying to set traffic generator MAC address as None')
189 def get_peer_device(self):
190 """Get the peer device (device 0 -> device 1, or device 1 -> device 0)."""
191 return self.generator_config.devices[1 - self.port]
193 def set_vtep_dst_mac(self, dest_macs):
194 """Set the list of dest MACs indexed by the chain id.
196 This is only called in 2 cases:
197 - VM macs discovered using openstack API
198 - dest MACs provisioned in config file
200 self.vtep_dst_mac = map(str, dest_macs)
202 def set_dest_macs(self, dest_macs):
203 """Set the list of dest MACs indexed by the chain id.
205 This is only called in 2 cases:
206 - VM macs discovered using openstack API
207 - dest MACs provisioned in config file
209 self.dest_macs = map(str, dest_macs)
211 def get_dest_macs(self):
212 """Get the list of dest macs for this device.
214 If set_dest_macs was never called, assumes l2-loopback and return
215 a list of peer mac (as many as chains but normally only 1 chain)
218 return self.dest_macs
219 # assume this is l2-loopback
220 return [self.get_peer_device().mac] * self.chain_count
222 def set_vlans(self, vlans):
223 """Set the list of vlans to use indexed by the chain id."""
225 LOG.info("Port %d: VLANs %s", self.port, self.vlans)
227 def set_vtep_vlan(self, vlan):
228 """Set the vtep vlan to use indexed by specific port."""
229 self.vtep_vlan = vlan
231 self.vlan_tagging = None
232 LOG.info("Port %d: VTEP VLANs %s", self.port, self.vtep_vlan)
234 def set_vxlan_endpoints(self, src_ip, dst_ip):
235 self.vtep_dst_ip = dst_ip
236 self.vtep_src_ip = src_ip
237 LOG.info("Port %d: src_vtep %s, dst_vtep %s", self.port,
238 self.vtep_src_ip, self.vtep_dst_ip)
240 def set_vxlans(self, vnis):
242 LOG.info("Port %d: VNIs %s", self.port, self.vnis)
244 def get_gw_ip(self, chain_index):
245 """Retrieve the IP address assigned for the gateway of a given chain."""
246 return self.gw_ip_block.get_ip(chain_index)
248 def get_stream_configs(self):
249 """Get the stream config for a given chain on this device.
251 Called by the traffic generator driver to program the traffic generator properly
252 before generating traffic
255 # exact flow count for each chain is calculated as follows:
256 # - all chains except the first will have the same flow count
257 # calculated as (total_flows + chain_count - 1) / chain_count
258 # - the first chain will have the remainder
259 # example 11 flows and 3 chains => 3, 4, 4
260 flows_per_chain = (self.flow_count + self.chain_count - 1) / self.chain_count
261 cur_chain_flow_count = self.flow_count - flows_per_chain * (self.chain_count - 1)
262 peer = self.get_peer_device()
263 self.ip_block.reset_reservation()
264 peer.ip_block.reset_reservation()
265 dest_macs = self.get_dest_macs()
267 for chain_idx in xrange(self.chain_count):
268 src_ip_first, src_ip_last = self.ip_block.reserve_ip_range(cur_chain_flow_count)
269 dst_ip_first, dst_ip_last = peer.ip_block.reserve_ip_range(cur_chain_flow_count)
272 'count': cur_chain_flow_count,
274 'mac_dst': dest_macs[chain_idx],
275 'ip_src_addr': src_ip_first,
276 'ip_src_addr_max': src_ip_last,
277 'ip_src_count': cur_chain_flow_count,
278 'ip_dst_addr': dst_ip_first,
279 'ip_dst_addr_max': dst_ip_last,
280 'ip_dst_count': cur_chain_flow_count,
281 'ip_addrs_step': self.ip_addrs_step,
282 'udp_src_port': self.udp_src_port,
283 'udp_dst_port': self.udp_dst_port,
284 'mac_discovery_gw': self.get_gw_ip(chain_idx),
285 'ip_src_tg_gw': self.tg_gw_ip_block.get_ip(chain_idx),
286 'ip_dst_tg_gw': peer.tg_gw_ip_block.get_ip(chain_idx),
287 'vlan_tag': self.vlans[chain_idx] if self.vlans else None,
289 'vtep_vlan': self.vtep_vlan if self.vtep_vlan else None,
290 'vtep_src_mac': self.mac if self.vxlan is True else None,
291 'vtep_dst_mac': self.vtep_dst_mac if self.vxlan is True else None,
292 'vtep_dst_ip': self.vtep_dst_ip if self.vxlan is True else None,
293 'vtep_src_ip': self.vtep_src_ip if self.vxlan is True else None,
294 'net_vni': self.vnis[chain_idx] if self.vxlan is True else None
296 # after first chain, fall back to the flow count for all other chains
297 cur_chain_flow_count = flows_per_chain
302 """Convert an IP address from string to numeric."""
303 return struct.unpack("!I", socket.inet_aton(addr))[0]
306 def int_to_ip(nvalue):
307 """Convert an IP address from numeric to string."""
308 return socket.inet_ntoa(struct.pack("!I", nvalue))
311 class GeneratorConfig(object):
312 """Represents traffic configuration for currently running traffic profile."""
314 DEFAULT_IP_STEP = '0.0.0.1'
315 DEFAULT_SRC_DST_IP_STEP = '0.0.0.1'
317 def __init__(self, config):
318 """Create a generator config."""
320 # name of the generator profile (normally trex or dummy)
321 # pick the default one if not specified explicitly from cli options
322 if not config.generator_profile:
323 config.generator_profile = config.traffic_generator.default_profile
324 # pick up the profile dict based on the name
325 gen_config = self.__match_generator_profile(config.traffic_generator,
326 config.generator_profile)
327 self.gen_config = gen_config
328 # copy over fields from the dict
329 self.tool = gen_config.tool
330 self.ip = gen_config.ip
331 # overrides on config.cores and config.mbuf_factor
333 self.cores = config.cores
335 self.cores = gen_config.get('cores', 1)
336 self.mbuf_factor = config.mbuf_factor
337 if gen_config.intf_speed:
338 # interface speed is overriden from config
339 self.intf_speed = bitmath.parse_string(gen_config.intf_speed.replace('ps', '')).bits
341 # interface speed is discovered/provided by the traffic generator
343 self.software_mode = gen_config.get('software_mode', False)
344 self.interfaces = gen_config.interfaces
345 if self.interfaces[0].port != 0 or self.interfaces[1].port != 1:
346 raise TrafficClientException('Invalid port order/id in generator_profile.interfaces')
348 self.service_chain = config.service_chain
349 self.service_chain_count = config.service_chain_count
350 self.flow_count = config.flow_count
351 self.host_name = gen_config.host_name
353 self.tg_gateway_ip_addrs = gen_config.tg_gateway_ip_addrs
354 self.ip_addrs = gen_config.ip_addrs
355 self.ip_addrs_step = gen_config.ip_addrs_step or self.DEFAULT_SRC_DST_IP_STEP
356 self.tg_gateway_ip_addrs_step = \
357 gen_config.tg_gateway_ip_addrs_step or self.DEFAULT_IP_STEP
358 self.gateway_ip_addrs_step = gen_config.gateway_ip_addrs_step or self.DEFAULT_IP_STEP
359 self.gateway_ips = gen_config.gateway_ip_addrs
360 self.udp_src_port = gen_config.udp_src_port
361 self.udp_dst_port = gen_config.udp_dst_port
362 self.vteps = gen_config.get('vteps')
363 self.vnis = gen_config.get('vnis')
364 self.devices = [Device(port, self) for port in [0, 1]]
365 # This should normally always be [0, 1]
366 self.ports = [device.port for device in self.devices]
368 # check that pci is not empty
369 if not gen_config.interfaces[0].get('pci', None) or \
370 not gen_config.interfaces[1].get('pci', None):
371 raise TrafficClientException("configuration interfaces pci fields cannot be empty")
373 self.pcis = [tgif['pci'] for tgif in gen_config.interfaces]
374 self.vlan_tagging = config.vlan_tagging
376 # needed for result/summarizer
377 config['tg-name'] = gen_config.name
378 config['tg-tool'] = self.tool
381 """Get json form to display the content into the overall result dict."""
382 return dict(self.gen_config)
384 def set_dest_macs(self, port_index, dest_macs):
385 """Set the list of dest MACs indexed by the chain id on given port.
387 port_index: the port for which dest macs must be set
388 dest_macs: a list of dest MACs indexed by chain id
390 if len(dest_macs) != self.config.service_chain_count:
391 raise TrafficClientException('Dest MAC list %s must have %d entries' %
392 (dest_macs, self.config.service_chain_count))
393 self.devices[port_index].set_dest_macs(dest_macs)
394 LOG.info('Port %d: dst MAC %s', port_index, [str(mac) for mac in dest_macs])
396 def set_vtep_dest_macs(self, port_index, dest_macs):
397 """Set the list of dest MACs indexed by the chain id on given port.
399 port_index: the port for which dest macs must be set
400 dest_macs: a list of dest MACs indexed by chain id
402 if len(dest_macs) != self.config.service_chain_count:
403 raise TrafficClientException('Dest MAC list %s must have %d entries' %
404 (dest_macs, self.config.service_chain_count))
405 self.devices[port_index].set_vtep_dst_mac(dest_macs)
406 LOG.info('Port %d: vtep dst MAC %s', port_index, set([str(mac) for mac in dest_macs]))
408 def get_dest_macs(self):
409 """Return the list of dest macs indexed by port."""
410 return [dev.get_dest_macs() for dev in self.devices]
412 def set_vlans(self, port_index, vlans):
413 """Set the list of vlans to use indexed by the chain id on given port.
415 port_index: the port for which VLANs must be set
416 vlans: a list of vlan lists indexed by chain id
418 if len(vlans) != self.config.service_chain_count:
419 raise TrafficClientException('VLAN list %s must have %d entries' %
420 (vlans, self.config.service_chain_count))
421 self.devices[port_index].set_vlans(vlans)
423 def set_vxlans(self, port_index, vxlans):
424 """Set the list of vxlans (VNIs) to use indexed by the chain id on given port.
426 port_index: the port for which VXLANs must be set
427 VXLANs: a list of VNIs lists indexed by chain id
429 if len(vxlans) != self.config.service_chain_count:
430 raise TrafficClientException('VXLAN list %s must have %d entries' %
431 (vxlans, self.config.service_chain_count))
432 self.devices[port_index].set_vxlans(vxlans)
434 def set_vtep_vlan(self, port_index, vlan):
435 """Set the vtep vlan to use indexed by the chain id on given port.
436 port_index: the port for which VLAN must be set
438 self.devices[port_index].set_vtep_vlan(vlan)
440 def set_vxlan_endpoints(self, port_index, src_ip, dst_ip):
441 self.devices[port_index].set_vxlan_endpoints(src_ip, dst_ip)
444 def __match_generator_profile(traffic_generator, generator_profile):
445 gen_config = AttrDict(traffic_generator)
446 gen_config.pop('default_profile')
447 gen_config.pop('generator_profile')
448 matching_profile = [profile for profile in traffic_generator.generator_profile if
449 profile.name == generator_profile]
450 if len(matching_profile) != 1:
451 raise Exception('Traffic generator profile not found: ' + generator_profile)
453 gen_config.update(matching_profile[0])
457 class TrafficClient(object):
458 """Traffic generator client with NDR/PDR binary seearch."""
462 def __init__(self, config, notifier=None):
463 """Create a new TrafficClient instance.
465 config: nfvbench config
466 notifier: notifier (optional)
468 A new instance is created everytime the nfvbench config may have changed.
471 self.generator_config = GeneratorConfig(config)
472 self.tool = self.generator_config.tool
473 self.gen = self._get_generator()
474 self.notifier = notifier
475 self.interval_collector = None
476 self.iteration_collector = None
477 self.runner = TrafficRunner(self, self.config.duration_sec, self.config.interval_sec)
478 self.config.frame_sizes = self._get_frame_sizes()
480 'l2frame_size': None,
481 'duration_sec': self.config.duration_sec,
482 'bidirectional': True,
483 'rates': [] # to avoid unsbuscriptable-obj warning
485 self.current_total_rate = {'rate_percent': '10'}
486 if self.config.single_run:
487 self.current_total_rate = utils.parse_rate_str(self.config.rate)
489 # Speed is either discovered when connecting to TG or set from config
490 # This variable is 0 if not yet discovered from TG or must be the speed of
491 # each interface in bits per second
492 self.intf_speed = self.generator_config.intf_speed
494 def _get_generator(self):
495 tool = self.tool.lower()
497 from traffic_gen import trex
498 return trex.TRex(self)
500 from traffic_gen import dummy
501 return dummy.DummyTG(self)
502 raise TrafficClientException('Unsupported generator tool name:' + self.tool)
504 def skip_sleep(self):
505 """Skip all sleeps when doing unit testing with dummy TG.
507 Must be overriden using mock.patch
511 def _get_frame_sizes(self):
512 traffic_profile_name = self.config.traffic.profile
513 matching_profiles = [profile for profile in self.config.traffic_profile if
514 profile.name == traffic_profile_name]
515 if len(matching_profiles) > 1:
516 raise TrafficClientException('Multiple traffic profiles with name: ' +
517 traffic_profile_name)
518 elif not matching_profiles:
519 raise TrafficClientException('Cannot find traffic profile: ' + traffic_profile_name)
520 return matching_profiles[0].l2frame_size
522 def start_traffic_generator(self):
523 """Start the traffic generator process (traffic not started yet)."""
525 # pick up the interface speed if it is not set from config
526 intf_speeds = self.gen.get_port_speed_gbps()
527 # convert Gbps unit into bps
528 tg_if_speed = bitmath.parse_string(str(intf_speeds[0]) + 'Gb').bits
530 # interface speed is overriden from config
531 if self.intf_speed != tg_if_speed:
532 # Warn the user if the speed in the config is different
533 LOG.warning('Interface speed provided is different from actual speed (%d Gbps)',
536 # interface speed not provisioned by config
537 self.intf_speed = tg_if_speed
538 # also update the speed in the tg config
539 self.generator_config.intf_speed = tg_if_speed
541 # Save the traffic generator local MAC
542 for mac, device in zip(self.gen.get_macs(), self.generator_config.devices):
546 """Set up the traffic client."""
547 self.gen.clear_stats()
549 def get_version(self):
550 """Get the traffic generator version."""
551 return self.gen.get_version()
553 def ensure_end_to_end(self):
554 """Ensure traffic generator receives packets it has transmitted.
556 This ensures end to end connectivity and also waits until VMs are ready to forward packets.
558 VMs that are started and in active state may not pass traffic yet. It is imperative to make
559 sure that all VMs are passing traffic in both directions before starting any benchmarking.
560 To verify this, we need to send at a low frequency bi-directional packets and make sure
561 that we receive all packets back from all VMs. The number of flows is equal to 2 times
562 the number of chains (1 per direction) and we need to make sure we receive packets coming
563 from exactly 2 x chain count different source MAC addresses.
566 PVP chain (1 VM per chain)
567 N = 10 (number of chains)
568 Flow count = 20 (number of flows)
569 If the number of unique source MAC addresses from received packets is 20 then
570 all 10 VMs 10 VMs are in operational state.
572 LOG.info('Starting traffic generator to ensure end-to-end connectivity')
573 # send 2pps on each chain and each direction
574 rate_pps = {'rate_pps': str(self.config.service_chain_count * 2)}
575 self.gen.create_traffic('64', [rate_pps, rate_pps], bidirectional=True, latency=False)
577 # ensures enough traffic is coming back
578 retry_count = (self.config.check_traffic_time_sec +
579 self.config.generic_poll_sec - 1) / self.config.generic_poll_sec
581 # we expect to see packets coming from 2 unique MAC per chain
582 # because there can be flooding in the case of shared net
583 # we must verify that packets from the right VMs are received
584 # and not just count unique src MAC
585 # create a dict of (port, chain) tuples indexed by dest mac
587 for port, dest_macs in enumerate(self.generator_config.get_dest_macs()):
588 for chain, mac in enumerate(dest_macs):
589 mac_map[mac] = (port, chain)
590 unique_src_mac_count = len(mac_map)
591 if self.config.vxlan and self.config.traffic_generator.vtep_vlan:
592 get_mac_id = lambda packet: packet['binary'][60:66]
593 elif self.config.vxlan:
594 get_mac_id = lambda packet: packet['binary'][56:62]
596 get_mac_id = lambda packet: packet['binary'][6:12]
597 for it in xrange(retry_count):
598 self.gen.clear_stats()
599 self.gen.start_traffic()
600 self.gen.start_capture()
601 LOG.info('Captured unique src mac %d/%d, capturing return packets (retry %d/%d)...',
602 unique_src_mac_count - len(mac_map), unique_src_mac_count,
604 if not self.skip_sleep():
605 time.sleep(self.config.generic_poll_sec)
606 self.gen.stop_traffic()
607 self.gen.fetch_capture_packets()
608 self.gen.stop_capture()
610 for packet in self.gen.packet_list:
611 mac_id = get_mac_id(packet)
612 src_mac = ':'.join(["%02x" % ord(x) for x in mac_id])
613 if src_mac in mac_map:
614 port, chain = mac_map[src_mac]
615 LOG.info('Received packet from mac: %s (chain=%d, port=%d)',
616 src_mac, chain, port)
617 mac_map.pop(src_mac, None)
620 LOG.info('End-to-end connectivity established')
623 raise TrafficClientException('End-to-end connectivity cannot be ensured')
625 def ensure_arp_successful(self):
626 """Resolve all IP using ARP and throw an exception in case of failure."""
627 dest_macs = self.gen.resolve_arp()
629 # all dest macs are discovered, saved them into the generator config
630 if self.config.vxlan:
631 self.generator_config.set_vtep_dest_macs(0, dest_macs[0])
632 self.generator_config.set_vtep_dest_macs(1, dest_macs[1])
634 self.generator_config.set_dest_macs(0, dest_macs[0])
635 self.generator_config.set_dest_macs(1, dest_macs[1])
637 raise TrafficClientException('ARP cannot be resolved')
639 def set_traffic(self, frame_size, bidirectional):
640 """Reconfigure the traffic generator for a new frame size."""
641 self.run_config['bidirectional'] = bidirectional
642 self.run_config['l2frame_size'] = frame_size
643 self.run_config['rates'] = [self.get_per_direction_rate()]
645 self.run_config['rates'].append(self.get_per_direction_rate())
647 unidir_reverse_pps = int(self.config.unidir_reverse_traffic_pps)
648 if unidir_reverse_pps > 0:
649 self.run_config['rates'].append({'rate_pps': str(unidir_reverse_pps)})
650 # Fix for [NFVBENCH-67], convert the rate string to PPS
651 for idx, rate in enumerate(self.run_config['rates']):
652 if 'rate_pps' not in rate:
653 self.run_config['rates'][idx] = {'rate_pps': self.__convert_rates(rate)['rate_pps']}
655 self.gen.clear_streamblock()
656 self.gen.create_traffic(frame_size, self.run_config['rates'], bidirectional, latency=True)
658 def _modify_load(self, load):
659 self.current_total_rate = {'rate_percent': str(load)}
660 rate_per_direction = self.get_per_direction_rate()
662 self.gen.modify_rate(rate_per_direction, False)
663 self.run_config['rates'][0] = rate_per_direction
664 if self.run_config['bidirectional']:
665 self.gen.modify_rate(rate_per_direction, True)
666 self.run_config['rates'][1] = rate_per_direction
668 def get_ndr_and_pdr(self):
669 """Start the NDR/PDR iteration and return the results."""
670 dst = 'Bidirectional' if self.run_config['bidirectional'] else 'Unidirectional'
672 if self.config.ndr_run:
673 LOG.info('*** Searching NDR for %s (%s)...', self.run_config['l2frame_size'], dst)
674 targets['ndr'] = self.config.measurement.NDR
675 if self.config.pdr_run:
676 LOG.info('*** Searching PDR for %s (%s)...', self.run_config['l2frame_size'], dst)
677 targets['pdr'] = self.config.measurement.PDR
679 self.run_config['start_time'] = time.time()
680 self.interval_collector = IntervalCollector(self.run_config['start_time'])
681 self.interval_collector.attach_notifier(self.notifier)
682 self.iteration_collector = IterationCollector(self.run_config['start_time'])
684 self.__range_search(0.0, 200.0, targets, results)
686 results['iteration_stats'] = {
687 'ndr_pdr': self.iteration_collector.get()
690 if self.config.ndr_run:
691 LOG.info('NDR load: %s', results['ndr']['rate_percent'])
692 results['ndr']['time_taken_sec'] = \
693 results['ndr']['timestamp_sec'] - self.run_config['start_time']
694 if self.config.pdr_run:
695 LOG.info('PDR load: %s', results['pdr']['rate_percent'])
696 results['pdr']['time_taken_sec'] = \
697 results['pdr']['timestamp_sec'] - results['ndr']['timestamp_sec']
699 LOG.info('PDR load: %s', results['pdr']['rate_percent'])
700 results['pdr']['time_taken_sec'] = \
701 results['pdr']['timestamp_sec'] - self.run_config['start_time']
704 def __get_dropped_rate(self, result):
705 dropped_pkts = result['rx']['dropped_pkts']
706 total_pkts = result['tx']['total_pkts']
709 return float(dropped_pkts) / total_pkts * 100
712 """Collect final stats for previous run."""
713 stats = self.gen.get_stats()
714 retDict = {'total_tx_rate': stats['total_tx_rate']}
715 for port in self.PORTS:
716 retDict[port] = {'tx': {}, 'rx': {}}
718 tx_keys = ['total_pkts', 'total_pkt_bytes', 'pkt_rate', 'pkt_bit_rate']
719 rx_keys = tx_keys + ['dropped_pkts']
721 for port in self.PORTS:
723 retDict[port]['tx'][key] = int(stats[port]['tx'][key])
726 retDict[port]['rx'][key] = int(stats[port]['rx'][key])
728 retDict[port]['rx'][key] = 0
729 retDict[port]['rx']['avg_delay_usec'] = cast_integer(
730 stats[port]['rx']['avg_delay_usec'])
731 retDict[port]['rx']['min_delay_usec'] = cast_integer(
732 stats[port]['rx']['min_delay_usec'])
733 retDict[port]['rx']['max_delay_usec'] = cast_integer(
734 stats[port]['rx']['max_delay_usec'])
735 retDict[port]['drop_rate_percent'] = self.__get_dropped_rate(retDict[port])
737 ports = sorted(retDict.keys())
738 if self.run_config['bidirectional']:
739 retDict['overall'] = {'tx': {}, 'rx': {}}
741 retDict['overall']['tx'][key] = \
742 retDict[ports[0]]['tx'][key] + retDict[ports[1]]['tx'][key]
744 retDict['overall']['rx'][key] = \
745 retDict[ports[0]]['rx'][key] + retDict[ports[1]]['rx'][key]
746 total_pkts = [retDict[ports[0]]['rx']['total_pkts'],
747 retDict[ports[1]]['rx']['total_pkts']]
748 avg_delays = [retDict[ports[0]]['rx']['avg_delay_usec'],
749 retDict[ports[1]]['rx']['avg_delay_usec']]
750 max_delays = [retDict[ports[0]]['rx']['max_delay_usec'],
751 retDict[ports[1]]['rx']['max_delay_usec']]
752 min_delays = [retDict[ports[0]]['rx']['min_delay_usec'],
753 retDict[ports[1]]['rx']['min_delay_usec']]
754 retDict['overall']['rx']['avg_delay_usec'] = utils.weighted_avg(total_pkts, avg_delays)
755 retDict['overall']['rx']['min_delay_usec'] = min(min_delays)
756 retDict['overall']['rx']['max_delay_usec'] = max(max_delays)
757 for key in ['pkt_bit_rate', 'pkt_rate']:
758 for dirc in ['tx', 'rx']:
759 retDict['overall'][dirc][key] /= 2.0
761 retDict['overall'] = retDict[ports[0]]
762 retDict['overall']['drop_rate_percent'] = self.__get_dropped_rate(retDict['overall'])
765 def __convert_rates(self, rate):
766 return utils.convert_rates(self.run_config['l2frame_size'],
770 def __ndr_pdr_found(self, tag, load):
771 rates = self.__convert_rates({'rate_percent': load})
772 self.iteration_collector.add_ndr_pdr(tag, rates['rate_pps'])
773 last_stats = self.iteration_collector.peek()
774 self.interval_collector.add_ndr_pdr(tag, last_stats)
776 def __format_output_stats(self, stats):
777 for key in self.PORTS + ['overall']:
778 interface = stats[key]
780 'tx_pkts': interface['tx']['total_pkts'],
781 'rx_pkts': interface['rx']['total_pkts'],
782 'drop_percentage': interface['drop_rate_percent'],
783 'drop_pct': interface['rx']['dropped_pkts'],
784 'avg_delay_usec': interface['rx']['avg_delay_usec'],
785 'max_delay_usec': interface['rx']['max_delay_usec'],
786 'min_delay_usec': interface['rx']['min_delay_usec'],
791 def __targets_found(self, rate, targets, results):
792 for tag, target in targets.iteritems():
793 LOG.info('Found %s (%s) load: %s', tag, target, rate)
794 self.__ndr_pdr_found(tag, rate)
795 results[tag]['timestamp_sec'] = time.time()
797 def __range_search(self, left, right, targets, results):
798 """Perform a binary search for a list of targets inside a [left..right] range or rate.
800 left the left side of the range to search as a % the line rate (100 = 100% line rate)
801 indicating the rate to send on each interface
802 right the right side of the range to search as a % of line rate
803 indicating the rate to send on each interface
804 targets a dict of drop rates to search (0.1 = 0.1%), indexed by the DR name or "tag"
806 results a dict to store results
810 LOG.info('Range search [%s .. %s] targets: %s', left, right, targets)
812 # Terminate search when gap is less than load epsilon
813 if right - left < self.config.measurement.load_epsilon:
814 self.__targets_found(left, targets, results)
817 # Obtain the average drop rate in for middle load
818 middle = (left + right) / 2.0
820 stats, rates = self.__run_search_iteration(middle)
822 LOG.exception("Got exception from traffic generator during binary search")
823 self.__targets_found(left, targets, results)
825 # Split target dicts based on the avg drop rate
828 for tag, target in targets.iteritems():
829 if stats['overall']['drop_rate_percent'] <= target:
830 # record the best possible rate found for this target
832 results[tag].update({
833 'load_percent_per_direction': middle,
834 'stats': self.__format_output_stats(dict(stats)),
835 'timestamp_sec': None
837 right_targets[tag] = target
839 # initialize to 0 all fields of result for
840 # the worst case scenario of the binary search (if ndr/pdr is not found)
841 if tag not in results:
842 results[tag] = dict.fromkeys(rates, 0)
843 empty_stats = self.__format_output_stats(dict(stats))
844 for key in empty_stats:
845 if isinstance(empty_stats[key], dict):
846 empty_stats[key] = dict.fromkeys(empty_stats[key], 0)
849 results[tag].update({
850 'load_percent_per_direction': 0,
851 'stats': empty_stats,
852 'timestamp_sec': None
854 left_targets[tag] = target
857 self.__range_search(left, middle, left_targets, results)
859 # search upper half only if the upper rate does not exceed
860 # 100%, this only happens when the first search at 100%
861 # yields a DR that is < target DR
863 self.__targets_found(100, right_targets, results)
865 self.__range_search(middle, right, right_targets, results)
867 def __run_search_iteration(self, rate):
868 """Run one iteration at the given rate level.
870 rate: the rate to send on each port in percent (0 to 100)
872 self._modify_load(rate)
874 # poll interval stats and collect them
875 for stats in self.run_traffic():
876 self.interval_collector.add(stats)
877 time_elapsed_ratio = self.runner.time_elapsed() / self.run_config['duration_sec']
878 if time_elapsed_ratio >= 1:
879 self.cancel_traffic()
880 if not self.skip_sleep():
881 time.sleep(self.config.pause_sec)
882 self.interval_collector.reset()
884 # get stats from the run
885 stats = self.runner.client.get_stats()
886 current_traffic_config = self._get_traffic_config()
887 warning = self.compare_tx_rates(current_traffic_config['direction-total']['rate_pps'],
888 stats['total_tx_rate'])
889 if warning is not None:
890 stats['warning'] = warning
892 # save reliable stats from whole iteration
893 self.iteration_collector.add(stats, current_traffic_config['direction-total']['rate_pps'])
894 LOG.info('Average drop rate: %f', stats['overall']['drop_rate_percent'])
895 return stats, current_traffic_config['direction-total']
898 def log_stats(stats):
899 """Log estimated stats during run."""
901 'datetime': str(datetime.now()),
902 'tx_packets': stats['overall']['tx']['total_pkts'],
903 'rx_packets': stats['overall']['rx']['total_pkts'],
904 'drop_packets': stats['overall']['rx']['dropped_pkts'],
905 'drop_rate_percent': stats['overall']['drop_rate_percent']
907 LOG.info('TX: %(tx_packets)d; '
908 'RX: %(rx_packets)d; '
909 'Est. Dropped: %(drop_packets)d; '
910 'Est. Drop rate: %(drop_rate_percent).4f%%',
913 def run_traffic(self):
914 """Start traffic and return intermediate stats for each interval."""
915 stats = self.runner.run()
916 while self.runner.is_running:
917 self.log_stats(stats)
919 stats = self.runner.poll_stats()
922 self.log_stats(stats)
923 LOG.info('Drop rate: %f', stats['overall']['drop_rate_percent'])
926 def cancel_traffic(self):
930 def _get_traffic_config(self):
935 for idx, rate in enumerate(self.run_config['rates']):
936 key = 'direction-forward' if idx == 0 else 'direction-reverse'
938 'l2frame_size': self.run_config['l2frame_size'],
939 'duration_sec': self.run_config['duration_sec']
941 config[key].update(rate)
942 config[key].update(self.__convert_rates(rate))
943 load_total += float(config[key]['rate_percent'])
944 bps_total += float(config[key]['rate_bps'])
945 pps_total += float(config[key]['rate_pps'])
946 config['direction-total'] = dict(config['direction-forward'])
947 config['direction-total'].update({
948 'rate_percent': load_total,
949 'rate_pps': cast_integer(pps_total),
950 'rate_bps': bps_total
955 def get_run_config(self, results):
956 """Return configuration which was used for the last run."""
958 # because we want each direction to have the far end RX rates,
959 # use the far end index (1-idx) to retrieve the RX rates
960 for idx, key in enumerate(["direction-forward", "direction-reverse"]):
961 tx_rate = results["stats"][idx]["tx"]["total_pkts"] / self.config.duration_sec
962 rx_rate = results["stats"][1 - idx]["rx"]["total_pkts"] / self.config.duration_sec
964 "orig": self.__convert_rates(self.run_config['rates'][idx]),
965 "tx": self.__convert_rates({'rate_pps': tx_rate}),
966 "rx": self.__convert_rates({'rate_pps': rx_rate})
970 for direction in ['orig', 'tx', 'rx']:
971 total[direction] = {}
972 for unit in ['rate_percent', 'rate_bps', 'rate_pps']:
973 total[direction][unit] = sum([float(x[direction][unit]) for x in r.values()])
975 r['direction-total'] = total
978 def insert_interface_stats(self, pps_list):
979 """Insert interface stats to a list of packet path stats.
981 pps_list: a list of packet path stats instances indexed by chain index
983 This function will insert the packet path stats for the traffic gen ports 0 and 1
984 with itemized per chain tx/rx counters.
985 There will be as many packet path stats as chains.
986 Each packet path stats will have exactly 2 InterfaceStats for port 0 and port 1
989 PacketPathStats(InterfaceStats(chain 0, port 0), InterfaceStats(chain 0, port 1)),
990 PacketPathStats(InterfaceStats(chain 1, port 0), InterfaceStats(chain 1, port 1)),
994 def get_if_stats(chain_idx):
995 return [InterfaceStats('p' + str(port), self.tool)
996 for port in range(2)]
997 # keep the list of list of interface stats indexed by the chain id
998 self.ifstats = [get_if_stats(chain_idx)
999 for chain_idx in range(self.config.service_chain_count)]
1000 # note that we need to make a copy of the ifs list so that any modification in the
1001 # list from pps will not change the list saved in self.ifstats
1002 self.pps_list = [PacketPathStats(list(ifs)) for ifs in self.ifstats]
1003 # insert the corresponding pps in the passed list
1004 pps_list.extend(self.pps_list)
1006 def update_interface_stats(self, diff=False):
1007 """Update all interface stats.
1009 diff: if False, simply refresh the interface stats values with latest values
1010 if True, diff the interface stats with the latest values
1011 Make sure that the interface stats inserted in insert_interface_stats() are updated
1015 [InterfaceStats(chain 0, port 0), InterfaceStats(chain 0, port 1)],
1016 [InterfaceStats(chain 1, port 0), InterfaceStats(chain 1, port 1)],
1021 stats = self.gen.get_stats()
1022 for chain_idx, ifs in enumerate(self.ifstats):
1023 # each ifs has exactly 2 InterfaceStats and 2 Latency instances
1024 # corresponding to the
1025 # port 0 and port 1 for the given chain_idx
1026 # Note that we cannot use self.pps_list[chain_idx].if_stats to pick the
1027 # interface stats for the pps because it could have been modified to contain
1028 # additional interface stats
1029 self.gen.get_stream_stats(stats, ifs, self.pps_list[chain_idx].latencies, chain_idx)
1033 def compare_tx_rates(required, actual):
1034 """Compare the actual TX rate to the required TX rate."""
1036 are_different = False
1038 if float(actual) / required < threshold:
1039 are_different = True
1040 except ZeroDivisionError:
1041 are_different = True
1044 msg = "WARNING: There is a significant difference between requested TX rate ({r}) " \
1045 "and actual TX rate ({a}). The traffic generator may not have sufficient CPU " \
1046 "to achieve the requested TX rate.".format(r=required, a=actual)
1052 def get_per_direction_rate(self):
1053 """Get the rate for each direction."""
1054 divisor = 2 if self.run_config['bidirectional'] else 1
1055 if 'rate_percent' in self.current_total_rate:
1056 # don't split rate if it's percentage
1059 return utils.divide_rate(self.current_total_rate, divisor)
1062 """Close this instance."""
1064 self.gen.stop_traffic()
1067 self.gen.clear_stats()