1 # Copyright 2016 Cisco Systems, Inc. All rights reserved.
3 # Licensed under the Apache License, Version 2.0 (the "License"); you may
4 # not use this file except in compliance with the License. You may obtain
5 # a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 # License for the specific language governing permissions and limitations
15 """Interface to the traffic generator clients including NDR/PDR binary search."""
17 from datetime import datetime
22 from attrdict import AttrDict
24 from netaddr import IPNetwork
25 # pylint: disable=import-error
26 from trex.stl.api import Ether
27 from trex.stl.api import STLError
28 from trex.stl.api import UDP
29 # pylint: enable=import-error
32 from packet_stats import InterfaceStats
33 from packet_stats import PacketPathStats
34 from stats_collector import IntervalCollector
35 from stats_collector import IterationCollector
36 import traffic_gen.traffic_utils as utils
37 from utils import cast_integer
class TrafficClientException(Exception):
    """Generic traffic client exception.

    Raised by the classes of this module when the traffic generator configuration
    is invalid (e.g. bad port order, wrong MAC/VLAN list size, unsupported tool name)
    or when connectivity checks fail (end-to-end check, ARP resolution).
    """
46 class TrafficRunner(object):
47 """Serialize various steps required to run traffic."""
49 def __init__(self, client, duration_sec, interval_sec=0):
50 """Create a traffic runner."""
52 self.start_time = None
53 self.duration_sec = duration_sec
54 self.interval_sec = interval_sec
57 """Clear stats and instruct the traffic generator to start generating traffic."""
60 LOG.info('Running traffic generator')
61 self.client.gen.clear_stats()
62 self.client.gen.start_traffic()
63 self.start_time = time.time()
64 return self.poll_stats()
67 """Stop the current run and instruct the traffic generator to stop traffic."""
69 self.start_time = None
70 self.client.gen.stop_traffic()
73 """Check if a run is still pending."""
74 return self.start_time is not None
76 def time_elapsed(self):
77 """Return time elapsed since start of run."""
79 return time.time() - self.start_time
80 return self.duration_sec
83 """Poll latest stats from the traffic generator at fixed interval - sleeps if necessary.
85 return: latest stats or None if traffic is stopped
87 if not self.is_running():
89 if self.client.skip_sleep():
91 return self.client.get_stats()
92 time_elapsed = self.time_elapsed()
93 if time_elapsed > self.duration_sec:
96 time_left = self.duration_sec - time_elapsed
97 if self.interval_sec > 0.0:
98 if time_left <= self.interval_sec:
102 time.sleep(self.interval_sec)
104 time.sleep(self.duration_sec)
106 return self.client.get_stats()
109 class IpBlock(object):
110 """Manage a block of IP addresses."""
112 def __init__(self, base_ip, step_ip, count_ip):
113 """Create an IP block."""
114 self.base_ip_int = Device.ip_to_int(base_ip)
115 self.step = Device.ip_to_int(step_ip)
116 self.max_available = count_ip
def get_ip(self, index=0):
    """Return the IP address located at the given index of this block.

    index: position of the address inside the block (defaults to the first one)
    return: the address string for base address + index * step
    raise: IndexError if index is negative or not lower than max_available
    """
    if not 0 <= index < self.max_available:
        raise IndexError('Index out of bounds: %d (max=%d)' % (index, self.max_available))
    offset = index * self.step
    return Device.int_to_ip(self.base_ip_int + offset)
125 def reserve_ip_range(self, count):
126 """Reserve a range of count consecutive IP addresses spaced by step."""
127 if self.next_free + count > self.max_available:
128 raise IndexError('No more IP addresses next free=%d max_available=%d requested=%d' %
132 first_ip = self.get_ip(self.next_free)
133 last_ip = self.get_ip(self.next_free + count - 1)
134 self.next_free += count
135 return (first_ip, last_ip)
137 def reset_reservation(self):
138 """Reset all reservations and restart with a completely unused IP block."""
142 class Device(object):
143 """Represent a port device and all information associated to it.
145 In the current version we only support 2 port devices for the traffic generator
146 identified as port 0 or port 1.
149 def __init__(self, port, generator_config):
150 """Create a new device for a given port."""
151 self.generator_config = generator_config
152 self.chain_count = generator_config.service_chain_count
153 self.flow_count = generator_config.flow_count / 2
155 self.switch_port = generator_config.interfaces[port].get('switch_port', None)
156 self.vtep_vlan = None
157 self.vtep_src_mac = None
159 self.pci = generator_config.interfaces[port].pci
161 self.dest_macs = None
162 self.vtep_dst_mac = None
163 self.vtep_dst_ip = None
164 if generator_config.vteps is None:
165 self.vtep_src_ip = None
167 self.vtep_src_ip = generator_config.vteps[port]
170 self.ip_addrs = generator_config.ip_addrs[port]
171 subnet = IPNetwork(self.ip_addrs)
172 self.ip = subnet.ip.format()
173 self.ip_addrs_step = generator_config.ip_addrs_step
174 self.ip_block = IpBlock(self.ip, self.ip_addrs_step, self.flow_count)
175 self.gw_ip_block = IpBlock(generator_config.gateway_ips[port],
176 generator_config.gateway_ip_addrs_step,
178 self.tg_gateway_ip_addrs = generator_config.tg_gateway_ip_addrs[port]
179 self.tg_gw_ip_block = IpBlock(self.tg_gateway_ip_addrs,
180 generator_config.tg_gateway_ip_addrs_step,
182 self.udp_src_port = generator_config.udp_src_port
183 self.udp_dst_port = generator_config.udp_dst_port
185 def set_mac(self, mac):
186 """Set the local MAC for this port device."""
188 raise TrafficClientException('Trying to set traffic generator MAC address as None')
def get_peer_device(self):
    """Return the device attached to the other port (port 0 <-> port 1)."""
    peer_port = 1 - self.port
    return self.generator_config.devices[peer_port]
def set_vtep_dst_mac(self, dest_macs):
    """Set the list of VTEP dest MACs indexed by the chain id.

    dest_macs: an iterable of MAC addresses (any object convertible with str())

    The MACs are stored as a real list of strings so the result can be indexed
    and read several times (a bare map() is a one-shot iterator on Python 3).
    """
    self.vtep_dst_mac = [str(mac) for mac in dest_macs]
def set_dest_macs(self, dest_macs):
    """Set the list of dest MACs indexed by the chain id.

    This is only called in 2 cases:
    - VM macs discovered using openstack API
    - dest MACs provisioned in config file

    dest_macs: an iterable of MAC addresses (any object convertible with str())

    The MACs are stored as a real list of strings because callers index the
    result by chain id and may read it several times (a bare map() is a
    one-shot, non-indexable iterator on Python 3).
    """
    self.dest_macs = [str(mac) for mac in dest_macs]
213 def get_dest_macs(self):
214 """Get the list of dest macs for this device.
216 If set_dest_macs was never called, assumes l2-loopback and return
217 a list of peer mac (as many as chains but normally only 1 chain)
220 return self.dest_macs
221 # assume this is l2-loopback
222 return [self.get_peer_device().mac] * self.chain_count
224 def set_vlans(self, vlans):
225 """Set the list of vlans to use indexed by the chain id."""
227 LOG.info("Port %d: VLANs %s", self.port, self.vlans)
229 def set_vtep_vlan(self, vlan):
230 """Set the vtep vlan to use indexed by specific port."""
231 self.vtep_vlan = vlan
233 self.vlan_tagging = None
234 LOG.info("Port %d: VTEP VLANs %s", self.port, self.vtep_vlan)
def set_vxlan_endpoints(self, src_ip, dst_ip):
    """Record the VXLAN tunnel endpoint (VTEP) IP addresses of this port device.

    src_ip: IP address stored as the source VTEP
    dst_ip: IP address stored as the destination VTEP
    """
    self.vtep_dst_ip, self.vtep_src_ip = dst_ip, src_ip
    LOG.info("Port %d: src_vtep %s, dst_vtep %s",
             self.port, self.vtep_src_ip, self.vtep_dst_ip)
242 def set_vxlans(self, vnis):
244 LOG.info("Port %d: VNIs %s", self.port, self.vnis)
246 def set_gw_ip(self, gateway_ip):
247 self.gw_ip_block = IpBlock(gateway_ip,
248 self.generator_config.gateway_ip_addrs_step,
def get_gw_ip(self, chain_index):
    """Return the gateway IP address taken from the gateway block for a chain.

    chain_index: index of the chain, used as the index inside the gateway IP block
    """
    gw_block = self.gw_ip_block
    return gw_block.get_ip(chain_index)
255 def get_stream_configs(self):
256 """Get the stream config for a given chain on this device.
258 Called by the traffic generator driver to program the traffic generator properly
259 before generating traffic
262 # exact flow count for each chain is calculated as follows:
263 # - all chains except the first will have the same flow count
264 # calculated as (total_flows + chain_count - 1) / chain_count
265 # - the first chain will have the remainder
266 # example 11 flows and 3 chains => 3, 4, 4
267 flows_per_chain = (self.flow_count + self.chain_count - 1) / self.chain_count
268 cur_chain_flow_count = self.flow_count - flows_per_chain * (self.chain_count - 1)
269 peer = self.get_peer_device()
270 self.ip_block.reset_reservation()
271 peer.ip_block.reset_reservation()
272 dest_macs = self.get_dest_macs()
274 for chain_idx in xrange(self.chain_count):
275 src_ip_first, src_ip_last = self.ip_block.reserve_ip_range(cur_chain_flow_count)
276 dst_ip_first, dst_ip_last = peer.ip_block.reserve_ip_range(cur_chain_flow_count)
279 'count': cur_chain_flow_count,
281 'mac_dst': dest_macs[chain_idx],
282 'ip_src_addr': src_ip_first,
283 'ip_src_addr_max': src_ip_last,
284 'ip_src_count': cur_chain_flow_count,
285 'ip_dst_addr': dst_ip_first,
286 'ip_dst_addr_max': dst_ip_last,
287 'ip_dst_count': cur_chain_flow_count,
288 'ip_addrs_step': self.ip_addrs_step,
289 'udp_src_port': self.udp_src_port,
290 'udp_dst_port': self.udp_dst_port,
291 'mac_discovery_gw': self.get_gw_ip(chain_idx),
292 'ip_src_tg_gw': self.tg_gw_ip_block.get_ip(chain_idx),
293 'ip_dst_tg_gw': peer.tg_gw_ip_block.get_ip(chain_idx),
294 'vlan_tag': self.vlans[chain_idx] if self.vlans else None,
296 'vtep_vlan': self.vtep_vlan if self.vtep_vlan else None,
297 'vtep_src_mac': self.mac if self.vxlan is True else None,
298 'vtep_dst_mac': self.vtep_dst_mac if self.vxlan is True else None,
299 'vtep_dst_ip': self.vtep_dst_ip if self.vxlan is True else None,
300 'vtep_src_ip': self.vtep_src_ip if self.vxlan is True else None,
301 'net_vni': self.vnis[chain_idx] if self.vxlan is True else None
303 # after first chain, fall back to the flow count for all other chains
304 cur_chain_flow_count = flows_per_chain
309 """Convert an IP address from string to numeric."""
310 return struct.unpack("!I", socket.inet_aton(addr))[0]
313 def int_to_ip(nvalue):
314 """Convert an IP address from numeric to string."""
315 return socket.inet_ntoa(struct.pack("!I", nvalue))
318 class GeneratorConfig(object):
319 """Represents traffic configuration for currently running traffic profile."""
321 DEFAULT_IP_STEP = '0.0.0.1'
322 DEFAULT_SRC_DST_IP_STEP = '0.0.0.1'
324 def __init__(self, config):
325 """Create a generator config."""
327 # name of the generator profile (normally trex or dummy)
328 # pick the default one if not specified explicitly from cli options
329 if not config.generator_profile:
330 config.generator_profile = config.traffic_generator.default_profile
331 # pick up the profile dict based on the name
332 gen_config = self.__match_generator_profile(config.traffic_generator,
333 config.generator_profile)
334 self.gen_config = gen_config
335 # copy over fields from the dict
336 self.tool = gen_config.tool
337 self.ip = gen_config.ip
338 # overrides on config.cores and config.mbuf_factor
340 self.cores = config.cores
342 self.cores = gen_config.get('cores', 1)
343 self.mbuf_factor = config.mbuf_factor
344 self.hdrh = not config.disable_hdrh
345 if gen_config.intf_speed:
346 # interface speed is overriden from config
347 self.intf_speed = bitmath.parse_string(gen_config.intf_speed.replace('ps', '')).bits
349 # interface speed is discovered/provided by the traffic generator
351 self.name = gen_config.name
352 self.zmq_pub_port = gen_config.get('zmq_pub_port', 4500)
353 self.zmq_rpc_port = gen_config.get('zmq_rpc_port', 4501)
354 self.limit_memory = gen_config.get('limit_memory', 1024)
355 self.software_mode = gen_config.get('software_mode', False)
356 self.interfaces = gen_config.interfaces
357 if self.interfaces[0].port != 0 or self.interfaces[1].port != 1:
358 raise TrafficClientException('Invalid port order/id in generator_profile.interfaces')
359 if hasattr(gen_config, 'platform'):
360 self.platform = gen_config.platform
361 self.service_chain = config.service_chain
362 self.service_chain_count = config.service_chain_count
363 self.flow_count = config.flow_count
364 self.host_name = gen_config.host_name
366 self.tg_gateway_ip_addrs = gen_config.tg_gateway_ip_addrs
367 self.ip_addrs = gen_config.ip_addrs
368 self.ip_addrs_step = gen_config.ip_addrs_step or self.DEFAULT_SRC_DST_IP_STEP
369 self.tg_gateway_ip_addrs_step = \
370 gen_config.tg_gateway_ip_addrs_step or self.DEFAULT_IP_STEP
371 self.gateway_ip_addrs_step = gen_config.gateway_ip_addrs_step or self.DEFAULT_IP_STEP
372 self.gateway_ips = gen_config.gateway_ip_addrs
373 self.udp_src_port = gen_config.udp_src_port
374 self.udp_dst_port = gen_config.udp_dst_port
375 self.vteps = gen_config.get('vteps')
376 self.devices = [Device(port, self) for port in [0, 1]]
377 # This should normally always be [0, 1]
378 self.ports = [device.port for device in self.devices]
380 # check that pci is not empty
381 if not gen_config.interfaces[0].get('pci', None) or \
382 not gen_config.interfaces[1].get('pci', None):
383 raise TrafficClientException("configuration interfaces pci fields cannot be empty")
385 self.pcis = [tgif['pci'] for tgif in gen_config.interfaces]
386 self.vlan_tagging = config.vlan_tagging
388 # needed for result/summarizer
389 config['tg-name'] = gen_config.name
390 config['tg-tool'] = self.tool
393 """Get json form to display the content into the overall result dict."""
394 return dict(self.gen_config)
def set_dest_macs(self, port_index, dest_macs):
    """Store the dest MACs of a port, keeping one MAC per service chain.

    port_index: the port for which dest macs must be set
    dest_macs: a list of dest MACs indexed by chain id

    raise: TrafficClientException if there are fewer MACs than service chains
    """
    chain_count = self.config.service_chain_count
    if len(dest_macs) < chain_count:
        raise TrafficClientException('Dest MAC list %s must have %d entries' %
                                     (dest_macs, chain_count))
    # MACs beyond the chain count are not passed to the device
    kept_macs = dest_macs[:chain_count]
    self.devices[port_index].set_dest_macs(kept_macs)
    # the log line shows the full list that was received
    LOG.info('Port %d: dst MAC %s', port_index, list(map(str, dest_macs)))
def set_vtep_dest_macs(self, port_index, dest_macs):
    """Store the VTEP dest MACs of a port (exactly one per service chain).

    port_index: the port for which dest macs must be set
    dest_macs: a list of dest MACs indexed by chain id

    raise: TrafficClientException if the list size differs from the chain count
    """
    chain_count = self.config.service_chain_count
    if len(dest_macs) != chain_count:
        raise TrafficClientException('Dest MAC list %s must have %d entries' %
                                     (dest_macs, chain_count))
    device = self.devices[port_index]
    device.set_vtep_dst_mac(dest_macs)
    # only the distinct MAC values are logged
    unique_macs = set(map(str, dest_macs))
    LOG.info('Port %d: vtep dst MAC %s', port_index, unique_macs)
def get_dest_macs(self):
    """Return one dest MAC list per device, in the order of self.devices."""
    macs_per_port = []
    for device in self.devices:
        macs_per_port.append(device.get_dest_macs())
    return macs_per_port
def set_vlans(self, port_index, vlans):
    """Assign the per-chain VLAN lists to the device of the given port.

    port_index: the port for which VLANs must be set
    vlans: a list of vlan lists indexed by chain id

    raise: TrafficClientException if the list size differs from the chain count
    """
    expected = self.config.service_chain_count
    if len(vlans) == expected:
        self.devices[port_index].set_vlans(vlans)
        return
    raise TrafficClientException('VLAN list %s must have %d entries' % (vlans, expected))
def set_vxlans(self, port_index, vxlans):
    """Assign the per-chain VNI lists to the device of the given port.

    port_index: the port for which VXLANs must be set
    vxlans: a list of VNIs lists indexed by chain id

    raise: TrafficClientException if the list size differs from the chain count
    """
    expected = self.config.service_chain_count
    if len(vxlans) == expected:
        self.devices[port_index].set_vxlans(vxlans)
        return
    raise TrafficClientException('VXLAN list %s must have %d entries' % (vxlans, expected))
def set_vtep_vlan(self, port_index, vlan):
    """Set the vtep vlan to use on the device of the given port.

    port_index: the port for which VLAN must be set
    vlan: the vtep vlan handed over to the device
    """
    device = self.devices[port_index]
    device.set_vtep_vlan(vlan)
def set_vxlan_endpoints(self, port_index, src_ip, dst_ip):
    """Set the VXLAN endpoint IP addresses on the device of the given port.

    port_index: the port for which the endpoints must be set
    src_ip: source VTEP IP address handed over to the device
    dst_ip: destination VTEP IP address handed over to the device
    """
    device = self.devices[port_index]
    device.set_vxlan_endpoints(src_ip, dst_ip)
457 def __match_generator_profile(traffic_generator, generator_profile):
458 gen_config = AttrDict(traffic_generator)
459 gen_config.pop('default_profile')
460 gen_config.pop('generator_profile')
461 matching_profile = [profile for profile in traffic_generator.generator_profile if
462 profile.name == generator_profile]
463 if len(matching_profile) != 1:
464 raise Exception('Traffic generator profile not found: ' + generator_profile)
466 gen_config.update(matching_profile[0])
470 class TrafficClient(object):
471 """Traffic generator client with NDR/PDR binary search."""
475 def __init__(self, config, notifier=None):
476 """Create a new TrafficClient instance.
478 config: nfvbench config
479 notifier: notifier (optional)
481 A new instance is created everytime the nfvbench config may have changed.
484 self.generator_config = GeneratorConfig(config)
485 self.tool = self.generator_config.tool
486 self.gen = self._get_generator()
487 self.notifier = notifier
488 self.interval_collector = None
489 self.iteration_collector = None
490 self.runner = TrafficRunner(self, self.config.duration_sec, self.config.interval_sec)
491 self.config.frame_sizes = self._get_frame_sizes()
493 'l2frame_size': None,
494 'duration_sec': self.config.duration_sec,
495 'bidirectional': True,
496 'rates': [] # to avoid unsbuscriptable-obj warning
498 self.current_total_rate = {'rate_percent': '10'}
499 if self.config.single_run:
500 self.current_total_rate = utils.parse_rate_str(self.config.rate)
502 # Speed is either discovered when connecting to TG or set from config
503 # This variable is 0 if not yet discovered from TG or must be the speed of
504 # each interface in bits per second
505 self.intf_speed = self.generator_config.intf_speed
507 def _get_generator(self):
508 tool = self.tool.lower()
510 from traffic_gen import trex_gen
511 return trex_gen.TRex(self)
513 from traffic_gen import dummy
514 return dummy.DummyTG(self)
515 raise TrafficClientException('Unsupported generator tool name:' + self.tool)
517 def skip_sleep(self):
518 """Skip all sleeps when doing unit testing with dummy TG.
520 Must be overriden using mock.patch
def _get_frame_sizes(self):
    """Return the l2frame_size of the traffic profile selected in the config.

    The profile is looked up by name (config.traffic.profile) in config.traffic_profile.

    raise: TrafficClientException if no profile or several profiles have that name
    """
    wanted = self.config.traffic.profile
    candidates = []
    for profile in self.config.traffic_profile:
        if profile.name == wanted:
            candidates.append(profile)
    if not candidates:
        raise TrafficClientException('Cannot find traffic profile: ' + wanted)
    if len(candidates) > 1:
        raise TrafficClientException('Multiple traffic profiles with name: ' + wanted)
    return candidates[0].l2frame_size
535 def start_traffic_generator(self):
536 """Start the traffic generator process (traffic not started yet)."""
538 # pick up the interface speed if it is not set from config
539 intf_speeds = self.gen.get_port_speed_gbps()
540 # convert Gbps unit into bps
541 tg_if_speed = bitmath.parse_string(str(intf_speeds[0]) + 'Gb').bits
543 # interface speed is overriden from config
544 if self.intf_speed != tg_if_speed:
545 # Warn the user if the speed in the config is different
546 LOG.warning('Interface speed provided is different from actual speed (%d Gbps)',
549 # interface speed not provisioned by config
550 self.intf_speed = tg_if_speed
551 # also update the speed in the tg config
552 self.generator_config.intf_speed = tg_if_speed
554 # Save the traffic generator local MAC
555 for mac, device in zip(self.gen.get_macs(), self.generator_config.devices):
559 """Set up the traffic client."""
560 self.gen.clear_stats()
def get_version(self):
    """Return the version reported by the traffic generator driver."""
    generator = self.gen
    return generator.get_version()
566 def ensure_end_to_end(self):
567 """Ensure traffic generator receives packets it has transmitted.
569 This ensures end to end connectivity and also waits until VMs are ready to forward packets.
571 VMs that are started and in active state may not pass traffic yet. It is imperative to make
572 sure that all VMs are passing traffic in both directions before starting any benchmarking.
573 To verify this, we need to send at a low frequency bi-directional packets and make sure
574 that we receive all packets back from all VMs. The number of flows is equal to 2 times
575 the number of chains (1 per direction) and we need to make sure we receive packets coming
576 from exactly 2 x chain count different source MAC addresses.
579 PVP chain (1 VM per chain)
580 N = 10 (number of chains)
581 Flow count = 20 (number of flows)
582 If the number of unique source MAC addresses from received packets is 20 then
583 all 10 VMs are in operational state.
585 LOG.info('Starting traffic generator to ensure end-to-end connectivity')
586 # send 2pps on each chain and each direction
587 rate_pps = {'rate_pps': str(self.config.service_chain_count * 2)}
588 self.gen.create_traffic('64', [rate_pps, rate_pps], bidirectional=True, latency=False,
590 # ensures enough traffic is coming back
591 retry_count = (self.config.check_traffic_time_sec +
592 self.config.generic_poll_sec - 1) / self.config.generic_poll_sec
594 # we expect to see packets coming from 2 unique MAC per chain
595 # because there can be flooding in the case of shared net
596 # we must verify that packets from the right VMs are received
597 # and not just count unique src MAC
598 # create a dict of (port, chain) tuples indexed by dest mac
600 for port, dest_macs in enumerate(self.generator_config.get_dest_macs()):
601 for chain, mac in enumerate(dest_macs):
602 mac_map[mac] = (port, chain)
603 unique_src_mac_count = len(mac_map)
604 if self.config.vxlan and self.config.traffic_generator.vtep_vlan:
605 get_mac_id = lambda packet: packet['binary'][60:66]
606 elif self.config.vxlan:
607 get_mac_id = lambda packet: packet['binary'][56:62]
609 get_mac_id = lambda packet: packet['binary'][6:12]
610 for it in xrange(retry_count):
611 self.gen.clear_stats()
612 self.gen.start_traffic()
613 self.gen.start_capture()
614 LOG.info('Captured unique src mac %d/%d, capturing return packets (retry %d/%d)...',
615 unique_src_mac_count - len(mac_map), unique_src_mac_count,
617 if not self.skip_sleep():
618 time.sleep(self.config.generic_poll_sec)
619 self.gen.stop_traffic()
620 self.gen.fetch_capture_packets()
621 self.gen.stop_capture()
622 for packet in self.gen.packet_list:
623 mac_id = get_mac_id(packet)
624 src_mac = ':'.join(["%02x" % ord(x) for x in mac_id])
625 if src_mac in mac_map and self.is_udp(packet):
626 port, chain = mac_map[src_mac]
627 LOG.info('Received packet from mac: %s (chain=%d, port=%d)',
628 src_mac, chain, port)
629 mac_map.pop(src_mac, None)
632 LOG.info('End-to-end connectivity established')
634 if self.config.l3_router and not self.config.no_arp:
635 # In case of L3 traffic mode, routers are not able to route traffic
636 # until VM interfaces are up and ARP requests are done
637 LOG.info('Waiting for loopback service completely started...')
638 LOG.info('Sending ARP request to assure end-to-end connectivity established')
639 self.ensure_arp_successful()
640 raise TrafficClientException('End-to-end connectivity cannot be ensured')
642 def is_udp(self, packet):
643 pkt = Ether(packet['binary'])
646 def ensure_arp_successful(self):
647 """Resolve all IP using ARP and throw an exception in case of failure."""
648 dest_macs = self.gen.resolve_arp()
650 # all dest macs are discovered, saved them into the generator config
651 if self.config.vxlan:
652 self.generator_config.set_vtep_dest_macs(0, dest_macs[0])
653 self.generator_config.set_vtep_dest_macs(1, dest_macs[1])
655 self.generator_config.set_dest_macs(0, dest_macs[0])
656 self.generator_config.set_dest_macs(1, dest_macs[1])
658 raise TrafficClientException('ARP cannot be resolved')
660 def set_traffic(self, frame_size, bidirectional):
661 """Reconfigure the traffic generator for a new frame size."""
662 self.run_config['bidirectional'] = bidirectional
663 self.run_config['l2frame_size'] = frame_size
664 self.run_config['rates'] = [self.get_per_direction_rate()]
666 self.run_config['rates'].append(self.get_per_direction_rate())
668 unidir_reverse_pps = int(self.config.unidir_reverse_traffic_pps)
669 if unidir_reverse_pps > 0:
670 self.run_config['rates'].append({'rate_pps': str(unidir_reverse_pps)})
671 # Fix for [NFVBENCH-67], convert the rate string to PPS
672 for idx, rate in enumerate(self.run_config['rates']):
673 if 'rate_pps' not in rate:
674 self.run_config['rates'][idx] = {'rate_pps': self.__convert_rates(rate)['rate_pps']}
676 self.gen.clear_streamblock()
677 if not self.config.vxlan:
678 self.gen.create_traffic(frame_size, self.run_config['rates'], bidirectional,
681 self.gen.create_traffic(frame_size, self.run_config['rates'], bidirectional,
def _modify_load(self, load):
    """Apply a new total load (in percent) to the traffic generator.

    load: the load to apply, stored as the 'rate_percent' of the current total rate

    The per direction rate is pushed to the generator for the forward direction
    and, for bidirectional runs only, for the reverse direction as well. The
    run config rates are updated accordingly.
    """
    self.current_total_rate = {'rate_percent': str(load)}
    per_dir_rate = self.get_per_direction_rate()

    # forward direction
    self.gen.modify_rate(per_dir_rate, False)
    self.run_config['rates'][0] = per_dir_rate
    if not self.run_config['bidirectional']:
        return
    # reverse direction
    self.gen.modify_rate(per_dir_rate, True)
    self.run_config['rates'][1] = per_dir_rate
694 def get_ndr_and_pdr(self):
695 """Start the NDR/PDR iteration and return the results."""
696 dst = 'Bidirectional' if self.run_config['bidirectional'] else 'Unidirectional'
698 if self.config.ndr_run:
699 LOG.info('*** Searching NDR for %s (%s)...', self.run_config['l2frame_size'], dst)
700 targets['ndr'] = self.config.measurement.NDR
701 if self.config.pdr_run:
702 LOG.info('*** Searching PDR for %s (%s)...', self.run_config['l2frame_size'], dst)
703 targets['pdr'] = self.config.measurement.PDR
705 self.run_config['start_time'] = time.time()
706 self.interval_collector = IntervalCollector(self.run_config['start_time'])
707 self.interval_collector.attach_notifier(self.notifier)
708 self.iteration_collector = IterationCollector(self.run_config['start_time'])
710 self.__range_search(0.0, 200.0, targets, results)
712 results['iteration_stats'] = {
713 'ndr_pdr': self.iteration_collector.get()
716 if self.config.ndr_run:
717 LOG.info('NDR load: %s', results['ndr']['rate_percent'])
718 results['ndr']['time_taken_sec'] = \
719 results['ndr']['timestamp_sec'] - self.run_config['start_time']
720 if self.config.pdr_run:
721 LOG.info('PDR load: %s', results['pdr']['rate_percent'])
722 results['pdr']['time_taken_sec'] = \
723 results['pdr']['timestamp_sec'] - results['ndr']['timestamp_sec']
725 LOG.info('PDR load: %s', results['pdr']['rate_percent'])
726 results['pdr']['time_taken_sec'] = \
727 results['pdr']['timestamp_sec'] - self.run_config['start_time']
730 def __get_dropped_rate(self, result):
731 dropped_pkts = result['rx']['dropped_pkts']
732 total_pkts = result['tx']['total_pkts']
735 return float(dropped_pkts) / total_pkts * 100
738 """Collect final stats for previous run."""
739 stats = self.gen.get_stats()
740 retDict = {'total_tx_rate': stats['total_tx_rate']}
741 for port in self.PORTS:
742 retDict[port] = {'tx': {}, 'rx': {}}
744 tx_keys = ['total_pkts', 'total_pkt_bytes', 'pkt_rate', 'pkt_bit_rate']
745 rx_keys = tx_keys + ['dropped_pkts']
747 for port in self.PORTS:
749 retDict[port]['tx'][key] = int(stats[port]['tx'][key])
752 retDict[port]['rx'][key] = int(stats[port]['rx'][key])
754 retDict[port]['rx'][key] = 0
755 retDict[port]['rx']['avg_delay_usec'] = cast_integer(
756 stats[port]['rx']['avg_delay_usec'])
757 retDict[port]['rx']['min_delay_usec'] = cast_integer(
758 stats[port]['rx']['min_delay_usec'])
759 retDict[port]['rx']['max_delay_usec'] = cast_integer(
760 stats[port]['rx']['max_delay_usec'])
761 retDict[port]['drop_rate_percent'] = self.__get_dropped_rate(retDict[port])
763 ports = sorted(retDict.keys())
764 if self.run_config['bidirectional']:
765 retDict['overall'] = {'tx': {}, 'rx': {}}
767 retDict['overall']['tx'][key] = \
768 retDict[ports[0]]['tx'][key] + retDict[ports[1]]['tx'][key]
770 retDict['overall']['rx'][key] = \
771 retDict[ports[0]]['rx'][key] + retDict[ports[1]]['rx'][key]
772 total_pkts = [retDict[ports[0]]['rx']['total_pkts'],
773 retDict[ports[1]]['rx']['total_pkts']]
774 avg_delays = [retDict[ports[0]]['rx']['avg_delay_usec'],
775 retDict[ports[1]]['rx']['avg_delay_usec']]
776 max_delays = [retDict[ports[0]]['rx']['max_delay_usec'],
777 retDict[ports[1]]['rx']['max_delay_usec']]
778 min_delays = [retDict[ports[0]]['rx']['min_delay_usec'],
779 retDict[ports[1]]['rx']['min_delay_usec']]
780 retDict['overall']['rx']['avg_delay_usec'] = utils.weighted_avg(total_pkts, avg_delays)
781 retDict['overall']['rx']['min_delay_usec'] = min(min_delays)
782 retDict['overall']['rx']['max_delay_usec'] = max(max_delays)
783 for key in ['pkt_bit_rate', 'pkt_rate']:
784 for dirc in ['tx', 'rx']:
785 retDict['overall'][dirc][key] /= 2.0
787 retDict['overall'] = retDict[ports[0]]
788 retDict['overall']['drop_rate_percent'] = self.__get_dropped_rate(retDict['overall'])
791 def __convert_rates(self, rate):
792 return utils.convert_rates(self.run_config['l2frame_size'],
def __ndr_pdr_found(self, tag, load):
    """Record in both collectors that the target named tag was found at load.

    tag: name of the target
    load: the load in percent, converted to pps before being recorded
    """
    converted = self.__convert_rates({'rate_percent': load})
    pps = converted['rate_pps']
    self.iteration_collector.add_ndr_pdr(tag, pps)
    # the interval collector gets the latest stats known by the iteration collector
    latest = self.iteration_collector.peek()
    self.interval_collector.add_ndr_pdr(tag, latest)
802 def __format_output_stats(self, stats):
803 for key in self.PORTS + ['overall']:
804 interface = stats[key]
806 'tx_pkts': interface['tx']['total_pkts'],
807 'rx_pkts': interface['rx']['total_pkts'],
808 'drop_percentage': interface['drop_rate_percent'],
809 'drop_pct': interface['rx']['dropped_pkts'],
810 'avg_delay_usec': interface['rx']['avg_delay_usec'],
811 'max_delay_usec': interface['rx']['max_delay_usec'],
812 'min_delay_usec': interface['rx']['min_delay_usec'],
def __targets_found(self, rate, targets, results):
    """Mark every given target as found at the given rate.

    rate: the load in percent at which the targets are considered found
    targets: a dict of drop rates indexed by the DR name or "tag"
    results: a dict of results indexed by tag; the entry of each target must
             already exist and gets its 'timestamp_sec' set to the current time
    """
    # items() works on both Python 2 and 3 (iteritems() only exists on Python 2)
    for tag, target in targets.items():
        LOG.info('Found %s (%s) load: %s', tag, target, rate)
        self.__ndr_pdr_found(tag, rate)
        results[tag]['timestamp_sec'] = time.time()
823 def __range_search(self, left, right, targets, results):
824 """Perform a binary search for a list of targets inside a [left..right] range or rate.
826 left the left side of the range to search as a % the line rate (100 = 100% line rate)
827 indicating the rate to send on each interface
828 right the right side of the range to search as a % of line rate
829 indicating the rate to send on each interface
830 targets a dict of drop rates to search (0.1 = 0.1%), indexed by the DR name or "tag"
832 results a dict to store results
836 LOG.info('Range search [%s .. %s] targets: %s', left, right, targets)
838 # Terminate search when gap is less than load epsilon
839 if right - left < self.config.measurement.load_epsilon:
840 self.__targets_found(left, targets, results)
843 # Obtain the average drop rate in for middle load
844 middle = (left + right) / 2.0
846 stats, rates = self.__run_search_iteration(middle)
848 LOG.exception("Got exception from traffic generator during binary search")
849 self.__targets_found(left, targets, results)
851 # Split target dicts based on the avg drop rate
854 for tag, target in targets.iteritems():
855 if stats['overall']['drop_rate_percent'] <= target:
856 # record the best possible rate found for this target
858 results[tag].update({
859 'load_percent_per_direction': middle,
860 'stats': self.__format_output_stats(dict(stats)),
861 'timestamp_sec': None
863 right_targets[tag] = target
865 # initialize to 0 all fields of result for
866 # the worst case scenario of the binary search (if ndr/pdr is not found)
867 if tag not in results:
868 results[tag] = dict.fromkeys(rates, 0)
869 empty_stats = self.__format_output_stats(dict(stats))
870 for key in empty_stats:
871 if isinstance(empty_stats[key], dict):
872 empty_stats[key] = dict.fromkeys(empty_stats[key], 0)
875 results[tag].update({
876 'load_percent_per_direction': 0,
877 'stats': empty_stats,
878 'timestamp_sec': None
880 left_targets[tag] = target
883 self.__range_search(left, middle, left_targets, results)
885 # search upper half only if the upper rate does not exceed
886 # 100%, this only happens when the first search at 100%
887 # yields a DR that is < target DR
889 self.__targets_found(100, right_targets, results)
891 self.__range_search(middle, right, right_targets, results)
def __run_search_iteration(self, rate):
    """Run one iteration at the given rate level.

    rate: the rate to send on each port in percent (0 to 100)

    Returns a tuple (stats, total_config):
    stats: the dict returned by self.runner.client.get_stats() once the run is over,
           with a 'warning' entry added when compare_tx_rates() returns a message
    total_config: the 'direction-total' entry of the current traffic config
    """
    self._modify_load(rate)

    # poll interval stats and collect them
    for stats in self.run_traffic():
        self.interval_collector.add(stats)
        time_elapsed_ratio = self.runner.time_elapsed() / self.run_config['duration_sec']
        if time_elapsed_ratio >= 1:
            # the configured duration has elapsed: stop the run
            self.cancel_traffic()
            # NOTE(review): indentation was lost in the damaged source; the pause is
            # assumed to apply only once the run has been cancelled - confirm
            if not self.skip_sleep():
                time.sleep(self.config.pause_sec)
    # NOTE(review): assumed to run once after the polling loop, not per interval - confirm
    self.interval_collector.reset()

    # get stats from the run
    stats = self.runner.client.get_stats()
    current_traffic_config = self._get_traffic_config()
    total_config = current_traffic_config['direction-total']
    warning = self.compare_tx_rates(total_config['rate_pps'], stats['total_tx_rate'])
    if warning is not None:
        stats['warning'] = warning

    # save reliable stats from whole iteration
    self.iteration_collector.add(stats, total_config['rate_pps'])
    LOG.info('Average drop rate: %f', stats['overall']['drop_rate_percent'])
    return stats, total_config
@staticmethod
def log_stats(stats):
    """Log estimated stats during run.

    stats: a dict read at ['overall']['tx']['total_pkts'],
           ['overall']['rx']['total_pkts'], ['overall']['rx']['dropped_pkts']
           and ['overall']['drop_rate_percent']
    """
    overall = stats['overall']
    summary = {
        'datetime': str(datetime.now()),
        'tx_packets': overall['tx']['total_pkts'],
        'rx_packets': overall['rx']['total_pkts'],
        'drop_packets': overall['rx']['dropped_pkts'],
        'drop_rate_percent': overall['drop_rate_percent']
    }
    # the format string uses named placeholders, so the mapping is passed as the
    # single logging argument and formatted lazily by the logger
    LOG.info('TX: %(tx_packets)d; '
             'RX: %(rx_packets)d; '
             'Est. Dropped: %(drop_packets)d; '
             'Est. Drop rate: %(drop_rate_percent).4f%%',
             summary)
def run_traffic(self):
    """Start traffic and return intermediate stats for each interval.

    This is a generator: it yields the stats returned by self.runner.run(), then the
    stats returned by each self.runner.poll_stats() call, logging each one before
    it is yielded.
    """
    stats = self.runner.run()
    # NOTE(review): is_running is read as an attribute; if the runner defines it as a
    # plain method this condition is always true and only the guard below ends the
    # loop - confirm
    while self.runner.is_running:
        self.log_stats(stats)
        yield stats
        stats = self.runner.poll_stats()
        # NOTE(review): the two lines following the poll were lost from the damaged
        # source; a None-guard is reconstructed here - confirm against the upstream file
        if stats is None:
            return
    self.log_stats(stats)
    LOG.info('Drop rate: %f', stats['overall']['drop_rate_percent'])
    yield stats
952 def cancel_traffic(self):
def _get_traffic_config(self):
    """Build the traffic configuration of the current run.

    Returns a dict with one entry per direction ('direction-forward' for the first
    rate of self.run_config['rates'], 'direction-reverse' for the others) plus a
    'direction-total' entry. Each direction entry holds 'l2frame_size' and
    'duration_sec' from self.run_config, the rate dict itself and the output of
    __convert_rates() for that rate. 'direction-total' is a copy of the forward
    entry whose 'rate_percent', 'rate_pps' and 'rate_bps' are replaced by the sums
    over all directions.
    """
    config = {}
    load_total = 0.0
    bps_total = 0.0
    pps_total = 0.0
    for idx, rate in enumerate(self.run_config['rates']):
        key = 'direction-forward' if idx == 0 else 'direction-reverse'
        config[key] = {
            'l2frame_size': self.run_config['l2frame_size'],
            'duration_sec': self.run_config['duration_sec']
        }
        config[key].update(rate)
        config[key].update(self.__convert_rates(rate))
        load_total += float(config[key]['rate_percent'])
        bps_total += float(config[key]['rate_bps'])
        pps_total += float(config[key]['rate_pps'])
    # start from the forward settings, then override the rates with the totals
    config['direction-total'] = dict(config['direction-forward'])
    config['direction-total'].update({
        'rate_percent': load_total,
        'rate_pps': cast_integer(pps_total),
        'rate_bps': bps_total
    })

    return config
def get_run_config(self, results):
    """Return configuration which was used for the last run.

    results: a dict whose "stats" entry is indexed by port (0 and 1) and holds the
             ["tx"]["total_pkts"] and ["rx"]["total_pkts"] counters

    Returns a dict with "direction-forward", "direction-reverse" and
    "direction-total" entries. Each entry has "orig", "tx" and "rx" rate dicts;
    the total is the per-unit sum ('rate_percent', 'rate_bps', 'rate_pps') of the
    two directions.
    """
    r = {}
    # because we want each direction to have the far end RX rates,
    # use the far end index (1-idx) to retrieve the RX rates
    for idx, key in enumerate(["direction-forward", "direction-reverse"]):
        tx_rate = results["stats"][idx]["tx"]["total_pkts"] / self.config.duration_sec
        rx_rate = results["stats"][1 - idx]["rx"]["total_pkts"] / self.config.duration_sec
        r[key] = {
            "orig": self.__convert_rates(self.run_config['rates'][idx]),
            "tx": self.__convert_rates({'rate_pps': tx_rate}),
            "rx": self.__convert_rates({'rate_pps': rx_rate})
        }

    # computed before 'direction-total' is inserted so only the two directions are summed
    total = {}
    for direction in ['orig', 'tx', 'rx']:
        total[direction] = {}
        for unit in ['rate_percent', 'rate_bps', 'rate_pps']:
            total[direction][unit] = sum(float(x[direction][unit]) for x in r.values())

    r['direction-total'] = total
    return r
def insert_interface_stats(self, pps_list):
    """Insert interface stats to a list of packet path stats.

    pps_list: a list of packet path stats instances indexed by chain index

    This function will insert the packet path stats for the traffic gen ports 0 and 1
    with itemized per chain tx/rx counters.
    There will be as many packet path stats as chains.
    Each packet path stats will have exactly 2 InterfaceStats for port 0 and port 1

    Example of what is appended to pps_list:
    [
      PacketPathStats(InterfaceStats(chain 0, port 0), InterfaceStats(chain 0, port 1)),
      PacketPathStats(InterfaceStats(chain 1, port 0), InterfaceStats(chain 1, port 1)),
      ...
    ]
    """
    def get_if_stats(chain_idx):
        # chain_idx is not used to build the stats: every chain gets 'p0' and 'p1'
        return [InterfaceStats('p' + str(port), self.tool)
                for port in range(2)]
    # keep the list of list of interface stats indexed by the chain id
    self.ifstats = [get_if_stats(chain_idx)
                    for chain_idx in range(self.config.service_chain_count)]
    # note that we need to make a copy of the ifs list so that any modification in the
    # list from pps will not change the list saved in self.ifstats
    self.pps_list = [PacketPathStats(list(ifs)) for ifs in self.ifstats]
    # insert the corresponding pps in the passed list
    pps_list.extend(self.pps_list)
def update_interface_stats(self, diff=False):
    """Update all interface stats.

    diff: if False, simply refresh the interface stats values with latest values
          if True, diff the interface stats with the latest values
    Make sure that the interface stats inserted in insert_interface_stats() are updated

    Layout of self.ifstats:
    [
      [InterfaceStats(chain 0, port 0), InterfaceStats(chain 0, port 1)],
      [InterfaceStats(chain 1, port 0), InterfaceStats(chain 1, port 1)],
      ...
    ]
    """
    # NOTE(review): the statement(s) between the docstring and the stats fetch were lost
    # from the damaged source; guarding the whole update with `diff` is a reconstruction
    # - confirm against the upstream file
    if diff:
        stats = self.gen.get_stats()
        for chain_idx, ifs in enumerate(self.ifstats):
            # each ifs has exactly 2 InterfaceStats and 2 Latency instances
            # corresponding to the
            # port 0 and port 1 for the given chain_idx
            # Note that we cannot use self.pps_list[chain_idx].if_stats to pick the
            # interface stats for the pps because it could have been modified to contain
            # additional interface stats
            self.gen.get_stream_stats(stats, ifs, self.pps_list[chain_idx].latencies, chain_idx)
        # special handling for vxlan
        # in case of vxlan, flow stats are not available so all rx counters will be
        # zeros when the total rx port counter is non zero.
        for port in range(2):
            total_rx = 0
            for ifs in self.ifstats:
                total_rx += ifs[port].rx
            # NOTE(review): condition reconstructed from the surrounding comments - confirm
            if total_rx == 0:
                # check if the total port rx from Trex is also zero
                port_rx = stats[port]['rx']['total_pkts']
                if port_rx:
                    # the total rx for all chains from port level stats is non zero
                    # which means that the per-chain stats are not available
                    if len(self.ifstats) == 1:
                        # only one chain, simply report the port level rx to the chain rx stats
                        self.ifstats[0][port].rx = port_rx
                    else:
                        for ifs in self.ifstats:
                            # mark this data as unavailable
                            # NOTE(review): the assignment itself was lost; None is a
                            # reconstructed marker value - confirm
                            ifs[port].rx = None
                        # pitch in the total rx only in the last chain pps
                        self.ifstats[-1][port].rx_total = port_rx
@staticmethod
def compare_tx_rates(required, actual, threshold=0.9):
    """Compare the actual TX rate to the required TX rate.

    required: the requested TX rate
    actual: the TX rate reported for the run
    threshold: lowest actual/required ratio that is still accepted
               NOTE(review): the original threshold assignment was lost from the
               damaged source; 0.9 is a reconstructed default - confirm

    Returns a warning message when actual/required is below the threshold or when
    required is zero, None otherwise.
    """
    try:
        are_different = float(actual) / required < threshold
    except ZeroDivisionError:
        # a requested rate of zero cannot be compared: report it as a mismatch
        are_different = True

    if not are_different:
        return None

    msg = "WARNING: There is a significant difference between requested TX rate ({r}) " \
          "and actual TX rate ({a}). The traffic generator may not have sufficient CPU " \
          "to achieve the requested TX rate.".format(r=required, a=actual)
    return msg
def get_per_direction_rate(self):
    """Get the rate for each direction.

    Returns the result of utils.divide_rate() applied to self.current_total_rate:
    divided by 2 when self.run_config['bidirectional'] is set and the rate is not
    expressed as a percentage, by 1 otherwise.
    """
    divisor = 2 if self.run_config['bidirectional'] else 1
    if 'rate_percent' in self.current_total_rate:
        # don't split rate if it's percentage
        # NOTE(review): the statement under this comment was lost from the damaged
        # source; resetting the divisor to 1 is reconstructed from the comment - confirm
        divisor = 1

    return utils.divide_rate(self.current_total_rate, divisor)
1110 """Close this instance."""
1112 self.gen.stop_traffic()
1115 self.gen.clear_stats()