NFVBENCH-172: Add quartiles and 99 percentile latency values
[nfvbench.git] / nfvbench / traffic_client.py
1 # Copyright 2016 Cisco Systems, Inc.  All rights reserved.
2 #
3 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
4 #    not use this file except in compliance with the License. You may obtain
5 #    a copy of the License at
6 #
7 #         http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #    Unless required by applicable law or agreed to in writing, software
10 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 #    License for the specific language governing permissions and limitations
13 #    under the License.
14
15 """Interface to the traffic generator clients including NDR/PDR binary search."""
16 from math import gcd
17 import socket
18 import struct
19 import time
20
21 from attrdict import AttrDict
22 import bitmath
23 from hdrh.histogram import HdrHistogram
24 from netaddr import IPNetwork
25 # pylint: disable=import-error
26 from trex.stl.api import Ether
27 from trex.stl.api import STLError
28 from trex.stl.api import UDP
29 # pylint: disable=wrong-import-order
30 from scapy.contrib.mpls import MPLS  # flake8: noqa
31 # pylint: enable=wrong-import-order
32 # pylint: enable=import-error
33
34 from .log import LOG
35 from .packet_stats import InterfaceStats
36 from .packet_stats import PacketPathStats
37 from .stats_collector import IntervalCollector
38 from .stats_collector import IterationCollector
39 from .traffic_gen import traffic_utils as utils
40 from .utils import cast_integer
41
42 class TrafficClientException(Exception):
43     """Generic traffic client exception."""
44
45 class TrafficRunner(object):
46     """Serialize various steps required to run traffic."""
47
48     def __init__(self, client, duration_sec, interval_sec=0, service_mode=False):
49         """Create a traffic runner."""
50         self.client = client
51         self.start_time = None
52         self.duration_sec = duration_sec
53         self.interval_sec = interval_sec
54         self.service_mode = service_mode
55
56     def run(self):
57         """Clear stats and instruct the traffic generator to start generating traffic."""
58         if self.is_running():
59             return None
60         LOG.info('Running traffic generator')
61         self.client.gen.clear_stats()
62         # Debug use only : new '--service-mode' option available for the NFVBench command line.
63         # A read-only mode TRex console would be able to capture the generated traffic.
64         self.client.gen.set_service_mode(enabled=self.service_mode)
65         LOG.info('Service mode is %sabled', 'en' if self.service_mode else 'dis')
66         self.client.gen.start_traffic()
67         self.start_time = time.time()
68         return self.poll_stats()
69
70     def stop(self):
71         """Stop the current run and instruct the traffic generator to stop traffic."""
72         if self.is_running():
73             self.start_time = None
74             self.client.gen.stop_traffic()
75
76     def is_running(self):
77         """Check if a run is still pending."""
78         return self.start_time is not None
79
80     def time_elapsed(self):
81         """Return time elapsed since start of run."""
82         if self.is_running():
83             return time.time() - self.start_time
84         return self.duration_sec
85
86     def poll_stats(self):
87         """Poll latest stats from the traffic generator at fixed interval - sleeps if necessary.
88
89         return: latest stats or None if traffic is stopped
90         """
91         if not self.is_running():
92             return None
93         if self.client.skip_sleep():
94             self.stop()
95             return self.client.get_stats()
96         time_elapsed = self.time_elapsed()
97         if time_elapsed > self.duration_sec:
98             self.stop()
99             return None
100         time_left = self.duration_sec - time_elapsed
101         if self.interval_sec > 0.0:
102             if time_left <= self.interval_sec:
103                 time.sleep(time_left)
104                 self.stop()
105             else:
106                 time.sleep(self.interval_sec)
107         else:
108             time.sleep(self.duration_sec)
109             self.stop()
110         return self.client.get_stats()
111
112
113 class IpBlock(object):
114     """Manage a block of IP addresses."""
115
116     def __init__(self, base_ip, step_ip, count_ip):
117         """Create an IP block."""
118         self.base_ip_int = Device.ip_to_int(base_ip)
119         self.step = Device.ip_to_int(step_ip)
120         self.max_available = count_ip
121         self.next_free = 0
122
123     def get_ip(self, index=0):
124         """Return the IP address at given index."""
125         if index < 0 or index >= self.max_available:
126             raise IndexError('Index out of bounds: %d (max=%d)' % (index, self.max_available))
127         return Device.int_to_ip(self.base_ip_int + index * self.step)
128
129     def reserve_ip_range(self, count, force_ip_reservation=False):
130         """Reserve a range of count consecutive IP addresses spaced by step.
131         force_ip_reservation parameter allows to continue the calculation of IPs when
132         the 2 sides (ports) have different size and the flow is greater than
133         the size as well.
134         """
135         if self.next_free + count > self.max_available and force_ip_reservation is False:
136             raise IndexError('No more IP addresses next free=%d max_available=%d requested=%d' %
137                              (self.next_free,
138                               self.max_available,
139                               count))
140         if self.next_free + count > self.max_available and force_ip_reservation is True:
141             first_ip = self.get_ip(self.next_free)
142             last_ip = self.get_ip(self.next_free + self.max_available - 1)
143             self.next_free += self.max_available
144         else:
145             first_ip = self.get_ip(self.next_free)
146             last_ip = self.get_ip(self.next_free + count - 1)
147             self.next_free += count
148         return (first_ip, last_ip)
149
150     def reset_reservation(self):
151         """Reset all reservations and restart with a completely unused IP block."""
152         self.next_free = 0
153
154
155 class UdpPorts(object):
156
157     def __init__(self, src_min, src_max, dst_min, dst_max, step):
158
159         self.src_min = src_min
160         self.src_max = src_max
161         self.dst_min = dst_min
162         self.dst_max = dst_max
163         self.step = step
164
165
166 class Device(object):
167     """Represent a port device and all information associated to it.
168
169     In the curent version we only support 2 port devices for the traffic generator
170     identified as port 0 or port 1.
171     """
172
173     def __init__(self, port, generator_config):
174         """Create a new device for a given port."""
175         self.generator_config = generator_config
176         self.chain_count = generator_config.service_chain_count
177         if generator_config.bidirectional:
178             self.flow_count = generator_config.flow_count / 2
179         else:
180             self.flow_count = generator_config.flow_count
181
182         self.port = port
183         self.switch_port = generator_config.interfaces[port].get('switch_port', None)
184         self.vtep_vlan = None
185         self.vtep_src_mac = None
186         self.vxlan = False
187         self.mpls = False
188         self.inner_labels = None
189         self.outer_labels = None
190         self.pci = generator_config.interfaces[port].pci
191         self.mac = None
192         self.dest_macs = None
193         self.vtep_dst_mac = None
194         self.vtep_dst_ip = None
195         if generator_config.vteps is None:
196             self.vtep_src_ip = None
197         else:
198             self.vtep_src_ip = generator_config.vteps[port]
199         self.vnis = None
200         self.vlans = None
201         self.ip_addrs = generator_config.ip_addrs[port]
202         self.ip_src_static = generator_config.ip_src_static
203         self.ip_addrs_step = generator_config.ip_addrs_step
204         if self.ip_addrs_step == 'random':
205             # Set step to 1 to calculate the IP range size (see check_ip_size below)
206             step = '0.0.0.1'
207         else:
208             step = self.ip_addrs_step
209         self.ip_size = self.check_ipsize(IPNetwork(self.ip_addrs).size, Device.ip_to_int(step))
210         self.ip = str(IPNetwork(self.ip_addrs).network)
211         ip_addrs_left = generator_config.ip_addrs[0]
212         ip_addrs_right = generator_config.ip_addrs[1]
213         self.ip_addrs_size = {
214             'left': self.check_ipsize(IPNetwork(ip_addrs_left).size, Device.ip_to_int(step)),
215             'right': self.check_ipsize(IPNetwork(ip_addrs_right).size, Device.ip_to_int(step))}
216         udp_src_port = generator_config.gen_config.udp_src_port
217         if udp_src_port is None:
218             udp_src_port = 53
219         udp_dst_port = generator_config.gen_config.udp_dst_port
220         if udp_dst_port is None:
221             udp_dst_port = 53
222         src_max, src_min = self.define_udp_range(udp_src_port, 'udp_src_port')
223         dst_max, dst_min = self.define_udp_range(udp_dst_port, 'udp_dst_port')
224         udp_src_range = int(src_max) - int(src_min) + 1
225         udp_dst_range = int(dst_max) - int(dst_min) + 1
226         lcm_port = self.lcm(udp_src_range, udp_dst_range)
227         if self.ip_src_static is True:
228             lcm_ip = self.lcm(1, min(self.ip_addrs_size['left'], self.ip_addrs_size['right']))
229         else:
230             lcm_ip = self.lcm(self.ip_addrs_size['left'], self.ip_addrs_size['right'])
231         flow_max = self.lcm(lcm_port, lcm_ip)
232         if self.flow_count > flow_max:
233             raise TrafficClientException('Trying to set unachievable traffic (%d > %d)' %
234                                          (self.flow_count, flow_max))
235
236         # manage udp range regarding flow count value
237         # UDP dst range is greater than FC => range will be limited to min + FC
238         if self.flow_count <= udp_dst_range:
239             dst_max = int(dst_min) + self.flow_count - 1
240         # UDP src range is greater than FC => range will be limited to min + FC
241         if self.flow_count <= udp_src_range:
242             src_max = int(src_min) + self.flow_count - 1
243         # Define IP block limit regarding flow count
244         if self.flow_count <= self.ip_size:
245             self.ip_block = IpBlock(self.ip, step, self.flow_count)
246         else:
247             self.ip_block = IpBlock(self.ip, step, self.ip_size)
248
249         self.udp_ports = UdpPorts(src_min, src_max, dst_min, dst_max,
250                                   generator_config.gen_config.udp_port_step)
251         self.gw_ip_block = IpBlock(generator_config.gateway_ips[port],
252                                    generator_config.gateway_ip_addrs_step,
253                                    self.chain_count)
254         self.tg_gateway_ip_addrs = generator_config.tg_gateway_ip_addrs[port]
255         self.tg_gw_ip_block = IpBlock(self.tg_gateway_ip_addrs,
256                                       generator_config.tg_gateway_ip_addrs_step,
257                                       self.chain_count)
258
259     @staticmethod
260     def define_udp_range(udp_port, property_name):
261         if isinstance(udp_port, int):
262             min = udp_port
263             max = min
264         elif isinstance(udp_port, tuple):
265             min = udp_port[0]
266             max = udp_port[1]
267         else:
268             raise TrafficClientException('Invalid %s property value (53 or [\'53\',\'1024\'])'
269                                          % property_name)
270         return max, min
271
272     @staticmethod
273     def lcm(a, b):
274         """Calculate the maximum possible value for both IP and ports,
275         eventually for maximum possible flux."""
276         if a != 0 and b != 0:
277             lcm_value = a * b // gcd(a, b)
278             return lcm_value
279         raise TypeError(" IP size or port range can't be zero !")
280
281     @staticmethod
282     def check_ipsize(ip_size, step):
283         """Check and set the available IPs, considering the step."""
284         try:
285             if ip_size % step == 0:
286                 value = int(ip_size / step)
287             else:
288                 value = int((ip_size / step)) + 1
289             return value
290         except ZeroDivisionError:
291             raise ZeroDivisionError("step can't be zero !")
292
293     def set_mac(self, mac):
294         """Set the local MAC for this port device."""
295         if mac is None:
296             raise TrafficClientException('Trying to set traffic generator MAC address as None')
297         self.mac = mac
298
299     def get_peer_device(self):
300         """Get the peer device (device 0 -> device 1, or device 1 -> device 0)."""
301         return self.generator_config.devices[1 - self.port]
302
303     def set_vtep_dst_mac(self, dest_macs):
304         """Set the list of dest MACs indexed by the chain id.
305
306         This is only called in 2 cases:
307         - VM macs discovered using openstack API
308         - dest MACs provisioned in config file
309         """
310         self.vtep_dst_mac = list(map(str, dest_macs))
311
312     def set_dest_macs(self, dest_macs):
313         """Set the list of dest MACs indexed by the chain id.
314
315         This is only called in 2 cases:
316         - VM macs discovered using openstack API
317         - dest MACs provisioned in config file
318         """
319         self.dest_macs = list(map(str, dest_macs))
320
321     def get_dest_macs(self):
322         """Get the list of dest macs for this device.
323
324         If set_dest_macs was never called, assumes l2-loopback and return
325         a list of peer mac (as many as chains but normally only 1 chain)
326         """
327         if self.dest_macs:
328             return self.dest_macs
329         # assume this is l2-loopback
330         return [self.get_peer_device().mac] * self.chain_count
331
332     def set_vlans(self, vlans):
333         """Set the list of vlans to use indexed by the chain id."""
334         self.vlans = vlans
335         LOG.info("Port %d: VLANs %s", self.port, self.vlans)
336
337     def set_vtep_vlan(self, vlan):
338         """Set the vtep vlan to use indexed by specific port."""
339         self.vtep_vlan = vlan
340         self.vxlan = True
341         self.vlan_tagging = None
342         LOG.info("Port %d: VTEP VLANs %s", self.port, self.vtep_vlan)
343
344     def set_vxlan_endpoints(self, src_ip, dst_ip):
345         self.vtep_dst_ip = dst_ip
346         self.vtep_src_ip = src_ip
347         LOG.info("Port %d: src_vtep %s, dst_vtep %s", self.port,
348                  self.vtep_src_ip, self.vtep_dst_ip)
349
350     def set_mpls_peers(self, src_ip, dst_ip):
351         self.mpls = True
352         self.vtep_dst_ip = dst_ip
353         self.vtep_src_ip = src_ip
354         LOG.info("Port %d: src_mpls_vtep %s, mpls_peer_ip %s", self.port,
355                  self.vtep_src_ip, self.vtep_dst_ip)
356
357     def set_vxlans(self, vnis):
358         self.vnis = vnis
359         LOG.info("Port %d: VNIs %s", self.port, self.vnis)
360
361     def set_mpls_inner_labels(self, labels):
362         self.inner_labels = labels
363         LOG.info("Port %d: MPLS Inner Labels %s", self.port, self.inner_labels)
364
365     def set_mpls_outer_labels(self, labels):
366         self.outer_labels = labels
367         LOG.info("Port %d: MPLS Outer Labels %s", self.port, self.outer_labels)
368
369     def set_gw_ip(self, gateway_ip):
370         self.gw_ip_block = IpBlock(gateway_ip,
371                                    self.generator_config.gateway_ip_addrs_step,
372                                    self.chain_count)
373
374     def get_gw_ip(self, chain_index):
375         """Retrieve the IP address assigned for the gateway of a given chain."""
376         return self.gw_ip_block.get_ip(chain_index)
377
378     def get_stream_configs(self):
379         """Get the stream config for a given chain on this device.
380
381         Called by the traffic generator driver to program the traffic generator properly
382         before generating traffic
383         """
384         configs = []
385         # exact flow count for each chain is calculated as follows:
386         # - all chains except the first will have the same flow count
387         #   calculated as (total_flows + chain_count - 1) / chain_count
388         # - the first chain will have the remainder
389         # example 11 flows and 3 chains => 3, 4, 4
390         flows_per_chain = int((self.flow_count + self.chain_count - 1) / self.chain_count)
391         cur_chain_flow_count = int(self.flow_count - flows_per_chain * (self.chain_count - 1))
392         force_ip_reservation = False
393         # use case example of this parameter:
394         # - static IP addresses (source & destination), netmask = /30
395         # - 4 varying UDP source ports | 1 UDP destination port
396         # - Flow count = 8
397         # --> parameter 'reserve_ip_range' should have flag 'force_ip_reservation'
398         # in order to assign the maximum available IP on each iteration
399         if self.ip_size < cur_chain_flow_count \
400                 or self.ip_addrs_size['left'] != self.ip_addrs_size['right']:
401             force_ip_reservation = True
402
403         peer = self.get_peer_device()
404         self.ip_block.reset_reservation()
405         peer.ip_block.reset_reservation()
406         dest_macs = self.get_dest_macs()
407
408         for chain_idx in range(self.chain_count):
409             src_ip_first, src_ip_last = self.ip_block.reserve_ip_range\
410             (cur_chain_flow_count, force_ip_reservation)
411             dst_ip_first, dst_ip_last = peer.ip_block.reserve_ip_range\
412             (cur_chain_flow_count, force_ip_reservation)
413
414             configs.append({
415                 'count': cur_chain_flow_count,
416                 'mac_src': self.mac,
417                 'mac_dst': dest_macs[chain_idx],
418                 'ip_src_addr': src_ip_first,
419                 'ip_src_addr_max': src_ip_last,
420                 'ip_src_count': cur_chain_flow_count,
421                 'ip_dst_addr': dst_ip_first,
422                 'ip_dst_addr_max': dst_ip_last,
423                 'ip_dst_count': cur_chain_flow_count,
424                 'ip_addrs_step': self.ip_addrs_step,
425                 'ip_src_static': self.ip_src_static,
426                 'udp_src_port': self.udp_ports.src_min,
427                 'udp_src_port_max': self.udp_ports.src_max,
428                 'udp_src_count': int(self.udp_ports.src_max) - int(self.udp_ports.src_min) + 1,
429                 'udp_dst_port': self.udp_ports.dst_min,
430                 'udp_dst_port_max': self.udp_ports.dst_max,
431                 'udp_dst_count': int(self.udp_ports.dst_max) - int(self.udp_ports.dst_min) + 1,
432                 'udp_port_step': self.udp_ports.step,
433                 'mac_discovery_gw': self.get_gw_ip(chain_idx),
434                 'ip_src_tg_gw': self.tg_gw_ip_block.get_ip(chain_idx),
435                 'ip_dst_tg_gw': peer.tg_gw_ip_block.get_ip(chain_idx),
436                 'vlan_tag': self.vlans[chain_idx] if self.vlans else None,
437                 'vxlan': self.vxlan,
438                 'vtep_vlan': self.vtep_vlan if self.vtep_vlan else None,
439                 'vtep_src_mac': self.mac if (self.vxlan or self.mpls) else None,
440                 'vtep_dst_mac': self.vtep_dst_mac if (self.vxlan or self.mpls) else None,
441                 'vtep_dst_ip': self.vtep_dst_ip if self.vxlan is True else None,
442                 'vtep_src_ip': self.vtep_src_ip if self.vxlan is True else None,
443                 'net_vni': self.vnis[chain_idx] if self.vxlan is True else None,
444                 'mpls': self.mpls,
445                 'mpls_outer_label': self.outer_labels[chain_idx] if self.mpls is True else None,
446                 'mpls_inner_label': self.inner_labels[chain_idx] if self.mpls is True else None
447
448             })
449             # after first chain, fall back to the flow count for all other chains
450             cur_chain_flow_count = flows_per_chain
451         return configs
452
453     @staticmethod
454     def ip_to_int(addr):
455         """Convert an IP address from string to numeric."""
456         return struct.unpack("!I", socket.inet_aton(addr))[0]
457
458     @staticmethod
459     def int_to_ip(nvalue):
460         """Convert an IP address from numeric to string."""
461         return socket.inet_ntoa(struct.pack("!I", int(nvalue)))
462
463
464 class GeneratorConfig(object):
465     """Represents traffic configuration for currently running traffic profile."""
466
467     DEFAULT_IP_STEP = '0.0.0.1'
468     DEFAULT_SRC_DST_IP_STEP = '0.0.0.1'
469
470     def __init__(self, config):
471         """Create a generator config."""
472         self.config = config
473         # name of the generator profile (normally trex or dummy)
474         # pick the default one if not specified explicitly from cli options
475         if not config.generator_profile:
476             config.generator_profile = config.traffic_generator.default_profile
477         # pick up the profile dict based on the name
478         gen_config = self.__match_generator_profile(config.traffic_generator,
479                                                     config.generator_profile)
480         self.gen_config = gen_config
481         # copy over fields from the dict
482         self.tool = gen_config.tool
483         self.ip = gen_config.ip
484         # overrides on config.cores and config.mbuf_factor
485         if config.cores:
486             self.cores = config.cores
487         else:
488             self.cores = gen_config.get('cores', 1)
489         self.mbuf_factor = config.mbuf_factor
490         self.mbuf_64 = config.mbuf_64
491         self.hdrh = not config.disable_hdrh
492         if gen_config.intf_speed:
493             # interface speed is overriden from config
494             self.intf_speed = bitmath.parse_string(gen_config.intf_speed.replace('ps', '')).bits
495         else:
496             # interface speed is discovered/provided by the traffic generator
497             self.intf_speed = 0
498         self.name = gen_config.name
499         self.zmq_pub_port = gen_config.get('zmq_pub_port', 4500)
500         self.zmq_rpc_port = gen_config.get('zmq_rpc_port', 4501)
501         self.limit_memory = gen_config.get('limit_memory', 1024)
502         self.software_mode = gen_config.get('software_mode', False)
503         self.interfaces = gen_config.interfaces
504         if self.interfaces[0].port != 0 or self.interfaces[1].port != 1:
505             raise TrafficClientException('Invalid port order/id in generator_profile.interfaces')
506         self.service_chain = config.service_chain
507         self.service_chain_count = config.service_chain_count
508         self.flow_count = config.flow_count
509         self.host_name = gen_config.host_name
510         self.bidirectional = config.traffic.bidirectional
511         self.tg_gateway_ip_addrs = gen_config.tg_gateway_ip_addrs
512         self.ip_addrs = gen_config.ip_addrs
513         self.ip_addrs_step = gen_config.ip_addrs_step or self.DEFAULT_SRC_DST_IP_STEP
514         self.tg_gateway_ip_addrs_step = \
515             gen_config.tg_gateway_ip_addrs_step or self.DEFAULT_IP_STEP
516         self.gateway_ip_addrs_step = gen_config.gateway_ip_addrs_step or self.DEFAULT_IP_STEP
517         self.gateway_ips = gen_config.gateway_ip_addrs
518         self.ip_src_static = gen_config.ip_src_static
519         self.vteps = gen_config.get('vteps')
520         self.devices = [Device(port, self) for port in [0, 1]]
521         # This should normally always be [0, 1]
522         self.ports = [device.port for device in self.devices]
523
524         # check that pci is not empty
525         if not gen_config.interfaces[0].get('pci', None) or \
526            not gen_config.interfaces[1].get('pci', None):
527             raise TrafficClientException("configuration interfaces pci fields cannot be empty")
528
529         self.pcis = [tgif['pci'] for tgif in gen_config.interfaces]
530         self.vlan_tagging = config.vlan_tagging
531
532         # needed for result/summarizer
533         config['tg-name'] = gen_config.name
534         config['tg-tool'] = self.tool
535
536     def to_json(self):
537         """Get json form to display the content into the overall result dict."""
538         return dict(self.gen_config)
539
540     def set_dest_macs(self, port_index, dest_macs):
541         """Set the list of dest MACs indexed by the chain id on given port.
542
543         port_index: the port for which dest macs must be set
544         dest_macs: a list of dest MACs indexed by chain id
545         """
546         if len(dest_macs) < self.config.service_chain_count:
547             raise TrafficClientException('Dest MAC list %s must have %d entries' %
548                                          (dest_macs, self.config.service_chain_count))
549         # only pass the first scc dest MACs
550         self.devices[port_index].set_dest_macs(dest_macs[:self.config.service_chain_count])
551         LOG.info('Port %d: dst MAC %s', port_index, [str(mac) for mac in dest_macs])
552
553     def set_vtep_dest_macs(self, port_index, dest_macs):
554         """Set the list of dest MACs indexed by the chain id on given port.
555
556         port_index: the port for which dest macs must be set
557         dest_macs: a list of dest MACs indexed by chain id
558         """
559         if len(dest_macs) != self.config.service_chain_count:
560             raise TrafficClientException('Dest MAC list %s must have %d entries' %
561                                          (dest_macs, self.config.service_chain_count))
562         self.devices[port_index].set_vtep_dst_mac(dest_macs)
563         LOG.info('Port %d: vtep dst MAC %s', port_index, {str(mac) for mac in dest_macs})
564
565     def get_dest_macs(self):
566         """Return the list of dest macs indexed by port."""
567         return [dev.get_dest_macs() for dev in self.devices]
568
569     def set_vlans(self, port_index, vlans):
570         """Set the list of vlans to use indexed by the chain id on given port.
571
572         port_index: the port for which VLANs must be set
573         vlans: a  list of vlan lists indexed by chain id
574         """
575         if len(vlans) != self.config.service_chain_count:
576             raise TrafficClientException('VLAN list %s must have %d entries' %
577                                          (vlans, self.config.service_chain_count))
578         self.devices[port_index].set_vlans(vlans)
579
580     def set_vxlans(self, port_index, vxlans):
581         """Set the list of vxlans (VNIs) to use indexed by the chain id on given port.
582
583         port_index: the port for which VXLANs must be set
584         VXLANs: a  list of VNIs lists indexed by chain id
585         """
586         if len(vxlans) != self.config.service_chain_count:
587             raise TrafficClientException('VXLAN list %s must have %d entries' %
588                                          (vxlans, self.config.service_chain_count))
589         self.devices[port_index].set_vxlans(vxlans)
590
591     def set_mpls_inner_labels(self, port_index, labels):
592         """Set the list of MPLS Labels to use indexed by the chain id on given port.
593
594         port_index: the port for which Labels must be set
595         Labels: a list of Labels lists indexed by chain id
596         """
597         if len(labels) != self.config.service_chain_count:
598             raise TrafficClientException('Inner MPLS list %s must have %d entries' %
599                                          (labels, self.config.service_chain_count))
600         self.devices[port_index].set_mpls_inner_labels(labels)
601
602     def set_mpls_outer_labels(self, port_index, labels):
603         """Set the list of MPLS Labels to use indexed by the chain id on given port.
604
605         port_index: the port for which Labels must be set
606         Labels: a list of Labels lists indexed by chain id
607         """
608         if len(labels) != self.config.service_chain_count:
609             raise TrafficClientException('Outer MPLS list %s must have %d entries' %
610                                          (labels, self.config.service_chain_count))
611         self.devices[port_index].set_mpls_outer_labels(labels)
612
613     def set_vtep_vlan(self, port_index, vlan):
614         """Set the vtep vlan to use indexed by the chain id on given port.
615         port_index: the port for which VLAN must be set
616         """
617         self.devices[port_index].set_vtep_vlan(vlan)
618
619     def set_vxlan_endpoints(self, port_index, src_ip, dst_ip):
620         self.devices[port_index].set_vxlan_endpoints(src_ip, dst_ip)
621
622     def set_mpls_peers(self, port_index, src_ip, dst_ip):
623         self.devices[port_index].set_mpls_peers(src_ip, dst_ip)
624
625     @staticmethod
626     def __match_generator_profile(traffic_generator, generator_profile):
627         gen_config = AttrDict(traffic_generator)
628         gen_config.pop('default_profile')
629         gen_config.pop('generator_profile')
630         matching_profile = [profile for profile in traffic_generator.generator_profile if
631                             profile.name == generator_profile]
632         if len(matching_profile) != 1:
633             raise Exception('Traffic generator profile not found: ' + generator_profile)
634
635         gen_config.update(matching_profile[0])
636         return gen_config
637
638
639 class TrafficClient(object):
640     """Traffic generator client with NDR/PDR binary seearch."""
641
642     PORTS = [0, 1]
643
644     def __init__(self, config, notifier=None):
645         """Create a new TrafficClient instance.
646
647         config: nfvbench config
648         notifier: notifier (optional)
649
650         A new instance is created everytime the nfvbench config may have changed.
651         """
652         self.config = config
653         self.generator_config = GeneratorConfig(config)
654         self.tool = self.generator_config.tool
655         self.gen = self._get_generator()
656         self.notifier = notifier
657         self.interval_collector = None
658         self.iteration_collector = None
659         self.runner = TrafficRunner(self, self.config.duration_sec, self.config.interval_sec,
660                                     self.config.service_mode)
661         self.config.frame_sizes = self._get_frame_sizes()
662         self.run_config = {
663             'l2frame_size': None,
664             'duration_sec': self.config.duration_sec,
665             'bidirectional': True,
666             'rates': []  # to avoid unsbuscriptable-obj warning
667         }
668         self.current_total_rate = {'rate_percent': '10'}
669         if self.config.single_run:
670             self.current_total_rate = utils.parse_rate_str(self.config.rate)
671         self.ifstats = None
672         # Speed is either discovered when connecting to TG or set from config
673         # This variable is 0 if not yet discovered from TG or must be the speed of
674         # each interface in bits per second
675         self.intf_speed = self.generator_config.intf_speed
676
677     def _get_generator(self):
678         tool = self.tool.lower()
679         if tool == 'trex':
680             from .traffic_gen import trex_gen
681             return trex_gen.TRex(self)
682         if tool == 'dummy':
683             from .traffic_gen import dummy
684             return dummy.DummyTG(self)
685         raise TrafficClientException('Unsupported generator tool name:' + self.tool)
686
687     def skip_sleep(self):
688         """Skip all sleeps when doing unit testing with dummy TG.
689
690         Must be overriden using mock.patch
691         """
692         return False
693
694     def _get_frame_sizes(self):
695         traffic_profile_name = self.config.traffic.profile
696         matching_profiles = [profile for profile in self.config.traffic_profile if
697                              profile.name == traffic_profile_name]
698         if len(matching_profiles) > 1:
699             raise TrafficClientException('Multiple traffic profiles with name: ' +
700                                          traffic_profile_name)
701         if not matching_profiles:
702             raise TrafficClientException('Cannot find traffic profile: ' + traffic_profile_name)
703         return matching_profiles[0].l2frame_size
704
705     def start_traffic_generator(self):
706         """Start the traffic generator process (traffic not started yet)."""
707         self.gen.connect()
708         # pick up the interface speed if it is not set from config
709         intf_speeds = self.gen.get_port_speed_gbps()
710         # convert Gbps unit into bps
711         tg_if_speed = bitmath.parse_string(str(intf_speeds[0]) + 'Gb').bits
712         if self.intf_speed:
713             # interface speed is overriden from config
714             if self.intf_speed != tg_if_speed:
715                 # Warn the user if the speed in the config is different
716                 LOG.warning('Interface speed provided is different from actual speed (%d Gbps)',
717                             intf_speeds[0])
718         else:
719             # interface speed not provisioned by config
720             self.intf_speed = tg_if_speed
721             # also update the speed in the tg config
722             self.generator_config.intf_speed = tg_if_speed
723
724         # Save the traffic generator local MAC
725         for mac, device in zip(self.gen.get_macs(), self.generator_config.devices):
726             device.set_mac(mac)
727
728     def setup(self):
729         """Set up the traffic client."""
730         self.gen.clear_stats()
731
732     def get_version(self):
733         """Get the traffic generator version."""
734         return self.gen.get_version()
735
736     def ensure_end_to_end(self):
737         """Ensure traffic generator receives packets it has transmitted.
738
739         This ensures end to end connectivity and also waits until VMs are ready to forward packets.
740
741         VMs that are started and in active state may not pass traffic yet. It is imperative to make
742         sure that all VMs are passing traffic in both directions before starting any benchmarking.
743         To verify this, we need to send at a low frequency bi-directional packets and make sure
744         that we receive all packets back from all VMs. The number of flows is equal to 2 times
745         the number of chains (1 per direction) and we need to make sure we receive packets coming
746         from exactly 2 x chain count different source MAC addresses.
747
748         Example:
749             PVP chain (1 VM per chain)
750             N = 10 (number of chains)
751             Flow count = 20 (number of flows)
752             If the number of unique source MAC addresses from received packets is 20 then
753             all 10 VMs 10 VMs are in operational state.
754         """
755         LOG.info('Starting traffic generator to ensure end-to-end connectivity')
756         # send 2pps on each chain and each direction
757         rate_pps = {'rate_pps': str(self.config.service_chain_count * 2)}
758         self.gen.create_traffic('64', [rate_pps, rate_pps], bidirectional=True, latency=False,
759                                 e2e=True)
760         # ensures enough traffic is coming back
761         retry_count = int((self.config.check_traffic_time_sec +
762                            self.config.generic_poll_sec - 1) / self.config.generic_poll_sec)
763
764         # we expect to see packets coming from 2 unique MAC per chain
765         # because there can be flooding in the case of shared net
766         # we must verify that packets from the right VMs are received
767         # and not just count unique src MAC
768         # create a dict of (port, chain) tuples indexed by dest mac
769         mac_map = {}
770         for port, dest_macs in enumerate(self.generator_config.get_dest_macs()):
771             for chain, mac in enumerate(dest_macs):
772                 mac_map[mac] = (port, chain)
773         unique_src_mac_count = len(mac_map)
774         if self.config.vxlan and self.config.traffic_generator.vtep_vlan:
775             get_mac_id = lambda packet: packet['binary'][60:66]
776         elif self.config.vxlan:
777             get_mac_id = lambda packet: packet['binary'][56:62]
778         elif self.config.mpls:
779             get_mac_id = lambda packet: packet['binary'][24:30]
780             # mpls_transport_label = lambda packet: packet['binary'][14:18]
781         else:
782             get_mac_id = lambda packet: packet['binary'][6:12]
783         for it in range(retry_count):
784             self.gen.clear_stats()
785             self.gen.start_traffic()
786             self.gen.start_capture()
787             LOG.info('Captured unique src mac %d/%d, capturing return packets (retry %d/%d)...',
788                      unique_src_mac_count - len(mac_map), unique_src_mac_count,
789                      it + 1, retry_count)
790             if not self.skip_sleep():
791                 time.sleep(self.config.generic_poll_sec)
792             self.gen.stop_traffic()
793             self.gen.fetch_capture_packets()
794             self.gen.stop_capture()
795             for packet in self.gen.packet_list:
796                 mac_id = get_mac_id(packet).decode('latin-1')
797                 src_mac = ':'.join(["%02x" % ord(x) for x in mac_id])
798                 if self.config.mpls:
799                     if src_mac in mac_map and self.is_mpls(packet):
800                         port, chain = mac_map[src_mac]
801                         LOG.info('Received mpls packet from mac: %s (chain=%d, port=%d)',
802                                  src_mac, chain, port)
803                         mac_map.pop(src_mac, None)
804                 else:
805                     if src_mac in mac_map and self.is_udp(packet):
806                         port, chain = mac_map[src_mac]
807                         LOG.info('Received udp packet from mac: %s (chain=%d, port=%d)',
808                                  src_mac, chain, port)
809                         mac_map.pop(src_mac, None)
810
811                 if not mac_map:
812                     LOG.info('End-to-end connectivity established')
813                     return
814             if self.config.l3_router and not self.config.no_arp:
815                 # In case of L3 traffic mode, routers are not able to route traffic
816                 # until VM interfaces are up and ARP requests are done
817                 LOG.info('Waiting for loopback service completely started...')
818                 LOG.info('Sending ARP request to assure end-to-end connectivity established')
819                 self.ensure_arp_successful()
820         raise TrafficClientException('End-to-end connectivity cannot be ensured')
821
822     def is_udp(self, packet):
823         pkt = Ether(packet['binary'])
824         return UDP in pkt
825
826     def is_mpls(self, packet):
827         pkt = Ether(packet['binary'])
828         return MPLS in pkt
829
830     def ensure_arp_successful(self):
831         """Resolve all IP using ARP and throw an exception in case of failure."""
832         dest_macs = self.gen.resolve_arp()
833         if dest_macs:
834             # all dest macs are discovered, saved them into the generator config
835             if self.config.vxlan or self.config.mpls:
836                 self.generator_config.set_vtep_dest_macs(0, dest_macs[0])
837                 self.generator_config.set_vtep_dest_macs(1, dest_macs[1])
838             else:
839                 self.generator_config.set_dest_macs(0, dest_macs[0])
840                 self.generator_config.set_dest_macs(1, dest_macs[1])
841         else:
842             raise TrafficClientException('ARP cannot be resolved')
843
844     def set_traffic(self, frame_size, bidirectional):
845         """Reconfigure the traffic generator for a new frame size."""
846         self.run_config['bidirectional'] = bidirectional
847         self.run_config['l2frame_size'] = frame_size
848         self.run_config['rates'] = [self.get_per_direction_rate()]
849         if bidirectional:
850             self.run_config['rates'].append(self.get_per_direction_rate())
851         else:
852             unidir_reverse_pps = int(self.config.unidir_reverse_traffic_pps)
853             if unidir_reverse_pps > 0:
854                 self.run_config['rates'].append({'rate_pps': str(unidir_reverse_pps)})
855         # Fix for [NFVBENCH-67], convert the rate string to PPS
856         for idx, rate in enumerate(self.run_config['rates']):
857             if 'rate_pps' not in rate:
858                 self.run_config['rates'][idx] = {'rate_pps': self.__convert_rates(rate)['rate_pps']}
859
860         self.gen.clear_streamblock()
861
862         if self.config.no_latency_streams:
863             LOG.info("Latency streams are disabled")
864         self.gen.create_traffic(frame_size, self.run_config['rates'], bidirectional,
865                                 latency=not self.config.no_latency_streams)
866
867     def _modify_load(self, load):
868         self.current_total_rate = {'rate_percent': str(load)}
869         rate_per_direction = self.get_per_direction_rate()
870
871         self.gen.modify_rate(rate_per_direction, False)
872         self.run_config['rates'][0] = rate_per_direction
873         if self.run_config['bidirectional']:
874             self.gen.modify_rate(rate_per_direction, True)
875             self.run_config['rates'][1] = rate_per_direction
876
877     def get_ndr_and_pdr(self):
878         """Start the NDR/PDR iteration and return the results."""
879         dst = 'Bidirectional' if self.run_config['bidirectional'] else 'Unidirectional'
880         targets = {}
881         if self.config.ndr_run:
882             LOG.info('*** Searching NDR for %s (%s)...', self.run_config['l2frame_size'], dst)
883             targets['ndr'] = self.config.measurement.NDR
884         if self.config.pdr_run:
885             LOG.info('*** Searching PDR for %s (%s)...', self.run_config['l2frame_size'], dst)
886             targets['pdr'] = self.config.measurement.PDR
887
888         self.run_config['start_time'] = time.time()
889         self.interval_collector = IntervalCollector(self.run_config['start_time'])
890         self.interval_collector.attach_notifier(self.notifier)
891         self.iteration_collector = IterationCollector(self.run_config['start_time'])
892         results = {}
893         self.__range_search(0.0, 200.0, targets, results)
894
895         results['iteration_stats'] = {
896             'ndr_pdr': self.iteration_collector.get()
897         }
898
899         if self.config.ndr_run:
900             LOG.info('NDR load: %s', results['ndr']['rate_percent'])
901             results['ndr']['time_taken_sec'] = \
902                 results['ndr']['timestamp_sec'] - self.run_config['start_time']
903             if self.config.pdr_run:
904                 LOG.info('PDR load: %s', results['pdr']['rate_percent'])
905                 results['pdr']['time_taken_sec'] = \
906                     results['pdr']['timestamp_sec'] - results['ndr']['timestamp_sec']
907         else:
908             LOG.info('PDR load: %s', results['pdr']['rate_percent'])
909             results['pdr']['time_taken_sec'] = \
910                 results['pdr']['timestamp_sec'] - self.run_config['start_time']
911         return results
912
913     def __get_dropped_rate(self, result):
914         dropped_pkts = result['rx']['dropped_pkts']
915         total_pkts = result['tx']['total_pkts']
916         if not total_pkts:
917             return float('inf')
918         return float(dropped_pkts) / total_pkts * 100
919
920     def get_stats(self):
921         """Collect final stats for previous run."""
922         stats = self.gen.get_stats(self.ifstats)
923         retDict = {'total_tx_rate': stats['total_tx_rate'],
924                    'offered_tx_rate_bps': stats['offered_tx_rate_bps']}
925
926         tx_keys = ['total_pkts', 'total_pkt_bytes', 'pkt_rate', 'pkt_bit_rate']
927         rx_keys = tx_keys + ['dropped_pkts']
928
929         for port in self.PORTS:
930             port_stats = {'tx': {}, 'rx': {}}
931             for key in tx_keys:
932                 port_stats['tx'][key] = int(stats[port]['tx'][key])
933             for key in rx_keys:
934                 try:
935                     port_stats['rx'][key] = int(stats[port]['rx'][key])
936                 except ValueError:
937                     port_stats['rx'][key] = 0
938             port_stats['rx']['avg_delay_usec'] = cast_integer(
939                 stats[port]['rx']['avg_delay_usec'])
940             port_stats['rx']['min_delay_usec'] = cast_integer(
941                 stats[port]['rx']['min_delay_usec'])
942             port_stats['rx']['max_delay_usec'] = cast_integer(
943                 stats[port]['rx']['max_delay_usec'])
944             port_stats['drop_rate_percent'] = self.__get_dropped_rate(port_stats)
945             retDict[str(port)] = port_stats
946
947         ports = sorted(list(retDict.keys()), key=str)
948         if self.run_config['bidirectional']:
949             retDict['overall'] = {'tx': {}, 'rx': {}}
950             for key in tx_keys:
951                 retDict['overall']['tx'][key] = \
952                     retDict[ports[0]]['tx'][key] + retDict[ports[1]]['tx'][key]
953             for key in rx_keys:
954                 retDict['overall']['rx'][key] = \
955                     retDict[ports[0]]['rx'][key] + retDict[ports[1]]['rx'][key]
956             total_pkts = [retDict[ports[0]]['rx']['total_pkts'],
957                           retDict[ports[1]]['rx']['total_pkts']]
958             avg_delays = [retDict[ports[0]]['rx']['avg_delay_usec'],
959                           retDict[ports[1]]['rx']['avg_delay_usec']]
960             max_delays = [retDict[ports[0]]['rx']['max_delay_usec'],
961                           retDict[ports[1]]['rx']['max_delay_usec']]
962             min_delays = [retDict[ports[0]]['rx']['min_delay_usec'],
963                           retDict[ports[1]]['rx']['min_delay_usec']]
964             retDict['overall']['rx']['avg_delay_usec'] = utils.weighted_avg(total_pkts, avg_delays)
965             retDict['overall']['rx']['min_delay_usec'] = min(min_delays)
966             retDict['overall']['rx']['max_delay_usec'] = max(max_delays)
967             for key in ['pkt_bit_rate', 'pkt_rate']:
968                 for dirc in ['tx', 'rx']:
969                     retDict['overall'][dirc][key] /= 2.0
970                 retDict['overall']['hdrh'] = stats.get('hdrh', None)
971                 if retDict['overall']['hdrh']:
972                     decoded_histogram = HdrHistogram.decode(retDict['overall']['hdrh'])
973                     # override min max and avg from hdrh
974                     retDict['overall']['rx']['min_delay_usec'] = decoded_histogram.get_min_value()
975                     retDict['overall']['rx']['max_delay_usec'] = decoded_histogram.get_max_value()
976                     retDict['overall']['rx']['avg_delay_usec'] = decoded_histogram.get_mean_value()
977                     retDict['overall']['rx']['lat_percentile'] = {}
978                     for percentile in self.config.lat_percentiles:
979                         retDict['overall']['rx']['lat_percentile'][percentile] = \
980                             decoded_histogram.get_value_at_percentile(percentile)
981
982         else:
983             retDict['overall'] = retDict[ports[0]]
984         retDict['overall']['drop_rate_percent'] = self.__get_dropped_rate(retDict['overall'])
985         return retDict
986
987     def __convert_rates(self, rate):
988         return utils.convert_rates(self.run_config['l2frame_size'],
989                                    rate,
990                                    self.intf_speed)
991
992     def __ndr_pdr_found(self, tag, load):
993         rates = self.__convert_rates({'rate_percent': load})
994         self.iteration_collector.add_ndr_pdr(tag, rates['rate_pps'])
995         last_stats = self.iteration_collector.peek()
996         self.interval_collector.add_ndr_pdr(tag, last_stats)
997
998     def __format_output_stats(self, stats):
999         for key in self.PORTS + ['overall']:
1000             key = str(key)
1001             interface = stats[key]
1002             stats[key] = {
1003                 'tx_pkts': interface['tx']['total_pkts'],
1004                 'rx_pkts': interface['rx']['total_pkts'],
1005                 'drop_percentage': interface['drop_rate_percent'],
1006                 'drop_pct': interface['rx']['dropped_pkts'],
1007                 'avg_delay_usec': interface['rx']['avg_delay_usec'],
1008                 'max_delay_usec': interface['rx']['max_delay_usec'],
1009                 'min_delay_usec': interface['rx']['min_delay_usec'],
1010             }
1011
1012             if key == 'overall':
1013                 stats[key]['hdrh'] = interface.get('hdrh', None)
1014                 if stats[key]['hdrh']:
1015                     decoded_histogram = HdrHistogram.decode(stats[key]['hdrh'])
1016                     # override min max and avg from hdrh
1017                     stats[key]['min_delay_usec'] = decoded_histogram.get_min_value()
1018                     stats[key]['max_delay_usec'] = decoded_histogram.get_max_value()
1019                     stats[key]['avg_delay_usec'] = decoded_histogram.get_mean_value()
1020                     stats[key]['lat_percentile'] = {}
1021                     for percentile in self.config.lat_percentiles:
1022                         stats[key]['lat_percentile'][percentile] = decoded_histogram.\
1023                             get_value_at_percentile(percentile)
1024
1025
1026         return stats
1027
1028     def __targets_found(self, rate, targets, results):
1029         for tag, target in list(targets.items()):
1030             LOG.info('Found %s (%s) load: %s', tag, target, rate)
1031             self.__ndr_pdr_found(tag, rate)
1032             results[tag]['timestamp_sec'] = time.time()
1033
1034     def __range_search(self, left, right, targets, results):
1035         """Perform a binary search for a list of targets inside a [left..right] range or rate.
1036
1037         left    the left side of the range to search as a % the line rate (100 = 100% line rate)
1038                 indicating the rate to send on each interface
1039         right   the right side of the range to search as a % of line rate
1040                 indicating the rate to send on each interface
1041         targets a dict of drop rates to search (0.1 = 0.1%), indexed by the DR name or "tag"
1042                 ('ndr', 'pdr')
1043         results a dict to store results
1044         """
1045         if not targets:
1046             return
1047         LOG.info('Range search [%s .. %s] targets: %s', left, right, targets)
1048
1049         # Terminate search when gap is less than load epsilon
1050         if right - left < self.config.measurement.load_epsilon:
1051             self.__targets_found(left, targets, results)
1052             return
1053
1054         # Obtain the average drop rate in for middle load
1055         middle = (left + right) / 2.0
1056         try:
1057             stats, rates = self.__run_search_iteration(middle)
1058         except STLError:
1059             LOG.exception("Got exception from traffic generator during binary search")
1060             self.__targets_found(left, targets, results)
1061             return
1062         # Split target dicts based on the avg drop rate
1063         left_targets = {}
1064         right_targets = {}
1065         for tag, target in list(targets.items()):
1066             if stats['overall']['drop_rate_percent'] <= target:
1067                 # record the best possible rate found for this target
1068                 results[tag] = rates
1069                 results[tag].update({
1070                     'load_percent_per_direction': middle,
1071                     'stats': self.__format_output_stats(dict(stats)),
1072                     'timestamp_sec': None
1073                 })
1074                 right_targets[tag] = target
1075             else:
1076                 # initialize to 0 all fields of result for
1077                 # the worst case scenario of the binary search (if ndr/pdr is not found)
1078                 if tag not in results:
1079                     results[tag] = dict.fromkeys(rates, 0)
1080                     empty_stats = self.__format_output_stats(dict(stats))
1081                     for key in empty_stats:
1082                         if isinstance(empty_stats[key], dict):
1083                             empty_stats[key] = dict.fromkeys(empty_stats[key], 0)
1084                         else:
1085                             empty_stats[key] = 0
1086                     results[tag].update({
1087                         'load_percent_per_direction': 0,
1088                         'stats': empty_stats,
1089                         'timestamp_sec': None
1090                     })
1091                 left_targets[tag] = target
1092
1093         # search lower half
1094         self.__range_search(left, middle, left_targets, results)
1095
1096         # search upper half only if the upper rate does not exceed
1097         # 100%, this only happens when the first search at 100%
1098         # yields a DR that is < target DR
1099         if middle >= 100:
1100             self.__targets_found(100, right_targets, results)
1101         else:
1102             self.__range_search(middle, right, right_targets, results)
1103
1104     def __run_search_iteration(self, rate):
1105         """Run one iteration at the given rate level.
1106
1107         rate: the rate to send on each port in percent (0 to 100)
1108         """
1109         self._modify_load(rate)
1110
1111         # poll interval stats and collect them
1112         for stats in self.run_traffic():
1113             self.interval_collector.add(stats)
1114             time_elapsed_ratio = self.runner.time_elapsed() / self.run_config['duration_sec']
1115             if time_elapsed_ratio >= 1:
1116                 self.cancel_traffic()
1117                 if not self.skip_sleep():
1118                     time.sleep(self.config.pause_sec)
1119         self.interval_collector.reset()
1120
1121         # get stats from the run
1122         stats = self.runner.client.get_stats()
1123         current_traffic_config = self._get_traffic_config()
1124         warning = self.compare_tx_rates(current_traffic_config['direction-total']['rate_pps'],
1125                                         stats['total_tx_rate'])
1126         if warning is not None:
1127             stats['warning'] = warning
1128
1129         # save reliable stats from whole iteration
1130         self.iteration_collector.add(stats, current_traffic_config['direction-total']['rate_pps'])
1131         LOG.info('Average drop rate: %f', stats['overall']['drop_rate_percent'])
1132         return stats, current_traffic_config['direction-total']
1133
1134     def log_stats(self, stats):
1135         """Log estimated stats during run."""
1136         # Calculate a rolling drop rate based on differential to
1137         # the previous reading
1138         cur_tx = stats['overall']['tx']['total_pkts']
1139         cur_rx = stats['overall']['rx']['total_pkts']
1140         delta_tx = cur_tx - self.prev_tx
1141         delta_rx = cur_rx - self.prev_rx
1142         drops = delta_tx - delta_rx
1143         drop_rate_pct = 100 * (delta_tx - delta_rx)/delta_tx
1144         self.prev_tx = cur_tx
1145         self.prev_rx = cur_rx
1146         LOG.info('TX: %15s; RX: %15s; (Est.) Dropped: %12s; Drop rate: %8.4f%%',
1147                  format(cur_tx, ',d'),
1148                  format(cur_rx, ',d'),
1149                  format(drops, ',d'),
1150                  drop_rate_pct)
1151
1152     def run_traffic(self):
1153         """Start traffic and return intermediate stats for each interval."""
1154         stats = self.runner.run()
1155         self.prev_tx = 0
1156         self.prev_rx = 0
1157         while self.runner.is_running:
1158             self.log_stats(stats)
1159             yield stats
1160             stats = self.runner.poll_stats()
1161             if stats is None:
1162                 return
1163         self.log_stats(stats)
1164         LOG.info('Drop rate: %f', stats['overall']['drop_rate_percent'])
1165         yield stats
1166
1167     def cancel_traffic(self):
1168         """Stop traffic."""
1169         self.runner.stop()
1170
1171     def _get_traffic_config(self):
1172         config = {}
1173         load_total = 0.0
1174         bps_total = 0.0
1175         pps_total = 0.0
1176         for idx, rate in enumerate(self.run_config['rates']):
1177             key = 'direction-forward' if idx == 0 else 'direction-reverse'
1178             config[key] = {
1179                 'l2frame_size': self.run_config['l2frame_size'],
1180                 'duration_sec': self.run_config['duration_sec']
1181             }
1182             config[key].update(rate)
1183             config[key].update(self.__convert_rates(rate))
1184             load_total += float(config[key]['rate_percent'])
1185             bps_total += float(config[key]['rate_bps'])
1186             pps_total += float(config[key]['rate_pps'])
1187         config['direction-total'] = dict(config['direction-forward'])
1188         config['direction-total'].update({
1189             'rate_percent': load_total,
1190             'rate_pps': cast_integer(pps_total),
1191             'rate_bps': bps_total
1192         })
1193
1194         return config
1195
1196     def get_run_config(self, results):
1197         """Return configuration which was used for the last run."""
1198         r = {}
1199         # because we want each direction to have the far end RX rates,
1200         # use the far end index (1-idx) to retrieve the RX rates
1201         for idx, key in enumerate(["direction-forward", "direction-reverse"]):
1202             tx_rate = results["stats"][str(idx)]["tx"]["total_pkts"] / self.config.duration_sec
1203             rx_rate = results["stats"][str(1 - idx)]["rx"]["total_pkts"] / self.config.duration_sec
1204             r[key] = {
1205                 "orig": self.__convert_rates(self.run_config['rates'][idx]),
1206                 "tx": self.__convert_rates({'rate_pps': tx_rate}),
1207                 "rx": self.__convert_rates({'rate_pps': rx_rate})
1208             }
1209
1210         total = {}
1211         for direction in ['orig', 'tx', 'rx']:
1212             total[direction] = {}
1213             for unit in ['rate_percent', 'rate_bps', 'rate_pps']:
1214                 total[direction][unit] = sum([float(x[direction][unit]) for x in list(r.values())])
1215
1216         r['direction-total'] = total
1217         return r
1218
1219     def insert_interface_stats(self, pps_list):
1220         """Insert interface stats to a list of packet path stats.
1221
1222         pps_list: a list of packet path stats instances indexed by chain index
1223
1224         This function will insert the packet path stats for the traffic gen ports 0 and 1
1225         with itemized per chain tx/rx counters.
1226         There will be as many packet path stats as chains.
1227         Each packet path stats will have exactly 2 InterfaceStats for port 0 and port 1
1228         self.pps_list:
1229         [
1230         PacketPathStats(InterfaceStats(chain 0, port 0), InterfaceStats(chain 0, port 1)),
1231         PacketPathStats(InterfaceStats(chain 1, port 0), InterfaceStats(chain 1, port 1)),
1232         ...
1233         ]
1234         """
1235         def get_if_stats(chain_idx):
1236             return [InterfaceStats('p' + str(port), self.tool)
1237                     for port in range(2)]
1238         # keep the list of list of interface stats indexed by the chain id
1239         self.ifstats = [get_if_stats(chain_idx)
1240                         for chain_idx in range(self.config.service_chain_count)]
1241         # note that we need to make a copy of the ifs list so that any modification in the
1242         # list from pps will not change the list saved in self.ifstats
1243         self.pps_list = [PacketPathStats(self.config, list(ifs)) for ifs in self.ifstats]
1244         # insert the corresponding pps in the passed list
1245         pps_list.extend(self.pps_list)
1246
1247     def update_interface_stats(self, diff=False):
1248         """Update all interface stats.
1249
1250         diff: if False, simply refresh the interface stats values with latest values
1251               if True, diff the interface stats with the latest values
1252         Make sure that the interface stats inserted in insert_interface_stats() are updated
1253         with proper values.
1254         self.ifstats:
1255         [
1256         [InterfaceStats(chain 0, port 0), InterfaceStats(chain 0, port 1)],
1257         [InterfaceStats(chain 1, port 0), InterfaceStats(chain 1, port 1)],
1258         ...
1259         ]
1260         """
1261         if diff:
1262             stats = self.gen.get_stats(self.ifstats)
1263             for chain_idx, ifs in enumerate(self.ifstats):
1264                 # each ifs has exactly 2 InterfaceStats and 2 Latency instances
1265                 # corresponding to the
1266                 # port 0 and port 1 for the given chain_idx
1267                 # Note that we cannot use self.pps_list[chain_idx].if_stats to pick the
1268                 # interface stats for the pps because it could have been modified to contain
1269                 # additional interface stats
1270                 self.gen.get_stream_stats(stats, ifs, self.pps_list[chain_idx].latencies, chain_idx)
1271             # special handling for vxlan
1272             # in case of vxlan, flow stats are not available so all rx counters will be
1273             # zeros when the total rx port counter is non zero.
1274             # in that case,
1275             for port in range(2):
1276                 total_rx = 0
1277                 for ifs in self.ifstats:
1278                     total_rx += ifs[port].rx
1279                 if total_rx == 0:
1280                     # check if the total port rx from Trex is also zero
1281                     port_rx = stats[port]['rx']['total_pkts']
1282                     if port_rx:
1283                         # the total rx for all chains from port level stats is non zero
1284                         # which means that the per-chain stats are not available
1285                         if len(self.ifstats) == 1:
1286                             # only one chain, simply report the port level rx to the chain rx stats
1287                             self.ifstats[0][port].rx = port_rx
1288                         else:
1289                             for ifs in self.ifstats:
1290                                 # mark this data as unavailable
1291                                 ifs[port].rx = None
1292                             # pitch in the total rx only in the last chain pps
1293                             self.ifstats[-1][port].rx_total = port_rx
1294
1295     @staticmethod
1296     def compare_tx_rates(required, actual):
1297         """Compare the actual TX rate to the required TX rate."""
1298         threshold = 0.9
1299         are_different = False
1300         try:
1301             if float(actual) / required < threshold:
1302                 are_different = True
1303         except ZeroDivisionError:
1304             are_different = True
1305
1306         if are_different:
1307             msg = "WARNING: There is a significant difference between requested TX rate ({r}) " \
1308                   "and actual TX rate ({a}). The traffic generator may not have sufficient CPU " \
1309                   "to achieve the requested TX rate.".format(r=required, a=actual)
1310             LOG.info(msg)
1311             return msg
1312
1313         return None
1314
1315     def get_per_direction_rate(self):
1316         """Get the rate for each direction."""
1317         divisor = 2 if self.run_config['bidirectional'] else 1
1318         if 'rate_percent' in self.current_total_rate:
1319             # don't split rate if it's percentage
1320             divisor = 1
1321
1322         return utils.divide_rate(self.current_total_rate, divisor)
1323
1324     def close(self):
1325         """Close this instance."""
1326         try:
1327             self.gen.stop_traffic()
1328         except Exception:
1329             pass
1330         self.gen.clear_stats()
1331         self.gen.cleanup()