1 # Copyright 2016 Cisco Systems, Inc. All rights reserved.
3 # Licensed under the Apache License, Version 2.0 (the "License"); you may
4 # not use this file except in compliance with the License. You may obtain
5 # a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 # License for the specific language governing permissions and limitations
15 """Interface to the traffic generator clients including NDR/PDR binary search."""
17 from datetime import datetime
22 from attrdict import AttrDict
24 from netaddr import IPNetwork
25 # pylint: disable=import-error
26 from trex.stl.api import Ether
27 from trex.stl.api import STLError
28 from trex.stl.api import UDP
29 # pylint: enable=import-error
32 from packet_stats import InterfaceStats
33 from packet_stats import PacketPathStats
34 from stats_collector import IntervalCollector
35 from stats_collector import IterationCollector
36 import traffic_gen.traffic_utils as utils
37 from utils import cast_integer
40 class TrafficClientException(Exception):
41 """Generic traffic client exception."""
46 class TrafficRunner(object):
47 """Serialize various steps required to run traffic."""
49 def __init__(self, client, duration_sec, interval_sec=0, service_mode=False):
50 """Create a traffic runner."""
52 self.start_time = None
53 self.duration_sec = duration_sec
54 self.interval_sec = interval_sec
55 self.service_mode = service_mode
58 """Clear stats and instruct the traffic generator to start generating traffic."""
61 LOG.info('Running traffic generator')
62 self.client.gen.clear_stats()
63 # Debug use only : new '--service-mode' option available for the NFVBench command line.
64 # A read-only mode TRex console would be able to capture the generated traffic.
65 self.client.gen.set_service_mode(enabled=self.service_mode)
66 LOG.info('Service mode is %sabled', 'en' if self.service_mode else 'dis')
67 self.client.gen.start_traffic()
68 self.start_time = time.time()
69 return self.poll_stats()
72 """Stop the current run and instruct the traffic generator to stop traffic."""
74 self.start_time = None
75 self.client.gen.stop_traffic()
78 """Check if a run is still pending."""
79 return self.start_time is not None
81 def time_elapsed(self):
82 """Return time elapsed since start of run."""
84 return time.time() - self.start_time
85 return self.duration_sec
88 """Poll latest stats from the traffic generator at fixed interval - sleeps if necessary.
90 return: latest stats or None if traffic is stopped
92 if not self.is_running():
94 if self.client.skip_sleep():
96 return self.client.get_stats()
97 time_elapsed = self.time_elapsed()
98 if time_elapsed > self.duration_sec:
101 time_left = self.duration_sec - time_elapsed
102 if self.interval_sec > 0.0:
103 if time_left <= self.interval_sec:
104 time.sleep(time_left)
107 time.sleep(self.interval_sec)
109 time.sleep(self.duration_sec)
111 return self.client.get_stats()
114 class IpBlock(object):
115 """Manage a block of IP addresses."""
117 def __init__(self, base_ip, step_ip, count_ip):
118 """Create an IP block."""
119 self.base_ip_int = Device.ip_to_int(base_ip)
120 self.step = Device.ip_to_int(step_ip)
121 self.max_available = count_ip
124 def get_ip(self, index=0):
125 """Return the IP address at given index."""
126 if index < 0 or index >= self.max_available:
127 raise IndexError('Index out of bounds: %d (max=%d)' % (index, self.max_available))
128 return Device.int_to_ip(self.base_ip_int + index * self.step)
130 def reserve_ip_range(self, count):
131 """Reserve a range of count consecutive IP addresses spaced by step."""
132 if self.next_free + count > self.max_available:
133 raise IndexError('No more IP addresses next free=%d max_available=%d requested=%d' %
137 first_ip = self.get_ip(self.next_free)
138 last_ip = self.get_ip(self.next_free + count - 1)
139 self.next_free += count
140 return (first_ip, last_ip)
142 def reset_reservation(self):
143 """Reset all reservations and restart with a completely unused IP block."""
147 class Device(object):
148 """Represent a port device and all information associated to it.
150 In the curent version we only support 2 port devices for the traffic generator
151 identified as port 0 or port 1.
154 def __init__(self, port, generator_config):
155 """Create a new device for a given port."""
156 self.generator_config = generator_config
157 self.chain_count = generator_config.service_chain_count
158 self.flow_count = generator_config.flow_count / 2
160 self.switch_port = generator_config.interfaces[port].get('switch_port', None)
161 self.vtep_vlan = None
162 self.vtep_src_mac = None
164 self.pci = generator_config.interfaces[port].pci
166 self.dest_macs = None
167 self.vtep_dst_mac = None
168 self.vtep_dst_ip = None
169 if generator_config.vteps is None:
170 self.vtep_src_ip = None
172 self.vtep_src_ip = generator_config.vteps[port]
175 self.ip_addrs = generator_config.ip_addrs[port]
176 subnet = IPNetwork(self.ip_addrs)
177 self.ip = subnet.ip.format()
178 self.ip_addrs_step = generator_config.ip_addrs_step
179 self.ip_block = IpBlock(self.ip, self.ip_addrs_step, self.flow_count)
180 self.gw_ip_block = IpBlock(generator_config.gateway_ips[port],
181 generator_config.gateway_ip_addrs_step,
183 self.tg_gateway_ip_addrs = generator_config.tg_gateway_ip_addrs[port]
184 self.tg_gw_ip_block = IpBlock(self.tg_gateway_ip_addrs,
185 generator_config.tg_gateway_ip_addrs_step,
187 self.udp_src_port = generator_config.udp_src_port
188 self.udp_dst_port = generator_config.udp_dst_port
190 def set_mac(self, mac):
191 """Set the local MAC for this port device."""
193 raise TrafficClientException('Trying to set traffic generator MAC address as None')
196 def get_peer_device(self):
197 """Get the peer device (device 0 -> device 1, or device 1 -> device 0)."""
198 return self.generator_config.devices[1 - self.port]
200 def set_vtep_dst_mac(self, dest_macs):
201 """Set the list of dest MACs indexed by the chain id.
203 This is only called in 2 cases:
204 - VM macs discovered using openstack API
205 - dest MACs provisioned in config file
207 self.vtep_dst_mac = map(str, dest_macs)
209 def set_dest_macs(self, dest_macs):
210 """Set the list of dest MACs indexed by the chain id.
212 This is only called in 2 cases:
213 - VM macs discovered using openstack API
214 - dest MACs provisioned in config file
216 self.dest_macs = map(str, dest_macs)
218 def get_dest_macs(self):
219 """Get the list of dest macs for this device.
221 If set_dest_macs was never called, assumes l2-loopback and return
222 a list of peer mac (as many as chains but normally only 1 chain)
225 return self.dest_macs
226 # assume this is l2-loopback
227 return [self.get_peer_device().mac] * self.chain_count
229 def set_vlans(self, vlans):
230 """Set the list of vlans to use indexed by the chain id."""
232 LOG.info("Port %d: VLANs %s", self.port, self.vlans)
234 def set_vtep_vlan(self, vlan):
235 """Set the vtep vlan to use indexed by specific port."""
236 self.vtep_vlan = vlan
238 self.vlan_tagging = None
239 LOG.info("Port %d: VTEP VLANs %s", self.port, self.vtep_vlan)
241 def set_vxlan_endpoints(self, src_ip, dst_ip):
242 self.vtep_dst_ip = dst_ip
243 self.vtep_src_ip = src_ip
244 LOG.info("Port %d: src_vtep %s, dst_vtep %s", self.port,
245 self.vtep_src_ip, self.vtep_dst_ip)
247 def set_vxlans(self, vnis):
249 LOG.info("Port %d: VNIs %s", self.port, self.vnis)
251 def set_gw_ip(self, gateway_ip):
252 self.gw_ip_block = IpBlock(gateway_ip,
253 self.generator_config.gateway_ip_addrs_step,
256 def get_gw_ip(self, chain_index):
257 """Retrieve the IP address assigned for the gateway of a given chain."""
258 return self.gw_ip_block.get_ip(chain_index)
260 def get_stream_configs(self):
261 """Get the stream config for a given chain on this device.
263 Called by the traffic generator driver to program the traffic generator properly
264 before generating traffic
267 # exact flow count for each chain is calculated as follows:
268 # - all chains except the first will have the same flow count
269 # calculated as (total_flows + chain_count - 1) / chain_count
270 # - the first chain will have the remainder
271 # example 11 flows and 3 chains => 3, 4, 4
272 flows_per_chain = (self.flow_count + self.chain_count - 1) / self.chain_count
273 cur_chain_flow_count = self.flow_count - flows_per_chain * (self.chain_count - 1)
274 peer = self.get_peer_device()
275 self.ip_block.reset_reservation()
276 peer.ip_block.reset_reservation()
277 dest_macs = self.get_dest_macs()
279 for chain_idx in xrange(self.chain_count):
280 src_ip_first, src_ip_last = self.ip_block.reserve_ip_range(cur_chain_flow_count)
281 dst_ip_first, dst_ip_last = peer.ip_block.reserve_ip_range(cur_chain_flow_count)
284 'count': cur_chain_flow_count,
286 'mac_dst': dest_macs[chain_idx],
287 'ip_src_addr': src_ip_first,
288 'ip_src_addr_max': src_ip_last,
289 'ip_src_count': cur_chain_flow_count,
290 'ip_dst_addr': dst_ip_first,
291 'ip_dst_addr_max': dst_ip_last,
292 'ip_dst_count': cur_chain_flow_count,
293 'ip_addrs_step': self.ip_addrs_step,
294 'udp_src_port': self.udp_src_port,
295 'udp_dst_port': self.udp_dst_port,
296 'mac_discovery_gw': self.get_gw_ip(chain_idx),
297 'ip_src_tg_gw': self.tg_gw_ip_block.get_ip(chain_idx),
298 'ip_dst_tg_gw': peer.tg_gw_ip_block.get_ip(chain_idx),
299 'vlan_tag': self.vlans[chain_idx] if self.vlans else None,
301 'vtep_vlan': self.vtep_vlan if self.vtep_vlan else None,
302 'vtep_src_mac': self.mac if self.vxlan is True else None,
303 'vtep_dst_mac': self.vtep_dst_mac if self.vxlan is True else None,
304 'vtep_dst_ip': self.vtep_dst_ip if self.vxlan is True else None,
305 'vtep_src_ip': self.vtep_src_ip if self.vxlan is True else None,
306 'net_vni': self.vnis[chain_idx] if self.vxlan is True else None
308 # after first chain, fall back to the flow count for all other chains
309 cur_chain_flow_count = flows_per_chain
314 """Convert an IP address from string to numeric."""
315 return struct.unpack("!I", socket.inet_aton(addr))[0]
318 def int_to_ip(nvalue):
319 """Convert an IP address from numeric to string."""
320 return socket.inet_ntoa(struct.pack("!I", nvalue))
323 class GeneratorConfig(object):
324 """Represents traffic configuration for currently running traffic profile."""
326 DEFAULT_IP_STEP = '0.0.0.1'
327 DEFAULT_SRC_DST_IP_STEP = '0.0.0.1'
329 def __init__(self, config):
330 """Create a generator config."""
332 # name of the generator profile (normally trex or dummy)
333 # pick the default one if not specified explicitly from cli options
334 if not config.generator_profile:
335 config.generator_profile = config.traffic_generator.default_profile
336 # pick up the profile dict based on the name
337 gen_config = self.__match_generator_profile(config.traffic_generator,
338 config.generator_profile)
339 self.gen_config = gen_config
340 # copy over fields from the dict
341 self.tool = gen_config.tool
342 self.ip = gen_config.ip
343 # overrides on config.cores and config.mbuf_factor
345 self.cores = config.cores
347 self.cores = gen_config.get('cores', 1)
348 self.mbuf_factor = config.mbuf_factor
349 self.mbuf_64 = config.mbuf_64
350 self.hdrh = not config.disable_hdrh
351 if gen_config.intf_speed:
352 # interface speed is overriden from config
353 self.intf_speed = bitmath.parse_string(gen_config.intf_speed.replace('ps', '')).bits
355 # interface speed is discovered/provided by the traffic generator
357 self.name = gen_config.name
358 self.zmq_pub_port = gen_config.get('zmq_pub_port', 4500)
359 self.zmq_rpc_port = gen_config.get('zmq_rpc_port', 4501)
360 self.limit_memory = gen_config.get('limit_memory', 1024)
361 self.software_mode = gen_config.get('software_mode', False)
362 self.interfaces = gen_config.interfaces
363 if self.interfaces[0].port != 0 or self.interfaces[1].port != 1:
364 raise TrafficClientException('Invalid port order/id in generator_profile.interfaces')
365 self.service_chain = config.service_chain
366 self.service_chain_count = config.service_chain_count
367 self.flow_count = config.flow_count
368 self.host_name = gen_config.host_name
370 self.tg_gateway_ip_addrs = gen_config.tg_gateway_ip_addrs
371 self.ip_addrs = gen_config.ip_addrs
372 self.ip_addrs_step = gen_config.ip_addrs_step or self.DEFAULT_SRC_DST_IP_STEP
373 self.tg_gateway_ip_addrs_step = \
374 gen_config.tg_gateway_ip_addrs_step or self.DEFAULT_IP_STEP
375 self.gateway_ip_addrs_step = gen_config.gateway_ip_addrs_step or self.DEFAULT_IP_STEP
376 self.gateway_ips = gen_config.gateway_ip_addrs
377 self.udp_src_port = gen_config.udp_src_port
378 self.udp_dst_port = gen_config.udp_dst_port
379 self.vteps = gen_config.get('vteps')
380 self.devices = [Device(port, self) for port in [0, 1]]
381 # This should normally always be [0, 1]
382 self.ports = [device.port for device in self.devices]
384 # check that pci is not empty
385 if not gen_config.interfaces[0].get('pci', None) or \
386 not gen_config.interfaces[1].get('pci', None):
387 raise TrafficClientException("configuration interfaces pci fields cannot be empty")
389 self.pcis = [tgif['pci'] for tgif in gen_config.interfaces]
390 self.vlan_tagging = config.vlan_tagging
392 # needed for result/summarizer
393 config['tg-name'] = gen_config.name
394 config['tg-tool'] = self.tool
397 """Get json form to display the content into the overall result dict."""
398 return dict(self.gen_config)
400 def set_dest_macs(self, port_index, dest_macs):
401 """Set the list of dest MACs indexed by the chain id on given port.
403 port_index: the port for which dest macs must be set
404 dest_macs: a list of dest MACs indexed by chain id
406 if len(dest_macs) < self.config.service_chain_count:
407 raise TrafficClientException('Dest MAC list %s must have %d entries' %
408 (dest_macs, self.config.service_chain_count))
409 # only pass the first scc dest MACs
410 self.devices[port_index].set_dest_macs(dest_macs[:self.config.service_chain_count])
411 LOG.info('Port %d: dst MAC %s', port_index, [str(mac) for mac in dest_macs])
413 def set_vtep_dest_macs(self, port_index, dest_macs):
414 """Set the list of dest MACs indexed by the chain id on given port.
416 port_index: the port for which dest macs must be set
417 dest_macs: a list of dest MACs indexed by chain id
419 if len(dest_macs) != self.config.service_chain_count:
420 raise TrafficClientException('Dest MAC list %s must have %d entries' %
421 (dest_macs, self.config.service_chain_count))
422 self.devices[port_index].set_vtep_dst_mac(dest_macs)
423 LOG.info('Port %d: vtep dst MAC %s', port_index, set([str(mac) for mac in dest_macs]))
425 def get_dest_macs(self):
426 """Return the list of dest macs indexed by port."""
427 return [dev.get_dest_macs() for dev in self.devices]
429 def set_vlans(self, port_index, vlans):
430 """Set the list of vlans to use indexed by the chain id on given port.
432 port_index: the port for which VLANs must be set
433 vlans: a list of vlan lists indexed by chain id
435 if len(vlans) != self.config.service_chain_count:
436 raise TrafficClientException('VLAN list %s must have %d entries' %
437 (vlans, self.config.service_chain_count))
438 self.devices[port_index].set_vlans(vlans)
440 def set_vxlans(self, port_index, vxlans):
441 """Set the list of vxlans (VNIs) to use indexed by the chain id on given port.
443 port_index: the port for which VXLANs must be set
444 VXLANs: a list of VNIs lists indexed by chain id
446 if len(vxlans) != self.config.service_chain_count:
447 raise TrafficClientException('VXLAN list %s must have %d entries' %
448 (vxlans, self.config.service_chain_count))
449 self.devices[port_index].set_vxlans(vxlans)
451 def set_vtep_vlan(self, port_index, vlan):
452 """Set the vtep vlan to use indexed by the chain id on given port.
453 port_index: the port for which VLAN must be set
455 self.devices[port_index].set_vtep_vlan(vlan)
457 def set_vxlan_endpoints(self, port_index, src_ip, dst_ip):
458 self.devices[port_index].set_vxlan_endpoints(src_ip, dst_ip)
461 def __match_generator_profile(traffic_generator, generator_profile):
462 gen_config = AttrDict(traffic_generator)
463 gen_config.pop('default_profile')
464 gen_config.pop('generator_profile')
465 matching_profile = [profile for profile in traffic_generator.generator_profile if
466 profile.name == generator_profile]
467 if len(matching_profile) != 1:
468 raise Exception('Traffic generator profile not found: ' + generator_profile)
470 gen_config.update(matching_profile[0])
474 class TrafficClient(object):
475 """Traffic generator client with NDR/PDR binary seearch."""
479 def __init__(self, config, notifier=None):
480 """Create a new TrafficClient instance.
482 config: nfvbench config
483 notifier: notifier (optional)
485 A new instance is created everytime the nfvbench config may have changed.
488 self.generator_config = GeneratorConfig(config)
489 self.tool = self.generator_config.tool
490 self.gen = self._get_generator()
491 self.notifier = notifier
492 self.interval_collector = None
493 self.iteration_collector = None
494 self.runner = TrafficRunner(self, self.config.duration_sec, self.config.interval_sec,
495 self.config.service_mode)
496 self.config.frame_sizes = self._get_frame_sizes()
498 'l2frame_size': None,
499 'duration_sec': self.config.duration_sec,
500 'bidirectional': True,
501 'rates': [] # to avoid unsbuscriptable-obj warning
503 self.current_total_rate = {'rate_percent': '10'}
504 if self.config.single_run:
505 self.current_total_rate = utils.parse_rate_str(self.config.rate)
507 # Speed is either discovered when connecting to TG or set from config
508 # This variable is 0 if not yet discovered from TG or must be the speed of
509 # each interface in bits per second
510 self.intf_speed = self.generator_config.intf_speed
512 def _get_generator(self):
513 tool = self.tool.lower()
515 from traffic_gen import trex_gen
516 return trex_gen.TRex(self)
518 from traffic_gen import dummy
519 return dummy.DummyTG(self)
520 raise TrafficClientException('Unsupported generator tool name:' + self.tool)
522 def skip_sleep(self):
523 """Skip all sleeps when doing unit testing with dummy TG.
525 Must be overriden using mock.patch
529 def _get_frame_sizes(self):
530 traffic_profile_name = self.config.traffic.profile
531 matching_profiles = [profile for profile in self.config.traffic_profile if
532 profile.name == traffic_profile_name]
533 if len(matching_profiles) > 1:
534 raise TrafficClientException('Multiple traffic profiles with name: ' +
535 traffic_profile_name)
536 elif not matching_profiles:
537 raise TrafficClientException('Cannot find traffic profile: ' + traffic_profile_name)
538 return matching_profiles[0].l2frame_size
540 def start_traffic_generator(self):
541 """Start the traffic generator process (traffic not started yet)."""
543 # pick up the interface speed if it is not set from config
544 intf_speeds = self.gen.get_port_speed_gbps()
545 # convert Gbps unit into bps
546 tg_if_speed = bitmath.parse_string(str(intf_speeds[0]) + 'Gb').bits
548 # interface speed is overriden from config
549 if self.intf_speed != tg_if_speed:
550 # Warn the user if the speed in the config is different
551 LOG.warning('Interface speed provided is different from actual speed (%d Gbps)',
554 # interface speed not provisioned by config
555 self.intf_speed = tg_if_speed
556 # also update the speed in the tg config
557 self.generator_config.intf_speed = tg_if_speed
559 # Save the traffic generator local MAC
560 for mac, device in zip(self.gen.get_macs(), self.generator_config.devices):
564 """Set up the traffic client."""
565 self.gen.clear_stats()
567 def get_version(self):
568 """Get the traffic generator version."""
569 return self.gen.get_version()
571 def ensure_end_to_end(self):
572 """Ensure traffic generator receives packets it has transmitted.
574 This ensures end to end connectivity and also waits until VMs are ready to forward packets.
576 VMs that are started and in active state may not pass traffic yet. It is imperative to make
577 sure that all VMs are passing traffic in both directions before starting any benchmarking.
578 To verify this, we need to send at a low frequency bi-directional packets and make sure
579 that we receive all packets back from all VMs. The number of flows is equal to 2 times
580 the number of chains (1 per direction) and we need to make sure we receive packets coming
581 from exactly 2 x chain count different source MAC addresses.
584 PVP chain (1 VM per chain)
585 N = 10 (number of chains)
586 Flow count = 20 (number of flows)
587 If the number of unique source MAC addresses from received packets is 20 then
588 all 10 VMs 10 VMs are in operational state.
590 LOG.info('Starting traffic generator to ensure end-to-end connectivity')
591 # send 2pps on each chain and each direction
592 rate_pps = {'rate_pps': str(self.config.service_chain_count * 2)}
593 self.gen.create_traffic('64', [rate_pps, rate_pps], bidirectional=True, latency=False,
595 # ensures enough traffic is coming back
596 retry_count = (self.config.check_traffic_time_sec +
597 self.config.generic_poll_sec - 1) / self.config.generic_poll_sec
599 # we expect to see packets coming from 2 unique MAC per chain
600 # because there can be flooding in the case of shared net
601 # we must verify that packets from the right VMs are received
602 # and not just count unique src MAC
603 # create a dict of (port, chain) tuples indexed by dest mac
605 for port, dest_macs in enumerate(self.generator_config.get_dest_macs()):
606 for chain, mac in enumerate(dest_macs):
607 mac_map[mac] = (port, chain)
608 unique_src_mac_count = len(mac_map)
609 if self.config.vxlan and self.config.traffic_generator.vtep_vlan:
610 get_mac_id = lambda packet: packet['binary'][60:66]
611 elif self.config.vxlan:
612 get_mac_id = lambda packet: packet['binary'][56:62]
614 get_mac_id = lambda packet: packet['binary'][6:12]
615 for it in xrange(retry_count):
616 self.gen.clear_stats()
617 self.gen.start_traffic()
618 self.gen.start_capture()
619 LOG.info('Captured unique src mac %d/%d, capturing return packets (retry %d/%d)...',
620 unique_src_mac_count - len(mac_map), unique_src_mac_count,
622 if not self.skip_sleep():
623 time.sleep(self.config.generic_poll_sec)
624 self.gen.stop_traffic()
625 self.gen.fetch_capture_packets()
626 self.gen.stop_capture()
627 for packet in self.gen.packet_list:
628 mac_id = get_mac_id(packet)
629 src_mac = ':'.join(["%02x" % ord(x) for x in mac_id])
630 if src_mac in mac_map and self.is_udp(packet):
631 port, chain = mac_map[src_mac]
632 LOG.info('Received packet from mac: %s (chain=%d, port=%d)',
633 src_mac, chain, port)
634 mac_map.pop(src_mac, None)
637 LOG.info('End-to-end connectivity established')
639 if self.config.l3_router and not self.config.no_arp:
640 # In case of L3 traffic mode, routers are not able to route traffic
641 # until VM interfaces are up and ARP requests are done
642 LOG.info('Waiting for loopback service completely started...')
643 LOG.info('Sending ARP request to assure end-to-end connectivity established')
644 self.ensure_arp_successful()
645 raise TrafficClientException('End-to-end connectivity cannot be ensured')
647 def is_udp(self, packet):
648 pkt = Ether(packet['binary'])
651 def ensure_arp_successful(self):
652 """Resolve all IP using ARP and throw an exception in case of failure."""
653 dest_macs = self.gen.resolve_arp()
655 # all dest macs are discovered, saved them into the generator config
656 if self.config.vxlan:
657 self.generator_config.set_vtep_dest_macs(0, dest_macs[0])
658 self.generator_config.set_vtep_dest_macs(1, dest_macs[1])
660 self.generator_config.set_dest_macs(0, dest_macs[0])
661 self.generator_config.set_dest_macs(1, dest_macs[1])
663 raise TrafficClientException('ARP cannot be resolved')
665 def set_traffic(self, frame_size, bidirectional):
666 """Reconfigure the traffic generator for a new frame size."""
667 self.run_config['bidirectional'] = bidirectional
668 self.run_config['l2frame_size'] = frame_size
669 self.run_config['rates'] = [self.get_per_direction_rate()]
671 self.run_config['rates'].append(self.get_per_direction_rate())
673 unidir_reverse_pps = int(self.config.unidir_reverse_traffic_pps)
674 if unidir_reverse_pps > 0:
675 self.run_config['rates'].append({'rate_pps': str(unidir_reverse_pps)})
676 # Fix for [NFVBENCH-67], convert the rate string to PPS
677 for idx, rate in enumerate(self.run_config['rates']):
678 if 'rate_pps' not in rate:
679 self.run_config['rates'][idx] = {'rate_pps': self.__convert_rates(rate)['rate_pps']}
681 self.gen.clear_streamblock()
682 if self.config.no_latency_streams:
683 LOG.info("Latency streams are disabled")
684 self.gen.create_traffic(frame_size, self.run_config['rates'], bidirectional,
685 latency=not self.config.no_latency_streams)
687 def _modify_load(self, load):
688 self.current_total_rate = {'rate_percent': str(load)}
689 rate_per_direction = self.get_per_direction_rate()
691 self.gen.modify_rate(rate_per_direction, False)
692 self.run_config['rates'][0] = rate_per_direction
693 if self.run_config['bidirectional']:
694 self.gen.modify_rate(rate_per_direction, True)
695 self.run_config['rates'][1] = rate_per_direction
697 def get_ndr_and_pdr(self):
698 """Start the NDR/PDR iteration and return the results."""
699 dst = 'Bidirectional' if self.run_config['bidirectional'] else 'Unidirectional'
701 if self.config.ndr_run:
702 LOG.info('*** Searching NDR for %s (%s)...', self.run_config['l2frame_size'], dst)
703 targets['ndr'] = self.config.measurement.NDR
704 if self.config.pdr_run:
705 LOG.info('*** Searching PDR for %s (%s)...', self.run_config['l2frame_size'], dst)
706 targets['pdr'] = self.config.measurement.PDR
708 self.run_config['start_time'] = time.time()
709 self.interval_collector = IntervalCollector(self.run_config['start_time'])
710 self.interval_collector.attach_notifier(self.notifier)
711 self.iteration_collector = IterationCollector(self.run_config['start_time'])
713 self.__range_search(0.0, 200.0, targets, results)
715 results['iteration_stats'] = {
716 'ndr_pdr': self.iteration_collector.get()
719 if self.config.ndr_run:
720 LOG.info('NDR load: %s', results['ndr']['rate_percent'])
721 results['ndr']['time_taken_sec'] = \
722 results['ndr']['timestamp_sec'] - self.run_config['start_time']
723 if self.config.pdr_run:
724 LOG.info('PDR load: %s', results['pdr']['rate_percent'])
725 results['pdr']['time_taken_sec'] = \
726 results['pdr']['timestamp_sec'] - results['ndr']['timestamp_sec']
728 LOG.info('PDR load: %s', results['pdr']['rate_percent'])
729 results['pdr']['time_taken_sec'] = \
730 results['pdr']['timestamp_sec'] - self.run_config['start_time']
733 def __get_dropped_rate(self, result):
734 dropped_pkts = result['rx']['dropped_pkts']
735 total_pkts = result['tx']['total_pkts']
738 return float(dropped_pkts) / total_pkts * 100
741 """Collect final stats for previous run."""
742 stats = self.gen.get_stats()
743 retDict = {'total_tx_rate': stats['total_tx_rate']}
744 for port in self.PORTS:
745 retDict[port] = {'tx': {}, 'rx': {}}
747 tx_keys = ['total_pkts', 'total_pkt_bytes', 'pkt_rate', 'pkt_bit_rate']
748 rx_keys = tx_keys + ['dropped_pkts']
750 for port in self.PORTS:
752 retDict[port]['tx'][key] = int(stats[port]['tx'][key])
755 retDict[port]['rx'][key] = int(stats[port]['rx'][key])
757 retDict[port]['rx'][key] = 0
758 retDict[port]['rx']['avg_delay_usec'] = cast_integer(
759 stats[port]['rx']['avg_delay_usec'])
760 retDict[port]['rx']['min_delay_usec'] = cast_integer(
761 stats[port]['rx']['min_delay_usec'])
762 retDict[port]['rx']['max_delay_usec'] = cast_integer(
763 stats[port]['rx']['max_delay_usec'])
764 retDict[port]['drop_rate_percent'] = self.__get_dropped_rate(retDict[port])
766 ports = sorted(retDict.keys())
767 if self.run_config['bidirectional']:
768 retDict['overall'] = {'tx': {}, 'rx': {}}
770 retDict['overall']['tx'][key] = \
771 retDict[ports[0]]['tx'][key] + retDict[ports[1]]['tx'][key]
773 retDict['overall']['rx'][key] = \
774 retDict[ports[0]]['rx'][key] + retDict[ports[1]]['rx'][key]
775 total_pkts = [retDict[ports[0]]['rx']['total_pkts'],
776 retDict[ports[1]]['rx']['total_pkts']]
777 avg_delays = [retDict[ports[0]]['rx']['avg_delay_usec'],
778 retDict[ports[1]]['rx']['avg_delay_usec']]
779 max_delays = [retDict[ports[0]]['rx']['max_delay_usec'],
780 retDict[ports[1]]['rx']['max_delay_usec']]
781 min_delays = [retDict[ports[0]]['rx']['min_delay_usec'],
782 retDict[ports[1]]['rx']['min_delay_usec']]
783 retDict['overall']['rx']['avg_delay_usec'] = utils.weighted_avg(total_pkts, avg_delays)
784 retDict['overall']['rx']['min_delay_usec'] = min(min_delays)
785 retDict['overall']['rx']['max_delay_usec'] = max(max_delays)
786 for key in ['pkt_bit_rate', 'pkt_rate']:
787 for dirc in ['tx', 'rx']:
788 retDict['overall'][dirc][key] /= 2.0
790 retDict['overall'] = retDict[ports[0]]
791 retDict['overall']['drop_rate_percent'] = self.__get_dropped_rate(retDict['overall'])
794 def __convert_rates(self, rate):
795 return utils.convert_rates(self.run_config['l2frame_size'],
799 def __ndr_pdr_found(self, tag, load):
800 rates = self.__convert_rates({'rate_percent': load})
801 self.iteration_collector.add_ndr_pdr(tag, rates['rate_pps'])
802 last_stats = self.iteration_collector.peek()
803 self.interval_collector.add_ndr_pdr(tag, last_stats)
805 def __format_output_stats(self, stats):
806 for key in self.PORTS + ['overall']:
807 interface = stats[key]
809 'tx_pkts': interface['tx']['total_pkts'],
810 'rx_pkts': interface['rx']['total_pkts'],
811 'drop_percentage': interface['drop_rate_percent'],
812 'drop_pct': interface['rx']['dropped_pkts'],
813 'avg_delay_usec': interface['rx']['avg_delay_usec'],
814 'max_delay_usec': interface['rx']['max_delay_usec'],
815 'min_delay_usec': interface['rx']['min_delay_usec'],
820 def __targets_found(self, rate, targets, results):
821 for tag, target in targets.iteritems():
822 LOG.info('Found %s (%s) load: %s', tag, target, rate)
823 self.__ndr_pdr_found(tag, rate)
824 results[tag]['timestamp_sec'] = time.time()
826 def __range_search(self, left, right, targets, results):
827 """Perform a binary search for a list of targets inside a [left..right] range or rate.
829 left the left side of the range to search as a % the line rate (100 = 100% line rate)
830 indicating the rate to send on each interface
831 right the right side of the range to search as a % of line rate
832 indicating the rate to send on each interface
833 targets a dict of drop rates to search (0.1 = 0.1%), indexed by the DR name or "tag"
835 results a dict to store results
839 LOG.info('Range search [%s .. %s] targets: %s', left, right, targets)
841 # Terminate search when gap is less than load epsilon
842 if right - left < self.config.measurement.load_epsilon:
843 self.__targets_found(left, targets, results)
846 # Obtain the average drop rate in for middle load
847 middle = (left + right) / 2.0
849 stats, rates = self.__run_search_iteration(middle)
851 LOG.exception("Got exception from traffic generator during binary search")
852 self.__targets_found(left, targets, results)
854 # Split target dicts based on the avg drop rate
857 for tag, target in targets.iteritems():
858 if stats['overall']['drop_rate_percent'] <= target:
859 # record the best possible rate found for this target
861 results[tag].update({
862 'load_percent_per_direction': middle,
863 'stats': self.__format_output_stats(dict(stats)),
864 'timestamp_sec': None
866 right_targets[tag] = target
868 # initialize to 0 all fields of result for
869 # the worst case scenario of the binary search (if ndr/pdr is not found)
870 if tag not in results:
871 results[tag] = dict.fromkeys(rates, 0)
872 empty_stats = self.__format_output_stats(dict(stats))
873 for key in empty_stats:
874 if isinstance(empty_stats[key], dict):
875 empty_stats[key] = dict.fromkeys(empty_stats[key], 0)
878 results[tag].update({
879 'load_percent_per_direction': 0,
880 'stats': empty_stats,
881 'timestamp_sec': None
883 left_targets[tag] = target
886 self.__range_search(left, middle, left_targets, results)
888 # search upper half only if the upper rate does not exceed
889 # 100%, this only happens when the first search at 100%
890 # yields a DR that is < target DR
892 self.__targets_found(100, right_targets, results)
894 self.__range_search(middle, right, right_targets, results)
896 def __run_search_iteration(self, rate):
897 """Run one iteration at the given rate level.
899 rate: the rate to send on each port in percent (0 to 100)
901 self._modify_load(rate)
903 # poll interval stats and collect them
904 for stats in self.run_traffic():
905 self.interval_collector.add(stats)
906 time_elapsed_ratio = self.runner.time_elapsed() / self.run_config['duration_sec']
907 if time_elapsed_ratio >= 1:
908 self.cancel_traffic()
909 if not self.skip_sleep():
910 time.sleep(self.config.pause_sec)
911 self.interval_collector.reset()
913 # get stats from the run
914 stats = self.runner.client.get_stats()
915 current_traffic_config = self._get_traffic_config()
916 warning = self.compare_tx_rates(current_traffic_config['direction-total']['rate_pps'],
917 stats['total_tx_rate'])
918 if warning is not None:
919 stats['warning'] = warning
921 # save reliable stats from whole iteration
922 self.iteration_collector.add(stats, current_traffic_config['direction-total']['rate_pps'])
923 LOG.info('Average drop rate: %f', stats['overall']['drop_rate_percent'])
924 return stats, current_traffic_config['direction-total']
927 def log_stats(stats):
928 """Log estimated stats during run."""
930 'datetime': str(datetime.now()),
931 'tx_packets': stats['overall']['tx']['total_pkts'],
932 'rx_packets': stats['overall']['rx']['total_pkts'],
933 'drop_packets': stats['overall']['rx']['dropped_pkts'],
934 'drop_rate_percent': stats['overall']['drop_rate_percent']
936 LOG.info('TX: %(tx_packets)d; '
937 'RX: %(rx_packets)d; '
938 'Est. Dropped: %(drop_packets)d; '
939 'Est. Drop rate: %(drop_rate_percent).4f%%',
942 def run_traffic(self):
943 """Start traffic and return intermediate stats for each interval."""
944 stats = self.runner.run()
945 while self.runner.is_running:
946 self.log_stats(stats)
948 stats = self.runner.poll_stats()
951 self.log_stats(stats)
952 LOG.info('Drop rate: %f', stats['overall']['drop_rate_percent'])
955 def cancel_traffic(self):
959 def _get_traffic_config(self):
964 for idx, rate in enumerate(self.run_config['rates']):
965 key = 'direction-forward' if idx == 0 else 'direction-reverse'
967 'l2frame_size': self.run_config['l2frame_size'],
968 'duration_sec': self.run_config['duration_sec']
970 config[key].update(rate)
971 config[key].update(self.__convert_rates(rate))
972 load_total += float(config[key]['rate_percent'])
973 bps_total += float(config[key]['rate_bps'])
974 pps_total += float(config[key]['rate_pps'])
975 config['direction-total'] = dict(config['direction-forward'])
976 config['direction-total'].update({
977 'rate_percent': load_total,
978 'rate_pps': cast_integer(pps_total),
979 'rate_bps': bps_total
984 def get_run_config(self, results):
985 """Return configuration which was used for the last run."""
987 # because we want each direction to have the far end RX rates,
988 # use the far end index (1-idx) to retrieve the RX rates
989 for idx, key in enumerate(["direction-forward", "direction-reverse"]):
990 tx_rate = results["stats"][idx]["tx"]["total_pkts"] / self.config.duration_sec
991 rx_rate = results["stats"][1 - idx]["rx"]["total_pkts"] / self.config.duration_sec
993 "orig": self.__convert_rates(self.run_config['rates'][idx]),
994 "tx": self.__convert_rates({'rate_pps': tx_rate}),
995 "rx": self.__convert_rates({'rate_pps': rx_rate})
999 for direction in ['orig', 'tx', 'rx']:
1000 total[direction] = {}
1001 for unit in ['rate_percent', 'rate_bps', 'rate_pps']:
1002 total[direction][unit] = sum([float(x[direction][unit]) for x in r.values()])
1004 r['direction-total'] = total
1007 def insert_interface_stats(self, pps_list):
1008 """Insert interface stats to a list of packet path stats.
1010 pps_list: a list of packet path stats instances indexed by chain index
1012 This function will insert the packet path stats for the traffic gen ports 0 and 1
1013 with itemized per chain tx/rx counters.
1014 There will be as many packet path stats as chains.
1015 Each packet path stats will have exactly 2 InterfaceStats for port 0 and port 1
1018 PacketPathStats(InterfaceStats(chain 0, port 0), InterfaceStats(chain 0, port 1)),
1019 PacketPathStats(InterfaceStats(chain 1, port 0), InterfaceStats(chain 1, port 1)),
1023 def get_if_stats(chain_idx):
1024 return [InterfaceStats('p' + str(port), self.tool)
1025 for port in range(2)]
1026 # keep the list of list of interface stats indexed by the chain id
1027 self.ifstats = [get_if_stats(chain_idx)
1028 for chain_idx in range(self.config.service_chain_count)]
1029 # note that we need to make a copy of the ifs list so that any modification in the
1030 # list from pps will not change the list saved in self.ifstats
1031 self.pps_list = [PacketPathStats(list(ifs)) for ifs in self.ifstats]
1032 # insert the corresponding pps in the passed list
1033 pps_list.extend(self.pps_list)
1035 def update_interface_stats(self, diff=False):
1036 """Update all interface stats.
1038 diff: if False, simply refresh the interface stats values with latest values
1039 if True, diff the interface stats with the latest values
1040 Make sure that the interface stats inserted in insert_interface_stats() are updated
1044 [InterfaceStats(chain 0, port 0), InterfaceStats(chain 0, port 1)],
1045 [InterfaceStats(chain 1, port 0), InterfaceStats(chain 1, port 1)],
1050 stats = self.gen.get_stats()
1051 for chain_idx, ifs in enumerate(self.ifstats):
1052 # each ifs has exactly 2 InterfaceStats and 2 Latency instances
1053 # corresponding to the
1054 # port 0 and port 1 for the given chain_idx
1055 # Note that we cannot use self.pps_list[chain_idx].if_stats to pick the
1056 # interface stats for the pps because it could have been modified to contain
1057 # additional interface stats
1058 self.gen.get_stream_stats(stats, ifs, self.pps_list[chain_idx].latencies, chain_idx)
1059 # special handling for vxlan
1060 # in case of vxlan, flow stats are not available so all rx counters will be
1061 # zeros when the total rx port counter is non zero.
1063 for port in range(2):
1065 for ifs in self.ifstats:
1066 total_rx += ifs[port].rx
1068 # check if the total port rx from Trex is also zero
1069 port_rx = stats[port]['rx']['total_pkts']
1071 # the total rx for all chains from port level stats is non zero
1072 # which means that the per-chain stats are not available
1073 if len(self.ifstats) == 1:
1074 # only one chain, simply report the port level rx to the chain rx stats
1075 self.ifstats[0][port].rx = port_rx
1077 for ifs in self.ifstats:
1078 # mark this data as unavailable
1080 # pitch in the total rx only in the last chain pps
1081 self.ifstats[-1][port].rx_total = port_rx
1084 def compare_tx_rates(required, actual):
1085 """Compare the actual TX rate to the required TX rate."""
1087 are_different = False
1089 if float(actual) / required < threshold:
1090 are_different = True
1091 except ZeroDivisionError:
1092 are_different = True
1095 msg = "WARNING: There is a significant difference between requested TX rate ({r}) " \
1096 "and actual TX rate ({a}). The traffic generator may not have sufficient CPU " \
1097 "to achieve the requested TX rate.".format(r=required, a=actual)
1103 def get_per_direction_rate(self):
1104 """Get the rate for each direction."""
1105 divisor = 2 if self.run_config['bidirectional'] else 1
1106 if 'rate_percent' in self.current_total_rate:
1107 # don't split rate if it's percentage
1110 return utils.divide_rate(self.current_total_rate, divisor)
1113 """Close this instance."""
1115 self.gen.stop_traffic()
1118 self.gen.clear_stats()