NFVBENCH-153 Add support for python3
[nfvbench.git] / nfvbench / traffic_client.py
1 # Copyright 2016 Cisco Systems, Inc.  All rights reserved.
2 #
3 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
4 #    not use this file except in compliance with the License. You may obtain
5 #    a copy of the License at
6 #
7 #         http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #    Unless required by applicable law or agreed to in writing, software
10 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 #    License for the specific language governing permissions and limitations
13 #    under the License.
14
15 """Interface to the traffic generator clients including NDR/PDR binary search."""
16
17 from datetime import datetime
18 import socket
19 import struct
20 import time
21
22 from attrdict import AttrDict
23 import bitmath
24 from netaddr import IPNetwork
25 # pylint: disable=import-error
26 from trex.stl.api import Ether
27 from trex.stl.api import STLError
28 from trex.stl.api import UDP
29 # pylint: enable=import-error
30
31 from .log import LOG
32 from .packet_stats import InterfaceStats
33 from .packet_stats import PacketPathStats
34 from .stats_collector import IntervalCollector
35 from .stats_collector import IterationCollector
36 from .traffic_gen import traffic_utils as utils
37 from .utils import cast_integer
38
39
40 class TrafficClientException(Exception):
41     """Generic traffic client exception."""
42
43 class TrafficRunner(object):
44     """Serialize various steps required to run traffic."""
45
46     def __init__(self, client, duration_sec, interval_sec=0, service_mode=False):
47         """Create a traffic runner."""
48         self.client = client
49         self.start_time = None
50         self.duration_sec = duration_sec
51         self.interval_sec = interval_sec
52         self.service_mode = service_mode
53
54     def run(self):
55         """Clear stats and instruct the traffic generator to start generating traffic."""
56         if self.is_running():
57             return None
58         LOG.info('Running traffic generator')
59         self.client.gen.clear_stats()
60         # Debug use only : new '--service-mode' option available for the NFVBench command line.
61         # A read-only mode TRex console would be able to capture the generated traffic.
62         self.client.gen.set_service_mode(enabled=self.service_mode)
63         LOG.info('Service mode is %sabled', 'en' if self.service_mode else 'dis')
64         self.client.gen.start_traffic()
65         self.start_time = time.time()
66         return self.poll_stats()
67
68     def stop(self):
69         """Stop the current run and instruct the traffic generator to stop traffic."""
70         if self.is_running():
71             self.start_time = None
72             self.client.gen.stop_traffic()
73
74     def is_running(self):
75         """Check if a run is still pending."""
76         return self.start_time is not None
77
78     def time_elapsed(self):
79         """Return time elapsed since start of run."""
80         if self.is_running():
81             return time.time() - self.start_time
82         return self.duration_sec
83
84     def poll_stats(self):
85         """Poll latest stats from the traffic generator at fixed interval - sleeps if necessary.
86
87         return: latest stats or None if traffic is stopped
88         """
89         if not self.is_running():
90             return None
91         if self.client.skip_sleep():
92             self.stop()
93             return self.client.get_stats()
94         time_elapsed = self.time_elapsed()
95         if time_elapsed > self.duration_sec:
96             self.stop()
97             return None
98         time_left = self.duration_sec - time_elapsed
99         if self.interval_sec > 0.0:
100             if time_left <= self.interval_sec:
101                 time.sleep(time_left)
102                 self.stop()
103             else:
104                 time.sleep(self.interval_sec)
105         else:
106             time.sleep(self.duration_sec)
107             self.stop()
108         return self.client.get_stats()
109
110
111 class IpBlock(object):
112     """Manage a block of IP addresses."""
113
114     def __init__(self, base_ip, step_ip, count_ip):
115         """Create an IP block."""
116         self.base_ip_int = Device.ip_to_int(base_ip)
117         self.step = Device.ip_to_int(step_ip)
118         self.max_available = count_ip
119         self.next_free = 0
120
121     def get_ip(self, index=0):
122         """Return the IP address at given index."""
123         if index < 0 or index >= self.max_available:
124             raise IndexError('Index out of bounds: %d (max=%d)' % (index, self.max_available))
125         return Device.int_to_ip(self.base_ip_int + index * self.step)
126
127     def reserve_ip_range(self, count):
128         """Reserve a range of count consecutive IP addresses spaced by step."""
129         if self.next_free + count > self.max_available:
130             raise IndexError('No more IP addresses next free=%d max_available=%d requested=%d' %
131                              (self.next_free,
132                               self.max_available,
133                               count))
134         first_ip = self.get_ip(self.next_free)
135         last_ip = self.get_ip(self.next_free + count - 1)
136         self.next_free += count
137         return (first_ip, last_ip)
138
139     def reset_reservation(self):
140         """Reset all reservations and restart with a completely unused IP block."""
141         self.next_free = 0
142
143
144 class Device(object):
145     """Represent a port device and all information associated to it.
146
147     In the curent version we only support 2 port devices for the traffic generator
148     identified as port 0 or port 1.
149     """
150
151     def __init__(self, port, generator_config):
152         """Create a new device for a given port."""
153         self.generator_config = generator_config
154         self.chain_count = generator_config.service_chain_count
155         self.flow_count = generator_config.flow_count / 2
156         self.port = port
157         self.switch_port = generator_config.interfaces[port].get('switch_port', None)
158         self.vtep_vlan = None
159         self.vtep_src_mac = None
160         self.vxlan = False
161         self.pci = generator_config.interfaces[port].pci
162         self.mac = None
163         self.dest_macs = None
164         self.vtep_dst_mac = None
165         self.vtep_dst_ip = None
166         if generator_config.vteps is None:
167             self.vtep_src_ip = None
168         else:
169             self.vtep_src_ip = generator_config.vteps[port]
170         self.vnis = None
171         self.vlans = None
172         self.ip_addrs = generator_config.ip_addrs[port]
173         subnet = IPNetwork(self.ip_addrs)
174         self.ip = subnet.ip.format()
175         self.ip_addrs_step = generator_config.ip_addrs_step
176         self.ip_block = IpBlock(self.ip, self.ip_addrs_step, self.flow_count)
177         self.gw_ip_block = IpBlock(generator_config.gateway_ips[port],
178                                    generator_config.gateway_ip_addrs_step,
179                                    self.chain_count)
180         self.tg_gateway_ip_addrs = generator_config.tg_gateway_ip_addrs[port]
181         self.tg_gw_ip_block = IpBlock(self.tg_gateway_ip_addrs,
182                                       generator_config.tg_gateway_ip_addrs_step,
183                                       self.chain_count)
184         self.udp_src_port = generator_config.udp_src_port
185         self.udp_dst_port = generator_config.udp_dst_port
186
187     def set_mac(self, mac):
188         """Set the local MAC for this port device."""
189         if mac is None:
190             raise TrafficClientException('Trying to set traffic generator MAC address as None')
191         self.mac = mac
192
193     def get_peer_device(self):
194         """Get the peer device (device 0 -> device 1, or device 1 -> device 0)."""
195         return self.generator_config.devices[1 - self.port]
196
197     def set_vtep_dst_mac(self, dest_macs):
198         """Set the list of dest MACs indexed by the chain id.
199
200         This is only called in 2 cases:
201         - VM macs discovered using openstack API
202         - dest MACs provisioned in config file
203         """
204         self.vtep_dst_mac = list(map(str, dest_macs))
205
206     def set_dest_macs(self, dest_macs):
207         """Set the list of dest MACs indexed by the chain id.
208
209         This is only called in 2 cases:
210         - VM macs discovered using openstack API
211         - dest MACs provisioned in config file
212         """
213         self.dest_macs = list(map(str, dest_macs))
214
215     def get_dest_macs(self):
216         """Get the list of dest macs for this device.
217
218         If set_dest_macs was never called, assumes l2-loopback and return
219         a list of peer mac (as many as chains but normally only 1 chain)
220         """
221         if self.dest_macs:
222             return self.dest_macs
223         # assume this is l2-loopback
224         return [self.get_peer_device().mac] * self.chain_count
225
226     def set_vlans(self, vlans):
227         """Set the list of vlans to use indexed by the chain id."""
228         self.vlans = vlans
229         LOG.info("Port %d: VLANs %s", self.port, self.vlans)
230
231     def set_vtep_vlan(self, vlan):
232         """Set the vtep vlan to use indexed by specific port."""
233         self.vtep_vlan = vlan
234         self.vxlan = True
235         self.vlan_tagging = None
236         LOG.info("Port %d: VTEP VLANs %s", self.port, self.vtep_vlan)
237
238     def set_vxlan_endpoints(self, src_ip, dst_ip):
239         self.vtep_dst_ip = dst_ip
240         self.vtep_src_ip = src_ip
241         LOG.info("Port %d: src_vtep %s, dst_vtep %s", self.port,
242                  self.vtep_src_ip, self.vtep_dst_ip)
243
244     def set_vxlans(self, vnis):
245         self.vnis = vnis
246         LOG.info("Port %d: VNIs %s", self.port, self.vnis)
247
248     def set_gw_ip(self, gateway_ip):
249         self.gw_ip_block = IpBlock(gateway_ip,
250                                    self.generator_config.gateway_ip_addrs_step,
251                                    self.chain_count)
252
253     def get_gw_ip(self, chain_index):
254         """Retrieve the IP address assigned for the gateway of a given chain."""
255         return self.gw_ip_block.get_ip(chain_index)
256
257     def get_stream_configs(self):
258         """Get the stream config for a given chain on this device.
259
260         Called by the traffic generator driver to program the traffic generator properly
261         before generating traffic
262         """
263         configs = []
264         # exact flow count for each chain is calculated as follows:
265         # - all chains except the first will have the same flow count
266         #   calculated as (total_flows + chain_count - 1) / chain_count
267         # - the first chain will have the remainder
268         # example 11 flows and 3 chains => 3, 4, 4
269         flows_per_chain = int((self.flow_count + self.chain_count - 1) / self.chain_count)
270         cur_chain_flow_count = int(self.flow_count - flows_per_chain * (self.chain_count - 1))
271         peer = self.get_peer_device()
272         self.ip_block.reset_reservation()
273         peer.ip_block.reset_reservation()
274         dest_macs = self.get_dest_macs()
275
276         for chain_idx in range(self.chain_count):
277             src_ip_first, src_ip_last = self.ip_block.reserve_ip_range(cur_chain_flow_count)
278             dst_ip_first, dst_ip_last = peer.ip_block.reserve_ip_range(cur_chain_flow_count)
279
280             configs.append({
281                 'count': cur_chain_flow_count,
282                 'mac_src': self.mac,
283                 'mac_dst': dest_macs[chain_idx],
284                 'ip_src_addr': src_ip_first,
285                 'ip_src_addr_max': src_ip_last,
286                 'ip_src_count': cur_chain_flow_count,
287                 'ip_dst_addr': dst_ip_first,
288                 'ip_dst_addr_max': dst_ip_last,
289                 'ip_dst_count': cur_chain_flow_count,
290                 'ip_addrs_step': self.ip_addrs_step,
291                 'udp_src_port': self.udp_src_port,
292                 'udp_dst_port': self.udp_dst_port,
293                 'mac_discovery_gw': self.get_gw_ip(chain_idx),
294                 'ip_src_tg_gw': self.tg_gw_ip_block.get_ip(chain_idx),
295                 'ip_dst_tg_gw': peer.tg_gw_ip_block.get_ip(chain_idx),
296                 'vlan_tag': self.vlans[chain_idx] if self.vlans else None,
297                 'vxlan': self.vxlan,
298                 'vtep_vlan': self.vtep_vlan if self.vtep_vlan else None,
299                 'vtep_src_mac': self.mac if self.vxlan is True else None,
300                 'vtep_dst_mac': self.vtep_dst_mac if self.vxlan is True else None,
301                 'vtep_dst_ip': self.vtep_dst_ip if self.vxlan is True else None,
302                 'vtep_src_ip': self.vtep_src_ip if self.vxlan is True else None,
303                 'net_vni': self.vnis[chain_idx] if self.vxlan is True else None
304             })
305             # after first chain, fall back to the flow count for all other chains
306             cur_chain_flow_count = flows_per_chain
307         return configs
308
309     @staticmethod
310     def ip_to_int(addr):
311         """Convert an IP address from string to numeric."""
312         return struct.unpack("!I", socket.inet_aton(addr))[0]
313
314     @staticmethod
315     def int_to_ip(nvalue):
316         """Convert an IP address from numeric to string."""
317         return socket.inet_ntoa(struct.pack("!I", int(nvalue)))
318
319
320 class GeneratorConfig(object):
321     """Represents traffic configuration for currently running traffic profile."""
322
323     DEFAULT_IP_STEP = '0.0.0.1'
324     DEFAULT_SRC_DST_IP_STEP = '0.0.0.1'
325
326     def __init__(self, config):
327         """Create a generator config."""
328         self.config = config
329         # name of the generator profile (normally trex or dummy)
330         # pick the default one if not specified explicitly from cli options
331         if not config.generator_profile:
332             config.generator_profile = config.traffic_generator.default_profile
333         # pick up the profile dict based on the name
334         gen_config = self.__match_generator_profile(config.traffic_generator,
335                                                     config.generator_profile)
336         self.gen_config = gen_config
337         # copy over fields from the dict
338         self.tool = gen_config.tool
339         self.ip = gen_config.ip
340         # overrides on config.cores and config.mbuf_factor
341         if config.cores:
342             self.cores = config.cores
343         else:
344             self.cores = gen_config.get('cores', 1)
345         self.mbuf_factor = config.mbuf_factor
346         self.mbuf_64 = config.mbuf_64
347         self.hdrh = not config.disable_hdrh
348         if gen_config.intf_speed:
349             # interface speed is overriden from config
350             self.intf_speed = bitmath.parse_string(gen_config.intf_speed.replace('ps', '')).bits
351         else:
352             # interface speed is discovered/provided by the traffic generator
353             self.intf_speed = 0
354         self.name = gen_config.name
355         self.zmq_pub_port = gen_config.get('zmq_pub_port', 4500)
356         self.zmq_rpc_port = gen_config.get('zmq_rpc_port', 4501)
357         self.limit_memory = gen_config.get('limit_memory', 1024)
358         self.software_mode = gen_config.get('software_mode', False)
359         self.interfaces = gen_config.interfaces
360         if self.interfaces[0].port != 0 or self.interfaces[1].port != 1:
361             raise TrafficClientException('Invalid port order/id in generator_profile.interfaces')
362         self.service_chain = config.service_chain
363         self.service_chain_count = config.service_chain_count
364         self.flow_count = config.flow_count
365         self.host_name = gen_config.host_name
366
367         self.tg_gateway_ip_addrs = gen_config.tg_gateway_ip_addrs
368         self.ip_addrs = gen_config.ip_addrs
369         self.ip_addrs_step = gen_config.ip_addrs_step or self.DEFAULT_SRC_DST_IP_STEP
370         self.tg_gateway_ip_addrs_step = \
371             gen_config.tg_gateway_ip_addrs_step or self.DEFAULT_IP_STEP
372         self.gateway_ip_addrs_step = gen_config.gateway_ip_addrs_step or self.DEFAULT_IP_STEP
373         self.gateway_ips = gen_config.gateway_ip_addrs
374         self.udp_src_port = gen_config.udp_src_port
375         self.udp_dst_port = gen_config.udp_dst_port
376         self.vteps = gen_config.get('vteps')
377         self.devices = [Device(port, self) for port in [0, 1]]
378         # This should normally always be [0, 1]
379         self.ports = [device.port for device in self.devices]
380
381         # check that pci is not empty
382         if not gen_config.interfaces[0].get('pci', None) or \
383            not gen_config.interfaces[1].get('pci', None):
384             raise TrafficClientException("configuration interfaces pci fields cannot be empty")
385
386         self.pcis = [tgif['pci'] for tgif in gen_config.interfaces]
387         self.vlan_tagging = config.vlan_tagging
388
389         # needed for result/summarizer
390         config['tg-name'] = gen_config.name
391         config['tg-tool'] = self.tool
392
393     def to_json(self):
394         """Get json form to display the content into the overall result dict."""
395         return dict(self.gen_config)
396
397     def set_dest_macs(self, port_index, dest_macs):
398         """Set the list of dest MACs indexed by the chain id on given port.
399
400         port_index: the port for which dest macs must be set
401         dest_macs: a list of dest MACs indexed by chain id
402         """
403         if len(dest_macs) < self.config.service_chain_count:
404             raise TrafficClientException('Dest MAC list %s must have %d entries' %
405                                          (dest_macs, self.config.service_chain_count))
406         # only pass the first scc dest MACs
407         self.devices[port_index].set_dest_macs(dest_macs[:self.config.service_chain_count])
408         LOG.info('Port %d: dst MAC %s', port_index, [str(mac) for mac in dest_macs])
409
410     def set_vtep_dest_macs(self, port_index, dest_macs):
411         """Set the list of dest MACs indexed by the chain id on given port.
412
413         port_index: the port for which dest macs must be set
414         dest_macs: a list of dest MACs indexed by chain id
415         """
416         if len(dest_macs) != self.config.service_chain_count:
417             raise TrafficClientException('Dest MAC list %s must have %d entries' %
418                                          (dest_macs, self.config.service_chain_count))
419         self.devices[port_index].set_vtep_dst_mac(dest_macs)
420         LOG.info('Port %d: vtep dst MAC %s', port_index, {str(mac) for mac in dest_macs})
421
422     def get_dest_macs(self):
423         """Return the list of dest macs indexed by port."""
424         return [dev.get_dest_macs() for dev in self.devices]
425
426     def set_vlans(self, port_index, vlans):
427         """Set the list of vlans to use indexed by the chain id on given port.
428
429         port_index: the port for which VLANs must be set
430         vlans: a  list of vlan lists indexed by chain id
431         """
432         if len(vlans) != self.config.service_chain_count:
433             raise TrafficClientException('VLAN list %s must have %d entries' %
434                                          (vlans, self.config.service_chain_count))
435         self.devices[port_index].set_vlans(vlans)
436
437     def set_vxlans(self, port_index, vxlans):
438         """Set the list of vxlans (VNIs) to use indexed by the chain id on given port.
439
440         port_index: the port for which VXLANs must be set
441         VXLANs: a  list of VNIs lists indexed by chain id
442         """
443         if len(vxlans) != self.config.service_chain_count:
444             raise TrafficClientException('VXLAN list %s must have %d entries' %
445                                          (vxlans, self.config.service_chain_count))
446         self.devices[port_index].set_vxlans(vxlans)
447
448     def set_vtep_vlan(self, port_index, vlan):
449         """Set the vtep vlan to use indexed by the chain id on given port.
450         port_index: the port for which VLAN must be set
451         """
452         self.devices[port_index].set_vtep_vlan(vlan)
453
454     def set_vxlan_endpoints(self, port_index, src_ip, dst_ip):
455         self.devices[port_index].set_vxlan_endpoints(src_ip, dst_ip)
456
457     @staticmethod
458     def __match_generator_profile(traffic_generator, generator_profile):
459         gen_config = AttrDict(traffic_generator)
460         gen_config.pop('default_profile')
461         gen_config.pop('generator_profile')
462         matching_profile = [profile for profile in traffic_generator.generator_profile if
463                             profile.name == generator_profile]
464         if len(matching_profile) != 1:
465             raise Exception('Traffic generator profile not found: ' + generator_profile)
466
467         gen_config.update(matching_profile[0])
468         return gen_config
469
470
471 class TrafficClient(object):
472     """Traffic generator client with NDR/PDR binary seearch."""
473
474     PORTS = [0, 1]
475
476     def __init__(self, config, notifier=None):
477         """Create a new TrafficClient instance.
478
479         config: nfvbench config
480         notifier: notifier (optional)
481
482         A new instance is created everytime the nfvbench config may have changed.
483         """
484         self.config = config
485         self.generator_config = GeneratorConfig(config)
486         self.tool = self.generator_config.tool
487         self.gen = self._get_generator()
488         self.notifier = notifier
489         self.interval_collector = None
490         self.iteration_collector = None
491         self.runner = TrafficRunner(self, self.config.duration_sec, self.config.interval_sec,
492                                     self.config.service_mode)
493         self.config.frame_sizes = self._get_frame_sizes()
494         self.run_config = {
495             'l2frame_size': None,
496             'duration_sec': self.config.duration_sec,
497             'bidirectional': True,
498             'rates': []  # to avoid unsbuscriptable-obj warning
499         }
500         self.current_total_rate = {'rate_percent': '10'}
501         if self.config.single_run:
502             self.current_total_rate = utils.parse_rate_str(self.config.rate)
503         self.ifstats = None
504         # Speed is either discovered when connecting to TG or set from config
505         # This variable is 0 if not yet discovered from TG or must be the speed of
506         # each interface in bits per second
507         self.intf_speed = self.generator_config.intf_speed
508
509     def _get_generator(self):
510         tool = self.tool.lower()
511         if tool == 'trex':
512             from .traffic_gen import trex_gen
513             return trex_gen.TRex(self)
514         if tool == 'dummy':
515             from .traffic_gen import dummy
516             return dummy.DummyTG(self)
517         raise TrafficClientException('Unsupported generator tool name:' + self.tool)
518
519     def skip_sleep(self):
520         """Skip all sleeps when doing unit testing with dummy TG.
521
522         Must be overriden using mock.patch
523         """
524         return False
525
526     def _get_frame_sizes(self):
527         traffic_profile_name = self.config.traffic.profile
528         matching_profiles = [profile for profile in self.config.traffic_profile if
529                              profile.name == traffic_profile_name]
530         if len(matching_profiles) > 1:
531             raise TrafficClientException('Multiple traffic profiles with name: ' +
532                                          traffic_profile_name)
533         if not matching_profiles:
534             raise TrafficClientException('Cannot find traffic profile: ' + traffic_profile_name)
535         return matching_profiles[0].l2frame_size
536
537     def start_traffic_generator(self):
538         """Start the traffic generator process (traffic not started yet)."""
539         self.gen.connect()
540         # pick up the interface speed if it is not set from config
541         intf_speeds = self.gen.get_port_speed_gbps()
542         # convert Gbps unit into bps
543         tg_if_speed = bitmath.parse_string(str(intf_speeds[0]) + 'Gb').bits
544         if self.intf_speed:
545             # interface speed is overriden from config
546             if self.intf_speed != tg_if_speed:
547                 # Warn the user if the speed in the config is different
548                 LOG.warning('Interface speed provided is different from actual speed (%d Gbps)',
549                             intf_speeds[0])
550         else:
551             # interface speed not provisioned by config
552             self.intf_speed = tg_if_speed
553             # also update the speed in the tg config
554             self.generator_config.intf_speed = tg_if_speed
555
556         # Save the traffic generator local MAC
557         for mac, device in zip(self.gen.get_macs(), self.generator_config.devices):
558             device.set_mac(mac)
559
560     def setup(self):
561         """Set up the traffic client."""
562         self.gen.clear_stats()
563
564     def get_version(self):
565         """Get the traffic generator version."""
566         return self.gen.get_version()
567
568     def ensure_end_to_end(self):
569         """Ensure traffic generator receives packets it has transmitted.
570
571         This ensures end to end connectivity and also waits until VMs are ready to forward packets.
572
573         VMs that are started and in active state may not pass traffic yet. It is imperative to make
574         sure that all VMs are passing traffic in both directions before starting any benchmarking.
575         To verify this, we need to send at a low frequency bi-directional packets and make sure
576         that we receive all packets back from all VMs. The number of flows is equal to 2 times
577         the number of chains (1 per direction) and we need to make sure we receive packets coming
578         from exactly 2 x chain count different source MAC addresses.
579
580         Example:
581             PVP chain (1 VM per chain)
582             N = 10 (number of chains)
583             Flow count = 20 (number of flows)
584             If the number of unique source MAC addresses from received packets is 20 then
585             all 10 VMs 10 VMs are in operational state.
586         """
587         LOG.info('Starting traffic generator to ensure end-to-end connectivity')
588         # send 2pps on each chain and each direction
589         rate_pps = {'rate_pps': str(self.config.service_chain_count * 2)}
590         self.gen.create_traffic('64', [rate_pps, rate_pps], bidirectional=True, latency=False,
591                                 e2e=True)
592         # ensures enough traffic is coming back
593         retry_count = int((self.config.check_traffic_time_sec +
594                            self.config.generic_poll_sec - 1) / self.config.generic_poll_sec)
595
596         # we expect to see packets coming from 2 unique MAC per chain
597         # because there can be flooding in the case of shared net
598         # we must verify that packets from the right VMs are received
599         # and not just count unique src MAC
600         # create a dict of (port, chain) tuples indexed by dest mac
601         mac_map = {}
602         for port, dest_macs in enumerate(self.generator_config.get_dest_macs()):
603             for chain, mac in enumerate(dest_macs):
604                 mac_map[mac] = (port, chain)
605         unique_src_mac_count = len(mac_map)
606         if self.config.vxlan and self.config.traffic_generator.vtep_vlan:
607             get_mac_id = lambda packet: packet['binary'][60:66]
608         elif self.config.vxlan:
609             get_mac_id = lambda packet: packet['binary'][56:62]
610         else:
611             get_mac_id = lambda packet: packet['binary'][6:12]
612         for it in range(retry_count):
613             self.gen.clear_stats()
614             self.gen.start_traffic()
615             self.gen.start_capture()
616             LOG.info('Captured unique src mac %d/%d, capturing return packets (retry %d/%d)...',
617                      unique_src_mac_count - len(mac_map), unique_src_mac_count,
618                      it + 1, retry_count)
619             if not self.skip_sleep():
620                 time.sleep(self.config.generic_poll_sec)
621             self.gen.stop_traffic()
622             self.gen.fetch_capture_packets()
623             self.gen.stop_capture()
624             for packet in self.gen.packet_list:
625                 mac_id = get_mac_id(packet).decode('latin-1')
626                 src_mac = ':'.join(["%02x" % ord(x) for x in mac_id])
627                 if src_mac in mac_map and self.is_udp(packet):
628                     port, chain = mac_map[src_mac]
629                     LOG.info('Received packet from mac: %s (chain=%d, port=%d)',
630                              src_mac, chain, port)
631                     mac_map.pop(src_mac, None)
632
633                 if not mac_map:
634                     LOG.info('End-to-end connectivity established')
635                     return
636             if self.config.l3_router and not self.config.no_arp:
637                 # In case of L3 traffic mode, routers are not able to route traffic
638                 # until VM interfaces are up and ARP requests are done
639                 LOG.info('Waiting for loopback service completely started...')
640                 LOG.info('Sending ARP request to assure end-to-end connectivity established')
641                 self.ensure_arp_successful()
642         raise TrafficClientException('End-to-end connectivity cannot be ensured')
643
644     def is_udp(self, packet):
645         pkt = Ether(packet['binary'])
646         return UDP in pkt
647
648     def ensure_arp_successful(self):
649         """Resolve all IP using ARP and throw an exception in case of failure."""
650         dest_macs = self.gen.resolve_arp()
651         if dest_macs:
652             # all dest macs are discovered, saved them into the generator config
653             if self.config.vxlan:
654                 self.generator_config.set_vtep_dest_macs(0, dest_macs[0])
655                 self.generator_config.set_vtep_dest_macs(1, dest_macs[1])
656             else:
657                 self.generator_config.set_dest_macs(0, dest_macs[0])
658                 self.generator_config.set_dest_macs(1, dest_macs[1])
659         else:
660             raise TrafficClientException('ARP cannot be resolved')
661
662     def set_traffic(self, frame_size, bidirectional):
663         """Reconfigure the traffic generator for a new frame size."""
664         self.run_config['bidirectional'] = bidirectional
665         self.run_config['l2frame_size'] = frame_size
666         self.run_config['rates'] = [self.get_per_direction_rate()]
667         if bidirectional:
668             self.run_config['rates'].append(self.get_per_direction_rate())
669         else:
670             unidir_reverse_pps = int(self.config.unidir_reverse_traffic_pps)
671             if unidir_reverse_pps > 0:
672                 self.run_config['rates'].append({'rate_pps': str(unidir_reverse_pps)})
673         # Fix for [NFVBENCH-67], convert the rate string to PPS
674         for idx, rate in enumerate(self.run_config['rates']):
675             if 'rate_pps' not in rate:
676                 self.run_config['rates'][idx] = {'rate_pps': self.__convert_rates(rate)['rate_pps']}
677
678         self.gen.clear_streamblock()
679         if self.config.no_latency_streams:
680             LOG.info("Latency streams are disabled")
681         self.gen.create_traffic(frame_size, self.run_config['rates'], bidirectional,
682                                 latency=not self.config.no_latency_streams)
683
684     def _modify_load(self, load):
685         self.current_total_rate = {'rate_percent': str(load)}
686         rate_per_direction = self.get_per_direction_rate()
687
688         self.gen.modify_rate(rate_per_direction, False)
689         self.run_config['rates'][0] = rate_per_direction
690         if self.run_config['bidirectional']:
691             self.gen.modify_rate(rate_per_direction, True)
692             self.run_config['rates'][1] = rate_per_direction
693
694     def get_ndr_and_pdr(self):
695         """Start the NDR/PDR iteration and return the results."""
696         dst = 'Bidirectional' if self.run_config['bidirectional'] else 'Unidirectional'
697         targets = {}
698         if self.config.ndr_run:
699             LOG.info('*** Searching NDR for %s (%s)...', self.run_config['l2frame_size'], dst)
700             targets['ndr'] = self.config.measurement.NDR
701         if self.config.pdr_run:
702             LOG.info('*** Searching PDR for %s (%s)...', self.run_config['l2frame_size'], dst)
703             targets['pdr'] = self.config.measurement.PDR
704
705         self.run_config['start_time'] = time.time()
706         self.interval_collector = IntervalCollector(self.run_config['start_time'])
707         self.interval_collector.attach_notifier(self.notifier)
708         self.iteration_collector = IterationCollector(self.run_config['start_time'])
709         results = {}
710         self.__range_search(0.0, 200.0, targets, results)
711
712         results['iteration_stats'] = {
713             'ndr_pdr': self.iteration_collector.get()
714         }
715
716         if self.config.ndr_run:
717             LOG.info('NDR load: %s', results['ndr']['rate_percent'])
718             results['ndr']['time_taken_sec'] = \
719                 results['ndr']['timestamp_sec'] - self.run_config['start_time']
720             if self.config.pdr_run:
721                 LOG.info('PDR load: %s', results['pdr']['rate_percent'])
722                 results['pdr']['time_taken_sec'] = \
723                     results['pdr']['timestamp_sec'] - results['ndr']['timestamp_sec']
724         else:
725             LOG.info('PDR load: %s', results['pdr']['rate_percent'])
726             results['pdr']['time_taken_sec'] = \
727                 results['pdr']['timestamp_sec'] - self.run_config['start_time']
728         return results
729
730     def __get_dropped_rate(self, result):
731         dropped_pkts = result['rx']['dropped_pkts']
732         total_pkts = result['tx']['total_pkts']
733         if not total_pkts:
734             return float('inf')
735         return float(dropped_pkts) / total_pkts * 100
736
737     def get_stats(self):
738         """Collect final stats for previous run."""
739         stats = self.gen.get_stats()
740         retDict = {'total_tx_rate': stats['total_tx_rate']}
741
742         tx_keys = ['total_pkts', 'total_pkt_bytes', 'pkt_rate', 'pkt_bit_rate']
743         rx_keys = tx_keys + ['dropped_pkts']
744
745         for port in self.PORTS:
746             port_stats = {'tx': {}, 'rx': {}}
747             for key in tx_keys:
748                 port_stats['tx'][key] = int(stats[port]['tx'][key])
749             for key in rx_keys:
750                 try:
751                     port_stats['rx'][key] = int(stats[port]['rx'][key])
752                 except ValueError:
753                     port_stats['rx'][key] = 0
754             port_stats['rx']['avg_delay_usec'] = cast_integer(
755                 stats[port]['rx']['avg_delay_usec'])
756             port_stats['rx']['min_delay_usec'] = cast_integer(
757                 stats[port]['rx']['min_delay_usec'])
758             port_stats['rx']['max_delay_usec'] = cast_integer(
759                 stats[port]['rx']['max_delay_usec'])
760             port_stats['drop_rate_percent'] = self.__get_dropped_rate(port_stats)
761             retDict[str(port)] = port_stats
762
763         ports = sorted(list(retDict.keys()), key=str)
764         if self.run_config['bidirectional']:
765             retDict['overall'] = {'tx': {}, 'rx': {}}
766             for key in tx_keys:
767                 retDict['overall']['tx'][key] = \
768                     retDict[ports[0]]['tx'][key] + retDict[ports[1]]['tx'][key]
769             for key in rx_keys:
770                 retDict['overall']['rx'][key] = \
771                     retDict[ports[0]]['rx'][key] + retDict[ports[1]]['rx'][key]
772             total_pkts = [retDict[ports[0]]['rx']['total_pkts'],
773                           retDict[ports[1]]['rx']['total_pkts']]
774             avg_delays = [retDict[ports[0]]['rx']['avg_delay_usec'],
775                           retDict[ports[1]]['rx']['avg_delay_usec']]
776             max_delays = [retDict[ports[0]]['rx']['max_delay_usec'],
777                           retDict[ports[1]]['rx']['max_delay_usec']]
778             min_delays = [retDict[ports[0]]['rx']['min_delay_usec'],
779                           retDict[ports[1]]['rx']['min_delay_usec']]
780             retDict['overall']['rx']['avg_delay_usec'] = utils.weighted_avg(total_pkts, avg_delays)
781             retDict['overall']['rx']['min_delay_usec'] = min(min_delays)
782             retDict['overall']['rx']['max_delay_usec'] = max(max_delays)
783             for key in ['pkt_bit_rate', 'pkt_rate']:
784                 for dirc in ['tx', 'rx']:
785                     retDict['overall'][dirc][key] /= 2.0
786         else:
787             retDict['overall'] = retDict[ports[0]]
788         retDict['overall']['drop_rate_percent'] = self.__get_dropped_rate(retDict['overall'])
789         return retDict
790
791     def __convert_rates(self, rate):
792         return utils.convert_rates(self.run_config['l2frame_size'],
793                                    rate,
794                                    self.intf_speed)
795
796     def __ndr_pdr_found(self, tag, load):
797         rates = self.__convert_rates({'rate_percent': load})
798         self.iteration_collector.add_ndr_pdr(tag, rates['rate_pps'])
799         last_stats = self.iteration_collector.peek()
800         self.interval_collector.add_ndr_pdr(tag, last_stats)
801
802     def __format_output_stats(self, stats):
803         for key in self.PORTS + ['overall']:
804             key = str(key)
805             interface = stats[key]
806             stats[key] = {
807                 'tx_pkts': interface['tx']['total_pkts'],
808                 'rx_pkts': interface['rx']['total_pkts'],
809                 'drop_percentage': interface['drop_rate_percent'],
810                 'drop_pct': interface['rx']['dropped_pkts'],
811                 'avg_delay_usec': interface['rx']['avg_delay_usec'],
812                 'max_delay_usec': interface['rx']['max_delay_usec'],
813                 'min_delay_usec': interface['rx']['min_delay_usec'],
814             }
815
816         return stats
817
818     def __targets_found(self, rate, targets, results):
819         for tag, target in list(targets.items()):
820             LOG.info('Found %s (%s) load: %s', tag, target, rate)
821             self.__ndr_pdr_found(tag, rate)
822             results[tag]['timestamp_sec'] = time.time()
823
824     def __range_search(self, left, right, targets, results):
825         """Perform a binary search for a list of targets inside a [left..right] range or rate.
826
827         left    the left side of the range to search as a % the line rate (100 = 100% line rate)
828                 indicating the rate to send on each interface
829         right   the right side of the range to search as a % of line rate
830                 indicating the rate to send on each interface
831         targets a dict of drop rates to search (0.1 = 0.1%), indexed by the DR name or "tag"
832                 ('ndr', 'pdr')
833         results a dict to store results
834         """
835         if not targets:
836             return
837         LOG.info('Range search [%s .. %s] targets: %s', left, right, targets)
838
839         # Terminate search when gap is less than load epsilon
840         if right - left < self.config.measurement.load_epsilon:
841             self.__targets_found(left, targets, results)
842             return
843
844         # Obtain the average drop rate in for middle load
845         middle = (left + right) / 2.0
846         try:
847             stats, rates = self.__run_search_iteration(middle)
848         except STLError:
849             LOG.exception("Got exception from traffic generator during binary search")
850             self.__targets_found(left, targets, results)
851             return
852         # Split target dicts based on the avg drop rate
853         left_targets = {}
854         right_targets = {}
855         for tag, target in list(targets.items()):
856             if stats['overall']['drop_rate_percent'] <= target:
857                 # record the best possible rate found for this target
858                 results[tag] = rates
859                 results[tag].update({
860                     'load_percent_per_direction': middle,
861                     'stats': self.__format_output_stats(dict(stats)),
862                     'timestamp_sec': None
863                 })
864                 right_targets[tag] = target
865             else:
866                 # initialize to 0 all fields of result for
867                 # the worst case scenario of the binary search (if ndr/pdr is not found)
868                 if tag not in results:
869                     results[tag] = dict.fromkeys(rates, 0)
870                     empty_stats = self.__format_output_stats(dict(stats))
871                     for key in empty_stats:
872                         if isinstance(empty_stats[key], dict):
873                             empty_stats[key] = dict.fromkeys(empty_stats[key], 0)
874                         else:
875                             empty_stats[key] = 0
876                     results[tag].update({
877                         'load_percent_per_direction': 0,
878                         'stats': empty_stats,
879                         'timestamp_sec': None
880                     })
881                 left_targets[tag] = target
882
883         # search lower half
884         self.__range_search(left, middle, left_targets, results)
885
886         # search upper half only if the upper rate does not exceed
887         # 100%, this only happens when the first search at 100%
888         # yields a DR that is < target DR
889         if middle >= 100:
890             self.__targets_found(100, right_targets, results)
891         else:
892             self.__range_search(middle, right, right_targets, results)
893
894     def __run_search_iteration(self, rate):
895         """Run one iteration at the given rate level.
896
897         rate: the rate to send on each port in percent (0 to 100)
898         """
899         self._modify_load(rate)
900
901         # poll interval stats and collect them
902         for stats in self.run_traffic():
903             self.interval_collector.add(stats)
904             time_elapsed_ratio = self.runner.time_elapsed() / self.run_config['duration_sec']
905             if time_elapsed_ratio >= 1:
906                 self.cancel_traffic()
907                 if not self.skip_sleep():
908                     time.sleep(self.config.pause_sec)
909         self.interval_collector.reset()
910
911         # get stats from the run
912         stats = self.runner.client.get_stats()
913         current_traffic_config = self._get_traffic_config()
914         warning = self.compare_tx_rates(current_traffic_config['direction-total']['rate_pps'],
915                                         stats['total_tx_rate'])
916         if warning is not None:
917             stats['warning'] = warning
918
919         # save reliable stats from whole iteration
920         self.iteration_collector.add(stats, current_traffic_config['direction-total']['rate_pps'])
921         LOG.info('Average drop rate: %f', stats['overall']['drop_rate_percent'])
922         return stats, current_traffic_config['direction-total']
923
924     @staticmethod
925     def log_stats(stats):
926         """Log estimated stats during run."""
927         report = {
928             'datetime': str(datetime.now()),
929             'tx_packets': stats['overall']['tx']['total_pkts'],
930             'rx_packets': stats['overall']['rx']['total_pkts'],
931             'drop_packets': stats['overall']['rx']['dropped_pkts'],
932             'drop_rate_percent': stats['overall']['drop_rate_percent']
933         }
934         LOG.info('TX: %(tx_packets)d; '
935                  'RX: %(rx_packets)d; '
936                  'Est. Dropped: %(drop_packets)d; '
937                  'Est. Drop rate: %(drop_rate_percent).4f%%',
938                  report)
939
940     def run_traffic(self):
941         """Start traffic and return intermediate stats for each interval."""
942         stats = self.runner.run()
943         while self.runner.is_running:
944             self.log_stats(stats)
945             yield stats
946             stats = self.runner.poll_stats()
947             if stats is None:
948                 return
949         self.log_stats(stats)
950         LOG.info('Drop rate: %f', stats['overall']['drop_rate_percent'])
951         yield stats
952
953     def cancel_traffic(self):
954         """Stop traffic."""
955         self.runner.stop()
956
957     def _get_traffic_config(self):
958         config = {}
959         load_total = 0.0
960         bps_total = 0.0
961         pps_total = 0.0
962         for idx, rate in enumerate(self.run_config['rates']):
963             key = 'direction-forward' if idx == 0 else 'direction-reverse'
964             config[key] = {
965                 'l2frame_size': self.run_config['l2frame_size'],
966                 'duration_sec': self.run_config['duration_sec']
967             }
968             config[key].update(rate)
969             config[key].update(self.__convert_rates(rate))
970             load_total += float(config[key]['rate_percent'])
971             bps_total += float(config[key]['rate_bps'])
972             pps_total += float(config[key]['rate_pps'])
973         config['direction-total'] = dict(config['direction-forward'])
974         config['direction-total'].update({
975             'rate_percent': load_total,
976             'rate_pps': cast_integer(pps_total),
977             'rate_bps': bps_total
978         })
979
980         return config
981
982     def get_run_config(self, results):
983         """Return configuration which was used for the last run."""
984         r = {}
985         # because we want each direction to have the far end RX rates,
986         # use the far end index (1-idx) to retrieve the RX rates
987         for idx, key in enumerate(["direction-forward", "direction-reverse"]):
988             tx_rate = results["stats"][str(idx)]["tx"]["total_pkts"] / self.config.duration_sec
989             rx_rate = results["stats"][str(1 - idx)]["rx"]["total_pkts"] / self.config.duration_sec
990             r[key] = {
991                 "orig": self.__convert_rates(self.run_config['rates'][idx]),
992                 "tx": self.__convert_rates({'rate_pps': tx_rate}),
993                 "rx": self.__convert_rates({'rate_pps': rx_rate})
994             }
995
996         total = {}
997         for direction in ['orig', 'tx', 'rx']:
998             total[direction] = {}
999             for unit in ['rate_percent', 'rate_bps', 'rate_pps']:
1000                 total[direction][unit] = sum([float(x[direction][unit]) for x in list(r.values())])
1001
1002         r['direction-total'] = total
1003         return r
1004
1005     def insert_interface_stats(self, pps_list):
1006         """Insert interface stats to a list of packet path stats.
1007
1008         pps_list: a list of packet path stats instances indexed by chain index
1009
1010         This function will insert the packet path stats for the traffic gen ports 0 and 1
1011         with itemized per chain tx/rx counters.
1012         There will be as many packet path stats as chains.
1013         Each packet path stats will have exactly 2 InterfaceStats for port 0 and port 1
1014         self.pps_list:
1015         [
1016         PacketPathStats(InterfaceStats(chain 0, port 0), InterfaceStats(chain 0, port 1)),
1017         PacketPathStats(InterfaceStats(chain 1, port 0), InterfaceStats(chain 1, port 1)),
1018         ...
1019         ]
1020         """
1021         def get_if_stats(chain_idx):
1022             return [InterfaceStats('p' + str(port), self.tool)
1023                     for port in range(2)]
1024         # keep the list of list of interface stats indexed by the chain id
1025         self.ifstats = [get_if_stats(chain_idx)
1026                         for chain_idx in range(self.config.service_chain_count)]
1027         # note that we need to make a copy of the ifs list so that any modification in the
1028         # list from pps will not change the list saved in self.ifstats
1029         self.pps_list = [PacketPathStats(list(ifs)) for ifs in self.ifstats]
1030         # insert the corresponding pps in the passed list
1031         pps_list.extend(self.pps_list)
1032
1033     def update_interface_stats(self, diff=False):
1034         """Update all interface stats.
1035
1036         diff: if False, simply refresh the interface stats values with latest values
1037               if True, diff the interface stats with the latest values
1038         Make sure that the interface stats inserted in insert_interface_stats() are updated
1039         with proper values.
1040         self.ifstats:
1041         [
1042         [InterfaceStats(chain 0, port 0), InterfaceStats(chain 0, port 1)],
1043         [InterfaceStats(chain 1, port 0), InterfaceStats(chain 1, port 1)],
1044         ...
1045         ]
1046         """
1047         if diff:
1048             stats = self.gen.get_stats()
1049             for chain_idx, ifs in enumerate(self.ifstats):
1050                 # each ifs has exactly 2 InterfaceStats and 2 Latency instances
1051                 # corresponding to the
1052                 # port 0 and port 1 for the given chain_idx
1053                 # Note that we cannot use self.pps_list[chain_idx].if_stats to pick the
1054                 # interface stats for the pps because it could have been modified to contain
1055                 # additional interface stats
1056                 self.gen.get_stream_stats(stats, ifs, self.pps_list[chain_idx].latencies, chain_idx)
1057             # special handling for vxlan
1058             # in case of vxlan, flow stats are not available so all rx counters will be
1059             # zeros when the total rx port counter is non zero.
1060             # in that case,
1061             for port in range(2):
1062                 total_rx = 0
1063                 for ifs in self.ifstats:
1064                     total_rx += ifs[port].rx
1065                 if total_rx == 0:
1066                     # check if the total port rx from Trex is also zero
1067                     port_rx = stats[port]['rx']['total_pkts']
1068                     if port_rx:
1069                         # the total rx for all chains from port level stats is non zero
1070                         # which means that the per-chain stats are not available
1071                         if len(self.ifstats) == 1:
1072                             # only one chain, simply report the port level rx to the chain rx stats
1073                             self.ifstats[0][port].rx = port_rx
1074                         else:
1075                             for ifs in self.ifstats:
1076                                 # mark this data as unavailable
1077                                 ifs[port].rx = None
1078                             # pitch in the total rx only in the last chain pps
1079                             self.ifstats[-1][port].rx_total = port_rx
1080
1081     @staticmethod
1082     def compare_tx_rates(required, actual):
1083         """Compare the actual TX rate to the required TX rate."""
1084         threshold = 0.9
1085         are_different = False
1086         try:
1087             if float(actual) / required < threshold:
1088                 are_different = True
1089         except ZeroDivisionError:
1090             are_different = True
1091
1092         if are_different:
1093             msg = "WARNING: There is a significant difference between requested TX rate ({r}) " \
1094                   "and actual TX rate ({a}). The traffic generator may not have sufficient CPU " \
1095                   "to achieve the requested TX rate.".format(r=required, a=actual)
1096             LOG.info(msg)
1097             return msg
1098
1099         return None
1100
1101     def get_per_direction_rate(self):
1102         """Get the rate for each direction."""
1103         divisor = 2 if self.run_config['bidirectional'] else 1
1104         if 'rate_percent' in self.current_total_rate:
1105             # don't split rate if it's percentage
1106             divisor = 1
1107
1108         return utils.divide_rate(self.current_total_rate, divisor)
1109
1110     def close(self):
1111         """Close this instance."""
1112         try:
1113             self.gen.stop_traffic()
1114         except Exception:
1115             pass
1116         self.gen.clear_stats()
1117         self.gen.cleanup()