NFVBENCH-186: Fix a bug: NDR mode now fully supports 'intf_speed' overriding
[nfvbench.git] / nfvbench / traffic_client.py
1 # Copyright 2016 Cisco Systems, Inc.  All rights reserved.
2 #
3 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
4 #    not use this file except in compliance with the License. You may obtain
5 #    a copy of the License at
6 #
7 #         http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #    Unless required by applicable law or agreed to in writing, software
10 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 #    License for the specific language governing permissions and limitations
13 #    under the License.
14
15 """Interface to the traffic generator clients including NDR/PDR binary search."""
16 import socket
17 import struct
18 import time
19
20 from attrdict import AttrDict
21 import bitmath
22 from hdrh.histogram import HdrHistogram
23 from netaddr import IPNetwork
24 # pylint: disable=import-error
25 from trex.stl.api import Ether
26 from trex.stl.api import STLError
27 from trex.stl.api import UDP
28 # pylint: disable=wrong-import-order
29 from scapy.contrib.mpls import MPLS  # flake8: noqa
30 # pylint: enable=wrong-import-order
31 # pylint: enable=import-error
32
33 from .log import LOG
34 from .packet_stats import InterfaceStats
35 from .packet_stats import PacketPathStats
36 from .stats_collector import IntervalCollector
37 from .stats_collector import IterationCollector
38 from .traffic_gen import traffic_utils as utils
39 from .utils import cast_integer, find_max_size, find_tuples_equal_to_lcm_value, get_divisors, lcm
40
41 class TrafficClientException(Exception):
42     """Generic traffic client exception."""
43
44 class TrafficRunner(object):
45     """Serialize various steps required to run traffic."""
46
47     def __init__(self, client, duration_sec, interval_sec=0, service_mode=False):
48         """Create a traffic runner."""
49         self.client = client
50         self.start_time = None
51         self.duration_sec = duration_sec
52         self.interval_sec = interval_sec
53         self.service_mode = service_mode
54
55     def run(self):
56         """Clear stats and instruct the traffic generator to start generating traffic."""
57         if self.is_running():
58             return None
59         LOG.info('Running traffic generator')
60         self.client.gen.clear_stats()
61         # Debug use only: the service_mode flag may have been set in
62         # the configuration, in order to enable the 'service' mode
63         # in the trex generator, before starting the traffic (run).
64         # From this point, a T-rex console (launched in readonly mode) would
65         # then be able to capture the transmitted and/or received traffic.
66         self.client.gen.set_service_mode(enabled=self.service_mode)
67         LOG.info('Service mode is %sabled', 'en' if self.service_mode else 'dis')
68         self.client.gen.start_traffic()
69         self.start_time = time.time()
70         return self.poll_stats()
71
72     def stop(self):
73         """Stop the current run and instruct the traffic generator to stop traffic."""
74         if self.is_running():
75             self.start_time = None
76             self.client.gen.stop_traffic()
77
78     def is_running(self):
79         """Check if a run is still pending."""
80         return self.start_time is not None
81
82     def time_elapsed(self):
83         """Return time elapsed since start of run."""
84         if self.is_running():
85             return time.time() - self.start_time
86         return self.duration_sec
87
88     def poll_stats(self):
89         """Poll latest stats from the traffic generator at fixed interval - sleeps if necessary.
90
91         return: latest stats or None if traffic is stopped
92         """
93         if not self.is_running():
94             return None
95         if self.client.skip_sleep():
96             self.stop()
97             return self.client.get_stats()
98         time_elapsed = self.time_elapsed()
99         if time_elapsed > self.duration_sec:
100             self.stop()
101             return None
102         time_left = self.duration_sec - time_elapsed
103         if self.interval_sec > 0.0:
104             if time_left <= self.interval_sec:
105                 time.sleep(time_left)
106                 self.stop()
107             else:
108                 time.sleep(self.interval_sec)
109         else:
110             time.sleep(self.duration_sec)
111             self.stop()
112         return self.client.get_stats()
113
114
115 class IpBlock(object):
116     """Manage a block of IP addresses."""
117
118     def __init__(self, base_ip, step_ip, count_ip):
119         """Create an IP block."""
120         self.base_ip_int = Device.ip_to_int(base_ip)
121         if step_ip == 'random':
122             step_ip = '0.0.0.1'
123         self.step = Device.ip_to_int(step_ip)
124         self.max_available = count_ip
125         self.next_free = 0
126
127     def get_ip(self, index=0):
128         """Return the IP address at given index."""
129         if index < 0 or index >= self.max_available:
130             raise IndexError('Index out of bounds: %d (max=%d)' % (index, self.max_available))
131         return Device.int_to_ip(self.base_ip_int + index * self.step)
132
133     def get_ip_from_chain_first_ip(self, first_ip, index=0):
134         """Return the IP address at given index starting from chain first ip."""
135         if index < 0 or index >= self.max_available:
136             raise IndexError('Index out of bounds: %d (max=%d)' % (index, self.max_available))
137         return Device.int_to_ip(first_ip + index * self.step)
138
139     def reserve_ip_range(self, count):
140         """Reserve a range of count consecutive IP addresses spaced by step.
141         """
142         if self.next_free + count > self.max_available:
143             raise IndexError('No more IP addresses next free=%d max_available=%d requested=%d' %
144                              (self.next_free,
145                               self.max_available,
146                               count))
147         first_ip = self.get_ip(self.next_free)
148         last_ip = self.get_ip(self.next_free + count - 1)
149         self.next_free += count
150         return (first_ip, last_ip)
151
152     def reset_reservation(self):
153         """Reset all reservations and restart with a completely unused IP block."""
154         self.next_free = 0
155
156
157 class UdpPorts(object):
158
159     def __init__(self, src_min, src_max, dst_min, dst_max, udp_src_size, udp_dst_size, step):
160
161         self.src_min = int(src_min)
162         self.src_max = int(src_max)
163         self.dst_min = int(dst_min)
164         self.dst_max = int(dst_max)
165         self.udp_src_size = udp_src_size
166         self.udp_dst_size = udp_dst_size
167         self.step = step
168
169     def get_src_max(self, index=0):
170         """Return the UDP src port at given index."""
171         return int(self.src_min) + index * int(self.step)
172
173     def get_dst_max(self, index=0):
174         """Return the UDP dst port at given index."""
175         return int(self.dst_min) + index * int(self.step)
176
177
178 class Device(object):
179     """Represent a port device and all information associated to it.
180
181     In the curent version we only support 2 port devices for the traffic generator
182     identified as port 0 or port 1.
183     """
184
185     def __init__(self, port, generator_config):
186         """Create a new device for a given port."""
187         self.generator_config = generator_config
188         self.chain_count = generator_config.service_chain_count
189         if generator_config.bidirectional:
190             self.flow_count = generator_config.flow_count / 2
191         else:
192             self.flow_count = generator_config.flow_count
193
194         self.port = port
195         self.switch_port = generator_config.interfaces[port].get('switch_port', None)
196         self.vtep_vlan = None
197         self.vtep_src_mac = None
198         self.vxlan = False
199         self.mpls = False
200         self.inner_labels = None
201         self.outer_labels = None
202         self.pci = generator_config.interfaces[port].pci
203         self.mac = None
204         self.dest_macs = None
205         self.vtep_dst_mac = None
206         self.vtep_dst_ip = None
207         if generator_config.vteps is None:
208             self.vtep_src_ip = None
209         else:
210             self.vtep_src_ip = generator_config.vteps[port]
211         self.vnis = None
212         self.vlans = None
213         self.ip_addrs = generator_config.ip_addrs[port]
214         self.ip_src_static = generator_config.ip_src_static
215         self.ip_addrs_step = generator_config.ip_addrs_step
216         if self.ip_addrs_step == 'random':
217             # Set step to 1 to calculate the IP range size (see check_range_size below)
218             step = '0.0.0.1'
219         else:
220             step = self.ip_addrs_step
221         self.ip_size = self.check_range_size(IPNetwork(self.ip_addrs).size, Device.ip_to_int(step))
222         self.ip = str(IPNetwork(self.ip_addrs).network)
223         ip_addrs_left = generator_config.ip_addrs[0]
224         ip_addrs_right = generator_config.ip_addrs[1]
225         self.ip_addrs_size = {
226             'left': self.check_range_size(IPNetwork(ip_addrs_left).size, Device.ip_to_int(step)),
227             'right': self.check_range_size(IPNetwork(ip_addrs_right).size, Device.ip_to_int(step))}
228         udp_src_port = generator_config.gen_config.udp_src_port
229         if udp_src_port is None:
230             udp_src_port = 53
231         udp_dst_port = generator_config.gen_config.udp_dst_port
232         if udp_dst_port is None:
233             udp_dst_port = 53
234         src_max, src_min = self.define_udp_range(udp_src_port, 'udp_src_port')
235         dst_max, dst_min = self.define_udp_range(udp_dst_port, 'udp_dst_port')
236         if generator_config.gen_config.udp_port_step == 'random':
237             # Set step to 1 to calculate the UDP range size
238             udp_step = 1
239         else:
240             udp_step = int(generator_config.gen_config.udp_port_step)
241         udp_src_size = self.check_range_size(int(src_max) - int(src_min) + 1, udp_step)
242         udp_dst_size = self.check_range_size(int(dst_max) - int(dst_min) + 1, udp_step)
243         lcm_port = lcm(udp_src_size, udp_dst_size)
244         if self.ip_src_static is True:
245             lcm_ip = lcm(1, min(self.ip_addrs_size['left'], self.ip_addrs_size['right']))
246         else:
247             lcm_ip = lcm(self.ip_addrs_size['left'], self.ip_addrs_size['right'])
248         flow_max = lcm(lcm_port, lcm_ip)
249         if self.flow_count > flow_max:
250             raise TrafficClientException('Trying to set unachievable traffic (%d > %d)' %
251                                          (self.flow_count, flow_max))
252
253         self.udp_ports = UdpPorts(src_min, src_max, dst_min, dst_max, udp_src_size, udp_dst_size,
254                                   generator_config.gen_config.udp_port_step)
255
256         self.ip_block = IpBlock(self.ip, step, self.ip_size)
257
258         self.gw_ip_block = IpBlock(generator_config.gateway_ips[port],
259                                    generator_config.gateway_ip_addrs_step,
260                                    self.chain_count)
261         self.tg_gateway_ip_addrs = generator_config.tg_gateway_ip_addrs[port]
262         self.tg_gw_ip_block = IpBlock(self.tg_gateway_ip_addrs,
263                                       generator_config.tg_gateway_ip_addrs_step,
264                                       self.chain_count)
265
266     def limit_ip_udp_ranges(self, peer_ip_size, cur_chain_flow_count):
267         # init to min value in case of no matching values found with lcm calculation
268         new_src_ip_size = 1
269         new_peer_ip_size = 1
270         new_src_udp_size = 1
271         new_dst_udp_size = 1
272
273         if self.ip_src_static is True:
274             src_ip_size = 1
275         else:
276             src_ip_size = self.ip_size
277         ip_src_divisors = list(get_divisors(src_ip_size))
278         ip_dst_divisors = list(get_divisors(peer_ip_size))
279         udp_src_divisors = list(get_divisors(self.udp_ports.udp_src_size))
280         udp_dst_divisors = list(get_divisors(self.udp_ports.udp_dst_size))
281         fc = int(cur_chain_flow_count)
282         tuples_ip = list(find_tuples_equal_to_lcm_value(ip_src_divisors, ip_dst_divisors, fc))
283         tuples_udp = list(find_tuples_equal_to_lcm_value(udp_src_divisors, udp_dst_divisors, fc))
284
285         if tuples_ip:
286             new_src_ip_size = tuples_ip[-1][0]
287             new_peer_ip_size = tuples_ip[-1][1]
288
289         if tuples_udp:
290             new_src_udp_size = tuples_udp[-1][0]
291             new_dst_udp_size = tuples_udp[-1][1]
292
293         tuples_src = []
294         tuples_dst = []
295         if not tuples_ip and not tuples_udp:
296             # in case of not divisors in common matching LCM value (i.e. requested flow count)
297             # try to find an accurate UDP range to fit requested flow count
298             udp_src_int = range(self.udp_ports.src_min, self.udp_ports.src_max)
299             udp_dst_int = range(self.udp_ports.dst_min, self.udp_ports.dst_max)
300             tuples_src = list(find_tuples_equal_to_lcm_value(ip_src_divisors, udp_src_int, fc))
301             tuples_dst = list(find_tuples_equal_to_lcm_value(ip_dst_divisors, udp_dst_int, fc))
302
303             if not tuples_src and not tuples_dst:
304                 # iterate IP and UDP ranges to find a tuple that match flow count values
305                 src_ip_range = range(1,src_ip_size)
306                 dst_ip_range = range(1, peer_ip_size)
307                 tuples_src = list(find_tuples_equal_to_lcm_value(src_ip_range, udp_src_int, fc))
308                 tuples_dst = list(find_tuples_equal_to_lcm_value(dst_ip_range, udp_dst_int, fc))
309
310         if tuples_src or tuples_dst:
311             if tuples_src:
312                 new_src_ip_size = tuples_src[-1][0]
313                 new_src_udp_size = tuples_src[-1][1]
314             if tuples_dst:
315                 new_peer_ip_size = tuples_dst[-1][0]
316                 new_dst_udp_size = tuples_dst[-1][1]
317         else:
318             if not tuples_ip:
319                 if src_ip_size != 1:
320                     if src_ip_size > fc:
321                         new_src_ip_size = fc
322                     else:
323                         new_src_ip_size = find_max_size(src_ip_size, tuples_udp, fc)
324                 if peer_ip_size != 1:
325                     if peer_ip_size > fc:
326                         new_peer_ip_size = fc
327                     else:
328                         new_peer_ip_size = find_max_size(peer_ip_size, tuples_udp, fc)
329
330             if not tuples_udp:
331                 if self.udp_ports.udp_src_size != 1:
332                     if self.udp_ports.udp_src_size > fc:
333                         new_src_udp_size = fc
334                     else:
335                         new_src_udp_size = find_max_size(self.udp_ports.udp_src_size,
336                                                          tuples_ip, fc)
337                 if self.udp_ports.udp_dst_size != 1:
338                     if self.udp_ports.udp_dst_size > fc:
339                         new_dst_udp_size = fc
340                     else:
341                         new_dst_udp_size = find_max_size(self.udp_ports.udp_dst_size,
342                                                          tuples_ip, fc)
343         max_possible_flows = lcm(lcm(new_src_ip_size, new_peer_ip_size),
344                                  lcm(new_src_udp_size, new_dst_udp_size))
345
346         LOG.debug("IP dst size: %d", new_peer_ip_size)
347         LOG.debug("LCM IP: %d", lcm(new_src_ip_size, new_peer_ip_size))
348         LOG.debug("LCM UDP: %d", lcm(new_src_udp_size, new_dst_udp_size))
349         LOG.debug("Global LCM: %d", max_possible_flows)
350         LOG.debug("IP src size: %d, IP dst size: %d, UDP src size: %d, UDP dst size: %d",
351                   new_src_ip_size, new_peer_ip_size, self.udp_ports.udp_src_size,
352                   self.udp_ports.udp_dst_size)
353         if not max_possible_flows == cur_chain_flow_count:
354             if (self.ip_addrs_step != '0.0.0.1' or self.udp_ports.step != '1') and not (
355                     self.ip_addrs_step == 'random' and self.udp_ports.step == 'random'):
356                 LOG.warning("Current values of ip_addrs_step and/or udp_port_step properties "
357                             "do not allow to control an accurate flow count. "
358                             "Values will be overridden as follows:")
359                 if self.ip_addrs_step != '0.0.0.1':
360                     LOG.info("ip_addrs_step='0.0.0.1' (previous value: ip_addrs_step='%s')",
361                              self.ip_addrs_step)
362                     self.ip_addrs_step = '0.0.0.1'
363
364                 if self.udp_ports.step != '1':
365                     LOG.info("udp_port_step='1' (previous value: udp_port_step='%s')",
366                              self.udp_ports.step)
367                     self.udp_ports.step = '1'
368                     # override config for not logging random step warning message in trex_gen.py
369                     self.generator_config.gen_config.udp_port_step = self.udp_ports.step
370             else:
371                 LOG.error("Current values of ip_addrs_step and udp_port_step properties "
372                           "do not allow to control an accurate flow count.")
373         else:
374             src_ip_size = new_src_ip_size
375             peer_ip_size = new_peer_ip_size
376             self.udp_ports.udp_src_size = new_src_udp_size
377             self.udp_ports.udp_dst_size = new_dst_udp_size
378         return src_ip_size, peer_ip_size
379
380     @staticmethod
381     def define_udp_range(udp_port, property_name):
382         if isinstance(udp_port, int):
383             min = udp_port
384             max = min
385         elif isinstance(udp_port, tuple):
386             min = udp_port[0]
387             max = udp_port[1]
388         else:
389             raise TrafficClientException('Invalid %s property value (53 or [\'53\',\'1024\'])'
390                                          % property_name)
391         return max, min
392
393
394     @staticmethod
395     def check_range_size(range_size, step):
396         """Check and set the available IPs or UDP ports, considering the step."""
397         try:
398             if range_size % step == 0:
399                 value = range_size // step
400             else:
401                 value = range_size // step + 1
402             return value
403         except ZeroDivisionError:
404             raise ZeroDivisionError("step can't be zero !") from ZeroDivisionError
405
406     def set_mac(self, mac):
407         """Set the local MAC for this port device."""
408         if mac is None:
409             raise TrafficClientException('Trying to set traffic generator MAC address as None')
410         self.mac = mac
411
412     def get_peer_device(self):
413         """Get the peer device (device 0 -> device 1, or device 1 -> device 0)."""
414         return self.generator_config.devices[1 - self.port]
415
416     def set_vtep_dst_mac(self, dest_macs):
417         """Set the list of dest MACs indexed by the chain id.
418
419         This is only called in 2 cases:
420         - VM macs discovered using openstack API
421         - dest MACs provisioned in config file
422         """
423         self.vtep_dst_mac = list(map(str, dest_macs))
424
425     def set_dest_macs(self, dest_macs):
426         """Set the list of dest MACs indexed by the chain id.
427
428         This is only called in 2 cases:
429         - VM macs discovered using openstack API
430         - dest MACs provisioned in config file
431         """
432         self.dest_macs = list(map(str, dest_macs))
433
434     def get_dest_macs(self):
435         """Get the list of dest macs for this device.
436
437         If set_dest_macs was never called, assumes l2-loopback and return
438         a list of peer mac (as many as chains but normally only 1 chain)
439         """
440         if self.dest_macs:
441             return self.dest_macs
442         # assume this is l2-loopback
443         return [self.get_peer_device().mac] * self.chain_count
444
445     def set_vlans(self, vlans):
446         """Set the list of vlans to use indexed by the chain id."""
447         self.vlans = vlans
448         LOG.info("Port %d: VLANs %s", self.port, self.vlans)
449
450     def set_vtep_vlan(self, vlan):
451         """Set the vtep vlan to use indexed by specific port."""
452         self.vtep_vlan = vlan
453         self.vxlan = True
454         self.vlan_tagging = None
455         LOG.info("Port %d: VTEP VLANs %s", self.port, self.vtep_vlan)
456
457     def set_vxlan_endpoints(self, src_ip, dst_ip):
458         self.vtep_dst_ip = dst_ip
459         self.vtep_src_ip = src_ip
460         LOG.info("Port %d: src_vtep %s, dst_vtep %s", self.port,
461                  self.vtep_src_ip, self.vtep_dst_ip)
462
463     def set_mpls_peers(self, src_ip, dst_ip):
464         self.mpls = True
465         self.vtep_dst_ip = dst_ip
466         self.vtep_src_ip = src_ip
467         LOG.info("Port %d: src_mpls_vtep %s, mpls_peer_ip %s", self.port,
468                  self.vtep_src_ip, self.vtep_dst_ip)
469
470     def set_vxlans(self, vnis):
471         self.vnis = vnis
472         LOG.info("Port %d: VNIs %s", self.port, self.vnis)
473
474     def set_mpls_inner_labels(self, labels):
475         self.inner_labels = labels
476         LOG.info("Port %d: MPLS Inner Labels %s", self.port, self.inner_labels)
477
478     def set_mpls_outer_labels(self, labels):
479         self.outer_labels = labels
480         LOG.info("Port %d: MPLS Outer Labels %s", self.port, self.outer_labels)
481
482     def set_gw_ip(self, gateway_ip):
483         self.gw_ip_block = IpBlock(gateway_ip,
484                                    self.generator_config.gateway_ip_addrs_step,
485                                    self.chain_count)
486
487     def get_gw_ip(self, chain_index):
488         """Retrieve the IP address assigned for the gateway of a given chain."""
489         return self.gw_ip_block.get_ip(chain_index)
490
491     def get_stream_configs(self):
492         """Get the stream config for a given chain on this device.
493
494         Called by the traffic generator driver to program the traffic generator properly
495         before generating traffic
496         """
497         configs = []
498         # exact flow count for each chain is calculated as follows:
499         # - all chains except the first will have the same flow count
500         #   calculated as (total_flows + chain_count - 1) / chain_count
501         # - the first chain will have the remainder
502         # example 11 flows and 3 chains => 3, 4, 4
503         flows_per_chain = int((self.flow_count + self.chain_count - 1) / self.chain_count)
504         cur_chain_flow_count = int(self.flow_count - flows_per_chain * (self.chain_count - 1))
505
506         peer = self.get_peer_device()
507         self.ip_block.reset_reservation()
508         peer.ip_block.reset_reservation()
509         dest_macs = self.get_dest_macs()
510
511         # limit ranges of UDP ports and IP to avoid overflow of the number of flows
512         peer_size = peer.ip_size // self.chain_count
513
514         for chain_idx in range(self.chain_count):
515             src_ip_size, peer_ip_size = self.limit_ip_udp_ranges(peer_size, cur_chain_flow_count)
516
517             src_ip_first, src_ip_last = self.ip_block.reserve_ip_range \
518                 (src_ip_size)
519             dst_ip_first, dst_ip_last = peer.ip_block.reserve_ip_range \
520                 (peer_ip_size)
521
522             if self.ip_addrs_step != 'random':
523                 src_ip_last = self.ip_block.get_ip_from_chain_first_ip(
524                     Device.ip_to_int(src_ip_first), src_ip_size - 1)
525                 dst_ip_last = peer.ip_block.get_ip_from_chain_first_ip(
526                     Device.ip_to_int(dst_ip_first), peer_ip_size - 1)
527             if self.udp_ports.step != 'random':
528                 self.udp_ports.src_max = self.udp_ports.get_src_max(self.udp_ports.udp_src_size - 1)
529                 self.udp_ports.dst_max = self.udp_ports.get_dst_max(self.udp_ports.udp_dst_size - 1)
530             if self.ip_src_static:
531                 src_ip_last = src_ip_first
532
533             LOG.info("Port %d, chain %d: IP src range [%s,%s]", self.port, chain_idx,
534                      src_ip_first, src_ip_last)
535             LOG.info("Port %d, chain %d: IP dst range [%s,%s]", self.port, chain_idx,
536                      dst_ip_first, dst_ip_last)
537             LOG.info("Port %d, chain %d: UDP src range [%s,%s]", self.port, chain_idx,
538                      self.udp_ports.src_min, self.udp_ports.src_max)
539             LOG.info("Port %d, chain %d: UDP dst range [%s,%s]", self.port, chain_idx,
540                      self.udp_ports.dst_min, self.udp_ports.dst_max)
541
542             configs.append({
543                 'count': cur_chain_flow_count,
544                 'mac_src': self.mac,
545                 'mac_dst': dest_macs[chain_idx],
546                 'ip_src_addr': src_ip_first,
547                 'ip_src_addr_max': src_ip_last,
548                 'ip_src_count': src_ip_size,
549                 'ip_dst_addr': dst_ip_first,
550                 'ip_dst_addr_max': dst_ip_last,
551                 'ip_dst_count': peer_ip_size,
552                 'ip_addrs_step': self.ip_addrs_step,
553                 'ip_src_static': self.ip_src_static,
554                 'udp_src_port': self.udp_ports.src_min,
555                 'udp_src_port_max': self.udp_ports.src_max,
556                 'udp_src_count': self.udp_ports.udp_src_size,
557                 'udp_dst_port': self.udp_ports.dst_min,
558                 'udp_dst_port_max': self.udp_ports.dst_max,
559                 'udp_dst_count': self.udp_ports.udp_dst_size,
560                 'udp_port_step': self.udp_ports.step,
561                 'mac_discovery_gw': self.get_gw_ip(chain_idx),
562                 'ip_src_tg_gw': self.tg_gw_ip_block.get_ip(chain_idx),
563                 'ip_dst_tg_gw': peer.tg_gw_ip_block.get_ip(chain_idx),
564                 'vlan_tag': self.vlans[chain_idx] if self.vlans else None,
565                 'vxlan': self.vxlan,
566                 'vtep_vlan': self.vtep_vlan if self.vtep_vlan else None,
567                 'vtep_src_mac': self.mac if (self.vxlan or self.mpls) else None,
568                 'vtep_dst_mac': self.vtep_dst_mac if (self.vxlan or self.mpls) else None,
569                 'vtep_dst_ip': self.vtep_dst_ip if self.vxlan is True else None,
570                 'vtep_src_ip': self.vtep_src_ip if self.vxlan is True else None,
571                 'net_vni': self.vnis[chain_idx] if self.vxlan is True else None,
572                 'mpls': self.mpls,
573                 'mpls_outer_label': self.outer_labels[chain_idx] if self.mpls is True else None,
574                 'mpls_inner_label': self.inner_labels[chain_idx] if self.mpls is True else None
575
576             })
577             # after first chain, fall back to the flow count for all other chains
578             cur_chain_flow_count = flows_per_chain
579         return configs
580
581     @staticmethod
582     def ip_to_int(addr):
583         """Convert an IP address from string to numeric."""
584         return struct.unpack("!I", socket.inet_aton(addr))[0]
585
586     @staticmethod
587     def int_to_ip(nvalue):
588         """Convert an IP address from numeric to string."""
589         return socket.inet_ntoa(struct.pack("!I", int(nvalue)))
590
591
592 class GeneratorConfig(object):
593     """Represents traffic configuration for currently running traffic profile."""
594
595     DEFAULT_IP_STEP = '0.0.0.1'
596     DEFAULT_SRC_DST_IP_STEP = '0.0.0.1'
597
598     def __init__(self, config):
599         """Create a generator config."""
600         self.config = config
601         # name of the generator profile (normally trex or dummy)
602         # pick the default one if not specified explicitly from cli options
603         if not config.generator_profile:
604             config.generator_profile = config.traffic_generator.default_profile
605         # pick up the profile dict based on the name
606         gen_config = self.__match_generator_profile(config.traffic_generator,
607                                                     config.generator_profile)
608         self.gen_config = gen_config
609         # copy over fields from the dict
610         self.tool = gen_config.tool
611         self.ip = gen_config.ip
612         # overrides on config.cores and config.mbuf_factor
613         if config.cores:
614             self.cores = config.cores
615         else:
616             self.cores = gen_config.get('cores', 1)
617         # let's report the value actually used in the end
618         config.cores_used = self.cores
619         self.mbuf_factor = config.mbuf_factor
620         self.mbuf_64 = config.mbuf_64
621         self.hdrh = not config.disable_hdrh
622         if config.intf_speed:
623             # interface speed is overriden from the command line
624             self.intf_speed = config.intf_speed
625         elif gen_config.intf_speed:
626             # interface speed is overriden from the generator config
627             self.intf_speed = gen_config.intf_speed
628         else:
629             self.intf_speed = "auto"
630         if self.intf_speed == "auto" or self.intf_speed == "0":
631             # interface speed is discovered/provided by the traffic generator
632             self.intf_speed = 0
633         else:
634             self.intf_speed = bitmath.parse_string(self.intf_speed.replace('ps', '')).bits
635         self.name = gen_config.name
636         self.zmq_pub_port = gen_config.get('zmq_pub_port', 4500)
637         self.zmq_rpc_port = gen_config.get('zmq_rpc_port', 4501)
638         self.limit_memory = gen_config.get('limit_memory', 1024)
639         self.software_mode = gen_config.get('software_mode', False)
640         self.interfaces = gen_config.interfaces
641         if self.interfaces[0].port != 0 or self.interfaces[1].port != 1:
642             raise TrafficClientException('Invalid port order/id in generator_profile.interfaces')
643         self.service_chain = config.service_chain
644         self.service_chain_count = config.service_chain_count
645         self.flow_count = config.flow_count
646         self.host_name = gen_config.host_name
647         self.bidirectional = config.traffic.bidirectional
648         self.tg_gateway_ip_addrs = gen_config.tg_gateway_ip_addrs
649         self.ip_addrs = gen_config.ip_addrs
650         self.ip_addrs_step = gen_config.ip_addrs_step or self.DEFAULT_SRC_DST_IP_STEP
651         self.tg_gateway_ip_addrs_step = \
652             gen_config.tg_gateway_ip_addrs_step or self.DEFAULT_IP_STEP
653         self.gateway_ip_addrs_step = gen_config.gateway_ip_addrs_step or self.DEFAULT_IP_STEP
654         self.gateway_ips = gen_config.gateway_ip_addrs
655         self.ip_src_static = gen_config.ip_src_static
656         self.vteps = gen_config.get('vteps')
657         self.devices = [Device(port, self) for port in [0, 1]]
658         # This should normally always be [0, 1]
659         self.ports = [device.port for device in self.devices]
660
661         # check that pci is not empty
662         if not gen_config.interfaces[0].get('pci', None) or \
663            not gen_config.interfaces[1].get('pci', None):
664             raise TrafficClientException("configuration interfaces pci fields cannot be empty")
665
666         self.pcis = [tgif['pci'] for tgif in gen_config.interfaces]
667         self.vlan_tagging = config.vlan_tagging
668
669         # needed for result/summarizer
670         config['tg-name'] = gen_config.name
671         config['tg-tool'] = self.tool
672
673     def to_json(self):
674         """Get json form to display the content into the overall result dict."""
675         return dict(self.gen_config)
676
677     def set_dest_macs(self, port_index, dest_macs):
678         """Set the list of dest MACs indexed by the chain id on given port.
679
680         port_index: the port for which dest macs must be set
681         dest_macs: a list of dest MACs indexed by chain id
682         """
683         if len(dest_macs) < self.config.service_chain_count:
684             raise TrafficClientException('Dest MAC list %s must have %d entries' %
685                                          (dest_macs, self.config.service_chain_count))
686         # only pass the first scc dest MACs
687         self.devices[port_index].set_dest_macs(dest_macs[:self.config.service_chain_count])
688         LOG.info('Port %d: dst MAC %s', port_index, [str(mac) for mac in dest_macs])
689
690     def set_vtep_dest_macs(self, port_index, dest_macs):
691         """Set the list of dest MACs indexed by the chain id on given port.
692
693         port_index: the port for which dest macs must be set
694         dest_macs: a list of dest MACs indexed by chain id
695         """
696         if len(dest_macs) != self.config.service_chain_count:
697             raise TrafficClientException('Dest MAC list %s must have %d entries' %
698                                          (dest_macs, self.config.service_chain_count))
699         self.devices[port_index].set_vtep_dst_mac(dest_macs)
700         LOG.info('Port %d: vtep dst MAC %s', port_index, {str(mac) for mac in dest_macs})
701
702     def get_dest_macs(self):
703         """Return the list of dest macs indexed by port."""
704         return [dev.get_dest_macs() for dev in self.devices]
705
706     def set_vlans(self, port_index, vlans):
707         """Set the list of vlans to use indexed by the chain id on given port.
708
709         port_index: the port for which VLANs must be set
710         vlans: a  list of vlan lists indexed by chain id
711         """
712         if len(vlans) != self.config.service_chain_count:
713             raise TrafficClientException('VLAN list %s must have %d entries' %
714                                          (vlans, self.config.service_chain_count))
715         self.devices[port_index].set_vlans(vlans)
716
717     def set_vxlans(self, port_index, vxlans):
718         """Set the list of vxlans (VNIs) to use indexed by the chain id on given port.
719
720         port_index: the port for which VXLANs must be set
721         VXLANs: a  list of VNIs lists indexed by chain id
722         """
723         if len(vxlans) != self.config.service_chain_count:
724             raise TrafficClientException('VXLAN list %s must have %d entries' %
725                                          (vxlans, self.config.service_chain_count))
726         self.devices[port_index].set_vxlans(vxlans)
727
728     def set_mpls_inner_labels(self, port_index, labels):
729         """Set the list of MPLS Labels to use indexed by the chain id on given port.
730
731         port_index: the port for which Labels must be set
732         Labels: a list of Labels lists indexed by chain id
733         """
734         if len(labels) != self.config.service_chain_count:
735             raise TrafficClientException('Inner MPLS list %s must have %d entries' %
736                                          (labels, self.config.service_chain_count))
737         self.devices[port_index].set_mpls_inner_labels(labels)
738
739     def set_mpls_outer_labels(self, port_index, labels):
740         """Set the list of MPLS Labels to use indexed by the chain id on given port.
741
742         port_index: the port for which Labels must be set
743         Labels: a list of Labels lists indexed by chain id
744         """
745         if len(labels) != self.config.service_chain_count:
746             raise TrafficClientException('Outer MPLS list %s must have %d entries' %
747                                          (labels, self.config.service_chain_count))
748         self.devices[port_index].set_mpls_outer_labels(labels)
749
750     def set_vtep_vlan(self, port_index, vlan):
751         """Set the vtep vlan to use indexed by the chain id on given port.
752         port_index: the port for which VLAN must be set
753         """
754         self.devices[port_index].set_vtep_vlan(vlan)
755
756     def set_vxlan_endpoints(self, port_index, src_ip, dst_ip):
757         self.devices[port_index].set_vxlan_endpoints(src_ip, dst_ip)
758
759     def set_mpls_peers(self, port_index, src_ip, dst_ip):
760         self.devices[port_index].set_mpls_peers(src_ip, dst_ip)
761
762     @staticmethod
763     def __match_generator_profile(traffic_generator, generator_profile):
764         gen_config = AttrDict(traffic_generator)
765         gen_config.pop('default_profile')
766         gen_config.pop('generator_profile')
767         matching_profile = [profile for profile in traffic_generator.generator_profile if
768                             profile.name == generator_profile]
769         if len(matching_profile) != 1:
770             raise Exception('Traffic generator profile not found: ' + generator_profile)
771
772         gen_config.update(matching_profile[0])
773         return gen_config
774
775
776 class TrafficClient(object):
777     """Traffic generator client with NDR/PDR binary seearch."""
778
779     PORTS = [0, 1]
780
781     def __init__(self, config, notifier=None):
782         """Create a new TrafficClient instance.
783
784         config: nfvbench config
785         notifier: notifier (optional)
786
787         A new instance is created everytime the nfvbench config may have changed.
788         """
789         self.config = config
790         self.generator_config = GeneratorConfig(config)
791         self.tool = self.generator_config.tool
792         self.gen = self._get_generator()
793         self.notifier = notifier
794         self.interval_collector = None
795         self.iteration_collector = None
796         self.runner = TrafficRunner(self, self.config.duration_sec, self.config.interval_sec,
797                                     self.config.service_mode)
798         self.config.frame_sizes = self._get_frame_sizes()
799         self.run_config = {
800             'l2frame_size': None,
801             'duration_sec': self.config.duration_sec,
802             'bidirectional': True,
803             'rates': []  # to avoid unsbuscriptable-obj warning
804         }
805         self.current_total_rate = {'rate_percent': '10'}
806         if self.config.single_run:
807             self.current_total_rate = utils.parse_rate_str(self.config.rate)
808         self.ifstats = None
809         # Speed is either discovered when connecting to TG or set from config
810         # This variable is 0 if not yet discovered from TG or must be the speed of
811         # each interface in bits per second
812         self.intf_speed = self.generator_config.intf_speed
813
814     def _get_generator(self):
815         tool = self.tool.lower()
816         if tool == 'trex':
817             from .traffic_gen import trex_gen
818             return trex_gen.TRex(self)
819         if tool == 'dummy':
820             from .traffic_gen import dummy
821             return dummy.DummyTG(self)
822         raise TrafficClientException('Unsupported generator tool name:' + self.tool)
823
824     def skip_sleep(self):
825         """Skip all sleeps when doing unit testing with dummy TG.
826
827         Must be overriden using mock.patch
828         """
829         return False
830
831     def _get_frame_sizes(self):
832         traffic_profile_name = self.config.traffic.profile
833         matching_profiles = [profile for profile in self.config.traffic_profile if
834                              profile.name == traffic_profile_name]
835         if len(matching_profiles) > 1:
836             raise TrafficClientException('Multiple traffic profiles with name: ' +
837                                          traffic_profile_name)
838         if not matching_profiles:
839             raise TrafficClientException('Cannot find traffic profile: ' + traffic_profile_name)
840         return matching_profiles[0].l2frame_size
841
842     def start_traffic_generator(self):
843         """Start the traffic generator process (traffic not started yet)."""
844         self.gen.connect()
845         # pick up the interface speed if it is not set from config
846         intf_speeds = self.gen.get_port_speed_gbps()
847         # convert Gbps unit into bps
848         tg_if_speed = bitmath.parse_string(str(intf_speeds[0]) + 'Gb').bits
849         if self.intf_speed:
850             # interface speed is overriden from config
851             if self.intf_speed != tg_if_speed:
852                 # Warn the user if the speed in the config is different
853                 LOG.warning(
854                     'Interface speed provided (%g Gbps) is different from actual speed (%d Gbps)',
855                     self.intf_speed / 1000000000.0, intf_speeds[0])
856         else:
857             # interface speed not provisioned by config
858             self.intf_speed = tg_if_speed
859             # also update the speed in the tg config
860             self.generator_config.intf_speed = tg_if_speed
861         # let's report detected and actually used interface speed
862         self.config.intf_speed_detected = tg_if_speed
863         self.config.intf_speed_used = self.intf_speed
864
865         # Save the traffic generator local MAC
866         for mac, device in zip(self.gen.get_macs(), self.generator_config.devices):
867             device.set_mac(mac)
868
869     def setup(self):
870         """Set up the traffic client."""
871         self.gen.clear_stats()
872
873     def get_version(self):
874         """Get the traffic generator version."""
875         return self.gen.get_version()
876
877     def ensure_end_to_end(self):
878         """Ensure traffic generator receives packets it has transmitted.
879
880         This ensures end to end connectivity and also waits until VMs are ready to forward packets.
881
882         VMs that are started and in active state may not pass traffic yet. It is imperative to make
883         sure that all VMs are passing traffic in both directions before starting any benchmarking.
884         To verify this, we need to send at a low frequency bi-directional packets and make sure
885         that we receive all packets back from all VMs. The number of flows is equal to 2 times
886         the number of chains (1 per direction) and we need to make sure we receive packets coming
887         from exactly 2 x chain count different source MAC addresses.
888
889         Example:
890             PVP chain (1 VM per chain)
891             N = 10 (number of chains)
892             Flow count = 20 (number of flows)
893             If the number of unique source MAC addresses from received packets is 20 then
894             all 10 VMs 10 VMs are in operational state.
895         """
896         LOG.info('Starting traffic generator to ensure end-to-end connectivity')
897         # send 2pps on each chain and each direction
898         rate_pps = {'rate_pps': str(self.config.service_chain_count * 2)}
899         self.gen.create_traffic('64', [rate_pps, rate_pps], bidirectional=True, latency=False,
900                                 e2e=True)
901         # ensures enough traffic is coming back
902         retry_count = int((self.config.check_traffic_time_sec +
903                            self.config.generic_poll_sec - 1) / self.config.generic_poll_sec)
904
905         # we expect to see packets coming from 2 unique MAC per chain
906         # because there can be flooding in the case of shared net
907         # we must verify that packets from the right VMs are received
908         # and not just count unique src MAC
909         # create a dict of (port, chain) tuples indexed by dest mac
910         mac_map = {}
911         for port, dest_macs in enumerate(self.generator_config.get_dest_macs()):
912             for chain, mac in enumerate(dest_macs):
913                 mac_map[mac] = (port, chain)
914         unique_src_mac_count = len(mac_map)
915         if self.config.vxlan and self.config.traffic_generator.vtep_vlan:
916             get_mac_id = lambda packet: packet['binary'][60:66]
917         elif self.config.vxlan:
918             get_mac_id = lambda packet: packet['binary'][56:62]
919         elif self.config.mpls:
920             get_mac_id = lambda packet: packet['binary'][24:30]
921             # mpls_transport_label = lambda packet: packet['binary'][14:18]
922         else:
923             get_mac_id = lambda packet: packet['binary'][6:12]
924         for it in range(retry_count):
925             self.gen.clear_stats()
926             self.gen.start_traffic()
927             self.gen.start_capture()
928             LOG.info('Captured unique src mac %d/%d, capturing return packets (retry %d/%d)...',
929                      unique_src_mac_count - len(mac_map), unique_src_mac_count,
930                      it + 1, retry_count)
931             if not self.skip_sleep():
932                 time.sleep(self.config.generic_poll_sec)
933             self.gen.stop_traffic()
934             self.gen.fetch_capture_packets()
935             self.gen.stop_capture()
936             for packet in self.gen.packet_list:
937                 mac_id = get_mac_id(packet).decode('latin-1')
938                 src_mac = ':'.join(["%02x" % ord(x) for x in mac_id])
939                 if self.config.mpls:
940                     if src_mac in mac_map and self.is_mpls(packet):
941                         port, chain = mac_map[src_mac]
942                         LOG.info('Received mpls packet from mac: %s (chain=%d, port=%d)',
943                                  src_mac, chain, port)
944                         mac_map.pop(src_mac, None)
945                 else:
946                     if src_mac in mac_map and self.is_udp(packet):
947                         port, chain = mac_map[src_mac]
948                         LOG.info('Received udp packet from mac: %s (chain=%d, port=%d)',
949                                  src_mac, chain, port)
950                         mac_map.pop(src_mac, None)
951
952                 if not mac_map:
953                     LOG.info('End-to-end connectivity established')
954                     return
955             if self.config.l3_router and not self.config.no_arp:
956                 # In case of L3 traffic mode, routers are not able to route traffic
957                 # until VM interfaces are up and ARP requests are done
958                 LOG.info('Waiting for loopback service completely started...')
959                 LOG.info('Sending ARP request to assure end-to-end connectivity established')
960                 self.ensure_arp_successful()
961         raise TrafficClientException('End-to-end connectivity cannot be ensured')
962
963     def is_udp(self, packet):
964         pkt = Ether(packet['binary'])
965         return UDP in pkt
966
967     def is_mpls(self, packet):
968         pkt = Ether(packet['binary'])
969         return MPLS in pkt
970
971     def ensure_arp_successful(self):
972         """Resolve all IP using ARP and throw an exception in case of failure."""
973         dest_macs = self.gen.resolve_arp()
974         if dest_macs:
975             # all dest macs are discovered, saved them into the generator config
976             if self.config.vxlan or self.config.mpls:
977                 self.generator_config.set_vtep_dest_macs(0, dest_macs[0])
978                 self.generator_config.set_vtep_dest_macs(1, dest_macs[1])
979             else:
980                 self.generator_config.set_dest_macs(0, dest_macs[0])
981                 self.generator_config.set_dest_macs(1, dest_macs[1])
982         else:
983             raise TrafficClientException('ARP cannot be resolved')
984
985     def set_traffic(self, frame_size, bidirectional):
986         """Reconfigure the traffic generator for a new frame size."""
987         self.run_config['bidirectional'] = bidirectional
988         self.run_config['l2frame_size'] = frame_size
989         self.run_config['rates'] = [self.get_per_direction_rate()]
990         if bidirectional:
991             self.run_config['rates'].append(self.get_per_direction_rate())
992         else:
993             unidir_reverse_pps = int(self.config.unidir_reverse_traffic_pps)
994             if unidir_reverse_pps > 0:
995                 self.run_config['rates'].append({'rate_pps': str(unidir_reverse_pps)})
996         # Fix for [NFVBENCH-67], convert the rate string to PPS
997         for idx, rate in enumerate(self.run_config['rates']):
998             if 'rate_pps' not in rate:
999                 self.run_config['rates'][idx] = {'rate_pps': self.__convert_rates(rate)['rate_pps']}
1000
1001         self.gen.clear_streamblock()
1002
1003         if self.config.no_latency_streams:
1004             LOG.info("Latency streams are disabled")
1005         # in service mode, we must disable flow stats (e2e=True)
1006         self.gen.create_traffic(frame_size, self.run_config['rates'], bidirectional,
1007                                 latency=not self.config.no_latency_streams,
1008                                 e2e=self.runner.service_mode)
1009
1010     def _modify_load(self, load):
1011         self.current_total_rate = {'rate_percent': str(load)}
1012         rate_per_direction = self.get_per_direction_rate()
1013
1014         self.gen.modify_rate(rate_per_direction, False)
1015         self.run_config['rates'][0] = rate_per_direction
1016         if self.run_config['bidirectional']:
1017             self.gen.modify_rate(rate_per_direction, True)
1018             self.run_config['rates'][1] = rate_per_direction
1019
1020     def get_ndr_and_pdr(self):
1021         """Start the NDR/PDR iteration and return the results."""
1022         dst = 'Bidirectional' if self.run_config['bidirectional'] else 'Unidirectional'
1023         targets = {}
1024         if self.config.ndr_run:
1025             LOG.info('*** Searching NDR for %s (%s)...', self.run_config['l2frame_size'], dst)
1026             targets['ndr'] = self.config.measurement.NDR
1027         if self.config.pdr_run:
1028             LOG.info('*** Searching PDR for %s (%s)...', self.run_config['l2frame_size'], dst)
1029             targets['pdr'] = self.config.measurement.PDR
1030
1031         self.run_config['start_time'] = time.time()
1032         self.interval_collector = IntervalCollector(self.run_config['start_time'])
1033         self.interval_collector.attach_notifier(self.notifier)
1034         self.iteration_collector = IterationCollector(self.run_config['start_time'])
1035         results = {}
1036         self.__range_search(0.0, 200.0, targets, results)
1037
1038         results['iteration_stats'] = {
1039             'ndr_pdr': self.iteration_collector.get()
1040         }
1041
1042         if self.config.ndr_run:
1043             LOG.info('NDR load: %s', results['ndr']['rate_percent'])
1044             results['ndr']['time_taken_sec'] = \
1045                 results['ndr']['timestamp_sec'] - self.run_config['start_time']
1046             if self.config.pdr_run:
1047                 LOG.info('PDR load: %s', results['pdr']['rate_percent'])
1048                 results['pdr']['time_taken_sec'] = \
1049                     results['pdr']['timestamp_sec'] - results['ndr']['timestamp_sec']
1050         else:
1051             LOG.info('PDR load: %s', results['pdr']['rate_percent'])
1052             results['pdr']['time_taken_sec'] = \
1053                 results['pdr']['timestamp_sec'] - self.run_config['start_time']
1054         return results
1055
1056     def __get_dropped_rate(self, result):
1057         dropped_pkts = result['rx']['dropped_pkts']
1058         total_pkts = result['tx']['total_pkts']
1059         if not total_pkts:
1060             return float('inf')
1061         return float(dropped_pkts) / total_pkts * 100
1062
1063     def get_stats(self):
1064         """Collect final stats for previous run."""
1065         stats = self.gen.get_stats(self.ifstats)
1066         retDict = {'total_tx_rate': stats['total_tx_rate'],
1067                    'offered_tx_rate_bps': stats['offered_tx_rate_bps'],
1068                    'theoretical_tx_rate_bps': stats['theoretical_tx_rate_bps'],
1069                    'theoretical_tx_rate_pps': stats['theoretical_tx_rate_pps']}
1070
1071         tx_keys = ['total_pkts', 'total_pkt_bytes', 'pkt_rate', 'pkt_bit_rate']
1072         rx_keys = tx_keys + ['dropped_pkts']
1073
1074         for port in self.PORTS:
1075             port_stats = {'tx': {}, 'rx': {}}
1076             for key in tx_keys:
1077                 port_stats['tx'][key] = int(stats[port]['tx'][key])
1078             for key in rx_keys:
1079                 try:
1080                     port_stats['rx'][key] = int(stats[port]['rx'][key])
1081                 except ValueError:
1082                     port_stats['rx'][key] = 0
1083             port_stats['rx']['avg_delay_usec'] = cast_integer(
1084                 stats[port]['rx']['avg_delay_usec'])
1085             port_stats['rx']['min_delay_usec'] = cast_integer(
1086                 stats[port]['rx']['min_delay_usec'])
1087             port_stats['rx']['max_delay_usec'] = cast_integer(
1088                 stats[port]['rx']['max_delay_usec'])
1089             port_stats['drop_rate_percent'] = self.__get_dropped_rate(port_stats)
1090             retDict[str(port)] = port_stats
1091
1092         ports = sorted(list(retDict.keys()), key=str)
1093         if self.run_config['bidirectional']:
1094             retDict['overall'] = {'tx': {}, 'rx': {}}
1095             for key in tx_keys:
1096                 retDict['overall']['tx'][key] = \
1097                     retDict[ports[0]]['tx'][key] + retDict[ports[1]]['tx'][key]
1098             for key in rx_keys:
1099                 retDict['overall']['rx'][key] = \
1100                     retDict[ports[0]]['rx'][key] + retDict[ports[1]]['rx'][key]
1101             total_pkts = [retDict[ports[0]]['rx']['total_pkts'],
1102                           retDict[ports[1]]['rx']['total_pkts']]
1103             avg_delays = [retDict[ports[0]]['rx']['avg_delay_usec'],
1104                           retDict[ports[1]]['rx']['avg_delay_usec']]
1105             max_delays = [retDict[ports[0]]['rx']['max_delay_usec'],
1106                           retDict[ports[1]]['rx']['max_delay_usec']]
1107             min_delays = [retDict[ports[0]]['rx']['min_delay_usec'],
1108                           retDict[ports[1]]['rx']['min_delay_usec']]
1109             retDict['overall']['rx']['avg_delay_usec'] = utils.weighted_avg(total_pkts, avg_delays)
1110             retDict['overall']['rx']['min_delay_usec'] = min(min_delays)
1111             retDict['overall']['rx']['max_delay_usec'] = max(max_delays)
1112             for key in ['pkt_bit_rate', 'pkt_rate']:
1113                 for dirc in ['tx', 'rx']:
1114                     retDict['overall'][dirc][key] /= 2.0
1115                 retDict['overall']['hdrh'] = stats.get('hdrh', None)
1116                 if retDict['overall']['hdrh']:
1117                     decoded_histogram = HdrHistogram.decode(retDict['overall']['hdrh'])
1118                     # override min max and avg from hdrh
1119                     retDict['overall']['rx']['min_delay_usec'] = decoded_histogram.get_min_value()
1120                     retDict['overall']['rx']['max_delay_usec'] = decoded_histogram.get_max_value()
1121                     retDict['overall']['rx']['avg_delay_usec'] = decoded_histogram.get_mean_value()
1122                     retDict['overall']['rx']['lat_percentile'] = {}
1123                     for percentile in self.config.lat_percentiles:
1124                         retDict['overall']['rx']['lat_percentile'][percentile] = \
1125                             decoded_histogram.get_value_at_percentile(percentile)
1126         else:
1127             retDict['overall'] = retDict[ports[0]]
1128         retDict['overall']['drop_rate_percent'] = self.__get_dropped_rate(retDict['overall'])
1129         return retDict
1130
1131     def __convert_rates(self, rate):
1132         return utils.convert_rates(self.run_config['l2frame_size'],
1133                                    rate,
1134                                    self.intf_speed)
1135
1136     def __ndr_pdr_found(self, tag, load):
1137         rates = self.__convert_rates({'rate_percent': load})
1138         self.iteration_collector.add_ndr_pdr(tag, rates['rate_pps'])
1139         last_stats = self.iteration_collector.peek()
1140         self.interval_collector.add_ndr_pdr(tag, last_stats)
1141
1142     def __format_output_stats(self, stats):
1143         for key in self.PORTS + ['overall']:
1144             key = str(key)
1145             interface = stats[key]
1146             stats[key] = {
1147                 'tx_pkts': interface['tx']['total_pkts'],
1148                 'rx_pkts': interface['rx']['total_pkts'],
1149                 'drop_percentage': interface['drop_rate_percent'],
1150                 'drop_pct': interface['rx']['dropped_pkts'],
1151                 'avg_delay_usec': interface['rx']['avg_delay_usec'],
1152                 'max_delay_usec': interface['rx']['max_delay_usec'],
1153                 'min_delay_usec': interface['rx']['min_delay_usec'],
1154             }
1155
1156             if key == 'overall':
1157                 stats[key]['hdrh'] = interface.get('hdrh', None)
1158                 if stats[key]['hdrh']:
1159                     decoded_histogram = HdrHistogram.decode(stats[key]['hdrh'])
1160                     # override min max and avg from hdrh
1161                     stats[key]['min_delay_usec'] = decoded_histogram.get_min_value()
1162                     stats[key]['max_delay_usec'] = decoded_histogram.get_max_value()
1163                     stats[key]['avg_delay_usec'] = decoded_histogram.get_mean_value()
1164                     stats[key]['lat_percentile'] = {}
1165                     for percentile in self.config.lat_percentiles:
1166                         stats[key]['lat_percentile'][percentile] = decoded_histogram.\
1167                             get_value_at_percentile(percentile)
1168
1169
1170         return stats
1171
1172     def __targets_found(self, rate, targets, results):
1173         for tag, target in list(targets.items()):
1174             LOG.info('Found %s (%s) load: %s', tag, target, rate)
1175             self.__ndr_pdr_found(tag, rate)
1176             results[tag]['timestamp_sec'] = time.time()
1177
1178     def __range_search(self, left, right, targets, results):
1179         """Perform a binary search for a list of targets inside a [left..right] range or rate.
1180
1181         left    the left side of the range to search as a % the line rate (100 = 100% line rate)
1182                 indicating the rate to send on each interface
1183         right   the right side of the range to search as a % of line rate
1184                 indicating the rate to send on each interface
1185         targets a dict of drop rates to search (0.1 = 0.1%), indexed by the DR name or "tag"
1186                 ('ndr', 'pdr')
1187         results a dict to store results
1188         """
1189         if not targets:
1190             return
1191         LOG.info('Range search [%s .. %s] targets: %s', left, right, targets)
1192
1193         # Terminate search when gap is less than load epsilon
1194         if right - left < self.config.measurement.load_epsilon:
1195             self.__targets_found(left, targets, results)
1196             return
1197
1198         # Obtain the average drop rate in for middle load
1199         middle = (left + right) / 2.0
1200         try:
1201             stats, rates = self.__run_search_iteration(middle)
1202         except STLError:
1203             LOG.exception("Got exception from traffic generator during binary search")
1204             self.__targets_found(left, targets, results)
1205             return
1206         # Split target dicts based on the avg drop rate
1207         left_targets = {}
1208         right_targets = {}
1209         for tag, target in list(targets.items()):
1210             if stats['overall']['drop_rate_percent'] <= target:
1211                 # record the best possible rate found for this target
1212                 results[tag] = rates
1213                 results[tag].update({
1214                     'load_percent_per_direction': middle,
1215                     'stats': self.__format_output_stats(dict(stats)),
1216                     'timestamp_sec': None
1217                 })
1218                 right_targets[tag] = target
1219             else:
1220                 # initialize to 0 all fields of result for
1221                 # the worst case scenario of the binary search (if ndr/pdr is not found)
1222                 if tag not in results:
1223                     results[tag] = dict.fromkeys(rates, 0)
1224                     empty_stats = self.__format_output_stats(dict(stats))
1225                     for key in empty_stats:
1226                         if isinstance(empty_stats[key], dict):
1227                             empty_stats[key] = dict.fromkeys(empty_stats[key], 0)
1228                         else:
1229                             empty_stats[key] = 0
1230                     results[tag].update({
1231                         'load_percent_per_direction': 0,
1232                         'stats': empty_stats,
1233                         'timestamp_sec': None
1234                     })
1235                 left_targets[tag] = target
1236
1237         # search lower half
1238         self.__range_search(left, middle, left_targets, results)
1239
1240         # search upper half only if the upper rate does not exceed
1241         # 100%, this only happens when the first search at 100%
1242         # yields a DR that is < target DR
1243         if middle >= 100:
1244             self.__targets_found(100, right_targets, results)
1245         else:
1246             self.__range_search(middle, right, right_targets, results)
1247
1248     def __run_search_iteration(self, rate):
1249         """Run one iteration at the given rate level.
1250
1251         rate: the rate to send on each port in percent (0 to 100)
1252         """
1253         self._modify_load(rate)
1254
1255         # There used to be a inconsistency in case of interface speed override.
1256         # The emulated 'intf_speed' value is unknown to the T-Rex generator which
1257         # refers to the detected line rate for converting relative traffic loads.
1258         # Therefore, we need to convert actual rates here, in terms of packets/s.
1259
1260         for idx, str_rate in enumerate(self.gen.rates):
1261             if str_rate.endswith('%'):
1262                 float_rate = float(str_rate.replace('%', '').strip())
1263                 pps_rate = self.__convert_rates({'rate_percent': float_rate})['rate_pps']
1264                 self.gen.rates[idx] = str(pps_rate) + 'pps'
1265
1266         # poll interval stats and collect them
1267         for stats in self.run_traffic():
1268             self.interval_collector.add(stats)
1269             time_elapsed_ratio = self.runner.time_elapsed() / self.run_config['duration_sec']
1270             if time_elapsed_ratio >= 1:
1271                 self.cancel_traffic()
1272                 if not self.skip_sleep():
1273                     time.sleep(self.config.pause_sec)
1274         self.interval_collector.reset()
1275
1276         # get stats from the run
1277         stats = self.runner.client.get_stats()
1278         current_traffic_config = self._get_traffic_config()
1279         warning = self.compare_tx_rates(current_traffic_config['direction-total']['rate_pps'],
1280                                         stats['total_tx_rate'])
1281         if warning is not None:
1282             stats['warning'] = warning
1283
1284         # save reliable stats from whole iteration
1285         self.iteration_collector.add(stats, current_traffic_config['direction-total']['rate_pps'])
1286         LOG.info('Average drop rate: %f', stats['overall']['drop_rate_percent'])
1287         return stats, current_traffic_config['direction-total']
1288
1289     def log_stats(self, stats):
1290         """Log estimated stats during run."""
1291         # Calculate a rolling drop rate based on differential to
1292         # the previous reading
1293         cur_tx = stats['overall']['tx']['total_pkts']
1294         cur_rx = stats['overall']['rx']['total_pkts']
1295         delta_tx = cur_tx - self.prev_tx
1296         delta_rx = cur_rx - self.prev_rx
1297         drops = delta_tx - delta_rx
1298         drop_rate_pct = 100 * (delta_tx - delta_rx)/delta_tx
1299         self.prev_tx = cur_tx
1300         self.prev_rx = cur_rx
1301         LOG.info('TX: %15s; RX: %15s; (Est.) Dropped: %12s; Drop rate: %8.4f%%',
1302                  format(cur_tx, ',d'),
1303                  format(cur_rx, ',d'),
1304                  format(drops, ',d'),
1305                  drop_rate_pct)
1306
1307     def run_traffic(self):
1308         """Start traffic and return intermediate stats for each interval."""
1309         stats = self.runner.run()
1310         self.prev_tx = 0
1311         self.prev_rx = 0
1312         while self.runner.is_running:
1313             self.log_stats(stats)
1314             yield stats
1315             stats = self.runner.poll_stats()
1316             if stats is None:
1317                 return
1318         self.log_stats(stats)
1319         LOG.info('Drop rate: %f', stats['overall']['drop_rate_percent'])
1320         yield stats
1321
1322     def cancel_traffic(self):
1323         """Stop traffic."""
1324         self.runner.stop()
1325
1326     def _get_traffic_config(self):
1327         config = {}
1328         load_total = 0.0
1329         bps_total = 0.0
1330         pps_total = 0.0
1331         for idx, rate in enumerate(self.run_config['rates']):
1332             key = 'direction-forward' if idx == 0 else 'direction-reverse'
1333             config[key] = {
1334                 'l2frame_size': self.run_config['l2frame_size'],
1335                 'duration_sec': self.run_config['duration_sec']
1336             }
1337             config[key].update(rate)
1338             config[key].update(self.__convert_rates(rate))
1339             load_total += float(config[key]['rate_percent'])
1340             bps_total += float(config[key]['rate_bps'])
1341             pps_total += float(config[key]['rate_pps'])
1342         config['direction-total'] = dict(config['direction-forward'])
1343         config['direction-total'].update({
1344             'rate_percent': load_total,
1345             'rate_pps': cast_integer(pps_total),
1346             'rate_bps': bps_total
1347         })
1348
1349         return config
1350
1351     def get_run_config(self, results):
1352         """Return configuration which was used for the last run."""
1353         r = {}
1354         # because we want each direction to have the far end RX rates,
1355         # use the far end index (1-idx) to retrieve the RX rates
1356         for idx, key in enumerate(["direction-forward", "direction-reverse"]):
1357             tx_rate = results["stats"][str(idx)]["tx"]["total_pkts"] / self.config.duration_sec
1358             rx_rate = results["stats"][str(1 - idx)]["rx"]["total_pkts"] / self.config.duration_sec
1359             r[key] = {
1360                 "orig": self.__convert_rates(self.run_config['rates'][idx]),
1361                 "tx": self.__convert_rates({'rate_pps': tx_rate}),
1362                 "rx": self.__convert_rates({'rate_pps': rx_rate})
1363             }
1364
1365         total = {}
1366         for direction in ['orig', 'tx', 'rx']:
1367             total[direction] = {}
1368             for unit in ['rate_percent', 'rate_bps', 'rate_pps']:
1369                 total[direction][unit] = sum([float(x[direction][unit]) for x in list(r.values())])
1370
1371         r['direction-total'] = total
1372         return r
1373
1374     def insert_interface_stats(self, pps_list):
1375         """Insert interface stats to a list of packet path stats.
1376
1377         pps_list: a list of packet path stats instances indexed by chain index
1378
1379         This function will insert the packet path stats for the traffic gen ports 0 and 1
1380         with itemized per chain tx/rx counters.
1381         There will be as many packet path stats as chains.
1382         Each packet path stats will have exactly 2 InterfaceStats for port 0 and port 1
1383         self.pps_list:
1384         [
1385         PacketPathStats(InterfaceStats(chain 0, port 0), InterfaceStats(chain 0, port 1)),
1386         PacketPathStats(InterfaceStats(chain 1, port 0), InterfaceStats(chain 1, port 1)),
1387         ...
1388         ]
1389         """
1390         def get_if_stats(chain_idx):
1391             return [InterfaceStats('p' + str(port), self.tool)
1392                     for port in range(2)]
1393         # keep the list of list of interface stats indexed by the chain id
1394         self.ifstats = [get_if_stats(chain_idx)
1395                         for chain_idx in range(self.config.service_chain_count)]
1396         # note that we need to make a copy of the ifs list so that any modification in the
1397         # list from pps will not change the list saved in self.ifstats
1398         self.pps_list = [PacketPathStats(self.config, list(ifs)) for ifs in self.ifstats]
1399         # insert the corresponding pps in the passed list
1400         pps_list.extend(self.pps_list)
1401
1402     def update_interface_stats(self, diff=False):
1403         """Update all interface stats.
1404
1405         diff: if False, simply refresh the interface stats values with latest values
1406               if True, diff the interface stats with the latest values
1407         Make sure that the interface stats inserted in insert_interface_stats() are updated
1408         with proper values.
1409         self.ifstats:
1410         [
1411         [InterfaceStats(chain 0, port 0), InterfaceStats(chain 0, port 1)],
1412         [InterfaceStats(chain 1, port 0), InterfaceStats(chain 1, port 1)],
1413         ...
1414         ]
1415         """
1416         if diff:
1417             stats = self.gen.get_stats(self.ifstats)
1418             for chain_idx, ifs in enumerate(self.ifstats):
1419                 # each ifs has exactly 2 InterfaceStats and 2 Latency instances
1420                 # corresponding to the
1421                 # port 0 and port 1 for the given chain_idx
1422                 # Note that we cannot use self.pps_list[chain_idx].if_stats to pick the
1423                 # interface stats for the pps because it could have been modified to contain
1424                 # additional interface stats
1425                 self.gen.get_stream_stats(stats, ifs, self.pps_list[chain_idx].latencies, chain_idx)
1426             # special handling for vxlan
1427             # in case of vxlan, flow stats are not available so all rx counters will be
1428             # zeros when the total rx port counter is non zero.
1429             # in that case,
1430             for port in range(2):
1431                 total_rx = 0
1432                 for ifs in self.ifstats:
1433                     total_rx += ifs[port].rx
1434                 if total_rx == 0:
1435                     # check if the total port rx from Trex is also zero
1436                     port_rx = stats[port]['rx']['total_pkts']
1437                     if port_rx:
1438                         # the total rx for all chains from port level stats is non zero
1439                         # which means that the per-chain stats are not available
1440                         if len(self.ifstats) == 1:
1441                             # only one chain, simply report the port level rx to the chain rx stats
1442                             self.ifstats[0][port].rx = port_rx
1443                         else:
1444                             for ifs in self.ifstats:
1445                                 # mark this data as unavailable
1446                                 ifs[port].rx = None
1447                             # pitch in the total rx only in the last chain pps
1448                             self.ifstats[-1][port].rx_total = port_rx
1449
1450     @staticmethod
1451     def compare_tx_rates(required, actual):
1452         """Compare the actual TX rate to the required TX rate."""
1453         threshold = 0.9
1454         are_different = False
1455         try:
1456             if float(actual) / required < threshold:
1457                 are_different = True
1458         except ZeroDivisionError:
1459             are_different = True
1460
1461         if are_different:
1462             msg = "WARNING: There is a significant difference between requested TX rate ({r}) " \
1463                   "and actual TX rate ({a}). The traffic generator may not have sufficient CPU " \
1464                   "to achieve the requested TX rate.".format(r=required, a=actual)
1465             LOG.info(msg)
1466             return msg
1467
1468         return None
1469
1470     def get_per_direction_rate(self):
1471         """Get the rate for each direction."""
1472         divisor = 2 if self.run_config['bidirectional'] else 1
1473         if 'rate_percent' in self.current_total_rate:
1474             # don't split rate if it's percentage
1475             divisor = 1
1476
1477         return utils.divide_rate(self.current_total_rate, divisor)
1478
1479     def close(self):
1480         """Close this instance."""
1481         try:
1482             self.gen.stop_traffic()
1483         except Exception:
1484             pass
1485         self.gen.clear_stats()
1486         self.gen.cleanup()