ae8af8da912696fd653a96f932035eedac7f931d
[nfvbench.git] / nfvbench / traffic_client.py
1 # Copyright 2016 Cisco Systems, Inc.  All rights reserved.
2 #
3 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
4 #    not use this file except in compliance with the License. You may obtain
5 #    a copy of the License at
6 #
7 #         http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #    Unless required by applicable law or agreed to in writing, software
10 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 #    License for the specific language governing permissions and limitations
13 #    under the License.
14
15 """Interface to the traffic generator clients including NDR/PDR binary search."""
16 import socket
17 import struct
18 import time
19 import sys
20
21 from attrdict import AttrDict
22 import bitmath
23 from hdrh.histogram import HdrHistogram
24 from netaddr import IPNetwork
25 # pylint: disable=import-error
26 from trex.stl.api import Ether
27 from trex.stl.api import STLError
28 from trex.stl.api import UDP
29 # pylint: disable=wrong-import-order
30 from scapy.contrib.mpls import MPLS  # flake8: noqa
31 # pylint: enable=wrong-import-order
32 # pylint: enable=import-error
33
34 from .log import LOG
35 from .packet_stats import InterfaceStats
36 from .packet_stats import PacketPathStats
37 from .stats_collector import IntervalCollector
38 from .stats_collector import IterationCollector
39 from .traffic_gen import traffic_utils as utils
40 from .utils import cast_integer, find_max_size, find_tuples_equal_to_lcm_value, get_divisors, lcm
41
42 class TrafficClientException(Exception):
43     """Generic traffic client exception."""
44
45 class TrafficRunner(object):
46     """Serialize various steps required to run traffic."""
47
48     def __init__(self, client, duration_sec, interval_sec=0, service_mode=False):
49         """Create a traffic runner."""
50         self.client = client
51         self.start_time = None
52         self.duration_sec = duration_sec
53         self.interval_sec = interval_sec
54         self.service_mode = service_mode
55
56     def run(self):
57         """Clear stats and instruct the traffic generator to start generating traffic."""
58         if self.is_running():
59             return None
60         LOG.info('Running traffic generator')
61         self.client.gen.clear_stats()
62         # Debug use only: the service_mode flag may have been set in
63         # the configuration, in order to enable the 'service' mode
64         # in the trex generator, before starting the traffic (run).
65         # From this point, a T-rex console (launched in readonly mode) would
66         # then be able to capture the transmitted and/or received traffic.
67         self.client.gen.set_service_mode(enabled=self.service_mode)
68         LOG.info('Service mode is %sabled', 'en' if self.service_mode else 'dis')
69         self.client.gen.start_traffic()
70         self.start_time = time.time()
71         return self.poll_stats()
72
73     def stop(self):
74         """Stop the current run and instruct the traffic generator to stop traffic."""
75         if self.is_running():
76             self.start_time = None
77             self.client.gen.stop_traffic()
78
79     def is_running(self):
80         """Check if a run is still pending."""
81         return self.start_time is not None
82
83     def time_elapsed(self):
84         """Return time elapsed since start of run."""
85         if self.is_running():
86             return time.time() - self.start_time
87         return self.duration_sec
88
89     def poll_stats(self):
90         """Poll latest stats from the traffic generator at fixed interval - sleeps if necessary.
91
92         return: latest stats or None if traffic is stopped
93         """
94         if not self.is_running():
95             return None
96         if self.client.skip_sleep():
97             self.stop()
98             return self.client.get_stats()
99         time_elapsed = self.time_elapsed()
100         if time_elapsed > self.duration_sec:
101             self.stop()
102             return None
103         time_left = self.duration_sec - time_elapsed
104         if self.interval_sec > 0.0:
105             if time_left <= self.interval_sec:
106                 time.sleep(time_left)
107                 self.stop()
108             else:
109                 time.sleep(self.interval_sec)
110         else:
111             time.sleep(self.duration_sec)
112             self.stop()
113         return self.client.get_stats()
114
115
116 class IpBlock(object):
117     """Manage a block of IP addresses."""
118
119     def __init__(self, base_ip, step_ip, count_ip):
120         """Create an IP block."""
121         self.base_ip_int = Device.ip_to_int(base_ip)
122         if step_ip == 'random':
123             step_ip = '0.0.0.1'
124         self.step = Device.ip_to_int(step_ip)
125         self.max_available = count_ip
126         self.next_free = 0
127
128     def get_ip(self, index=0):
129         """Return the IP address at given index."""
130         if index < 0 or index >= self.max_available:
131             raise IndexError('Index out of bounds: %d (max=%d)' % (index, self.max_available))
132         return Device.int_to_ip(self.base_ip_int + index * self.step)
133
134     def get_ip_from_chain_first_ip(self, first_ip, index=0):
135         """Return the IP address at given index starting from chain first ip."""
136         if index < 0 or index >= self.max_available:
137             raise IndexError('Index out of bounds: %d (max=%d)' % (index, self.max_available))
138         return Device.int_to_ip(first_ip + index * self.step)
139
140     def reserve_ip_range(self, count):
141         """Reserve a range of count consecutive IP addresses spaced by step.
142         """
143         if self.next_free + count > self.max_available:
144             raise IndexError('No more IP addresses next free=%d max_available=%d requested=%d' %
145                              (self.next_free,
146                               self.max_available,
147                               count))
148         first_ip = self.get_ip(self.next_free)
149         last_ip = self.get_ip(self.next_free + count - 1)
150         self.next_free += count
151         return (first_ip, last_ip)
152
153     def reset_reservation(self):
154         """Reset all reservations and restart with a completely unused IP block."""
155         self.next_free = 0
156
157
158 class UdpPorts(object):
159
160     def __init__(self, src_min, src_max, dst_min, dst_max, udp_src_size, udp_dst_size, step):
161
162         self.src_min = int(src_min)
163         self.src_max = int(src_max)
164         self.dst_min = int(dst_min)
165         self.dst_max = int(dst_max)
166         self.udp_src_size = udp_src_size
167         self.udp_dst_size = udp_dst_size
168         self.step = step
169
170     def get_src_max(self, index=0):
171         """Return the UDP src port at given index."""
172         return int(self.src_min) + index * int(self.step)
173
174     def get_dst_max(self, index=0):
175         """Return the UDP dst port at given index."""
176         return int(self.dst_min) + index * int(self.step)
177
178
179 class Device(object):
180     """Represent a port device and all information associated to it.
181
182     In the curent version we only support 2 port devices for the traffic generator
183     identified as port 0 or port 1.
184     """
185
186     def __init__(self, port, generator_config):
187         """Create a new device for a given port."""
188         self.generator_config = generator_config
189         self.chain_count = generator_config.service_chain_count
190         if generator_config.bidirectional:
191             self.flow_count = generator_config.flow_count / 2
192         else:
193             self.flow_count = generator_config.flow_count
194
195         self.port = port
196         self.switch_port = generator_config.interfaces[port].get('switch_port', None)
197         self.vtep_vlan = None
198         self.vtep_src_mac = None
199         self.vxlan = False
200         self.mpls = False
201         self.inner_labels = None
202         self.outer_labels = None
203         self.pci = generator_config.interfaces[port].pci
204         self.mac = None
205         self.dest_macs = None
206         self.vtep_dst_mac = None
207         self.vtep_dst_ip = None
208         if generator_config.vteps is None:
209             self.vtep_src_ip = None
210         else:
211             self.vtep_src_ip = generator_config.vteps[port]
212         self.vnis = None
213         self.vlans = None
214         self.ip_addrs = generator_config.ip_addrs[port]
215         self.ip_src_static = generator_config.ip_src_static
216         self.ip_addrs_step = generator_config.ip_addrs_step
217         if self.ip_addrs_step == 'random':
218             # Set step to 1 to calculate the IP range size (see check_range_size below)
219             step = '0.0.0.1'
220         else:
221             step = self.ip_addrs_step
222         self.ip_size = self.check_range_size(IPNetwork(self.ip_addrs).size, Device.ip_to_int(step))
223         self.ip = str(IPNetwork(self.ip_addrs).network)
224         ip_addrs_left = generator_config.ip_addrs[0]
225         ip_addrs_right = generator_config.ip_addrs[1]
226         self.ip_addrs_size = {
227             'left': self.check_range_size(IPNetwork(ip_addrs_left).size, Device.ip_to_int(step)),
228             'right': self.check_range_size(IPNetwork(ip_addrs_right).size, Device.ip_to_int(step))}
229         udp_src_port = generator_config.gen_config.udp_src_port
230         if udp_src_port is None:
231             udp_src_port = 53
232         udp_dst_port = generator_config.gen_config.udp_dst_port
233         if udp_dst_port is None:
234             udp_dst_port = 53
235         src_max, src_min = self.define_udp_range(udp_src_port, 'udp_src_port')
236         dst_max, dst_min = self.define_udp_range(udp_dst_port, 'udp_dst_port')
237         if generator_config.gen_config.udp_port_step == 'random':
238             # Set step to 1 to calculate the UDP range size
239             udp_step = 1
240         else:
241             udp_step = int(generator_config.gen_config.udp_port_step)
242         udp_src_size = self.check_range_size(int(src_max) - int(src_min) + 1, udp_step)
243         udp_dst_size = self.check_range_size(int(dst_max) - int(dst_min) + 1, udp_step)
244         lcm_port = lcm(udp_src_size, udp_dst_size)
245         if self.ip_src_static is True:
246             lcm_ip = lcm(1, min(self.ip_addrs_size['left'], self.ip_addrs_size['right']))
247         else:
248             lcm_ip = lcm(self.ip_addrs_size['left'], self.ip_addrs_size['right'])
249         flow_max = lcm(lcm_port, lcm_ip)
250         if self.flow_count > flow_max:
251             raise TrafficClientException('Trying to set unachievable traffic (%d > %d)' %
252                                          (self.flow_count, flow_max))
253
254         self.udp_ports = UdpPorts(src_min, src_max, dst_min, dst_max, udp_src_size, udp_dst_size,
255                                   generator_config.gen_config.udp_port_step)
256
257         self.ip_block = IpBlock(self.ip, step, self.ip_size)
258
259         self.gw_ip_block = IpBlock(generator_config.gateway_ips[port],
260                                    generator_config.gateway_ip_addrs_step,
261                                    self.chain_count)
262         self.tg_gateway_ip_addrs = generator_config.tg_gateway_ip_addrs[port]
263         self.tg_gw_ip_block = IpBlock(self.tg_gateway_ip_addrs,
264                                       generator_config.tg_gateway_ip_addrs_step,
265                                       self.chain_count)
266
267     def limit_ip_udp_ranges(self, peer_ip_size, cur_chain_flow_count):
268         # init to min value in case of no matching values found with lcm calculation
269         new_src_ip_size = 1
270         new_peer_ip_size = 1
271         new_src_udp_size = 1
272         new_dst_udp_size = 1
273
274         if self.ip_src_static is True:
275             src_ip_size = 1
276         else:
277             src_ip_size = self.ip_size
278         ip_src_divisors = list(get_divisors(src_ip_size))
279         ip_dst_divisors = list(get_divisors(peer_ip_size))
280         udp_src_divisors = list(get_divisors(self.udp_ports.udp_src_size))
281         udp_dst_divisors = list(get_divisors(self.udp_ports.udp_dst_size))
282         fc = int(cur_chain_flow_count)
283         tuples_ip = list(find_tuples_equal_to_lcm_value(ip_src_divisors, ip_dst_divisors, fc))
284         tuples_udp = list(find_tuples_equal_to_lcm_value(udp_src_divisors, udp_dst_divisors, fc))
285
286         if tuples_ip:
287             new_src_ip_size = tuples_ip[-1][0]
288             new_peer_ip_size = tuples_ip[-1][1]
289
290         if tuples_udp:
291             new_src_udp_size = tuples_udp[-1][0]
292             new_dst_udp_size = tuples_udp[-1][1]
293
294         tuples_src = []
295         tuples_dst = []
296         if not tuples_ip and not tuples_udp:
297             # in case of not divisors in common matching LCM value (i.e. requested flow count)
298             # try to find an accurate UDP range to fit requested flow count
299             udp_src_int = range(self.udp_ports.src_min, self.udp_ports.src_max)
300             udp_dst_int = range(self.udp_ports.dst_min, self.udp_ports.dst_max)
301             tuples_src = list(find_tuples_equal_to_lcm_value(ip_src_divisors, udp_src_int, fc))
302             tuples_dst = list(find_tuples_equal_to_lcm_value(ip_dst_divisors, udp_dst_int, fc))
303
304             if not tuples_src and not tuples_dst:
305                 # iterate IP and UDP ranges to find a tuple that match flow count values
306                 src_ip_range = range(1,src_ip_size)
307                 dst_ip_range = range(1, peer_ip_size)
308                 tuples_src = list(find_tuples_equal_to_lcm_value(src_ip_range, udp_src_int, fc))
309                 tuples_dst = list(find_tuples_equal_to_lcm_value(dst_ip_range, udp_dst_int, fc))
310
311         if tuples_src or tuples_dst:
312             if tuples_src:
313                 new_src_ip_size = tuples_src[-1][0]
314                 new_src_udp_size = tuples_src[-1][1]
315             if tuples_dst:
316                 new_peer_ip_size = tuples_dst[-1][0]
317                 new_dst_udp_size = tuples_dst[-1][1]
318         else:
319             if not tuples_ip:
320                 if src_ip_size != 1:
321                     if src_ip_size > fc:
322                         new_src_ip_size = fc
323                     else:
324                         new_src_ip_size = find_max_size(src_ip_size, tuples_udp, fc)
325                 if peer_ip_size != 1:
326                     if peer_ip_size > fc:
327                         new_peer_ip_size = fc
328                     else:
329                         new_peer_ip_size = find_max_size(peer_ip_size, tuples_udp, fc)
330
331             if not tuples_udp:
332                 if self.udp_ports.udp_src_size != 1:
333                     if self.udp_ports.udp_src_size > fc:
334                         new_src_udp_size = fc
335                     else:
336                         new_src_udp_size = find_max_size(self.udp_ports.udp_src_size,
337                                                          tuples_ip, fc)
338                 if self.udp_ports.udp_dst_size != 1:
339                     if self.udp_ports.udp_dst_size > fc:
340                         new_dst_udp_size = fc
341                     else:
342                         new_dst_udp_size = find_max_size(self.udp_ports.udp_dst_size,
343                                                          tuples_ip, fc)
344         max_possible_flows = lcm(lcm(new_src_ip_size, new_peer_ip_size),
345                                  lcm(new_src_udp_size, new_dst_udp_size))
346
347         LOG.debug("IP dst size: %d", new_peer_ip_size)
348         LOG.debug("LCM IP: %d", lcm(new_src_ip_size, new_peer_ip_size))
349         LOG.debug("LCM UDP: %d", lcm(new_src_udp_size, new_dst_udp_size))
350         LOG.debug("Global LCM: %d", max_possible_flows)
351         LOG.debug("IP src size: %d, IP dst size: %d, UDP src size: %d, UDP dst size: %d",
352                   new_src_ip_size, new_peer_ip_size, self.udp_ports.udp_src_size,
353                   self.udp_ports.udp_dst_size)
354         if not max_possible_flows == cur_chain_flow_count:
355             if (self.ip_addrs_step != '0.0.0.1' or self.udp_ports.step != '1') and not (
356                     self.ip_addrs_step == 'random' and self.udp_ports.step == 'random'):
357                 LOG.warning("Current values of ip_addrs_step and/or udp_port_step properties "
358                             "do not allow to control an accurate flow count. "
359                             "Values will be overridden as follows:")
360                 if self.ip_addrs_step != '0.0.0.1':
361                     LOG.info("ip_addrs_step='0.0.0.1' (previous value: ip_addrs_step='%s')",
362                              self.ip_addrs_step)
363                     self.ip_addrs_step = '0.0.0.1'
364
365                 if self.udp_ports.step != '1':
366                     LOG.info("udp_port_step='1' (previous value: udp_port_step='%s')",
367                              self.udp_ports.step)
368                     self.udp_ports.step = '1'
369                     # override config for not logging random step warning message in trex_gen.py
370                     self.generator_config.gen_config.udp_port_step = self.udp_ports.step
371             else:
372                 LOG.error("Current values of ip_addrs_step and udp_port_step properties "
373                           "do not allow to control an accurate flow count.")
374         else:
375             src_ip_size = new_src_ip_size
376             peer_ip_size = new_peer_ip_size
377             self.udp_ports.udp_src_size = new_src_udp_size
378             self.udp_ports.udp_dst_size = new_dst_udp_size
379         return src_ip_size, peer_ip_size
380
381     @staticmethod
382     def define_udp_range(udp_port, property_name):
383         if isinstance(udp_port, int):
384             min = udp_port
385             max = min
386         elif isinstance(udp_port, tuple):
387             min = udp_port[0]
388             max = udp_port[1]
389         else:
390             raise TrafficClientException('Invalid %s property value (53 or [\'53\',\'1024\'])'
391                                          % property_name)
392         return max, min
393
394
395     @staticmethod
396     def check_range_size(range_size, step):
397         """Check and set the available IPs or UDP ports, considering the step."""
398         try:
399             if range_size % step == 0:
400                 value = range_size // step
401             else:
402                 value = range_size // step + 1
403             return value
404         except ZeroDivisionError:
405             raise ZeroDivisionError("step can't be zero !") from ZeroDivisionError
406
407     def set_mac(self, mac):
408         """Set the local MAC for this port device."""
409         if mac is None:
410             raise TrafficClientException('Trying to set traffic generator MAC address as None')
411         self.mac = mac
412
413     def get_peer_device(self):
414         """Get the peer device (device 0 -> device 1, or device 1 -> device 0)."""
415         return self.generator_config.devices[1 - self.port]
416
417     def set_vtep_dst_mac(self, dest_macs):
418         """Set the list of dest MACs indexed by the chain id.
419
420         This is only called in 2 cases:
421         - VM macs discovered using openstack API
422         - dest MACs provisioned in config file
423         """
424         self.vtep_dst_mac = list(map(str, dest_macs))
425
426     def set_dest_macs(self, dest_macs):
427         """Set the list of dest MACs indexed by the chain id.
428
429         This is only called in 2 cases:
430         - VM macs discovered using openstack API
431         - dest MACs provisioned in config file
432         """
433         self.dest_macs = list(map(str, dest_macs))
434
435     def get_dest_macs(self):
436         """Get the list of dest macs for this device.
437
438         If set_dest_macs was never called, assumes l2-loopback and return
439         a list of peer mac (as many as chains but normally only 1 chain)
440         """
441         if self.dest_macs:
442             return self.dest_macs
443         # assume this is l2-loopback
444         return [self.get_peer_device().mac] * self.chain_count
445
446     def set_vlans(self, vlans):
447         """Set the list of vlans to use indexed by the chain id."""
448         self.vlans = vlans
449         LOG.info("Port %d: VLANs %s", self.port, self.vlans)
450
451     def set_vtep_vlan(self, vlan):
452         """Set the vtep vlan to use indexed by specific port."""
453         self.vtep_vlan = vlan
454         self.vxlan = True
455         self.vlan_tagging = None
456         LOG.info("Port %d: VTEP VLANs %s", self.port, self.vtep_vlan)
457
458     def set_vxlan_endpoints(self, src_ip, dst_ip):
459         self.vtep_dst_ip = dst_ip
460         self.vtep_src_ip = src_ip
461         LOG.info("Port %d: src_vtep %s, dst_vtep %s", self.port,
462                  self.vtep_src_ip, self.vtep_dst_ip)
463
464     def set_mpls_peers(self, src_ip, dst_ip):
465         self.mpls = True
466         self.vtep_dst_ip = dst_ip
467         self.vtep_src_ip = src_ip
468         LOG.info("Port %d: src_mpls_vtep %s, mpls_peer_ip %s", self.port,
469                  self.vtep_src_ip, self.vtep_dst_ip)
470
471     def set_vxlans(self, vnis):
472         self.vnis = vnis
473         LOG.info("Port %d: VNIs %s", self.port, self.vnis)
474
475     def set_mpls_inner_labels(self, labels):
476         self.inner_labels = labels
477         LOG.info("Port %d: MPLS Inner Labels %s", self.port, self.inner_labels)
478
479     def set_mpls_outer_labels(self, labels):
480         self.outer_labels = labels
481         LOG.info("Port %d: MPLS Outer Labels %s", self.port, self.outer_labels)
482
483     def set_gw_ip(self, gateway_ip):
484         self.gw_ip_block = IpBlock(gateway_ip,
485                                    self.generator_config.gateway_ip_addrs_step,
486                                    self.chain_count)
487
488     def get_gw_ip(self, chain_index):
489         """Retrieve the IP address assigned for the gateway of a given chain."""
490         return self.gw_ip_block.get_ip(chain_index)
491
492     def get_stream_configs(self):
493         """Get the stream config for a given chain on this device.
494
495         Called by the traffic generator driver to program the traffic generator properly
496         before generating traffic
497         """
498         configs = []
499         # exact flow count for each chain is calculated as follows:
500         # - all chains except the first will have the same flow count
501         #   calculated as (total_flows + chain_count - 1) / chain_count
502         # - the first chain will have the remainder
503         # example 11 flows and 3 chains => 3, 4, 4
504         flows_per_chain = int((self.flow_count + self.chain_count - 1) / self.chain_count)
505         cur_chain_flow_count = int(self.flow_count - flows_per_chain * (self.chain_count - 1))
506
507         peer = self.get_peer_device()
508         self.ip_block.reset_reservation()
509         peer.ip_block.reset_reservation()
510         dest_macs = self.get_dest_macs()
511
512         # limit ranges of UDP ports and IP to avoid overflow of the number of flows
513         peer_size = peer.ip_size // self.chain_count
514
515         for chain_idx in range(self.chain_count):
516             src_ip_size, peer_ip_size = self.limit_ip_udp_ranges(peer_size, cur_chain_flow_count)
517
518             src_ip_first, src_ip_last = self.ip_block.reserve_ip_range \
519                 (src_ip_size)
520             dst_ip_first, dst_ip_last = peer.ip_block.reserve_ip_range \
521                 (peer_ip_size)
522
523             if self.ip_addrs_step != 'random':
524                 src_ip_last = self.ip_block.get_ip_from_chain_first_ip(
525                     Device.ip_to_int(src_ip_first), src_ip_size - 1)
526                 dst_ip_last = peer.ip_block.get_ip_from_chain_first_ip(
527                     Device.ip_to_int(dst_ip_first), peer_ip_size - 1)
528             if self.udp_ports.step != 'random':
529                 self.udp_ports.src_max = self.udp_ports.get_src_max(self.udp_ports.udp_src_size - 1)
530                 self.udp_ports.dst_max = self.udp_ports.get_dst_max(self.udp_ports.udp_dst_size - 1)
531             if self.ip_src_static:
532                 src_ip_last = src_ip_first
533
534             LOG.info("Port %d, chain %d: IP src range [%s,%s]", self.port, chain_idx,
535                      src_ip_first, src_ip_last)
536             LOG.info("Port %d, chain %d: IP dst range [%s,%s]", self.port, chain_idx,
537                      dst_ip_first, dst_ip_last)
538             LOG.info("Port %d, chain %d: UDP src range [%s,%s]", self.port, chain_idx,
539                      self.udp_ports.src_min, self.udp_ports.src_max)
540             LOG.info("Port %d, chain %d: UDP dst range [%s,%s]", self.port, chain_idx,
541                      self.udp_ports.dst_min, self.udp_ports.dst_max)
542
543             configs.append({
544                 'count': cur_chain_flow_count,
545                 'mac_src': self.mac,
546                 'mac_dst': dest_macs[chain_idx],
547                 'ip_src_addr': src_ip_first,
548                 'ip_src_addr_max': src_ip_last,
549                 'ip_src_count': src_ip_size,
550                 'ip_dst_addr': dst_ip_first,
551                 'ip_dst_addr_max': dst_ip_last,
552                 'ip_dst_count': peer_ip_size,
553                 'ip_addrs_step': self.ip_addrs_step,
554                 'ip_src_static': self.ip_src_static,
555                 'udp_src_port': self.udp_ports.src_min,
556                 'udp_src_port_max': self.udp_ports.src_max,
557                 'udp_src_count': self.udp_ports.udp_src_size,
558                 'udp_dst_port': self.udp_ports.dst_min,
559                 'udp_dst_port_max': self.udp_ports.dst_max,
560                 'udp_dst_count': self.udp_ports.udp_dst_size,
561                 'udp_port_step': self.udp_ports.step,
562                 'mac_discovery_gw': self.get_gw_ip(chain_idx),
563                 'ip_src_tg_gw': self.tg_gw_ip_block.get_ip(chain_idx),
564                 'ip_dst_tg_gw': peer.tg_gw_ip_block.get_ip(chain_idx),
565                 'vlan_tag': self.vlans[chain_idx] if self.vlans else None,
566                 'vxlan': self.vxlan,
567                 'vtep_vlan': self.vtep_vlan if self.vtep_vlan else None,
568                 'vtep_src_mac': self.mac if (self.vxlan or self.mpls) else None,
569                 'vtep_dst_mac': self.vtep_dst_mac if (self.vxlan or self.mpls) else None,
570                 'vtep_dst_ip': self.vtep_dst_ip if self.vxlan is True else None,
571                 'vtep_src_ip': self.vtep_src_ip if self.vxlan is True else None,
572                 'net_vni': self.vnis[chain_idx] if self.vxlan is True else None,
573                 'mpls': self.mpls,
574                 'mpls_outer_label': self.outer_labels[chain_idx] if self.mpls is True else None,
575                 'mpls_inner_label': self.inner_labels[chain_idx] if self.mpls is True else None
576
577             })
578             # after first chain, fall back to the flow count for all other chains
579             cur_chain_flow_count = flows_per_chain
580         return configs
581
582     @staticmethod
583     def ip_to_int(addr):
584         """Convert an IP address from string to numeric."""
585         return struct.unpack("!I", socket.inet_aton(addr))[0]
586
587     @staticmethod
588     def int_to_ip(nvalue):
589         """Convert an IP address from numeric to string."""
590         return socket.inet_ntoa(struct.pack("!I", int(nvalue)))
591
592
593 class GeneratorConfig(object):
594     """Represents traffic configuration for currently running traffic profile."""
595
596     DEFAULT_IP_STEP = '0.0.0.1'
597     DEFAULT_SRC_DST_IP_STEP = '0.0.0.1'
598
599     def __init__(self, config):
600         """Create a generator config."""
601         self.config = config
602         # name of the generator profile (normally trex or dummy)
603         # pick the default one if not specified explicitly from cli options
604         if not config.generator_profile:
605             config.generator_profile = config.traffic_generator.default_profile
606         # pick up the profile dict based on the name
607         gen_config = self.__match_generator_profile(config.traffic_generator,
608                                                     config.generator_profile)
609         self.gen_config = gen_config
610         # copy over fields from the dict
611         self.tool = gen_config.tool
612         self.ip = gen_config.ip
613         # overrides on config.cores and config.mbuf_factor
614         if config.cores:
615             self.cores = config.cores
616         else:
617             self.cores = gen_config.get('cores', 1)
618         # let's report the value actually used in the end
619         config.cores_used = self.cores
620         self.mbuf_factor = config.mbuf_factor
621         self.mbuf_64 = config.mbuf_64
622         self.hdrh = not config.disable_hdrh
623         if config.intf_speed:
624             # interface speed is overriden from the command line
625             self.intf_speed = config.intf_speed
626         elif gen_config.intf_speed:
627             # interface speed is overriden from the generator config
628             self.intf_speed = gen_config.intf_speed
629         else:
630             self.intf_speed = "auto"
631         if self.intf_speed == "auto" or self.intf_speed == "0":
632             # interface speed is discovered/provided by the traffic generator
633             self.intf_speed = 0
634         else:
635             self.intf_speed = bitmath.parse_string(self.intf_speed.replace('ps', '')).bits
636         self.name = gen_config.name
637         self.zmq_pub_port = gen_config.get('zmq_pub_port', 4500)
638         self.zmq_rpc_port = gen_config.get('zmq_rpc_port', 4501)
639         self.limit_memory = gen_config.get('limit_memory', 1024)
640         self.software_mode = gen_config.get('software_mode', False)
641         self.interfaces = gen_config.interfaces
642         if self.interfaces[0].port != 0 or self.interfaces[1].port != 1:
643             raise TrafficClientException('Invalid port order/id in generator_profile.interfaces')
644         self.service_chain = config.service_chain
645         self.service_chain_count = config.service_chain_count
646         self.flow_count = config.flow_count
647         self.host_name = gen_config.host_name
648         self.bidirectional = config.traffic.bidirectional
649         self.tg_gateway_ip_addrs = gen_config.tg_gateway_ip_addrs
650         self.ip_addrs = gen_config.ip_addrs
651         self.ip_addrs_step = gen_config.ip_addrs_step or self.DEFAULT_SRC_DST_IP_STEP
652         self.tg_gateway_ip_addrs_step = \
653             gen_config.tg_gateway_ip_addrs_step or self.DEFAULT_IP_STEP
654         self.gateway_ip_addrs_step = gen_config.gateway_ip_addrs_step or self.DEFAULT_IP_STEP
655         self.gateway_ips = gen_config.gateway_ip_addrs
656         self.ip_src_static = gen_config.ip_src_static
657         self.vteps = gen_config.get('vteps')
658         self.devices = [Device(port, self) for port in [0, 1]]
659         # This should normally always be [0, 1]
660         self.ports = [device.port for device in self.devices]
661
662         # check that pci is not empty
663         if not gen_config.interfaces[0].get('pci', None) or \
664            not gen_config.interfaces[1].get('pci', None):
665             raise TrafficClientException("configuration interfaces pci fields cannot be empty")
666
667         self.pcis = [tgif['pci'] for tgif in gen_config.interfaces]
668         self.vlan_tagging = config.vlan_tagging
669
670         # needed for result/summarizer
671         config['tg-name'] = gen_config.name
672         config['tg-tool'] = self.tool
673
674     def to_json(self):
675         """Get json form to display the content into the overall result dict."""
676         return dict(self.gen_config)
677
678     def set_dest_macs(self, port_index, dest_macs):
679         """Set the list of dest MACs indexed by the chain id on given port.
680
681         port_index: the port for which dest macs must be set
682         dest_macs: a list of dest MACs indexed by chain id
683         """
684         if len(dest_macs) < self.config.service_chain_count:
685             raise TrafficClientException('Dest MAC list %s must have %d entries' %
686                                          (dest_macs, self.config.service_chain_count))
687         # only pass the first scc dest MACs
688         self.devices[port_index].set_dest_macs(dest_macs[:self.config.service_chain_count])
689         LOG.info('Port %d: dst MAC %s', port_index, [str(mac) for mac in dest_macs])
690
691     def set_vtep_dest_macs(self, port_index, dest_macs):
692         """Set the list of dest MACs indexed by the chain id on given port.
693
694         port_index: the port for which dest macs must be set
695         dest_macs: a list of dest MACs indexed by chain id
696         """
697         if len(dest_macs) != self.config.service_chain_count:
698             raise TrafficClientException('Dest MAC list %s must have %d entries' %
699                                          (dest_macs, self.config.service_chain_count))
700         self.devices[port_index].set_vtep_dst_mac(dest_macs)
701         LOG.info('Port %d: vtep dst MAC %s', port_index, {str(mac) for mac in dest_macs})
702
703     def get_dest_macs(self):
704         """Return the list of dest macs indexed by port."""
705         return [dev.get_dest_macs() for dev in self.devices]
706
707     def set_vlans(self, port_index, vlans):
708         """Set the list of vlans to use indexed by the chain id on given port.
709
710         port_index: the port for which VLANs must be set
711         vlans: a  list of vlan lists indexed by chain id
712         """
713         if len(vlans) != self.config.service_chain_count:
714             raise TrafficClientException('VLAN list %s must have %d entries' %
715                                          (vlans, self.config.service_chain_count))
716         self.devices[port_index].set_vlans(vlans)
717
718     def set_vxlans(self, port_index, vxlans):
719         """Set the list of vxlans (VNIs) to use indexed by the chain id on given port.
720
721         port_index: the port for which VXLANs must be set
722         VXLANs: a  list of VNIs lists indexed by chain id
723         """
724         if len(vxlans) != self.config.service_chain_count:
725             raise TrafficClientException('VXLAN list %s must have %d entries' %
726                                          (vxlans, self.config.service_chain_count))
727         self.devices[port_index].set_vxlans(vxlans)
728
729     def set_mpls_inner_labels(self, port_index, labels):
730         """Set the list of MPLS Labels to use indexed by the chain id on given port.
731
732         port_index: the port for which Labels must be set
733         Labels: a list of Labels lists indexed by chain id
734         """
735         if len(labels) != self.config.service_chain_count:
736             raise TrafficClientException('Inner MPLS list %s must have %d entries' %
737                                          (labels, self.config.service_chain_count))
738         self.devices[port_index].set_mpls_inner_labels(labels)
739
740     def set_mpls_outer_labels(self, port_index, labels):
741         """Set the list of MPLS Labels to use indexed by the chain id on given port.
742
743         port_index: the port for which Labels must be set
744         Labels: a list of Labels lists indexed by chain id
745         """
746         if len(labels) != self.config.service_chain_count:
747             raise TrafficClientException('Outer MPLS list %s must have %d entries' %
748                                          (labels, self.config.service_chain_count))
749         self.devices[port_index].set_mpls_outer_labels(labels)
750
751     def set_vtep_vlan(self, port_index, vlan):
752         """Set the vtep vlan to use indexed by the chain id on given port.
753         port_index: the port for which VLAN must be set
754         """
755         self.devices[port_index].set_vtep_vlan(vlan)
756
757     def set_vxlan_endpoints(self, port_index, src_ip, dst_ip):
758         self.devices[port_index].set_vxlan_endpoints(src_ip, dst_ip)
759
760     def set_mpls_peers(self, port_index, src_ip, dst_ip):
761         self.devices[port_index].set_mpls_peers(src_ip, dst_ip)
762
763     @staticmethod
764     def __match_generator_profile(traffic_generator, generator_profile):
765         gen_config = AttrDict(traffic_generator)
766         gen_config.pop('default_profile')
767         gen_config.pop('generator_profile')
768         matching_profile = [profile for profile in traffic_generator.generator_profile if
769                             profile.name == generator_profile]
770         if len(matching_profile) != 1:
771             raise Exception('Traffic generator profile not found: ' + generator_profile)
772
773         gen_config.update(matching_profile[0])
774         return gen_config
775
776
777 class TrafficClient(object):
778     """Traffic generator client with NDR/PDR binary seearch."""
779
780     PORTS = [0, 1]
781
782     def __init__(self, config, notifier=None):
783         """Create a new TrafficClient instance.
784
785         config: nfvbench config
786         notifier: notifier (optional)
787
788         A new instance is created everytime the nfvbench config may have changed.
789         """
790         self.config = config
791         self.generator_config = GeneratorConfig(config)
792         self.tool = self.generator_config.tool
793         self.gen = self._get_generator()
794         self.notifier = notifier
795         self.interval_collector = None
796         self.iteration_collector = None
797         self.runner = TrafficRunner(self, self.config.duration_sec, self.config.interval_sec,
798                                     self.config.service_mode)
799         self.config.frame_sizes = self._get_frame_sizes()
800         self.run_config = {
801             'l2frame_size': None,
802             'duration_sec': self.config.duration_sec,
803             'bidirectional': True,
804             'rates': []  # to avoid unsbuscriptable-obj warning
805         }
806         self.current_total_rate = {'rate_percent': '10'}
807         if self.config.single_run:
808             self.current_total_rate = utils.parse_rate_str(self.config.rate)
809         self.ifstats = None
810         # Speed is either discovered when connecting to TG or set from config
811         # This variable is 0 if not yet discovered from TG or must be the speed of
812         # each interface in bits per second
813         self.intf_speed = self.generator_config.intf_speed
814
815     def _get_generator(self):
816         tool = self.tool.lower()
817         if tool == 'trex':
818             from .traffic_gen import trex_gen
819             return trex_gen.TRex(self)
820         if tool == 'dummy':
821             from .traffic_gen import dummy
822             return dummy.DummyTG(self)
823         raise TrafficClientException('Unsupported generator tool name:' + self.tool)
824
825     def skip_sleep(self):
826         """Skip all sleeps when doing unit testing with dummy TG.
827
828         Must be overriden using mock.patch
829         """
830         return False
831
832     def _get_frame_sizes(self):
833         traffic_profile_name = self.config.traffic.profile
834         matching_profiles = [profile for profile in self.config.traffic_profile if
835                              profile.name == traffic_profile_name]
836         if len(matching_profiles) > 1:
837             raise TrafficClientException('Multiple traffic profiles with name: ' +
838                                          traffic_profile_name)
839         if not matching_profiles:
840             raise TrafficClientException('Cannot find traffic profile: ' + traffic_profile_name)
841         return matching_profiles[0].l2frame_size
842
843     def start_traffic_generator(self):
844         """Start the traffic generator process (traffic not started yet)."""
845         self.gen.connect()
846         # pick up the interface speed if it is not set from config
847         intf_speeds = self.gen.get_port_speed_gbps()
848         # convert Gbps unit into bps
849         tg_if_speed = bitmath.parse_string(str(intf_speeds[0]) + 'Gb').bits
850         if self.intf_speed:
851             # interface speed is overriden from config
852             if self.intf_speed != tg_if_speed:
853                 # Warn the user if the speed in the config is different
854                 LOG.warning(
855                     'Interface speed provided (%g Gbps) is different from actual speed (%d Gbps)',
856                     self.intf_speed / 1000000000.0, intf_speeds[0])
857         else:
858             # interface speed not provisioned by config
859             self.intf_speed = tg_if_speed
860             # also update the speed in the tg config
861             self.generator_config.intf_speed = tg_if_speed
862         # let's report detected and actually used interface speed
863         self.config.intf_speed_detected = tg_if_speed
864         self.config.intf_speed_used = self.intf_speed
865
866         # Save the traffic generator local MAC
867         for mac, device in zip(self.gen.get_macs(), self.generator_config.devices):
868             device.set_mac(mac)
869
870     def setup(self):
871         """Set up the traffic client."""
872         self.gen.clear_stats()
873
874     def get_version(self):
875         """Get the traffic generator version."""
876         return self.gen.get_version()
877
878     def ensure_end_to_end(self):
879         """Ensure traffic generator receives packets it has transmitted.
880
881         This ensures end to end connectivity and also waits until VMs are ready to forward packets.
882
883         VMs that are started and in active state may not pass traffic yet. It is imperative to make
884         sure that all VMs are passing traffic in both directions before starting any benchmarking.
885         To verify this, we need to send at a low frequency bi-directional packets and make sure
886         that we receive all packets back from all VMs. The number of flows is equal to 2 times
887         the number of chains (1 per direction) and we need to make sure we receive packets coming
888         from exactly 2 x chain count different source MAC addresses.
889
890         Example:
891             PVP chain (1 VM per chain)
892             N = 10 (number of chains)
893             Flow count = 20 (number of flows)
894             If the number of unique source MAC addresses from received packets is 20 then
895             all 10 VMs 10 VMs are in operational state.
896         """
897         LOG.info('Starting traffic generator to ensure end-to-end connectivity')
898         # send 2pps on each chain and each direction
899         rate_pps = {'rate_pps': str(self.config.service_chain_count * 2)}
900         self.gen.create_traffic('64', [rate_pps, rate_pps], bidirectional=True, latency=False,
901                                 e2e=True)
902         # ensures enough traffic is coming back
903         retry_count = int((self.config.check_traffic_time_sec +
904                            self.config.generic_poll_sec - 1) / self.config.generic_poll_sec)
905
906         # we expect to see packets coming from 2 unique MAC per chain
907         # because there can be flooding in the case of shared net
908         # we must verify that packets from the right VMs are received
909         # and not just count unique src MAC
910         # create a dict of (port, chain) tuples indexed by dest mac
911         mac_map = {}
912         for port, dest_macs in enumerate(self.generator_config.get_dest_macs()):
913             for chain, mac in enumerate(dest_macs):
914                 mac_map[mac] = (port, chain)
915         unique_src_mac_count = len(mac_map)
916         if self.config.vxlan and self.config.traffic_generator.vtep_vlan:
917             get_mac_id = lambda packet: packet['binary'][60:66]
918         elif self.config.vxlan:
919             get_mac_id = lambda packet: packet['binary'][56:62]
920         elif self.config.mpls:
921             get_mac_id = lambda packet: packet['binary'][24:30]
922             # mpls_transport_label = lambda packet: packet['binary'][14:18]
923         else:
924             get_mac_id = lambda packet: packet['binary'][6:12]
925         for it in range(retry_count):
926             self.gen.clear_stats()
927             self.gen.start_traffic()
928             self.gen.start_capture()
929             LOG.info('Captured unique src mac %d/%d, capturing return packets (retry %d/%d)...',
930                      unique_src_mac_count - len(mac_map), unique_src_mac_count,
931                      it + 1, retry_count)
932             if not self.skip_sleep():
933                 time.sleep(self.config.generic_poll_sec)
934             self.gen.stop_traffic()
935             self.gen.fetch_capture_packets()
936             self.gen.stop_capture()
937             for packet in self.gen.packet_list:
938                 mac_id = get_mac_id(packet).decode('latin-1')
939                 src_mac = ':'.join(["%02x" % ord(x) for x in mac_id])
940                 if self.config.mpls:
941                     if src_mac in mac_map and self.is_mpls(packet):
942                         port, chain = mac_map[src_mac]
943                         LOG.info('Received mpls packet from mac: %s (chain=%d, port=%d)',
944                                  src_mac, chain, port)
945                         mac_map.pop(src_mac, None)
946                 else:
947                     if src_mac in mac_map and self.is_udp(packet):
948                         port, chain = mac_map[src_mac]
949                         LOG.info('Received udp packet from mac: %s (chain=%d, port=%d)',
950                                  src_mac, chain, port)
951                         mac_map.pop(src_mac, None)
952
953                 if not mac_map:
954                     LOG.info('End-to-end connectivity established')
955                     return
956             if self.config.l3_router and not self.config.no_arp:
957                 # In case of L3 traffic mode, routers are not able to route traffic
958                 # until VM interfaces are up and ARP requests are done
959                 LOG.info('Waiting for loopback service completely started...')
960                 LOG.info('Sending ARP request to assure end-to-end connectivity established')
961                 self.ensure_arp_successful()
962         raise TrafficClientException('End-to-end connectivity cannot be ensured')
963
964     def is_udp(self, packet):
965         pkt = Ether(packet['binary'])
966         return UDP in pkt
967
968     def is_mpls(self, packet):
969         pkt = Ether(packet['binary'])
970         return MPLS in pkt
971
972     def ensure_arp_successful(self):
973         """Resolve all IP using ARP and throw an exception in case of failure."""
974         dest_macs = self.gen.resolve_arp()
975         if dest_macs:
976             # all dest macs are discovered, saved them into the generator config
977             if self.config.vxlan or self.config.mpls:
978                 self.generator_config.set_vtep_dest_macs(0, dest_macs[0])
979                 self.generator_config.set_vtep_dest_macs(1, dest_macs[1])
980             else:
981                 self.generator_config.set_dest_macs(0, dest_macs[0])
982                 self.generator_config.set_dest_macs(1, dest_macs[1])
983         else:
984             raise TrafficClientException('ARP cannot be resolved')
985
986     def set_traffic(self, frame_size, bidirectional):
987         """Reconfigure the traffic generator for a new frame size."""
988         self.run_config['bidirectional'] = bidirectional
989         self.run_config['l2frame_size'] = frame_size
990         self.run_config['rates'] = [self.get_per_direction_rate()]
991         if bidirectional:
992             self.run_config['rates'].append(self.get_per_direction_rate())
993         else:
994             unidir_reverse_pps = int(self.config.unidir_reverse_traffic_pps)
995             if unidir_reverse_pps > 0:
996                 self.run_config['rates'].append({'rate_pps': str(unidir_reverse_pps)})
997         # Fix for [NFVBENCH-67], convert the rate string to PPS
998         for idx, rate in enumerate(self.run_config['rates']):
999             if 'rate_pps' not in rate:
1000                 self.run_config['rates'][idx] = {'rate_pps': self.__convert_rates(rate)['rate_pps']}
1001
1002         self.gen.clear_streamblock()
1003
1004         if self.config.no_latency_streams:
1005             LOG.info("Latency streams are disabled")
1006         # in service mode, we must disable flow stats (e2e=True)
1007         self.gen.create_traffic(frame_size, self.run_config['rates'], bidirectional,
1008                                 latency=not self.config.no_latency_streams,
1009                                 e2e=self.runner.service_mode)
1010
1011     def _modify_load(self, load):
1012         self.current_total_rate = {'rate_percent': str(load)}
1013         rate_per_direction = self.get_per_direction_rate()
1014
1015         self.gen.modify_rate(rate_per_direction, False)
1016         self.run_config['rates'][0] = rate_per_direction
1017         if self.run_config['bidirectional']:
1018             self.gen.modify_rate(rate_per_direction, True)
1019             self.run_config['rates'][1] = rate_per_direction
1020
1021     def get_ndr_and_pdr(self):
1022         """Start the NDR/PDR iteration and return the results."""
1023         dst = 'Bidirectional' if self.run_config['bidirectional'] else 'Unidirectional'
1024         targets = {}
1025         if self.config.ndr_run:
1026             LOG.info('*** Searching NDR for %s (%s)...', self.run_config['l2frame_size'], dst)
1027             targets['ndr'] = self.config.measurement.NDR
1028         if self.config.pdr_run:
1029             LOG.info('*** Searching PDR for %s (%s)...', self.run_config['l2frame_size'], dst)
1030             targets['pdr'] = self.config.measurement.PDR
1031
1032         self.run_config['start_time'] = time.time()
1033         self.interval_collector = IntervalCollector(self.run_config['start_time'])
1034         self.interval_collector.attach_notifier(self.notifier)
1035         self.iteration_collector = IterationCollector(self.run_config['start_time'])
1036         results = {}
1037         self.__range_search(0.0, 200.0, targets, results)
1038
1039         results['iteration_stats'] = {
1040             'ndr_pdr': self.iteration_collector.get()
1041         }
1042
1043         if self.config.ndr_run:
1044             LOG.info('NDR load: %s', results['ndr']['rate_percent'])
1045             results['ndr']['time_taken_sec'] = \
1046                 results['ndr']['timestamp_sec'] - self.run_config['start_time']
1047             if self.config.pdr_run:
1048                 LOG.info('PDR load: %s', results['pdr']['rate_percent'])
1049                 results['pdr']['time_taken_sec'] = \
1050                     results['pdr']['timestamp_sec'] - results['ndr']['timestamp_sec']
1051         else:
1052             LOG.info('PDR load: %s', results['pdr']['rate_percent'])
1053             results['pdr']['time_taken_sec'] = \
1054                 results['pdr']['timestamp_sec'] - self.run_config['start_time']
1055         return results
1056
1057     def __get_dropped_rate(self, result):
1058         dropped_pkts = result['rx']['dropped_pkts']
1059         total_pkts = result['tx']['total_pkts']
1060         if not total_pkts:
1061             return float('inf')
1062         return float(dropped_pkts) / total_pkts * 100
1063
1064     def get_stats(self):
1065         """Collect final stats for previous run."""
1066         stats = self.gen.get_stats(self.ifstats)
1067         retDict = {'total_tx_rate': stats['total_tx_rate'],
1068                    'offered_tx_rate_bps': stats['offered_tx_rate_bps'],
1069                    'theoretical_tx_rate_bps': stats['theoretical_tx_rate_bps'],
1070                    'theoretical_tx_rate_pps': stats['theoretical_tx_rate_pps']}
1071
1072         tx_keys = ['total_pkts', 'total_pkt_bytes', 'pkt_rate', 'pkt_bit_rate']
1073         rx_keys = tx_keys + ['dropped_pkts']
1074
1075         for port in self.PORTS:
1076             port_stats = {'tx': {}, 'rx': {}}
1077             for key in tx_keys:
1078                 port_stats['tx'][key] = int(stats[port]['tx'][key])
1079             for key in rx_keys:
1080                 try:
1081                     port_stats['rx'][key] = int(stats[port]['rx'][key])
1082                 except ValueError:
1083                     port_stats['rx'][key] = 0
1084             port_stats['rx']['avg_delay_usec'] = cast_integer(
1085                 stats[port]['rx']['avg_delay_usec'])
1086             port_stats['rx']['min_delay_usec'] = cast_integer(
1087                 stats[port]['rx']['min_delay_usec'])
1088             port_stats['rx']['max_delay_usec'] = cast_integer(
1089                 stats[port]['rx']['max_delay_usec'])
1090             port_stats['drop_rate_percent'] = self.__get_dropped_rate(port_stats)
1091             retDict[str(port)] = port_stats
1092
1093         ports = sorted(list(retDict.keys()), key=str)
1094         if self.run_config['bidirectional']:
1095             retDict['overall'] = {'tx': {}, 'rx': {}}
1096             for key in tx_keys:
1097                 retDict['overall']['tx'][key] = \
1098                     retDict[ports[0]]['tx'][key] + retDict[ports[1]]['tx'][key]
1099             for key in rx_keys:
1100                 retDict['overall']['rx'][key] = \
1101                     retDict[ports[0]]['rx'][key] + retDict[ports[1]]['rx'][key]
1102             total_pkts = [retDict[ports[0]]['rx']['total_pkts'],
1103                           retDict[ports[1]]['rx']['total_pkts']]
1104             avg_delays = [retDict[ports[0]]['rx']['avg_delay_usec'],
1105                           retDict[ports[1]]['rx']['avg_delay_usec']]
1106             max_delays = [retDict[ports[0]]['rx']['max_delay_usec'],
1107                           retDict[ports[1]]['rx']['max_delay_usec']]
1108             min_delays = [retDict[ports[0]]['rx']['min_delay_usec'],
1109                           retDict[ports[1]]['rx']['min_delay_usec']]
1110             retDict['overall']['rx']['avg_delay_usec'] = utils.weighted_avg(total_pkts, avg_delays)
1111             retDict['overall']['rx']['min_delay_usec'] = min(min_delays)
1112             retDict['overall']['rx']['max_delay_usec'] = max(max_delays)
1113             for key in ['pkt_bit_rate', 'pkt_rate']:
1114                 for dirc in ['tx', 'rx']:
1115                     retDict['overall'][dirc][key] /= 2.0
1116         else:
1117             retDict['overall'] = retDict[ports[0]]
1118         retDict['overall']['drop_rate_percent'] = self.__get_dropped_rate(retDict['overall'])
1119
1120         if 'overall_hdrh' in stats:
1121             retDict['overall']['hdrh'] = stats.get('overall_hdrh', None)
1122             decoded_histogram = HdrHistogram.decode(retDict['overall']['hdrh'])
1123             retDict['overall']['rx']['lat_percentile'] = {}
1124             # override min max and avg from hdrh (only if histogram is valid)
1125             if decoded_histogram.get_total_count() != 0:
1126                 retDict['overall']['rx']['min_delay_usec'] = decoded_histogram.get_min_value()
1127                 retDict['overall']['rx']['max_delay_usec'] = decoded_histogram.get_max_value()
1128                 retDict['overall']['rx']['avg_delay_usec'] = decoded_histogram.get_mean_value()
1129                 for percentile in self.config.lat_percentiles:
1130                     retDict['overall']['rx']['lat_percentile'][percentile] = \
1131                         decoded_histogram.get_value_at_percentile(percentile)
1132             else:
1133                 for percentile in self.config.lat_percentiles:
1134                     retDict['overall']['rx']['lat_percentile'][percentile] = 'n/a'
1135         return retDict
1136
1137     def __convert_rates(self, rate):
1138         return utils.convert_rates(self.run_config['l2frame_size'],
1139                                    rate,
1140                                    self.intf_speed)
1141
1142     def __ndr_pdr_found(self, tag, load):
1143         rates = self.__convert_rates({'rate_percent': load})
1144         self.iteration_collector.add_ndr_pdr(tag, rates['rate_pps'])
1145         last_stats = self.iteration_collector.peek()
1146         self.interval_collector.add_ndr_pdr(tag, last_stats)
1147
1148     def __format_output_stats(self, stats):
1149         for key in self.PORTS + ['overall']:
1150             key = str(key)
1151             interface = stats[key]
1152             stats[key] = {
1153                 'tx_pkts': interface['tx']['total_pkts'],
1154                 'rx_pkts': interface['rx']['total_pkts'],
1155                 'drop_percentage': interface['drop_rate_percent'],
1156                 'drop_pct': interface['rx']['dropped_pkts'],
1157                 'avg_delay_usec': interface['rx']['avg_delay_usec'],
1158                 'max_delay_usec': interface['rx']['max_delay_usec'],
1159                 'min_delay_usec': interface['rx']['min_delay_usec'],
1160             }
1161
1162             if key == 'overall':
1163                 if 'hdrh' in interface:
1164                     stats[key]['hdrh'] = interface.get('hdrh', None)
1165                     decoded_histogram = HdrHistogram.decode(stats[key]['hdrh'])
1166                     stats[key]['lat_percentile'] = {}
1167                     # override min max and avg from hdrh (only if histogram is valid)
1168                     if decoded_histogram.get_total_count() != 0:
1169                         stats[key]['min_delay_usec'] = decoded_histogram.get_min_value()
1170                         stats[key]['max_delay_usec'] = decoded_histogram.get_max_value()
1171                         stats[key]['avg_delay_usec'] = decoded_histogram.get_mean_value()
1172                         for percentile in self.config.lat_percentiles:
1173                             stats[key]['lat_percentile'][percentile] = decoded_histogram.\
1174                                 get_value_at_percentile(percentile)
1175                     else:
1176                         for percentile in self.config.lat_percentiles:
1177                             stats[key]['lat_percentile'][percentile] = 'n/a'
1178         return stats
1179
1180     def __targets_found(self, rate, targets, results):
1181         for tag, target in list(targets.items()):
1182             LOG.info('Found %s (%s) load: %s', tag, target, rate)
1183             self.__ndr_pdr_found(tag, rate)
1184             results[tag]['timestamp_sec'] = time.time()
1185
1186     def __range_search(self, left, right, targets, results):
1187         """Perform a binary search for a list of targets inside a [left..right] range or rate.
1188
1189         left    the left side of the range to search as a % the line rate (100 = 100% line rate)
1190                 indicating the rate to send on each interface
1191         right   the right side of the range to search as a % of line rate
1192                 indicating the rate to send on each interface
1193         targets a dict of drop rates to search (0.1 = 0.1%), indexed by the DR name or "tag"
1194                 ('ndr', 'pdr')
1195         results a dict to store results
1196         """
1197         if not targets:
1198             return
1199         LOG.info('Range search [%s .. %s] targets: %s', left, right, targets)
1200
1201         # Terminate search when gap is less than load epsilon
1202         if right - left < self.config.measurement.load_epsilon:
1203             self.__targets_found(left, targets, results)
1204             return
1205
1206         # Obtain the average drop rate in for middle load
1207         middle = (left + right) / 2.0
1208         try:
1209             stats, rates = self.__run_search_iteration(middle)
1210         except STLError:
1211             LOG.exception("Got exception from traffic generator during binary search")
1212             self.__targets_found(left, targets, results)
1213             return
1214         # Split target dicts based on the avg drop rate
1215         left_targets = {}
1216         right_targets = {}
1217         for tag, target in list(targets.items()):
1218             if stats['overall']['drop_rate_percent'] <= target:
1219                 # record the best possible rate found for this target
1220                 results[tag] = rates
1221                 results[tag].update({
1222                     'load_percent_per_direction': middle,
1223                     'stats': self.__format_output_stats(dict(stats)),
1224                     'timestamp_sec': None
1225                 })
1226                 right_targets[tag] = target
1227             else:
1228                 # initialize to 0 all fields of result for
1229                 # the worst case scenario of the binary search (if ndr/pdr is not found)
1230                 if tag not in results:
1231                     results[tag] = dict.fromkeys(rates, 0)
1232                     empty_stats = self.__format_output_stats(dict(stats))
1233                     for key in empty_stats:
1234                         if isinstance(empty_stats[key], dict):
1235                             empty_stats[key] = dict.fromkeys(empty_stats[key], 0)
1236                         else:
1237                             empty_stats[key] = 0
1238                     results[tag].update({
1239                         'load_percent_per_direction': 0,
1240                         'stats': empty_stats,
1241                         'timestamp_sec': None
1242                     })
1243                 left_targets[tag] = target
1244
1245         # search lower half
1246         self.__range_search(left, middle, left_targets, results)
1247
1248         # search upper half only if the upper rate does not exceed
1249         # 100%, this only happens when the first search at 100%
1250         # yields a DR that is < target DR
1251         if middle >= 100:
1252             self.__targets_found(100, right_targets, results)
1253         else:
1254             self.__range_search(middle, right, right_targets, results)
1255
1256     def __run_search_iteration(self, rate):
1257         """Run one iteration at the given rate level.
1258
1259         rate: the rate to send on each port in percent (0 to 100)
1260         """
1261         self._modify_load(rate)
1262
1263         # There used to be a inconsistency in case of interface speed override.
1264         # The emulated 'intf_speed' value is unknown to the T-Rex generator which
1265         # refers to the detected line rate for converting relative traffic loads.
1266         # Therefore, we need to convert actual rates here, in terms of packets/s.
1267
1268         for idx, str_rate in enumerate(self.gen.rates):
1269             if str_rate.endswith('%'):
1270                 float_rate = float(str_rate.replace('%', '').strip())
1271                 pps_rate = self.__convert_rates({'rate_percent': float_rate})['rate_pps']
1272                 self.gen.rates[idx] = str(pps_rate) + 'pps'
1273
1274         # poll interval stats and collect them
1275         for stats in self.run_traffic():
1276             self.interval_collector.add(stats)
1277             time_elapsed_ratio = self.runner.time_elapsed() / self.run_config['duration_sec']
1278             if time_elapsed_ratio >= 1:
1279                 self.cancel_traffic()
1280                 if not self.skip_sleep():
1281                     time.sleep(self.config.pause_sec)
1282         self.interval_collector.reset()
1283
1284         # get stats from the run
1285         stats = self.runner.client.get_stats()
1286         current_traffic_config = self._get_traffic_config()
1287         warning = self.compare_tx_rates(current_traffic_config['direction-total']['rate_pps'],
1288                                         stats['total_tx_rate'])
1289         if warning is not None:
1290             stats['warning'] = warning
1291
1292         # save reliable stats from whole iteration
1293         self.iteration_collector.add(stats, current_traffic_config['direction-total']['rate_pps'])
1294         LOG.info('Average drop rate: %f', stats['overall']['drop_rate_percent'])
1295         return stats, current_traffic_config['direction-total']
1296
1297     def log_stats(self, stats):
1298         """Log estimated stats during run."""
1299         # Calculate a rolling drop rate based on differential to
1300         # the previous reading
1301         cur_tx = stats['overall']['tx']['total_pkts']
1302         cur_rx = stats['overall']['rx']['total_pkts']
1303         delta_tx = cur_tx - self.prev_tx
1304         delta_rx = cur_rx - self.prev_rx
1305         drops = delta_tx - delta_rx
1306         if delta_tx == 0:
1307             LOG.info("\x1b[1mConfiguration issue!\x1b[0m (no transmission)")
1308             sys.exit(0)
1309         drop_rate_pct = 100 * (delta_tx - delta_rx)/delta_tx
1310         self.prev_tx = cur_tx
1311         self.prev_rx = cur_rx
1312         LOG.info('TX: %15s; RX: %15s; (Est.) Dropped: %12s; Drop rate: %8.4f%%',
1313                  format(cur_tx, ',d'),
1314                  format(cur_rx, ',d'),
1315                  format(drops, ',d'),
1316                  drop_rate_pct)
1317
1318     def run_traffic(self):
1319         """Start traffic and return intermediate stats for each interval."""
1320         stats = self.runner.run()
1321         self.prev_tx = 0
1322         self.prev_rx = 0
1323         while self.runner.is_running:
1324             self.log_stats(stats)
1325             yield stats
1326             stats = self.runner.poll_stats()
1327             if stats is None:
1328                 return
1329         self.log_stats(stats)
1330         LOG.info('Drop rate: %f', stats['overall']['drop_rate_percent'])
1331         yield stats
1332
1333     def cancel_traffic(self):
1334         """Stop traffic."""
1335         self.runner.stop()
1336
1337     def _get_traffic_config(self):
1338         config = {}
1339         load_total = 0.0
1340         bps_total = 0.0
1341         pps_total = 0.0
1342         for idx, rate in enumerate(self.run_config['rates']):
1343             key = 'direction-forward' if idx == 0 else 'direction-reverse'
1344             config[key] = {
1345                 'l2frame_size': self.run_config['l2frame_size'],
1346                 'duration_sec': self.run_config['duration_sec']
1347             }
1348             config[key].update(rate)
1349             config[key].update(self.__convert_rates(rate))
1350             load_total += float(config[key]['rate_percent'])
1351             bps_total += float(config[key]['rate_bps'])
1352             pps_total += float(config[key]['rate_pps'])
1353         config['direction-total'] = dict(config['direction-forward'])
1354         config['direction-total'].update({
1355             'rate_percent': load_total,
1356             'rate_pps': cast_integer(pps_total),
1357             'rate_bps': bps_total
1358         })
1359
1360         return config
1361
1362     def get_run_config(self, results):
1363         """Return configuration which was used for the last run."""
1364         r = {}
1365         # because we want each direction to have the far end RX rates,
1366         # use the far end index (1-idx) to retrieve the RX rates
1367         for idx, key in enumerate(["direction-forward", "direction-reverse"]):
1368             tx_rate = results["stats"][str(idx)]["tx"]["total_pkts"] / self.config.duration_sec
1369             rx_rate = results["stats"][str(1 - idx)]["rx"]["total_pkts"] / self.config.duration_sec
1370             r[key] = {
1371                 "orig": self.__convert_rates(self.run_config['rates'][idx]),
1372                 "tx": self.__convert_rates({'rate_pps': tx_rate}),
1373                 "rx": self.__convert_rates({'rate_pps': rx_rate})
1374             }
1375
1376         total = {}
1377         for direction in ['orig', 'tx', 'rx']:
1378             total[direction] = {}
1379             for unit in ['rate_percent', 'rate_bps', 'rate_pps']:
1380                 total[direction][unit] = sum([float(x[direction][unit]) for x in list(r.values())])
1381
1382         r['direction-total'] = total
1383         return r
1384
1385     def insert_interface_stats(self, pps_list):
1386         """Insert interface stats to a list of packet path stats.
1387
1388         pps_list: a list of packet path stats instances indexed by chain index
1389
1390         This function will insert the packet path stats for the traffic gen ports 0 and 1
1391         with itemized per chain tx/rx counters.
1392         There will be as many packet path stats as chains.
1393         Each packet path stats will have exactly 2 InterfaceStats for port 0 and port 1
1394         self.pps_list:
1395         [
1396         PacketPathStats(InterfaceStats(chain 0, port 0), InterfaceStats(chain 0, port 1)),
1397         PacketPathStats(InterfaceStats(chain 1, port 0), InterfaceStats(chain 1, port 1)),
1398         ...
1399         ]
1400         """
1401         def get_if_stats(chain_idx):
1402             return [InterfaceStats('p' + str(port), self.tool)
1403                     for port in range(2)]
1404         # keep the list of list of interface stats indexed by the chain id
1405         self.ifstats = [get_if_stats(chain_idx)
1406                         for chain_idx in range(self.config.service_chain_count)]
1407         # note that we need to make a copy of the ifs list so that any modification in the
1408         # list from pps will not change the list saved in self.ifstats
1409         self.pps_list = [PacketPathStats(self.config, list(ifs)) for ifs in self.ifstats]
1410         # insert the corresponding pps in the passed list
1411         pps_list.extend(self.pps_list)
1412
1413     def update_interface_stats(self, diff=False):
1414         """Update all interface stats.
1415
1416         diff: if False, simply refresh the interface stats values with latest values
1417               if True, diff the interface stats with the latest values
1418         Make sure that the interface stats inserted in insert_interface_stats() are updated
1419         with proper values.
1420         self.ifstats:
1421         [
1422         [InterfaceStats(chain 0, port 0), InterfaceStats(chain 0, port 1)],
1423         [InterfaceStats(chain 1, port 0), InterfaceStats(chain 1, port 1)],
1424         ...
1425         ]
1426         """
1427         if diff:
1428             stats = self.gen.get_stats(self.ifstats)
1429             for chain_idx, ifs in enumerate(self.ifstats):
1430                 # each ifs has exactly 2 InterfaceStats and 2 Latency instances
1431                 # corresponding to the
1432                 # port 0 and port 1 for the given chain_idx
1433                 # Note that we cannot use self.pps_list[chain_idx].if_stats to pick the
1434                 # interface stats for the pps because it could have been modified to contain
1435                 # additional interface stats
1436                 self.gen.get_stream_stats(stats, ifs, self.pps_list[chain_idx].latencies, chain_idx)
1437             # special handling for vxlan
1438             # in case of vxlan, flow stats are not available so all rx counters will be
1439             # zeros when the total rx port counter is non zero.
1440             # in that case,
1441             for port in range(2):
1442                 total_rx = 0
1443                 for ifs in self.ifstats:
1444                     total_rx += ifs[port].rx
1445                 if total_rx == 0:
1446                     # check if the total port rx from Trex is also zero
1447                     port_rx = stats[port]['rx']['total_pkts']
1448                     if port_rx:
1449                         # the total rx for all chains from port level stats is non zero
1450                         # which means that the per-chain stats are not available
1451                         if len(self.ifstats) == 1:
1452                             # only one chain, simply report the port level rx to the chain rx stats
1453                             self.ifstats[0][port].rx = port_rx
1454                         else:
1455                             for ifs in self.ifstats:
1456                                 # mark this data as unavailable
1457                                 ifs[port].rx = None
1458                             # pitch in the total rx only in the last chain pps
1459                             self.ifstats[-1][port].rx_total = port_rx
1460
1461     @staticmethod
1462     def compare_tx_rates(required, actual):
1463         """Compare the actual TX rate to the required TX rate."""
1464         threshold = 0.9
1465         are_different = False
1466         try:
1467             if float(actual) / required < threshold:
1468                 are_different = True
1469         except ZeroDivisionError:
1470             are_different = True
1471
1472         if are_different:
1473             msg = "WARNING: There is a significant difference between requested TX rate ({r}) " \
1474                   "and actual TX rate ({a}). The traffic generator may not have sufficient CPU " \
1475                   "to achieve the requested TX rate.".format(r=required, a=actual)
1476             LOG.info(msg)
1477             return msg
1478
1479         return None
1480
1481     def get_per_direction_rate(self):
1482         """Get the rate for each direction."""
1483         divisor = 2 if self.run_config['bidirectional'] else 1
1484         if 'rate_percent' in self.current_total_rate:
1485             # don't split rate if it's percentage
1486             divisor = 1
1487
1488         return utils.divide_rate(self.current_total_rate, divisor)
1489
1490     def close(self):
1491         """Close this instance."""
1492         try:
1493             self.gen.stop_traffic()
1494         except Exception:
1495             pass
1496         self.gen.clear_stats()
1497         self.gen.cleanup()