NFVBENCH-171 Not accurate flow count with some IP and UDP ranges combinations
[nfvbench.git] / nfvbench / traffic_client.py
1 # Copyright 2016 Cisco Systems, Inc.  All rights reserved.
2 #
3 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
4 #    not use this file except in compliance with the License. You may obtain
5 #    a copy of the License at
6 #
7 #         http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #    Unless required by applicable law or agreed to in writing, software
10 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 #    License for the specific language governing permissions and limitations
13 #    under the License.
14
15 """Interface to the traffic generator clients including NDR/PDR binary search."""
16 import socket
17 import struct
18 import time
19
20 from attrdict import AttrDict
21 import bitmath
22 from hdrh.histogram import HdrHistogram
23 from netaddr import IPNetwork
24 # pylint: disable=import-error
25 from trex.stl.api import Ether
26 from trex.stl.api import STLError
27 from trex.stl.api import UDP
28 # pylint: disable=wrong-import-order
29 from scapy.contrib.mpls import MPLS  # flake8: noqa
30 # pylint: enable=wrong-import-order
31 # pylint: enable=import-error
32
33 from .log import LOG
34 from .packet_stats import InterfaceStats
35 from .packet_stats import PacketPathStats
36 from .stats_collector import IntervalCollector
37 from .stats_collector import IterationCollector
38 from .traffic_gen import traffic_utils as utils
39 from .utils import cast_integer, find_max_size, find_tuples_equal_to_lcm_value, get_divisors, lcm
40
41 class TrafficClientException(Exception):
42     """Generic traffic client exception."""
43
44 class TrafficRunner(object):
45     """Serialize various steps required to run traffic."""
46
47     def __init__(self, client, duration_sec, interval_sec=0, service_mode=False):
48         """Create a traffic runner."""
49         self.client = client
50         self.start_time = None
51         self.duration_sec = duration_sec
52         self.interval_sec = interval_sec
53         self.service_mode = service_mode
54
55     def run(self):
56         """Clear stats and instruct the traffic generator to start generating traffic."""
57         if self.is_running():
58             return None
59         LOG.info('Running traffic generator')
60         self.client.gen.clear_stats()
61         # Debug use only : new '--service-mode' option available for the NFVBench command line.
62         # A read-only mode TRex console would be able to capture the generated traffic.
63         self.client.gen.set_service_mode(enabled=self.service_mode)
64         LOG.info('Service mode is %sabled', 'en' if self.service_mode else 'dis')
65         self.client.gen.start_traffic()
66         self.start_time = time.time()
67         return self.poll_stats()
68
69     def stop(self):
70         """Stop the current run and instruct the traffic generator to stop traffic."""
71         if self.is_running():
72             self.start_time = None
73             self.client.gen.stop_traffic()
74
75     def is_running(self):
76         """Check if a run is still pending."""
77         return self.start_time is not None
78
79     def time_elapsed(self):
80         """Return time elapsed since start of run."""
81         if self.is_running():
82             return time.time() - self.start_time
83         return self.duration_sec
84
85     def poll_stats(self):
86         """Poll latest stats from the traffic generator at fixed interval - sleeps if necessary.
87
88         return: latest stats or None if traffic is stopped
89         """
90         if not self.is_running():
91             return None
92         if self.client.skip_sleep():
93             self.stop()
94             return self.client.get_stats()
95         time_elapsed = self.time_elapsed()
96         if time_elapsed > self.duration_sec:
97             self.stop()
98             return None
99         time_left = self.duration_sec - time_elapsed
100         if self.interval_sec > 0.0:
101             if time_left <= self.interval_sec:
102                 time.sleep(time_left)
103                 self.stop()
104             else:
105                 time.sleep(self.interval_sec)
106         else:
107             time.sleep(self.duration_sec)
108             self.stop()
109         return self.client.get_stats()
110
111
112 class IpBlock(object):
113     """Manage a block of IP addresses."""
114
115     def __init__(self, base_ip, step_ip, count_ip):
116         """Create an IP block."""
117         self.base_ip_int = Device.ip_to_int(base_ip)
118         if step_ip == 'random':
119             step_ip = '0.0.0.1'
120         self.step = Device.ip_to_int(step_ip)
121         self.max_available = count_ip
122         self.next_free = 0
123
124     def get_ip(self, index=0):
125         """Return the IP address at given index."""
126         if index < 0 or index >= self.max_available:
127             raise IndexError('Index out of bounds: %d (max=%d)' % (index, self.max_available))
128         return Device.int_to_ip(self.base_ip_int + index * self.step)
129
130     def get_ip_from_chain_first_ip(self, first_ip, index=0):
131         """Return the IP address at given index starting from chain first ip."""
132         if index < 0 or index >= self.max_available:
133             raise IndexError('Index out of bounds: %d (max=%d)' % (index, self.max_available))
134         return Device.int_to_ip(first_ip + index * self.step)
135
136     def reserve_ip_range(self, count):
137         """Reserve a range of count consecutive IP addresses spaced by step.
138         """
139         if self.next_free + count > self.max_available:
140             raise IndexError('No more IP addresses next free=%d max_available=%d requested=%d' %
141                              (self.next_free,
142                               self.max_available,
143                               count))
144         first_ip = self.get_ip(self.next_free)
145         last_ip = self.get_ip(self.next_free + count - 1)
146         self.next_free += count
147         return (first_ip, last_ip)
148
149     def reset_reservation(self):
150         """Reset all reservations and restart with a completely unused IP block."""
151         self.next_free = 0
152
153
154 class UdpPorts(object):
155
156     def __init__(self, src_min, src_max, dst_min, dst_max, udp_src_size, udp_dst_size, step):
157
158         self.src_min = int(src_min)
159         self.src_max = int(src_max)
160         self.dst_min = int(dst_min)
161         self.dst_max = int(dst_max)
162         self.udp_src_size = udp_src_size
163         self.udp_dst_size = udp_dst_size
164         self.step = step
165
166     def get_src_max(self, index=0):
167         """Return the UDP src port at given index."""
168         return int(self.src_min) + index * int(self.step)
169
170     def get_dst_max(self, index=0):
171         """Return the UDP dst port at given index."""
172         return int(self.dst_min) + index * int(self.step)
173
174
175 class Device(object):
176     """Represent a port device and all information associated to it.
177
178     In the curent version we only support 2 port devices for the traffic generator
179     identified as port 0 or port 1.
180     """
181
182     def __init__(self, port, generator_config):
183         """Create a new device for a given port."""
184         self.generator_config = generator_config
185         self.chain_count = generator_config.service_chain_count
186         if generator_config.bidirectional:
187             self.flow_count = generator_config.flow_count / 2
188         else:
189             self.flow_count = generator_config.flow_count
190
191         self.port = port
192         self.switch_port = generator_config.interfaces[port].get('switch_port', None)
193         self.vtep_vlan = None
194         self.vtep_src_mac = None
195         self.vxlan = False
196         self.mpls = False
197         self.inner_labels = None
198         self.outer_labels = None
199         self.pci = generator_config.interfaces[port].pci
200         self.mac = None
201         self.dest_macs = None
202         self.vtep_dst_mac = None
203         self.vtep_dst_ip = None
204         if generator_config.vteps is None:
205             self.vtep_src_ip = None
206         else:
207             self.vtep_src_ip = generator_config.vteps[port]
208         self.vnis = None
209         self.vlans = None
210         self.ip_addrs = generator_config.ip_addrs[port]
211         self.ip_src_static = generator_config.ip_src_static
212         self.ip_addrs_step = generator_config.ip_addrs_step
213         if self.ip_addrs_step == 'random':
214             # Set step to 1 to calculate the IP range size (see check_range_size below)
215             step = '0.0.0.1'
216         else:
217             step = self.ip_addrs_step
218         self.ip_size = self.check_range_size(IPNetwork(self.ip_addrs).size, Device.ip_to_int(step))
219         self.ip = str(IPNetwork(self.ip_addrs).network)
220         ip_addrs_left = generator_config.ip_addrs[0]
221         ip_addrs_right = generator_config.ip_addrs[1]
222         self.ip_addrs_size = {
223             'left': self.check_range_size(IPNetwork(ip_addrs_left).size, Device.ip_to_int(step)),
224             'right': self.check_range_size(IPNetwork(ip_addrs_right).size, Device.ip_to_int(step))}
225         udp_src_port = generator_config.gen_config.udp_src_port
226         if udp_src_port is None:
227             udp_src_port = 53
228         udp_dst_port = generator_config.gen_config.udp_dst_port
229         if udp_dst_port is None:
230             udp_dst_port = 53
231         src_max, src_min = self.define_udp_range(udp_src_port, 'udp_src_port')
232         dst_max, dst_min = self.define_udp_range(udp_dst_port, 'udp_dst_port')
233         if generator_config.gen_config.udp_port_step == 'random':
234             # Set step to 1 to calculate the UDP range size
235             udp_step = 1
236         else:
237             udp_step = int(generator_config.gen_config.udp_port_step)
238         udp_src_size = self.check_range_size(int(src_max) - int(src_min) + 1, udp_step)
239         udp_dst_size = self.check_range_size(int(dst_max) - int(dst_min) + 1, udp_step)
240         lcm_port = lcm(udp_src_size, udp_dst_size)
241         if self.ip_src_static is True:
242             lcm_ip = lcm(1, min(self.ip_addrs_size['left'], self.ip_addrs_size['right']))
243         else:
244             lcm_ip = lcm(self.ip_addrs_size['left'], self.ip_addrs_size['right'])
245         flow_max = lcm(lcm_port, lcm_ip)
246         if self.flow_count > flow_max:
247             raise TrafficClientException('Trying to set unachievable traffic (%d > %d)' %
248                                          (self.flow_count, flow_max))
249
250         self.udp_ports = UdpPorts(src_min, src_max, dst_min, dst_max, udp_src_size, udp_dst_size,
251                                   generator_config.gen_config.udp_port_step)
252
253         self.ip_block = IpBlock(self.ip, step, self.ip_size)
254
255         self.gw_ip_block = IpBlock(generator_config.gateway_ips[port],
256                                    generator_config.gateway_ip_addrs_step,
257                                    self.chain_count)
258         self.tg_gateway_ip_addrs = generator_config.tg_gateway_ip_addrs[port]
259         self.tg_gw_ip_block = IpBlock(self.tg_gateway_ip_addrs,
260                                       generator_config.tg_gateway_ip_addrs_step,
261                                       self.chain_count)
262
263     def limit_ip_udp_ranges(self, peer_ip_size, cur_chain_flow_count):
264         # init to min value in case of no matching values found with lcm calculation
265         new_src_ip_size = 1
266         new_peer_ip_size = 1
267         new_src_udp_size = 1
268         new_dst_udp_size = 1
269
270         if self.ip_src_static is True:
271             src_ip_size = 1
272         else:
273             src_ip_size = self.ip_size
274         ip_src_divisors = list(get_divisors(src_ip_size))
275         ip_dst_divisors = list(get_divisors(peer_ip_size))
276         udp_src_divisors = list(get_divisors(self.udp_ports.udp_src_size))
277         udp_dst_divisors = list(get_divisors(self.udp_ports.udp_dst_size))
278         fc = int(cur_chain_flow_count)
279         tuples_ip = list(find_tuples_equal_to_lcm_value(ip_src_divisors, ip_dst_divisors, fc))
280         tuples_udp = list(find_tuples_equal_to_lcm_value(udp_src_divisors, udp_dst_divisors, fc))
281
282         if tuples_ip:
283             new_src_ip_size = tuples_ip[-1][0]
284             new_peer_ip_size = tuples_ip[-1][1]
285
286         if tuples_udp:
287             new_src_udp_size = tuples_udp[-1][0]
288             new_dst_udp_size = tuples_udp[-1][1]
289
290         tuples_src = []
291         tuples_dst = []
292         if not tuples_ip and not tuples_udp:
293             # in case of not divisors in common matching LCM value (i.e. requested flow count)
294             # try to find an accurate UDP range to fit requested flow count
295             udp_src_int = range(self.udp_ports.src_min, self.udp_ports.src_max)
296             udp_dst_int = range(self.udp_ports.dst_min, self.udp_ports.dst_max)
297             tuples_src = list(find_tuples_equal_to_lcm_value(ip_src_divisors, udp_src_int, fc))
298             tuples_dst = list(find_tuples_equal_to_lcm_value(ip_dst_divisors, udp_dst_int, fc))
299
300             if not tuples_src and not tuples_dst:
301                 # iterate IP and UDP ranges to find a tuple that match flow count values
302                 src_ip_range = range(1,src_ip_size)
303                 dst_ip_range = range(1, peer_ip_size)
304                 tuples_src = list(find_tuples_equal_to_lcm_value(src_ip_range, udp_src_int, fc))
305                 tuples_dst = list(find_tuples_equal_to_lcm_value(dst_ip_range, udp_dst_int, fc))
306
307         if tuples_src or tuples_dst:
308             if tuples_src:
309                 new_src_ip_size = tuples_src[-1][0]
310                 new_src_udp_size = tuples_src[-1][1]
311             if tuples_dst:
312                 new_peer_ip_size = tuples_dst[-1][0]
313                 new_dst_udp_size = tuples_dst[-1][1]
314         else:
315             if not tuples_ip:
316                 if src_ip_size != 1:
317                     if src_ip_size > fc:
318                         new_src_ip_size = fc
319                     else:
320                         new_src_ip_size = find_max_size(src_ip_size, tuples_udp, fc)
321                 if peer_ip_size != 1:
322                     if peer_ip_size > fc:
323                         new_peer_ip_size = fc
324                     else:
325                         new_peer_ip_size = find_max_size(peer_ip_size, tuples_udp, fc)
326
327             if not tuples_udp:
328                 if self.udp_ports.udp_src_size != 1:
329                     if self.udp_ports.udp_src_size > fc:
330                         new_src_udp_size = fc
331                     else:
332                         new_src_udp_size = find_max_size(self.udp_ports.udp_src_size,
333                                                          tuples_ip, fc)
334                 if self.udp_ports.udp_dst_size != 1:
335                     if self.udp_ports.udp_dst_size > fc:
336                         new_dst_udp_size = fc
337                     else:
338                         new_dst_udp_size = find_max_size(self.udp_ports.udp_dst_size,
339                                                          tuples_ip, fc)
340         max_possible_flows = lcm(lcm(new_src_ip_size, new_peer_ip_size),
341                                  lcm(new_src_udp_size, new_dst_udp_size))
342
343         LOG.debug("IP dst size: %d", new_peer_ip_size)
344         LOG.debug("LCM IP: %d", lcm(new_src_ip_size, new_peer_ip_size))
345         LOG.debug("LCM UDP: %d", lcm(new_src_udp_size, new_dst_udp_size))
346         LOG.debug("Global LCM: %d", max_possible_flows)
347         LOG.debug("IP src size: %d, IP dst size: %d, UDP src size: %d, UDP dst size: %d",
348                   new_src_ip_size, new_peer_ip_size, self.udp_ports.udp_src_size,
349                   self.udp_ports.udp_dst_size)
350         if not max_possible_flows == cur_chain_flow_count:
351             if (self.ip_addrs_step != '0.0.0.1' or self.udp_ports.step != '1') and not (
352                     self.ip_addrs_step == 'random' and self.udp_ports.step == 'random'):
353                 LOG.warning("Current values of ip_addrs_step and/or udp_port_step properties "
354                             "do not allow to control an accurate flow count. "
355                             "Values will be overridden as follows:")
356                 if self.ip_addrs_step != '0.0.0.1':
357                     LOG.info("ip_addrs_step='0.0.0.1' (previous value: ip_addrs_step='%s')",
358                              self.ip_addrs_step)
359                     self.ip_addrs_step = '0.0.0.1'
360
361                 if self.udp_ports.step != '1':
362                     LOG.info("udp_port_step='1' (previous value: udp_port_step='%s')",
363                              self.udp_ports.step)
364                     self.udp_ports.step = '1'
365                     # override config for not logging random step warning message in trex_gen.py
366                     self.generator_config.gen_config.udp_port_step = self.udp_ports.step
367             else:
368                 LOG.error("Current values of ip_addrs_step and udp_port_step properties "
369                           "do not allow to control an accurate flow count.")
370         else:
371             src_ip_size = new_src_ip_size
372             peer_ip_size = new_peer_ip_size
373             self.udp_ports.udp_src_size = new_src_udp_size
374             self.udp_ports.udp_dst_size = new_dst_udp_size
375         return src_ip_size, peer_ip_size
376
377     @staticmethod
378     def define_udp_range(udp_port, property_name):
379         if isinstance(udp_port, int):
380             min = udp_port
381             max = min
382         elif isinstance(udp_port, tuple):
383             min = udp_port[0]
384             max = udp_port[1]
385         else:
386             raise TrafficClientException('Invalid %s property value (53 or [\'53\',\'1024\'])'
387                                          % property_name)
388         return max, min
389
390
391     @staticmethod
392     def check_range_size(range_size, step):
393         """Check and set the available IPs or UDP ports, considering the step."""
394         try:
395             if range_size % step == 0:
396                 value = range_size // step
397             else:
398                 value = range_size // step + 1
399             return value
400         except ZeroDivisionError:
401             raise ZeroDivisionError("step can't be zero !") from ZeroDivisionError
402
403     def set_mac(self, mac):
404         """Set the local MAC for this port device."""
405         if mac is None:
406             raise TrafficClientException('Trying to set traffic generator MAC address as None')
407         self.mac = mac
408
409     def get_peer_device(self):
410         """Get the peer device (device 0 -> device 1, or device 1 -> device 0)."""
411         return self.generator_config.devices[1 - self.port]
412
413     def set_vtep_dst_mac(self, dest_macs):
414         """Set the list of dest MACs indexed by the chain id.
415
416         This is only called in 2 cases:
417         - VM macs discovered using openstack API
418         - dest MACs provisioned in config file
419         """
420         self.vtep_dst_mac = list(map(str, dest_macs))
421
422     def set_dest_macs(self, dest_macs):
423         """Set the list of dest MACs indexed by the chain id.
424
425         This is only called in 2 cases:
426         - VM macs discovered using openstack API
427         - dest MACs provisioned in config file
428         """
429         self.dest_macs = list(map(str, dest_macs))
430
431     def get_dest_macs(self):
432         """Get the list of dest macs for this device.
433
434         If set_dest_macs was never called, assumes l2-loopback and return
435         a list of peer mac (as many as chains but normally only 1 chain)
436         """
437         if self.dest_macs:
438             return self.dest_macs
439         # assume this is l2-loopback
440         return [self.get_peer_device().mac] * self.chain_count
441
442     def set_vlans(self, vlans):
443         """Set the list of vlans to use indexed by the chain id."""
444         self.vlans = vlans
445         LOG.info("Port %d: VLANs %s", self.port, self.vlans)
446
447     def set_vtep_vlan(self, vlan):
448         """Set the vtep vlan to use indexed by specific port."""
449         self.vtep_vlan = vlan
450         self.vxlan = True
451         self.vlan_tagging = None
452         LOG.info("Port %d: VTEP VLANs %s", self.port, self.vtep_vlan)
453
454     def set_vxlan_endpoints(self, src_ip, dst_ip):
455         self.vtep_dst_ip = dst_ip
456         self.vtep_src_ip = src_ip
457         LOG.info("Port %d: src_vtep %s, dst_vtep %s", self.port,
458                  self.vtep_src_ip, self.vtep_dst_ip)
459
460     def set_mpls_peers(self, src_ip, dst_ip):
461         self.mpls = True
462         self.vtep_dst_ip = dst_ip
463         self.vtep_src_ip = src_ip
464         LOG.info("Port %d: src_mpls_vtep %s, mpls_peer_ip %s", self.port,
465                  self.vtep_src_ip, self.vtep_dst_ip)
466
467     def set_vxlans(self, vnis):
468         self.vnis = vnis
469         LOG.info("Port %d: VNIs %s", self.port, self.vnis)
470
471     def set_mpls_inner_labels(self, labels):
472         self.inner_labels = labels
473         LOG.info("Port %d: MPLS Inner Labels %s", self.port, self.inner_labels)
474
475     def set_mpls_outer_labels(self, labels):
476         self.outer_labels = labels
477         LOG.info("Port %d: MPLS Outer Labels %s", self.port, self.outer_labels)
478
479     def set_gw_ip(self, gateway_ip):
480         self.gw_ip_block = IpBlock(gateway_ip,
481                                    self.generator_config.gateway_ip_addrs_step,
482                                    self.chain_count)
483
484     def get_gw_ip(self, chain_index):
485         """Retrieve the IP address assigned for the gateway of a given chain."""
486         return self.gw_ip_block.get_ip(chain_index)
487
488     def get_stream_configs(self):
489         """Get the stream config for a given chain on this device.
490
491         Called by the traffic generator driver to program the traffic generator properly
492         before generating traffic
493         """
494         configs = []
495         # exact flow count for each chain is calculated as follows:
496         # - all chains except the first will have the same flow count
497         #   calculated as (total_flows + chain_count - 1) / chain_count
498         # - the first chain will have the remainder
499         # example 11 flows and 3 chains => 3, 4, 4
500         flows_per_chain = int((self.flow_count + self.chain_count - 1) / self.chain_count)
501         cur_chain_flow_count = int(self.flow_count - flows_per_chain * (self.chain_count - 1))
502
503         peer = self.get_peer_device()
504         self.ip_block.reset_reservation()
505         peer.ip_block.reset_reservation()
506         dest_macs = self.get_dest_macs()
507
508         # limit ranges of UDP ports and IP to avoid overflow of the number of flows
509         peer_size = peer.ip_size // self.chain_count
510
511         for chain_idx in range(self.chain_count):
512             src_ip_size, peer_ip_size = self.limit_ip_udp_ranges(peer_size, cur_chain_flow_count)
513
514             src_ip_first, src_ip_last = self.ip_block.reserve_ip_range \
515                 (src_ip_size)
516             dst_ip_first, dst_ip_last = peer.ip_block.reserve_ip_range \
517                 (peer_ip_size)
518
519             if self.ip_addrs_step != 'random':
520                 src_ip_last = self.ip_block.get_ip_from_chain_first_ip(
521                     Device.ip_to_int(src_ip_first), src_ip_size - 1)
522                 dst_ip_last = peer.ip_block.get_ip_from_chain_first_ip(
523                     Device.ip_to_int(dst_ip_first), peer_ip_size - 1)
524             if self.udp_ports.step != 'random':
525                 self.udp_ports.src_max = self.udp_ports.get_src_max(self.udp_ports.udp_src_size - 1)
526                 self.udp_ports.dst_max = self.udp_ports.get_dst_max(self.udp_ports.udp_dst_size - 1)
527             if self.ip_src_static:
528                 src_ip_last = src_ip_first
529
530             LOG.info("Port %d, chain %d: IP src range [%s,%s]", self.port, chain_idx,
531                      src_ip_first, src_ip_last)
532             LOG.info("Port %d, chain %d: IP dst range [%s,%s]", self.port, chain_idx,
533                      dst_ip_first, dst_ip_last)
534             LOG.info("Port %d, chain %d: UDP src range [%s,%s]", self.port, chain_idx,
535                      self.udp_ports.src_min, self.udp_ports.src_max)
536             LOG.info("Port %d, chain %d: UDP dst range [%s,%s]", self.port, chain_idx,
537                      self.udp_ports.dst_min, self.udp_ports.dst_max)
538
539             configs.append({
540                 'count': cur_chain_flow_count,
541                 'mac_src': self.mac,
542                 'mac_dst': dest_macs[chain_idx],
543                 'ip_src_addr': src_ip_first,
544                 'ip_src_addr_max': src_ip_last,
545                 'ip_src_count': src_ip_size,
546                 'ip_dst_addr': dst_ip_first,
547                 'ip_dst_addr_max': dst_ip_last,
548                 'ip_dst_count': peer_ip_size,
549                 'ip_addrs_step': self.ip_addrs_step,
550                 'ip_src_static': self.ip_src_static,
551                 'udp_src_port': self.udp_ports.src_min,
552                 'udp_src_port_max': self.udp_ports.src_max,
553                 'udp_src_count': self.udp_ports.udp_src_size,
554                 'udp_dst_port': self.udp_ports.dst_min,
555                 'udp_dst_port_max': self.udp_ports.dst_max,
556                 'udp_dst_count': self.udp_ports.udp_dst_size,
557                 'udp_port_step': self.udp_ports.step,
558                 'mac_discovery_gw': self.get_gw_ip(chain_idx),
559                 'ip_src_tg_gw': self.tg_gw_ip_block.get_ip(chain_idx),
560                 'ip_dst_tg_gw': peer.tg_gw_ip_block.get_ip(chain_idx),
561                 'vlan_tag': self.vlans[chain_idx] if self.vlans else None,
562                 'vxlan': self.vxlan,
563                 'vtep_vlan': self.vtep_vlan if self.vtep_vlan else None,
564                 'vtep_src_mac': self.mac if (self.vxlan or self.mpls) else None,
565                 'vtep_dst_mac': self.vtep_dst_mac if (self.vxlan or self.mpls) else None,
566                 'vtep_dst_ip': self.vtep_dst_ip if self.vxlan is True else None,
567                 'vtep_src_ip': self.vtep_src_ip if self.vxlan is True else None,
568                 'net_vni': self.vnis[chain_idx] if self.vxlan is True else None,
569                 'mpls': self.mpls,
570                 'mpls_outer_label': self.outer_labels[chain_idx] if self.mpls is True else None,
571                 'mpls_inner_label': self.inner_labels[chain_idx] if self.mpls is True else None
572
573             })
574             # after first chain, fall back to the flow count for all other chains
575             cur_chain_flow_count = flows_per_chain
576         return configs
577
578     @staticmethod
579     def ip_to_int(addr):
580         """Convert an IP address from string to numeric."""
581         return struct.unpack("!I", socket.inet_aton(addr))[0]
582
583     @staticmethod
584     def int_to_ip(nvalue):
585         """Convert an IP address from numeric to string."""
586         return socket.inet_ntoa(struct.pack("!I", int(nvalue)))
587
588
589 class GeneratorConfig(object):
590     """Represents traffic configuration for currently running traffic profile."""
591
592     DEFAULT_IP_STEP = '0.0.0.1'
593     DEFAULT_SRC_DST_IP_STEP = '0.0.0.1'
594
595     def __init__(self, config):
596         """Create a generator config."""
597         self.config = config
598         # name of the generator profile (normally trex or dummy)
599         # pick the default one if not specified explicitly from cli options
600         if not config.generator_profile:
601             config.generator_profile = config.traffic_generator.default_profile
602         # pick up the profile dict based on the name
603         gen_config = self.__match_generator_profile(config.traffic_generator,
604                                                     config.generator_profile)
605         self.gen_config = gen_config
606         # copy over fields from the dict
607         self.tool = gen_config.tool
608         self.ip = gen_config.ip
609         # overrides on config.cores and config.mbuf_factor
610         if config.cores:
611             self.cores = config.cores
612         else:
613             self.cores = gen_config.get('cores', 1)
614         # let's report the value actually used in the end
615         config.cores_used = self.cores
616         self.mbuf_factor = config.mbuf_factor
617         self.mbuf_64 = config.mbuf_64
618         self.hdrh = not config.disable_hdrh
619         if config.intf_speed:
620             # interface speed is overriden from the command line
621             self.intf_speed = config.intf_speed
622         elif gen_config.intf_speed:
623             # interface speed is overriden from the generator config
624             self.intf_speed = gen_config.intf_speed
625         else:
626             self.intf_speed = "auto"
627         if self.intf_speed == "auto" or self.intf_speed == "0":
628             # interface speed is discovered/provided by the traffic generator
629             self.intf_speed = 0
630         else:
631             self.intf_speed = bitmath.parse_string(self.intf_speed.replace('ps', '')).bits
632         self.name = gen_config.name
633         self.zmq_pub_port = gen_config.get('zmq_pub_port', 4500)
634         self.zmq_rpc_port = gen_config.get('zmq_rpc_port', 4501)
635         self.limit_memory = gen_config.get('limit_memory', 1024)
636         self.software_mode = gen_config.get('software_mode', False)
637         self.interfaces = gen_config.interfaces
638         if self.interfaces[0].port != 0 or self.interfaces[1].port != 1:
639             raise TrafficClientException('Invalid port order/id in generator_profile.interfaces')
640         self.service_chain = config.service_chain
641         self.service_chain_count = config.service_chain_count
642         self.flow_count = config.flow_count
643         self.host_name = gen_config.host_name
644         self.bidirectional = config.traffic.bidirectional
645         self.tg_gateway_ip_addrs = gen_config.tg_gateway_ip_addrs
646         self.ip_addrs = gen_config.ip_addrs
647         self.ip_addrs_step = gen_config.ip_addrs_step or self.DEFAULT_SRC_DST_IP_STEP
648         self.tg_gateway_ip_addrs_step = \
649             gen_config.tg_gateway_ip_addrs_step or self.DEFAULT_IP_STEP
650         self.gateway_ip_addrs_step = gen_config.gateway_ip_addrs_step or self.DEFAULT_IP_STEP
651         self.gateway_ips = gen_config.gateway_ip_addrs
652         self.ip_src_static = gen_config.ip_src_static
653         self.vteps = gen_config.get('vteps')
654         self.devices = [Device(port, self) for port in [0, 1]]
655         # This should normally always be [0, 1]
656         self.ports = [device.port for device in self.devices]
657
658         # check that pci is not empty
659         if not gen_config.interfaces[0].get('pci', None) or \
660            not gen_config.interfaces[1].get('pci', None):
661             raise TrafficClientException("configuration interfaces pci fields cannot be empty")
662
663         self.pcis = [tgif['pci'] for tgif in gen_config.interfaces]
664         self.vlan_tagging = config.vlan_tagging
665
666         # needed for result/summarizer
667         config['tg-name'] = gen_config.name
668         config['tg-tool'] = self.tool
669
670     def to_json(self):
671         """Get json form to display the content into the overall result dict."""
672         return dict(self.gen_config)
673
674     def set_dest_macs(self, port_index, dest_macs):
675         """Set the list of dest MACs indexed by the chain id on given port.
676
677         port_index: the port for which dest macs must be set
678         dest_macs: a list of dest MACs indexed by chain id
679         """
680         if len(dest_macs) < self.config.service_chain_count:
681             raise TrafficClientException('Dest MAC list %s must have %d entries' %
682                                          (dest_macs, self.config.service_chain_count))
683         # only pass the first scc dest MACs
684         self.devices[port_index].set_dest_macs(dest_macs[:self.config.service_chain_count])
685         LOG.info('Port %d: dst MAC %s', port_index, [str(mac) for mac in dest_macs])
686
687     def set_vtep_dest_macs(self, port_index, dest_macs):
688         """Set the list of dest MACs indexed by the chain id on given port.
689
690         port_index: the port for which dest macs must be set
691         dest_macs: a list of dest MACs indexed by chain id
692         """
693         if len(dest_macs) != self.config.service_chain_count:
694             raise TrafficClientException('Dest MAC list %s must have %d entries' %
695                                          (dest_macs, self.config.service_chain_count))
696         self.devices[port_index].set_vtep_dst_mac(dest_macs)
697         LOG.info('Port %d: vtep dst MAC %s', port_index, {str(mac) for mac in dest_macs})
698
699     def get_dest_macs(self):
700         """Return the list of dest macs indexed by port."""
701         return [dev.get_dest_macs() for dev in self.devices]
702
703     def set_vlans(self, port_index, vlans):
704         """Set the list of vlans to use indexed by the chain id on given port.
705
706         port_index: the port for which VLANs must be set
707         vlans: a  list of vlan lists indexed by chain id
708         """
709         if len(vlans) != self.config.service_chain_count:
710             raise TrafficClientException('VLAN list %s must have %d entries' %
711                                          (vlans, self.config.service_chain_count))
712         self.devices[port_index].set_vlans(vlans)
713
714     def set_vxlans(self, port_index, vxlans):
715         """Set the list of vxlans (VNIs) to use indexed by the chain id on given port.
716
717         port_index: the port for which VXLANs must be set
718         VXLANs: a  list of VNIs lists indexed by chain id
719         """
720         if len(vxlans) != self.config.service_chain_count:
721             raise TrafficClientException('VXLAN list %s must have %d entries' %
722                                          (vxlans, self.config.service_chain_count))
723         self.devices[port_index].set_vxlans(vxlans)
724
725     def set_mpls_inner_labels(self, port_index, labels):
726         """Set the list of MPLS Labels to use indexed by the chain id on given port.
727
728         port_index: the port for which Labels must be set
729         Labels: a list of Labels lists indexed by chain id
730         """
731         if len(labels) != self.config.service_chain_count:
732             raise TrafficClientException('Inner MPLS list %s must have %d entries' %
733                                          (labels, self.config.service_chain_count))
734         self.devices[port_index].set_mpls_inner_labels(labels)
735
736     def set_mpls_outer_labels(self, port_index, labels):
737         """Set the list of MPLS Labels to use indexed by the chain id on given port.
738
739         port_index: the port for which Labels must be set
740         Labels: a list of Labels lists indexed by chain id
741         """
742         if len(labels) != self.config.service_chain_count:
743             raise TrafficClientException('Outer MPLS list %s must have %d entries' %
744                                          (labels, self.config.service_chain_count))
745         self.devices[port_index].set_mpls_outer_labels(labels)
746
747     def set_vtep_vlan(self, port_index, vlan):
748         """Set the vtep vlan to use indexed by the chain id on given port.
749         port_index: the port for which VLAN must be set
750         """
751         self.devices[port_index].set_vtep_vlan(vlan)
752
753     def set_vxlan_endpoints(self, port_index, src_ip, dst_ip):
754         self.devices[port_index].set_vxlan_endpoints(src_ip, dst_ip)
755
756     def set_mpls_peers(self, port_index, src_ip, dst_ip):
757         self.devices[port_index].set_mpls_peers(src_ip, dst_ip)
758
759     @staticmethod
760     def __match_generator_profile(traffic_generator, generator_profile):
761         gen_config = AttrDict(traffic_generator)
762         gen_config.pop('default_profile')
763         gen_config.pop('generator_profile')
764         matching_profile = [profile for profile in traffic_generator.generator_profile if
765                             profile.name == generator_profile]
766         if len(matching_profile) != 1:
767             raise Exception('Traffic generator profile not found: ' + generator_profile)
768
769         gen_config.update(matching_profile[0])
770         return gen_config
771
772
773 class TrafficClient(object):
774     """Traffic generator client with NDR/PDR binary seearch."""
775
776     PORTS = [0, 1]
777
778     def __init__(self, config, notifier=None):
779         """Create a new TrafficClient instance.
780
781         config: nfvbench config
782         notifier: notifier (optional)
783
784         A new instance is created everytime the nfvbench config may have changed.
785         """
786         self.config = config
787         self.generator_config = GeneratorConfig(config)
788         self.tool = self.generator_config.tool
789         self.gen = self._get_generator()
790         self.notifier = notifier
791         self.interval_collector = None
792         self.iteration_collector = None
793         self.runner = TrafficRunner(self, self.config.duration_sec, self.config.interval_sec,
794                                     self.config.service_mode)
795         self.config.frame_sizes = self._get_frame_sizes()
796         self.run_config = {
797             'l2frame_size': None,
798             'duration_sec': self.config.duration_sec,
799             'bidirectional': True,
800             'rates': []  # to avoid unsbuscriptable-obj warning
801         }
802         self.current_total_rate = {'rate_percent': '10'}
803         if self.config.single_run:
804             self.current_total_rate = utils.parse_rate_str(self.config.rate)
805         self.ifstats = None
806         # Speed is either discovered when connecting to TG or set from config
807         # This variable is 0 if not yet discovered from TG or must be the speed of
808         # each interface in bits per second
809         self.intf_speed = self.generator_config.intf_speed
810
811     def _get_generator(self):
812         tool = self.tool.lower()
813         if tool == 'trex':
814             from .traffic_gen import trex_gen
815             return trex_gen.TRex(self)
816         if tool == 'dummy':
817             from .traffic_gen import dummy
818             return dummy.DummyTG(self)
819         raise TrafficClientException('Unsupported generator tool name:' + self.tool)
820
821     def skip_sleep(self):
822         """Skip all sleeps when doing unit testing with dummy TG.
823
824         Must be overriden using mock.patch
825         """
826         return False
827
828     def _get_frame_sizes(self):
829         traffic_profile_name = self.config.traffic.profile
830         matching_profiles = [profile for profile in self.config.traffic_profile if
831                              profile.name == traffic_profile_name]
832         if len(matching_profiles) > 1:
833             raise TrafficClientException('Multiple traffic profiles with name: ' +
834                                          traffic_profile_name)
835         if not matching_profiles:
836             raise TrafficClientException('Cannot find traffic profile: ' + traffic_profile_name)
837         return matching_profiles[0].l2frame_size
838
839     def start_traffic_generator(self):
840         """Start the traffic generator process (traffic not started yet)."""
841         self.gen.connect()
842         # pick up the interface speed if it is not set from config
843         intf_speeds = self.gen.get_port_speed_gbps()
844         # convert Gbps unit into bps
845         tg_if_speed = bitmath.parse_string(str(intf_speeds[0]) + 'Gb').bits
846         if self.intf_speed:
847             # interface speed is overriden from config
848             if self.intf_speed != tg_if_speed:
849                 # Warn the user if the speed in the config is different
850                 LOG.warning(
851                     'Interface speed provided (%g Gbps) is different from actual speed (%d Gbps)',
852                     self.intf_speed / 1000000000.0, intf_speeds[0])
853         else:
854             # interface speed not provisioned by config
855             self.intf_speed = tg_if_speed
856             # also update the speed in the tg config
857             self.generator_config.intf_speed = tg_if_speed
858         # let's report detected and actually used interface speed
859         self.config.intf_speed_detected = tg_if_speed
860         self.config.intf_speed_used = self.intf_speed
861
862         # Save the traffic generator local MAC
863         for mac, device in zip(self.gen.get_macs(), self.generator_config.devices):
864             device.set_mac(mac)
865
866     def setup(self):
867         """Set up the traffic client."""
868         self.gen.clear_stats()
869
870     def get_version(self):
871         """Get the traffic generator version."""
872         return self.gen.get_version()
873
874     def ensure_end_to_end(self):
875         """Ensure traffic generator receives packets it has transmitted.
876
877         This ensures end to end connectivity and also waits until VMs are ready to forward packets.
878
879         VMs that are started and in active state may not pass traffic yet. It is imperative to make
880         sure that all VMs are passing traffic in both directions before starting any benchmarking.
881         To verify this, we need to send at a low frequency bi-directional packets and make sure
882         that we receive all packets back from all VMs. The number of flows is equal to 2 times
883         the number of chains (1 per direction) and we need to make sure we receive packets coming
884         from exactly 2 x chain count different source MAC addresses.
885
886         Example:
887             PVP chain (1 VM per chain)
888             N = 10 (number of chains)
889             Flow count = 20 (number of flows)
890             If the number of unique source MAC addresses from received packets is 20 then
891             all 10 VMs 10 VMs are in operational state.
892         """
893         LOG.info('Starting traffic generator to ensure end-to-end connectivity')
894         # send 2pps on each chain and each direction
895         rate_pps = {'rate_pps': str(self.config.service_chain_count * 2)}
896         self.gen.create_traffic('64', [rate_pps, rate_pps], bidirectional=True, latency=False,
897                                 e2e=True)
898         # ensures enough traffic is coming back
899         retry_count = int((self.config.check_traffic_time_sec +
900                            self.config.generic_poll_sec - 1) / self.config.generic_poll_sec)
901
902         # we expect to see packets coming from 2 unique MAC per chain
903         # because there can be flooding in the case of shared net
904         # we must verify that packets from the right VMs are received
905         # and not just count unique src MAC
906         # create a dict of (port, chain) tuples indexed by dest mac
907         mac_map = {}
908         for port, dest_macs in enumerate(self.generator_config.get_dest_macs()):
909             for chain, mac in enumerate(dest_macs):
910                 mac_map[mac] = (port, chain)
911         unique_src_mac_count = len(mac_map)
912         if self.config.vxlan and self.config.traffic_generator.vtep_vlan:
913             get_mac_id = lambda packet: packet['binary'][60:66]
914         elif self.config.vxlan:
915             get_mac_id = lambda packet: packet['binary'][56:62]
916         elif self.config.mpls:
917             get_mac_id = lambda packet: packet['binary'][24:30]
918             # mpls_transport_label = lambda packet: packet['binary'][14:18]
919         else:
920             get_mac_id = lambda packet: packet['binary'][6:12]
921         for it in range(retry_count):
922             self.gen.clear_stats()
923             self.gen.start_traffic()
924             self.gen.start_capture()
925             LOG.info('Captured unique src mac %d/%d, capturing return packets (retry %d/%d)...',
926                      unique_src_mac_count - len(mac_map), unique_src_mac_count,
927                      it + 1, retry_count)
928             if not self.skip_sleep():
929                 time.sleep(self.config.generic_poll_sec)
930             self.gen.stop_traffic()
931             self.gen.fetch_capture_packets()
932             self.gen.stop_capture()
933             for packet in self.gen.packet_list:
934                 mac_id = get_mac_id(packet).decode('latin-1')
935                 src_mac = ':'.join(["%02x" % ord(x) for x in mac_id])
936                 if self.config.mpls:
937                     if src_mac in mac_map and self.is_mpls(packet):
938                         port, chain = mac_map[src_mac]
939                         LOG.info('Received mpls packet from mac: %s (chain=%d, port=%d)',
940                                  src_mac, chain, port)
941                         mac_map.pop(src_mac, None)
942                 else:
943                     if src_mac in mac_map and self.is_udp(packet):
944                         port, chain = mac_map[src_mac]
945                         LOG.info('Received udp packet from mac: %s (chain=%d, port=%d)',
946                                  src_mac, chain, port)
947                         mac_map.pop(src_mac, None)
948
949                 if not mac_map:
950                     LOG.info('End-to-end connectivity established')
951                     return
952             if self.config.l3_router and not self.config.no_arp:
953                 # In case of L3 traffic mode, routers are not able to route traffic
954                 # until VM interfaces are up and ARP requests are done
955                 LOG.info('Waiting for loopback service completely started...')
956                 LOG.info('Sending ARP request to assure end-to-end connectivity established')
957                 self.ensure_arp_successful()
958         raise TrafficClientException('End-to-end connectivity cannot be ensured')
959
960     def is_udp(self, packet):
961         pkt = Ether(packet['binary'])
962         return UDP in pkt
963
964     def is_mpls(self, packet):
965         pkt = Ether(packet['binary'])
966         return MPLS in pkt
967
968     def ensure_arp_successful(self):
969         """Resolve all IP using ARP and throw an exception in case of failure."""
970         dest_macs = self.gen.resolve_arp()
971         if dest_macs:
972             # all dest macs are discovered, saved them into the generator config
973             if self.config.vxlan or self.config.mpls:
974                 self.generator_config.set_vtep_dest_macs(0, dest_macs[0])
975                 self.generator_config.set_vtep_dest_macs(1, dest_macs[1])
976             else:
977                 self.generator_config.set_dest_macs(0, dest_macs[0])
978                 self.generator_config.set_dest_macs(1, dest_macs[1])
979         else:
980             raise TrafficClientException('ARP cannot be resolved')
981
982     def set_traffic(self, frame_size, bidirectional):
983         """Reconfigure the traffic generator for a new frame size."""
984         self.run_config['bidirectional'] = bidirectional
985         self.run_config['l2frame_size'] = frame_size
986         self.run_config['rates'] = [self.get_per_direction_rate()]
987         if bidirectional:
988             self.run_config['rates'].append(self.get_per_direction_rate())
989         else:
990             unidir_reverse_pps = int(self.config.unidir_reverse_traffic_pps)
991             if unidir_reverse_pps > 0:
992                 self.run_config['rates'].append({'rate_pps': str(unidir_reverse_pps)})
993         # Fix for [NFVBENCH-67], convert the rate string to PPS
994         for idx, rate in enumerate(self.run_config['rates']):
995             if 'rate_pps' not in rate:
996                 self.run_config['rates'][idx] = {'rate_pps': self.__convert_rates(rate)['rate_pps']}
997
998         self.gen.clear_streamblock()
999
1000         if self.config.no_latency_streams:
1001             LOG.info("Latency streams are disabled")
1002         # in service mode, we must disable flow stats (e2e=True)
1003         self.gen.create_traffic(frame_size, self.run_config['rates'], bidirectional,
1004                                 latency=not self.config.no_latency_streams,
1005                                 e2e=self.runner.service_mode)
1006
1007     def _modify_load(self, load):
1008         self.current_total_rate = {'rate_percent': str(load)}
1009         rate_per_direction = self.get_per_direction_rate()
1010
1011         self.gen.modify_rate(rate_per_direction, False)
1012         self.run_config['rates'][0] = rate_per_direction
1013         if self.run_config['bidirectional']:
1014             self.gen.modify_rate(rate_per_direction, True)
1015             self.run_config['rates'][1] = rate_per_direction
1016
1017     def get_ndr_and_pdr(self):
1018         """Start the NDR/PDR iteration and return the results."""
1019         dst = 'Bidirectional' if self.run_config['bidirectional'] else 'Unidirectional'
1020         targets = {}
1021         if self.config.ndr_run:
1022             LOG.info('*** Searching NDR for %s (%s)...', self.run_config['l2frame_size'], dst)
1023             targets['ndr'] = self.config.measurement.NDR
1024         if self.config.pdr_run:
1025             LOG.info('*** Searching PDR for %s (%s)...', self.run_config['l2frame_size'], dst)
1026             targets['pdr'] = self.config.measurement.PDR
1027
1028         self.run_config['start_time'] = time.time()
1029         self.interval_collector = IntervalCollector(self.run_config['start_time'])
1030         self.interval_collector.attach_notifier(self.notifier)
1031         self.iteration_collector = IterationCollector(self.run_config['start_time'])
1032         results = {}
1033         self.__range_search(0.0, 200.0, targets, results)
1034
1035         results['iteration_stats'] = {
1036             'ndr_pdr': self.iteration_collector.get()
1037         }
1038
1039         if self.config.ndr_run:
1040             LOG.info('NDR load: %s', results['ndr']['rate_percent'])
1041             results['ndr']['time_taken_sec'] = \
1042                 results['ndr']['timestamp_sec'] - self.run_config['start_time']
1043             if self.config.pdr_run:
1044                 LOG.info('PDR load: %s', results['pdr']['rate_percent'])
1045                 results['pdr']['time_taken_sec'] = \
1046                     results['pdr']['timestamp_sec'] - results['ndr']['timestamp_sec']
1047         else:
1048             LOG.info('PDR load: %s', results['pdr']['rate_percent'])
1049             results['pdr']['time_taken_sec'] = \
1050                 results['pdr']['timestamp_sec'] - self.run_config['start_time']
1051         return results
1052
1053     def __get_dropped_rate(self, result):
1054         dropped_pkts = result['rx']['dropped_pkts']
1055         total_pkts = result['tx']['total_pkts']
1056         if not total_pkts:
1057             return float('inf')
1058         return float(dropped_pkts) / total_pkts * 100
1059
1060     def get_stats(self):
1061         """Collect final stats for previous run."""
1062         stats = self.gen.get_stats(self.ifstats)
1063         retDict = {'total_tx_rate': stats['total_tx_rate'],
1064                    'offered_tx_rate_bps': stats['offered_tx_rate_bps'],
1065                    'theoretical_tx_rate_bps': stats['theoretical_tx_rate_bps'],
1066                    'theoretical_tx_rate_pps': stats['theoretical_tx_rate_pps']}
1067
1068         tx_keys = ['total_pkts', 'total_pkt_bytes', 'pkt_rate', 'pkt_bit_rate']
1069         rx_keys = tx_keys + ['dropped_pkts']
1070
1071         for port in self.PORTS:
1072             port_stats = {'tx': {}, 'rx': {}}
1073             for key in tx_keys:
1074                 port_stats['tx'][key] = int(stats[port]['tx'][key])
1075             for key in rx_keys:
1076                 try:
1077                     port_stats['rx'][key] = int(stats[port]['rx'][key])
1078                 except ValueError:
1079                     port_stats['rx'][key] = 0
1080             port_stats['rx']['avg_delay_usec'] = cast_integer(
1081                 stats[port]['rx']['avg_delay_usec'])
1082             port_stats['rx']['min_delay_usec'] = cast_integer(
1083                 stats[port]['rx']['min_delay_usec'])
1084             port_stats['rx']['max_delay_usec'] = cast_integer(
1085                 stats[port]['rx']['max_delay_usec'])
1086             port_stats['drop_rate_percent'] = self.__get_dropped_rate(port_stats)
1087             retDict[str(port)] = port_stats
1088
1089         ports = sorted(list(retDict.keys()), key=str)
1090         if self.run_config['bidirectional']:
1091             retDict['overall'] = {'tx': {}, 'rx': {}}
1092             for key in tx_keys:
1093                 retDict['overall']['tx'][key] = \
1094                     retDict[ports[0]]['tx'][key] + retDict[ports[1]]['tx'][key]
1095             for key in rx_keys:
1096                 retDict['overall']['rx'][key] = \
1097                     retDict[ports[0]]['rx'][key] + retDict[ports[1]]['rx'][key]
1098             total_pkts = [retDict[ports[0]]['rx']['total_pkts'],
1099                           retDict[ports[1]]['rx']['total_pkts']]
1100             avg_delays = [retDict[ports[0]]['rx']['avg_delay_usec'],
1101                           retDict[ports[1]]['rx']['avg_delay_usec']]
1102             max_delays = [retDict[ports[0]]['rx']['max_delay_usec'],
1103                           retDict[ports[1]]['rx']['max_delay_usec']]
1104             min_delays = [retDict[ports[0]]['rx']['min_delay_usec'],
1105                           retDict[ports[1]]['rx']['min_delay_usec']]
1106             retDict['overall']['rx']['avg_delay_usec'] = utils.weighted_avg(total_pkts, avg_delays)
1107             retDict['overall']['rx']['min_delay_usec'] = min(min_delays)
1108             retDict['overall']['rx']['max_delay_usec'] = max(max_delays)
1109             for key in ['pkt_bit_rate', 'pkt_rate']:
1110                 for dirc in ['tx', 'rx']:
1111                     retDict['overall'][dirc][key] /= 2.0
1112                 retDict['overall']['hdrh'] = stats.get('hdrh', None)
1113                 if retDict['overall']['hdrh']:
1114                     decoded_histogram = HdrHistogram.decode(retDict['overall']['hdrh'])
1115                     # override min max and avg from hdrh
1116                     retDict['overall']['rx']['min_delay_usec'] = decoded_histogram.get_min_value()
1117                     retDict['overall']['rx']['max_delay_usec'] = decoded_histogram.get_max_value()
1118                     retDict['overall']['rx']['avg_delay_usec'] = decoded_histogram.get_mean_value()
1119                     retDict['overall']['rx']['lat_percentile'] = {}
1120                     for percentile in self.config.lat_percentiles:
1121                         retDict['overall']['rx']['lat_percentile'][percentile] = \
1122                             decoded_histogram.get_value_at_percentile(percentile)
1123
1124         else:
1125             retDict['overall'] = retDict[ports[0]]
1126         retDict['overall']['drop_rate_percent'] = self.__get_dropped_rate(retDict['overall'])
1127         return retDict
1128
1129     def __convert_rates(self, rate):
1130         return utils.convert_rates(self.run_config['l2frame_size'],
1131                                    rate,
1132                                    self.intf_speed)
1133
1134     def __ndr_pdr_found(self, tag, load):
1135         rates = self.__convert_rates({'rate_percent': load})
1136         self.iteration_collector.add_ndr_pdr(tag, rates['rate_pps'])
1137         last_stats = self.iteration_collector.peek()
1138         self.interval_collector.add_ndr_pdr(tag, last_stats)
1139
1140     def __format_output_stats(self, stats):
1141         for key in self.PORTS + ['overall']:
1142             key = str(key)
1143             interface = stats[key]
1144             stats[key] = {
1145                 'tx_pkts': interface['tx']['total_pkts'],
1146                 'rx_pkts': interface['rx']['total_pkts'],
1147                 'drop_percentage': interface['drop_rate_percent'],
1148                 'drop_pct': interface['rx']['dropped_pkts'],
1149                 'avg_delay_usec': interface['rx']['avg_delay_usec'],
1150                 'max_delay_usec': interface['rx']['max_delay_usec'],
1151                 'min_delay_usec': interface['rx']['min_delay_usec'],
1152             }
1153
1154             if key == 'overall':
1155                 stats[key]['hdrh'] = interface.get('hdrh', None)
1156                 if stats[key]['hdrh']:
1157                     decoded_histogram = HdrHistogram.decode(stats[key]['hdrh'])
1158                     # override min max and avg from hdrh
1159                     stats[key]['min_delay_usec'] = decoded_histogram.get_min_value()
1160                     stats[key]['max_delay_usec'] = decoded_histogram.get_max_value()
1161                     stats[key]['avg_delay_usec'] = decoded_histogram.get_mean_value()
1162                     stats[key]['lat_percentile'] = {}
1163                     for percentile in self.config.lat_percentiles:
1164                         stats[key]['lat_percentile'][percentile] = decoded_histogram.\
1165                             get_value_at_percentile(percentile)
1166
1167
1168         return stats
1169
1170     def __targets_found(self, rate, targets, results):
1171         for tag, target in list(targets.items()):
1172             LOG.info('Found %s (%s) load: %s', tag, target, rate)
1173             self.__ndr_pdr_found(tag, rate)
1174             results[tag]['timestamp_sec'] = time.time()
1175
1176     def __range_search(self, left, right, targets, results):
1177         """Perform a binary search for a list of targets inside a [left..right] range or rate.
1178
1179         left    the left side of the range to search as a % the line rate (100 = 100% line rate)
1180                 indicating the rate to send on each interface
1181         right   the right side of the range to search as a % of line rate
1182                 indicating the rate to send on each interface
1183         targets a dict of drop rates to search (0.1 = 0.1%), indexed by the DR name or "tag"
1184                 ('ndr', 'pdr')
1185         results a dict to store results
1186         """
1187         if not targets:
1188             return
1189         LOG.info('Range search [%s .. %s] targets: %s', left, right, targets)
1190
1191         # Terminate search when gap is less than load epsilon
1192         if right - left < self.config.measurement.load_epsilon:
1193             self.__targets_found(left, targets, results)
1194             return
1195
1196         # Obtain the average drop rate in for middle load
1197         middle = (left + right) / 2.0
1198         try:
1199             stats, rates = self.__run_search_iteration(middle)
1200         except STLError:
1201             LOG.exception("Got exception from traffic generator during binary search")
1202             self.__targets_found(left, targets, results)
1203             return
1204         # Split target dicts based on the avg drop rate
1205         left_targets = {}
1206         right_targets = {}
1207         for tag, target in list(targets.items()):
1208             if stats['overall']['drop_rate_percent'] <= target:
1209                 # record the best possible rate found for this target
1210                 results[tag] = rates
1211                 results[tag].update({
1212                     'load_percent_per_direction': middle,
1213                     'stats': self.__format_output_stats(dict(stats)),
1214                     'timestamp_sec': None
1215                 })
1216                 right_targets[tag] = target
1217             else:
1218                 # initialize to 0 all fields of result for
1219                 # the worst case scenario of the binary search (if ndr/pdr is not found)
1220                 if tag not in results:
1221                     results[tag] = dict.fromkeys(rates, 0)
1222                     empty_stats = self.__format_output_stats(dict(stats))
1223                     for key in empty_stats:
1224                         if isinstance(empty_stats[key], dict):
1225                             empty_stats[key] = dict.fromkeys(empty_stats[key], 0)
1226                         else:
1227                             empty_stats[key] = 0
1228                     results[tag].update({
1229                         'load_percent_per_direction': 0,
1230                         'stats': empty_stats,
1231                         'timestamp_sec': None
1232                     })
1233                 left_targets[tag] = target
1234
1235         # search lower half
1236         self.__range_search(left, middle, left_targets, results)
1237
1238         # search upper half only if the upper rate does not exceed
1239         # 100%, this only happens when the first search at 100%
1240         # yields a DR that is < target DR
1241         if middle >= 100:
1242             self.__targets_found(100, right_targets, results)
1243         else:
1244             self.__range_search(middle, right, right_targets, results)
1245
1246     def __run_search_iteration(self, rate):
1247         """Run one iteration at the given rate level.
1248
1249         rate: the rate to send on each port in percent (0 to 100)
1250         """
1251         self._modify_load(rate)
1252
1253         # poll interval stats and collect them
1254         for stats in self.run_traffic():
1255             self.interval_collector.add(stats)
1256             time_elapsed_ratio = self.runner.time_elapsed() / self.run_config['duration_sec']
1257             if time_elapsed_ratio >= 1:
1258                 self.cancel_traffic()
1259                 if not self.skip_sleep():
1260                     time.sleep(self.config.pause_sec)
1261         self.interval_collector.reset()
1262
1263         # get stats from the run
1264         stats = self.runner.client.get_stats()
1265         current_traffic_config = self._get_traffic_config()
1266         warning = self.compare_tx_rates(current_traffic_config['direction-total']['rate_pps'],
1267                                         stats['total_tx_rate'])
1268         if warning is not None:
1269             stats['warning'] = warning
1270
1271         # save reliable stats from whole iteration
1272         self.iteration_collector.add(stats, current_traffic_config['direction-total']['rate_pps'])
1273         LOG.info('Average drop rate: %f', stats['overall']['drop_rate_percent'])
1274         return stats, current_traffic_config['direction-total']
1275
1276     def log_stats(self, stats):
1277         """Log estimated stats during run."""
1278         # Calculate a rolling drop rate based on differential to
1279         # the previous reading
1280         cur_tx = stats['overall']['tx']['total_pkts']
1281         cur_rx = stats['overall']['rx']['total_pkts']
1282         delta_tx = cur_tx - self.prev_tx
1283         delta_rx = cur_rx - self.prev_rx
1284         drops = delta_tx - delta_rx
1285         drop_rate_pct = 100 * (delta_tx - delta_rx)/delta_tx
1286         self.prev_tx = cur_tx
1287         self.prev_rx = cur_rx
1288         LOG.info('TX: %15s; RX: %15s; (Est.) Dropped: %12s; Drop rate: %8.4f%%',
1289                  format(cur_tx, ',d'),
1290                  format(cur_rx, ',d'),
1291                  format(drops, ',d'),
1292                  drop_rate_pct)
1293
1294     def run_traffic(self):
1295         """Start traffic and return intermediate stats for each interval."""
1296         stats = self.runner.run()
1297         self.prev_tx = 0
1298         self.prev_rx = 0
1299         while self.runner.is_running:
1300             self.log_stats(stats)
1301             yield stats
1302             stats = self.runner.poll_stats()
1303             if stats is None:
1304                 return
1305         self.log_stats(stats)
1306         LOG.info('Drop rate: %f', stats['overall']['drop_rate_percent'])
1307         yield stats
1308
1309     def cancel_traffic(self):
1310         """Stop traffic."""
1311         self.runner.stop()
1312
1313     def _get_traffic_config(self):
1314         config = {}
1315         load_total = 0.0
1316         bps_total = 0.0
1317         pps_total = 0.0
1318         for idx, rate in enumerate(self.run_config['rates']):
1319             key = 'direction-forward' if idx == 0 else 'direction-reverse'
1320             config[key] = {
1321                 'l2frame_size': self.run_config['l2frame_size'],
1322                 'duration_sec': self.run_config['duration_sec']
1323             }
1324             config[key].update(rate)
1325             config[key].update(self.__convert_rates(rate))
1326             load_total += float(config[key]['rate_percent'])
1327             bps_total += float(config[key]['rate_bps'])
1328             pps_total += float(config[key]['rate_pps'])
1329         config['direction-total'] = dict(config['direction-forward'])
1330         config['direction-total'].update({
1331             'rate_percent': load_total,
1332             'rate_pps': cast_integer(pps_total),
1333             'rate_bps': bps_total
1334         })
1335
1336         return config
1337
1338     def get_run_config(self, results):
1339         """Return configuration which was used for the last run."""
1340         r = {}
1341         # because we want each direction to have the far end RX rates,
1342         # use the far end index (1-idx) to retrieve the RX rates
1343         for idx, key in enumerate(["direction-forward", "direction-reverse"]):
1344             tx_rate = results["stats"][str(idx)]["tx"]["total_pkts"] / self.config.duration_sec
1345             rx_rate = results["stats"][str(1 - idx)]["rx"]["total_pkts"] / self.config.duration_sec
1346             r[key] = {
1347                 "orig": self.__convert_rates(self.run_config['rates'][idx]),
1348                 "tx": self.__convert_rates({'rate_pps': tx_rate}),
1349                 "rx": self.__convert_rates({'rate_pps': rx_rate})
1350             }
1351
1352         total = {}
1353         for direction in ['orig', 'tx', 'rx']:
1354             total[direction] = {}
1355             for unit in ['rate_percent', 'rate_bps', 'rate_pps']:
1356                 total[direction][unit] = sum([float(x[direction][unit]) for x in list(r.values())])
1357
1358         r['direction-total'] = total
1359         return r
1360
1361     def insert_interface_stats(self, pps_list):
1362         """Insert interface stats to a list of packet path stats.
1363
1364         pps_list: a list of packet path stats instances indexed by chain index
1365
1366         This function will insert the packet path stats for the traffic gen ports 0 and 1
1367         with itemized per chain tx/rx counters.
1368         There will be as many packet path stats as chains.
1369         Each packet path stats will have exactly 2 InterfaceStats for port 0 and port 1
1370         self.pps_list:
1371         [
1372         PacketPathStats(InterfaceStats(chain 0, port 0), InterfaceStats(chain 0, port 1)),
1373         PacketPathStats(InterfaceStats(chain 1, port 0), InterfaceStats(chain 1, port 1)),
1374         ...
1375         ]
1376         """
1377         def get_if_stats(chain_idx):
1378             return [InterfaceStats('p' + str(port), self.tool)
1379                     for port in range(2)]
1380         # keep the list of list of interface stats indexed by the chain id
1381         self.ifstats = [get_if_stats(chain_idx)
1382                         for chain_idx in range(self.config.service_chain_count)]
1383         # note that we need to make a copy of the ifs list so that any modification in the
1384         # list from pps will not change the list saved in self.ifstats
1385         self.pps_list = [PacketPathStats(self.config, list(ifs)) for ifs in self.ifstats]
1386         # insert the corresponding pps in the passed list
1387         pps_list.extend(self.pps_list)
1388
1389     def update_interface_stats(self, diff=False):
1390         """Update all interface stats.
1391
1392         diff: if False, simply refresh the interface stats values with latest values
1393               if True, diff the interface stats with the latest values
1394         Make sure that the interface stats inserted in insert_interface_stats() are updated
1395         with proper values.
1396         self.ifstats:
1397         [
1398         [InterfaceStats(chain 0, port 0), InterfaceStats(chain 0, port 1)],
1399         [InterfaceStats(chain 1, port 0), InterfaceStats(chain 1, port 1)],
1400         ...
1401         ]
1402         """
1403         if diff:
1404             stats = self.gen.get_stats(self.ifstats)
1405             for chain_idx, ifs in enumerate(self.ifstats):
1406                 # each ifs has exactly 2 InterfaceStats and 2 Latency instances
1407                 # corresponding to the
1408                 # port 0 and port 1 for the given chain_idx
1409                 # Note that we cannot use self.pps_list[chain_idx].if_stats to pick the
1410                 # interface stats for the pps because it could have been modified to contain
1411                 # additional interface stats
1412                 self.gen.get_stream_stats(stats, ifs, self.pps_list[chain_idx].latencies, chain_idx)
1413             # special handling for vxlan
1414             # in case of vxlan, flow stats are not available so all rx counters will be
1415             # zeros when the total rx port counter is non zero.
1416             # in that case,
1417             for port in range(2):
1418                 total_rx = 0
1419                 for ifs in self.ifstats:
1420                     total_rx += ifs[port].rx
1421                 if total_rx == 0:
1422                     # check if the total port rx from Trex is also zero
1423                     port_rx = stats[port]['rx']['total_pkts']
1424                     if port_rx:
1425                         # the total rx for all chains from port level stats is non zero
1426                         # which means that the per-chain stats are not available
1427                         if len(self.ifstats) == 1:
1428                             # only one chain, simply report the port level rx to the chain rx stats
1429                             self.ifstats[0][port].rx = port_rx
1430                         else:
1431                             for ifs in self.ifstats:
1432                                 # mark this data as unavailable
1433                                 ifs[port].rx = None
1434                             # pitch in the total rx only in the last chain pps
1435                             self.ifstats[-1][port].rx_total = port_rx
1436
1437     @staticmethod
1438     def compare_tx_rates(required, actual):
1439         """Compare the actual TX rate to the required TX rate."""
1440         threshold = 0.9
1441         are_different = False
1442         try:
1443             if float(actual) / required < threshold:
1444                 are_different = True
1445         except ZeroDivisionError:
1446             are_different = True
1447
1448         if are_different:
1449             msg = "WARNING: There is a significant difference between requested TX rate ({r}) " \
1450                   "and actual TX rate ({a}). The traffic generator may not have sufficient CPU " \
1451                   "to achieve the requested TX rate.".format(r=required, a=actual)
1452             LOG.info(msg)
1453             return msg
1454
1455         return None
1456
1457     def get_per_direction_rate(self):
1458         """Get the rate for each direction."""
1459         divisor = 2 if self.run_config['bidirectional'] else 1
1460         if 'rate_percent' in self.current_total_rate:
1461             # don't split rate if it's percentage
1462             divisor = 1
1463
1464         return utils.divide_rate(self.current_total_rate, divisor)
1465
1466     def close(self):
1467         """Close this instance."""
1468         try:
1469             self.gen.stop_traffic()
1470         except Exception:
1471             pass
1472         self.gen.clear_stats()
1473         self.gen.cleanup()