1 # Copyright 2016 Cisco Systems, Inc. All rights reserved.
3 # Licensed under the Apache License, Version 2.0 (the "License"); you may
4 # not use this file except in compliance with the License. You may obtain
5 # a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 # License for the specific language governing permissions and limitations
15 """Interface to the traffic generator clients including NDR/PDR binary search."""
17 from datetime import datetime
22 from attrdict import AttrDict
24 from netaddr import IPNetwork
25 # pylint: disable=import-error
26 from trex.stl.api import Ether
27 from trex.stl.api import STLError
28 from trex.stl.api import UDP
29 # pylint: disable=wrong-import-order
30 from scapy.contrib.mpls import MPLS # flake8: noqa
31 # pylint: enable=wrong-import-order
32 # pylint: enable=import-error
35 from .packet_stats import InterfaceStats
36 from .packet_stats import PacketPathStats
37 from .stats_collector import IntervalCollector
38 from .stats_collector import IterationCollector
39 from .traffic_gen import traffic_utils as utils
40 from .utils import cast_integer
class TrafficClientException(Exception):
    """Error type raised by the traffic client code in this module."""
class TrafficRunner(object):
    """Serialize various steps required to run traffic.

    A run is considered pending while ``start_time`` is not None.
    """

    def __init__(self, client, duration_sec, interval_sec=0, service_mode=False):
        """Create a traffic runner.

        client: object exposing ``gen`` (the traffic generator driver),
                ``skip_sleep()`` and ``get_stats()``
        duration_sec: total duration of one run in seconds
        interval_sec: polling interval in seconds (0 means a single poll at the end)
        service_mode: passed as-is to the generator's ``set_service_mode``
        """
        self.client = client
        self.start_time = None
        self.duration_sec = duration_sec
        self.interval_sec = interval_sec
        self.service_mode = service_mode

    def run(self):
        """Clear stats and instruct the traffic generator to start generating traffic.

        return: the first polled stats, or None if a run is already pending
        """
        if self.is_running():
            return None
        LOG.info('Running traffic generator')
        self.client.gen.clear_stats()
        # Debug use only : new '--service-mode' option available for the NFVBench command line.
        # A read-only mode TRex console would be able to capture the generated traffic.
        self.client.gen.set_service_mode(enabled=self.service_mode)
        LOG.info('Service mode is %sabled', 'en' if self.service_mode else 'dis')
        self.client.gen.start_traffic()
        self.start_time = time.time()
        return self.poll_stats()

    def stop(self):
        """Stop the current run and instruct the traffic generator to stop traffic.

        Does nothing when no run is pending.
        """
        if self.is_running():
            self.start_time = None
            self.client.gen.stop_traffic()

    def is_running(self):
        """Check if a run is still pending."""
        return self.start_time is not None

    def time_elapsed(self):
        """Return time elapsed since start of run.

        When no run is pending the full configured duration is returned.
        """
        if self.is_running():
            return time.time() - self.start_time
        return self.duration_sec

    def poll_stats(self):
        """Poll latest stats from the traffic generator at fixed interval - sleeps if necessary.

        return: latest stats or None if traffic is stopped
        """
        if not self.is_running():
            return None
        if self.client.skip_sleep():
            # no sleeping requested: stop right away and report what we have
            self.stop()
            return self.client.get_stats()
        time_elapsed = self.time_elapsed()
        if time_elapsed > self.duration_sec:
            self.stop()
            return None
        time_left = self.duration_sec - time_elapsed
        if self.interval_sec > 0.0:
            if time_left <= self.interval_sec:
                # last interval of the run: sleep the remainder then stop
                time.sleep(time_left)
                self.stop()
            else:
                time.sleep(self.interval_sec)
        else:
            # no interval reporting: sleep for the whole run then stop
            time.sleep(self.duration_sec)
            self.stop()
        return self.client.get_stats()
class IpBlock(object):
    """Manage a block of IP addresses.

    Addresses are ``base_ip + index * step`` for index in [0, count_ip).
    Ranges are handed out sequentially through ``reserve_ip_range``.
    """

    def __init__(self, base_ip, step_ip, count_ip):
        """Create an IP block.

        base_ip: first address of the block (dotted string)
        step_ip: spacing between 2 consecutive addresses (dotted string)
        count_ip: number of addresses available in the block
        """
        self.base_ip_int = Device.ip_to_int(base_ip)
        self.step = Device.ip_to_int(step_ip)
        self.max_available = count_ip
        # index of the first address that has not been reserved yet
        self.next_free = 0

    def get_ip(self, index=0):
        """Return the IP address at given index.

        raise: IndexError if index is outside [0, max_available)
        """
        if index < 0 or index >= self.max_available:
            raise IndexError('Index out of bounds: %d (max=%d)' % (index, self.max_available))
        return Device.int_to_ip(self.base_ip_int + index * self.step)

    def reserve_ip_range(self, count):
        """Reserve a range of count consecutive IP addresses spaced by step.

        return: a tuple (first_ip, last_ip) of the reserved range
        raise: IndexError if fewer than count addresses remain
        """
        if self.next_free + count > self.max_available:
            raise IndexError('No more IP addresses next free=%d max_available=%d requested=%d' %
                             (self.next_free,
                              self.max_available,
                              count))
        first_ip = self.get_ip(self.next_free)
        last_ip = self.get_ip(self.next_free + count - 1)
        self.next_free += count
        return (first_ip, last_ip)

    def reset_reservation(self):
        """Reset all reservations and restart with a completely unused IP block."""
        self.next_free = 0
class Device(object):
    """Represent a port device and all information associated to it.

    In the current version we only support 2 port devices for the traffic generator
    identified as port 0 or port 1.
    """

    def __init__(self, port, generator_config):
        """Create a new device for a given port.

        port: index of the port (0 or 1)
        generator_config: the GeneratorConfig that owns this device
        """
        self.generator_config = generator_config
        self.chain_count = generator_config.service_chain_count
        # flows are split evenly between the 2 ports
        self.flow_count = generator_config.flow_count / 2
        self.port = port
        self.switch_port = generator_config.interfaces[port].get('switch_port', None)
        self.vtep_vlan = None
        self.vtep_src_mac = None
        # encapsulation flags, turned on by set_vtep_vlan() / set_mpls_peers()
        self.vxlan = False
        self.mpls = False
        self.inner_labels = None
        self.outer_labels = None
        self.pci = generator_config.interfaces[port].pci
        self.mac = None
        self.dest_macs = None
        self.vtep_dst_mac = None
        self.vtep_dst_ip = None
        if generator_config.vteps is None:
            self.vtep_src_ip = None
        else:
            self.vtep_src_ip = generator_config.vteps[port]
        self.vnis = None
        self.vlans = None
        self.ip_addrs = generator_config.ip_addrs[port]
        subnet = IPNetwork(self.ip_addrs)
        self.ip = subnet.ip.format()
        self.ip_addrs_step = generator_config.ip_addrs_step
        self.ip_block = IpBlock(self.ip, self.ip_addrs_step, self.flow_count)
        # gateway blocks hold one address per chain
        self.gw_ip_block = IpBlock(generator_config.gateway_ips[port],
                                   generator_config.gateway_ip_addrs_step,
                                   self.chain_count)
        self.tg_gateway_ip_addrs = generator_config.tg_gateway_ip_addrs[port]
        self.tg_gw_ip_block = IpBlock(self.tg_gateway_ip_addrs,
                                      generator_config.tg_gateway_ip_addrs_step,
                                      self.chain_count)
        self.udp_src_port = generator_config.udp_src_port
        self.udp_dst_port = generator_config.udp_dst_port

    def set_mac(self, mac):
        """Set the local MAC for this port device.

        raise: TrafficClientException if mac is None
        """
        if mac is None:
            raise TrafficClientException('Trying to set traffic generator MAC address as None')
        self.mac = mac

    def get_peer_device(self):
        """Get the peer device (device 0 -> device 1, or device 1 -> device 0)."""
        return self.generator_config.devices[1 - self.port]

    def set_vtep_dst_mac(self, dest_macs):
        """Set the list of VTEP dest MACs (each entry is converted to a string)."""
        self.vtep_dst_mac = list(map(str, dest_macs))

    def set_dest_macs(self, dest_macs):
        """Set the list of dest MACs indexed by the chain id.

        This is only called in 2 cases:
        - VM macs discovered using openstack API
        - dest MACs provisioned in config file
        """
        self.dest_macs = list(map(str, dest_macs))

    def get_dest_macs(self):
        """Get the list of dest macs for this device.

        If set_dest_macs was never called, assumes l2-loopback and return
        a list of peer mac (as many as chains but normally only 1 chain)
        """
        if self.dest_macs:
            return self.dest_macs
        # assume this is l2-loopback
        return [self.get_peer_device().mac] * self.chain_count

    def set_vlans(self, vlans):
        """Set the list of vlans to use indexed by the chain id."""
        self.vlans = vlans
        LOG.info("Port %d: VLANs %s", self.port, self.vlans)

    def set_vtep_vlan(self, vlan):
        """Set the vtep vlan to use indexed by specific port."""
        self.vtep_vlan = vlan
        self.vxlan = True
        self.vlan_tagging = None
        LOG.info("Port %d: VTEP VLANs %s", self.port, self.vtep_vlan)

    def set_vxlan_endpoints(self, src_ip, dst_ip):
        """Set the source and destination VTEP IP addresses for VxLAN."""
        self.vtep_dst_ip = dst_ip
        self.vtep_src_ip = src_ip
        LOG.info("Port %d: src_vtep %s, dst_vtep %s", self.port,
                 self.vtep_src_ip, self.vtep_dst_ip)

    def set_mpls_peers(self, src_ip, dst_ip):
        """Enable MPLS on this device and set the source and peer IP addresses."""
        self.mpls = True
        self.vtep_dst_ip = dst_ip
        self.vtep_src_ip = src_ip
        LOG.info("Port %d: src_mpls_vtep %s, mpls_peer_ip %s", self.port,
                 self.vtep_src_ip, self.vtep_dst_ip)

    def set_vxlans(self, vnis):
        """Set the list of VNIs to use indexed by the chain id."""
        self.vnis = vnis
        LOG.info("Port %d: VNIs %s", self.port, self.vnis)

    def set_mpls_inner_labels(self, labels):
        """Set the list of MPLS inner labels to use indexed by the chain id."""
        self.inner_labels = labels
        LOG.info("Port %d: MPLS Inner Labels %s", self.port, self.inner_labels)

    def set_mpls_outer_labels(self, labels):
        """Set the list of MPLS outer labels to use indexed by the chain id."""
        self.outer_labels = labels
        LOG.info("Port %d: MPLS Outer Labels %s", self.port, self.outer_labels)

    def set_gw_ip(self, gateway_ip):
        """Replace the gateway IP block with one starting at gateway_ip."""
        self.gw_ip_block = IpBlock(gateway_ip,
                                   self.generator_config.gateway_ip_addrs_step,
                                   self.chain_count)

    def get_gw_ip(self, chain_index):
        """Retrieve the IP address assigned for the gateway of a given chain."""
        return self.gw_ip_block.get_ip(chain_index)

    def get_stream_configs(self):
        """Get the list of stream configs (one dict per chain) for this device.

        Called by the traffic generator driver to program the traffic generator properly
        before generating traffic
        """
        configs = []
        # exact flow count for each chain is calculated as follows:
        # - all chains except the first will have the same flow count
        #   calculated as (total_flows + chain_count - 1) / chain_count
        # - the first chain will have the remainder
        # example 11 flows and 3 chains => 3, 4, 4
        flows_per_chain = int((self.flow_count + self.chain_count - 1) / self.chain_count)
        cur_chain_flow_count = int(self.flow_count - flows_per_chain * (self.chain_count - 1))
        peer = self.get_peer_device()
        # both directions restart from the first address of their block
        self.ip_block.reset_reservation()
        peer.ip_block.reset_reservation()
        dest_macs = self.get_dest_macs()
        encapsulated = self.vxlan or self.mpls

        for chain_idx in range(self.chain_count):
            src_ip_first, src_ip_last = self.ip_block.reserve_ip_range(cur_chain_flow_count)
            dst_ip_first, dst_ip_last = peer.ip_block.reserve_ip_range(cur_chain_flow_count)

            configs.append({
                'count': cur_chain_flow_count,
                'mac_src': self.mac,
                'mac_dst': dest_macs[chain_idx],
                'ip_src_addr': src_ip_first,
                'ip_src_addr_max': src_ip_last,
                'ip_src_count': cur_chain_flow_count,
                'ip_dst_addr': dst_ip_first,
                'ip_dst_addr_max': dst_ip_last,
                'ip_dst_count': cur_chain_flow_count,
                'ip_addrs_step': self.ip_addrs_step,
                'udp_src_port': self.udp_src_port,
                'udp_dst_port': self.udp_dst_port,
                'mac_discovery_gw': self.get_gw_ip(chain_idx),
                'ip_src_tg_gw': self.tg_gw_ip_block.get_ip(chain_idx),
                'ip_dst_tg_gw': peer.tg_gw_ip_block.get_ip(chain_idx),
                'vlan_tag': self.vlans[chain_idx] if self.vlans else None,
                'vxlan': self.vxlan,
                'vtep_vlan': self.vtep_vlan if self.vtep_vlan else None,
                'vtep_src_mac': self.mac if encapsulated else None,
                'vtep_dst_mac': self.vtep_dst_mac if encapsulated else None,
                'vtep_dst_ip': self.vtep_dst_ip if self.vxlan is True else None,
                'vtep_src_ip': self.vtep_src_ip if self.vxlan is True else None,
                'net_vni': self.vnis[chain_idx] if self.vxlan is True else None,
                'mpls': self.mpls,
                'mpls_outer_label': self.outer_labels[chain_idx] if self.mpls is True else None,
                'mpls_inner_label': self.inner_labels[chain_idx] if self.mpls is True else None
            })
            # after first chain, fall back to the flow count for all other chains
            cur_chain_flow_count = flows_per_chain
        return configs

    @staticmethod
    def ip_to_int(addr):
        """Convert an IP address from string to numeric."""
        return struct.unpack("!I", socket.inet_aton(addr))[0]

    @staticmethod
    def int_to_ip(nvalue):
        """Convert an IP address from numeric to string."""
        return socket.inet_ntoa(struct.pack("!I", int(nvalue)))
class GeneratorConfig(object):
    """Represents traffic configuration for currently running traffic profile."""

    DEFAULT_IP_STEP = '0.0.0.1'
    DEFAULT_SRC_DST_IP_STEP = '0.0.0.1'

    def __init__(self, config):
        """Create a generator config.

        config: the nfvbench config; it is updated in place with the selected
                generator profile name and the 'tg-name'/'tg-tool' entries

        raise: TrafficClientException if the generator profile is invalid
        """
        self.config = config
        # name of the generator profile (normally trex or dummy)
        # pick the default one if not specified explicitly from cli options
        if not config.generator_profile:
            config.generator_profile = config.traffic_generator.default_profile
        # pick up the profile dict based on the name
        gen_config = self.__match_generator_profile(config.traffic_generator,
                                                    config.generator_profile)
        self.gen_config = gen_config
        # copy over fields from the dict
        self.tool = gen_config.tool
        self.ip = gen_config.ip
        # overrides on config.cores and config.mbuf_factor
        if config.cores:
            self.cores = config.cores
        else:
            self.cores = gen_config.get('cores', 1)
        self.mbuf_factor = config.mbuf_factor
        self.mbuf_64 = config.mbuf_64
        self.hdrh = not config.disable_hdrh
        if gen_config.intf_speed:
            # interface speed is overridden from config
            self.intf_speed = bitmath.parse_string(gen_config.intf_speed.replace('ps', '')).bits
        else:
            # interface speed is discovered/provided by the traffic generator
            self.intf_speed = 0
        self.name = gen_config.name
        self.zmq_pub_port = gen_config.get('zmq_pub_port', 4500)
        self.zmq_rpc_port = gen_config.get('zmq_rpc_port', 4501)
        self.limit_memory = gen_config.get('limit_memory', 1024)
        self.software_mode = gen_config.get('software_mode', False)
        self.interfaces = gen_config.interfaces
        if self.interfaces[0].port != 0 or self.interfaces[1].port != 1:
            raise TrafficClientException('Invalid port order/id in generator_profile.interfaces')
        # check that pci is not empty; done before the devices are built because
        # each Device reads its interface pci field
        if not gen_config.interfaces[0].get('pci', None) or \
                not gen_config.interfaces[1].get('pci', None):
            raise TrafficClientException("configuration interfaces pci fields cannot be empty")
        self.service_chain = config.service_chain
        self.service_chain_count = config.service_chain_count
        self.flow_count = config.flow_count
        self.host_name = gen_config.host_name

        self.tg_gateway_ip_addrs = gen_config.tg_gateway_ip_addrs
        self.ip_addrs = gen_config.ip_addrs
        self.ip_addrs_step = gen_config.ip_addrs_step or self.DEFAULT_SRC_DST_IP_STEP
        self.tg_gateway_ip_addrs_step = \
            gen_config.tg_gateway_ip_addrs_step or self.DEFAULT_IP_STEP
        self.gateway_ip_addrs_step = gen_config.gateway_ip_addrs_step or self.DEFAULT_IP_STEP
        self.gateway_ips = gen_config.gateway_ip_addrs
        self.udp_src_port = gen_config.udp_src_port
        self.udp_dst_port = gen_config.udp_dst_port
        self.vteps = gen_config.get('vteps')
        self.devices = [Device(port, self) for port in [0, 1]]
        # This should normally always be [0, 1]
        self.ports = [device.port for device in self.devices]

        self.pcis = [tgif['pci'] for tgif in gen_config.interfaces]
        self.vlan_tagging = config.vlan_tagging

        # needed for result/summarizer
        config['tg-name'] = gen_config.name
        config['tg-tool'] = self.tool

    def to_json(self):
        """Get json form to display the content into the overall result dict."""
        return dict(self.gen_config)

    def _check_chain_entries(self, label, values):
        """Raise TrafficClientException unless values has exactly one entry per chain."""
        expected = self.config.service_chain_count
        if len(values) != expected:
            raise TrafficClientException('%s list %s must have %d entries' %
                                         (label, values, expected))

    def set_dest_macs(self, port_index, dest_macs):
        """Set the list of dest MACs indexed by the chain id on given port.

        port_index: the port for which dest macs must be set
        dest_macs: a list of dest MACs indexed by chain id

        raise: TrafficClientException if there are fewer MACs than chains
        """
        if len(dest_macs) < self.config.service_chain_count:
            raise TrafficClientException('Dest MAC list %s must have %d entries' %
                                         (dest_macs, self.config.service_chain_count))
        # only pass the first scc dest MACs
        self.devices[port_index].set_dest_macs(dest_macs[:self.config.service_chain_count])
        LOG.info('Port %d: dst MAC %s', port_index, [str(mac) for mac in dest_macs])

    def set_vtep_dest_macs(self, port_index, dest_macs):
        """Set the list of VTEP dest MACs indexed by the chain id on given port.

        port_index: the port for which dest macs must be set
        dest_macs: a list of dest MACs indexed by chain id

        raise: TrafficClientException if the list length differs from the chain count
        """
        self._check_chain_entries('Dest MAC', dest_macs)
        self.devices[port_index].set_vtep_dst_mac(dest_macs)
        # logged as a list (not a set) so that chain order and duplicates are visible
        LOG.info('Port %d: vtep dst MAC %s', port_index, [str(mac) for mac in dest_macs])

    def get_dest_macs(self):
        """Return the list of dest macs indexed by port."""
        return [dev.get_dest_macs() for dev in self.devices]

    def set_vlans(self, port_index, vlans):
        """Set the list of vlans to use indexed by the chain id on given port.

        port_index: the port for which VLANs must be set
        vlans: a list of vlan lists indexed by chain id
        """
        self._check_chain_entries('VLAN', vlans)
        self.devices[port_index].set_vlans(vlans)

    def set_vxlans(self, port_index, vxlans):
        """Set the list of vxlans (VNIs) to use indexed by the chain id on given port.

        port_index: the port for which VXLANs must be set
        VXLANs: a list of VNIs lists indexed by chain id
        """
        self._check_chain_entries('VXLAN', vxlans)
        self.devices[port_index].set_vxlans(vxlans)

    def set_mpls_inner_labels(self, port_index, labels):
        """Set the list of MPLS inner labels to use indexed by the chain id on given port.

        port_index: the port for which Labels must be set
        Labels: a list of Labels lists indexed by chain id
        """
        self._check_chain_entries('Inner MPLS', labels)
        self.devices[port_index].set_mpls_inner_labels(labels)

    def set_mpls_outer_labels(self, port_index, labels):
        """Set the list of MPLS outer labels to use indexed by the chain id on given port.

        port_index: the port for which Labels must be set
        Labels: a list of Labels lists indexed by chain id
        """
        self._check_chain_entries('Outer MPLS', labels)
        self.devices[port_index].set_mpls_outer_labels(labels)

    def set_vtep_vlan(self, port_index, vlan):
        """Set the vtep vlan to use on given port.

        port_index: the port for which VLAN must be set
        """
        self.devices[port_index].set_vtep_vlan(vlan)

    def set_vxlan_endpoints(self, port_index, src_ip, dst_ip):
        """Set the VxLAN source and destination VTEP IP addresses on given port."""
        self.devices[port_index].set_vxlan_endpoints(src_ip, dst_ip)

    def set_mpls_peers(self, port_index, src_ip, dst_ip):
        """Set the MPLS source and peer IP addresses on given port."""
        self.devices[port_index].set_mpls_peers(src_ip, dst_ip)

    @staticmethod
    def __match_generator_profile(traffic_generator, generator_profile):
        """Return the common generator settings merged with the named profile.

        raise: TrafficClientException unless exactly one profile has that name
        """
        gen_config = AttrDict(traffic_generator)
        gen_config.pop('default_profile')
        gen_config.pop('generator_profile')
        matching_profile = [profile for profile in traffic_generator.generator_profile if
                            profile.name == generator_profile]
        if len(matching_profile) != 1:
            raise TrafficClientException('Traffic generator profile not found: ' +
                                         generator_profile)

        gen_config.update(matching_profile[0])
        return gen_config
520 class TrafficClient(object):
521 """Traffic generator client with NDR/PDR binary seearch."""
525 def __init__(self, config, notifier=None):
526 """Create a new TrafficClient instance.
528 config: nfvbench config
529 notifier: notifier (optional)
531 A new instance is created everytime the nfvbench config may have changed.
534 self.generator_config = GeneratorConfig(config)
535 self.tool = self.generator_config.tool
536 self.gen = self._get_generator()
537 self.notifier = notifier
538 self.interval_collector = None
539 self.iteration_collector = None
540 self.runner = TrafficRunner(self, self.config.duration_sec, self.config.interval_sec,
541 self.config.service_mode)
542 self.config.frame_sizes = self._get_frame_sizes()
544 'l2frame_size': None,
545 'duration_sec': self.config.duration_sec,
546 'bidirectional': True,
547 'rates': [] # to avoid unsbuscriptable-obj warning
549 self.current_total_rate = {'rate_percent': '10'}
550 if self.config.single_run:
551 self.current_total_rate = utils.parse_rate_str(self.config.rate)
553 # Speed is either discovered when connecting to TG or set from config
554 # This variable is 0 if not yet discovered from TG or must be the speed of
555 # each interface in bits per second
556 self.intf_speed = self.generator_config.intf_speed
def _get_generator(self):
    """Instantiate the traffic generator driver matching self.tool.

    The tool name is compared case-insensitively; drivers are imported lazily
    so that only the selected one is loaded.

    raise: TrafficClientException if the tool name is not supported
    """
    tool = self.tool.lower()
    if tool == 'trex':
        from .traffic_gen import trex_gen
        return trex_gen.TRex(self)
    if tool == 'dummy':
        from .traffic_gen import dummy
        return dummy.DummyTG(self)
    raise TrafficClientException('Unsupported generator tool name:' + self.tool)
def skip_sleep(self):
    """Skip all sleeps when doing unit testing with dummy TG.

    Must be overridden using mock.patch

    return: always False in this implementation
    """
    return False
def _get_frame_sizes(self):
    """Return the l2frame_size of the traffic profile selected in the config.

    raise: TrafficClientException if no profile or several profiles carry
           the selected name
    """
    wanted = self.config.traffic.profile
    candidates = []
    for profile in self.config.traffic_profile:
        if profile.name == wanted:
            candidates.append(profile)
    if not candidates:
        raise TrafficClientException('Cannot find traffic profile: ' + wanted)
    if len(candidates) > 1:
        raise TrafficClientException('Multiple traffic profiles with name: ' +
                                     wanted)
    return candidates[0].l2frame_size
586 def start_traffic_generator(self):
587 """Start the traffic generator process (traffic not started yet)."""
589 # pick up the interface speed if it is not set from config
590 intf_speeds = self.gen.get_port_speed_gbps()
591 # convert Gbps unit into bps
592 tg_if_speed = bitmath.parse_string(str(intf_speeds[0]) + 'Gb').bits
594 # interface speed is overriden from config
595 if self.intf_speed != tg_if_speed:
596 # Warn the user if the speed in the config is different
597 LOG.warning('Interface speed provided is different from actual speed (%d Gbps)',
600 # interface speed not provisioned by config
601 self.intf_speed = tg_if_speed
602 # also update the speed in the tg config
603 self.generator_config.intf_speed = tg_if_speed
605 # Save the traffic generator local MAC
606 for mac, device in zip(self.gen.get_macs(), self.generator_config.devices):
610 """Set up the traffic client."""
611 self.gen.clear_stats()
def get_version(self):
    """Return the version reported by the traffic generator driver."""
    version = self.gen.get_version()
    return version
617 def ensure_end_to_end(self):
618 """Ensure traffic generator receives packets it has transmitted.
620 This ensures end to end connectivity and also waits until VMs are ready to forward packets.
622 VMs that are started and in active state may not pass traffic yet. It is imperative to make
623 sure that all VMs are passing traffic in both directions before starting any benchmarking.
624 To verify this, we need to send at a low frequency bi-directional packets and make sure
625 that we receive all packets back from all VMs. The number of flows is equal to 2 times
626 the number of chains (1 per direction) and we need to make sure we receive packets coming
627 from exactly 2 x chain count different source MAC addresses.
630 PVP chain (1 VM per chain)
631 N = 10 (number of chains)
632 Flow count = 20 (number of flows)
633 If the number of unique source MAC addresses from received packets is 20 then
634 all 10 VMs 10 VMs are in operational state.
636 LOG.info('Starting traffic generator to ensure end-to-end connectivity')
637 # send 2pps on each chain and each direction
638 rate_pps = {'rate_pps': str(self.config.service_chain_count * 2)}
639 self.gen.create_traffic('64', [rate_pps, rate_pps], bidirectional=True, latency=False,
641 # ensures enough traffic is coming back
642 retry_count = int((self.config.check_traffic_time_sec +
643 self.config.generic_poll_sec - 1) / self.config.generic_poll_sec)
645 # we expect to see packets coming from 2 unique MAC per chain
646 # because there can be flooding in the case of shared net
647 # we must verify that packets from the right VMs are received
648 # and not just count unique src MAC
649 # create a dict of (port, chain) tuples indexed by dest mac
651 for port, dest_macs in enumerate(self.generator_config.get_dest_macs()):
652 for chain, mac in enumerate(dest_macs):
653 mac_map[mac] = (port, chain)
654 unique_src_mac_count = len(mac_map)
655 if self.config.vxlan and self.config.traffic_generator.vtep_vlan:
656 get_mac_id = lambda packet: packet['binary'][60:66]
657 elif self.config.vxlan:
658 get_mac_id = lambda packet: packet['binary'][56:62]
659 elif self.config.mpls:
660 get_mac_id = lambda packet: packet['binary'][24:30]
661 # mpls_transport_label = lambda packet: packet['binary'][14:18]
663 get_mac_id = lambda packet: packet['binary'][6:12]
664 for it in range(retry_count):
665 self.gen.clear_stats()
666 self.gen.start_traffic()
667 self.gen.start_capture()
668 LOG.info('Captured unique src mac %d/%d, capturing return packets (retry %d/%d)...',
669 unique_src_mac_count - len(mac_map), unique_src_mac_count,
671 if not self.skip_sleep():
672 time.sleep(self.config.generic_poll_sec)
673 self.gen.stop_traffic()
674 self.gen.fetch_capture_packets()
675 self.gen.stop_capture()
676 for packet in self.gen.packet_list:
677 mac_id = get_mac_id(packet).decode('latin-1')
678 src_mac = ':'.join(["%02x" % ord(x) for x in mac_id])
680 if src_mac in mac_map and self.is_mpls(packet):
681 port, chain = mac_map[src_mac]
682 LOG.info('Received mpls packet from mac: %s (chain=%d, port=%d)',
683 src_mac, chain, port)
684 mac_map.pop(src_mac, None)
686 if src_mac in mac_map and self.is_udp(packet):
687 port, chain = mac_map[src_mac]
688 LOG.info('Received udp packet from mac: %s (chain=%d, port=%d)',
689 src_mac, chain, port)
690 mac_map.pop(src_mac, None)
693 LOG.info('End-to-end connectivity established')
695 if self.config.l3_router and not self.config.no_arp:
696 # In case of L3 traffic mode, routers are not able to route traffic
697 # until VM interfaces are up and ARP requests are done
698 LOG.info('Waiting for loopback service completely started...')
699 LOG.info('Sending ARP request to assure end-to-end connectivity established')
700 self.ensure_arp_successful()
701 raise TrafficClientException('End-to-end connectivity cannot be ensured')
def is_udp(self, packet):
    """Return True if the captured packet contains a UDP layer.

    packet: a capture entry whose 'binary' item holds the raw frame bytes
    """
    pkt = Ether(packet['binary'])
    return UDP in pkt
def is_mpls(self, packet):
    """Return True if the captured packet contains an MPLS layer.

    packet: a capture entry whose 'binary' item holds the raw frame bytes
    """
    pkt = Ether(packet['binary'])
    return MPLS in pkt
def ensure_arp_successful(self):
    """Resolve all IP using ARP and throw an exception in case of failure.

    On success the resolved MACs of port 0 and port 1 are saved into the
    generator config, as VTEP dest MACs when VxLAN or MPLS is enabled and
    as plain dest MACs otherwise.

    raise: TrafficClientException if the generator returns no MACs
    """
    dest_macs = self.gen.resolve_arp()
    if not dest_macs:
        raise TrafficClientException('ARP cannot be resolved')
    # all dest macs are discovered, saved them into the generator config
    if self.config.vxlan or self.config.mpls:
        self.generator_config.set_vtep_dest_macs(0, dest_macs[0])
        self.generator_config.set_vtep_dest_macs(1, dest_macs[1])
    else:
        self.generator_config.set_dest_macs(0, dest_macs[0])
        self.generator_config.set_dest_macs(1, dest_macs[1])
def set_traffic(self, frame_size, bidirectional):
    """Reconfigure the traffic generator for a new frame size.

    frame_size: the L2 frame size to program
    bidirectional: True to send the per-direction rate both ways; when False
                   a reverse stream is only added if
                   config.unidir_reverse_traffic_pps is greater than 0

    The run config is updated with the frame size, the direction mode and
    the list of per-direction rates (all expressed in pps).
    """
    self.run_config['bidirectional'] = bidirectional
    self.run_config['l2frame_size'] = frame_size
    self.run_config['rates'] = [self.get_per_direction_rate()]
    if bidirectional:
        self.run_config['rates'].append(self.get_per_direction_rate())
    else:
        unidir_reverse_pps = int(self.config.unidir_reverse_traffic_pps)
        if unidir_reverse_pps > 0:
            self.run_config['rates'].append({'rate_pps': str(unidir_reverse_pps)})
    # Fix for [NFVBENCH-67], convert the rate string to PPS
    for idx, rate in enumerate(self.run_config['rates']):
        if 'rate_pps' not in rate:
            self.run_config['rates'][idx] = {'rate_pps': self.__convert_rates(rate)['rate_pps']}

    self.gen.clear_streamblock()

    if self.config.no_latency_streams:
        LOG.info("Latency streams are disabled")
    self.gen.create_traffic(frame_size, self.run_config['rates'], bidirectional,
                            latency=not self.config.no_latency_streams)
def _modify_load(self, load):
    """Apply a new total load (in percent) to the running streams.

    The forward direction is always updated; the reverse direction is only
    updated when the current run is bidirectional.
    """
    self.current_total_rate = {'rate_percent': str(load)}
    per_direction = self.get_per_direction_rate()

    # forward direction
    self.gen.modify_rate(per_direction, False)
    self.run_config['rates'][0] = per_direction
    if not self.run_config['bidirectional']:
        return
    # reverse direction
    self.gen.modify_rate(per_direction, True)
    self.run_config['rates'][1] = per_direction
758 def get_ndr_and_pdr(self):
759 """Start the NDR/PDR iteration and return the results."""
760 dst = 'Bidirectional' if self.run_config['bidirectional'] else 'Unidirectional'
762 if self.config.ndr_run:
763 LOG.info('*** Searching NDR for %s (%s)...', self.run_config['l2frame_size'], dst)
764 targets['ndr'] = self.config.measurement.NDR
765 if self.config.pdr_run:
766 LOG.info('*** Searching PDR for %s (%s)...', self.run_config['l2frame_size'], dst)
767 targets['pdr'] = self.config.measurement.PDR
769 self.run_config['start_time'] = time.time()
770 self.interval_collector = IntervalCollector(self.run_config['start_time'])
771 self.interval_collector.attach_notifier(self.notifier)
772 self.iteration_collector = IterationCollector(self.run_config['start_time'])
774 self.__range_search(0.0, 200.0, targets, results)
776 results['iteration_stats'] = {
777 'ndr_pdr': self.iteration_collector.get()
780 if self.config.ndr_run:
781 LOG.info('NDR load: %s', results['ndr']['rate_percent'])
782 results['ndr']['time_taken_sec'] = \
783 results['ndr']['timestamp_sec'] - self.run_config['start_time']
784 if self.config.pdr_run:
785 LOG.info('PDR load: %s', results['pdr']['rate_percent'])
786 results['pdr']['time_taken_sec'] = \
787 results['pdr']['timestamp_sec'] - results['ndr']['timestamp_sec']
789 LOG.info('PDR load: %s', results['pdr']['rate_percent'])
790 results['pdr']['time_taken_sec'] = \
791 results['pdr']['timestamp_sec'] - self.run_config['start_time']
def __get_dropped_rate(self, result):
    """Return the drop rate in percent for one stats dict.

    result: a dict providing result['rx']['dropped_pkts'] and
            result['tx']['total_pkts']

    return: dropped packets as a percentage of transmitted packets, or
            infinity when nothing was transmitted (the rate is undefined and
            must never be mistaken for a loss-free run)
    """
    dropped_pkts = result['rx']['dropped_pkts']
    total_pkts = result['tx']['total_pkts']
    if not total_pkts:
        return float('inf')
    return float(dropped_pkts) / total_pkts * 100
802 """Collect final stats for previous run."""
803 stats = self.gen.get_stats()
804 retDict = {'total_tx_rate': stats['total_tx_rate']}
806 tx_keys = ['total_pkts', 'total_pkt_bytes', 'pkt_rate', 'pkt_bit_rate']
807 rx_keys = tx_keys + ['dropped_pkts']
809 for port in self.PORTS:
810 port_stats = {'tx': {}, 'rx': {}}
812 port_stats['tx'][key] = int(stats[port]['tx'][key])
815 port_stats['rx'][key] = int(stats[port]['rx'][key])
817 port_stats['rx'][key] = 0
818 port_stats['rx']['avg_delay_usec'] = cast_integer(
819 stats[port]['rx']['avg_delay_usec'])
820 port_stats['rx']['min_delay_usec'] = cast_integer(
821 stats[port]['rx']['min_delay_usec'])
822 port_stats['rx']['max_delay_usec'] = cast_integer(
823 stats[port]['rx']['max_delay_usec'])
824 port_stats['drop_rate_percent'] = self.__get_dropped_rate(port_stats)
825 retDict[str(port)] = port_stats
827 ports = sorted(list(retDict.keys()), key=str)
828 if self.run_config['bidirectional']:
829 retDict['overall'] = {'tx': {}, 'rx': {}}
831 retDict['overall']['tx'][key] = \
832 retDict[ports[0]]['tx'][key] + retDict[ports[1]]['tx'][key]
834 retDict['overall']['rx'][key] = \
835 retDict[ports[0]]['rx'][key] + retDict[ports[1]]['rx'][key]
836 total_pkts = [retDict[ports[0]]['rx']['total_pkts'],
837 retDict[ports[1]]['rx']['total_pkts']]
838 avg_delays = [retDict[ports[0]]['rx']['avg_delay_usec'],
839 retDict[ports[1]]['rx']['avg_delay_usec']]
840 max_delays = [retDict[ports[0]]['rx']['max_delay_usec'],
841 retDict[ports[1]]['rx']['max_delay_usec']]
842 min_delays = [retDict[ports[0]]['rx']['min_delay_usec'],
843 retDict[ports[1]]['rx']['min_delay_usec']]
844 retDict['overall']['rx']['avg_delay_usec'] = utils.weighted_avg(total_pkts, avg_delays)
845 retDict['overall']['rx']['min_delay_usec'] = min(min_delays)
846 retDict['overall']['rx']['max_delay_usec'] = max(max_delays)
847 for key in ['pkt_bit_rate', 'pkt_rate']:
848 for dirc in ['tx', 'rx']:
849 retDict['overall'][dirc][key] /= 2.0
851 retDict['overall'] = retDict[ports[0]]
852 retDict['overall']['drop_rate_percent'] = self.__get_dropped_rate(retDict['overall'])
855 def __convert_rates(self, rate):
856 return utils.convert_rates(self.run_config['l2frame_size'],
860 def __ndr_pdr_found(self, tag, load):
861 rates = self.__convert_rates({'rate_percent': load})
862 self.iteration_collector.add_ndr_pdr(tag, rates['rate_pps'])
863 last_stats = self.iteration_collector.peek()
864 self.interval_collector.add_ndr_pdr(tag, last_stats)
def __format_output_stats(self, stats):
    """Flatten the per interface entries of a stats dict into the output layout.

    Every key of self.PORTS plus 'overall' that is present in stats has its
    entry replaced by a flat dict of packet counters, drop figures and
    delays. Entries stored under any other key are left as they are.

    stats: the stats dict, it is modified in place
    return: the same stats dict
    """
    for key in self.PORTS + ['overall']:
        if key not in stats:
            continue
        interface = stats[key]
        rx = interface['rx']
        flat = {'tx_pkts': interface['tx']['total_pkts']}
        flat['rx_pkts'] = rx['total_pkts']
        # 'drop_percentage' carries the drop rate while 'drop_pct' carries
        # the number of dropped packets
        flat['drop_percentage'] = interface['drop_rate_percent']
        flat['drop_pct'] = rx['dropped_pkts']
        for delay in ('avg_delay_usec', 'max_delay_usec', 'min_delay_usec'):
            flat[delay] = rx[delay]
        stats[key] = flat

    return stats
def __targets_found(self, rate, targets, results):
    """Report every target of `targets` as found at the load `rate`.

    rate: the load reported for all the targets
    targets: dict of target drop rates indexed by tag
    results: dict indexed by tag; the entry of each tag must already exist
             as only its 'timestamp_sec' field is set here
    """
    # walk a snapshot of the (tag, target) pairs
    for pair in tuple(targets.items()):
        tag = pair[0]
        LOG.info('Found %s (%s) load: %s', tag, pair[1], rate)
        self.__ndr_pdr_found(tag, rate)
        # stamp the result with the time at which the target was recorded
        found_at = time.time()
        results[tag]['timestamp_sec'] = found_at
888 def __range_search(self, left, right, targets, results):
889 """Perform a binary search for a list of targets inside a [left..right] range or rate.
left the left side of the range to search as a % of the line rate (100 = 100% line rate)
892 indicating the rate to send on each interface
893 right the right side of the range to search as a % of line rate
894 indicating the rate to send on each interface
895 targets a dict of drop rates to search (0.1 = 0.1%), indexed by the DR name or "tag"
897 results a dict to store results
901 LOG.info('Range search [%s .. %s] targets: %s', left, right, targets)
903 # Terminate search when gap is less than load epsilon
904 if right - left < self.config.measurement.load_epsilon:
905 self.__targets_found(left, targets, results)
# Obtain the average drop rate for the middle load
909 middle = (left + right) / 2.0
911 stats, rates = self.__run_search_iteration(middle)
913 LOG.exception("Got exception from traffic generator during binary search")
914 self.__targets_found(left, targets, results)
916 # Split target dicts based on the avg drop rate
919 for tag, target in list(targets.items()):
920 if stats['overall']['drop_rate_percent'] <= target:
921 # record the best possible rate found for this target
923 results[tag].update({
924 'load_percent_per_direction': middle,
925 'stats': self.__format_output_stats(dict(stats)),
926 'timestamp_sec': None
928 right_targets[tag] = target
930 # initialize to 0 all fields of result for
931 # the worst case scenario of the binary search (if ndr/pdr is not found)
932 if tag not in results:
933 results[tag] = dict.fromkeys(rates, 0)
934 empty_stats = self.__format_output_stats(dict(stats))
935 for key in empty_stats:
936 if isinstance(empty_stats[key], dict):
937 empty_stats[key] = dict.fromkeys(empty_stats[key], 0)
940 results[tag].update({
941 'load_percent_per_direction': 0,
942 'stats': empty_stats,
943 'timestamp_sec': None
945 left_targets[tag] = target
948 self.__range_search(left, middle, left_targets, results)
950 # search upper half only if the upper rate does not exceed
951 # 100%, this only happens when the first search at 100%
952 # yields a DR that is < target DR
954 self.__targets_found(100, right_targets, results)
956 self.__range_search(middle, right, right_targets, results)
def __run_search_iteration(self, rate):
    """Run one iteration at the given rate level.

    rate: the rate to send on each port in percent (0 to 100)

    return: a tuple made of the stats of the run and of the
            'direction-total' entry of the traffic config
    """
    self._modify_load(rate)

    # poll interval stats and collect them
    for interval_stats in self.run_traffic():
        self.interval_collector.add(interval_stats)
        elapsed = self.runner.time_elapsed()
        if elapsed / self.run_config['duration_sec'] >= 1:
            # the configured duration has elapsed: stop the traffic and
            # pause unless sleeping is to be skipped
            self.cancel_traffic()
            if self.skip_sleep():
                continue
            time.sleep(self.config.pause_sec)
    self.interval_collector.reset()

    # get stats from the run
    run_stats = self.runner.client.get_stats()
    total_config = self._get_traffic_config()['direction-total']
    requested_pps = total_config['rate_pps']
    warning = self.compare_tx_rates(requested_pps, run_stats['total_tx_rate'])
    if warning is not None:
        run_stats['warning'] = warning

    # save reliable stats from whole iteration
    self.iteration_collector.add(run_stats, requested_pps)
    LOG.info('Average drop rate: %f', run_stats['overall']['drop_rate_percent'])
    return run_stats, total_config
def log_stats(stats):
    """Log estimated stats during run.

    stats: dict of stats, only its 'overall' entry is read
    """
    # the fields are gathered in a mapping that is handed to the logger as
    # the argument of the named %-style placeholders
    fields = {'datetime': str(datetime.now())}
    overall = stats['overall']
    fields['tx_packets'] = overall['tx']['total_pkts']
    rx = overall['rx']
    fields['rx_packets'] = rx['total_pkts']
    fields['drop_packets'] = rx['dropped_pkts']
    fields['drop_rate_percent'] = overall['drop_rate_percent']
    LOG.info('TX: %(tx_packets)d; '
             'RX: %(rx_packets)d; '
             'Est. Dropped: %(drop_packets)d; '
             'Est. Drop rate: %(drop_rate_percent).4f%%',
             fields)
1004 def run_traffic(self):
1005 """Start traffic and return intermediate stats for each interval."""
1006 stats = self.runner.run()
1007 while self.runner.is_running:
1008 self.log_stats(stats)
1010 stats = self.runner.poll_stats()
1013 self.log_stats(stats)
1014 LOG.info('Drop rate: %f', stats['overall']['drop_rate_percent'])
1017 def cancel_traffic(self):
def _get_traffic_config(self):
    """Build the traffic config of the current run from self.run_config.

    The first rate of self.run_config['rates'] is stored under
    'direction-forward' and any following one under 'direction-reverse'.
    Each entry holds the frame size, the duration, the rate as configured
    and the converted rates.

    'direction-total' is a copy of the forward entry in which the 3 rate
    fields are replaced by the sum over all the rates.

    return: the config dict
    """
    run_config = self.run_config
    config = {}
    totals = {'rate_percent': 0.0, 'rate_bps': 0.0, 'rate_pps': 0.0}
    for idx, rate in enumerate(run_config['rates']):
        entry = {
            'l2frame_size': run_config['l2frame_size'],
            'duration_sec': run_config['duration_sec']
        }
        # the converted rates take precedence over the configured rate
        entry.update(rate)
        entry.update(self.__convert_rates(rate))
        config['direction-reverse' if idx else 'direction-forward'] = entry
        for unit in ('rate_percent', 'rate_bps', 'rate_pps'):
            totals[unit] += float(entry[unit])

    summary = dict(config['direction-forward'])
    summary['rate_percent'] = totals['rate_percent']
    summary['rate_pps'] = cast_integer(totals['rate_pps'])
    summary['rate_bps'] = totals['rate_bps']
    config['direction-total'] = summary

    return config
def get_run_config(self, results):
    """Return configuration which was used for the last run.

    results: dict whose 'stats' entry holds the stats of port '0' and '1'

    return: a dict with one entry per direction ('direction-forward' and
            'direction-reverse') holding the converted 'orig', 'tx' and 'rx'
            rates, plus a 'direction-total' entry holding their sums
    """
    port_stats = results["stats"]
    run_config = {}
    for idx, key in enumerate(("direction-forward", "direction-reverse")):
        # because we want each direction to have the far end RX rates,
        # use the far end index (1-idx) to retrieve the RX rates
        far_end = str(1 - idx)
        tx_rate = port_stats[str(idx)]["tx"]["total_pkts"] / self.config.duration_sec
        rx_rate = port_stats[far_end]["rx"]["total_pkts"] / self.config.duration_sec
        run_config[key] = {
            "orig": self.__convert_rates(self.run_config['rates'][idx]),
            "tx": self.__convert_rates({'rate_pps': tx_rate}),
            "rx": self.__convert_rates({'rate_pps': rx_rate})
        }

    # add up both directions, unit by unit
    per_direction = list(run_config.values())
    run_config['direction-total'] = {
        direction: {
            unit: sum(float(entry[direction][unit]) for entry in per_direction)
            for unit in ('rate_percent', 'rate_bps', 'rate_pps')
        }
        for direction in ('orig', 'tx', 'rx')
    }
    return run_config
def insert_interface_stats(self, pps_list):
    """Create the traffic gen interface stats and add them to a list of packet path stats.

    pps_list: a list of packet path stats instances indexed by chain index,
              it is extended in place

    One packet path stats is created per chain (self.config.service_chain_count
    of them), each one built from exactly 2 InterfaceStats, for port 0 and port 1:
    [
        PacketPathStats(InterfaceStats(chain 0, port 0), InterfaceStats(chain 0, port 1)),
        PacketPathStats(InterfaceStats(chain 1, port 0), InterfaceStats(chain 1, port 1)),
        ...
    ]
    """
    # keep the list of list of interface stats indexed by the chain id
    per_chain = []
    for _ in range(self.config.service_chain_count):
        per_chain.append([InterfaceStats('p' + str(port), self.tool) for port in (0, 1)])
    self.ifstats = per_chain

    # each packet path stats gets its own copy of the interface stats list so
    # that any change made to that list through the packet path stats does not
    # alter the list saved in self.ifstats
    self.pps_list = [PacketPathStats(list(ifs)) for ifs in self.ifstats]

    # insert the corresponding pps in the passed list
    pps_list.extend(self.pps_list)
1097 def update_interface_stats(self, diff=False):
1098 """Update all interface stats.
1100 diff: if False, simply refresh the interface stats values with latest values
1101 if True, diff the interface stats with the latest values
1102 Make sure that the interface stats inserted in insert_interface_stats() are updated
1106 [InterfaceStats(chain 0, port 0), InterfaceStats(chain 0, port 1)],
1107 [InterfaceStats(chain 1, port 0), InterfaceStats(chain 1, port 1)],
1112 stats = self.gen.get_stats()
1113 for chain_idx, ifs in enumerate(self.ifstats):
1114 # each ifs has exactly 2 InterfaceStats and 2 Latency instances
1115 # corresponding to the
1116 # port 0 and port 1 for the given chain_idx
1117 # Note that we cannot use self.pps_list[chain_idx].if_stats to pick the
1118 # interface stats for the pps because it could have been modified to contain
1119 # additional interface stats
1120 self.gen.get_stream_stats(stats, ifs, self.pps_list[chain_idx].latencies, chain_idx)
1121 # special handling for vxlan
1122 # in case of vxlan, flow stats are not available so all rx counters will be
1123 # zeros when the total rx port counter is non zero.
1125 for port in range(2):
1127 for ifs in self.ifstats:
1128 total_rx += ifs[port].rx
1130 # check if the total port rx from Trex is also zero
1131 port_rx = stats[port]['rx']['total_pkts']
1133 # the total rx for all chains from port level stats is non zero
1134 # which means that the per-chain stats are not available
1135 if len(self.ifstats) == 1:
1136 # only one chain, simply report the port level rx to the chain rx stats
1137 self.ifstats[0][port].rx = port_rx
1139 for ifs in self.ifstats:
1140 # mark this data as unavailable
1142 # pitch in the total rx only in the last chain pps
1143 self.ifstats[-1][port].rx_total = port_rx
1146 def compare_tx_rates(required, actual):
1147 """Compare the actual TX rate to the required TX rate."""
1149 are_different = False
1151 if float(actual) / required < threshold:
1152 are_different = True
1153 except ZeroDivisionError:
1154 are_different = True
1157 msg = "WARNING: There is a significant difference between requested TX rate ({r}) " \
1158 "and actual TX rate ({a}). The traffic generator may not have sufficient CPU " \
1159 "to achieve the requested TX rate.".format(r=required, a=actual)
def get_per_direction_rate(self):
    """Get the rate for each direction.

    return: the result of dividing self.current_total_rate by 2 when the run
            is bidirectional and the rate is not a percentage, by 1 otherwise
    """
    bidirectional = self.run_config['bidirectional']
    total_rate = self.current_total_rate
    # don't split rate if it's percentage
    percent_based = 'rate_percent' in total_rate
    divisor = 2 if bidirectional and not percent_based else 1

    return utils.divide_rate(total_rate, divisor)
1175 """Close this instance."""
1177 self.gen.stop_traffic()
1180 self.gen.clear_stats()