NFVBENCH-177: Add a config item 'user_info' and theoretical max rate value
[nfvbench.git] / nfvbench / traffic_client.py
1 # Copyright 2016 Cisco Systems, Inc.  All rights reserved.
2 #
3 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
4 #    not use this file except in compliance with the License. You may obtain
5 #    a copy of the License at
6 #
7 #         http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #    Unless required by applicable law or agreed to in writing, software
10 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 #    License for the specific language governing permissions and limitations
13 #    under the License.
14
15 """Interface to the traffic generator clients including NDR/PDR binary search."""
16 from math import gcd
17 import socket
18 import struct
19 import time
20
21 from attrdict import AttrDict
22 import bitmath
23 from hdrh.histogram import HdrHistogram
24 from netaddr import IPNetwork
25 # pylint: disable=import-error
26 from trex.stl.api import Ether
27 from trex.stl.api import STLError
28 from trex.stl.api import UDP
29 # pylint: disable=wrong-import-order
30 from scapy.contrib.mpls import MPLS  # flake8: noqa
31 # pylint: enable=wrong-import-order
32 # pylint: enable=import-error
33
34 from .log import LOG
35 from .packet_stats import InterfaceStats
36 from .packet_stats import PacketPathStats
37 from .stats_collector import IntervalCollector
38 from .stats_collector import IterationCollector
39 from .traffic_gen import traffic_utils as utils
40 from .utils import cast_integer
41
42 class TrafficClientException(Exception):
43     """Generic traffic client exception."""
44
45 class TrafficRunner(object):
46     """Serialize various steps required to run traffic."""
47
48     def __init__(self, client, duration_sec, interval_sec=0, service_mode=False):
49         """Create a traffic runner."""
50         self.client = client
51         self.start_time = None
52         self.duration_sec = duration_sec
53         self.interval_sec = interval_sec
54         self.service_mode = service_mode
55
56     def run(self):
57         """Clear stats and instruct the traffic generator to start generating traffic."""
58         if self.is_running():
59             return None
60         LOG.info('Running traffic generator')
61         self.client.gen.clear_stats()
62         # Debug use only : new '--service-mode' option available for the NFVBench command line.
63         # A read-only mode TRex console would be able to capture the generated traffic.
64         self.client.gen.set_service_mode(enabled=self.service_mode)
65         LOG.info('Service mode is %sabled', 'en' if self.service_mode else 'dis')
66         self.client.gen.start_traffic()
67         self.start_time = time.time()
68         return self.poll_stats()
69
70     def stop(self):
71         """Stop the current run and instruct the traffic generator to stop traffic."""
72         if self.is_running():
73             self.start_time = None
74             self.client.gen.stop_traffic()
75
76     def is_running(self):
77         """Check if a run is still pending."""
78         return self.start_time is not None
79
80     def time_elapsed(self):
81         """Return time elapsed since start of run."""
82         if self.is_running():
83             return time.time() - self.start_time
84         return self.duration_sec
85
86     def poll_stats(self):
87         """Poll latest stats from the traffic generator at fixed interval - sleeps if necessary.
88
89         return: latest stats or None if traffic is stopped
90         """
91         if not self.is_running():
92             return None
93         if self.client.skip_sleep():
94             self.stop()
95             return self.client.get_stats()
96         time_elapsed = self.time_elapsed()
97         if time_elapsed > self.duration_sec:
98             self.stop()
99             return None
100         time_left = self.duration_sec - time_elapsed
101         if self.interval_sec > 0.0:
102             if time_left <= self.interval_sec:
103                 time.sleep(time_left)
104                 self.stop()
105             else:
106                 time.sleep(self.interval_sec)
107         else:
108             time.sleep(self.duration_sec)
109             self.stop()
110         return self.client.get_stats()
111
112
113 class IpBlock(object):
114     """Manage a block of IP addresses."""
115
116     def __init__(self, base_ip, step_ip, count_ip):
117         """Create an IP block."""
118         self.base_ip_int = Device.ip_to_int(base_ip)
119         self.step = Device.ip_to_int(step_ip)
120         self.max_available = count_ip
121         self.next_free = 0
122
123     def get_ip(self, index=0):
124         """Return the IP address at given index."""
125         if index < 0 or index >= self.max_available:
126             raise IndexError('Index out of bounds: %d (max=%d)' % (index, self.max_available))
127         return Device.int_to_ip(self.base_ip_int + index * self.step)
128
129     def reserve_ip_range(self, count, force_ip_reservation=False):
130         """Reserve a range of count consecutive IP addresses spaced by step.
131         force_ip_reservation parameter allows to continue the calculation of IPs when
132         the 2 sides (ports) have different size and the flow is greater than
133         the size as well.
134         """
135         if self.next_free + count > self.max_available and force_ip_reservation is False:
136             raise IndexError('No more IP addresses next free=%d max_available=%d requested=%d' %
137                              (self.next_free,
138                               self.max_available,
139                               count))
140         if self.next_free + count > self.max_available and force_ip_reservation is True:
141             first_ip = self.get_ip(self.next_free)
142             last_ip = self.get_ip(self.next_free + self.max_available - 1)
143             self.next_free += self.max_available
144         else:
145             first_ip = self.get_ip(self.next_free)
146             last_ip = self.get_ip(self.next_free + count - 1)
147             self.next_free += count
148         return (first_ip, last_ip)
149
150     def reset_reservation(self):
151         """Reset all reservations and restart with a completely unused IP block."""
152         self.next_free = 0
153
154
155 class UdpPorts(object):
156
157     def __init__(self, src_min, src_max, dst_min, dst_max, step):
158
159         self.src_min = src_min
160         self.src_max = src_max
161         self.dst_min = dst_min
162         self.dst_max = dst_max
163         self.step = step
164
165
166 class Device(object):
167     """Represent a port device and all information associated to it.
168
169     In the curent version we only support 2 port devices for the traffic generator
170     identified as port 0 or port 1.
171     """
172
173     def __init__(self, port, generator_config):
174         """Create a new device for a given port."""
175         self.generator_config = generator_config
176         self.chain_count = generator_config.service_chain_count
177         if generator_config.bidirectional:
178             self.flow_count = generator_config.flow_count / 2
179         else:
180             self.flow_count = generator_config.flow_count
181
182         self.port = port
183         self.switch_port = generator_config.interfaces[port].get('switch_port', None)
184         self.vtep_vlan = None
185         self.vtep_src_mac = None
186         self.vxlan = False
187         self.mpls = False
188         self.inner_labels = None
189         self.outer_labels = None
190         self.pci = generator_config.interfaces[port].pci
191         self.mac = None
192         self.dest_macs = None
193         self.vtep_dst_mac = None
194         self.vtep_dst_ip = None
195         if generator_config.vteps is None:
196             self.vtep_src_ip = None
197         else:
198             self.vtep_src_ip = generator_config.vteps[port]
199         self.vnis = None
200         self.vlans = None
201         self.ip_addrs = generator_config.ip_addrs[port]
202         self.ip_src_static = generator_config.ip_src_static
203         self.ip_addrs_step = generator_config.ip_addrs_step
204         if self.ip_addrs_step == 'random':
205             # Set step to 1 to calculate the IP range size (see check_ip_size below)
206             step = '0.0.0.1'
207         else:
208             step = self.ip_addrs_step
209         self.ip_size = self.check_ipsize(IPNetwork(self.ip_addrs).size, Device.ip_to_int(step))
210         self.ip = str(IPNetwork(self.ip_addrs).network)
211         ip_addrs_left = generator_config.ip_addrs[0]
212         ip_addrs_right = generator_config.ip_addrs[1]
213         self.ip_addrs_size = {
214             'left': self.check_ipsize(IPNetwork(ip_addrs_left).size, Device.ip_to_int(step)),
215             'right': self.check_ipsize(IPNetwork(ip_addrs_right).size, Device.ip_to_int(step))}
216         udp_src_port = generator_config.gen_config.udp_src_port
217         if udp_src_port is None:
218             udp_src_port = 53
219         udp_dst_port = generator_config.gen_config.udp_dst_port
220         if udp_dst_port is None:
221             udp_dst_port = 53
222         src_max, src_min = self.define_udp_range(udp_src_port, 'udp_src_port')
223         dst_max, dst_min = self.define_udp_range(udp_dst_port, 'udp_dst_port')
224         udp_src_range = int(src_max) - int(src_min) + 1
225         udp_dst_range = int(dst_max) - int(dst_min) + 1
226         lcm_port = self.lcm(udp_src_range, udp_dst_range)
227         if self.ip_src_static is True:
228             lcm_ip = self.lcm(1, min(self.ip_addrs_size['left'], self.ip_addrs_size['right']))
229         else:
230             lcm_ip = self.lcm(self.ip_addrs_size['left'], self.ip_addrs_size['right'])
231         flow_max = self.lcm(lcm_port, lcm_ip)
232         if self.flow_count > flow_max:
233             raise TrafficClientException('Trying to set unachievable traffic (%d > %d)' %
234                                          (self.flow_count, flow_max))
235
236         # manage udp range regarding flow count value
237         # UDP dst range is greater than FC => range will be limited to min + FC
238         if self.flow_count <= udp_dst_range:
239             dst_max = int(dst_min) + self.flow_count - 1
240         # UDP src range is greater than FC => range will be limited to min + FC
241         if self.flow_count <= udp_src_range:
242             src_max = int(src_min) + self.flow_count - 1
243         # Define IP block limit regarding flow count
244         if self.flow_count <= self.ip_size:
245             self.ip_block = IpBlock(self.ip, step, self.flow_count)
246         else:
247             self.ip_block = IpBlock(self.ip, step, self.ip_size)
248
249         self.udp_ports = UdpPorts(src_min, src_max, dst_min, dst_max,
250                                   generator_config.gen_config.udp_port_step)
251         self.gw_ip_block = IpBlock(generator_config.gateway_ips[port],
252                                    generator_config.gateway_ip_addrs_step,
253                                    self.chain_count)
254         self.tg_gateway_ip_addrs = generator_config.tg_gateway_ip_addrs[port]
255         self.tg_gw_ip_block = IpBlock(self.tg_gateway_ip_addrs,
256                                       generator_config.tg_gateway_ip_addrs_step,
257                                       self.chain_count)
258
259     @staticmethod
260     def define_udp_range(udp_port, property_name):
261         if isinstance(udp_port, int):
262             min = udp_port
263             max = min
264         elif isinstance(udp_port, tuple):
265             min = udp_port[0]
266             max = udp_port[1]
267         else:
268             raise TrafficClientException('Invalid %s property value (53 or [\'53\',\'1024\'])'
269                                          % property_name)
270         return max, min
271
272     @staticmethod
273     def lcm(a, b):
274         """Calculate the maximum possible value for both IP and ports,
275         eventually for maximum possible flux."""
276         if a != 0 and b != 0:
277             lcm_value = a * b // gcd(a, b)
278             return lcm_value
279         raise TypeError(" IP size or port range can't be zero !")
280
281     @staticmethod
282     def check_ipsize(ip_size, step):
283         """Check and set the available IPs, considering the step."""
284         try:
285             if ip_size % step == 0:
286                 value = int(ip_size / step)
287             else:
288                 value = int((ip_size / step)) + 1
289             return value
290         except ZeroDivisionError:
291             raise ZeroDivisionError("step can't be zero !") from ZeroDivisionError
292
293     def set_mac(self, mac):
294         """Set the local MAC for this port device."""
295         if mac is None:
296             raise TrafficClientException('Trying to set traffic generator MAC address as None')
297         self.mac = mac
298
299     def get_peer_device(self):
300         """Get the peer device (device 0 -> device 1, or device 1 -> device 0)."""
301         return self.generator_config.devices[1 - self.port]
302
303     def set_vtep_dst_mac(self, dest_macs):
304         """Set the list of dest MACs indexed by the chain id.
305
306         This is only called in 2 cases:
307         - VM macs discovered using openstack API
308         - dest MACs provisioned in config file
309         """
310         self.vtep_dst_mac = list(map(str, dest_macs))
311
312     def set_dest_macs(self, dest_macs):
313         """Set the list of dest MACs indexed by the chain id.
314
315         This is only called in 2 cases:
316         - VM macs discovered using openstack API
317         - dest MACs provisioned in config file
318         """
319         self.dest_macs = list(map(str, dest_macs))
320
321     def get_dest_macs(self):
322         """Get the list of dest macs for this device.
323
324         If set_dest_macs was never called, assumes l2-loopback and return
325         a list of peer mac (as many as chains but normally only 1 chain)
326         """
327         if self.dest_macs:
328             return self.dest_macs
329         # assume this is l2-loopback
330         return [self.get_peer_device().mac] * self.chain_count
331
332     def set_vlans(self, vlans):
333         """Set the list of vlans to use indexed by the chain id."""
334         self.vlans = vlans
335         LOG.info("Port %d: VLANs %s", self.port, self.vlans)
336
337     def set_vtep_vlan(self, vlan):
338         """Set the vtep vlan to use indexed by specific port."""
339         self.vtep_vlan = vlan
340         self.vxlan = True
341         self.vlan_tagging = None
342         LOG.info("Port %d: VTEP VLANs %s", self.port, self.vtep_vlan)
343
344     def set_vxlan_endpoints(self, src_ip, dst_ip):
345         self.vtep_dst_ip = dst_ip
346         self.vtep_src_ip = src_ip
347         LOG.info("Port %d: src_vtep %s, dst_vtep %s", self.port,
348                  self.vtep_src_ip, self.vtep_dst_ip)
349
350     def set_mpls_peers(self, src_ip, dst_ip):
351         self.mpls = True
352         self.vtep_dst_ip = dst_ip
353         self.vtep_src_ip = src_ip
354         LOG.info("Port %d: src_mpls_vtep %s, mpls_peer_ip %s", self.port,
355                  self.vtep_src_ip, self.vtep_dst_ip)
356
357     def set_vxlans(self, vnis):
358         self.vnis = vnis
359         LOG.info("Port %d: VNIs %s", self.port, self.vnis)
360
361     def set_mpls_inner_labels(self, labels):
362         self.inner_labels = labels
363         LOG.info("Port %d: MPLS Inner Labels %s", self.port, self.inner_labels)
364
365     def set_mpls_outer_labels(self, labels):
366         self.outer_labels = labels
367         LOG.info("Port %d: MPLS Outer Labels %s", self.port, self.outer_labels)
368
369     def set_gw_ip(self, gateway_ip):
370         self.gw_ip_block = IpBlock(gateway_ip,
371                                    self.generator_config.gateway_ip_addrs_step,
372                                    self.chain_count)
373
374     def get_gw_ip(self, chain_index):
375         """Retrieve the IP address assigned for the gateway of a given chain."""
376         return self.gw_ip_block.get_ip(chain_index)
377
378     def get_stream_configs(self):
379         """Get the stream config for a given chain on this device.
380
381         Called by the traffic generator driver to program the traffic generator properly
382         before generating traffic
383         """
384         configs = []
385         # exact flow count for each chain is calculated as follows:
386         # - all chains except the first will have the same flow count
387         #   calculated as (total_flows + chain_count - 1) / chain_count
388         # - the first chain will have the remainder
389         # example 11 flows and 3 chains => 3, 4, 4
390         flows_per_chain = int((self.flow_count + self.chain_count - 1) / self.chain_count)
391         cur_chain_flow_count = int(self.flow_count - flows_per_chain * (self.chain_count - 1))
392         force_ip_reservation = False
393         # use case example of this parameter:
394         # - static IP addresses (source & destination), netmask = /30
395         # - 4 varying UDP source ports | 1 UDP destination port
396         # - Flow count = 8
397         # --> parameter 'reserve_ip_range' should have flag 'force_ip_reservation'
398         # in order to assign the maximum available IP on each iteration
399         if self.ip_size < cur_chain_flow_count \
400                 or self.ip_addrs_size['left'] != self.ip_addrs_size['right']:
401             force_ip_reservation = True
402
403         peer = self.get_peer_device()
404         self.ip_block.reset_reservation()
405         peer.ip_block.reset_reservation()
406         dest_macs = self.get_dest_macs()
407
408         for chain_idx in range(self.chain_count):
409             src_ip_first, src_ip_last = self.ip_block.reserve_ip_range\
410             (cur_chain_flow_count, force_ip_reservation)
411             dst_ip_first, dst_ip_last = peer.ip_block.reserve_ip_range\
412             (cur_chain_flow_count, force_ip_reservation)
413
414             configs.append({
415                 'count': cur_chain_flow_count,
416                 'mac_src': self.mac,
417                 'mac_dst': dest_macs[chain_idx],
418                 'ip_src_addr': src_ip_first,
419                 'ip_src_addr_max': src_ip_last,
420                 'ip_src_count': cur_chain_flow_count,
421                 'ip_dst_addr': dst_ip_first,
422                 'ip_dst_addr_max': dst_ip_last,
423                 'ip_dst_count': cur_chain_flow_count,
424                 'ip_addrs_step': self.ip_addrs_step,
425                 'ip_src_static': self.ip_src_static,
426                 'udp_src_port': self.udp_ports.src_min,
427                 'udp_src_port_max': self.udp_ports.src_max,
428                 'udp_src_count': int(self.udp_ports.src_max) - int(self.udp_ports.src_min) + 1,
429                 'udp_dst_port': self.udp_ports.dst_min,
430                 'udp_dst_port_max': self.udp_ports.dst_max,
431                 'udp_dst_count': int(self.udp_ports.dst_max) - int(self.udp_ports.dst_min) + 1,
432                 'udp_port_step': self.udp_ports.step,
433                 'mac_discovery_gw': self.get_gw_ip(chain_idx),
434                 'ip_src_tg_gw': self.tg_gw_ip_block.get_ip(chain_idx),
435                 'ip_dst_tg_gw': peer.tg_gw_ip_block.get_ip(chain_idx),
436                 'vlan_tag': self.vlans[chain_idx] if self.vlans else None,
437                 'vxlan': self.vxlan,
438                 'vtep_vlan': self.vtep_vlan if self.vtep_vlan else None,
439                 'vtep_src_mac': self.mac if (self.vxlan or self.mpls) else None,
440                 'vtep_dst_mac': self.vtep_dst_mac if (self.vxlan or self.mpls) else None,
441                 'vtep_dst_ip': self.vtep_dst_ip if self.vxlan is True else None,
442                 'vtep_src_ip': self.vtep_src_ip if self.vxlan is True else None,
443                 'net_vni': self.vnis[chain_idx] if self.vxlan is True else None,
444                 'mpls': self.mpls,
445                 'mpls_outer_label': self.outer_labels[chain_idx] if self.mpls is True else None,
446                 'mpls_inner_label': self.inner_labels[chain_idx] if self.mpls is True else None
447
448             })
449             # after first chain, fall back to the flow count for all other chains
450             cur_chain_flow_count = flows_per_chain
451         return configs
452
453     @staticmethod
454     def ip_to_int(addr):
455         """Convert an IP address from string to numeric."""
456         return struct.unpack("!I", socket.inet_aton(addr))[0]
457
458     @staticmethod
459     def int_to_ip(nvalue):
460         """Convert an IP address from numeric to string."""
461         return socket.inet_ntoa(struct.pack("!I", int(nvalue)))
462
463
464 class GeneratorConfig(object):
465     """Represents traffic configuration for currently running traffic profile."""
466
467     DEFAULT_IP_STEP = '0.0.0.1'
468     DEFAULT_SRC_DST_IP_STEP = '0.0.0.1'
469
470     def __init__(self, config):
471         """Create a generator config."""
472         self.config = config
473         # name of the generator profile (normally trex or dummy)
474         # pick the default one if not specified explicitly from cli options
475         if not config.generator_profile:
476             config.generator_profile = config.traffic_generator.default_profile
477         # pick up the profile dict based on the name
478         gen_config = self.__match_generator_profile(config.traffic_generator,
479                                                     config.generator_profile)
480         self.gen_config = gen_config
481         # copy over fields from the dict
482         self.tool = gen_config.tool
483         self.ip = gen_config.ip
484         # overrides on config.cores and config.mbuf_factor
485         if config.cores:
486             self.cores = config.cores
487         else:
488             self.cores = gen_config.get('cores', 1)
489         # let's report the value actually used in the end
490         config.cores_used = self.cores
491         self.mbuf_factor = config.mbuf_factor
492         self.mbuf_64 = config.mbuf_64
493         self.hdrh = not config.disable_hdrh
494         if config.intf_speed:
495             # interface speed is overriden from the command line
496             self.intf_speed = config.intf_speed
497         elif gen_config.intf_speed:
498             # interface speed is overriden from the generator config
499             self.intf_speed = gen_config.intf_speed
500         else:
501             self.intf_speed = "auto"
502         if self.intf_speed == "auto" or self.intf_speed == "0":
503             # interface speed is discovered/provided by the traffic generator
504             self.intf_speed = 0
505         else:
506             self.intf_speed = bitmath.parse_string(self.intf_speed.replace('ps', '')).bits
507         self.name = gen_config.name
508         self.zmq_pub_port = gen_config.get('zmq_pub_port', 4500)
509         self.zmq_rpc_port = gen_config.get('zmq_rpc_port', 4501)
510         self.limit_memory = gen_config.get('limit_memory', 1024)
511         self.software_mode = gen_config.get('software_mode', False)
512         self.interfaces = gen_config.interfaces
513         if self.interfaces[0].port != 0 or self.interfaces[1].port != 1:
514             raise TrafficClientException('Invalid port order/id in generator_profile.interfaces')
515         self.service_chain = config.service_chain
516         self.service_chain_count = config.service_chain_count
517         self.flow_count = config.flow_count
518         self.host_name = gen_config.host_name
519         self.bidirectional = config.traffic.bidirectional
520         self.tg_gateway_ip_addrs = gen_config.tg_gateway_ip_addrs
521         self.ip_addrs = gen_config.ip_addrs
522         self.ip_addrs_step = gen_config.ip_addrs_step or self.DEFAULT_SRC_DST_IP_STEP
523         self.tg_gateway_ip_addrs_step = \
524             gen_config.tg_gateway_ip_addrs_step or self.DEFAULT_IP_STEP
525         self.gateway_ip_addrs_step = gen_config.gateway_ip_addrs_step or self.DEFAULT_IP_STEP
526         self.gateway_ips = gen_config.gateway_ip_addrs
527         self.ip_src_static = gen_config.ip_src_static
528         self.vteps = gen_config.get('vteps')
529         self.devices = [Device(port, self) for port in [0, 1]]
530         # This should normally always be [0, 1]
531         self.ports = [device.port for device in self.devices]
532
533         # check that pci is not empty
534         if not gen_config.interfaces[0].get('pci', None) or \
535            not gen_config.interfaces[1].get('pci', None):
536             raise TrafficClientException("configuration interfaces pci fields cannot be empty")
537
538         self.pcis = [tgif['pci'] for tgif in gen_config.interfaces]
539         self.vlan_tagging = config.vlan_tagging
540
541         # needed for result/summarizer
542         config['tg-name'] = gen_config.name
543         config['tg-tool'] = self.tool
544
545     def to_json(self):
546         """Get json form to display the content into the overall result dict."""
547         return dict(self.gen_config)
548
549     def set_dest_macs(self, port_index, dest_macs):
550         """Set the list of dest MACs indexed by the chain id on given port.
551
552         port_index: the port for which dest macs must be set
553         dest_macs: a list of dest MACs indexed by chain id
554         """
555         if len(dest_macs) < self.config.service_chain_count:
556             raise TrafficClientException('Dest MAC list %s must have %d entries' %
557                                          (dest_macs, self.config.service_chain_count))
558         # only pass the first scc dest MACs
559         self.devices[port_index].set_dest_macs(dest_macs[:self.config.service_chain_count])
560         LOG.info('Port %d: dst MAC %s', port_index, [str(mac) for mac in dest_macs])
561
562     def set_vtep_dest_macs(self, port_index, dest_macs):
563         """Set the list of dest MACs indexed by the chain id on given port.
564
565         port_index: the port for which dest macs must be set
566         dest_macs: a list of dest MACs indexed by chain id
567         """
568         if len(dest_macs) != self.config.service_chain_count:
569             raise TrafficClientException('Dest MAC list %s must have %d entries' %
570                                          (dest_macs, self.config.service_chain_count))
571         self.devices[port_index].set_vtep_dst_mac(dest_macs)
572         LOG.info('Port %d: vtep dst MAC %s', port_index, {str(mac) for mac in dest_macs})
573
574     def get_dest_macs(self):
575         """Return the list of dest macs indexed by port."""
576         return [dev.get_dest_macs() for dev in self.devices]
577
578     def set_vlans(self, port_index, vlans):
579         """Set the list of vlans to use indexed by the chain id on given port.
580
581         port_index: the port for which VLANs must be set
582         vlans: a  list of vlan lists indexed by chain id
583         """
584         if len(vlans) != self.config.service_chain_count:
585             raise TrafficClientException('VLAN list %s must have %d entries' %
586                                          (vlans, self.config.service_chain_count))
587         self.devices[port_index].set_vlans(vlans)
588
589     def set_vxlans(self, port_index, vxlans):
590         """Set the list of vxlans (VNIs) to use indexed by the chain id on given port.
591
592         port_index: the port for which VXLANs must be set
593         VXLANs: a  list of VNIs lists indexed by chain id
594         """
595         if len(vxlans) != self.config.service_chain_count:
596             raise TrafficClientException('VXLAN list %s must have %d entries' %
597                                          (vxlans, self.config.service_chain_count))
598         self.devices[port_index].set_vxlans(vxlans)
599
600     def set_mpls_inner_labels(self, port_index, labels):
601         """Set the list of MPLS Labels to use indexed by the chain id on given port.
602
603         port_index: the port for which Labels must be set
604         Labels: a list of Labels lists indexed by chain id
605         """
606         if len(labels) != self.config.service_chain_count:
607             raise TrafficClientException('Inner MPLS list %s must have %d entries' %
608                                          (labels, self.config.service_chain_count))
609         self.devices[port_index].set_mpls_inner_labels(labels)
610
611     def set_mpls_outer_labels(self, port_index, labels):
612         """Set the list of MPLS Labels to use indexed by the chain id on given port.
613
614         port_index: the port for which Labels must be set
615         Labels: a list of Labels lists indexed by chain id
616         """
617         if len(labels) != self.config.service_chain_count:
618             raise TrafficClientException('Outer MPLS list %s must have %d entries' %
619                                          (labels, self.config.service_chain_count))
620         self.devices[port_index].set_mpls_outer_labels(labels)
621
622     def set_vtep_vlan(self, port_index, vlan):
623         """Set the vtep vlan to use indexed by the chain id on given port.
624         port_index: the port for which VLAN must be set
625         """
626         self.devices[port_index].set_vtep_vlan(vlan)
627
628     def set_vxlan_endpoints(self, port_index, src_ip, dst_ip):
629         self.devices[port_index].set_vxlan_endpoints(src_ip, dst_ip)
630
631     def set_mpls_peers(self, port_index, src_ip, dst_ip):
632         self.devices[port_index].set_mpls_peers(src_ip, dst_ip)
633
634     @staticmethod
635     def __match_generator_profile(traffic_generator, generator_profile):
636         gen_config = AttrDict(traffic_generator)
637         gen_config.pop('default_profile')
638         gen_config.pop('generator_profile')
639         matching_profile = [profile for profile in traffic_generator.generator_profile if
640                             profile.name == generator_profile]
641         if len(matching_profile) != 1:
642             raise Exception('Traffic generator profile not found: ' + generator_profile)
643
644         gen_config.update(matching_profile[0])
645         return gen_config
646
647
648 class TrafficClient(object):
649     """Traffic generator client with NDR/PDR binary seearch."""
650
651     PORTS = [0, 1]
652
653     def __init__(self, config, notifier=None):
654         """Create a new TrafficClient instance.
655
656         config: nfvbench config
657         notifier: notifier (optional)
658
659         A new instance is created everytime the nfvbench config may have changed.
660         """
661         self.config = config
662         self.generator_config = GeneratorConfig(config)
663         self.tool = self.generator_config.tool
664         self.gen = self._get_generator()
665         self.notifier = notifier
666         self.interval_collector = None
667         self.iteration_collector = None
668         self.runner = TrafficRunner(self, self.config.duration_sec, self.config.interval_sec,
669                                     self.config.service_mode)
670         self.config.frame_sizes = self._get_frame_sizes()
671         self.run_config = {
672             'l2frame_size': None,
673             'duration_sec': self.config.duration_sec,
674             'bidirectional': True,
675             'rates': []  # to avoid unsbuscriptable-obj warning
676         }
677         self.current_total_rate = {'rate_percent': '10'}
678         if self.config.single_run:
679             self.current_total_rate = utils.parse_rate_str(self.config.rate)
680         self.ifstats = None
681         # Speed is either discovered when connecting to TG or set from config
682         # This variable is 0 if not yet discovered from TG or must be the speed of
683         # each interface in bits per second
684         self.intf_speed = self.generator_config.intf_speed
685
686     def _get_generator(self):
687         tool = self.tool.lower()
688         if tool == 'trex':
689             from .traffic_gen import trex_gen
690             return trex_gen.TRex(self)
691         if tool == 'dummy':
692             from .traffic_gen import dummy
693             return dummy.DummyTG(self)
694         raise TrafficClientException('Unsupported generator tool name:' + self.tool)
695
696     def skip_sleep(self):
697         """Skip all sleeps when doing unit testing with dummy TG.
698
699         Must be overriden using mock.patch
700         """
701         return False
702
703     def _get_frame_sizes(self):
704         traffic_profile_name = self.config.traffic.profile
705         matching_profiles = [profile for profile in self.config.traffic_profile if
706                              profile.name == traffic_profile_name]
707         if len(matching_profiles) > 1:
708             raise TrafficClientException('Multiple traffic profiles with name: ' +
709                                          traffic_profile_name)
710         if not matching_profiles:
711             raise TrafficClientException('Cannot find traffic profile: ' + traffic_profile_name)
712         return matching_profiles[0].l2frame_size
713
714     def start_traffic_generator(self):
715         """Start the traffic generator process (traffic not started yet)."""
716         self.gen.connect()
717         # pick up the interface speed if it is not set from config
718         intf_speeds = self.gen.get_port_speed_gbps()
719         # convert Gbps unit into bps
720         tg_if_speed = bitmath.parse_string(str(intf_speeds[0]) + 'Gb').bits
721         if self.intf_speed:
722             # interface speed is overriden from config
723             if self.intf_speed != tg_if_speed:
724                 # Warn the user if the speed in the config is different
725                 LOG.warning(
726                     'Interface speed provided (%g Gbps) is different from actual speed (%d Gbps)',
727                     self.intf_speed / 1000000000.0, intf_speeds[0])
728         else:
729             # interface speed not provisioned by config
730             self.intf_speed = tg_if_speed
731             # also update the speed in the tg config
732             self.generator_config.intf_speed = tg_if_speed
733         # let's report detected and actually used interface speed
734         self.config.intf_speed_detected = tg_if_speed
735         self.config.intf_speed_used = self.intf_speed
736
737         # Save the traffic generator local MAC
738         for mac, device in zip(self.gen.get_macs(), self.generator_config.devices):
739             device.set_mac(mac)
740
741     def setup(self):
742         """Set up the traffic client."""
743         self.gen.clear_stats()
744
745     def get_version(self):
746         """Get the traffic generator version."""
747         return self.gen.get_version()
748
749     def ensure_end_to_end(self):
750         """Ensure traffic generator receives packets it has transmitted.
751
752         This ensures end to end connectivity and also waits until VMs are ready to forward packets.
753
754         VMs that are started and in active state may not pass traffic yet. It is imperative to make
755         sure that all VMs are passing traffic in both directions before starting any benchmarking.
756         To verify this, we need to send at a low frequency bi-directional packets and make sure
757         that we receive all packets back from all VMs. The number of flows is equal to 2 times
758         the number of chains (1 per direction) and we need to make sure we receive packets coming
759         from exactly 2 x chain count different source MAC addresses.
760
761         Example:
762             PVP chain (1 VM per chain)
763             N = 10 (number of chains)
764             Flow count = 20 (number of flows)
765             If the number of unique source MAC addresses from received packets is 20 then
766             all 10 VMs 10 VMs are in operational state.
767         """
768         LOG.info('Starting traffic generator to ensure end-to-end connectivity')
769         # send 2pps on each chain and each direction
770         rate_pps = {'rate_pps': str(self.config.service_chain_count * 2)}
771         self.gen.create_traffic('64', [rate_pps, rate_pps], bidirectional=True, latency=False,
772                                 e2e=True)
773         # ensures enough traffic is coming back
774         retry_count = int((self.config.check_traffic_time_sec +
775                            self.config.generic_poll_sec - 1) / self.config.generic_poll_sec)
776
777         # we expect to see packets coming from 2 unique MAC per chain
778         # because there can be flooding in the case of shared net
779         # we must verify that packets from the right VMs are received
780         # and not just count unique src MAC
781         # create a dict of (port, chain) tuples indexed by dest mac
782         mac_map = {}
783         for port, dest_macs in enumerate(self.generator_config.get_dest_macs()):
784             for chain, mac in enumerate(dest_macs):
785                 mac_map[mac] = (port, chain)
786         unique_src_mac_count = len(mac_map)
787         if self.config.vxlan and self.config.traffic_generator.vtep_vlan:
788             get_mac_id = lambda packet: packet['binary'][60:66]
789         elif self.config.vxlan:
790             get_mac_id = lambda packet: packet['binary'][56:62]
791         elif self.config.mpls:
792             get_mac_id = lambda packet: packet['binary'][24:30]
793             # mpls_transport_label = lambda packet: packet['binary'][14:18]
794         else:
795             get_mac_id = lambda packet: packet['binary'][6:12]
796         for it in range(retry_count):
797             self.gen.clear_stats()
798             self.gen.start_traffic()
799             self.gen.start_capture()
800             LOG.info('Captured unique src mac %d/%d, capturing return packets (retry %d/%d)...',
801                      unique_src_mac_count - len(mac_map), unique_src_mac_count,
802                      it + 1, retry_count)
803             if not self.skip_sleep():
804                 time.sleep(self.config.generic_poll_sec)
805             self.gen.stop_traffic()
806             self.gen.fetch_capture_packets()
807             self.gen.stop_capture()
808             for packet in self.gen.packet_list:
809                 mac_id = get_mac_id(packet).decode('latin-1')
810                 src_mac = ':'.join(["%02x" % ord(x) for x in mac_id])
811                 if self.config.mpls:
812                     if src_mac in mac_map and self.is_mpls(packet):
813                         port, chain = mac_map[src_mac]
814                         LOG.info('Received mpls packet from mac: %s (chain=%d, port=%d)',
815                                  src_mac, chain, port)
816                         mac_map.pop(src_mac, None)
817                 else:
818                     if src_mac in mac_map and self.is_udp(packet):
819                         port, chain = mac_map[src_mac]
820                         LOG.info('Received udp packet from mac: %s (chain=%d, port=%d)',
821                                  src_mac, chain, port)
822                         mac_map.pop(src_mac, None)
823
824                 if not mac_map:
825                     LOG.info('End-to-end connectivity established')
826                     return
827             if self.config.l3_router and not self.config.no_arp:
828                 # In case of L3 traffic mode, routers are not able to route traffic
829                 # until VM interfaces are up and ARP requests are done
830                 LOG.info('Waiting for loopback service completely started...')
831                 LOG.info('Sending ARP request to assure end-to-end connectivity established')
832                 self.ensure_arp_successful()
833         raise TrafficClientException('End-to-end connectivity cannot be ensured')
834
835     def is_udp(self, packet):
836         pkt = Ether(packet['binary'])
837         return UDP in pkt
838
839     def is_mpls(self, packet):
840         pkt = Ether(packet['binary'])
841         return MPLS in pkt
842
843     def ensure_arp_successful(self):
844         """Resolve all IP using ARP and throw an exception in case of failure."""
845         dest_macs = self.gen.resolve_arp()
846         if dest_macs:
847             # all dest macs are discovered, saved them into the generator config
848             if self.config.vxlan or self.config.mpls:
849                 self.generator_config.set_vtep_dest_macs(0, dest_macs[0])
850                 self.generator_config.set_vtep_dest_macs(1, dest_macs[1])
851             else:
852                 self.generator_config.set_dest_macs(0, dest_macs[0])
853                 self.generator_config.set_dest_macs(1, dest_macs[1])
854         else:
855             raise TrafficClientException('ARP cannot be resolved')
856
857     def set_traffic(self, frame_size, bidirectional):
858         """Reconfigure the traffic generator for a new frame size."""
859         self.run_config['bidirectional'] = bidirectional
860         self.run_config['l2frame_size'] = frame_size
861         self.run_config['rates'] = [self.get_per_direction_rate()]
862         if bidirectional:
863             self.run_config['rates'].append(self.get_per_direction_rate())
864         else:
865             unidir_reverse_pps = int(self.config.unidir_reverse_traffic_pps)
866             if unidir_reverse_pps > 0:
867                 self.run_config['rates'].append({'rate_pps': str(unidir_reverse_pps)})
868         # Fix for [NFVBENCH-67], convert the rate string to PPS
869         for idx, rate in enumerate(self.run_config['rates']):
870             if 'rate_pps' not in rate:
871                 self.run_config['rates'][idx] = {'rate_pps': self.__convert_rates(rate)['rate_pps']}
872
873         self.gen.clear_streamblock()
874
875         if self.config.no_latency_streams:
876             LOG.info("Latency streams are disabled")
877         # in service mode, we must disable flow stats (e2e=True)
878         self.gen.create_traffic(frame_size, self.run_config['rates'], bidirectional,
879                                 latency=not self.config.no_latency_streams,
880                                 e2e=self.runner.service_mode)
881
882     def _modify_load(self, load):
883         self.current_total_rate = {'rate_percent': str(load)}
884         rate_per_direction = self.get_per_direction_rate()
885
886         self.gen.modify_rate(rate_per_direction, False)
887         self.run_config['rates'][0] = rate_per_direction
888         if self.run_config['bidirectional']:
889             self.gen.modify_rate(rate_per_direction, True)
890             self.run_config['rates'][1] = rate_per_direction
891
892     def get_ndr_and_pdr(self):
893         """Start the NDR/PDR iteration and return the results."""
894         dst = 'Bidirectional' if self.run_config['bidirectional'] else 'Unidirectional'
895         targets = {}
896         if self.config.ndr_run:
897             LOG.info('*** Searching NDR for %s (%s)...', self.run_config['l2frame_size'], dst)
898             targets['ndr'] = self.config.measurement.NDR
899         if self.config.pdr_run:
900             LOG.info('*** Searching PDR for %s (%s)...', self.run_config['l2frame_size'], dst)
901             targets['pdr'] = self.config.measurement.PDR
902
903         self.run_config['start_time'] = time.time()
904         self.interval_collector = IntervalCollector(self.run_config['start_time'])
905         self.interval_collector.attach_notifier(self.notifier)
906         self.iteration_collector = IterationCollector(self.run_config['start_time'])
907         results = {}
908         self.__range_search(0.0, 200.0, targets, results)
909
910         results['iteration_stats'] = {
911             'ndr_pdr': self.iteration_collector.get()
912         }
913
914         if self.config.ndr_run:
915             LOG.info('NDR load: %s', results['ndr']['rate_percent'])
916             results['ndr']['time_taken_sec'] = \
917                 results['ndr']['timestamp_sec'] - self.run_config['start_time']
918             if self.config.pdr_run:
919                 LOG.info('PDR load: %s', results['pdr']['rate_percent'])
920                 results['pdr']['time_taken_sec'] = \
921                     results['pdr']['timestamp_sec'] - results['ndr']['timestamp_sec']
922         else:
923             LOG.info('PDR load: %s', results['pdr']['rate_percent'])
924             results['pdr']['time_taken_sec'] = \
925                 results['pdr']['timestamp_sec'] - self.run_config['start_time']
926         return results
927
928     def __get_dropped_rate(self, result):
929         dropped_pkts = result['rx']['dropped_pkts']
930         total_pkts = result['tx']['total_pkts']
931         if not total_pkts:
932             return float('inf')
933         return float(dropped_pkts) / total_pkts * 100
934
935     def get_stats(self):
936         """Collect final stats for previous run."""
937         stats = self.gen.get_stats(self.ifstats)
938         retDict = {'total_tx_rate': stats['total_tx_rate'],
939                    'offered_tx_rate_bps': stats['offered_tx_rate_bps'],
940                    'theoretical_tx_rate_bps': stats['theoretical_tx_rate_bps'],
941                    'theoretical_tx_rate_pps': stats['theoretical_tx_rate_pps']}
942
943         tx_keys = ['total_pkts', 'total_pkt_bytes', 'pkt_rate', 'pkt_bit_rate']
944         rx_keys = tx_keys + ['dropped_pkts']
945
946         for port in self.PORTS:
947             port_stats = {'tx': {}, 'rx': {}}
948             for key in tx_keys:
949                 port_stats['tx'][key] = int(stats[port]['tx'][key])
950             for key in rx_keys:
951                 try:
952                     port_stats['rx'][key] = int(stats[port]['rx'][key])
953                 except ValueError:
954                     port_stats['rx'][key] = 0
955             port_stats['rx']['avg_delay_usec'] = cast_integer(
956                 stats[port]['rx']['avg_delay_usec'])
957             port_stats['rx']['min_delay_usec'] = cast_integer(
958                 stats[port]['rx']['min_delay_usec'])
959             port_stats['rx']['max_delay_usec'] = cast_integer(
960                 stats[port]['rx']['max_delay_usec'])
961             port_stats['drop_rate_percent'] = self.__get_dropped_rate(port_stats)
962             retDict[str(port)] = port_stats
963
964         ports = sorted(list(retDict.keys()), key=str)
965         if self.run_config['bidirectional']:
966             retDict['overall'] = {'tx': {}, 'rx': {}}
967             for key in tx_keys:
968                 retDict['overall']['tx'][key] = \
969                     retDict[ports[0]]['tx'][key] + retDict[ports[1]]['tx'][key]
970             for key in rx_keys:
971                 retDict['overall']['rx'][key] = \
972                     retDict[ports[0]]['rx'][key] + retDict[ports[1]]['rx'][key]
973             total_pkts = [retDict[ports[0]]['rx']['total_pkts'],
974                           retDict[ports[1]]['rx']['total_pkts']]
975             avg_delays = [retDict[ports[0]]['rx']['avg_delay_usec'],
976                           retDict[ports[1]]['rx']['avg_delay_usec']]
977             max_delays = [retDict[ports[0]]['rx']['max_delay_usec'],
978                           retDict[ports[1]]['rx']['max_delay_usec']]
979             min_delays = [retDict[ports[0]]['rx']['min_delay_usec'],
980                           retDict[ports[1]]['rx']['min_delay_usec']]
981             retDict['overall']['rx']['avg_delay_usec'] = utils.weighted_avg(total_pkts, avg_delays)
982             retDict['overall']['rx']['min_delay_usec'] = min(min_delays)
983             retDict['overall']['rx']['max_delay_usec'] = max(max_delays)
984             for key in ['pkt_bit_rate', 'pkt_rate']:
985                 for dirc in ['tx', 'rx']:
986                     retDict['overall'][dirc][key] /= 2.0
987                 retDict['overall']['hdrh'] = stats.get('hdrh', None)
988                 if retDict['overall']['hdrh']:
989                     decoded_histogram = HdrHistogram.decode(retDict['overall']['hdrh'])
990                     # override min max and avg from hdrh
991                     retDict['overall']['rx']['min_delay_usec'] = decoded_histogram.get_min_value()
992                     retDict['overall']['rx']['max_delay_usec'] = decoded_histogram.get_max_value()
993                     retDict['overall']['rx']['avg_delay_usec'] = decoded_histogram.get_mean_value()
994                     retDict['overall']['rx']['lat_percentile'] = {}
995                     for percentile in self.config.lat_percentiles:
996                         retDict['overall']['rx']['lat_percentile'][percentile] = \
997                             decoded_histogram.get_value_at_percentile(percentile)
998
999         else:
1000             retDict['overall'] = retDict[ports[0]]
1001         retDict['overall']['drop_rate_percent'] = self.__get_dropped_rate(retDict['overall'])
1002         return retDict
1003
1004     def __convert_rates(self, rate):
1005         return utils.convert_rates(self.run_config['l2frame_size'],
1006                                    rate,
1007                                    self.intf_speed)
1008
1009     def __ndr_pdr_found(self, tag, load):
1010         rates = self.__convert_rates({'rate_percent': load})
1011         self.iteration_collector.add_ndr_pdr(tag, rates['rate_pps'])
1012         last_stats = self.iteration_collector.peek()
1013         self.interval_collector.add_ndr_pdr(tag, last_stats)
1014
1015     def __format_output_stats(self, stats):
1016         for key in self.PORTS + ['overall']:
1017             key = str(key)
1018             interface = stats[key]
1019             stats[key] = {
1020                 'tx_pkts': interface['tx']['total_pkts'],
1021                 'rx_pkts': interface['rx']['total_pkts'],
1022                 'drop_percentage': interface['drop_rate_percent'],
1023                 'drop_pct': interface['rx']['dropped_pkts'],
1024                 'avg_delay_usec': interface['rx']['avg_delay_usec'],
1025                 'max_delay_usec': interface['rx']['max_delay_usec'],
1026                 'min_delay_usec': interface['rx']['min_delay_usec'],
1027             }
1028
1029             if key == 'overall':
1030                 stats[key]['hdrh'] = interface.get('hdrh', None)
1031                 if stats[key]['hdrh']:
1032                     decoded_histogram = HdrHistogram.decode(stats[key]['hdrh'])
1033                     # override min max and avg from hdrh
1034                     stats[key]['min_delay_usec'] = decoded_histogram.get_min_value()
1035                     stats[key]['max_delay_usec'] = decoded_histogram.get_max_value()
1036                     stats[key]['avg_delay_usec'] = decoded_histogram.get_mean_value()
1037                     stats[key]['lat_percentile'] = {}
1038                     for percentile in self.config.lat_percentiles:
1039                         stats[key]['lat_percentile'][percentile] = decoded_histogram.\
1040                             get_value_at_percentile(percentile)
1041
1042
1043         return stats
1044
1045     def __targets_found(self, rate, targets, results):
1046         for tag, target in list(targets.items()):
1047             LOG.info('Found %s (%s) load: %s', tag, target, rate)
1048             self.__ndr_pdr_found(tag, rate)
1049             results[tag]['timestamp_sec'] = time.time()
1050
1051     def __range_search(self, left, right, targets, results):
1052         """Perform a binary search for a list of targets inside a [left..right] range or rate.
1053
1054         left    the left side of the range to search as a % the line rate (100 = 100% line rate)
1055                 indicating the rate to send on each interface
1056         right   the right side of the range to search as a % of line rate
1057                 indicating the rate to send on each interface
1058         targets a dict of drop rates to search (0.1 = 0.1%), indexed by the DR name or "tag"
1059                 ('ndr', 'pdr')
1060         results a dict to store results
1061         """
1062         if not targets:
1063             return
1064         LOG.info('Range search [%s .. %s] targets: %s', left, right, targets)
1065
1066         # Terminate search when gap is less than load epsilon
1067         if right - left < self.config.measurement.load_epsilon:
1068             self.__targets_found(left, targets, results)
1069             return
1070
1071         # Obtain the average drop rate in for middle load
1072         middle = (left + right) / 2.0
1073         try:
1074             stats, rates = self.__run_search_iteration(middle)
1075         except STLError:
1076             LOG.exception("Got exception from traffic generator during binary search")
1077             self.__targets_found(left, targets, results)
1078             return
1079         # Split target dicts based on the avg drop rate
1080         left_targets = {}
1081         right_targets = {}
1082         for tag, target in list(targets.items()):
1083             if stats['overall']['drop_rate_percent'] <= target:
1084                 # record the best possible rate found for this target
1085                 results[tag] = rates
1086                 results[tag].update({
1087                     'load_percent_per_direction': middle,
1088                     'stats': self.__format_output_stats(dict(stats)),
1089                     'timestamp_sec': None
1090                 })
1091                 right_targets[tag] = target
1092             else:
1093                 # initialize to 0 all fields of result for
1094                 # the worst case scenario of the binary search (if ndr/pdr is not found)
1095                 if tag not in results:
1096                     results[tag] = dict.fromkeys(rates, 0)
1097                     empty_stats = self.__format_output_stats(dict(stats))
1098                     for key in empty_stats:
1099                         if isinstance(empty_stats[key], dict):
1100                             empty_stats[key] = dict.fromkeys(empty_stats[key], 0)
1101                         else:
1102                             empty_stats[key] = 0
1103                     results[tag].update({
1104                         'load_percent_per_direction': 0,
1105                         'stats': empty_stats,
1106                         'timestamp_sec': None
1107                     })
1108                 left_targets[tag] = target
1109
1110         # search lower half
1111         self.__range_search(left, middle, left_targets, results)
1112
1113         # search upper half only if the upper rate does not exceed
1114         # 100%, this only happens when the first search at 100%
1115         # yields a DR that is < target DR
1116         if middle >= 100:
1117             self.__targets_found(100, right_targets, results)
1118         else:
1119             self.__range_search(middle, right, right_targets, results)
1120
1121     def __run_search_iteration(self, rate):
1122         """Run one iteration at the given rate level.
1123
1124         rate: the rate to send on each port in percent (0 to 100)
1125         """
1126         self._modify_load(rate)
1127
1128         # poll interval stats and collect them
1129         for stats in self.run_traffic():
1130             self.interval_collector.add(stats)
1131             time_elapsed_ratio = self.runner.time_elapsed() / self.run_config['duration_sec']
1132             if time_elapsed_ratio >= 1:
1133                 self.cancel_traffic()
1134                 if not self.skip_sleep():
1135                     time.sleep(self.config.pause_sec)
1136         self.interval_collector.reset()
1137
1138         # get stats from the run
1139         stats = self.runner.client.get_stats()
1140         current_traffic_config = self._get_traffic_config()
1141         warning = self.compare_tx_rates(current_traffic_config['direction-total']['rate_pps'],
1142                                         stats['total_tx_rate'])
1143         if warning is not None:
1144             stats['warning'] = warning
1145
1146         # save reliable stats from whole iteration
1147         self.iteration_collector.add(stats, current_traffic_config['direction-total']['rate_pps'])
1148         LOG.info('Average drop rate: %f', stats['overall']['drop_rate_percent'])
1149         return stats, current_traffic_config['direction-total']
1150
1151     def log_stats(self, stats):
1152         """Log estimated stats during run."""
1153         # Calculate a rolling drop rate based on differential to
1154         # the previous reading
1155         cur_tx = stats['overall']['tx']['total_pkts']
1156         cur_rx = stats['overall']['rx']['total_pkts']
1157         delta_tx = cur_tx - self.prev_tx
1158         delta_rx = cur_rx - self.prev_rx
1159         drops = delta_tx - delta_rx
1160         drop_rate_pct = 100 * (delta_tx - delta_rx)/delta_tx
1161         self.prev_tx = cur_tx
1162         self.prev_rx = cur_rx
1163         LOG.info('TX: %15s; RX: %15s; (Est.) Dropped: %12s; Drop rate: %8.4f%%',
1164                  format(cur_tx, ',d'),
1165                  format(cur_rx, ',d'),
1166                  format(drops, ',d'),
1167                  drop_rate_pct)
1168
1169     def run_traffic(self):
1170         """Start traffic and return intermediate stats for each interval."""
1171         stats = self.runner.run()
1172         self.prev_tx = 0
1173         self.prev_rx = 0
1174         while self.runner.is_running:
1175             self.log_stats(stats)
1176             yield stats
1177             stats = self.runner.poll_stats()
1178             if stats is None:
1179                 return
1180         self.log_stats(stats)
1181         LOG.info('Drop rate: %f', stats['overall']['drop_rate_percent'])
1182         yield stats
1183
1184     def cancel_traffic(self):
1185         """Stop traffic."""
1186         self.runner.stop()
1187
1188     def _get_traffic_config(self):
1189         config = {}
1190         load_total = 0.0
1191         bps_total = 0.0
1192         pps_total = 0.0
1193         for idx, rate in enumerate(self.run_config['rates']):
1194             key = 'direction-forward' if idx == 0 else 'direction-reverse'
1195             config[key] = {
1196                 'l2frame_size': self.run_config['l2frame_size'],
1197                 'duration_sec': self.run_config['duration_sec']
1198             }
1199             config[key].update(rate)
1200             config[key].update(self.__convert_rates(rate))
1201             load_total += float(config[key]['rate_percent'])
1202             bps_total += float(config[key]['rate_bps'])
1203             pps_total += float(config[key]['rate_pps'])
1204         config['direction-total'] = dict(config['direction-forward'])
1205         config['direction-total'].update({
1206             'rate_percent': load_total,
1207             'rate_pps': cast_integer(pps_total),
1208             'rate_bps': bps_total
1209         })
1210
1211         return config
1212
1213     def get_run_config(self, results):
1214         """Return configuration which was used for the last run."""
1215         r = {}
1216         # because we want each direction to have the far end RX rates,
1217         # use the far end index (1-idx) to retrieve the RX rates
1218         for idx, key in enumerate(["direction-forward", "direction-reverse"]):
1219             tx_rate = results["stats"][str(idx)]["tx"]["total_pkts"] / self.config.duration_sec
1220             rx_rate = results["stats"][str(1 - idx)]["rx"]["total_pkts"] / self.config.duration_sec
1221             r[key] = {
1222                 "orig": self.__convert_rates(self.run_config['rates'][idx]),
1223                 "tx": self.__convert_rates({'rate_pps': tx_rate}),
1224                 "rx": self.__convert_rates({'rate_pps': rx_rate})
1225             }
1226
1227         total = {}
1228         for direction in ['orig', 'tx', 'rx']:
1229             total[direction] = {}
1230             for unit in ['rate_percent', 'rate_bps', 'rate_pps']:
1231                 total[direction][unit] = sum([float(x[direction][unit]) for x in list(r.values())])
1232
1233         r['direction-total'] = total
1234         return r
1235
1236     def insert_interface_stats(self, pps_list):
1237         """Insert interface stats to a list of packet path stats.
1238
1239         pps_list: a list of packet path stats instances indexed by chain index
1240
1241         This function will insert the packet path stats for the traffic gen ports 0 and 1
1242         with itemized per chain tx/rx counters.
1243         There will be as many packet path stats as chains.
1244         Each packet path stats will have exactly 2 InterfaceStats for port 0 and port 1
1245         self.pps_list:
1246         [
1247         PacketPathStats(InterfaceStats(chain 0, port 0), InterfaceStats(chain 0, port 1)),
1248         PacketPathStats(InterfaceStats(chain 1, port 0), InterfaceStats(chain 1, port 1)),
1249         ...
1250         ]
1251         """
1252         def get_if_stats(chain_idx):
1253             return [InterfaceStats('p' + str(port), self.tool)
1254                     for port in range(2)]
1255         # keep the list of list of interface stats indexed by the chain id
1256         self.ifstats = [get_if_stats(chain_idx)
1257                         for chain_idx in range(self.config.service_chain_count)]
1258         # note that we need to make a copy of the ifs list so that any modification in the
1259         # list from pps will not change the list saved in self.ifstats
1260         self.pps_list = [PacketPathStats(self.config, list(ifs)) for ifs in self.ifstats]
1261         # insert the corresponding pps in the passed list
1262         pps_list.extend(self.pps_list)
1263
1264     def update_interface_stats(self, diff=False):
1265         """Update all interface stats.
1266
1267         diff: if False, simply refresh the interface stats values with latest values
1268               if True, diff the interface stats with the latest values
1269         Make sure that the interface stats inserted in insert_interface_stats() are updated
1270         with proper values.
1271         self.ifstats:
1272         [
1273         [InterfaceStats(chain 0, port 0), InterfaceStats(chain 0, port 1)],
1274         [InterfaceStats(chain 1, port 0), InterfaceStats(chain 1, port 1)],
1275         ...
1276         ]
1277         """
1278         if diff:
1279             stats = self.gen.get_stats(self.ifstats)
1280             for chain_idx, ifs in enumerate(self.ifstats):
1281                 # each ifs has exactly 2 InterfaceStats and 2 Latency instances
1282                 # corresponding to the
1283                 # port 0 and port 1 for the given chain_idx
1284                 # Note that we cannot use self.pps_list[chain_idx].if_stats to pick the
1285                 # interface stats for the pps because it could have been modified to contain
1286                 # additional interface stats
1287                 self.gen.get_stream_stats(stats, ifs, self.pps_list[chain_idx].latencies, chain_idx)
1288             # special handling for vxlan
1289             # in case of vxlan, flow stats are not available so all rx counters will be
1290             # zeros when the total rx port counter is non zero.
1291             # in that case,
1292             for port in range(2):
1293                 total_rx = 0
1294                 for ifs in self.ifstats:
1295                     total_rx += ifs[port].rx
1296                 if total_rx == 0:
1297                     # check if the total port rx from Trex is also zero
1298                     port_rx = stats[port]['rx']['total_pkts']
1299                     if port_rx:
1300                         # the total rx for all chains from port level stats is non zero
1301                         # which means that the per-chain stats are not available
1302                         if len(self.ifstats) == 1:
1303                             # only one chain, simply report the port level rx to the chain rx stats
1304                             self.ifstats[0][port].rx = port_rx
1305                         else:
1306                             for ifs in self.ifstats:
1307                                 # mark this data as unavailable
1308                                 ifs[port].rx = None
1309                             # pitch in the total rx only in the last chain pps
1310                             self.ifstats[-1][port].rx_total = port_rx
1311
1312     @staticmethod
1313     def compare_tx_rates(required, actual):
1314         """Compare the actual TX rate to the required TX rate."""
1315         threshold = 0.9
1316         are_different = False
1317         try:
1318             if float(actual) / required < threshold:
1319                 are_different = True
1320         except ZeroDivisionError:
1321             are_different = True
1322
1323         if are_different:
1324             msg = "WARNING: There is a significant difference between requested TX rate ({r}) " \
1325                   "and actual TX rate ({a}). The traffic generator may not have sufficient CPU " \
1326                   "to achieve the requested TX rate.".format(r=required, a=actual)
1327             LOG.info(msg)
1328             return msg
1329
1330         return None
1331
1332     def get_per_direction_rate(self):
1333         """Get the rate for each direction."""
1334         divisor = 2 if self.run_config['bidirectional'] else 1
1335         if 'rate_percent' in self.current_total_rate:
1336             # don't split rate if it's percentage
1337             divisor = 1
1338
1339         return utils.divide_rate(self.current_total_rate, divisor)
1340
1341     def close(self):
1342         """Close this instance."""
1343         try:
1344             self.gen.stop_traffic()
1345         except Exception:
1346             pass
1347         self.gen.clear_stats()
1348         self.gen.cleanup()