1 # Copyright 2016 Cisco Systems, Inc. All rights reserved.
3 # Licensed under the Apache License, Version 2.0 (the "License"); you may
4 # not use this file except in compliance with the License. You may obtain
5 # a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 # License for the specific language governing permissions and limitations
15 """Interface to the traffic generator clients including NDR/PDR binary search."""
17 from datetime import datetime
22 from attrdict import AttrDict
24 from netaddr import IPNetwork
25 # pylint: disable=import-error
26 from trex.stl.api import Ether
27 from trex.stl.api import STLError
28 from trex.stl.api import UDP
29 # pylint: enable=import-error
32 from .packet_stats import InterfaceStats
33 from .packet_stats import PacketPathStats
34 from .stats_collector import IntervalCollector
35 from .stats_collector import IterationCollector
36 from .traffic_gen import traffic_utils as utils
37 from .utils import cast_integer
class TrafficClientException(Exception):
    """Error type raised by the traffic client code of this module."""
class TrafficRunner(object):
    """Drive one traffic run: start it, poll its stats and stop it."""

    def __init__(self, client, duration_sec, interval_sec=0, service_mode=False):
        """Create a traffic runner.

        client: traffic client whose generator (client.gen) is driven by this runner
        duration_sec: duration of one run in seconds
        interval_sec: polling interval in seconds (0 to sleep the whole duration at once)
        service_mode: value passed to the generator set_service_mode() when a run starts
        """
        self.client = client
        self.duration_sec = duration_sec
        self.interval_sec = interval_sec
        self.service_mode = service_mode
        # stays None as long as no run is in progress
        self.start_time = None

    def run(self):
        """Clear stats, start the traffic and return the first polled stats.

        return: the result of the first poll_stats() or None if a run is already in progress
        """
        if self.is_running():
            return None
        LOG.info('Running traffic generator')
        generator = self.client.gen
        generator.clear_stats()
        # Debug use only : new '--service-mode' option available for the NFVBench command line.
        # A read-only mode TRex console would be able to capture the generated traffic.
        generator.set_service_mode(enabled=self.service_mode)
        LOG.info('Service mode is %sabled', 'en' if self.service_mode else 'dis')
        generator.start_traffic()
        self.start_time = time.time()
        return self.poll_stats()

    def stop(self):
        """Mark the run as finished and stop the traffic (no-op when nothing is running)."""
        if not self.is_running():
            return
        self.start_time = None
        self.client.gen.stop_traffic()

    def is_running(self):
        """Return True while a run has been started and not stopped yet."""
        return self.start_time is not None

    def time_elapsed(self):
        """Return the seconds elapsed since the run started, or duration_sec when not running."""
        return time.time() - self.start_time if self.is_running() else self.duration_sec

    def poll_stats(self):
        """Sleep until the next polling point and return the latest generator stats.

        The run is stopped by this method once its duration is reached.

        return: latest stats or None if traffic is stopped
        """
        if not self.is_running():
            return None
        if self.client.skip_sleep():
            # no sleeping at all: end the run right away and report its stats
            self.stop()
            return self.client.get_stats()
        elapsed = self.time_elapsed()
        if elapsed > self.duration_sec:
            self.stop()
            return None
        remaining = self.duration_sec - elapsed
        if self.interval_sec > 0.0 and remaining > self.interval_sec:
            # more than one interval left: sleep one interval and keep the run going
            time.sleep(self.interval_sec)
        else:
            # last sleep of the run
            time.sleep(remaining if self.interval_sec > 0.0 else self.duration_sec)
            self.stop()
        return self.client.get_stats()
111 class IpBlock(object):
112 """Manage a block of IP addresses."""
114 def __init__(self, base_ip, step_ip, count_ip):
115 """Create an IP block."""
116 self.base_ip_int = Device.ip_to_int(base_ip)
117 self.step = Device.ip_to_int(step_ip)
118 self.max_available = count_ip
121 def get_ip(self, index=0):
122 """Return the IP address at given index."""
123 if index < 0 or index >= self.max_available:
124 raise IndexError('Index out of bounds: %d (max=%d)' % (index, self.max_available))
125 return Device.int_to_ip(self.base_ip_int + index * self.step)
127 def reserve_ip_range(self, count):
128 """Reserve a range of count consecutive IP addresses spaced by step."""
129 if self.next_free + count > self.max_available:
130 raise IndexError('No more IP addresses next free=%d max_available=%d requested=%d' %
134 first_ip = self.get_ip(self.next_free)
135 last_ip = self.get_ip(self.next_free + count - 1)
136 self.next_free += count
137 return (first_ip, last_ip)
139 def reset_reservation(self):
140 """Reset all reservations and restart with a completely unused IP block."""
144 class Device(object):
145 """Represent a port device and all information associated to it.
    In the current version we only support 2 port devices for the traffic generator
148 identified as port 0 or port 1.
151 def __init__(self, port, generator_config):
152 """Create a new device for a given port."""
153 self.generator_config = generator_config
154 self.chain_count = generator_config.service_chain_count
155 self.flow_count = generator_config.flow_count / 2
157 self.switch_port = generator_config.interfaces[port].get('switch_port', None)
158 self.vtep_vlan = None
159 self.vtep_src_mac = None
161 self.pci = generator_config.interfaces[port].pci
163 self.dest_macs = None
164 self.vtep_dst_mac = None
165 self.vtep_dst_ip = None
166 if generator_config.vteps is None:
167 self.vtep_src_ip = None
169 self.vtep_src_ip = generator_config.vteps[port]
172 self.ip_addrs = generator_config.ip_addrs[port]
173 subnet = IPNetwork(self.ip_addrs)
174 self.ip = subnet.ip.format()
175 self.ip_addrs_step = generator_config.ip_addrs_step
176 self.ip_block = IpBlock(self.ip, self.ip_addrs_step, self.flow_count)
177 self.gw_ip_block = IpBlock(generator_config.gateway_ips[port],
178 generator_config.gateway_ip_addrs_step,
180 self.tg_gateway_ip_addrs = generator_config.tg_gateway_ip_addrs[port]
181 self.tg_gw_ip_block = IpBlock(self.tg_gateway_ip_addrs,
182 generator_config.tg_gateway_ip_addrs_step,
184 self.udp_src_port = generator_config.udp_src_port
185 self.udp_dst_port = generator_config.udp_dst_port
def set_mac(self, mac):
    """Record the local MAC address of this port device.

    Raises TrafficClientException if mac is None.
    """
    if mac is not None:
        self.mac = mac
        return
    raise TrafficClientException('Trying to set traffic generator MAC address as None')
def get_peer_device(self):
    """Return the device of the other port (port 0 <-> port 1)."""
    peer_port = 1 - self.port
    return self.generator_config.devices[peer_port]
def set_vtep_dst_mac(self, dest_macs):
    """Store the VTEP dest MACs of this device.

    dest_macs: iterable of MACs, each entry is saved as its str() form in the same order
    """
    self.vtep_dst_mac = [str(mac) for mac in dest_macs]
def set_dest_macs(self, dest_macs):
    """Store the list of dest MACs indexed by the chain id.

    This is only called in 2 cases:
    - VM macs discovered using openstack API
    - dest MACs provisioned in config file

    Each entry is saved as its str() form, in the same order.
    """
    self.dest_macs = [str(mac) for mac in dest_macs]
215 def get_dest_macs(self):
216 """Get the list of dest macs for this device.
218 If set_dest_macs was never called, assumes l2-loopback and return
219 a list of peer mac (as many as chains but normally only 1 chain)
222 return self.dest_macs
223 # assume this is l2-loopback
224 return [self.get_peer_device().mac] * self.chain_count
226 def set_vlans(self, vlans):
227 """Set the list of vlans to use indexed by the chain id."""
229 LOG.info("Port %d: VLANs %s", self.port, self.vlans)
231 def set_vtep_vlan(self, vlan):
232 """Set the vtep vlan to use indexed by specific port."""
233 self.vtep_vlan = vlan
235 self.vlan_tagging = None
236 LOG.info("Port %d: VTEP VLANs %s", self.port, self.vtep_vlan)
def set_vxlan_endpoints(self, src_ip, dst_ip):
    """Record the source and destination VTEP IP addresses of this port and log them."""
    self.vtep_dst_ip, self.vtep_src_ip = dst_ip, src_ip
    LOG.info("Port %d: src_vtep %s, dst_vtep %s", self.port,
             self.vtep_src_ip, self.vtep_dst_ip)
244 def set_vxlans(self, vnis):
246 LOG.info("Port %d: VNIs %s", self.port, self.vnis)
248 def set_gw_ip(self, gateway_ip):
249 self.gw_ip_block = IpBlock(gateway_ip,
250 self.generator_config.gateway_ip_addrs_step,
def get_gw_ip(self, chain_index):
    """Return the gateway IP address that the gateway IP block holds for a given chain index."""
    gateways = self.gw_ip_block
    return gateways.get_ip(chain_index)
257 def get_stream_configs(self):
258 """Get the stream config for a given chain on this device.
260 Called by the traffic generator driver to program the traffic generator properly
261 before generating traffic
264 # exact flow count for each chain is calculated as follows:
265 # - all chains except the first will have the same flow count
266 # calculated as (total_flows + chain_count - 1) / chain_count
267 # - the first chain will have the remainder
268 # example 11 flows and 3 chains => 3, 4, 4
269 flows_per_chain = int((self.flow_count + self.chain_count - 1) / self.chain_count)
270 cur_chain_flow_count = int(self.flow_count - flows_per_chain * (self.chain_count - 1))
271 peer = self.get_peer_device()
272 self.ip_block.reset_reservation()
273 peer.ip_block.reset_reservation()
274 dest_macs = self.get_dest_macs()
276 for chain_idx in range(self.chain_count):
277 src_ip_first, src_ip_last = self.ip_block.reserve_ip_range(cur_chain_flow_count)
278 dst_ip_first, dst_ip_last = peer.ip_block.reserve_ip_range(cur_chain_flow_count)
281 'count': cur_chain_flow_count,
283 'mac_dst': dest_macs[chain_idx],
284 'ip_src_addr': src_ip_first,
285 'ip_src_addr_max': src_ip_last,
286 'ip_src_count': cur_chain_flow_count,
287 'ip_dst_addr': dst_ip_first,
288 'ip_dst_addr_max': dst_ip_last,
289 'ip_dst_count': cur_chain_flow_count,
290 'ip_addrs_step': self.ip_addrs_step,
291 'udp_src_port': self.udp_src_port,
292 'udp_dst_port': self.udp_dst_port,
293 'mac_discovery_gw': self.get_gw_ip(chain_idx),
294 'ip_src_tg_gw': self.tg_gw_ip_block.get_ip(chain_idx),
295 'ip_dst_tg_gw': peer.tg_gw_ip_block.get_ip(chain_idx),
296 'vlan_tag': self.vlans[chain_idx] if self.vlans else None,
298 'vtep_vlan': self.vtep_vlan if self.vtep_vlan else None,
299 'vtep_src_mac': self.mac if self.vxlan is True else None,
300 'vtep_dst_mac': self.vtep_dst_mac if self.vxlan is True else None,
301 'vtep_dst_ip': self.vtep_dst_ip if self.vxlan is True else None,
302 'vtep_src_ip': self.vtep_src_ip if self.vxlan is True else None,
303 'net_vni': self.vnis[chain_idx] if self.vxlan is True else None
305 # after first chain, fall back to the flow count for all other chains
306 cur_chain_flow_count = flows_per_chain
311 """Convert an IP address from string to numeric."""
312 return struct.unpack("!I", socket.inet_aton(addr))[0]
def int_to_ip(nvalue):
    """Convert a numeric IPv4 address to its dotted string form.

    nvalue is converted with int() and packed as a 32-bit unsigned value in network byte order.
    """
    packed = struct.pack("!I", int(nvalue))
    return socket.inet_ntoa(packed)
320 class GeneratorConfig(object):
321 """Represents traffic configuration for currently running traffic profile."""
323 DEFAULT_IP_STEP = '0.0.0.1'
324 DEFAULT_SRC_DST_IP_STEP = '0.0.0.1'
326 def __init__(self, config):
327 """Create a generator config."""
329 # name of the generator profile (normally trex or dummy)
330 # pick the default one if not specified explicitly from cli options
331 if not config.generator_profile:
332 config.generator_profile = config.traffic_generator.default_profile
333 # pick up the profile dict based on the name
334 gen_config = self.__match_generator_profile(config.traffic_generator,
335 config.generator_profile)
336 self.gen_config = gen_config
337 # copy over fields from the dict
338 self.tool = gen_config.tool
339 self.ip = gen_config.ip
340 # overrides on config.cores and config.mbuf_factor
342 self.cores = config.cores
344 self.cores = gen_config.get('cores', 1)
345 self.mbuf_factor = config.mbuf_factor
346 self.mbuf_64 = config.mbuf_64
347 self.hdrh = not config.disable_hdrh
348 if gen_config.intf_speed:
349 # interface speed is overriden from config
350 self.intf_speed = bitmath.parse_string(gen_config.intf_speed.replace('ps', '')).bits
352 # interface speed is discovered/provided by the traffic generator
354 self.name = gen_config.name
355 self.zmq_pub_port = gen_config.get('zmq_pub_port', 4500)
356 self.zmq_rpc_port = gen_config.get('zmq_rpc_port', 4501)
357 self.limit_memory = gen_config.get('limit_memory', 1024)
358 self.software_mode = gen_config.get('software_mode', False)
359 self.interfaces = gen_config.interfaces
360 if self.interfaces[0].port != 0 or self.interfaces[1].port != 1:
361 raise TrafficClientException('Invalid port order/id in generator_profile.interfaces')
362 self.service_chain = config.service_chain
363 self.service_chain_count = config.service_chain_count
364 self.flow_count = config.flow_count
365 self.host_name = gen_config.host_name
367 self.tg_gateway_ip_addrs = gen_config.tg_gateway_ip_addrs
368 self.ip_addrs = gen_config.ip_addrs
369 self.ip_addrs_step = gen_config.ip_addrs_step or self.DEFAULT_SRC_DST_IP_STEP
370 self.tg_gateway_ip_addrs_step = \
371 gen_config.tg_gateway_ip_addrs_step or self.DEFAULT_IP_STEP
372 self.gateway_ip_addrs_step = gen_config.gateway_ip_addrs_step or self.DEFAULT_IP_STEP
373 self.gateway_ips = gen_config.gateway_ip_addrs
374 self.udp_src_port = gen_config.udp_src_port
375 self.udp_dst_port = gen_config.udp_dst_port
376 self.vteps = gen_config.get('vteps')
377 self.devices = [Device(port, self) for port in [0, 1]]
378 # This should normally always be [0, 1]
379 self.ports = [device.port for device in self.devices]
381 # check that pci is not empty
382 if not gen_config.interfaces[0].get('pci', None) or \
383 not gen_config.interfaces[1].get('pci', None):
384 raise TrafficClientException("configuration interfaces pci fields cannot be empty")
386 self.pcis = [tgif['pci'] for tgif in gen_config.interfaces]
387 self.vlan_tagging = config.vlan_tagging
389 # needed for result/summarizer
390 config['tg-name'] = gen_config.name
391 config['tg-tool'] = self.tool
394 """Get json form to display the content into the overall result dict."""
395 return dict(self.gen_config)
def set_dest_macs(self, port_index, dest_macs):
    """Set the list of dest MACs indexed by the chain id on given port.

    port_index: the port for which dest macs must be set
    dest_macs: a list of dest MACs indexed by chain id

    Raises TrafficClientException if the list has fewer entries than the service chain count.
    """
    chain_count = self.config.service_chain_count
    if len(dest_macs) < chain_count:
        raise TrafficClientException('Dest MAC list %s must have %d entries' %
                                     (dest_macs, chain_count))
    # extra entries are ignored: only the first chain_count dest MACs are passed down
    device = self.devices[port_index]
    device.set_dest_macs(dest_macs[:chain_count])
    LOG.info('Port %d: dst MAC %s', port_index, [str(mac) for mac in dest_macs])
def set_vtep_dest_macs(self, port_index, dest_macs):
    """Set the list of VTEP dest MACs indexed by the chain id on given port.

    port_index: the port for which dest macs must be set
    dest_macs: a list of dest MACs indexed by chain id

    Raises TrafficClientException if the list size differs from the service chain count.
    """
    if len(dest_macs) != self.config.service_chain_count:
        raise TrafficClientException('Dest MAC list %s must have %d entries' %
                                     (dest_macs, self.config.service_chain_count))
    self.devices[port_index].set_vtep_dst_mac(dest_macs)
    # log a list (not a set) so that the chain id order is preserved and
    # identical MACs used by several chains are not collapsed into one entry
    LOG.info('Port %d: vtep dst MAC %s', port_index, [str(mac) for mac in dest_macs])
def get_dest_macs(self):
    """Return the dest macs of every device, as a list following the order of self.devices."""
    macs_per_port = []
    for device in self.devices:
        macs_per_port.append(device.get_dest_macs())
    return macs_per_port
def set_vlans(self, port_index, vlans):
    """Set the list of vlans to use indexed by the chain id on given port.

    port_index: the port for which VLANs must be set
    vlans: a list of vlan lists indexed by chain id

    Raises TrafficClientException if the list size differs from the service chain count.
    """
    expected = self.config.service_chain_count
    if len(vlans) != expected:
        raise TrafficClientException('VLAN list %s must have %d entries' %
                                     (vlans, expected))
    device = self.devices[port_index]
    device.set_vlans(vlans)
def set_vxlans(self, port_index, vxlans):
    """Set the list of vxlans (VNIs) to use indexed by the chain id on given port.

    port_index: the port for which VXLANs must be set
    vxlans: a list of VNIs lists indexed by chain id

    Raises TrafficClientException if the list size differs from the service chain count.
    """
    expected = self.config.service_chain_count
    if len(vxlans) != expected:
        raise TrafficClientException('VXLAN list %s must have %d entries' %
                                     (vxlans, expected))
    device = self.devices[port_index]
    device.set_vxlans(vxlans)
def set_vtep_vlan(self, port_index, vlan):
    """Set the vtep vlan to use on given port.

    port_index: the port for which VLAN must be set
    vlan: the value handed to the set_vtep_vlan() of the device of that port
    """
    device = self.devices[port_index]
    device.set_vtep_vlan(vlan)
def set_vxlan_endpoints(self, port_index, src_ip, dst_ip):
    """Forward the VXLAN source and destination endpoint IPs to the device of given port."""
    device = self.devices[port_index]
    device.set_vxlan_endpoints(src_ip, dst_ip)
def __match_generator_profile(traffic_generator, generator_profile):
    """Build the generator config for the profile with the given name.

    traffic_generator: the traffic generator config, with a 'default_profile' key and a
                       'generator_profile' list of named profiles
    generator_profile: name of the profile to pick

    return: an AttrDict copy of traffic_generator without the 2 keys above and updated
            with the fields of the matching profile
    Raises TrafficClientException unless exactly one profile has the requested name.
    """
    gen_config = AttrDict(traffic_generator)
    gen_config.pop('default_profile')
    gen_config.pop('generator_profile')
    matching_profile = [profile for profile in traffic_generator.generator_profile if
                        profile.name == generator_profile]
    if len(matching_profile) != 1:
        # use the module exception (a subclass of Exception) like the other lookups of this
        # file, and %s formatting so that a non-string name cannot hide the real error
        raise TrafficClientException('Traffic generator profile not found: %s' %
                                     generator_profile)

    gen_config.update(matching_profile[0])
    return gen_config
471 class TrafficClient(object):
    """Traffic generator client with NDR/PDR binary search."""
476 def __init__(self, config, notifier=None):
477 """Create a new TrafficClient instance.
479 config: nfvbench config
480 notifier: notifier (optional)
482 A new instance is created everytime the nfvbench config may have changed.
485 self.generator_config = GeneratorConfig(config)
486 self.tool = self.generator_config.tool
487 self.gen = self._get_generator()
488 self.notifier = notifier
489 self.interval_collector = None
490 self.iteration_collector = None
491 self.runner = TrafficRunner(self, self.config.duration_sec, self.config.interval_sec,
492 self.config.service_mode)
493 self.config.frame_sizes = self._get_frame_sizes()
495 'l2frame_size': None,
496 'duration_sec': self.config.duration_sec,
497 'bidirectional': True,
498 'rates': [] # to avoid unsbuscriptable-obj warning
500 self.current_total_rate = {'rate_percent': '10'}
501 if self.config.single_run:
502 self.current_total_rate = utils.parse_rate_str(self.config.rate)
504 # Speed is either discovered when connecting to TG or set from config
505 # This variable is 0 if not yet discovered from TG or must be the speed of
506 # each interface in bits per second
507 self.intf_speed = self.generator_config.intf_speed
509 def _get_generator(self):
510 tool = self.tool.lower()
512 from .traffic_gen import trex_gen
513 return trex_gen.TRex(self)
515 from .traffic_gen import dummy
516 return dummy.DummyTG(self)
517 raise TrafficClientException('Unsupported generator tool name:' + self.tool)
519 def skip_sleep(self):
520 """Skip all sleeps when doing unit testing with dummy TG.
522 Must be overriden using mock.patch
def _get_frame_sizes(self):
    """Return the l2frame_size of the traffic profile selected by config.traffic.profile.

    Raises TrafficClientException if no profile or more than one profile has that name.
    """
    wanted = self.config.traffic.profile
    candidates = [entry for entry in self.config.traffic_profile if entry.name == wanted]
    if not candidates:
        raise TrafficClientException('Cannot find traffic profile: ' + wanted)
    if len(candidates) > 1:
        raise TrafficClientException('Multiple traffic profiles with name: ' +
                                     wanted)
    return candidates[0].l2frame_size
537 def start_traffic_generator(self):
538 """Start the traffic generator process (traffic not started yet)."""
540 # pick up the interface speed if it is not set from config
541 intf_speeds = self.gen.get_port_speed_gbps()
542 # convert Gbps unit into bps
543 tg_if_speed = bitmath.parse_string(str(intf_speeds[0]) + 'Gb').bits
545 # interface speed is overriden from config
546 if self.intf_speed != tg_if_speed:
547 # Warn the user if the speed in the config is different
548 LOG.warning('Interface speed provided is different from actual speed (%d Gbps)',
551 # interface speed not provisioned by config
552 self.intf_speed = tg_if_speed
553 # also update the speed in the tg config
554 self.generator_config.intf_speed = tg_if_speed
556 # Save the traffic generator local MAC
557 for mac, device in zip(self.gen.get_macs(), self.generator_config.devices):
561 """Set up the traffic client."""
562 self.gen.clear_stats()
def get_version(self):
    """Return the version reported by the traffic generator."""
    generator = self.gen
    return generator.get_version()
568 def ensure_end_to_end(self):
569 """Ensure traffic generator receives packets it has transmitted.
571 This ensures end to end connectivity and also waits until VMs are ready to forward packets.
573 VMs that are started and in active state may not pass traffic yet. It is imperative to make
574 sure that all VMs are passing traffic in both directions before starting any benchmarking.
575 To verify this, we need to send at a low frequency bi-directional packets and make sure
576 that we receive all packets back from all VMs. The number of flows is equal to 2 times
577 the number of chains (1 per direction) and we need to make sure we receive packets coming
578 from exactly 2 x chain count different source MAC addresses.
581 PVP chain (1 VM per chain)
582 N = 10 (number of chains)
583 Flow count = 20 (number of flows)
584 If the number of unique source MAC addresses from received packets is 20 then
        all 10 VMs are in operational state.
587 LOG.info('Starting traffic generator to ensure end-to-end connectivity')
588 # send 2pps on each chain and each direction
589 rate_pps = {'rate_pps': str(self.config.service_chain_count * 2)}
590 self.gen.create_traffic('64', [rate_pps, rate_pps], bidirectional=True, latency=False,
592 # ensures enough traffic is coming back
593 retry_count = int((self.config.check_traffic_time_sec +
594 self.config.generic_poll_sec - 1) / self.config.generic_poll_sec)
596 # we expect to see packets coming from 2 unique MAC per chain
597 # because there can be flooding in the case of shared net
598 # we must verify that packets from the right VMs are received
599 # and not just count unique src MAC
600 # create a dict of (port, chain) tuples indexed by dest mac
602 for port, dest_macs in enumerate(self.generator_config.get_dest_macs()):
603 for chain, mac in enumerate(dest_macs):
604 mac_map[mac] = (port, chain)
605 unique_src_mac_count = len(mac_map)
606 if self.config.vxlan and self.config.traffic_generator.vtep_vlan:
607 get_mac_id = lambda packet: packet['binary'][60:66]
608 elif self.config.vxlan:
609 get_mac_id = lambda packet: packet['binary'][56:62]
611 get_mac_id = lambda packet: packet['binary'][6:12]
612 for it in range(retry_count):
613 self.gen.clear_stats()
614 self.gen.start_traffic()
615 self.gen.start_capture()
616 LOG.info('Captured unique src mac %d/%d, capturing return packets (retry %d/%d)...',
617 unique_src_mac_count - len(mac_map), unique_src_mac_count,
619 if not self.skip_sleep():
620 time.sleep(self.config.generic_poll_sec)
621 self.gen.stop_traffic()
622 self.gen.fetch_capture_packets()
623 self.gen.stop_capture()
624 for packet in self.gen.packet_list:
625 mac_id = get_mac_id(packet).decode('latin-1')
626 src_mac = ':'.join(["%02x" % ord(x) for x in mac_id])
627 if src_mac in mac_map and self.is_udp(packet):
628 port, chain = mac_map[src_mac]
629 LOG.info('Received packet from mac: %s (chain=%d, port=%d)',
630 src_mac, chain, port)
631 mac_map.pop(src_mac, None)
634 LOG.info('End-to-end connectivity established')
636 if self.config.l3_router and not self.config.no_arp:
637 # In case of L3 traffic mode, routers are not able to route traffic
638 # until VM interfaces are up and ARP requests are done
639 LOG.info('Waiting for loopback service completely started...')
640 LOG.info('Sending ARP request to assure end-to-end connectivity established')
641 self.ensure_arp_successful()
642 raise TrafficClientException('End-to-end connectivity cannot be ensured')
644 def is_udp(self, packet):
645 pkt = Ether(packet['binary'])
def ensure_arp_successful(self):
    """Resolve all IP using ARP and save the dest MACs found for port 0 and port 1.

    Raises TrafficClientException if the traffic generator returns no dest MACs.
    """
    dest_macs = self.gen.resolve_arp()
    if not dest_macs:
        raise TrafficClientException('ARP cannot be resolved')
    # all dest macs are discovered, save them into the generator config
    if self.config.vxlan:
        save_macs = self.generator_config.set_vtep_dest_macs
    else:
        save_macs = self.generator_config.set_dest_macs
    for port_index in (0, 1):
        save_macs(port_index, dest_macs[port_index])
662 def set_traffic(self, frame_size, bidirectional):
663 """Reconfigure the traffic generator for a new frame size."""
664 self.run_config['bidirectional'] = bidirectional
665 self.run_config['l2frame_size'] = frame_size
666 self.run_config['rates'] = [self.get_per_direction_rate()]
668 self.run_config['rates'].append(self.get_per_direction_rate())
670 unidir_reverse_pps = int(self.config.unidir_reverse_traffic_pps)
671 if unidir_reverse_pps > 0:
672 self.run_config['rates'].append({'rate_pps': str(unidir_reverse_pps)})
673 # Fix for [NFVBENCH-67], convert the rate string to PPS
674 for idx, rate in enumerate(self.run_config['rates']):
675 if 'rate_pps' not in rate:
676 self.run_config['rates'][idx] = {'rate_pps': self.__convert_rates(rate)['rate_pps']}
678 self.gen.clear_streamblock()
679 if self.config.no_latency_streams:
680 LOG.info("Latency streams are disabled")
681 self.gen.create_traffic(frame_size, self.run_config['rates'], bidirectional,
682 latency=not self.config.no_latency_streams)
def _modify_load(self, load):
    """Apply a new total load to the traffic generator and to the run config.

    load: stored as the 'rate_percent' (in str form) of the current total rate

    The per direction rate is applied to the generator with a False second argument and
    saved as rates[0], then, for a bidirectional run only, applied with True and saved
    as rates[1].
    """
    self.current_total_rate = {'rate_percent': str(load)}
    direction_rate = self.get_per_direction_rate()

    self.gen.modify_rate(direction_rate, False)
    self.run_config['rates'][0] = direction_rate
    if not self.run_config['bidirectional']:
        return
    self.gen.modify_rate(direction_rate, True)
    self.run_config['rates'][1] = direction_rate
694 def get_ndr_and_pdr(self):
695 """Start the NDR/PDR iteration and return the results."""
696 dst = 'Bidirectional' if self.run_config['bidirectional'] else 'Unidirectional'
698 if self.config.ndr_run:
699 LOG.info('*** Searching NDR for %s (%s)...', self.run_config['l2frame_size'], dst)
700 targets['ndr'] = self.config.measurement.NDR
701 if self.config.pdr_run:
702 LOG.info('*** Searching PDR for %s (%s)...', self.run_config['l2frame_size'], dst)
703 targets['pdr'] = self.config.measurement.PDR
705 self.run_config['start_time'] = time.time()
706 self.interval_collector = IntervalCollector(self.run_config['start_time'])
707 self.interval_collector.attach_notifier(self.notifier)
708 self.iteration_collector = IterationCollector(self.run_config['start_time'])
710 self.__range_search(0.0, 200.0, targets, results)
712 results['iteration_stats'] = {
713 'ndr_pdr': self.iteration_collector.get()
716 if self.config.ndr_run:
717 LOG.info('NDR load: %s', results['ndr']['rate_percent'])
718 results['ndr']['time_taken_sec'] = \
719 results['ndr']['timestamp_sec'] - self.run_config['start_time']
720 if self.config.pdr_run:
721 LOG.info('PDR load: %s', results['pdr']['rate_percent'])
722 results['pdr']['time_taken_sec'] = \
723 results['pdr']['timestamp_sec'] - results['ndr']['timestamp_sec']
725 LOG.info('PDR load: %s', results['pdr']['rate_percent'])
726 results['pdr']['time_taken_sec'] = \
727 results['pdr']['timestamp_sec'] - self.run_config['start_time']
730 def __get_dropped_rate(self, result):
731 dropped_pkts = result['rx']['dropped_pkts']
732 total_pkts = result['tx']['total_pkts']
735 return float(dropped_pkts) / total_pkts * 100
738 """Collect final stats for previous run."""
739 stats = self.gen.get_stats()
740 retDict = {'total_tx_rate': stats['total_tx_rate']}
742 tx_keys = ['total_pkts', 'total_pkt_bytes', 'pkt_rate', 'pkt_bit_rate']
743 rx_keys = tx_keys + ['dropped_pkts']
745 for port in self.PORTS:
746 port_stats = {'tx': {}, 'rx': {}}
748 port_stats['tx'][key] = int(stats[port]['tx'][key])
751 port_stats['rx'][key] = int(stats[port]['rx'][key])
753 port_stats['rx'][key] = 0
754 port_stats['rx']['avg_delay_usec'] = cast_integer(
755 stats[port]['rx']['avg_delay_usec'])
756 port_stats['rx']['min_delay_usec'] = cast_integer(
757 stats[port]['rx']['min_delay_usec'])
758 port_stats['rx']['max_delay_usec'] = cast_integer(
759 stats[port]['rx']['max_delay_usec'])
760 port_stats['drop_rate_percent'] = self.__get_dropped_rate(port_stats)
761 retDict[str(port)] = port_stats
763 ports = sorted(list(retDict.keys()), key=str)
764 if self.run_config['bidirectional']:
765 retDict['overall'] = {'tx': {}, 'rx': {}}
767 retDict['overall']['tx'][key] = \
768 retDict[ports[0]]['tx'][key] + retDict[ports[1]]['tx'][key]
770 retDict['overall']['rx'][key] = \
771 retDict[ports[0]]['rx'][key] + retDict[ports[1]]['rx'][key]
772 total_pkts = [retDict[ports[0]]['rx']['total_pkts'],
773 retDict[ports[1]]['rx']['total_pkts']]
774 avg_delays = [retDict[ports[0]]['rx']['avg_delay_usec'],
775 retDict[ports[1]]['rx']['avg_delay_usec']]
776 max_delays = [retDict[ports[0]]['rx']['max_delay_usec'],
777 retDict[ports[1]]['rx']['max_delay_usec']]
778 min_delays = [retDict[ports[0]]['rx']['min_delay_usec'],
779 retDict[ports[1]]['rx']['min_delay_usec']]
780 retDict['overall']['rx']['avg_delay_usec'] = utils.weighted_avg(total_pkts, avg_delays)
781 retDict['overall']['rx']['min_delay_usec'] = min(min_delays)
782 retDict['overall']['rx']['max_delay_usec'] = max(max_delays)
783 for key in ['pkt_bit_rate', 'pkt_rate']:
784 for dirc in ['tx', 'rx']:
785 retDict['overall'][dirc][key] /= 2.0
787 retDict['overall'] = retDict[ports[0]]
788 retDict['overall']['drop_rate_percent'] = self.__get_dropped_rate(retDict['overall'])
791 def __convert_rates(self, rate):
792 return utils.convert_rates(self.run_config['l2frame_size'],
def __ndr_pdr_found(self, tag, load):
    """Record in both collectors that the target named tag was found at the given load.

    tag: name of the target
    load: the load, converted through __convert_rates() as a 'rate_percent' value
    """
    rate_pps = self.__convert_rates({'rate_percent': load})['rate_pps']
    iterations = self.iteration_collector
    iterations.add_ndr_pdr(tag, rate_pps)
    # the interval collector gets the stats peeked from the iteration collector
    self.interval_collector.add_ndr_pdr(tag, iterations.peek())
802 def __format_output_stats(self, stats):
803 for key in self.PORTS + ['overall']:
805 interface = stats[key]
807 'tx_pkts': interface['tx']['total_pkts'],
808 'rx_pkts': interface['rx']['total_pkts'],
809 'drop_percentage': interface['drop_rate_percent'],
810 'drop_pct': interface['rx']['dropped_pkts'],
811 'avg_delay_usec': interface['rx']['avg_delay_usec'],
812 'max_delay_usec': interface['rx']['max_delay_usec'],
813 'min_delay_usec': interface['rx']['min_delay_usec'],
def __targets_found(self, rate, targets, results):
    """Mark every target as found at the given rate.

    rate: the load at which the targets are considered found
    targets: a dict of targets indexed by tag
    results: a dict indexed by tag, the 'timestamp_sec' of each target entry is set to now
    """
    # iterate over a snapshot of the targets, like a list() copy of the items would
    found = tuple(targets.items())
    for tag, target in found:
        LOG.info('Found %s (%s) load: %s', tag, target, rate)
        self.__ndr_pdr_found(tag, rate)
        results[tag]['timestamp_sec'] = time.time()
def __range_search(self, left, right, targets, results):
    """Perform a binary search for a list of targets inside a [left..right] range or rate.

    left    the left side of the range to search as a % the line rate (100 = 100% line rate)
            indicating the rate to send on each interface
    right   the right side of the range to search as a % of line rate
            indicating the rate to send on each interface
    targets a dict of drop rates to search (0.1 = 0.1%), indexed by the DR name or "tag"
            (e.g. 'ndr', 'pdr')
    results a dict to store results, updated in place with one entry per tag

    The search recurses on the lower half for the targets whose drop rate was exceeded at the
    middle load and on the upper half for the targets that were met.
    """
    if not targets:
        # no target left for this half of the range: do not run traffic for nothing
        return
    LOG.info('Range search [%s .. %s] targets: %s', left, right, targets)

    # Terminate search when gap is less than load epsilon
    if right - left < self.config.measurement.load_epsilon:
        self.__targets_found(left, targets, results)
        return

    # Obtain the average drop rate for the middle load
    middle = (left + right) / 2.0
    try:
        stats, rates = self.__run_search_iteration(middle)
    except STLError:
        # the traffic generator failed: settle for the lower bound of the current range
        LOG.exception("Got exception from traffic generator during binary search")
        self.__targets_found(left, targets, results)
        return

    # Split target dicts based on the avg drop rate
    left_targets = {}
    right_targets = {}
    for tag, target in targets.items():
        if stats['overall']['drop_rate_percent'] <= target:
            # record the best possible rate found for this target
            # NOTE(review): the assignment creating results[tag] is not visible in the excerpt
            # under review; starting from the iteration rates is assumed. A private copy is
            # stored so that tags met at the same load do not share (and overwrite) one dict.
            results[tag] = dict(rates)
            results[tag].update({
                'load_percent_per_direction': middle,
                'stats': self.__format_output_stats(dict(stats)),
                'timestamp_sec': None
            })
            right_targets[tag] = target
        else:
            # initialize to 0 all fields of result for
            # the worst case scenario of the binary search (if ndr/pdr is not found)
            if tag not in results:
                results[tag] = dict.fromkeys(rates, 0)
                empty_stats = self.__format_output_stats(dict(stats))
                for key in empty_stats:
                    if isinstance(empty_stats[key], dict):
                        empty_stats[key] = dict.fromkeys(empty_stats[key], 0)
                    else:
                        empty_stats[key] = 0
                results[tag].update({
                    'load_percent_per_direction': 0,
                    'stats': empty_stats,
                    'timestamp_sec': None
                })
            left_targets[tag] = target

    # search lower half
    self.__range_search(left, middle, left_targets, results)

    # search upper half only if the upper rate does not exceed
    # 100%, this only happens when the first search at 100%
    # yields a DR that is < target DR
    if middle >= 100:
        self.__targets_found(100, right_targets, results)
    else:
        self.__range_search(middle, right, right_targets, results)
def __run_search_iteration(self, rate):
    """Run one iteration at the given rate level.

    rate: the rate to send on each port in percent (0 to 100)

    Return a (stats, rates) tuple where stats are the traffic generator stats read at the
    end of the run and rates is the 'direction-total' entry of the current traffic config.
    """
    self._modify_load(rate)

    # poll interval stats and collect them
    for stats in self.run_traffic():
        self.interval_collector.add(stats)
        # compare directly instead of computing elapsed/duration so that a zero
        # duration stops the run instead of raising ZeroDivisionError
        if self.runner.time_elapsed() >= self.run_config['duration_sec']:
            self.cancel_traffic()
            if not self.skip_sleep():
                time.sleep(self.config.pause_sec)
    self.interval_collector.reset()

    # get stats from the run
    stats = self.runner.client.get_stats()
    total_rates = self._get_traffic_config()['direction-total']
    warning = self.compare_tx_rates(total_rates['rate_pps'], stats['total_tx_rate'])
    if warning is not None:
        stats['warning'] = warning

    # save reliable stats from whole iteration
    self.iteration_collector.add(stats, total_rates['rate_pps'])
    LOG.info('Average drop rate: %f', stats['overall']['drop_rate_percent'])
    return stats, total_rates
@staticmethod
def log_stats(stats):
    """Log estimated stats during run.

    stats: a stats dict with an 'overall' entry holding the tx/rx packet counters
           and the drop rate in percent
    """
    overall = stats['overall']
    report = {
        'datetime': str(datetime.now()),
        'tx_packets': overall['tx']['total_pkts'],
        'rx_packets': overall['rx']['total_pkts'],
        'drop_packets': overall['rx']['dropped_pkts'],
        'drop_rate_percent': overall['drop_rate_percent']
    }
    # the report dict is passed as the single logging argument so that the
    # %(name) placeholders are resolved by the logging module
    LOG.info('TX: %(tx_packets)d; '
             'RX: %(rx_packets)d; '
             'Est. Dropped: %(drop_packets)d; '
             'Est. Drop rate: %(drop_rate_percent).4f%%',
             report)
def run_traffic(self):
    """Start traffic and return intermediate stats for each interval.

    This is a generator: it yields the stats returned by the runner when traffic is
    started, then the stats of every poll for as long as the runner is running.
    """
    stats = self.runner.run()
    while self.runner.is_running:
        self.log_stats(stats)
        yield stats
        stats = self.runner.poll_stats()
        if stats is None:
            # nothing more to report: stop here rather than logging a missing sample
            return
    self.log_stats(stats)
    LOG.info('Drop rate: %f', stats['overall']['drop_rate_percent'])
    yield stats
def cancel_traffic(self):
    """Stop the traffic run in progress."""
    # NOTE(review): the body of this method is not visible in the excerpt under review;
    # stopping the traffic runner is assumed - confirm against the full file
    self.runner.stop()
def _get_traffic_config(self):
    """Return the traffic configuration of the current run, per direction and in total.

    The returned dict has a 'direction-forward' entry for the first rate of
    self.run_config['rates'], a 'direction-reverse' entry for any following rate and a
    'direction-total' entry. Each direction entry holds the frame size, the duration, the
    configured rate and its converted forms; the total entry is a copy of the forward
    entry whose rate_percent/rate_pps/rate_bps are the sums over the directions.
    """
    config = {}
    load_total = 0.0
    bps_total = 0.0
    pps_total = 0.0
    for idx, rate in enumerate(self.run_config['rates']):
        key = 'direction-forward' if idx == 0 else 'direction-reverse'
        direction = {
            'l2frame_size': self.run_config['l2frame_size'],
            'duration_sec': self.run_config['duration_sec']
        }
        direction.update(rate)
        direction.update(self.__convert_rates(rate))
        config[key] = direction
        load_total += float(direction['rate_percent'])
        bps_total += float(direction['rate_bps'])
        pps_total += float(direction['rate_pps'])
    # the total starts as a copy of the forward direction so that it carries the same
    # non-rate fields, then the rate fields are replaced by the sums
    config['direction-total'] = dict(config['direction-forward'])
    config['direction-total'].update({
        'rate_percent': load_total,
        'rate_pps': cast_integer(pps_total),
        'rate_bps': bps_total
    })

    return config
def get_run_config(self, results):
    """Return configuration which was used for the last run.

    results: a dict whose "stats" entry holds, per port index ("0" and "1"), the tx and rx
             total packet counters of the run

    The returned dict has a "direction-forward" and a "direction-reverse" entry, each with
    the originally requested rate ("orig") and the measured "tx" and "rx" rates, plus a
    "direction-total" entry with the sum of both directions for every rate unit.
    """
    r = {}
    # because we want each direction to have the far end RX rates,
    # use the far end index (1-idx) to retrieve the RX rates
    for idx, key in enumerate(["direction-forward", "direction-reverse"]):
        tx_rate = results["stats"][str(idx)]["tx"]["total_pkts"] / self.config.duration_sec
        rx_rate = results["stats"][str(1 - idx)]["rx"]["total_pkts"] / self.config.duration_sec
        r[key] = {
            "orig": self.__convert_rates(self.run_config['rates'][idx]),
            "tx": self.__convert_rates({'rate_pps': tx_rate}),
            "rx": self.__convert_rates({'rate_pps': rx_rate})
        }

    # at this point r only holds the 2 directions, so summing over its values
    # adds up forward and reverse
    total = {}
    for direction in ['orig', 'tx', 'rx']:
        total[direction] = {}
        for unit in ['rate_percent', 'rate_bps', 'rate_pps']:
            total[direction][unit] = sum(float(x[direction][unit]) for x in r.values())

    r['direction-total'] = total
    return r
def insert_interface_stats(self, pps_list):
    """Insert interface stats to a list of packet path stats.

    pps_list: a list of packet path stats instances indexed by chain index

    This function will insert the packet path stats for the traffic gen ports 0 and 1
    with itemized per chain tx/rx counters.
    There will be as many packet path stats as chains.
    Each packet path stats will have exactly 2 InterfaceStats for port 0 and port 1
    self.pps_list:
    [
    PacketPathStats(InterfaceStats(chain 0, port 0), InterfaceStats(chain 0, port 1)),
    PacketPathStats(InterfaceStats(chain 1, port 0), InterfaceStats(chain 1, port 1)),
    ...
    ]
    """
    # keep the list of list of interface stats indexed by the chain id
    # (one 'p0' and one 'p1' InterfaceStats per chain)
    self.ifstats = [[InterfaceStats('p' + str(port), self.tool) for port in range(2)]
                    for _ in range(self.config.service_chain_count)]
    # note that we need to make a copy of the ifs list so that any modification in the
    # list from pps will not change the list saved in self.ifstats
    self.pps_list = [PacketPathStats(list(ifs)) for ifs in self.ifstats]
    # insert the corresponding pps in the passed list
    pps_list.extend(self.pps_list)
def update_interface_stats(self, diff=False):
    """Update all interface stats.

    diff: if True, read the latest stats from the traffic generator and update the
          interface stats inserted in insert_interface_stats() with them
          NOTE(review): the statement using `diff` is not visible in the excerpt under
          review; the body is assumed to run only when diff is True - confirm against the
          full file, including what (if anything) happens when diff is False

    self.ifstats:
    [
    [InterfaceStats(chain 0, port 0), InterfaceStats(chain 0, port 1)],
    [InterfaceStats(chain 1, port 0), InterfaceStats(chain 1, port 1)],
    ...
    ]
    """
    if not diff:
        return

    stats = self.gen.get_stats()
    for chain_idx, ifs in enumerate(self.ifstats):
        # each ifs has exactly 2 InterfaceStats and 2 Latency instances
        # corresponding to the
        # port 0 and port 1 for the given chain_idx
        # Note that we cannot use self.pps_list[chain_idx].if_stats to pick the
        # interface stats for the pps because it could have been modified to contain
        # additional interface stats
        self.gen.get_stream_stats(stats, ifs, self.pps_list[chain_idx].latencies, chain_idx)

    # special handling for vxlan
    # in case of vxlan, flow stats are not available so all rx counters will be
    # zeros when the total rx port counter is non zero.
    for port in range(2):
        total_rx = sum(ifs[port].rx for ifs in self.ifstats)
        if total_rx != 0:
            # per-chain rx counters are available for this port
            continue
        # check if the total port rx from Trex is also zero
        port_rx = stats[port]['rx']['total_pkts']
        if not port_rx:
            continue
        # the total rx for all chains from port level stats is non zero
        # which means that the per-chain stats are not available
        if len(self.ifstats) == 1:
            # only one chain, simply report the port level rx to the chain rx stats
            self.ifstats[0][port].rx = port_rx
        else:
            for ifs in self.ifstats:
                # mark this data as unavailable
                # NOTE(review): the marker value is not visible in the excerpt; None assumed
                ifs[port].rx = None
            # pitch in the total rx only in the last chain pps
            self.ifstats[-1][port].rx_total = port_rx
1082 def compare_tx_rates(required, actual):
1083 """Compare the actual TX rate to the required TX rate."""
1085 are_different = False
1087 if float(actual) / required < threshold:
1088 are_different = True
1089 except ZeroDivisionError:
1090 are_different = True
1093 msg = "WARNING: There is a significant difference between requested TX rate ({r}) " \
1094 "and actual TX rate ({a}). The traffic generator may not have sufficient CPU " \
1095 "to achieve the requested TX rate.".format(r=required, a=actual)
def get_per_direction_rate(self):
    """Get the rate for each direction.

    The current total rate is divided by 2 for a bidirectional run, except when it is
    expressed as a percentage, in which case it is returned undivided.
    """
    divisor = 2 if self.run_config['bidirectional'] else 1
    if 'rate_percent' in self.current_total_rate:
        # don't split rate if it's percentage
        divisor = 1

    return utils.divide_rate(self.current_total_rate, divisor)
1111 """Close this instance."""
1113 self.gen.stop_traffic()
1116 self.gen.clear_stats()