1 # Copyright 2016 Cisco Systems, Inc. All rights reserved.
3 # Licensed under the Apache License, Version 2.0 (the "License"); you may
4 # not use this file except in compliance with the License. You may obtain
5 # a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 # License for the specific language governing permissions and limitations
15 """Interface to the traffic generator clients including NDR/PDR binary search."""
17 from datetime import datetime
22 from attrdict import AttrDict
24 from netaddr import IPNetwork
25 # pylint: disable=import-error
26 from trex_stl_lib.api import STLError
27 # pylint: enable=import-error
30 from packet_stats import InterfaceStats
31 from packet_stats import PacketPathStats
32 from stats_collector import IntervalCollector
33 from stats_collector import IterationCollector
34 import traffic_gen.traffic_utils as utils
35 from utils import cast_integer
class TrafficClientException(Exception):
    """Base error raised by the traffic client for any traffic-related failure."""
class TrafficRunner(object):
    """Serialize various steps required to run traffic.

    A run is pending for as long as start_time is set; stop() clears it.
    """

    def __init__(self, client, duration_sec, interval_sec=0):
        """Create a traffic runner.

        client: object providing gen, get_stats() and skip_sleep()
        duration_sec: duration of one run in seconds
        interval_sec: interval between 2 polls in seconds (0 = sleep the full duration)
        """
        self.client = client
        self.start_time = None
        self.duration_sec = duration_sec
        self.interval_sec = interval_sec

    def run(self):
        """Clear stats and instruct the traffic generator to start generating traffic.

        return: the first polled stats, or None if a run is already pending
        """
        if self.is_running():
            return None
        LOG.info('Running traffic generator')
        self.client.gen.clear_stats()
        self.client.gen.start_traffic()
        self.start_time = time.time()
        return self.poll_stats()

    def stop(self):
        """Stop the current run and instruct the traffic generator to stop traffic."""
        if self.is_running():
            # clear the start time first so that the run is no longer seen as pending
            self.start_time = None
            self.client.gen.stop_traffic()

    def is_running(self):
        """Check if a run is still pending."""
        return self.start_time is not None

    def time_elapsed(self):
        """Return time elapsed since start of run.

        When no run is pending the full run duration is returned.
        """
        if self.is_running():
            return time.time() - self.start_time
        return self.duration_sec

    def poll_stats(self):
        """Poll latest stats from the traffic generator at fixed interval - sleeps if necessary.

        return: latest stats or None if traffic is stopped
        """
        if not self.is_running():
            return None
        if self.client.skip_sleep():
            # no sleeping requested: end the run right away
            self.stop()
            return self.client.get_stats()
        time_elapsed = self.time_elapsed()
        if time_elapsed > self.duration_sec:
            self.stop()
            return None
        time_left = self.duration_sec - time_elapsed
        if self.interval_sec > 0.0:
            if time_left <= self.interval_sec:
                # last interval of the run: sleep what is left then stop
                time.sleep(time_left)
                self.stop()
            else:
                time.sleep(self.interval_sec)
        else:
            # no interval polling: one single sleep for the whole run
            time.sleep(self.duration_sec)
            self.stop()
        return self.client.get_stats()
class IpBlock(object):
    """Manage a block of IP addresses."""

    def __init__(self, base_ip, step_ip, count_ip):
        """Create an IP block.

        base_ip: first IP address of the block (dotted string)
        step_ip: increment between 2 consecutive addresses (dotted string)
        count_ip: number of addresses available in the block
        """
        self.base_ip_int = Device.ip_to_int(base_ip)
        self.step = Device.ip_to_int(step_ip)
        self.max_available = count_ip
        # index of the next address that has not been reserved yet
        self.next_free = 0

    def get_ip(self, index=0):
        """Return the IP address at given index.

        raise: IndexError if index is outside [0, max_available)
        """
        if index < 0 or index >= self.max_available:
            raise IndexError('Index out of bounds: %d (max=%d)' % (index, self.max_available))
        return Device.int_to_ip(self.base_ip_int + index * self.step)

    def reserve_ip_range(self, count):
        """Reserve a range of count consecutive IP addresses spaced by step.

        return: a tuple (first_ip, last_ip) of the reserved range
        raise: IndexError if fewer than count addresses are left
        """
        if self.next_free + count > self.max_available:
            raise IndexError('No more IP addresses next free=%d max_available=%d requested=%d' %
                             (self.next_free,
                              self.max_available,
                              count))
        first_ip = self.get_ip(self.next_free)
        last_ip = self.get_ip(self.next_free + count - 1)
        self.next_free += count
        return (first_ip, last_ip)

    def reset_reservation(self):
        """Reset all reservations and restart with a completely unused IP block."""
        self.next_free = 0
class Device(object):
    """Represent a port device and all information associated to it.

    In the current version we only support 2 port devices for the traffic generator
    identified as port 0 or port 1.
    """

    def __init__(self, port, generator_config):
        """Create a new device for a given port.

        port: index of the port (0 or 1)
        generator_config: the generator config this device belongs to
        """
        self.generator_config = generator_config
        self.chain_count = generator_config.service_chain_count
        # each device gets half of the configured flows; floor division keeps the count
        # an integer on Python 3 as well (it is used as an IP block size)
        self.flow_count = generator_config.flow_count // 2
        self.port = port
        self.switch_port = generator_config.interfaces[port].get('switch_port', None)
        self.vtep_vlan = None
        self.vtep_src_mac = None
        self.vxlan = False
        self.pci = generator_config.interfaces[port].pci
        self.mac = None
        self.dest_macs = None
        self.vtep_dst_mac = None
        self.vtep_dst_ip = None
        if generator_config.vteps is None:
            self.vtep_src_ip = None
        else:
            self.vtep_src_ip = generator_config.vteps[port]
        self.vnis = None
        self.vlans = None
        self.ip_addrs = generator_config.ip_addrs[port]
        subnet = IPNetwork(self.ip_addrs)
        self.ip = subnet.ip.format()
        self.ip_addrs_step = generator_config.ip_addrs_step
        self.ip_block = IpBlock(self.ip, self.ip_addrs_step, self.flow_count)
        self.gw_ip_block = IpBlock(generator_config.gateway_ips[port],
                                   generator_config.gateway_ip_addrs_step,
                                   self.chain_count)
        self.tg_gateway_ip_addrs = generator_config.tg_gateway_ip_addrs[port]
        self.tg_gw_ip_block = IpBlock(self.tg_gateway_ip_addrs,
                                      generator_config.tg_gateway_ip_addrs_step,
                                      self.chain_count)
        self.udp_src_port = generator_config.udp_src_port
        self.udp_dst_port = generator_config.udp_dst_port

    def set_mac(self, mac):
        """Set the local MAC for this port device.

        raise: TrafficClientException if mac is None
        """
        if mac is None:
            raise TrafficClientException('Trying to set traffic generator MAC address as None')
        self.mac = mac

    def get_peer_device(self):
        """Get the peer device (device 0 -> device 1, or device 1 -> device 0)."""
        return self.generator_config.devices[1 - self.port]

    def set_vtep_dst_mac(self, dest_macs):
        """Set the list of VTEP dest MACs indexed by the chain id.

        The MACs are stored as a list of strings.
        """
        # list() is required on Python 3 where map() returns a one-shot iterator
        self.vtep_dst_mac = list(map(str, dest_macs))

    def set_dest_macs(self, dest_macs):
        """Set the list of dest MACs indexed by the chain id.

        This is only called in 2 cases:
        - VM macs discovered using openstack API
        - dest MACs provisioned in config file
        """
        # list() is required on Python 3: the result is indexed by chain id later on
        self.dest_macs = list(map(str, dest_macs))

    def get_dest_macs(self):
        """Get the list of dest macs for this device.

        If set_dest_macs was never called, assumes l2-loopback and return
        a list of peer mac (as many as chains but normally only 1 chain)
        """
        if self.dest_macs:
            return self.dest_macs
        # assume this is l2-loopback
        return [self.get_peer_device().mac] * self.chain_count

    def set_vlans(self, vlans):
        """Set the list of vlans to use indexed by the chain id."""
        self.vlans = vlans
        LOG.info("Port %d: VLANs %s", self.port, self.vlans)

    def set_vtep_vlan(self, vlan):
        """Set the vtep vlan to use indexed by specific port.

        Calling this also switches the device to VxLAN mode.
        """
        self.vtep_vlan = vlan
        self.vxlan = True
        self.vlan_tagging = None
        LOG.info("Port %d: VTEP VLANs %s", self.port, self.vtep_vlan)

    def set_vxlan_endpoints(self, src_ip, dst_ip):
        """Set the source and destination VTEP IP addresses for this port."""
        self.vtep_dst_ip = dst_ip
        self.vtep_src_ip = src_ip
        LOG.info("Port %d: src_vtep %s, dst_vtep %s", self.port,
                 self.vtep_src_ip, self.vtep_dst_ip)

    def set_vxlans(self, vnis):
        """Set the list of VNIs to use indexed by the chain id."""
        self.vnis = vnis
        LOG.info("Port %d: VNIs %s", self.port, self.vnis)

    def get_gw_ip(self, chain_index):
        """Retrieve the IP address assigned for the gateway of a given chain."""
        return self.gw_ip_block.get_ip(chain_index)

    def get_stream_configs(self):
        """Get the stream config for a given chain on this device.

        Called by the traffic generator driver to program the traffic generator properly
        before generating traffic

        return: a list of stream config dicts, one per chain, indexed by the chain id
        """
        configs = []
        # exact flow count for each chain is calculated as follows:
        # - all chains except the first will have the same flow count
        #   calculated as (total_flows + chain_count - 1) / chain_count
        # - the first chain will have the remainder
        # example 11 flows and 3 chains => 3, 4, 4
        # (floor division so that the counts stay integers on Python 3)
        flows_per_chain = (self.flow_count + self.chain_count - 1) // self.chain_count
        cur_chain_flow_count = self.flow_count - flows_per_chain * (self.chain_count - 1)
        peer = self.get_peer_device()
        self.ip_block.reset_reservation()
        peer.ip_block.reset_reservation()
        dest_macs = self.get_dest_macs()

        for chain_idx in range(self.chain_count):
            src_ip_first, src_ip_last = self.ip_block.reserve_ip_range(cur_chain_flow_count)
            dst_ip_first, dst_ip_last = peer.ip_block.reserve_ip_range(cur_chain_flow_count)

            configs.append({
                'count': cur_chain_flow_count,
                'mac_src': self.mac,
                'mac_dst': dest_macs[chain_idx],
                'ip_src_addr': src_ip_first,
                'ip_src_addr_max': src_ip_last,
                'ip_src_count': cur_chain_flow_count,
                'ip_dst_addr': dst_ip_first,
                'ip_dst_addr_max': dst_ip_last,
                'ip_dst_count': cur_chain_flow_count,
                'ip_addrs_step': self.ip_addrs_step,
                'udp_src_port': self.udp_src_port,
                'udp_dst_port': self.udp_dst_port,
                'mac_discovery_gw': self.get_gw_ip(chain_idx),
                'ip_src_tg_gw': self.tg_gw_ip_block.get_ip(chain_idx),
                'ip_dst_tg_gw': peer.tg_gw_ip_block.get_ip(chain_idx),
                'vlan_tag': self.vlans[chain_idx] if self.vlans else None,
                'vxlan': self.vxlan,
                'vtep_vlan': self.vtep_vlan if self.vtep_vlan else None,
                'vtep_src_mac': self.mac if self.vxlan is True else None,
                'vtep_dst_mac': self.vtep_dst_mac if self.vxlan is True else None,
                'vtep_dst_ip': self.vtep_dst_ip if self.vxlan is True else None,
                'vtep_src_ip': self.vtep_src_ip if self.vxlan is True else None,
                'net_vni': self.vnis[chain_idx] if self.vxlan is True else None
            })
            # after first chain, fall back to the flow count for all other chains
            cur_chain_flow_count = flows_per_chain
        return configs

    @staticmethod
    def ip_to_int(addr):
        """Convert an IP address from string to numeric."""
        return struct.unpack("!I", socket.inet_aton(addr))[0]

    @staticmethod
    def int_to_ip(nvalue):
        """Convert an IP address from numeric to string."""
        return socket.inet_ntoa(struct.pack("!I", nvalue))
class GeneratorConfig(object):
    """Represents traffic configuration for currently running traffic profile."""

    DEFAULT_IP_STEP = '0.0.0.1'
    DEFAULT_SRC_DST_IP_STEP = '0.0.0.1'

    def __init__(self, config):
        """Create a generator config.

        config: nfvbench config; its generator_profile, 'tg-name' and 'tg-tool' entries
                are updated as a side effect
        raise: TrafficClientException if the interfaces are not listed as port 0 then port 1,
               if a pci field is empty or if the generator profile cannot be found
        """
        self.config = config
        # name of the generator profile (normally trex or dummy)
        # pick the default one if not specified explicitly from cli options
        if not config.generator_profile:
            config.generator_profile = config.traffic_generator.default_profile
        # pick up the profile dict based on the name
        gen_config = self.__match_generator_profile(config.traffic_generator,
                                                    config.generator_profile)
        self.gen_config = gen_config
        # copy over fields from the dict
        self.tool = gen_config.tool
        self.ip = gen_config.ip
        # overrides on config.cores and config.mbuf_factor
        if config.cores:
            self.cores = config.cores
        else:
            self.cores = gen_config.get('cores', 1)
        self.mbuf_factor = config.mbuf_factor
        if gen_config.intf_speed:
            # interface speed is overriden from config
            self.intf_speed = bitmath.parse_string(gen_config.intf_speed.replace('ps', '')).bits
        else:
            # interface speed is discovered/provided by the traffic generator
            self.intf_speed = 0
        self.name = gen_config.name
        self.zmq_pub_port = gen_config.get('zmq_pub_port', 4500)
        self.zmq_rpc_port = gen_config.get('zmq_rpc_port', 4501)
        self.limit_memory = gen_config.get('limit_memory', 1024)
        self.software_mode = gen_config.get('software_mode', False)
        self.interfaces = gen_config.interfaces
        if self.interfaces[0].port != 0 or self.interfaces[1].port != 1:
            raise TrafficClientException('Invalid port order/id in generator_profile.interfaces')
        # check that pci is not empty; this must happen before the devices are created
        # because each Device reads the pci field of its interface
        if not gen_config.interfaces[0].get('pci', None) or \
           not gen_config.interfaces[1].get('pci', None):
            raise TrafficClientException("configuration interfaces pci fields cannot be empty")
        if hasattr(gen_config, 'platform'):
            self.platform = gen_config.platform
        self.service_chain = config.service_chain
        self.service_chain_count = config.service_chain_count
        self.flow_count = config.flow_count
        self.host_name = gen_config.host_name

        self.tg_gateway_ip_addrs = gen_config.tg_gateway_ip_addrs
        self.ip_addrs = gen_config.ip_addrs
        self.ip_addrs_step = gen_config.ip_addrs_step or self.DEFAULT_SRC_DST_IP_STEP
        self.tg_gateway_ip_addrs_step = \
            gen_config.tg_gateway_ip_addrs_step or self.DEFAULT_IP_STEP
        self.gateway_ip_addrs_step = gen_config.gateway_ip_addrs_step or self.DEFAULT_IP_STEP
        self.gateway_ips = gen_config.gateway_ip_addrs
        self.udp_src_port = gen_config.udp_src_port
        self.udp_dst_port = gen_config.udp_dst_port
        self.vteps = gen_config.get('vteps')
        self.devices = [Device(port, self) for port in [0, 1]]
        # This should normally always be [0, 1]
        self.ports = [device.port for device in self.devices]

        self.pcis = [tgif['pci'] for tgif in gen_config.interfaces]
        self.vlan_tagging = config.vlan_tagging

        # needed for result/summarizer
        config['tg-name'] = gen_config.name
        config['tg-tool'] = self.tool

    def to_json(self):
        """Get json form to display the content into the overall result dict."""
        return dict(self.gen_config)

    def set_dest_macs(self, port_index, dest_macs):
        """Set the list of dest MACs indexed by the chain id on given port.

        port_index: the port for which dest macs must be set
        dest_macs: a list of dest MACs indexed by chain id
        raise: TrafficClientException if there are fewer MACs than service chains
        """
        if len(dest_macs) < self.config.service_chain_count:
            raise TrafficClientException('Dest MAC list %s must have %d entries' %
                                         (dest_macs, self.config.service_chain_count))
        # only pass the first scc dest MACs
        self.devices[port_index].set_dest_macs(dest_macs[:self.config.service_chain_count])
        LOG.info('Port %d: dst MAC %s', port_index, [str(mac) for mac in dest_macs])

    def set_vtep_dest_macs(self, port_index, dest_macs):
        """Set the list of VTEP dest MACs indexed by the chain id on given port.

        port_index: the port for which dest macs must be set
        dest_macs: a list of dest MACs indexed by chain id
        raise: TrafficClientException if the list size differs from the service chain count
        """
        if len(dest_macs) != self.config.service_chain_count:
            raise TrafficClientException('Dest MAC list %s must have %d entries' %
                                         (dest_macs, self.config.service_chain_count))
        self.devices[port_index].set_vtep_dst_mac(dest_macs)
        LOG.info('Port %d: vtep dst MAC %s', port_index, {str(mac) for mac in dest_macs})

    def get_dest_macs(self):
        """Return the list of dest macs indexed by port."""
        return [dev.get_dest_macs() for dev in self.devices]

    def set_vlans(self, port_index, vlans):
        """Set the list of vlans to use indexed by the chain id on given port.

        port_index: the port for which VLANs must be set
        vlans: a list of vlan lists indexed by chain id
        raise: TrafficClientException if the list size differs from the service chain count
        """
        if len(vlans) != self.config.service_chain_count:
            raise TrafficClientException('VLAN list %s must have %d entries' %
                                         (vlans, self.config.service_chain_count))
        self.devices[port_index].set_vlans(vlans)

    def set_vxlans(self, port_index, vxlans):
        """Set the list of vxlans (VNIs) to use indexed by the chain id on given port.

        port_index: the port for which VXLANs must be set
        vxlans: a list of VNIs lists indexed by chain id
        raise: TrafficClientException if the list size differs from the service chain count
        """
        if len(vxlans) != self.config.service_chain_count:
            raise TrafficClientException('VXLAN list %s must have %d entries' %
                                         (vxlans, self.config.service_chain_count))
        self.devices[port_index].set_vxlans(vxlans)

    def set_vtep_vlan(self, port_index, vlan):
        """Set the vtep vlan to use on given port.

        port_index: the port for which VLAN must be set
        vlan: the vtep vlan to set on that port
        """
        self.devices[port_index].set_vtep_vlan(vlan)

    def set_vxlan_endpoints(self, port_index, src_ip, dst_ip):
        """Set the source and destination VTEP IP addresses on given port."""
        self.devices[port_index].set_vxlan_endpoints(src_ip, dst_ip)

    @staticmethod
    def __match_generator_profile(traffic_generator, generator_profile):
        """Return the traffic generator settings merged with the profile of the given name.

        raise: TrafficClientException unless exactly one profile has that name
        """
        gen_config = AttrDict(traffic_generator)
        gen_config.pop('default_profile')
        gen_config.pop('generator_profile')
        matching_profile = [profile for profile in traffic_generator.generator_profile if
                            profile.name == generator_profile]
        if len(matching_profile) != 1:
            raise TrafficClientException('Traffic generator profile not found: ' +
                                         generator_profile)

        gen_config.update(matching_profile[0])
        return gen_config
462 class TrafficClient(object):
463 """Traffic generator client with NDR/PDR binary seearch."""
def __init__(self, config, notifier=None):
    """Create a new TrafficClient instance.

    config: nfvbench config
    notifier: notifier (optional)

    A new instance is created everytime the nfvbench config may have changed.
    """
    self.config = config
    self.generator_config = GeneratorConfig(config)
    self.tool = self.generator_config.tool
    self.gen = self._get_generator()
    self.notifier = notifier
    self.interval_collector = None
    self.iteration_collector = None
    self.runner = TrafficRunner(self, self.config.duration_sec, self.config.interval_sec)
    self.config.frame_sizes = self._get_frame_sizes()
    self.run_config = {
        'l2frame_size': None,
        'duration_sec': self.config.duration_sec,
        'bidirectional': True,
        'rates': []  # to avoid unsbuscriptable-obj warning
    }
    # default total rate, replaced by the configured rate for a single run
    self.current_total_rate = {'rate_percent': '10'}
    if self.config.single_run:
        self.current_total_rate = utils.parse_rate_str(self.config.rate)
    # NOTE(review): presumably filled in later with interface stats - confirm
    self.ifstats = None
    # Speed is either discovered when connecting to TG or set from config
    # This variable is 0 if not yet discovered from TG or must be the speed of
    # each interface in bits per second
    self.intf_speed = self.generator_config.intf_speed
def _get_generator(self):
    """Instantiate the traffic generator driver matching the configured tool name.

    The tool name is matched case-insensitively; driver modules are imported lazily.

    raise: TrafficClientException if the tool name is not supported
    """
    tool = self.tool.lower()
    if tool == 'trex':
        from traffic_gen import trex
        return trex.TRex(self)
    if tool == 'dummy':
        from traffic_gen import dummy
        return dummy.DummyTG(self)
    raise TrafficClientException('Unsupported generator tool name:' + self.tool)
def skip_sleep(self):
    """Skip all sleeps when doing unit testing with dummy TG.

    Must be overriden using mock.patch

    return: False, sleeps are never skipped unless this method is patched
    """
    return False
def _get_frame_sizes(self):
    """Return the l2frame_size list of the traffic profile selected in the config.

    raise: TrafficClientException if no profile or several profiles have the selected name
    """
    wanted = self.config.traffic.profile
    candidates = [prof for prof in self.config.traffic_profile if prof.name == wanted]
    if len(candidates) > 1:
        raise TrafficClientException('Multiple traffic profiles with name: ' + wanted)
    if not candidates:
        raise TrafficClientException('Cannot find traffic profile: ' + wanted)
    return candidates[0].l2frame_size
def start_traffic_generator(self):
    """Start the traffic generator process (traffic not started yet).

    Connects to the generator, settles the interface speed and records the local MAC
    of each generator port into the matching device.
    """
    self.gen.connect()
    # pick up the interface speed if it is not set from config
    intf_speeds = self.gen.get_port_speed_gbps()
    # convert Gbps unit into bps
    tg_if_speed = bitmath.parse_string(str(intf_speeds[0]) + 'Gb').bits
    if self.intf_speed:
        # interface speed is overriden from config
        if self.intf_speed != tg_if_speed:
            # Warn the user if the speed in the config is different
            LOG.warning('Interface speed provided is different from actual speed (%d Gbps)',
                        intf_speeds[0])
    else:
        # interface speed not provisioned by config
        self.intf_speed = tg_if_speed
        # also update the speed in the tg config
        self.generator_config.intf_speed = tg_if_speed

    # Save the traffic generator local MAC
    for mac, device in zip(self.gen.get_macs(), self.generator_config.devices):
        device.set_mac(mac)
551 """Set up the traffic client."""
552 self.gen.clear_stats()
def get_version(self):
    """Return the version reported by the traffic generator driver."""
    generator = self.gen
    return generator.get_version()
def ensure_end_to_end(self):
    """Ensure traffic generator receives packets it has transmitted.

    This ensures end to end connectivity and also waits until VMs are ready to forward packets.

    VMs that are started and in active state may not pass traffic yet. It is imperative to make
    sure that all VMs are passing traffic in both directions before starting any benchmarking.
    To verify this, we need to send at a low frequency bi-directional packets and make sure
    that we receive all packets back from all VMs. The number of flows is equal to 2 times
    the number of chains (1 per direction) and we need to make sure we receive packets coming
    from exactly 2 x chain count different source MAC addresses.

    Example:
        PVP chain (1 VM per chain)
        N = 10 (number of chains)
        Flow count = 20 (number of flows)
        If the number of unique source MAC addresses from received packets is 20 then
        all 10 VMs are in operational state.

    raise: TrafficClientException if some expected MACs are still missing after all retries
    """
    LOG.info('Starting traffic generator to ensure end-to-end connectivity')
    # send 2pps on each chain and each direction
    rate_pps = {'rate_pps': str(self.config.service_chain_count * 2)}
    self.gen.create_traffic('64', [rate_pps, rate_pps], bidirectional=True, latency=False)

    # ensures enough traffic is coming back
    # (rounded up; floor division keeps the retry count an integer on Python 3)
    retry_count = (self.config.check_traffic_time_sec +
                   self.config.generic_poll_sec - 1) // self.config.generic_poll_sec

    # we expect to see packets coming from 2 unique MAC per chain
    # because there can be flooding in the case of shared net
    # we must verify that packets from the right VMs are received
    # and not just count unique src MAC
    # create a dict of (port, chain) tuples indexed by dest mac
    mac_map = {}
    for port, dest_macs in enumerate(self.generator_config.get_dest_macs()):
        for chain, mac in enumerate(dest_macs):
            mac_map[mac] = (port, chain)
    unique_src_mac_count = len(mac_map)
    # position of the 6 bytes compared against the expected MACs in a captured packet
    # NOTE(review): presumably the inner source MAC when VxLAN is used - confirm
    if self.config.vxlan and self.config.traffic_generator.vtep_vlan:
        mac_slice = slice(60, 66)
    elif self.config.vxlan:
        mac_slice = slice(56, 62)
    else:
        mac_slice = slice(6, 12)
    for it in range(retry_count):
        self.gen.clear_stats()
        self.gen.start_traffic()
        self.gen.start_capture()
        LOG.info('Captured unique src mac %d/%d, capturing return packets (retry %d/%d)...',
                 unique_src_mac_count - len(mac_map), unique_src_mac_count,
                 it + 1, retry_count)
        if not self.skip_sleep():
            time.sleep(self.config.generic_poll_sec)
        self.gen.stop_traffic()
        self.gen.fetch_capture_packets()
        self.gen.stop_capture()

        for packet in self.gen.packet_list:
            mac_id = packet['binary'][mac_slice]
            # bytearray yields integers on both Python 2 (str) and Python 3 (bytes)
            src_mac = ':'.join(["%02x" % octet for octet in bytearray(mac_id)])
            if src_mac in mac_map:
                port, chain = mac_map[src_mac]
                LOG.info('Received packet from mac: %s (chain=%d, port=%d)',
                         src_mac, chain, port)
                mac_map.pop(src_mac, None)

            if not mac_map:
                # every expected MAC has been seen
                LOG.info('End-to-end connectivity established')
                return

    raise TrafficClientException('End-to-end connectivity cannot be ensured')
def ensure_arp_successful(self):
    """Resolve all IP using ARP and throw an exception in case of failure.

    The resolved MACs of port 0 and port 1 are saved in the generator config, as VTEP
    dest MACs when VxLAN is enabled and as regular dest MACs otherwise.

    raise: TrafficClientException if the traffic generator could not resolve ARP
    """
    dest_macs = self.gen.resolve_arp()
    if dest_macs:
        # all dest macs are discovered, saved them into the generator config
        if self.config.vxlan:
            self.generator_config.set_vtep_dest_macs(0, dest_macs[0])
            self.generator_config.set_vtep_dest_macs(1, dest_macs[1])
        else:
            self.generator_config.set_dest_macs(0, dest_macs[0])
            self.generator_config.set_dest_macs(1, dest_macs[1])
    else:
        raise TrafficClientException('ARP cannot be resolved')
def set_traffic(self, frame_size, bidirectional):
    """Reconfigure the traffic generator for a new frame size.

    frame_size: the L2 frame size to use
    bidirectional: True to send the same rate in both directions; when False the reverse
                   direction is only added if config.unidir_reverse_traffic_pps is > 0
    """
    self.run_config['bidirectional'] = bidirectional
    self.run_config['l2frame_size'] = frame_size
    self.run_config['rates'] = [self.get_per_direction_rate()]
    if bidirectional:
        self.run_config['rates'].append(self.get_per_direction_rate())
    else:
        unidir_reverse_pps = int(self.config.unidir_reverse_traffic_pps)
        if unidir_reverse_pps > 0:
            self.run_config['rates'].append({'rate_pps': str(unidir_reverse_pps)})
    # Fix for [NFVBENCH-67], convert the rate string to PPS
    for idx, rate in enumerate(self.run_config['rates']):
        if 'rate_pps' not in rate:
            self.run_config['rates'][idx] = {'rate_pps': self.__convert_rates(rate)['rate_pps']}

    self.gen.clear_streamblock()
    self.gen.create_traffic(frame_size, self.run_config['rates'], bidirectional, latency=True)
def _modify_load(self, load):
    """Apply a new load (in percent) to the generator and to the saved run config.

    The rate is always applied to the first direction and also to the second one when
    the current run is bidirectional.
    """
    self.current_total_rate = {'rate_percent': str(load)}
    per_direction = self.get_per_direction_rate()
    rates = self.run_config['rates']

    # second argument of modify_rate is False for the first direction, True for the second
    self.gen.modify_rate(per_direction, False)
    rates[0] = per_direction
    if not self.run_config['bidirectional']:
        return
    self.gen.modify_rate(per_direction, True)
    rates[1] = per_direction
def get_ndr_and_pdr(self):
    """Start the NDR/PDR iteration and return the results.

    return: a dict with an 'iteration_stats' entry plus one entry per requested target
            ('ndr' and/or 'pdr'), each completed with its 'time_taken_sec'
    """
    dst = 'Bidirectional' if self.run_config['bidirectional'] else 'Unidirectional'
    targets = {}
    if self.config.ndr_run:
        LOG.info('*** Searching NDR for %s (%s)...', self.run_config['l2frame_size'], dst)
        targets['ndr'] = self.config.measurement.NDR
    if self.config.pdr_run:
        LOG.info('*** Searching PDR for %s (%s)...', self.run_config['l2frame_size'], dst)
        targets['pdr'] = self.config.measurement.PDR

    self.run_config['start_time'] = time.time()
    self.interval_collector = IntervalCollector(self.run_config['start_time'])
    self.interval_collector.attach_notifier(self.notifier)
    self.iteration_collector = IterationCollector(self.run_config['start_time'])
    results = {}
    self.__range_search(0.0, 200.0, targets, results)

    results['iteration_stats'] = {
        'ndr_pdr': self.iteration_collector.get()
    }

    if self.config.ndr_run:
        LOG.info('NDR load: %s', results['ndr']['rate_percent'])
        results['ndr']['time_taken_sec'] = \
            results['ndr']['timestamp_sec'] - self.run_config['start_time']
        if self.config.pdr_run:
            # PDR time is counted from the moment NDR was found
            LOG.info('PDR load: %s', results['pdr']['rate_percent'])
            results['pdr']['time_taken_sec'] = \
                results['pdr']['timestamp_sec'] - results['ndr']['timestamp_sec']
    elif self.config.pdr_run:
        # only look at the PDR result when a PDR search was actually requested,
        # there is no 'pdr' entry otherwise
        LOG.info('PDR load: %s', results['pdr']['rate_percent'])
        results['pdr']['time_taken_sec'] = \
            results['pdr']['timestamp_sec'] - self.run_config['start_time']
    return results
def __get_dropped_rate(self, result):
    """Return the drop rate in percent of the given tx/rx stats.

    result: a dict providing result['rx']['dropped_pkts'] and result['tx']['total_pkts']
    return: dropped packets as a percentage of transmitted packets, infinity when
            nothing was transmitted
    """
    dropped_pkts = result['rx']['dropped_pkts']
    total_pkts = result['tx']['total_pkts']
    if not total_pkts:
        # nothing transmitted: avoid a division by zero
        return float('inf')
    return float(dropped_pkts) / total_pkts * 100
def get_stats(self):
    """Collect final stats for previous run.

    return: a dict with 'total_tx_rate', one entry per port and an 'overall' entry;
            port and overall entries hold 'tx', 'rx' and 'drop_rate_percent'
    """
    stats = self.gen.get_stats()
    retDict = {'total_tx_rate': stats['total_tx_rate']}
    for port in self.PORTS:
        retDict[port] = {'tx': {}, 'rx': {}}

    tx_keys = ['total_pkts', 'total_pkt_bytes', 'pkt_rate', 'pkt_bit_rate']
    rx_keys = tx_keys + ['dropped_pkts']

    for port in self.PORTS:
        for key in tx_keys:
            retDict[port]['tx'][key] = int(stats[port]['tx'][key])
        for key in rx_keys:
            try:
                retDict[port]['rx'][key] = int(stats[port]['rx'][key])
            except ValueError:
                # value that cannot be converted to an integer is reported as 0
                retDict[port]['rx'][key] = 0
        retDict[port]['rx']['avg_delay_usec'] = cast_integer(
            stats[port]['rx']['avg_delay_usec'])
        retDict[port]['rx']['min_delay_usec'] = cast_integer(
            stats[port]['rx']['min_delay_usec'])
        retDict[port]['rx']['max_delay_usec'] = cast_integer(
            stats[port]['rx']['max_delay_usec'])
        retDict[port]['drop_rate_percent'] = self.__get_dropped_rate(retDict[port])

    # sort the port ids only: retDict also holds the 'total_tx_rate' string key and
    # mixed int/str keys cannot be sorted on Python 3
    ports = sorted(self.PORTS)
    if self.run_config['bidirectional']:
        retDict['overall'] = {'tx': {}, 'rx': {}}
        for key in tx_keys:
            retDict['overall']['tx'][key] = \
                retDict[ports[0]]['tx'][key] + retDict[ports[1]]['tx'][key]
        for key in rx_keys:
            retDict['overall']['rx'][key] = \
                retDict[ports[0]]['rx'][key] + retDict[ports[1]]['rx'][key]
        total_pkts = [retDict[ports[0]]['rx']['total_pkts'],
                      retDict[ports[1]]['rx']['total_pkts']]
        avg_delays = [retDict[ports[0]]['rx']['avg_delay_usec'],
                      retDict[ports[1]]['rx']['avg_delay_usec']]
        max_delays = [retDict[ports[0]]['rx']['max_delay_usec'],
                      retDict[ports[1]]['rx']['max_delay_usec']]
        min_delays = [retDict[ports[0]]['rx']['min_delay_usec'],
                      retDict[ports[1]]['rx']['min_delay_usec']]
        retDict['overall']['rx']['avg_delay_usec'] = utils.weighted_avg(total_pkts, avg_delays)
        retDict['overall']['rx']['min_delay_usec'] = min(min_delays)
        retDict['overall']['rx']['max_delay_usec'] = max(max_delays)
        # rates are reported as the average of the 2 ports rather than their sum
        for key in ['pkt_bit_rate', 'pkt_rate']:
            for dirc in ['tx', 'rx']:
                retDict['overall'][dirc][key] /= 2.0
    else:
        # unidirectional: the overall stats are the stats of the first port
        retDict['overall'] = retDict[ports[0]]
    retDict['overall']['drop_rate_percent'] = self.__get_dropped_rate(retDict['overall'])
    return retDict
def __convert_rates(self, rate):
    """Convert a rate dict using the current L2 frame size and the interface speed.

    rate: the rate dict to convert (e.g. {'rate_percent': ...} or {'rate_pps': ...})
    return: the result of utils.convert_rates; callers read its 'rate_pps',
            'rate_bps' and 'rate_percent' entries
    """
    return utils.convert_rates(self.run_config['l2frame_size'],
                               rate,
                               self.intf_speed)
def __ndr_pdr_found(self, tag, load):
    """Record in both collectors that the given tag was found at the given load."""
    pps = self.__convert_rates({'rate_percent': load})['rate_pps']
    self.iteration_collector.add_ndr_pdr(tag, pps)
    # the interval collector is given the latest stats of the iteration collector
    self.interval_collector.add_ndr_pdr(tag, self.iteration_collector.peek())
def __format_output_stats(self, stats):
    """Replace the per port and overall stats by their flat output form.

    stats: a dict with one entry per port and an 'overall' entry; these entries are
           replaced in place in the passed dict
    return: the same stats dict
    """
    for key in self.PORTS + ['overall']:
        interface = stats[key]
        stats[key] = {
            'tx_pkts': interface['tx']['total_pkts'],
            'rx_pkts': interface['rx']['total_pkts'],
            'drop_percentage': interface['drop_rate_percent'],
            # despite its name this entry holds the number of dropped packets
            'drop_pct': interface['rx']['dropped_pkts'],
            'avg_delay_usec': interface['rx']['avg_delay_usec'],
            'max_delay_usec': interface['rx']['max_delay_usec'],
            'min_delay_usec': interface['rx']['min_delay_usec'],
        }

    return stats
def __targets_found(self, rate, targets, results):
    """Record that all given targets are found at the given rate.

    rate: the load at which the targets are found
    targets: a dict of drop rates indexed by tag
    results: the results dict, must already have an entry for each tag
    """
    # items() works on both Python 2 and Python 3 (iteritems() is Python 2 only)
    for tag, target in targets.items():
        LOG.info('Found %s (%s) load: %s', tag, target, rate)
        self.__ndr_pdr_found(tag, rate)
        results[tag]['timestamp_sec'] = time.time()
def __range_search(self, left, right, targets, results):
    """Perform a binary search for a list of targets inside a [left..right] range or rate.

    left    the left side of the range to search as a % the line rate (100 = 100% line rate)
            indicating the rate to send on each interface
    right   the right side of the range to search as a % of line rate
            indicating the rate to send on each interface
    targets a dict of drop rates to search (0.1 = 0.1%), indexed by the DR name or "tag"
            ('ndr', 'pdr')
    results a dict to store results
    """
    if not targets:
        return
    LOG.info('Range search [%s .. %s] targets: %s', left, right, targets)

    # Terminate search when gap is less than load epsilon
    if right - left < self.config.measurement.load_epsilon:
        self.__targets_found(left, targets, results)
        return

    # Obtain the average drop rate in for middle load
    middle = (left + right) / 2.0
    try:
        stats, rates = self.__run_search_iteration(middle)
    except STLError:
        # give up on this range and settle for the lower bound
        LOG.exception("Got exception from traffic generator during binary search")
        self.__targets_found(left, targets, results)
        return
    # Split target dicts based on the avg drop rate
    left_targets = {}
    right_targets = {}
    # items() works on both Python 2 and Python 3 (iteritems() is Python 2 only)
    for tag, target in targets.items():
        if stats['overall']['drop_rate_percent'] <= target:
            # record the best possible rate found for this target
            # each tag gets its own copy so that a later update for one tag
            # (e.g. its timestamp) does not leak into another tag
            results[tag] = dict(rates)
            results[tag].update({
                'load_percent_per_direction': middle,
                'stats': self.__format_output_stats(dict(stats)),
                'timestamp_sec': None
            })
            right_targets[tag] = target
        else:
            # initialize to 0 all fields of result for
            # the worst case scenario of the binary search (if ndr/pdr is not found)
            if tag not in results:
                results[tag] = dict.fromkeys(rates, 0)
                empty_stats = self.__format_output_stats(dict(stats))
                for key in empty_stats:
                    if isinstance(empty_stats[key], dict):
                        empty_stats[key] = dict.fromkeys(empty_stats[key], 0)
                    else:
                        empty_stats[key] = 0
                results[tag].update({
                    'load_percent_per_direction': 0,
                    'stats': empty_stats,
                    'timestamp_sec': None
                })
            left_targets[tag] = target

    # search lower half
    self.__range_search(left, middle, left_targets, results)

    # search upper half only if the upper rate does not exceed
    # 100%, this only happens when the first search at 100%
    # yields a DR that is < target DR
    if middle >= 100:
        self.__targets_found(100, right_targets, results)
    else:
        self.__range_search(middle, right, right_targets, results)
def __run_search_iteration(self, rate):
    """Run one iteration at the given rate level.

    rate: the rate to send on each port in percent (0 to 100)
    return: a tuple (stats of the whole iteration, 'direction-total' traffic config)
    """
    self._modify_load(rate)

    # poll interval stats and collect them
    for interval_stats in self.run_traffic():
        self.interval_collector.add(interval_stats)
        elapsed_ratio = self.runner.time_elapsed() / self.run_config['duration_sec']
        if elapsed_ratio < 1:
            continue
        # the run lasted long enough: stop traffic and pause before the next step
        self.cancel_traffic()
        if not self.skip_sleep():
            time.sleep(self.config.pause_sec)
    self.interval_collector.reset()

    # get stats from the run
    stats = self.runner.client.get_stats()
    traffic_config = self._get_traffic_config()
    total = traffic_config['direction-total']
    warning = self.compare_tx_rates(total['rate_pps'],
                                    stats['total_tx_rate'])
    if warning is not None:
        stats['warning'] = warning

    # save reliable stats from whole iteration
    self.iteration_collector.add(stats, total['rate_pps'])
    LOG.info('Average drop rate: %f', stats['overall']['drop_rate_percent'])
    return stats, total
@staticmethod
def log_stats(stats):
    """Log estimated stats during run.

    stats: a stats dict providing stats['overall']['tx']['total_pkts'],
           stats['overall']['rx']['total_pkts'],
           stats['overall']['rx']['dropped_pkts'] and
           stats['overall']['drop_rate_percent']
    """
    overall = stats['overall']
    report = {
        'datetime': str(datetime.now()),
        'tx_packets': overall['tx']['total_pkts'],
        'rx_packets': overall['rx']['total_pkts'],
        'drop_packets': overall['rx']['dropped_pkts'],
        'drop_rate_percent': overall['drop_rate_percent']
    }
    # a single mapping argument lets the logger resolve the %(name) placeholders
    LOG.info('TX: %(tx_packets)d; '
             'RX: %(rx_packets)d; '
             'Est. Dropped: %(drop_packets)d; '
             'Est. Drop rate: %(drop_rate_percent).4f%%',
             report)
def run_traffic(self):
    """Start traffic and yield the intermediate stats of each interval.

    This is a generator: the stats returned when the runner starts are
    yielded first, then one stats dict per poll of the runner.
    """
    stats = self.runner.run()
    # NOTE(review): is_running is tested without being called, as in the
    # original; if it is a plain method this condition is always true and the
    # loop only ends through the "stats is None" exit below - confirm intent
    while self.runner.is_running:
        self.log_stats(stats)
        yield stats
        stats = self.runner.poll_stats()
        # NOTE(review): end-of-run exit restored from context (these lines
        # were not visible in this chunk) - confirm
        if stats is None:
            return
    self.log_stats(stats)
    LOG.info('Drop rate: %f', stats['overall']['drop_rate_percent'])
    yield stats
931 def cancel_traffic(self):
def _get_traffic_config(self):
    """Build the traffic configuration of the current run, per direction and in total.

    Return a dict with a 'direction-forward' entry for the first rate of
    self.run_config['rates'], a 'direction-reverse' entry for any following
    rate and a 'direction-total' entry. Each direction entry holds the frame
    size, the duration, the rate as configured and its converted forms.
    'direction-total' is a copy of the forward entry whose 'rate_percent',
    'rate_pps' and 'rate_bps' are replaced by the sums over all rates.
    """
    config = {}
    load_total = 0.0
    bps_total = 0.0
    pps_total = 0.0
    for idx, rate in enumerate(self.run_config['rates']):
        key = 'direction-forward' if idx == 0 else 'direction-reverse'
        direction = {
            'l2frame_size': self.run_config['l2frame_size'],
            'duration_sec': self.run_config['duration_sec']
        }
        direction.update(rate)
        # the converted rates are applied last so they take precedence
        direction.update(self.__convert_rates(rate))
        config[key] = direction
        load_total += float(direction['rate_percent'])
        bps_total += float(direction['rate_bps'])
        pps_total += float(direction['rate_pps'])

    # start from a copy so that the forward entry is left untouched
    total = dict(config['direction-forward'])
    total.update({
        'rate_percent': load_total,
        'rate_pps': cast_integer(pps_total),
        'rate_bps': bps_total
    })
    config['direction-total'] = total

    return config
def get_run_config(self, results):
    """Return configuration which was used for the last run.

    results: a dict whose 'stats' entry is indexable by port (0 and 1) and
             gives ['tx']['total_pkts'] and ['rx']['total_pkts'] per port

    Return a dict with 'direction-forward', 'direction-reverse' and
    'direction-total' entries. Each holds the converted 'orig' (configured),
    'tx' and 'rx' rates; the total entry sums every unit over both directions.
    """
    r = {}
    # because we want each direction to have the far end RX rates,
    # use the far end index (1-idx) to retrieve the RX rates
    for idx, key in enumerate(["direction-forward", "direction-reverse"]):
        tx_rate = results["stats"][idx]["tx"]["total_pkts"] / self.config.duration_sec
        rx_rate = results["stats"][1 - idx]["rx"]["total_pkts"] / self.config.duration_sec
        r[key] = {
            "orig": self.__convert_rates(self.run_config['rates'][idx]),
            "tx": self.__convert_rates({'rate_pps': tx_rate}),
            "rx": self.__convert_rates({'rate_pps': rx_rate})
        }

    # at this point r only holds the two directions, so summing over its
    # values gives the totals per rate kind and per unit
    total = {}
    for direction in ['orig', 'tx', 'rx']:
        total[direction] = {}
        for unit in ['rate_percent', 'rate_bps', 'rate_pps']:
            total[direction][unit] = sum(float(x[direction][unit]) for x in r.values())

    r['direction-total'] = total
    return r
def insert_interface_stats(self, pps_list):
    """Create the traffic generator interface stats and add them to pps_list.

    pps_list: a list of packet path stats instances indexed by chain index,
              extended in place with one PacketPathStats per chain

    One PacketPathStats is created per service chain, each built from exactly
    2 InterfaceStats named 'p0' and 'p1' (traffic gen ports 0 and 1).
    The per-chain InterfaceStats lists are saved in self.ifstats and the new
    PacketPathStats in self.pps_list:
    [
    PacketPathStats(InterfaceStats(chain 0, port 0), InterfaceStats(chain 0, port 1)),
    PacketPathStats(InterfaceStats(chain 1, port 0), InterfaceStats(chain 1, port 1)),
    ...
    ]
    """
    chain_count = self.config.service_chain_count
    # one [port 0 stats, port 1 stats] list per chain, indexed by the chain id
    self.ifstats = [[InterfaceStats('p' + str(port), self.tool) for port in range(2)]
                    for _ in range(chain_count)]
    # each pps receives its own copy of the inner list so that a later change
    # of the list held by the pps leaves the one saved in self.ifstats intact
    new_pps = []
    for chain_ifs in self.ifstats:
        new_pps.append(PacketPathStats(list(chain_ifs)))
    self.pps_list = new_pps
    # expose the new pps to the caller through the passed list
    pps_list.extend(new_pps)
def update_interface_stats(self, diff=False):
    """Update all interface stats.

    diff: when True, fetch the latest stats from the traffic generator and
          update the interface stats created by insert_interface_stats();
          when False nothing is updated
          NOTE(review): the "if diff" guard was restored from context (it was
          not visible in this chunk) - confirm

    self.ifstats:
    [
    [InterfaceStats(chain 0, port 0), InterfaceStats(chain 0, port 1)],
    [InterfaceStats(chain 1, port 0), InterfaceStats(chain 1, port 1)],
    ...
    ]
    """
    if not diff:
        return

    stats = self.gen.get_stats()
    for chain_idx, ifs in enumerate(self.ifstats):
        # each ifs has exactly 2 InterfaceStats and 2 Latency instances
        # corresponding to the port 0 and port 1 for the given chain_idx
        # Note that we cannot use self.pps_list[chain_idx].if_stats to pick the
        # interface stats for the pps because it could have been modified to contain
        # additional interface stats
        self.gen.get_stream_stats(stats, ifs, self.pps_list[chain_idx].latencies, chain_idx)

    # special handling for vxlan
    # in case of vxlan, flow stats are not available so all rx counters will be
    # zeros when the total rx port counter is non zero.
    # NOTE(review): the conditions below were restored from the surrounding
    # comments (they were not visible in this chunk) - confirm
    for port in range(2):
        total_rx = sum(ifs[port].rx for ifs in self.ifstats)
        if total_rx != 0:
            continue
        # check if the total port rx from Trex is also zero
        port_rx = stats[port]['rx']['total_pkts']
        if not port_rx:
            continue
        # the total rx for all chains from port level stats is non zero
        # which means that the per-chain stats are not available
        if len(self.ifstats) == 1:
            # only one chain, simply report the port level rx to the chain rx stats
            self.ifstats[0][port].rx = port_rx
        else:
            for ifs in self.ifstats:
                # mark this data as unavailable
                # NOTE(review): None assumed as the "unavailable" marker - confirm
                ifs[port].rx = None
            # pitch in the total rx only in the last chain pps
            self.ifstats[-1][port].rx_total = port_rx
@staticmethod
def compare_tx_rates(required, actual):
    """Compare the actual TX rate to the required TX rate.

    required: the requested TX rate
    actual: the TX rate that was actually achieved

    Return a warning message when the actual/required ratio is below the
    threshold (or when required is zero), None otherwise.
    """
    # NOTE(review): the threshold assignment was not visible in this chunk,
    # 0.9 is assumed - confirm
    threshold = 0.9
    try:
        are_different = float(actual) / required < threshold
    except ZeroDivisionError:
        # a required rate of zero cannot be compared, report it as different
        are_different = True

    if not are_different:
        return None

    msg = "WARNING: There is a significant difference between requested TX rate ({r}) " \
          "and actual TX rate ({a}). The traffic generator may not have sufficient CPU " \
          "to achieve the requested TX rate.".format(r=required, a=actual)
    # NOTE(review): logging of the message restored from context - confirm
    LOG.info(msg)
    return msg
def get_per_direction_rate(self):
    """Get the rate for each direction.

    Return self.current_total_rate divided by 2 when the run is bidirectional
    and the rate is not a percentage, by 1 otherwise.
    """
    divisor = 2 if self.run_config['bidirectional'] else 1
    if 'rate_percent' in self.current_total_rate:
        # don't split rate if it's percentage
        divisor = 1

    return utils.divide_rate(self.current_total_rate, divisor)
1089 """Close this instance."""
1091 self.gen.stop_traffic()
1094 self.gen.clear_stats()