NFVBENCH-176 Cannot run service mode with flow stats
[nfvbench.git] / nfvbench / traffic_client.py
1 # Copyright 2016 Cisco Systems, Inc.  All rights reserved.
2 #
3 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
4 #    not use this file except in compliance with the License. You may obtain
5 #    a copy of the License at
6 #
7 #         http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #    Unless required by applicable law or agreed to in writing, software
10 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 #    License for the specific language governing permissions and limitations
13 #    under the License.
14
15 """Interface to the traffic generator clients including NDR/PDR binary search."""
16 from math import gcd
17 import socket
18 import struct
19 import time
20
21 from attrdict import AttrDict
22 import bitmath
23 from hdrh.histogram import HdrHistogram
24 from netaddr import IPNetwork
25 # pylint: disable=import-error
26 from trex.stl.api import Ether
27 from trex.stl.api import STLError
28 from trex.stl.api import UDP
29 # pylint: disable=wrong-import-order
30 from scapy.contrib.mpls import MPLS  # flake8: noqa
31 # pylint: enable=wrong-import-order
32 # pylint: enable=import-error
33
34 from .log import LOG
35 from .packet_stats import InterfaceStats
36 from .packet_stats import PacketPathStats
37 from .stats_collector import IntervalCollector
38 from .stats_collector import IterationCollector
39 from .traffic_gen import traffic_utils as utils
40 from .utils import cast_integer
41
42 class TrafficClientException(Exception):
43     """Generic traffic client exception."""
44
45 class TrafficRunner(object):
46     """Serialize various steps required to run traffic."""
47
48     def __init__(self, client, duration_sec, interval_sec=0, service_mode=False):
49         """Create a traffic runner."""
50         self.client = client
51         self.start_time = None
52         self.duration_sec = duration_sec
53         self.interval_sec = interval_sec
54         self.service_mode = service_mode
55
56     def run(self):
57         """Clear stats and instruct the traffic generator to start generating traffic."""
58         if self.is_running():
59             return None
60         LOG.info('Running traffic generator')
61         self.client.gen.clear_stats()
62         # Debug use only : new '--service-mode' option available for the NFVBench command line.
63         # A read-only mode TRex console would be able to capture the generated traffic.
64         self.client.gen.set_service_mode(enabled=self.service_mode)
65         LOG.info('Service mode is %sabled', 'en' if self.service_mode else 'dis')
66         self.client.gen.start_traffic()
67         self.start_time = time.time()
68         return self.poll_stats()
69
70     def stop(self):
71         """Stop the current run and instruct the traffic generator to stop traffic."""
72         if self.is_running():
73             self.start_time = None
74             self.client.gen.stop_traffic()
75
76     def is_running(self):
77         """Check if a run is still pending."""
78         return self.start_time is not None
79
80     def time_elapsed(self):
81         """Return time elapsed since start of run."""
82         if self.is_running():
83             return time.time() - self.start_time
84         return self.duration_sec
85
86     def poll_stats(self):
87         """Poll latest stats from the traffic generator at fixed interval - sleeps if necessary.
88
89         return: latest stats or None if traffic is stopped
90         """
91         if not self.is_running():
92             return None
93         if self.client.skip_sleep():
94             self.stop()
95             return self.client.get_stats()
96         time_elapsed = self.time_elapsed()
97         if time_elapsed > self.duration_sec:
98             self.stop()
99             return None
100         time_left = self.duration_sec - time_elapsed
101         if self.interval_sec > 0.0:
102             if time_left <= self.interval_sec:
103                 time.sleep(time_left)
104                 self.stop()
105             else:
106                 time.sleep(self.interval_sec)
107         else:
108             time.sleep(self.duration_sec)
109             self.stop()
110         return self.client.get_stats()
111
112
113 class IpBlock(object):
114     """Manage a block of IP addresses."""
115
116     def __init__(self, base_ip, step_ip, count_ip):
117         """Create an IP block."""
118         self.base_ip_int = Device.ip_to_int(base_ip)
119         self.step = Device.ip_to_int(step_ip)
120         self.max_available = count_ip
121         self.next_free = 0
122
123     def get_ip(self, index=0):
124         """Return the IP address at given index."""
125         if index < 0 or index >= self.max_available:
126             raise IndexError('Index out of bounds: %d (max=%d)' % (index, self.max_available))
127         return Device.int_to_ip(self.base_ip_int + index * self.step)
128
129     def reserve_ip_range(self, count, force_ip_reservation=False):
130         """Reserve a range of count consecutive IP addresses spaced by step.
131         force_ip_reservation parameter allows to continue the calculation of IPs when
132         the 2 sides (ports) have different size and the flow is greater than
133         the size as well.
134         """
135         if self.next_free + count > self.max_available and force_ip_reservation is False:
136             raise IndexError('No more IP addresses next free=%d max_available=%d requested=%d' %
137                              (self.next_free,
138                               self.max_available,
139                               count))
140         if self.next_free + count > self.max_available and force_ip_reservation is True:
141             first_ip = self.get_ip(self.next_free)
142             last_ip = self.get_ip(self.next_free + self.max_available - 1)
143             self.next_free += self.max_available
144         else:
145             first_ip = self.get_ip(self.next_free)
146             last_ip = self.get_ip(self.next_free + count - 1)
147             self.next_free += count
148         return (first_ip, last_ip)
149
150     def reset_reservation(self):
151         """Reset all reservations and restart with a completely unused IP block."""
152         self.next_free = 0
153
154
155 class UdpPorts(object):
156
157     def __init__(self, src_min, src_max, dst_min, dst_max, step):
158
159         self.src_min = src_min
160         self.src_max = src_max
161         self.dst_min = dst_min
162         self.dst_max = dst_max
163         self.step = step
164
165
166 class Device(object):
167     """Represent a port device and all information associated to it.
168
169     In the curent version we only support 2 port devices for the traffic generator
170     identified as port 0 or port 1.
171     """
172
173     def __init__(self, port, generator_config):
174         """Create a new device for a given port."""
175         self.generator_config = generator_config
176         self.chain_count = generator_config.service_chain_count
177         if generator_config.bidirectional:
178             self.flow_count = generator_config.flow_count / 2
179         else:
180             self.flow_count = generator_config.flow_count
181
182         self.port = port
183         self.switch_port = generator_config.interfaces[port].get('switch_port', None)
184         self.vtep_vlan = None
185         self.vtep_src_mac = None
186         self.vxlan = False
187         self.mpls = False
188         self.inner_labels = None
189         self.outer_labels = None
190         self.pci = generator_config.interfaces[port].pci
191         self.mac = None
192         self.dest_macs = None
193         self.vtep_dst_mac = None
194         self.vtep_dst_ip = None
195         if generator_config.vteps is None:
196             self.vtep_src_ip = None
197         else:
198             self.vtep_src_ip = generator_config.vteps[port]
199         self.vnis = None
200         self.vlans = None
201         self.ip_addrs = generator_config.ip_addrs[port]
202         self.ip_src_static = generator_config.ip_src_static
203         self.ip_addrs_step = generator_config.ip_addrs_step
204         if self.ip_addrs_step == 'random':
205             # Set step to 1 to calculate the IP range size (see check_ip_size below)
206             step = '0.0.0.1'
207         else:
208             step = self.ip_addrs_step
209         self.ip_size = self.check_ipsize(IPNetwork(self.ip_addrs).size, Device.ip_to_int(step))
210         self.ip = str(IPNetwork(self.ip_addrs).network)
211         ip_addrs_left = generator_config.ip_addrs[0]
212         ip_addrs_right = generator_config.ip_addrs[1]
213         self.ip_addrs_size = {
214             'left': self.check_ipsize(IPNetwork(ip_addrs_left).size, Device.ip_to_int(step)),
215             'right': self.check_ipsize(IPNetwork(ip_addrs_right).size, Device.ip_to_int(step))}
216         udp_src_port = generator_config.gen_config.udp_src_port
217         if udp_src_port is None:
218             udp_src_port = 53
219         udp_dst_port = generator_config.gen_config.udp_dst_port
220         if udp_dst_port is None:
221             udp_dst_port = 53
222         src_max, src_min = self.define_udp_range(udp_src_port, 'udp_src_port')
223         dst_max, dst_min = self.define_udp_range(udp_dst_port, 'udp_dst_port')
224         udp_src_range = int(src_max) - int(src_min) + 1
225         udp_dst_range = int(dst_max) - int(dst_min) + 1
226         lcm_port = self.lcm(udp_src_range, udp_dst_range)
227         if self.ip_src_static is True:
228             lcm_ip = self.lcm(1, min(self.ip_addrs_size['left'], self.ip_addrs_size['right']))
229         else:
230             lcm_ip = self.lcm(self.ip_addrs_size['left'], self.ip_addrs_size['right'])
231         flow_max = self.lcm(lcm_port, lcm_ip)
232         if self.flow_count > flow_max:
233             raise TrafficClientException('Trying to set unachievable traffic (%d > %d)' %
234                                          (self.flow_count, flow_max))
235
236         # manage udp range regarding flow count value
237         # UDP dst range is greater than FC => range will be limited to min + FC
238         if self.flow_count <= udp_dst_range:
239             dst_max = int(dst_min) + self.flow_count - 1
240         # UDP src range is greater than FC => range will be limited to min + FC
241         if self.flow_count <= udp_src_range:
242             src_max = int(src_min) + self.flow_count - 1
243         # Define IP block limit regarding flow count
244         if self.flow_count <= self.ip_size:
245             self.ip_block = IpBlock(self.ip, step, self.flow_count)
246         else:
247             self.ip_block = IpBlock(self.ip, step, self.ip_size)
248
249         self.udp_ports = UdpPorts(src_min, src_max, dst_min, dst_max,
250                                   generator_config.gen_config.udp_port_step)
251         self.gw_ip_block = IpBlock(generator_config.gateway_ips[port],
252                                    generator_config.gateway_ip_addrs_step,
253                                    self.chain_count)
254         self.tg_gateway_ip_addrs = generator_config.tg_gateway_ip_addrs[port]
255         self.tg_gw_ip_block = IpBlock(self.tg_gateway_ip_addrs,
256                                       generator_config.tg_gateway_ip_addrs_step,
257                                       self.chain_count)
258
259     @staticmethod
260     def define_udp_range(udp_port, property_name):
261         if isinstance(udp_port, int):
262             min = udp_port
263             max = min
264         elif isinstance(udp_port, tuple):
265             min = udp_port[0]
266             max = udp_port[1]
267         else:
268             raise TrafficClientException('Invalid %s property value (53 or [\'53\',\'1024\'])'
269                                          % property_name)
270         return max, min
271
272     @staticmethod
273     def lcm(a, b):
274         """Calculate the maximum possible value for both IP and ports,
275         eventually for maximum possible flux."""
276         if a != 0 and b != 0:
277             lcm_value = a * b // gcd(a, b)
278             return lcm_value
279         raise TypeError(" IP size or port range can't be zero !")
280
281     @staticmethod
282     def check_ipsize(ip_size, step):
283         """Check and set the available IPs, considering the step."""
284         try:
285             if ip_size % step == 0:
286                 value = int(ip_size / step)
287             else:
288                 value = int((ip_size / step)) + 1
289             return value
290         except ZeroDivisionError:
291             raise ZeroDivisionError("step can't be zero !") from ZeroDivisionError
292
293     def set_mac(self, mac):
294         """Set the local MAC for this port device."""
295         if mac is None:
296             raise TrafficClientException('Trying to set traffic generator MAC address as None')
297         self.mac = mac
298
299     def get_peer_device(self):
300         """Get the peer device (device 0 -> device 1, or device 1 -> device 0)."""
301         return self.generator_config.devices[1 - self.port]
302
303     def set_vtep_dst_mac(self, dest_macs):
304         """Set the list of dest MACs indexed by the chain id.
305
306         This is only called in 2 cases:
307         - VM macs discovered using openstack API
308         - dest MACs provisioned in config file
309         """
310         self.vtep_dst_mac = list(map(str, dest_macs))
311
312     def set_dest_macs(self, dest_macs):
313         """Set the list of dest MACs indexed by the chain id.
314
315         This is only called in 2 cases:
316         - VM macs discovered using openstack API
317         - dest MACs provisioned in config file
318         """
319         self.dest_macs = list(map(str, dest_macs))
320
321     def get_dest_macs(self):
322         """Get the list of dest macs for this device.
323
324         If set_dest_macs was never called, assumes l2-loopback and return
325         a list of peer mac (as many as chains but normally only 1 chain)
326         """
327         if self.dest_macs:
328             return self.dest_macs
329         # assume this is l2-loopback
330         return [self.get_peer_device().mac] * self.chain_count
331
332     def set_vlans(self, vlans):
333         """Set the list of vlans to use indexed by the chain id."""
334         self.vlans = vlans
335         LOG.info("Port %d: VLANs %s", self.port, self.vlans)
336
337     def set_vtep_vlan(self, vlan):
338         """Set the vtep vlan to use indexed by specific port."""
339         self.vtep_vlan = vlan
340         self.vxlan = True
341         self.vlan_tagging = None
342         LOG.info("Port %d: VTEP VLANs %s", self.port, self.vtep_vlan)
343
344     def set_vxlan_endpoints(self, src_ip, dst_ip):
345         self.vtep_dst_ip = dst_ip
346         self.vtep_src_ip = src_ip
347         LOG.info("Port %d: src_vtep %s, dst_vtep %s", self.port,
348                  self.vtep_src_ip, self.vtep_dst_ip)
349
350     def set_mpls_peers(self, src_ip, dst_ip):
351         self.mpls = True
352         self.vtep_dst_ip = dst_ip
353         self.vtep_src_ip = src_ip
354         LOG.info("Port %d: src_mpls_vtep %s, mpls_peer_ip %s", self.port,
355                  self.vtep_src_ip, self.vtep_dst_ip)
356
357     def set_vxlans(self, vnis):
358         self.vnis = vnis
359         LOG.info("Port %d: VNIs %s", self.port, self.vnis)
360
361     def set_mpls_inner_labels(self, labels):
362         self.inner_labels = labels
363         LOG.info("Port %d: MPLS Inner Labels %s", self.port, self.inner_labels)
364
365     def set_mpls_outer_labels(self, labels):
366         self.outer_labels = labels
367         LOG.info("Port %d: MPLS Outer Labels %s", self.port, self.outer_labels)
368
369     def set_gw_ip(self, gateway_ip):
370         self.gw_ip_block = IpBlock(gateway_ip,
371                                    self.generator_config.gateway_ip_addrs_step,
372                                    self.chain_count)
373
374     def get_gw_ip(self, chain_index):
375         """Retrieve the IP address assigned for the gateway of a given chain."""
376         return self.gw_ip_block.get_ip(chain_index)
377
378     def get_stream_configs(self):
379         """Get the stream config for a given chain on this device.
380
381         Called by the traffic generator driver to program the traffic generator properly
382         before generating traffic
383         """
384         configs = []
385         # exact flow count for each chain is calculated as follows:
386         # - all chains except the first will have the same flow count
387         #   calculated as (total_flows + chain_count - 1) / chain_count
388         # - the first chain will have the remainder
389         # example 11 flows and 3 chains => 3, 4, 4
390         flows_per_chain = int((self.flow_count + self.chain_count - 1) / self.chain_count)
391         cur_chain_flow_count = int(self.flow_count - flows_per_chain * (self.chain_count - 1))
392         force_ip_reservation = False
393         # use case example of this parameter:
394         # - static IP addresses (source & destination), netmask = /30
395         # - 4 varying UDP source ports | 1 UDP destination port
396         # - Flow count = 8
397         # --> parameter 'reserve_ip_range' should have flag 'force_ip_reservation'
398         # in order to assign the maximum available IP on each iteration
399         if self.ip_size < cur_chain_flow_count \
400                 or self.ip_addrs_size['left'] != self.ip_addrs_size['right']:
401             force_ip_reservation = True
402
403         peer = self.get_peer_device()
404         self.ip_block.reset_reservation()
405         peer.ip_block.reset_reservation()
406         dest_macs = self.get_dest_macs()
407
408         for chain_idx in range(self.chain_count):
409             src_ip_first, src_ip_last = self.ip_block.reserve_ip_range\
410             (cur_chain_flow_count, force_ip_reservation)
411             dst_ip_first, dst_ip_last = peer.ip_block.reserve_ip_range\
412             (cur_chain_flow_count, force_ip_reservation)
413
414             configs.append({
415                 'count': cur_chain_flow_count,
416                 'mac_src': self.mac,
417                 'mac_dst': dest_macs[chain_idx],
418                 'ip_src_addr': src_ip_first,
419                 'ip_src_addr_max': src_ip_last,
420                 'ip_src_count': cur_chain_flow_count,
421                 'ip_dst_addr': dst_ip_first,
422                 'ip_dst_addr_max': dst_ip_last,
423                 'ip_dst_count': cur_chain_flow_count,
424                 'ip_addrs_step': self.ip_addrs_step,
425                 'ip_src_static': self.ip_src_static,
426                 'udp_src_port': self.udp_ports.src_min,
427                 'udp_src_port_max': self.udp_ports.src_max,
428                 'udp_src_count': int(self.udp_ports.src_max) - int(self.udp_ports.src_min) + 1,
429                 'udp_dst_port': self.udp_ports.dst_min,
430                 'udp_dst_port_max': self.udp_ports.dst_max,
431                 'udp_dst_count': int(self.udp_ports.dst_max) - int(self.udp_ports.dst_min) + 1,
432                 'udp_port_step': self.udp_ports.step,
433                 'mac_discovery_gw': self.get_gw_ip(chain_idx),
434                 'ip_src_tg_gw': self.tg_gw_ip_block.get_ip(chain_idx),
435                 'ip_dst_tg_gw': peer.tg_gw_ip_block.get_ip(chain_idx),
436                 'vlan_tag': self.vlans[chain_idx] if self.vlans else None,
437                 'vxlan': self.vxlan,
438                 'vtep_vlan': self.vtep_vlan if self.vtep_vlan else None,
439                 'vtep_src_mac': self.mac if (self.vxlan or self.mpls) else None,
440                 'vtep_dst_mac': self.vtep_dst_mac if (self.vxlan or self.mpls) else None,
441                 'vtep_dst_ip': self.vtep_dst_ip if self.vxlan is True else None,
442                 'vtep_src_ip': self.vtep_src_ip if self.vxlan is True else None,
443                 'net_vni': self.vnis[chain_idx] if self.vxlan is True else None,
444                 'mpls': self.mpls,
445                 'mpls_outer_label': self.outer_labels[chain_idx] if self.mpls is True else None,
446                 'mpls_inner_label': self.inner_labels[chain_idx] if self.mpls is True else None
447
448             })
449             # after first chain, fall back to the flow count for all other chains
450             cur_chain_flow_count = flows_per_chain
451         return configs
452
453     @staticmethod
454     def ip_to_int(addr):
455         """Convert an IP address from string to numeric."""
456         return struct.unpack("!I", socket.inet_aton(addr))[0]
457
458     @staticmethod
459     def int_to_ip(nvalue):
460         """Convert an IP address from numeric to string."""
461         return socket.inet_ntoa(struct.pack("!I", int(nvalue)))
462
463
464 class GeneratorConfig(object):
465     """Represents traffic configuration for currently running traffic profile."""
466
467     DEFAULT_IP_STEP = '0.0.0.1'
468     DEFAULT_SRC_DST_IP_STEP = '0.0.0.1'
469
470     def __init__(self, config):
471         """Create a generator config."""
472         self.config = config
473         # name of the generator profile (normally trex or dummy)
474         # pick the default one if not specified explicitly from cli options
475         if not config.generator_profile:
476             config.generator_profile = config.traffic_generator.default_profile
477         # pick up the profile dict based on the name
478         gen_config = self.__match_generator_profile(config.traffic_generator,
479                                                     config.generator_profile)
480         self.gen_config = gen_config
481         # copy over fields from the dict
482         self.tool = gen_config.tool
483         self.ip = gen_config.ip
484         # overrides on config.cores and config.mbuf_factor
485         if config.cores:
486             self.cores = config.cores
487         else:
488             self.cores = gen_config.get('cores', 1)
489         self.mbuf_factor = config.mbuf_factor
490         self.mbuf_64 = config.mbuf_64
491         self.hdrh = not config.disable_hdrh
492         if gen_config.intf_speed:
493             # interface speed is overriden from config
494             self.intf_speed = bitmath.parse_string(gen_config.intf_speed.replace('ps', '')).bits
495         else:
496             # interface speed is discovered/provided by the traffic generator
497             self.intf_speed = 0
498         self.name = gen_config.name
499         self.zmq_pub_port = gen_config.get('zmq_pub_port', 4500)
500         self.zmq_rpc_port = gen_config.get('zmq_rpc_port', 4501)
501         self.limit_memory = gen_config.get('limit_memory', 1024)
502         self.software_mode = gen_config.get('software_mode', False)
503         self.interfaces = gen_config.interfaces
504         if self.interfaces[0].port != 0 or self.interfaces[1].port != 1:
505             raise TrafficClientException('Invalid port order/id in generator_profile.interfaces')
506         self.service_chain = config.service_chain
507         self.service_chain_count = config.service_chain_count
508         self.flow_count = config.flow_count
509         self.host_name = gen_config.host_name
510         self.bidirectional = config.traffic.bidirectional
511         self.tg_gateway_ip_addrs = gen_config.tg_gateway_ip_addrs
512         self.ip_addrs = gen_config.ip_addrs
513         self.ip_addrs_step = gen_config.ip_addrs_step or self.DEFAULT_SRC_DST_IP_STEP
514         self.tg_gateway_ip_addrs_step = \
515             gen_config.tg_gateway_ip_addrs_step or self.DEFAULT_IP_STEP
516         self.gateway_ip_addrs_step = gen_config.gateway_ip_addrs_step or self.DEFAULT_IP_STEP
517         self.gateway_ips = gen_config.gateway_ip_addrs
518         self.ip_src_static = gen_config.ip_src_static
519         self.vteps = gen_config.get('vteps')
520         self.devices = [Device(port, self) for port in [0, 1]]
521         # This should normally always be [0, 1]
522         self.ports = [device.port for device in self.devices]
523
524         # check that pci is not empty
525         if not gen_config.interfaces[0].get('pci', None) or \
526            not gen_config.interfaces[1].get('pci', None):
527             raise TrafficClientException("configuration interfaces pci fields cannot be empty")
528
529         self.pcis = [tgif['pci'] for tgif in gen_config.interfaces]
530         self.vlan_tagging = config.vlan_tagging
531
532         # needed for result/summarizer
533         config['tg-name'] = gen_config.name
534         config['tg-tool'] = self.tool
535
536     def to_json(self):
537         """Get json form to display the content into the overall result dict."""
538         return dict(self.gen_config)
539
540     def set_dest_macs(self, port_index, dest_macs):
541         """Set the list of dest MACs indexed by the chain id on given port.
542
543         port_index: the port for which dest macs must be set
544         dest_macs: a list of dest MACs indexed by chain id
545         """
546         if len(dest_macs) < self.config.service_chain_count:
547             raise TrafficClientException('Dest MAC list %s must have %d entries' %
548                                          (dest_macs, self.config.service_chain_count))
549         # only pass the first scc dest MACs
550         self.devices[port_index].set_dest_macs(dest_macs[:self.config.service_chain_count])
551         LOG.info('Port %d: dst MAC %s', port_index, [str(mac) for mac in dest_macs])
552
553     def set_vtep_dest_macs(self, port_index, dest_macs):
554         """Set the list of dest MACs indexed by the chain id on given port.
555
556         port_index: the port for which dest macs must be set
557         dest_macs: a list of dest MACs indexed by chain id
558         """
559         if len(dest_macs) != self.config.service_chain_count:
560             raise TrafficClientException('Dest MAC list %s must have %d entries' %
561                                          (dest_macs, self.config.service_chain_count))
562         self.devices[port_index].set_vtep_dst_mac(dest_macs)
563         LOG.info('Port %d: vtep dst MAC %s', port_index, {str(mac) for mac in dest_macs})
564
565     def get_dest_macs(self):
566         """Return the list of dest macs indexed by port."""
567         return [dev.get_dest_macs() for dev in self.devices]
568
569     def set_vlans(self, port_index, vlans):
570         """Set the list of vlans to use indexed by the chain id on given port.
571
572         port_index: the port for which VLANs must be set
573         vlans: a  list of vlan lists indexed by chain id
574         """
575         if len(vlans) != self.config.service_chain_count:
576             raise TrafficClientException('VLAN list %s must have %d entries' %
577                                          (vlans, self.config.service_chain_count))
578         self.devices[port_index].set_vlans(vlans)
579
580     def set_vxlans(self, port_index, vxlans):
581         """Set the list of vxlans (VNIs) to use indexed by the chain id on given port.
582
583         port_index: the port for which VXLANs must be set
584         VXLANs: a  list of VNIs lists indexed by chain id
585         """
586         if len(vxlans) != self.config.service_chain_count:
587             raise TrafficClientException('VXLAN list %s must have %d entries' %
588                                          (vxlans, self.config.service_chain_count))
589         self.devices[port_index].set_vxlans(vxlans)
590
591     def set_mpls_inner_labels(self, port_index, labels):
592         """Set the list of MPLS Labels to use indexed by the chain id on given port.
593
594         port_index: the port for which Labels must be set
595         Labels: a list of Labels lists indexed by chain id
596         """
597         if len(labels) != self.config.service_chain_count:
598             raise TrafficClientException('Inner MPLS list %s must have %d entries' %
599                                          (labels, self.config.service_chain_count))
600         self.devices[port_index].set_mpls_inner_labels(labels)
601
602     def set_mpls_outer_labels(self, port_index, labels):
603         """Set the list of MPLS Labels to use indexed by the chain id on given port.
604
605         port_index: the port for which Labels must be set
606         Labels: a list of Labels lists indexed by chain id
607         """
608         if len(labels) != self.config.service_chain_count:
609             raise TrafficClientException('Outer MPLS list %s must have %d entries' %
610                                          (labels, self.config.service_chain_count))
611         self.devices[port_index].set_mpls_outer_labels(labels)
612
613     def set_vtep_vlan(self, port_index, vlan):
614         """Set the vtep vlan to use indexed by the chain id on given port.
615         port_index: the port for which VLAN must be set
616         """
617         self.devices[port_index].set_vtep_vlan(vlan)
618
619     def set_vxlan_endpoints(self, port_index, src_ip, dst_ip):
620         self.devices[port_index].set_vxlan_endpoints(src_ip, dst_ip)
621
622     def set_mpls_peers(self, port_index, src_ip, dst_ip):
623         self.devices[port_index].set_mpls_peers(src_ip, dst_ip)
624
625     @staticmethod
626     def __match_generator_profile(traffic_generator, generator_profile):
627         gen_config = AttrDict(traffic_generator)
628         gen_config.pop('default_profile')
629         gen_config.pop('generator_profile')
630         matching_profile = [profile for profile in traffic_generator.generator_profile if
631                             profile.name == generator_profile]
632         if len(matching_profile) != 1:
633             raise Exception('Traffic generator profile not found: ' + generator_profile)
634
635         gen_config.update(matching_profile[0])
636         return gen_config
637
638
639 class TrafficClient(object):
640     """Traffic generator client with NDR/PDR binary seearch."""
641
642     PORTS = [0, 1]
643
644     def __init__(self, config, notifier=None):
645         """Create a new TrafficClient instance.
646
647         config: nfvbench config
648         notifier: notifier (optional)
649
650         A new instance is created everytime the nfvbench config may have changed.
651         """
652         self.config = config
653         self.generator_config = GeneratorConfig(config)
654         self.tool = self.generator_config.tool
655         self.gen = self._get_generator()
656         self.notifier = notifier
657         self.interval_collector = None
658         self.iteration_collector = None
659         self.runner = TrafficRunner(self, self.config.duration_sec, self.config.interval_sec,
660                                     self.config.service_mode)
661         self.config.frame_sizes = self._get_frame_sizes()
662         self.run_config = {
663             'l2frame_size': None,
664             'duration_sec': self.config.duration_sec,
665             'bidirectional': True,
666             'rates': []  # to avoid unsbuscriptable-obj warning
667         }
668         self.current_total_rate = {'rate_percent': '10'}
669         if self.config.single_run:
670             self.current_total_rate = utils.parse_rate_str(self.config.rate)
671         self.ifstats = None
672         # Speed is either discovered when connecting to TG or set from config
673         # This variable is 0 if not yet discovered from TG or must be the speed of
674         # each interface in bits per second
675         self.intf_speed = self.generator_config.intf_speed
676
677     def _get_generator(self):
678         tool = self.tool.lower()
679         if tool == 'trex':
680             from .traffic_gen import trex_gen
681             return trex_gen.TRex(self)
682         if tool == 'dummy':
683             from .traffic_gen import dummy
684             return dummy.DummyTG(self)
685         raise TrafficClientException('Unsupported generator tool name:' + self.tool)
686
687     def skip_sleep(self):
688         """Skip all sleeps when doing unit testing with dummy TG.
689
690         Must be overriden using mock.patch
691         """
692         return False
693
694     def _get_frame_sizes(self):
695         traffic_profile_name = self.config.traffic.profile
696         matching_profiles = [profile for profile in self.config.traffic_profile if
697                              profile.name == traffic_profile_name]
698         if len(matching_profiles) > 1:
699             raise TrafficClientException('Multiple traffic profiles with name: ' +
700                                          traffic_profile_name)
701         if not matching_profiles:
702             raise TrafficClientException('Cannot find traffic profile: ' + traffic_profile_name)
703         return matching_profiles[0].l2frame_size
704
705     def start_traffic_generator(self):
706         """Start the traffic generator process (traffic not started yet)."""
707         self.gen.connect()
708         # pick up the interface speed if it is not set from config
709         intf_speeds = self.gen.get_port_speed_gbps()
710         # convert Gbps unit into bps
711         tg_if_speed = bitmath.parse_string(str(intf_speeds[0]) + 'Gb').bits
712         if self.intf_speed:
713             # interface speed is overriden from config
714             if self.intf_speed != tg_if_speed:
715                 # Warn the user if the speed in the config is different
716                 LOG.warning('Interface speed provided is different from actual speed (%d Gbps)',
717                             intf_speeds[0])
718         else:
719             # interface speed not provisioned by config
720             self.intf_speed = tg_if_speed
721             # also update the speed in the tg config
722             self.generator_config.intf_speed = tg_if_speed
723
724         # Save the traffic generator local MAC
725         for mac, device in zip(self.gen.get_macs(), self.generator_config.devices):
726             device.set_mac(mac)
727
728     def setup(self):
729         """Set up the traffic client."""
730         self.gen.clear_stats()
731
732     def get_version(self):
733         """Get the traffic generator version."""
734         return self.gen.get_version()
735
736     def ensure_end_to_end(self):
737         """Ensure traffic generator receives packets it has transmitted.
738
739         This ensures end to end connectivity and also waits until VMs are ready to forward packets.
740
741         VMs that are started and in active state may not pass traffic yet. It is imperative to make
742         sure that all VMs are passing traffic in both directions before starting any benchmarking.
743         To verify this, we need to send at a low frequency bi-directional packets and make sure
744         that we receive all packets back from all VMs. The number of flows is equal to 2 times
745         the number of chains (1 per direction) and we need to make sure we receive packets coming
746         from exactly 2 x chain count different source MAC addresses.
747
748         Example:
749             PVP chain (1 VM per chain)
750             N = 10 (number of chains)
751             Flow count = 20 (number of flows)
752             If the number of unique source MAC addresses from received packets is 20 then
753             all 10 VMs 10 VMs are in operational state.
754         """
755         LOG.info('Starting traffic generator to ensure end-to-end connectivity')
756         # send 2pps on each chain and each direction
757         rate_pps = {'rate_pps': str(self.config.service_chain_count * 2)}
758         self.gen.create_traffic('64', [rate_pps, rate_pps], bidirectional=True, latency=False,
759                                 e2e=True)
760         # ensures enough traffic is coming back
761         retry_count = int((self.config.check_traffic_time_sec +
762                            self.config.generic_poll_sec - 1) / self.config.generic_poll_sec)
763
764         # we expect to see packets coming from 2 unique MAC per chain
765         # because there can be flooding in the case of shared net
766         # we must verify that packets from the right VMs are received
767         # and not just count unique src MAC
768         # create a dict of (port, chain) tuples indexed by dest mac
769         mac_map = {}
770         for port, dest_macs in enumerate(self.generator_config.get_dest_macs()):
771             for chain, mac in enumerate(dest_macs):
772                 mac_map[mac] = (port, chain)
773         unique_src_mac_count = len(mac_map)
774         if self.config.vxlan and self.config.traffic_generator.vtep_vlan:
775             get_mac_id = lambda packet: packet['binary'][60:66]
776         elif self.config.vxlan:
777             get_mac_id = lambda packet: packet['binary'][56:62]
778         elif self.config.mpls:
779             get_mac_id = lambda packet: packet['binary'][24:30]
780             # mpls_transport_label = lambda packet: packet['binary'][14:18]
781         else:
782             get_mac_id = lambda packet: packet['binary'][6:12]
783         for it in range(retry_count):
784             self.gen.clear_stats()
785             self.gen.start_traffic()
786             self.gen.start_capture()
787             LOG.info('Captured unique src mac %d/%d, capturing return packets (retry %d/%d)...',
788                      unique_src_mac_count - len(mac_map), unique_src_mac_count,
789                      it + 1, retry_count)
790             if not self.skip_sleep():
791                 time.sleep(self.config.generic_poll_sec)
792             self.gen.stop_traffic()
793             self.gen.fetch_capture_packets()
794             self.gen.stop_capture()
795             for packet in self.gen.packet_list:
796                 mac_id = get_mac_id(packet).decode('latin-1')
797                 src_mac = ':'.join(["%02x" % ord(x) for x in mac_id])
798                 if self.config.mpls:
799                     if src_mac in mac_map and self.is_mpls(packet):
800                         port, chain = mac_map[src_mac]
801                         LOG.info('Received mpls packet from mac: %s (chain=%d, port=%d)',
802                                  src_mac, chain, port)
803                         mac_map.pop(src_mac, None)
804                 else:
805                     if src_mac in mac_map and self.is_udp(packet):
806                         port, chain = mac_map[src_mac]
807                         LOG.info('Received udp packet from mac: %s (chain=%d, port=%d)',
808                                  src_mac, chain, port)
809                         mac_map.pop(src_mac, None)
810
811                 if not mac_map:
812                     LOG.info('End-to-end connectivity established')
813                     return
814             if self.config.l3_router and not self.config.no_arp:
815                 # In case of L3 traffic mode, routers are not able to route traffic
816                 # until VM interfaces are up and ARP requests are done
817                 LOG.info('Waiting for loopback service completely started...')
818                 LOG.info('Sending ARP request to assure end-to-end connectivity established')
819                 self.ensure_arp_successful()
820         raise TrafficClientException('End-to-end connectivity cannot be ensured')
821
822     def is_udp(self, packet):
823         pkt = Ether(packet['binary'])
824         return UDP in pkt
825
826     def is_mpls(self, packet):
827         pkt = Ether(packet['binary'])
828         return MPLS in pkt
829
830     def ensure_arp_successful(self):
831         """Resolve all IP using ARP and throw an exception in case of failure."""
832         dest_macs = self.gen.resolve_arp()
833         if dest_macs:
834             # all dest macs are discovered, saved them into the generator config
835             if self.config.vxlan or self.config.mpls:
836                 self.generator_config.set_vtep_dest_macs(0, dest_macs[0])
837                 self.generator_config.set_vtep_dest_macs(1, dest_macs[1])
838             else:
839                 self.generator_config.set_dest_macs(0, dest_macs[0])
840                 self.generator_config.set_dest_macs(1, dest_macs[1])
841         else:
842             raise TrafficClientException('ARP cannot be resolved')
843
844     def set_traffic(self, frame_size, bidirectional):
845         """Reconfigure the traffic generator for a new frame size."""
846         self.run_config['bidirectional'] = bidirectional
847         self.run_config['l2frame_size'] = frame_size
848         self.run_config['rates'] = [self.get_per_direction_rate()]
849         if bidirectional:
850             self.run_config['rates'].append(self.get_per_direction_rate())
851         else:
852             unidir_reverse_pps = int(self.config.unidir_reverse_traffic_pps)
853             if unidir_reverse_pps > 0:
854                 self.run_config['rates'].append({'rate_pps': str(unidir_reverse_pps)})
855         # Fix for [NFVBENCH-67], convert the rate string to PPS
856         for idx, rate in enumerate(self.run_config['rates']):
857             if 'rate_pps' not in rate:
858                 self.run_config['rates'][idx] = {'rate_pps': self.__convert_rates(rate)['rate_pps']}
859
860         self.gen.clear_streamblock()
861
862         if self.config.no_latency_streams:
863             LOG.info("Latency streams are disabled")
864         # in service mode, we must disable flow stats (e2e=True)
865         self.gen.create_traffic(frame_size, self.run_config['rates'], bidirectional,
866                                 latency=not self.config.no_latency_streams,
867                                 e2e=self.runner.service_mode)
868
869     def _modify_load(self, load):
870         self.current_total_rate = {'rate_percent': str(load)}
871         rate_per_direction = self.get_per_direction_rate()
872
873         self.gen.modify_rate(rate_per_direction, False)
874         self.run_config['rates'][0] = rate_per_direction
875         if self.run_config['bidirectional']:
876             self.gen.modify_rate(rate_per_direction, True)
877             self.run_config['rates'][1] = rate_per_direction
878
879     def get_ndr_and_pdr(self):
880         """Start the NDR/PDR iteration and return the results."""
881         dst = 'Bidirectional' if self.run_config['bidirectional'] else 'Unidirectional'
882         targets = {}
883         if self.config.ndr_run:
884             LOG.info('*** Searching NDR for %s (%s)...', self.run_config['l2frame_size'], dst)
885             targets['ndr'] = self.config.measurement.NDR
886         if self.config.pdr_run:
887             LOG.info('*** Searching PDR for %s (%s)...', self.run_config['l2frame_size'], dst)
888             targets['pdr'] = self.config.measurement.PDR
889
890         self.run_config['start_time'] = time.time()
891         self.interval_collector = IntervalCollector(self.run_config['start_time'])
892         self.interval_collector.attach_notifier(self.notifier)
893         self.iteration_collector = IterationCollector(self.run_config['start_time'])
894         results = {}
895         self.__range_search(0.0, 200.0, targets, results)
896
897         results['iteration_stats'] = {
898             'ndr_pdr': self.iteration_collector.get()
899         }
900
901         if self.config.ndr_run:
902             LOG.info('NDR load: %s', results['ndr']['rate_percent'])
903             results['ndr']['time_taken_sec'] = \
904                 results['ndr']['timestamp_sec'] - self.run_config['start_time']
905             if self.config.pdr_run:
906                 LOG.info('PDR load: %s', results['pdr']['rate_percent'])
907                 results['pdr']['time_taken_sec'] = \
908                     results['pdr']['timestamp_sec'] - results['ndr']['timestamp_sec']
909         else:
910             LOG.info('PDR load: %s', results['pdr']['rate_percent'])
911             results['pdr']['time_taken_sec'] = \
912                 results['pdr']['timestamp_sec'] - self.run_config['start_time']
913         return results
914
915     def __get_dropped_rate(self, result):
916         dropped_pkts = result['rx']['dropped_pkts']
917         total_pkts = result['tx']['total_pkts']
918         if not total_pkts:
919             return float('inf')
920         return float(dropped_pkts) / total_pkts * 100
921
922     def get_stats(self):
923         """Collect final stats for previous run."""
924         stats = self.gen.get_stats(self.ifstats)
925         retDict = {'total_tx_rate': stats['total_tx_rate'],
926                    'offered_tx_rate_bps': stats['offered_tx_rate_bps']}
927
928         tx_keys = ['total_pkts', 'total_pkt_bytes', 'pkt_rate', 'pkt_bit_rate']
929         rx_keys = tx_keys + ['dropped_pkts']
930
931         for port in self.PORTS:
932             port_stats = {'tx': {}, 'rx': {}}
933             for key in tx_keys:
934                 port_stats['tx'][key] = int(stats[port]['tx'][key])
935             for key in rx_keys:
936                 try:
937                     port_stats['rx'][key] = int(stats[port]['rx'][key])
938                 except ValueError:
939                     port_stats['rx'][key] = 0
940             port_stats['rx']['avg_delay_usec'] = cast_integer(
941                 stats[port]['rx']['avg_delay_usec'])
942             port_stats['rx']['min_delay_usec'] = cast_integer(
943                 stats[port]['rx']['min_delay_usec'])
944             port_stats['rx']['max_delay_usec'] = cast_integer(
945                 stats[port]['rx']['max_delay_usec'])
946             port_stats['drop_rate_percent'] = self.__get_dropped_rate(port_stats)
947             retDict[str(port)] = port_stats
948
949         ports = sorted(list(retDict.keys()), key=str)
950         if self.run_config['bidirectional']:
951             retDict['overall'] = {'tx': {}, 'rx': {}}
952             for key in tx_keys:
953                 retDict['overall']['tx'][key] = \
954                     retDict[ports[0]]['tx'][key] + retDict[ports[1]]['tx'][key]
955             for key in rx_keys:
956                 retDict['overall']['rx'][key] = \
957                     retDict[ports[0]]['rx'][key] + retDict[ports[1]]['rx'][key]
958             total_pkts = [retDict[ports[0]]['rx']['total_pkts'],
959                           retDict[ports[1]]['rx']['total_pkts']]
960             avg_delays = [retDict[ports[0]]['rx']['avg_delay_usec'],
961                           retDict[ports[1]]['rx']['avg_delay_usec']]
962             max_delays = [retDict[ports[0]]['rx']['max_delay_usec'],
963                           retDict[ports[1]]['rx']['max_delay_usec']]
964             min_delays = [retDict[ports[0]]['rx']['min_delay_usec'],
965                           retDict[ports[1]]['rx']['min_delay_usec']]
966             retDict['overall']['rx']['avg_delay_usec'] = utils.weighted_avg(total_pkts, avg_delays)
967             retDict['overall']['rx']['min_delay_usec'] = min(min_delays)
968             retDict['overall']['rx']['max_delay_usec'] = max(max_delays)
969             for key in ['pkt_bit_rate', 'pkt_rate']:
970                 for dirc in ['tx', 'rx']:
971                     retDict['overall'][dirc][key] /= 2.0
972                 retDict['overall']['hdrh'] = stats.get('hdrh', None)
973                 if retDict['overall']['hdrh']:
974                     decoded_histogram = HdrHistogram.decode(retDict['overall']['hdrh'])
975                     # override min max and avg from hdrh
976                     retDict['overall']['rx']['min_delay_usec'] = decoded_histogram.get_min_value()
977                     retDict['overall']['rx']['max_delay_usec'] = decoded_histogram.get_max_value()
978                     retDict['overall']['rx']['avg_delay_usec'] = decoded_histogram.get_mean_value()
979                     retDict['overall']['rx']['lat_percentile'] = {}
980                     for percentile in self.config.lat_percentiles:
981                         retDict['overall']['rx']['lat_percentile'][percentile] = \
982                             decoded_histogram.get_value_at_percentile(percentile)
983
984         else:
985             retDict['overall'] = retDict[ports[0]]
986         retDict['overall']['drop_rate_percent'] = self.__get_dropped_rate(retDict['overall'])
987         return retDict
988
989     def __convert_rates(self, rate):
990         return utils.convert_rates(self.run_config['l2frame_size'],
991                                    rate,
992                                    self.intf_speed)
993
994     def __ndr_pdr_found(self, tag, load):
995         rates = self.__convert_rates({'rate_percent': load})
996         self.iteration_collector.add_ndr_pdr(tag, rates['rate_pps'])
997         last_stats = self.iteration_collector.peek()
998         self.interval_collector.add_ndr_pdr(tag, last_stats)
999
1000     def __format_output_stats(self, stats):
1001         for key in self.PORTS + ['overall']:
1002             key = str(key)
1003             interface = stats[key]
1004             stats[key] = {
1005                 'tx_pkts': interface['tx']['total_pkts'],
1006                 'rx_pkts': interface['rx']['total_pkts'],
1007                 'drop_percentage': interface['drop_rate_percent'],
1008                 'drop_pct': interface['rx']['dropped_pkts'],
1009                 'avg_delay_usec': interface['rx']['avg_delay_usec'],
1010                 'max_delay_usec': interface['rx']['max_delay_usec'],
1011                 'min_delay_usec': interface['rx']['min_delay_usec'],
1012             }
1013
1014             if key == 'overall':
1015                 stats[key]['hdrh'] = interface.get('hdrh', None)
1016                 if stats[key]['hdrh']:
1017                     decoded_histogram = HdrHistogram.decode(stats[key]['hdrh'])
1018                     # override min max and avg from hdrh
1019                     stats[key]['min_delay_usec'] = decoded_histogram.get_min_value()
1020                     stats[key]['max_delay_usec'] = decoded_histogram.get_max_value()
1021                     stats[key]['avg_delay_usec'] = decoded_histogram.get_mean_value()
1022                     stats[key]['lat_percentile'] = {}
1023                     for percentile in self.config.lat_percentiles:
1024                         stats[key]['lat_percentile'][percentile] = decoded_histogram.\
1025                             get_value_at_percentile(percentile)
1026
1027
1028         return stats
1029
1030     def __targets_found(self, rate, targets, results):
1031         for tag, target in list(targets.items()):
1032             LOG.info('Found %s (%s) load: %s', tag, target, rate)
1033             self.__ndr_pdr_found(tag, rate)
1034             results[tag]['timestamp_sec'] = time.time()
1035
1036     def __range_search(self, left, right, targets, results):
1037         """Perform a binary search for a list of targets inside a [left..right] range or rate.
1038
1039         left    the left side of the range to search as a % the line rate (100 = 100% line rate)
1040                 indicating the rate to send on each interface
1041         right   the right side of the range to search as a % of line rate
1042                 indicating the rate to send on each interface
1043         targets a dict of drop rates to search (0.1 = 0.1%), indexed by the DR name or "tag"
1044                 ('ndr', 'pdr')
1045         results a dict to store results
1046         """
1047         if not targets:
1048             return
1049         LOG.info('Range search [%s .. %s] targets: %s', left, right, targets)
1050
1051         # Terminate search when gap is less than load epsilon
1052         if right - left < self.config.measurement.load_epsilon:
1053             self.__targets_found(left, targets, results)
1054             return
1055
1056         # Obtain the average drop rate in for middle load
1057         middle = (left + right) / 2.0
1058         try:
1059             stats, rates = self.__run_search_iteration(middle)
1060         except STLError:
1061             LOG.exception("Got exception from traffic generator during binary search")
1062             self.__targets_found(left, targets, results)
1063             return
1064         # Split target dicts based on the avg drop rate
1065         left_targets = {}
1066         right_targets = {}
1067         for tag, target in list(targets.items()):
1068             if stats['overall']['drop_rate_percent'] <= target:
1069                 # record the best possible rate found for this target
1070                 results[tag] = rates
1071                 results[tag].update({
1072                     'load_percent_per_direction': middle,
1073                     'stats': self.__format_output_stats(dict(stats)),
1074                     'timestamp_sec': None
1075                 })
1076                 right_targets[tag] = target
1077             else:
1078                 # initialize to 0 all fields of result for
1079                 # the worst case scenario of the binary search (if ndr/pdr is not found)
1080                 if tag not in results:
1081                     results[tag] = dict.fromkeys(rates, 0)
1082                     empty_stats = self.__format_output_stats(dict(stats))
1083                     for key in empty_stats:
1084                         if isinstance(empty_stats[key], dict):
1085                             empty_stats[key] = dict.fromkeys(empty_stats[key], 0)
1086                         else:
1087                             empty_stats[key] = 0
1088                     results[tag].update({
1089                         'load_percent_per_direction': 0,
1090                         'stats': empty_stats,
1091                         'timestamp_sec': None
1092                     })
1093                 left_targets[tag] = target
1094
1095         # search lower half
1096         self.__range_search(left, middle, left_targets, results)
1097
1098         # search upper half only if the upper rate does not exceed
1099         # 100%, this only happens when the first search at 100%
1100         # yields a DR that is < target DR
1101         if middle >= 100:
1102             self.__targets_found(100, right_targets, results)
1103         else:
1104             self.__range_search(middle, right, right_targets, results)
1105
1106     def __run_search_iteration(self, rate):
1107         """Run one iteration at the given rate level.
1108
1109         rate: the rate to send on each port in percent (0 to 100)
1110         """
1111         self._modify_load(rate)
1112
1113         # poll interval stats and collect them
1114         for stats in self.run_traffic():
1115             self.interval_collector.add(stats)
1116             time_elapsed_ratio = self.runner.time_elapsed() / self.run_config['duration_sec']
1117             if time_elapsed_ratio >= 1:
1118                 self.cancel_traffic()
1119                 if not self.skip_sleep():
1120                     time.sleep(self.config.pause_sec)
1121         self.interval_collector.reset()
1122
1123         # get stats from the run
1124         stats = self.runner.client.get_stats()
1125         current_traffic_config = self._get_traffic_config()
1126         warning = self.compare_tx_rates(current_traffic_config['direction-total']['rate_pps'],
1127                                         stats['total_tx_rate'])
1128         if warning is not None:
1129             stats['warning'] = warning
1130
1131         # save reliable stats from whole iteration
1132         self.iteration_collector.add(stats, current_traffic_config['direction-total']['rate_pps'])
1133         LOG.info('Average drop rate: %f', stats['overall']['drop_rate_percent'])
1134         return stats, current_traffic_config['direction-total']
1135
1136     def log_stats(self, stats):
1137         """Log estimated stats during run."""
1138         # Calculate a rolling drop rate based on differential to
1139         # the previous reading
1140         cur_tx = stats['overall']['tx']['total_pkts']
1141         cur_rx = stats['overall']['rx']['total_pkts']
1142         delta_tx = cur_tx - self.prev_tx
1143         delta_rx = cur_rx - self.prev_rx
1144         drops = delta_tx - delta_rx
1145         drop_rate_pct = 100 * (delta_tx - delta_rx)/delta_tx
1146         self.prev_tx = cur_tx
1147         self.prev_rx = cur_rx
1148         LOG.info('TX: %15s; RX: %15s; (Est.) Dropped: %12s; Drop rate: %8.4f%%',
1149                  format(cur_tx, ',d'),
1150                  format(cur_rx, ',d'),
1151                  format(drops, ',d'),
1152                  drop_rate_pct)
1153
1154     def run_traffic(self):
1155         """Start traffic and return intermediate stats for each interval."""
1156         stats = self.runner.run()
1157         self.prev_tx = 0
1158         self.prev_rx = 0
1159         while self.runner.is_running:
1160             self.log_stats(stats)
1161             yield stats
1162             stats = self.runner.poll_stats()
1163             if stats is None:
1164                 return
1165         self.log_stats(stats)
1166         LOG.info('Drop rate: %f', stats['overall']['drop_rate_percent'])
1167         yield stats
1168
1169     def cancel_traffic(self):
1170         """Stop traffic."""
1171         self.runner.stop()
1172
1173     def _get_traffic_config(self):
1174         config = {}
1175         load_total = 0.0
1176         bps_total = 0.0
1177         pps_total = 0.0
1178         for idx, rate in enumerate(self.run_config['rates']):
1179             key = 'direction-forward' if idx == 0 else 'direction-reverse'
1180             config[key] = {
1181                 'l2frame_size': self.run_config['l2frame_size'],
1182                 'duration_sec': self.run_config['duration_sec']
1183             }
1184             config[key].update(rate)
1185             config[key].update(self.__convert_rates(rate))
1186             load_total += float(config[key]['rate_percent'])
1187             bps_total += float(config[key]['rate_bps'])
1188             pps_total += float(config[key]['rate_pps'])
1189         config['direction-total'] = dict(config['direction-forward'])
1190         config['direction-total'].update({
1191             'rate_percent': load_total,
1192             'rate_pps': cast_integer(pps_total),
1193             'rate_bps': bps_total
1194         })
1195
1196         return config
1197
1198     def get_run_config(self, results):
1199         """Return configuration which was used for the last run."""
1200         r = {}
1201         # because we want each direction to have the far end RX rates,
1202         # use the far end index (1-idx) to retrieve the RX rates
1203         for idx, key in enumerate(["direction-forward", "direction-reverse"]):
1204             tx_rate = results["stats"][str(idx)]["tx"]["total_pkts"] / self.config.duration_sec
1205             rx_rate = results["stats"][str(1 - idx)]["rx"]["total_pkts"] / self.config.duration_sec
1206             r[key] = {
1207                 "orig": self.__convert_rates(self.run_config['rates'][idx]),
1208                 "tx": self.__convert_rates({'rate_pps': tx_rate}),
1209                 "rx": self.__convert_rates({'rate_pps': rx_rate})
1210             }
1211
1212         total = {}
1213         for direction in ['orig', 'tx', 'rx']:
1214             total[direction] = {}
1215             for unit in ['rate_percent', 'rate_bps', 'rate_pps']:
1216                 total[direction][unit] = sum([float(x[direction][unit]) for x in list(r.values())])
1217
1218         r['direction-total'] = total
1219         return r
1220
1221     def insert_interface_stats(self, pps_list):
1222         """Insert interface stats to a list of packet path stats.
1223
1224         pps_list: a list of packet path stats instances indexed by chain index
1225
1226         This function will insert the packet path stats for the traffic gen ports 0 and 1
1227         with itemized per chain tx/rx counters.
1228         There will be as many packet path stats as chains.
1229         Each packet path stats will have exactly 2 InterfaceStats for port 0 and port 1
1230         self.pps_list:
1231         [
1232         PacketPathStats(InterfaceStats(chain 0, port 0), InterfaceStats(chain 0, port 1)),
1233         PacketPathStats(InterfaceStats(chain 1, port 0), InterfaceStats(chain 1, port 1)),
1234         ...
1235         ]
1236         """
1237         def get_if_stats(chain_idx):
1238             return [InterfaceStats('p' + str(port), self.tool)
1239                     for port in range(2)]
1240         # keep the list of list of interface stats indexed by the chain id
1241         self.ifstats = [get_if_stats(chain_idx)
1242                         for chain_idx in range(self.config.service_chain_count)]
1243         # note that we need to make a copy of the ifs list so that any modification in the
1244         # list from pps will not change the list saved in self.ifstats
1245         self.pps_list = [PacketPathStats(self.config, list(ifs)) for ifs in self.ifstats]
1246         # insert the corresponding pps in the passed list
1247         pps_list.extend(self.pps_list)
1248
1249     def update_interface_stats(self, diff=False):
1250         """Update all interface stats.
1251
1252         diff: if False, simply refresh the interface stats values with latest values
1253               if True, diff the interface stats with the latest values
1254         Make sure that the interface stats inserted in insert_interface_stats() are updated
1255         with proper values.
1256         self.ifstats:
1257         [
1258         [InterfaceStats(chain 0, port 0), InterfaceStats(chain 0, port 1)],
1259         [InterfaceStats(chain 1, port 0), InterfaceStats(chain 1, port 1)],
1260         ...
1261         ]
1262         """
1263         if diff:
1264             stats = self.gen.get_stats(self.ifstats)
1265             for chain_idx, ifs in enumerate(self.ifstats):
1266                 # each ifs has exactly 2 InterfaceStats and 2 Latency instances
1267                 # corresponding to the
1268                 # port 0 and port 1 for the given chain_idx
1269                 # Note that we cannot use self.pps_list[chain_idx].if_stats to pick the
1270                 # interface stats for the pps because it could have been modified to contain
1271                 # additional interface stats
1272                 self.gen.get_stream_stats(stats, ifs, self.pps_list[chain_idx].latencies, chain_idx)
1273             # special handling for vxlan
1274             # in case of vxlan, flow stats are not available so all rx counters will be
1275             # zeros when the total rx port counter is non zero.
1276             # in that case,
1277             for port in range(2):
1278                 total_rx = 0
1279                 for ifs in self.ifstats:
1280                     total_rx += ifs[port].rx
1281                 if total_rx == 0:
1282                     # check if the total port rx from Trex is also zero
1283                     port_rx = stats[port]['rx']['total_pkts']
1284                     if port_rx:
1285                         # the total rx for all chains from port level stats is non zero
1286                         # which means that the per-chain stats are not available
1287                         if len(self.ifstats) == 1:
1288                             # only one chain, simply report the port level rx to the chain rx stats
1289                             self.ifstats[0][port].rx = port_rx
1290                         else:
1291                             for ifs in self.ifstats:
1292                                 # mark this data as unavailable
1293                                 ifs[port].rx = None
1294                             # pitch in the total rx only in the last chain pps
1295                             self.ifstats[-1][port].rx_total = port_rx
1296
1297     @staticmethod
1298     def compare_tx_rates(required, actual):
1299         """Compare the actual TX rate to the required TX rate."""
1300         threshold = 0.9
1301         are_different = False
1302         try:
1303             if float(actual) / required < threshold:
1304                 are_different = True
1305         except ZeroDivisionError:
1306             are_different = True
1307
1308         if are_different:
1309             msg = "WARNING: There is a significant difference between requested TX rate ({r}) " \
1310                   "and actual TX rate ({a}). The traffic generator may not have sufficient CPU " \
1311                   "to achieve the requested TX rate.".format(r=required, a=actual)
1312             LOG.info(msg)
1313             return msg
1314
1315         return None
1316
1317     def get_per_direction_rate(self):
1318         """Get the rate for each direction."""
1319         divisor = 2 if self.run_config['bidirectional'] else 1
1320         if 'rate_percent' in self.current_total_rate:
1321             # don't split rate if it's percentage
1322             divisor = 1
1323
1324         return utils.divide_rate(self.current_total_rate, divisor)
1325
1326     def close(self):
1327         """Close this instance."""
1328         try:
1329             self.gen.stop_traffic()
1330         except Exception:
1331             pass
1332         self.gen.clear_stats()
1333         self.gen.cleanup()