MPLS support + loop_vm_arp test fix
[nfvbench.git] / nfvbench / traffic_client.py
1 # Copyright 2016 Cisco Systems, Inc.  All rights reserved.
2 #
3 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
4 #    not use this file except in compliance with the License. You may obtain
5 #    a copy of the License at
6 #
7 #         http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #    Unless required by applicable law or agreed to in writing, software
10 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 #    License for the specific language governing permissions and limitations
13 #    under the License.
14
15 """Interface to the traffic generator clients including NDR/PDR binary search."""
16
17 from datetime import datetime
18 import socket
19 import struct
20 import time
21
22 from attrdict import AttrDict
23 import bitmath
24 from netaddr import IPNetwork
25 # pylint: disable=import-error
26 from trex.stl.api import Ether
27 from trex.stl.api import STLError
28 from trex.stl.api import UDP
29 # pylint: disable=wrong-import-order
30 from scapy.contrib.mpls import MPLS  # flake8: noqa
31 # pylint: enable=wrong-import-order
32 # pylint: enable=import-error
33
34 from .log import LOG
35 from .packet_stats import InterfaceStats
36 from .packet_stats import PacketPathStats
37 from .stats_collector import IntervalCollector
38 from .stats_collector import IterationCollector
39 from .traffic_gen import traffic_utils as utils
40 from .utils import cast_integer
41
42 class TrafficClientException(Exception):
43     """Generic traffic client exception."""
44
45 class TrafficRunner(object):
46     """Serialize various steps required to run traffic."""
47
48     def __init__(self, client, duration_sec, interval_sec=0, service_mode=False):
49         """Create a traffic runner."""
50         self.client = client
51         self.start_time = None
52         self.duration_sec = duration_sec
53         self.interval_sec = interval_sec
54         self.service_mode = service_mode
55
56     def run(self):
57         """Clear stats and instruct the traffic generator to start generating traffic."""
58         if self.is_running():
59             return None
60         LOG.info('Running traffic generator')
61         self.client.gen.clear_stats()
62         # Debug use only : new '--service-mode' option available for the NFVBench command line.
63         # A read-only mode TRex console would be able to capture the generated traffic.
64         self.client.gen.set_service_mode(enabled=self.service_mode)
65         LOG.info('Service mode is %sabled', 'en' if self.service_mode else 'dis')
66         self.client.gen.start_traffic()
67         self.start_time = time.time()
68         return self.poll_stats()
69
70     def stop(self):
71         """Stop the current run and instruct the traffic generator to stop traffic."""
72         if self.is_running():
73             self.start_time = None
74             self.client.gen.stop_traffic()
75
76     def is_running(self):
77         """Check if a run is still pending."""
78         return self.start_time is not None
79
80     def time_elapsed(self):
81         """Return time elapsed since start of run."""
82         if self.is_running():
83             return time.time() - self.start_time
84         return self.duration_sec
85
86     def poll_stats(self):
87         """Poll latest stats from the traffic generator at fixed interval - sleeps if necessary.
88
89         return: latest stats or None if traffic is stopped
90         """
91         if not self.is_running():
92             return None
93         if self.client.skip_sleep():
94             self.stop()
95             return self.client.get_stats()
96         time_elapsed = self.time_elapsed()
97         if time_elapsed > self.duration_sec:
98             self.stop()
99             return None
100         time_left = self.duration_sec - time_elapsed
101         if self.interval_sec > 0.0:
102             if time_left <= self.interval_sec:
103                 time.sleep(time_left)
104                 self.stop()
105             else:
106                 time.sleep(self.interval_sec)
107         else:
108             time.sleep(self.duration_sec)
109             self.stop()
110         return self.client.get_stats()
111
112
113 class IpBlock(object):
114     """Manage a block of IP addresses."""
115
116     def __init__(self, base_ip, step_ip, count_ip):
117         """Create an IP block."""
118         self.base_ip_int = Device.ip_to_int(base_ip)
119         self.step = Device.ip_to_int(step_ip)
120         self.max_available = count_ip
121         self.next_free = 0
122
123     def get_ip(self, index=0):
124         """Return the IP address at given index."""
125         if index < 0 or index >= self.max_available:
126             raise IndexError('Index out of bounds: %d (max=%d)' % (index, self.max_available))
127         return Device.int_to_ip(self.base_ip_int + index * self.step)
128
129     def reserve_ip_range(self, count):
130         """Reserve a range of count consecutive IP addresses spaced by step."""
131         if self.next_free + count > self.max_available:
132             raise IndexError('No more IP addresses next free=%d max_available=%d requested=%d' %
133                              (self.next_free,
134                               self.max_available,
135                               count))
136         first_ip = self.get_ip(self.next_free)
137         last_ip = self.get_ip(self.next_free + count - 1)
138         self.next_free += count
139         return (first_ip, last_ip)
140
141     def reset_reservation(self):
142         """Reset all reservations and restart with a completely unused IP block."""
143         self.next_free = 0
144
145
146 class Device(object):
147     """Represent a port device and all information associated to it.
148
149     In the curent version we only support 2 port devices for the traffic generator
150     identified as port 0 or port 1.
151     """
152
153     def __init__(self, port, generator_config):
154         """Create a new device for a given port."""
155         self.generator_config = generator_config
156         self.chain_count = generator_config.service_chain_count
157         self.flow_count = generator_config.flow_count / 2
158         self.port = port
159         self.switch_port = generator_config.interfaces[port].get('switch_port', None)
160         self.vtep_vlan = None
161         self.vtep_src_mac = None
162         self.vxlan = False
163         self.mpls = False
164         self.inner_labels = None
165         self.outer_labels = None
166         self.pci = generator_config.interfaces[port].pci
167         self.mac = None
168         self.dest_macs = None
169         self.vtep_dst_mac = None
170         self.vtep_dst_ip = None
171         if generator_config.vteps is None:
172             self.vtep_src_ip = None
173         else:
174             self.vtep_src_ip = generator_config.vteps[port]
175         self.vnis = None
176         self.vlans = None
177         self.ip_addrs = generator_config.ip_addrs[port]
178         subnet = IPNetwork(self.ip_addrs)
179         self.ip = subnet.ip.format()
180         self.ip_addrs_step = generator_config.ip_addrs_step
181         self.ip_block = IpBlock(self.ip, self.ip_addrs_step, self.flow_count)
182         self.gw_ip_block = IpBlock(generator_config.gateway_ips[port],
183                                    generator_config.gateway_ip_addrs_step,
184                                    self.chain_count)
185         self.tg_gateway_ip_addrs = generator_config.tg_gateway_ip_addrs[port]
186         self.tg_gw_ip_block = IpBlock(self.tg_gateway_ip_addrs,
187                                       generator_config.tg_gateway_ip_addrs_step,
188                                       self.chain_count)
189         self.udp_src_port = generator_config.udp_src_port
190         self.udp_dst_port = generator_config.udp_dst_port
191
192     def set_mac(self, mac):
193         """Set the local MAC for this port device."""
194         if mac is None:
195             raise TrafficClientException('Trying to set traffic generator MAC address as None')
196         self.mac = mac
197
198     def get_peer_device(self):
199         """Get the peer device (device 0 -> device 1, or device 1 -> device 0)."""
200         return self.generator_config.devices[1 - self.port]
201
202     def set_vtep_dst_mac(self, dest_macs):
203         """Set the list of dest MACs indexed by the chain id.
204
205         This is only called in 2 cases:
206         - VM macs discovered using openstack API
207         - dest MACs provisioned in config file
208         """
209         self.vtep_dst_mac = list(map(str, dest_macs))
210
211     def set_dest_macs(self, dest_macs):
212         """Set the list of dest MACs indexed by the chain id.
213
214         This is only called in 2 cases:
215         - VM macs discovered using openstack API
216         - dest MACs provisioned in config file
217         """
218         self.dest_macs = list(map(str, dest_macs))
219
220     def get_dest_macs(self):
221         """Get the list of dest macs for this device.
222
223         If set_dest_macs was never called, assumes l2-loopback and return
224         a list of peer mac (as many as chains but normally only 1 chain)
225         """
226         if self.dest_macs:
227             return self.dest_macs
228         # assume this is l2-loopback
229         return [self.get_peer_device().mac] * self.chain_count
230
231     def set_vlans(self, vlans):
232         """Set the list of vlans to use indexed by the chain id."""
233         self.vlans = vlans
234         LOG.info("Port %d: VLANs %s", self.port, self.vlans)
235
236     def set_vtep_vlan(self, vlan):
237         """Set the vtep vlan to use indexed by specific port."""
238         self.vtep_vlan = vlan
239         self.vxlan = True
240         self.vlan_tagging = None
241         LOG.info("Port %d: VTEP VLANs %s", self.port, self.vtep_vlan)
242
243     def set_vxlan_endpoints(self, src_ip, dst_ip):
244         self.vtep_dst_ip = dst_ip
245         self.vtep_src_ip = src_ip
246         LOG.info("Port %d: src_vtep %s, dst_vtep %s", self.port,
247                  self.vtep_src_ip, self.vtep_dst_ip)
248
249     def set_mpls_peers(self, src_ip, dst_ip):
250         self.mpls = True
251         self.vtep_dst_ip = dst_ip
252         self.vtep_src_ip = src_ip
253         LOG.info("Port %d: src_mpls_vtep %s, mpls_peer_ip %s", self.port,
254                  self.vtep_src_ip, self.vtep_dst_ip)
255
256     def set_vxlans(self, vnis):
257         self.vnis = vnis
258         LOG.info("Port %d: VNIs %s", self.port, self.vnis)
259
260     def set_mpls_inner_labels(self, labels):
261         self.inner_labels = labels
262         LOG.info("Port %d: MPLS Inner Labels %s", self.port, self.inner_labels)
263
264     def set_mpls_outer_labels(self, labels):
265         self.outer_labels = labels
266         LOG.info("Port %d: MPLS Outer Labels %s", self.port, self.outer_labels)
267
268     def set_gw_ip(self, gateway_ip):
269         self.gw_ip_block = IpBlock(gateway_ip,
270                                    self.generator_config.gateway_ip_addrs_step,
271                                    self.chain_count)
272
273     def get_gw_ip(self, chain_index):
274         """Retrieve the IP address assigned for the gateway of a given chain."""
275         return self.gw_ip_block.get_ip(chain_index)
276
277     def get_stream_configs(self):
278         """Get the stream config for a given chain on this device.
279
280         Called by the traffic generator driver to program the traffic generator properly
281         before generating traffic
282         """
283         configs = []
284         # exact flow count for each chain is calculated as follows:
285         # - all chains except the first will have the same flow count
286         #   calculated as (total_flows + chain_count - 1) / chain_count
287         # - the first chain will have the remainder
288         # example 11 flows and 3 chains => 3, 4, 4
289         flows_per_chain = int((self.flow_count + self.chain_count - 1) / self.chain_count)
290         cur_chain_flow_count = int(self.flow_count - flows_per_chain * (self.chain_count - 1))
291         peer = self.get_peer_device()
292         self.ip_block.reset_reservation()
293         peer.ip_block.reset_reservation()
294         dest_macs = self.get_dest_macs()
295
296         for chain_idx in range(self.chain_count):
297             src_ip_first, src_ip_last = self.ip_block.reserve_ip_range(cur_chain_flow_count)
298             dst_ip_first, dst_ip_last = peer.ip_block.reserve_ip_range(cur_chain_flow_count)
299
300             configs.append({
301                 'count': cur_chain_flow_count,
302                 'mac_src': self.mac,
303                 'mac_dst': dest_macs[chain_idx],
304                 'ip_src_addr': src_ip_first,
305                 'ip_src_addr_max': src_ip_last,
306                 'ip_src_count': cur_chain_flow_count,
307                 'ip_dst_addr': dst_ip_first,
308                 'ip_dst_addr_max': dst_ip_last,
309                 'ip_dst_count': cur_chain_flow_count,
310                 'ip_addrs_step': self.ip_addrs_step,
311                 'udp_src_port': self.udp_src_port,
312                 'udp_dst_port': self.udp_dst_port,
313                 'mac_discovery_gw': self.get_gw_ip(chain_idx),
314                 'ip_src_tg_gw': self.tg_gw_ip_block.get_ip(chain_idx),
315                 'ip_dst_tg_gw': peer.tg_gw_ip_block.get_ip(chain_idx),
316                 'vlan_tag': self.vlans[chain_idx] if self.vlans else None,
317                 'vxlan': self.vxlan,
318                 'vtep_vlan': self.vtep_vlan if self.vtep_vlan else None,
319                 'vtep_src_mac': self.mac if (self.vxlan or self.mpls) else None,
320                 'vtep_dst_mac': self.vtep_dst_mac if (self.vxlan or self.mpls) else None,
321                 'vtep_dst_ip': self.vtep_dst_ip if self.vxlan is True else None,
322                 'vtep_src_ip': self.vtep_src_ip if self.vxlan is True else None,
323                 'net_vni': self.vnis[chain_idx] if self.vxlan is True else None,
324                 'mpls': self.mpls,
325                 'mpls_outer_label': self.outer_labels[chain_idx] if self.mpls is True else None,
326                 'mpls_inner_label': self.inner_labels[chain_idx] if self.mpls is True else None
327
328             })
329             # after first chain, fall back to the flow count for all other chains
330             cur_chain_flow_count = flows_per_chain
331         return configs
332
333     @staticmethod
334     def ip_to_int(addr):
335         """Convert an IP address from string to numeric."""
336         return struct.unpack("!I", socket.inet_aton(addr))[0]
337
338     @staticmethod
339     def int_to_ip(nvalue):
340         """Convert an IP address from numeric to string."""
341         return socket.inet_ntoa(struct.pack("!I", int(nvalue)))
342
343
344 class GeneratorConfig(object):
345     """Represents traffic configuration for currently running traffic profile."""
346
347     DEFAULT_IP_STEP = '0.0.0.1'
348     DEFAULT_SRC_DST_IP_STEP = '0.0.0.1'
349
350     def __init__(self, config):
351         """Create a generator config."""
352         self.config = config
353         # name of the generator profile (normally trex or dummy)
354         # pick the default one if not specified explicitly from cli options
355         if not config.generator_profile:
356             config.generator_profile = config.traffic_generator.default_profile
357         # pick up the profile dict based on the name
358         gen_config = self.__match_generator_profile(config.traffic_generator,
359                                                     config.generator_profile)
360         self.gen_config = gen_config
361         # copy over fields from the dict
362         self.tool = gen_config.tool
363         self.ip = gen_config.ip
364         # overrides on config.cores and config.mbuf_factor
365         if config.cores:
366             self.cores = config.cores
367         else:
368             self.cores = gen_config.get('cores', 1)
369         self.mbuf_factor = config.mbuf_factor
370         self.mbuf_64 = config.mbuf_64
371         self.hdrh = not config.disable_hdrh
372         if gen_config.intf_speed:
373             # interface speed is overriden from config
374             self.intf_speed = bitmath.parse_string(gen_config.intf_speed.replace('ps', '')).bits
375         else:
376             # interface speed is discovered/provided by the traffic generator
377             self.intf_speed = 0
378         self.name = gen_config.name
379         self.zmq_pub_port = gen_config.get('zmq_pub_port', 4500)
380         self.zmq_rpc_port = gen_config.get('zmq_rpc_port', 4501)
381         self.limit_memory = gen_config.get('limit_memory', 1024)
382         self.software_mode = gen_config.get('software_mode', False)
383         self.interfaces = gen_config.interfaces
384         if self.interfaces[0].port != 0 or self.interfaces[1].port != 1:
385             raise TrafficClientException('Invalid port order/id in generator_profile.interfaces')
386         self.service_chain = config.service_chain
387         self.service_chain_count = config.service_chain_count
388         self.flow_count = config.flow_count
389         self.host_name = gen_config.host_name
390
391         self.tg_gateway_ip_addrs = gen_config.tg_gateway_ip_addrs
392         self.ip_addrs = gen_config.ip_addrs
393         self.ip_addrs_step = gen_config.ip_addrs_step or self.DEFAULT_SRC_DST_IP_STEP
394         self.tg_gateway_ip_addrs_step = \
395             gen_config.tg_gateway_ip_addrs_step or self.DEFAULT_IP_STEP
396         self.gateway_ip_addrs_step = gen_config.gateway_ip_addrs_step or self.DEFAULT_IP_STEP
397         self.gateway_ips = gen_config.gateway_ip_addrs
398         self.udp_src_port = gen_config.udp_src_port
399         self.udp_dst_port = gen_config.udp_dst_port
400         self.vteps = gen_config.get('vteps')
401         self.devices = [Device(port, self) for port in [0, 1]]
402         # This should normally always be [0, 1]
403         self.ports = [device.port for device in self.devices]
404
405         # check that pci is not empty
406         if not gen_config.interfaces[0].get('pci', None) or \
407            not gen_config.interfaces[1].get('pci', None):
408             raise TrafficClientException("configuration interfaces pci fields cannot be empty")
409
410         self.pcis = [tgif['pci'] for tgif in gen_config.interfaces]
411         self.vlan_tagging = config.vlan_tagging
412
413         # needed for result/summarizer
414         config['tg-name'] = gen_config.name
415         config['tg-tool'] = self.tool
416
417     def to_json(self):
418         """Get json form to display the content into the overall result dict."""
419         return dict(self.gen_config)
420
421     def set_dest_macs(self, port_index, dest_macs):
422         """Set the list of dest MACs indexed by the chain id on given port.
423
424         port_index: the port for which dest macs must be set
425         dest_macs: a list of dest MACs indexed by chain id
426         """
427         if len(dest_macs) < self.config.service_chain_count:
428             raise TrafficClientException('Dest MAC list %s must have %d entries' %
429                                          (dest_macs, self.config.service_chain_count))
430         # only pass the first scc dest MACs
431         self.devices[port_index].set_dest_macs(dest_macs[:self.config.service_chain_count])
432         LOG.info('Port %d: dst MAC %s', port_index, [str(mac) for mac in dest_macs])
433
434     def set_vtep_dest_macs(self, port_index, dest_macs):
435         """Set the list of dest MACs indexed by the chain id on given port.
436
437         port_index: the port for which dest macs must be set
438         dest_macs: a list of dest MACs indexed by chain id
439         """
440         if len(dest_macs) != self.config.service_chain_count:
441             raise TrafficClientException('Dest MAC list %s must have %d entries' %
442                                          (dest_macs, self.config.service_chain_count))
443         self.devices[port_index].set_vtep_dst_mac(dest_macs)
444         LOG.info('Port %d: vtep dst MAC %s', port_index, {str(mac) for mac in dest_macs})
445
446     def get_dest_macs(self):
447         """Return the list of dest macs indexed by port."""
448         return [dev.get_dest_macs() for dev in self.devices]
449
450     def set_vlans(self, port_index, vlans):
451         """Set the list of vlans to use indexed by the chain id on given port.
452
453         port_index: the port for which VLANs must be set
454         vlans: a  list of vlan lists indexed by chain id
455         """
456         if len(vlans) != self.config.service_chain_count:
457             raise TrafficClientException('VLAN list %s must have %d entries' %
458                                          (vlans, self.config.service_chain_count))
459         self.devices[port_index].set_vlans(vlans)
460
461     def set_vxlans(self, port_index, vxlans):
462         """Set the list of vxlans (VNIs) to use indexed by the chain id on given port.
463
464         port_index: the port for which VXLANs must be set
465         VXLANs: a  list of VNIs lists indexed by chain id
466         """
467         if len(vxlans) != self.config.service_chain_count:
468             raise TrafficClientException('VXLAN list %s must have %d entries' %
469                                          (vxlans, self.config.service_chain_count))
470         self.devices[port_index].set_vxlans(vxlans)
471
472     def set_mpls_inner_labels(self, port_index, labels):
473         """Set the list of MPLS Labels to use indexed by the chain id on given port.
474
475         port_index: the port for which Labels must be set
476         Labels: a list of Labels lists indexed by chain id
477         """
478         if len(labels) != self.config.service_chain_count:
479             raise TrafficClientException('Inner MPLS list %s must have %d entries' %
480                                          (labels, self.config.service_chain_count))
481         self.devices[port_index].set_mpls_inner_labels(labels)
482
483     def set_mpls_outer_labels(self, port_index, labels):
484         """Set the list of MPLS Labels to use indexed by the chain id on given port.
485
486         port_index: the port for which Labels must be set
487         Labels: a list of Labels lists indexed by chain id
488         """
489         if len(labels) != self.config.service_chain_count:
490             raise TrafficClientException('Outer MPLS list %s must have %d entries' %
491                                          (labels, self.config.service_chain_count))
492         self.devices[port_index].set_mpls_outer_labels(labels)
493
494     def set_vtep_vlan(self, port_index, vlan):
495         """Set the vtep vlan to use indexed by the chain id on given port.
496         port_index: the port for which VLAN must be set
497         """
498         self.devices[port_index].set_vtep_vlan(vlan)
499
500     def set_vxlan_endpoints(self, port_index, src_ip, dst_ip):
501         self.devices[port_index].set_vxlan_endpoints(src_ip, dst_ip)
502
503     def set_mpls_peers(self, port_index, src_ip, dst_ip):
504         self.devices[port_index].set_mpls_peers(src_ip, dst_ip)
505
506     @staticmethod
507     def __match_generator_profile(traffic_generator, generator_profile):
508         gen_config = AttrDict(traffic_generator)
509         gen_config.pop('default_profile')
510         gen_config.pop('generator_profile')
511         matching_profile = [profile for profile in traffic_generator.generator_profile if
512                             profile.name == generator_profile]
513         if len(matching_profile) != 1:
514             raise Exception('Traffic generator profile not found: ' + generator_profile)
515
516         gen_config.update(matching_profile[0])
517         return gen_config
518
519
520 class TrafficClient(object):
521     """Traffic generator client with NDR/PDR binary seearch."""
522
523     PORTS = [0, 1]
524
525     def __init__(self, config, notifier=None):
526         """Create a new TrafficClient instance.
527
528         config: nfvbench config
529         notifier: notifier (optional)
530
531         A new instance is created everytime the nfvbench config may have changed.
532         """
533         self.config = config
534         self.generator_config = GeneratorConfig(config)
535         self.tool = self.generator_config.tool
536         self.gen = self._get_generator()
537         self.notifier = notifier
538         self.interval_collector = None
539         self.iteration_collector = None
540         self.runner = TrafficRunner(self, self.config.duration_sec, self.config.interval_sec,
541                                     self.config.service_mode)
542         self.config.frame_sizes = self._get_frame_sizes()
543         self.run_config = {
544             'l2frame_size': None,
545             'duration_sec': self.config.duration_sec,
546             'bidirectional': True,
547             'rates': []  # to avoid unsbuscriptable-obj warning
548         }
549         self.current_total_rate = {'rate_percent': '10'}
550         if self.config.single_run:
551             self.current_total_rate = utils.parse_rate_str(self.config.rate)
552         self.ifstats = None
553         # Speed is either discovered when connecting to TG or set from config
554         # This variable is 0 if not yet discovered from TG or must be the speed of
555         # each interface in bits per second
556         self.intf_speed = self.generator_config.intf_speed
557
558     def _get_generator(self):
559         tool = self.tool.lower()
560         if tool == 'trex':
561             from .traffic_gen import trex_gen
562             return trex_gen.TRex(self)
563         if tool == 'dummy':
564             from .traffic_gen import dummy
565             return dummy.DummyTG(self)
566         raise TrafficClientException('Unsupported generator tool name:' + self.tool)
567
568     def skip_sleep(self):
569         """Skip all sleeps when doing unit testing with dummy TG.
570
571         Must be overriden using mock.patch
572         """
573         return False
574
575     def _get_frame_sizes(self):
576         traffic_profile_name = self.config.traffic.profile
577         matching_profiles = [profile for profile in self.config.traffic_profile if
578                              profile.name == traffic_profile_name]
579         if len(matching_profiles) > 1:
580             raise TrafficClientException('Multiple traffic profiles with name: ' +
581                                          traffic_profile_name)
582         if not matching_profiles:
583             raise TrafficClientException('Cannot find traffic profile: ' + traffic_profile_name)
584         return matching_profiles[0].l2frame_size
585
586     def start_traffic_generator(self):
587         """Start the traffic generator process (traffic not started yet)."""
588         self.gen.connect()
589         # pick up the interface speed if it is not set from config
590         intf_speeds = self.gen.get_port_speed_gbps()
591         # convert Gbps unit into bps
592         tg_if_speed = bitmath.parse_string(str(intf_speeds[0]) + 'Gb').bits
593         if self.intf_speed:
594             # interface speed is overriden from config
595             if self.intf_speed != tg_if_speed:
596                 # Warn the user if the speed in the config is different
597                 LOG.warning('Interface speed provided is different from actual speed (%d Gbps)',
598                             intf_speeds[0])
599         else:
600             # interface speed not provisioned by config
601             self.intf_speed = tg_if_speed
602             # also update the speed in the tg config
603             self.generator_config.intf_speed = tg_if_speed
604
605         # Save the traffic generator local MAC
606         for mac, device in zip(self.gen.get_macs(), self.generator_config.devices):
607             device.set_mac(mac)
608
609     def setup(self):
610         """Set up the traffic client."""
611         self.gen.clear_stats()
612
613     def get_version(self):
614         """Get the traffic generator version."""
615         return self.gen.get_version()
616
617     def ensure_end_to_end(self):
618         """Ensure traffic generator receives packets it has transmitted.
619
620         This ensures end to end connectivity and also waits until VMs are ready to forward packets.
621
622         VMs that are started and in active state may not pass traffic yet. It is imperative to make
623         sure that all VMs are passing traffic in both directions before starting any benchmarking.
624         To verify this, we need to send at a low frequency bi-directional packets and make sure
625         that we receive all packets back from all VMs. The number of flows is equal to 2 times
626         the number of chains (1 per direction) and we need to make sure we receive packets coming
627         from exactly 2 x chain count different source MAC addresses.
628
629         Example:
630             PVP chain (1 VM per chain)
631             N = 10 (number of chains)
632             Flow count = 20 (number of flows)
633             If the number of unique source MAC addresses from received packets is 20 then
634             all 10 VMs 10 VMs are in operational state.
635         """
636         LOG.info('Starting traffic generator to ensure end-to-end connectivity')
637         # send 2pps on each chain and each direction
638         rate_pps = {'rate_pps': str(self.config.service_chain_count * 2)}
639         self.gen.create_traffic('64', [rate_pps, rate_pps], bidirectional=True, latency=False,
640                                 e2e=True)
641         # ensures enough traffic is coming back
642         retry_count = int((self.config.check_traffic_time_sec +
643                            self.config.generic_poll_sec - 1) / self.config.generic_poll_sec)
644
645         # we expect to see packets coming from 2 unique MAC per chain
646         # because there can be flooding in the case of shared net
647         # we must verify that packets from the right VMs are received
648         # and not just count unique src MAC
649         # create a dict of (port, chain) tuples indexed by dest mac
650         mac_map = {}
651         for port, dest_macs in enumerate(self.generator_config.get_dest_macs()):
652             for chain, mac in enumerate(dest_macs):
653                 mac_map[mac] = (port, chain)
654         unique_src_mac_count = len(mac_map)
655         if self.config.vxlan and self.config.traffic_generator.vtep_vlan:
656             get_mac_id = lambda packet: packet['binary'][60:66]
657         elif self.config.vxlan:
658             get_mac_id = lambda packet: packet['binary'][56:62]
659         elif self.config.mpls:
660             get_mac_id = lambda packet: packet['binary'][24:30]
661             # mpls_transport_label = lambda packet: packet['binary'][14:18]
662         else:
663             get_mac_id = lambda packet: packet['binary'][6:12]
664         for it in range(retry_count):
665             self.gen.clear_stats()
666             self.gen.start_traffic()
667             self.gen.start_capture()
668             LOG.info('Captured unique src mac %d/%d, capturing return packets (retry %d/%d)...',
669                      unique_src_mac_count - len(mac_map), unique_src_mac_count,
670                      it + 1, retry_count)
671             if not self.skip_sleep():
672                 time.sleep(self.config.generic_poll_sec)
673             self.gen.stop_traffic()
674             self.gen.fetch_capture_packets()
675             self.gen.stop_capture()
676             for packet in self.gen.packet_list:
677                 mac_id = get_mac_id(packet).decode('latin-1')
678                 src_mac = ':'.join(["%02x" % ord(x) for x in mac_id])
679                 if self.config.mpls:
680                     if src_mac in mac_map and self.is_mpls(packet):
681                         port, chain = mac_map[src_mac]
682                         LOG.info('Received mpls packet from mac: %s (chain=%d, port=%d)',
683                                  src_mac, chain, port)
684                         mac_map.pop(src_mac, None)
685                 else:
686                     if src_mac in mac_map and self.is_udp(packet):
687                         port, chain = mac_map[src_mac]
688                         LOG.info('Received udp packet from mac: %s (chain=%d, port=%d)',
689                                  src_mac, chain, port)
690                         mac_map.pop(src_mac, None)
691
692                 if not mac_map:
693                     LOG.info('End-to-end connectivity established')
694                     return
695             if self.config.l3_router and not self.config.no_arp:
696                 # In case of L3 traffic mode, routers are not able to route traffic
697                 # until VM interfaces are up and ARP requests are done
698                 LOG.info('Waiting for loopback service completely started...')
699                 LOG.info('Sending ARP request to assure end-to-end connectivity established')
700                 self.ensure_arp_successful()
701         raise TrafficClientException('End-to-end connectivity cannot be ensured')
702
703     def is_udp(self, packet):
704         pkt = Ether(packet['binary'])
705         return UDP in pkt
706
707     def is_mpls(self, packet):
708         pkt = Ether(packet['binary'])
709         return MPLS in pkt
710
711     def ensure_arp_successful(self):
712         """Resolve all IP using ARP and throw an exception in case of failure."""
713         dest_macs = self.gen.resolve_arp()
714         if dest_macs:
715             # all dest macs are discovered, saved them into the generator config
716             if self.config.vxlan or self.config.mpls:
717                 self.generator_config.set_vtep_dest_macs(0, dest_macs[0])
718                 self.generator_config.set_vtep_dest_macs(1, dest_macs[1])
719             else:
720                 self.generator_config.set_dest_macs(0, dest_macs[0])
721                 self.generator_config.set_dest_macs(1, dest_macs[1])
722         else:
723             raise TrafficClientException('ARP cannot be resolved')
724
725     def set_traffic(self, frame_size, bidirectional):
726         """Reconfigure the traffic generator for a new frame size."""
727         self.run_config['bidirectional'] = bidirectional
728         self.run_config['l2frame_size'] = frame_size
729         self.run_config['rates'] = [self.get_per_direction_rate()]
730         if bidirectional:
731             self.run_config['rates'].append(self.get_per_direction_rate())
732         else:
733             unidir_reverse_pps = int(self.config.unidir_reverse_traffic_pps)
734             if unidir_reverse_pps > 0:
735                 self.run_config['rates'].append({'rate_pps': str(unidir_reverse_pps)})
736         # Fix for [NFVBENCH-67], convert the rate string to PPS
737         for idx, rate in enumerate(self.run_config['rates']):
738             if 'rate_pps' not in rate:
739                 self.run_config['rates'][idx] = {'rate_pps': self.__convert_rates(rate)['rate_pps']}
740
741         self.gen.clear_streamblock()
742
743         if self.config.no_latency_streams:
744             LOG.info("Latency streams are disabled")
745         self.gen.create_traffic(frame_size, self.run_config['rates'], bidirectional,
746                                 latency=not self.config.no_latency_streams)
747
748     def _modify_load(self, load):
749         self.current_total_rate = {'rate_percent': str(load)}
750         rate_per_direction = self.get_per_direction_rate()
751
752         self.gen.modify_rate(rate_per_direction, False)
753         self.run_config['rates'][0] = rate_per_direction
754         if self.run_config['bidirectional']:
755             self.gen.modify_rate(rate_per_direction, True)
756             self.run_config['rates'][1] = rate_per_direction
757
758     def get_ndr_and_pdr(self):
759         """Start the NDR/PDR iteration and return the results."""
760         dst = 'Bidirectional' if self.run_config['bidirectional'] else 'Unidirectional'
761         targets = {}
762         if self.config.ndr_run:
763             LOG.info('*** Searching NDR for %s (%s)...', self.run_config['l2frame_size'], dst)
764             targets['ndr'] = self.config.measurement.NDR
765         if self.config.pdr_run:
766             LOG.info('*** Searching PDR for %s (%s)...', self.run_config['l2frame_size'], dst)
767             targets['pdr'] = self.config.measurement.PDR
768
769         self.run_config['start_time'] = time.time()
770         self.interval_collector = IntervalCollector(self.run_config['start_time'])
771         self.interval_collector.attach_notifier(self.notifier)
772         self.iteration_collector = IterationCollector(self.run_config['start_time'])
773         results = {}
774         self.__range_search(0.0, 200.0, targets, results)
775
776         results['iteration_stats'] = {
777             'ndr_pdr': self.iteration_collector.get()
778         }
779
780         if self.config.ndr_run:
781             LOG.info('NDR load: %s', results['ndr']['rate_percent'])
782             results['ndr']['time_taken_sec'] = \
783                 results['ndr']['timestamp_sec'] - self.run_config['start_time']
784             if self.config.pdr_run:
785                 LOG.info('PDR load: %s', results['pdr']['rate_percent'])
786                 results['pdr']['time_taken_sec'] = \
787                     results['pdr']['timestamp_sec'] - results['ndr']['timestamp_sec']
788         else:
789             LOG.info('PDR load: %s', results['pdr']['rate_percent'])
790             results['pdr']['time_taken_sec'] = \
791                 results['pdr']['timestamp_sec'] - self.run_config['start_time']
792         return results
793
794     def __get_dropped_rate(self, result):
795         dropped_pkts = result['rx']['dropped_pkts']
796         total_pkts = result['tx']['total_pkts']
797         if not total_pkts:
798             return float('inf')
799         return float(dropped_pkts) / total_pkts * 100
800
801     def get_stats(self):
802         """Collect final stats for previous run."""
803         stats = self.gen.get_stats()
804         retDict = {'total_tx_rate': stats['total_tx_rate']}
805
806         tx_keys = ['total_pkts', 'total_pkt_bytes', 'pkt_rate', 'pkt_bit_rate']
807         rx_keys = tx_keys + ['dropped_pkts']
808
809         for port in self.PORTS:
810             port_stats = {'tx': {}, 'rx': {}}
811             for key in tx_keys:
812                 port_stats['tx'][key] = int(stats[port]['tx'][key])
813             for key in rx_keys:
814                 try:
815                     port_stats['rx'][key] = int(stats[port]['rx'][key])
816                 except ValueError:
817                     port_stats['rx'][key] = 0
818             port_stats['rx']['avg_delay_usec'] = cast_integer(
819                 stats[port]['rx']['avg_delay_usec'])
820             port_stats['rx']['min_delay_usec'] = cast_integer(
821                 stats[port]['rx']['min_delay_usec'])
822             port_stats['rx']['max_delay_usec'] = cast_integer(
823                 stats[port]['rx']['max_delay_usec'])
824             port_stats['drop_rate_percent'] = self.__get_dropped_rate(port_stats)
825             retDict[str(port)] = port_stats
826
827         ports = sorted(list(retDict.keys()), key=str)
828         if self.run_config['bidirectional']:
829             retDict['overall'] = {'tx': {}, 'rx': {}}
830             for key in tx_keys:
831                 retDict['overall']['tx'][key] = \
832                     retDict[ports[0]]['tx'][key] + retDict[ports[1]]['tx'][key]
833             for key in rx_keys:
834                 retDict['overall']['rx'][key] = \
835                     retDict[ports[0]]['rx'][key] + retDict[ports[1]]['rx'][key]
836             total_pkts = [retDict[ports[0]]['rx']['total_pkts'],
837                           retDict[ports[1]]['rx']['total_pkts']]
838             avg_delays = [retDict[ports[0]]['rx']['avg_delay_usec'],
839                           retDict[ports[1]]['rx']['avg_delay_usec']]
840             max_delays = [retDict[ports[0]]['rx']['max_delay_usec'],
841                           retDict[ports[1]]['rx']['max_delay_usec']]
842             min_delays = [retDict[ports[0]]['rx']['min_delay_usec'],
843                           retDict[ports[1]]['rx']['min_delay_usec']]
844             retDict['overall']['rx']['avg_delay_usec'] = utils.weighted_avg(total_pkts, avg_delays)
845             retDict['overall']['rx']['min_delay_usec'] = min(min_delays)
846             retDict['overall']['rx']['max_delay_usec'] = max(max_delays)
847             for key in ['pkt_bit_rate', 'pkt_rate']:
848                 for dirc in ['tx', 'rx']:
849                     retDict['overall'][dirc][key] /= 2.0
850         else:
851             retDict['overall'] = retDict[ports[0]]
852         retDict['overall']['drop_rate_percent'] = self.__get_dropped_rate(retDict['overall'])
853         return retDict
854
855     def __convert_rates(self, rate):
856         return utils.convert_rates(self.run_config['l2frame_size'],
857                                    rate,
858                                    self.intf_speed)
859
860     def __ndr_pdr_found(self, tag, load):
861         rates = self.__convert_rates({'rate_percent': load})
862         self.iteration_collector.add_ndr_pdr(tag, rates['rate_pps'])
863         last_stats = self.iteration_collector.peek()
864         self.interval_collector.add_ndr_pdr(tag, last_stats)
865
866     def __format_output_stats(self, stats):
867         for key in self.PORTS + ['overall']:
868             key = str(key)
869             interface = stats[key]
870             stats[key] = {
871                 'tx_pkts': interface['tx']['total_pkts'],
872                 'rx_pkts': interface['rx']['total_pkts'],
873                 'drop_percentage': interface['drop_rate_percent'],
874                 'drop_pct': interface['rx']['dropped_pkts'],
875                 'avg_delay_usec': interface['rx']['avg_delay_usec'],
876                 'max_delay_usec': interface['rx']['max_delay_usec'],
877                 'min_delay_usec': interface['rx']['min_delay_usec'],
878             }
879
880         return stats
881
882     def __targets_found(self, rate, targets, results):
883         for tag, target in list(targets.items()):
884             LOG.info('Found %s (%s) load: %s', tag, target, rate)
885             self.__ndr_pdr_found(tag, rate)
886             results[tag]['timestamp_sec'] = time.time()
887
888     def __range_search(self, left, right, targets, results):
889         """Perform a binary search for a list of targets inside a [left..right] range or rate.
890
891         left    the left side of the range to search as a % the line rate (100 = 100% line rate)
892                 indicating the rate to send on each interface
893         right   the right side of the range to search as a % of line rate
894                 indicating the rate to send on each interface
895         targets a dict of drop rates to search (0.1 = 0.1%), indexed by the DR name or "tag"
896                 ('ndr', 'pdr')
897         results a dict to store results
898         """
899         if not targets:
900             return
901         LOG.info('Range search [%s .. %s] targets: %s', left, right, targets)
902
903         # Terminate search when gap is less than load epsilon
904         if right - left < self.config.measurement.load_epsilon:
905             self.__targets_found(left, targets, results)
906             return
907
908         # Obtain the average drop rate in for middle load
909         middle = (left + right) / 2.0
910         try:
911             stats, rates = self.__run_search_iteration(middle)
912         except STLError:
913             LOG.exception("Got exception from traffic generator during binary search")
914             self.__targets_found(left, targets, results)
915             return
916         # Split target dicts based on the avg drop rate
917         left_targets = {}
918         right_targets = {}
919         for tag, target in list(targets.items()):
920             if stats['overall']['drop_rate_percent'] <= target:
921                 # record the best possible rate found for this target
922                 results[tag] = rates
923                 results[tag].update({
924                     'load_percent_per_direction': middle,
925                     'stats': self.__format_output_stats(dict(stats)),
926                     'timestamp_sec': None
927                 })
928                 right_targets[tag] = target
929             else:
930                 # initialize to 0 all fields of result for
931                 # the worst case scenario of the binary search (if ndr/pdr is not found)
932                 if tag not in results:
933                     results[tag] = dict.fromkeys(rates, 0)
934                     empty_stats = self.__format_output_stats(dict(stats))
935                     for key in empty_stats:
936                         if isinstance(empty_stats[key], dict):
937                             empty_stats[key] = dict.fromkeys(empty_stats[key], 0)
938                         else:
939                             empty_stats[key] = 0
940                     results[tag].update({
941                         'load_percent_per_direction': 0,
942                         'stats': empty_stats,
943                         'timestamp_sec': None
944                     })
945                 left_targets[tag] = target
946
947         # search lower half
948         self.__range_search(left, middle, left_targets, results)
949
950         # search upper half only if the upper rate does not exceed
951         # 100%, this only happens when the first search at 100%
952         # yields a DR that is < target DR
953         if middle >= 100:
954             self.__targets_found(100, right_targets, results)
955         else:
956             self.__range_search(middle, right, right_targets, results)
957
958     def __run_search_iteration(self, rate):
959         """Run one iteration at the given rate level.
960
961         rate: the rate to send on each port in percent (0 to 100)
962         """
963         self._modify_load(rate)
964
965         # poll interval stats and collect them
966         for stats in self.run_traffic():
967             self.interval_collector.add(stats)
968             time_elapsed_ratio = self.runner.time_elapsed() / self.run_config['duration_sec']
969             if time_elapsed_ratio >= 1:
970                 self.cancel_traffic()
971                 if not self.skip_sleep():
972                     time.sleep(self.config.pause_sec)
973         self.interval_collector.reset()
974
975         # get stats from the run
976         stats = self.runner.client.get_stats()
977         current_traffic_config = self._get_traffic_config()
978         warning = self.compare_tx_rates(current_traffic_config['direction-total']['rate_pps'],
979                                         stats['total_tx_rate'])
980         if warning is not None:
981             stats['warning'] = warning
982
983         # save reliable stats from whole iteration
984         self.iteration_collector.add(stats, current_traffic_config['direction-total']['rate_pps'])
985         LOG.info('Average drop rate: %f', stats['overall']['drop_rate_percent'])
986         return stats, current_traffic_config['direction-total']
987
988     @staticmethod
989     def log_stats(stats):
990         """Log estimated stats during run."""
991         report = {
992             'datetime': str(datetime.now()),
993             'tx_packets': stats['overall']['tx']['total_pkts'],
994             'rx_packets': stats['overall']['rx']['total_pkts'],
995             'drop_packets': stats['overall']['rx']['dropped_pkts'],
996             'drop_rate_percent': stats['overall']['drop_rate_percent']
997         }
998         LOG.info('TX: %(tx_packets)d; '
999                  'RX: %(rx_packets)d; '
1000                  'Est. Dropped: %(drop_packets)d; '
1001                  'Est. Drop rate: %(drop_rate_percent).4f%%',
1002                  report)
1003
1004     def run_traffic(self):
1005         """Start traffic and return intermediate stats for each interval."""
1006         stats = self.runner.run()
1007         while self.runner.is_running:
1008             self.log_stats(stats)
1009             yield stats
1010             stats = self.runner.poll_stats()
1011             if stats is None:
1012                 return
1013         self.log_stats(stats)
1014         LOG.info('Drop rate: %f', stats['overall']['drop_rate_percent'])
1015         yield stats
1016
1017     def cancel_traffic(self):
1018         """Stop traffic."""
1019         self.runner.stop()
1020
1021     def _get_traffic_config(self):
1022         config = {}
1023         load_total = 0.0
1024         bps_total = 0.0
1025         pps_total = 0.0
1026         for idx, rate in enumerate(self.run_config['rates']):
1027             key = 'direction-forward' if idx == 0 else 'direction-reverse'
1028             config[key] = {
1029                 'l2frame_size': self.run_config['l2frame_size'],
1030                 'duration_sec': self.run_config['duration_sec']
1031             }
1032             config[key].update(rate)
1033             config[key].update(self.__convert_rates(rate))
1034             load_total += float(config[key]['rate_percent'])
1035             bps_total += float(config[key]['rate_bps'])
1036             pps_total += float(config[key]['rate_pps'])
1037         config['direction-total'] = dict(config['direction-forward'])
1038         config['direction-total'].update({
1039             'rate_percent': load_total,
1040             'rate_pps': cast_integer(pps_total),
1041             'rate_bps': bps_total
1042         })
1043
1044         return config
1045
1046     def get_run_config(self, results):
1047         """Return configuration which was used for the last run."""
1048         r = {}
1049         # because we want each direction to have the far end RX rates,
1050         # use the far end index (1-idx) to retrieve the RX rates
1051         for idx, key in enumerate(["direction-forward", "direction-reverse"]):
1052             tx_rate = results["stats"][str(idx)]["tx"]["total_pkts"] / self.config.duration_sec
1053             rx_rate = results["stats"][str(1 - idx)]["rx"]["total_pkts"] / self.config.duration_sec
1054             r[key] = {
1055                 "orig": self.__convert_rates(self.run_config['rates'][idx]),
1056                 "tx": self.__convert_rates({'rate_pps': tx_rate}),
1057                 "rx": self.__convert_rates({'rate_pps': rx_rate})
1058             }
1059
1060         total = {}
1061         for direction in ['orig', 'tx', 'rx']:
1062             total[direction] = {}
1063             for unit in ['rate_percent', 'rate_bps', 'rate_pps']:
1064                 total[direction][unit] = sum([float(x[direction][unit]) for x in list(r.values())])
1065
1066         r['direction-total'] = total
1067         return r
1068
1069     def insert_interface_stats(self, pps_list):
1070         """Insert interface stats to a list of packet path stats.
1071
1072         pps_list: a list of packet path stats instances indexed by chain index
1073
1074         This function will insert the packet path stats for the traffic gen ports 0 and 1
1075         with itemized per chain tx/rx counters.
1076         There will be as many packet path stats as chains.
1077         Each packet path stats will have exactly 2 InterfaceStats for port 0 and port 1
1078         self.pps_list:
1079         [
1080         PacketPathStats(InterfaceStats(chain 0, port 0), InterfaceStats(chain 0, port 1)),
1081         PacketPathStats(InterfaceStats(chain 1, port 0), InterfaceStats(chain 1, port 1)),
1082         ...
1083         ]
1084         """
1085         def get_if_stats(chain_idx):
1086             return [InterfaceStats('p' + str(port), self.tool)
1087                     for port in range(2)]
1088         # keep the list of list of interface stats indexed by the chain id
1089         self.ifstats = [get_if_stats(chain_idx)
1090                         for chain_idx in range(self.config.service_chain_count)]
1091         # note that we need to make a copy of the ifs list so that any modification in the
1092         # list from pps will not change the list saved in self.ifstats
1093         self.pps_list = [PacketPathStats(list(ifs)) for ifs in self.ifstats]
1094         # insert the corresponding pps in the passed list
1095         pps_list.extend(self.pps_list)
1096
1097     def update_interface_stats(self, diff=False):
1098         """Update all interface stats.
1099
1100         diff: if False, simply refresh the interface stats values with latest values
1101               if True, diff the interface stats with the latest values
1102         Make sure that the interface stats inserted in insert_interface_stats() are updated
1103         with proper values.
1104         self.ifstats:
1105         [
1106         [InterfaceStats(chain 0, port 0), InterfaceStats(chain 0, port 1)],
1107         [InterfaceStats(chain 1, port 0), InterfaceStats(chain 1, port 1)],
1108         ...
1109         ]
1110         """
1111         if diff:
1112             stats = self.gen.get_stats()
1113             for chain_idx, ifs in enumerate(self.ifstats):
1114                 # each ifs has exactly 2 InterfaceStats and 2 Latency instances
1115                 # corresponding to the
1116                 # port 0 and port 1 for the given chain_idx
1117                 # Note that we cannot use self.pps_list[chain_idx].if_stats to pick the
1118                 # interface stats for the pps because it could have been modified to contain
1119                 # additional interface stats
1120                 self.gen.get_stream_stats(stats, ifs, self.pps_list[chain_idx].latencies, chain_idx)
1121             # special handling for vxlan
1122             # in case of vxlan, flow stats are not available so all rx counters will be
1123             # zeros when the total rx port counter is non zero.
1124             # in that case,
1125             for port in range(2):
1126                 total_rx = 0
1127                 for ifs in self.ifstats:
1128                     total_rx += ifs[port].rx
1129                 if total_rx == 0:
1130                     # check if the total port rx from Trex is also zero
1131                     port_rx = stats[port]['rx']['total_pkts']
1132                     if port_rx:
1133                         # the total rx for all chains from port level stats is non zero
1134                         # which means that the per-chain stats are not available
1135                         if len(self.ifstats) == 1:
1136                             # only one chain, simply report the port level rx to the chain rx stats
1137                             self.ifstats[0][port].rx = port_rx
1138                         else:
1139                             for ifs in self.ifstats:
1140                                 # mark this data as unavailable
1141                                 ifs[port].rx = None
1142                             # pitch in the total rx only in the last chain pps
1143                             self.ifstats[-1][port].rx_total = port_rx
1144
1145     @staticmethod
1146     def compare_tx_rates(required, actual):
1147         """Compare the actual TX rate to the required TX rate."""
1148         threshold = 0.9
1149         are_different = False
1150         try:
1151             if float(actual) / required < threshold:
1152                 are_different = True
1153         except ZeroDivisionError:
1154             are_different = True
1155
1156         if are_different:
1157             msg = "WARNING: There is a significant difference between requested TX rate ({r}) " \
1158                   "and actual TX rate ({a}). The traffic generator may not have sufficient CPU " \
1159                   "to achieve the requested TX rate.".format(r=required, a=actual)
1160             LOG.info(msg)
1161             return msg
1162
1163         return None
1164
1165     def get_per_direction_rate(self):
1166         """Get the rate for each direction."""
1167         divisor = 2 if self.run_config['bidirectional'] else 1
1168         if 'rate_percent' in self.current_total_rate:
1169             # don't split rate if it's percentage
1170             divisor = 1
1171
1172         return utils.divide_rate(self.current_total_rate, divisor)
1173
1174     def close(self):
1175         """Close this instance."""
1176         try:
1177             self.gen.stop_traffic()
1178         except Exception:
1179             pass
1180         self.gen.clear_stats()
1181         self.gen.cleanup()