VxLAN per-chain counter support
[nfvbench.git] / nfvbench / traffic_client.py
1 # Copyright 2016 Cisco Systems, Inc.  All rights reserved.
2 #
3 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
4 #    not use this file except in compliance with the License. You may obtain
5 #    a copy of the License at
6 #
7 #         http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #    Unless required by applicable law or agreed to in writing, software
10 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 #    License for the specific language governing permissions and limitations
13 #    under the License.
14
15 """Interface to the traffic generator clients including NDR/PDR binary search."""
16
17 from datetime import datetime
18 import socket
19 import struct
20 import time
21
22 from attrdict import AttrDict
23 import bitmath
24 from netaddr import IPNetwork
25 # pylint: disable=import-error
26 from trex.stl.api import STLError
27 # pylint: enable=import-error
28
29 from log import LOG
30 from packet_stats import InterfaceStats
31 from packet_stats import PacketPathStats
32 from stats_collector import IntervalCollector
33 from stats_collector import IterationCollector
34 import traffic_gen.traffic_utils as utils
35 from utils import cast_integer
36
37
38 class TrafficClientException(Exception):
39     """Generic traffic client exception."""
40
41     pass
42
43
44 class TrafficRunner(object):
45     """Serialize various steps required to run traffic."""
46
47     def __init__(self, client, duration_sec, interval_sec=0):
48         """Create a traffic runner."""
49         self.client = client
50         self.start_time = None
51         self.duration_sec = duration_sec
52         self.interval_sec = interval_sec
53
54     def run(self):
55         """Clear stats and instruct the traffic generator to start generating traffic."""
56         if self.is_running():
57             return None
58         LOG.info('Running traffic generator')
59         self.client.gen.clear_stats()
60         self.client.gen.start_traffic()
61         self.start_time = time.time()
62         return self.poll_stats()
63
64     def stop(self):
65         """Stop the current run and instruct the traffic generator to stop traffic."""
66         if self.is_running():
67             self.start_time = None
68             self.client.gen.stop_traffic()
69
70     def is_running(self):
71         """Check if a run is still pending."""
72         return self.start_time is not None
73
74     def time_elapsed(self):
75         """Return time elapsed since start of run."""
76         if self.is_running():
77             return time.time() - self.start_time
78         return self.duration_sec
79
80     def poll_stats(self):
81         """Poll latest stats from the traffic generator at fixed interval - sleeps if necessary.
82
83         return: latest stats or None if traffic is stopped
84         """
85         if not self.is_running():
86             return None
87         if self.client.skip_sleep():
88             self.stop()
89             return self.client.get_stats()
90         time_elapsed = self.time_elapsed()
91         if time_elapsed > self.duration_sec:
92             self.stop()
93             return None
94         time_left = self.duration_sec - time_elapsed
95         if self.interval_sec > 0.0:
96             if time_left <= self.interval_sec:
97                 time.sleep(time_left)
98                 self.stop()
99             else:
100                 time.sleep(self.interval_sec)
101         else:
102             time.sleep(self.duration_sec)
103             self.stop()
104         return self.client.get_stats()
105
106
107 class IpBlock(object):
108     """Manage a block of IP addresses."""
109
110     def __init__(self, base_ip, step_ip, count_ip):
111         """Create an IP block."""
112         self.base_ip_int = Device.ip_to_int(base_ip)
113         self.step = Device.ip_to_int(step_ip)
114         self.max_available = count_ip
115         self.next_free = 0
116
117     def get_ip(self, index=0):
118         """Return the IP address at given index."""
119         if index < 0 or index >= self.max_available:
120             raise IndexError('Index out of bounds: %d (max=%d)' % (index, self.max_available))
121         return Device.int_to_ip(self.base_ip_int + index * self.step)
122
123     def reserve_ip_range(self, count):
124         """Reserve a range of count consecutive IP addresses spaced by step."""
125         if self.next_free + count > self.max_available:
126             raise IndexError('No more IP addresses next free=%d max_available=%d requested=%d' %
127                              (self.next_free,
128                               self.max_available,
129                               count))
130         first_ip = self.get_ip(self.next_free)
131         last_ip = self.get_ip(self.next_free + count - 1)
132         self.next_free += count
133         return (first_ip, last_ip)
134
135     def reset_reservation(self):
136         """Reset all reservations and restart with a completely unused IP block."""
137         self.next_free = 0
138
139
140 class Device(object):
141     """Represent a port device and all information associated to it.
142
143     In the curent version we only support 2 port devices for the traffic generator
144     identified as port 0 or port 1.
145     """
146
147     def __init__(self, port, generator_config):
148         """Create a new device for a given port."""
149         self.generator_config = generator_config
150         self.chain_count = generator_config.service_chain_count
151         self.flow_count = generator_config.flow_count / 2
152         self.port = port
153         self.switch_port = generator_config.interfaces[port].get('switch_port', None)
154         self.vtep_vlan = None
155         self.vtep_src_mac = None
156         self.vxlan = False
157         self.pci = generator_config.interfaces[port].pci
158         self.mac = None
159         self.dest_macs = None
160         self.vtep_dst_mac = None
161         self.vtep_dst_ip = None
162         if generator_config.vteps is None:
163             self.vtep_src_ip = None
164         else:
165             self.vtep_src_ip = generator_config.vteps[port]
166         self.vnis = None
167         self.vlans = None
168         self.ip_addrs = generator_config.ip_addrs[port]
169         subnet = IPNetwork(self.ip_addrs)
170         self.ip = subnet.ip.format()
171         self.ip_addrs_step = generator_config.ip_addrs_step
172         self.ip_block = IpBlock(self.ip, self.ip_addrs_step, self.flow_count)
173         self.gw_ip_block = IpBlock(generator_config.gateway_ips[port],
174                                    generator_config.gateway_ip_addrs_step,
175                                    self.chain_count)
176         self.tg_gateway_ip_addrs = generator_config.tg_gateway_ip_addrs[port]
177         self.tg_gw_ip_block = IpBlock(self.tg_gateway_ip_addrs,
178                                       generator_config.tg_gateway_ip_addrs_step,
179                                       self.chain_count)
180         self.udp_src_port = generator_config.udp_src_port
181         self.udp_dst_port = generator_config.udp_dst_port
182
183     def set_mac(self, mac):
184         """Set the local MAC for this port device."""
185         if mac is None:
186             raise TrafficClientException('Trying to set traffic generator MAC address as None')
187         self.mac = mac
188
189     def get_peer_device(self):
190         """Get the peer device (device 0 -> device 1, or device 1 -> device 0)."""
191         return self.generator_config.devices[1 - self.port]
192
193     def set_vtep_dst_mac(self, dest_macs):
194         """Set the list of dest MACs indexed by the chain id.
195
196         This is only called in 2 cases:
197         - VM macs discovered using openstack API
198         - dest MACs provisioned in config file
199         """
200         self.vtep_dst_mac = map(str, dest_macs)
201
202     def set_dest_macs(self, dest_macs):
203         """Set the list of dest MACs indexed by the chain id.
204
205         This is only called in 2 cases:
206         - VM macs discovered using openstack API
207         - dest MACs provisioned in config file
208         """
209         self.dest_macs = map(str, dest_macs)
210
211     def get_dest_macs(self):
212         """Get the list of dest macs for this device.
213
214         If set_dest_macs was never called, assumes l2-loopback and return
215         a list of peer mac (as many as chains but normally only 1 chain)
216         """
217         if self.dest_macs:
218             return self.dest_macs
219         # assume this is l2-loopback
220         return [self.get_peer_device().mac] * self.chain_count
221
222     def set_vlans(self, vlans):
223         """Set the list of vlans to use indexed by the chain id."""
224         self.vlans = vlans
225         LOG.info("Port %d: VLANs %s", self.port, self.vlans)
226
227     def set_vtep_vlan(self, vlan):
228         """Set the vtep vlan to use indexed by specific port."""
229         self.vtep_vlan = vlan
230         self.vxlan = True
231         self.vlan_tagging = None
232         LOG.info("Port %d: VTEP VLANs %s", self.port, self.vtep_vlan)
233
234     def set_vxlan_endpoints(self, src_ip, dst_ip):
235         self.vtep_dst_ip = dst_ip
236         self.vtep_src_ip = src_ip
237         LOG.info("Port %d: src_vtep %s, dst_vtep %s", self.port,
238                  self.vtep_src_ip, self.vtep_dst_ip)
239
240     def set_vxlans(self, vnis):
241         self.vnis = vnis
242         LOG.info("Port %d: VNIs %s", self.port, self.vnis)
243
244     def get_gw_ip(self, chain_index):
245         """Retrieve the IP address assigned for the gateway of a given chain."""
246         return self.gw_ip_block.get_ip(chain_index)
247
248     def get_stream_configs(self):
249         """Get the stream config for a given chain on this device.
250
251         Called by the traffic generator driver to program the traffic generator properly
252         before generating traffic
253         """
254         configs = []
255         # exact flow count for each chain is calculated as follows:
256         # - all chains except the first will have the same flow count
257         #   calculated as (total_flows + chain_count - 1) / chain_count
258         # - the first chain will have the remainder
259         # example 11 flows and 3 chains => 3, 4, 4
260         flows_per_chain = (self.flow_count + self.chain_count - 1) / self.chain_count
261         cur_chain_flow_count = self.flow_count - flows_per_chain * (self.chain_count - 1)
262         peer = self.get_peer_device()
263         self.ip_block.reset_reservation()
264         peer.ip_block.reset_reservation()
265         dest_macs = self.get_dest_macs()
266
267         for chain_idx in xrange(self.chain_count):
268             src_ip_first, src_ip_last = self.ip_block.reserve_ip_range(cur_chain_flow_count)
269             dst_ip_first, dst_ip_last = peer.ip_block.reserve_ip_range(cur_chain_flow_count)
270
271             configs.append({
272                 'count': cur_chain_flow_count,
273                 'mac_src': self.mac,
274                 'mac_dst': dest_macs[chain_idx],
275                 'ip_src_addr': src_ip_first,
276                 'ip_src_addr_max': src_ip_last,
277                 'ip_src_count': cur_chain_flow_count,
278                 'ip_dst_addr': dst_ip_first,
279                 'ip_dst_addr_max': dst_ip_last,
280                 'ip_dst_count': cur_chain_flow_count,
281                 'ip_addrs_step': self.ip_addrs_step,
282                 'udp_src_port': self.udp_src_port,
283                 'udp_dst_port': self.udp_dst_port,
284                 'mac_discovery_gw': self.get_gw_ip(chain_idx),
285                 'ip_src_tg_gw': self.tg_gw_ip_block.get_ip(chain_idx),
286                 'ip_dst_tg_gw': peer.tg_gw_ip_block.get_ip(chain_idx),
287                 'vlan_tag': self.vlans[chain_idx] if self.vlans else None,
288                 'vxlan': self.vxlan,
289                 'vtep_vlan': self.vtep_vlan if self.vtep_vlan else None,
290                 'vtep_src_mac': self.mac if self.vxlan is True else None,
291                 'vtep_dst_mac': self.vtep_dst_mac if self.vxlan is True else None,
292                 'vtep_dst_ip': self.vtep_dst_ip if self.vxlan is True else None,
293                 'vtep_src_ip': self.vtep_src_ip if self.vxlan is True else None,
294                 'net_vni': self.vnis[chain_idx] if self.vxlan is True else None
295             })
296             # after first chain, fall back to the flow count for all other chains
297             cur_chain_flow_count = flows_per_chain
298         return configs
299
300     @staticmethod
301     def ip_to_int(addr):
302         """Convert an IP address from string to numeric."""
303         return struct.unpack("!I", socket.inet_aton(addr))[0]
304
305     @staticmethod
306     def int_to_ip(nvalue):
307         """Convert an IP address from numeric to string."""
308         return socket.inet_ntoa(struct.pack("!I", nvalue))
309
310
311 class GeneratorConfig(object):
312     """Represents traffic configuration for currently running traffic profile."""
313
314     DEFAULT_IP_STEP = '0.0.0.1'
315     DEFAULT_SRC_DST_IP_STEP = '0.0.0.1'
316
317     def __init__(self, config):
318         """Create a generator config."""
319         self.config = config
320         # name of the generator profile (normally trex or dummy)
321         # pick the default one if not specified explicitly from cli options
322         if not config.generator_profile:
323             config.generator_profile = config.traffic_generator.default_profile
324         # pick up the profile dict based on the name
325         gen_config = self.__match_generator_profile(config.traffic_generator,
326                                                     config.generator_profile)
327         self.gen_config = gen_config
328         # copy over fields from the dict
329         self.tool = gen_config.tool
330         self.ip = gen_config.ip
331         # overrides on config.cores and config.mbuf_factor
332         if config.cores:
333             self.cores = config.cores
334         else:
335             self.cores = gen_config.get('cores', 1)
336         self.mbuf_factor = config.mbuf_factor
337         if gen_config.intf_speed:
338             # interface speed is overriden from config
339             self.intf_speed = bitmath.parse_string(gen_config.intf_speed.replace('ps', '')).bits
340         else:
341             # interface speed is discovered/provided by the traffic generator
342             self.intf_speed = 0
343         self.name = gen_config.name
344         self.zmq_pub_port = gen_config.get('zmq_pub_port', 4500)
345         self.zmq_rpc_port = gen_config.get('zmq_rpc_port', 4501)
346         self.limit_memory = gen_config.get('limit_memory', 1024)
347         self.software_mode = gen_config.get('software_mode', False)
348         self.interfaces = gen_config.interfaces
349         if self.interfaces[0].port != 0 or self.interfaces[1].port != 1:
350             raise TrafficClientException('Invalid port order/id in generator_profile.interfaces')
351         if hasattr(gen_config, 'platform'):
352             self.platform = gen_config.platform
353         self.service_chain = config.service_chain
354         self.service_chain_count = config.service_chain_count
355         self.flow_count = config.flow_count
356         self.host_name = gen_config.host_name
357
358         self.tg_gateway_ip_addrs = gen_config.tg_gateway_ip_addrs
359         self.ip_addrs = gen_config.ip_addrs
360         self.ip_addrs_step = gen_config.ip_addrs_step or self.DEFAULT_SRC_DST_IP_STEP
361         self.tg_gateway_ip_addrs_step = \
362             gen_config.tg_gateway_ip_addrs_step or self.DEFAULT_IP_STEP
363         self.gateway_ip_addrs_step = gen_config.gateway_ip_addrs_step or self.DEFAULT_IP_STEP
364         self.gateway_ips = gen_config.gateway_ip_addrs
365         self.udp_src_port = gen_config.udp_src_port
366         self.udp_dst_port = gen_config.udp_dst_port
367         self.vteps = gen_config.get('vteps')
368         self.devices = [Device(port, self) for port in [0, 1]]
369         # This should normally always be [0, 1]
370         self.ports = [device.port for device in self.devices]
371
372         # check that pci is not empty
373         if not gen_config.interfaces[0].get('pci', None) or \
374            not gen_config.interfaces[1].get('pci', None):
375             raise TrafficClientException("configuration interfaces pci fields cannot be empty")
376
377         self.pcis = [tgif['pci'] for tgif in gen_config.interfaces]
378         self.vlan_tagging = config.vlan_tagging
379
380         # needed for result/summarizer
381         config['tg-name'] = gen_config.name
382         config['tg-tool'] = self.tool
383
384     def to_json(self):
385         """Get json form to display the content into the overall result dict."""
386         return dict(self.gen_config)
387
388     def set_dest_macs(self, port_index, dest_macs):
389         """Set the list of dest MACs indexed by the chain id on given port.
390
391         port_index: the port for which dest macs must be set
392         dest_macs: a list of dest MACs indexed by chain id
393         """
394         if len(dest_macs) < self.config.service_chain_count:
395             raise TrafficClientException('Dest MAC list %s must have %d entries' %
396                                          (dest_macs, self.config.service_chain_count))
397         # only pass the first scc dest MACs
398         self.devices[port_index].set_dest_macs(dest_macs[:self.config.service_chain_count])
399         LOG.info('Port %d: dst MAC %s', port_index, [str(mac) for mac in dest_macs])
400
401     def set_vtep_dest_macs(self, port_index, dest_macs):
402         """Set the list of dest MACs indexed by the chain id on given port.
403
404         port_index: the port for which dest macs must be set
405         dest_macs: a list of dest MACs indexed by chain id
406         """
407         if len(dest_macs) != self.config.service_chain_count:
408             raise TrafficClientException('Dest MAC list %s must have %d entries' %
409                                          (dest_macs, self.config.service_chain_count))
410         self.devices[port_index].set_vtep_dst_mac(dest_macs)
411         LOG.info('Port %d: vtep dst MAC %s', port_index, set([str(mac) for mac in dest_macs]))
412
413     def get_dest_macs(self):
414         """Return the list of dest macs indexed by port."""
415         return [dev.get_dest_macs() for dev in self.devices]
416
417     def set_vlans(self, port_index, vlans):
418         """Set the list of vlans to use indexed by the chain id on given port.
419
420         port_index: the port for which VLANs must be set
421         vlans: a  list of vlan lists indexed by chain id
422         """
423         if len(vlans) != self.config.service_chain_count:
424             raise TrafficClientException('VLAN list %s must have %d entries' %
425                                          (vlans, self.config.service_chain_count))
426         self.devices[port_index].set_vlans(vlans)
427
428     def set_vxlans(self, port_index, vxlans):
429         """Set the list of vxlans (VNIs) to use indexed by the chain id on given port.
430
431         port_index: the port for which VXLANs must be set
432         VXLANs: a  list of VNIs lists indexed by chain id
433         """
434         if len(vxlans) != self.config.service_chain_count:
435             raise TrafficClientException('VXLAN list %s must have %d entries' %
436                                          (vxlans, self.config.service_chain_count))
437         self.devices[port_index].set_vxlans(vxlans)
438
439     def set_vtep_vlan(self, port_index, vlan):
440         """Set the vtep vlan to use indexed by the chain id on given port.
441         port_index: the port for which VLAN must be set
442         """
443         self.devices[port_index].set_vtep_vlan(vlan)
444
445     def set_vxlan_endpoints(self, port_index, src_ip, dst_ip):
446         self.devices[port_index].set_vxlan_endpoints(src_ip, dst_ip)
447
448     @staticmethod
449     def __match_generator_profile(traffic_generator, generator_profile):
450         gen_config = AttrDict(traffic_generator)
451         gen_config.pop('default_profile')
452         gen_config.pop('generator_profile')
453         matching_profile = [profile for profile in traffic_generator.generator_profile if
454                             profile.name == generator_profile]
455         if len(matching_profile) != 1:
456             raise Exception('Traffic generator profile not found: ' + generator_profile)
457
458         gen_config.update(matching_profile[0])
459         return gen_config
460
461
462 class TrafficClient(object):
463     """Traffic generator client with NDR/PDR binary seearch."""
464
465     PORTS = [0, 1]
466
467     def __init__(self, config, notifier=None):
468         """Create a new TrafficClient instance.
469
470         config: nfvbench config
471         notifier: notifier (optional)
472
473         A new instance is created everytime the nfvbench config may have changed.
474         """
475         self.config = config
476         self.generator_config = GeneratorConfig(config)
477         self.tool = self.generator_config.tool
478         self.gen = self._get_generator()
479         self.notifier = notifier
480         self.interval_collector = None
481         self.iteration_collector = None
482         self.runner = TrafficRunner(self, self.config.duration_sec, self.config.interval_sec)
483         self.config.frame_sizes = self._get_frame_sizes()
484         self.run_config = {
485             'l2frame_size': None,
486             'duration_sec': self.config.duration_sec,
487             'bidirectional': True,
488             'rates': []  # to avoid unsbuscriptable-obj warning
489         }
490         self.current_total_rate = {'rate_percent': '10'}
491         if self.config.single_run:
492             self.current_total_rate = utils.parse_rate_str(self.config.rate)
493         self.ifstats = None
494         # Speed is either discovered when connecting to TG or set from config
495         # This variable is 0 if not yet discovered from TG or must be the speed of
496         # each interface in bits per second
497         self.intf_speed = self.generator_config.intf_speed
498
499     def _get_generator(self):
500         tool = self.tool.lower()
501         if tool == 'trex':
502             from traffic_gen import trex_gen
503             return trex_gen.TRex(self)
504         if tool == 'dummy':
505             from traffic_gen import dummy
506             return dummy.DummyTG(self)
507         raise TrafficClientException('Unsupported generator tool name:' + self.tool)
508
509     def skip_sleep(self):
510         """Skip all sleeps when doing unit testing with dummy TG.
511
512         Must be overriden using mock.patch
513         """
514         return False
515
516     def _get_frame_sizes(self):
517         traffic_profile_name = self.config.traffic.profile
518         matching_profiles = [profile for profile in self.config.traffic_profile if
519                              profile.name == traffic_profile_name]
520         if len(matching_profiles) > 1:
521             raise TrafficClientException('Multiple traffic profiles with name: ' +
522                                          traffic_profile_name)
523         elif not matching_profiles:
524             raise TrafficClientException('Cannot find traffic profile: ' + traffic_profile_name)
525         return matching_profiles[0].l2frame_size
526
527     def start_traffic_generator(self):
528         """Start the traffic generator process (traffic not started yet)."""
529         self.gen.connect()
530         # pick up the interface speed if it is not set from config
531         intf_speeds = self.gen.get_port_speed_gbps()
532         # convert Gbps unit into bps
533         tg_if_speed = bitmath.parse_string(str(intf_speeds[0]) + 'Gb').bits
534         if self.intf_speed:
535             # interface speed is overriden from config
536             if self.intf_speed != tg_if_speed:
537                 # Warn the user if the speed in the config is different
538                 LOG.warning('Interface speed provided is different from actual speed (%d Gbps)',
539                             intf_speeds[0])
540         else:
541             # interface speed not provisioned by config
542             self.intf_speed = tg_if_speed
543             # also update the speed in the tg config
544             self.generator_config.intf_speed = tg_if_speed
545
546         # Save the traffic generator local MAC
547         for mac, device in zip(self.gen.get_macs(), self.generator_config.devices):
548             device.set_mac(mac)
549
550     def setup(self):
551         """Set up the traffic client."""
552         self.gen.clear_stats()
553
554     def get_version(self):
555         """Get the traffic generator version."""
556         return self.gen.get_version()
557
558     def ensure_end_to_end(self):
559         """Ensure traffic generator receives packets it has transmitted.
560
561         This ensures end to end connectivity and also waits until VMs are ready to forward packets.
562
563         VMs that are started and in active state may not pass traffic yet. It is imperative to make
564         sure that all VMs are passing traffic in both directions before starting any benchmarking.
565         To verify this, we need to send at a low frequency bi-directional packets and make sure
566         that we receive all packets back from all VMs. The number of flows is equal to 2 times
567         the number of chains (1 per direction) and we need to make sure we receive packets coming
568         from exactly 2 x chain count different source MAC addresses.
569
570         Example:
571             PVP chain (1 VM per chain)
572             N = 10 (number of chains)
573             Flow count = 20 (number of flows)
574             If the number of unique source MAC addresses from received packets is 20 then
575             all 10 VMs 10 VMs are in operational state.
576         """
577         LOG.info('Starting traffic generator to ensure end-to-end connectivity')
578         # send 2pps on each chain and each direction
579         rate_pps = {'rate_pps': str(self.config.service_chain_count * 2)}
580         self.gen.create_traffic('64', [rate_pps, rate_pps], bidirectional=True, latency=False,
581                                 e2e=True)
582         # ensures enough traffic is coming back
583         retry_count = (self.config.check_traffic_time_sec +
584                        self.config.generic_poll_sec - 1) / self.config.generic_poll_sec
585
586         # we expect to see packets coming from 2 unique MAC per chain
587         # because there can be flooding in the case of shared net
588         # we must verify that packets from the right VMs are received
589         # and not just count unique src MAC
590         # create a dict of (port, chain) tuples indexed by dest mac
591         mac_map = {}
592         for port, dest_macs in enumerate(self.generator_config.get_dest_macs()):
593             for chain, mac in enumerate(dest_macs):
594                 mac_map[mac] = (port, chain)
595         unique_src_mac_count = len(mac_map)
596         if self.config.vxlan and self.config.traffic_generator.vtep_vlan:
597             get_mac_id = lambda packet: packet['binary'][60:66]
598         elif self.config.vxlan:
599             get_mac_id = lambda packet: packet['binary'][56:62]
600         else:
601             get_mac_id = lambda packet: packet['binary'][6:12]
602         for it in xrange(retry_count):
603             self.gen.clear_stats()
604             self.gen.start_traffic()
605             self.gen.start_capture()
606             LOG.info('Captured unique src mac %d/%d, capturing return packets (retry %d/%d)...',
607                      unique_src_mac_count - len(mac_map), unique_src_mac_count,
608                      it + 1, retry_count)
609             if not self.skip_sleep():
610                 time.sleep(self.config.generic_poll_sec)
611             self.gen.stop_traffic()
612             self.gen.fetch_capture_packets()
613             self.gen.stop_capture()
614
615             for packet in self.gen.packet_list:
616                 mac_id = get_mac_id(packet)
617                 src_mac = ':'.join(["%02x" % ord(x) for x in mac_id])
618                 if src_mac in mac_map:
619                     port, chain = mac_map[src_mac]
620                     LOG.info('Received packet from mac: %s (chain=%d, port=%d)',
621                              src_mac, chain, port)
622                     mac_map.pop(src_mac, None)
623
624                 if not mac_map:
625                     LOG.info('End-to-end connectivity established')
626                     return
627
628         raise TrafficClientException('End-to-end connectivity cannot be ensured')
629
630     def ensure_arp_successful(self):
631         """Resolve all IP using ARP and throw an exception in case of failure."""
632         dest_macs = self.gen.resolve_arp()
633         if dest_macs:
634             # all dest macs are discovered, saved them into the generator config
635             if self.config.vxlan:
636                 self.generator_config.set_vtep_dest_macs(0, dest_macs[0])
637                 self.generator_config.set_vtep_dest_macs(1, dest_macs[1])
638             else:
639                 self.generator_config.set_dest_macs(0, dest_macs[0])
640                 self.generator_config.set_dest_macs(1, dest_macs[1])
641         else:
642             raise TrafficClientException('ARP cannot be resolved')
643
644     def set_traffic(self, frame_size, bidirectional):
645         """Reconfigure the traffic generator for a new frame size."""
646         self.run_config['bidirectional'] = bidirectional
647         self.run_config['l2frame_size'] = frame_size
648         self.run_config['rates'] = [self.get_per_direction_rate()]
649         if bidirectional:
650             self.run_config['rates'].append(self.get_per_direction_rate())
651         else:
652             unidir_reverse_pps = int(self.config.unidir_reverse_traffic_pps)
653             if unidir_reverse_pps > 0:
654                 self.run_config['rates'].append({'rate_pps': str(unidir_reverse_pps)})
655         # Fix for [NFVBENCH-67], convert the rate string to PPS
656         for idx, rate in enumerate(self.run_config['rates']):
657             if 'rate_pps' not in rate:
658                 self.run_config['rates'][idx] = {'rate_pps': self.__convert_rates(rate)['rate_pps']}
659
660         self.gen.clear_streamblock()
661         if not self.config.vxlan:
662             self.gen.create_traffic(frame_size, self.run_config['rates'], bidirectional,
663                                     latency=True)
664         else:
665             self.gen.create_traffic(frame_size, self.run_config['rates'], bidirectional,
666                                     latency=False)
667
668     def _modify_load(self, load):
669         self.current_total_rate = {'rate_percent': str(load)}
670         rate_per_direction = self.get_per_direction_rate()
671
672         self.gen.modify_rate(rate_per_direction, False)
673         self.run_config['rates'][0] = rate_per_direction
674         if self.run_config['bidirectional']:
675             self.gen.modify_rate(rate_per_direction, True)
676             self.run_config['rates'][1] = rate_per_direction
677
678     def get_ndr_and_pdr(self):
679         """Start the NDR/PDR iteration and return the results."""
680         dst = 'Bidirectional' if self.run_config['bidirectional'] else 'Unidirectional'
681         targets = {}
682         if self.config.ndr_run:
683             LOG.info('*** Searching NDR for %s (%s)...', self.run_config['l2frame_size'], dst)
684             targets['ndr'] = self.config.measurement.NDR
685         if self.config.pdr_run:
686             LOG.info('*** Searching PDR for %s (%s)...', self.run_config['l2frame_size'], dst)
687             targets['pdr'] = self.config.measurement.PDR
688
689         self.run_config['start_time'] = time.time()
690         self.interval_collector = IntervalCollector(self.run_config['start_time'])
691         self.interval_collector.attach_notifier(self.notifier)
692         self.iteration_collector = IterationCollector(self.run_config['start_time'])
693         results = {}
694         self.__range_search(0.0, 200.0, targets, results)
695
696         results['iteration_stats'] = {
697             'ndr_pdr': self.iteration_collector.get()
698         }
699
700         if self.config.ndr_run:
701             LOG.info('NDR load: %s', results['ndr']['rate_percent'])
702             results['ndr']['time_taken_sec'] = \
703                 results['ndr']['timestamp_sec'] - self.run_config['start_time']
704             if self.config.pdr_run:
705                 LOG.info('PDR load: %s', results['pdr']['rate_percent'])
706                 results['pdr']['time_taken_sec'] = \
707                     results['pdr']['timestamp_sec'] - results['ndr']['timestamp_sec']
708         else:
709             LOG.info('PDR load: %s', results['pdr']['rate_percent'])
710             results['pdr']['time_taken_sec'] = \
711                 results['pdr']['timestamp_sec'] - self.run_config['start_time']
712         return results
713
714     def __get_dropped_rate(self, result):
715         dropped_pkts = result['rx']['dropped_pkts']
716         total_pkts = result['tx']['total_pkts']
717         if not total_pkts:
718             return float('inf')
719         return float(dropped_pkts) / total_pkts * 100
720
721     def get_stats(self):
722         """Collect final stats for previous run."""
723         stats = self.gen.get_stats()
724         retDict = {'total_tx_rate': stats['total_tx_rate']}
725         for port in self.PORTS:
726             retDict[port] = {'tx': {}, 'rx': {}}
727
728         tx_keys = ['total_pkts', 'total_pkt_bytes', 'pkt_rate', 'pkt_bit_rate']
729         rx_keys = tx_keys + ['dropped_pkts']
730
731         for port in self.PORTS:
732             for key in tx_keys:
733                 retDict[port]['tx'][key] = int(stats[port]['tx'][key])
734             for key in rx_keys:
735                 try:
736                     retDict[port]['rx'][key] = int(stats[port]['rx'][key])
737                 except ValueError:
738                     retDict[port]['rx'][key] = 0
739             retDict[port]['rx']['avg_delay_usec'] = cast_integer(
740                 stats[port]['rx']['avg_delay_usec'])
741             retDict[port]['rx']['min_delay_usec'] = cast_integer(
742                 stats[port]['rx']['min_delay_usec'])
743             retDict[port]['rx']['max_delay_usec'] = cast_integer(
744                 stats[port]['rx']['max_delay_usec'])
745             retDict[port]['drop_rate_percent'] = self.__get_dropped_rate(retDict[port])
746
747         ports = sorted(retDict.keys())
748         if self.run_config['bidirectional']:
749             retDict['overall'] = {'tx': {}, 'rx': {}}
750             for key in tx_keys:
751                 retDict['overall']['tx'][key] = \
752                     retDict[ports[0]]['tx'][key] + retDict[ports[1]]['tx'][key]
753             for key in rx_keys:
754                 retDict['overall']['rx'][key] = \
755                     retDict[ports[0]]['rx'][key] + retDict[ports[1]]['rx'][key]
756             total_pkts = [retDict[ports[0]]['rx']['total_pkts'],
757                           retDict[ports[1]]['rx']['total_pkts']]
758             avg_delays = [retDict[ports[0]]['rx']['avg_delay_usec'],
759                           retDict[ports[1]]['rx']['avg_delay_usec']]
760             max_delays = [retDict[ports[0]]['rx']['max_delay_usec'],
761                           retDict[ports[1]]['rx']['max_delay_usec']]
762             min_delays = [retDict[ports[0]]['rx']['min_delay_usec'],
763                           retDict[ports[1]]['rx']['min_delay_usec']]
764             retDict['overall']['rx']['avg_delay_usec'] = utils.weighted_avg(total_pkts, avg_delays)
765             retDict['overall']['rx']['min_delay_usec'] = min(min_delays)
766             retDict['overall']['rx']['max_delay_usec'] = max(max_delays)
767             for key in ['pkt_bit_rate', 'pkt_rate']:
768                 for dirc in ['tx', 'rx']:
769                     retDict['overall'][dirc][key] /= 2.0
770         else:
771             retDict['overall'] = retDict[ports[0]]
772         retDict['overall']['drop_rate_percent'] = self.__get_dropped_rate(retDict['overall'])
773         return retDict
774
775     def __convert_rates(self, rate):
776         return utils.convert_rates(self.run_config['l2frame_size'],
777                                    rate,
778                                    self.intf_speed)
779
780     def __ndr_pdr_found(self, tag, load):
781         rates = self.__convert_rates({'rate_percent': load})
782         self.iteration_collector.add_ndr_pdr(tag, rates['rate_pps'])
783         last_stats = self.iteration_collector.peek()
784         self.interval_collector.add_ndr_pdr(tag, last_stats)
785
786     def __format_output_stats(self, stats):
787         for key in self.PORTS + ['overall']:
788             interface = stats[key]
789             stats[key] = {
790                 'tx_pkts': interface['tx']['total_pkts'],
791                 'rx_pkts': interface['rx']['total_pkts'],
792                 'drop_percentage': interface['drop_rate_percent'],
793                 'drop_pct': interface['rx']['dropped_pkts'],
794                 'avg_delay_usec': interface['rx']['avg_delay_usec'],
795                 'max_delay_usec': interface['rx']['max_delay_usec'],
796                 'min_delay_usec': interface['rx']['min_delay_usec'],
797             }
798
799         return stats
800
801     def __targets_found(self, rate, targets, results):
802         for tag, target in targets.iteritems():
803             LOG.info('Found %s (%s) load: %s', tag, target, rate)
804             self.__ndr_pdr_found(tag, rate)
805             results[tag]['timestamp_sec'] = time.time()
806
807     def __range_search(self, left, right, targets, results):
808         """Perform a binary search for a list of targets inside a [left..right] range or rate.
809
810         left    the left side of the range to search as a % the line rate (100 = 100% line rate)
811                 indicating the rate to send on each interface
812         right   the right side of the range to search as a % of line rate
813                 indicating the rate to send on each interface
814         targets a dict of drop rates to search (0.1 = 0.1%), indexed by the DR name or "tag"
815                 ('ndr', 'pdr')
816         results a dict to store results
817         """
818         if not targets:
819             return
820         LOG.info('Range search [%s .. %s] targets: %s', left, right, targets)
821
822         # Terminate search when gap is less than load epsilon
823         if right - left < self.config.measurement.load_epsilon:
824             self.__targets_found(left, targets, results)
825             return
826
827         # Obtain the average drop rate in for middle load
828         middle = (left + right) / 2.0
829         try:
830             stats, rates = self.__run_search_iteration(middle)
831         except STLError:
832             LOG.exception("Got exception from traffic generator during binary search")
833             self.__targets_found(left, targets, results)
834             return
835         # Split target dicts based on the avg drop rate
836         left_targets = {}
837         right_targets = {}
838         for tag, target in targets.iteritems():
839             if stats['overall']['drop_rate_percent'] <= target:
840                 # record the best possible rate found for this target
841                 results[tag] = rates
842                 results[tag].update({
843                     'load_percent_per_direction': middle,
844                     'stats': self.__format_output_stats(dict(stats)),
845                     'timestamp_sec': None
846                 })
847                 right_targets[tag] = target
848             else:
849                 # initialize to 0 all fields of result for
850                 # the worst case scenario of the binary search (if ndr/pdr is not found)
851                 if tag not in results:
852                     results[tag] = dict.fromkeys(rates, 0)
853                     empty_stats = self.__format_output_stats(dict(stats))
854                     for key in empty_stats:
855                         if isinstance(empty_stats[key], dict):
856                             empty_stats[key] = dict.fromkeys(empty_stats[key], 0)
857                         else:
858                             empty_stats[key] = 0
859                     results[tag].update({
860                         'load_percent_per_direction': 0,
861                         'stats': empty_stats,
862                         'timestamp_sec': None
863                     })
864                 left_targets[tag] = target
865
866         # search lower half
867         self.__range_search(left, middle, left_targets, results)
868
869         # search upper half only if the upper rate does not exceed
870         # 100%, this only happens when the first search at 100%
871         # yields a DR that is < target DR
872         if middle >= 100:
873             self.__targets_found(100, right_targets, results)
874         else:
875             self.__range_search(middle, right, right_targets, results)
876
877     def __run_search_iteration(self, rate):
878         """Run one iteration at the given rate level.
879
880         rate: the rate to send on each port in percent (0 to 100)
881         """
882         self._modify_load(rate)
883
884         # poll interval stats and collect them
885         for stats in self.run_traffic():
886             self.interval_collector.add(stats)
887             time_elapsed_ratio = self.runner.time_elapsed() / self.run_config['duration_sec']
888             if time_elapsed_ratio >= 1:
889                 self.cancel_traffic()
890                 if not self.skip_sleep():
891                     time.sleep(self.config.pause_sec)
892         self.interval_collector.reset()
893
894         # get stats from the run
895         stats = self.runner.client.get_stats()
896         current_traffic_config = self._get_traffic_config()
897         warning = self.compare_tx_rates(current_traffic_config['direction-total']['rate_pps'],
898                                         stats['total_tx_rate'])
899         if warning is not None:
900             stats['warning'] = warning
901
902         # save reliable stats from whole iteration
903         self.iteration_collector.add(stats, current_traffic_config['direction-total']['rate_pps'])
904         LOG.info('Average drop rate: %f', stats['overall']['drop_rate_percent'])
905         return stats, current_traffic_config['direction-total']
906
907     @staticmethod
908     def log_stats(stats):
909         """Log estimated stats during run."""
910         report = {
911             'datetime': str(datetime.now()),
912             'tx_packets': stats['overall']['tx']['total_pkts'],
913             'rx_packets': stats['overall']['rx']['total_pkts'],
914             'drop_packets': stats['overall']['rx']['dropped_pkts'],
915             'drop_rate_percent': stats['overall']['drop_rate_percent']
916         }
917         LOG.info('TX: %(tx_packets)d; '
918                  'RX: %(rx_packets)d; '
919                  'Est. Dropped: %(drop_packets)d; '
920                  'Est. Drop rate: %(drop_rate_percent).4f%%',
921                  report)
922
923     def run_traffic(self):
924         """Start traffic and return intermediate stats for each interval."""
925         stats = self.runner.run()
926         while self.runner.is_running:
927             self.log_stats(stats)
928             yield stats
929             stats = self.runner.poll_stats()
930             if stats is None:
931                 return
932         self.log_stats(stats)
933         LOG.info('Drop rate: %f', stats['overall']['drop_rate_percent'])
934         yield stats
935
936     def cancel_traffic(self):
937         """Stop traffic."""
938         self.runner.stop()
939
940     def _get_traffic_config(self):
941         config = {}
942         load_total = 0.0
943         bps_total = 0.0
944         pps_total = 0.0
945         for idx, rate in enumerate(self.run_config['rates']):
946             key = 'direction-forward' if idx == 0 else 'direction-reverse'
947             config[key] = {
948                 'l2frame_size': self.run_config['l2frame_size'],
949                 'duration_sec': self.run_config['duration_sec']
950             }
951             config[key].update(rate)
952             config[key].update(self.__convert_rates(rate))
953             load_total += float(config[key]['rate_percent'])
954             bps_total += float(config[key]['rate_bps'])
955             pps_total += float(config[key]['rate_pps'])
956         config['direction-total'] = dict(config['direction-forward'])
957         config['direction-total'].update({
958             'rate_percent': load_total,
959             'rate_pps': cast_integer(pps_total),
960             'rate_bps': bps_total
961         })
962
963         return config
964
965     def get_run_config(self, results):
966         """Return configuration which was used for the last run."""
967         r = {}
968         # because we want each direction to have the far end RX rates,
969         # use the far end index (1-idx) to retrieve the RX rates
970         for idx, key in enumerate(["direction-forward", "direction-reverse"]):
971             tx_rate = results["stats"][idx]["tx"]["total_pkts"] / self.config.duration_sec
972             rx_rate = results["stats"][1 - idx]["rx"]["total_pkts"] / self.config.duration_sec
973             r[key] = {
974                 "orig": self.__convert_rates(self.run_config['rates'][idx]),
975                 "tx": self.__convert_rates({'rate_pps': tx_rate}),
976                 "rx": self.__convert_rates({'rate_pps': rx_rate})
977             }
978
979         total = {}
980         for direction in ['orig', 'tx', 'rx']:
981             total[direction] = {}
982             for unit in ['rate_percent', 'rate_bps', 'rate_pps']:
983                 total[direction][unit] = sum([float(x[direction][unit]) for x in r.values()])
984
985         r['direction-total'] = total
986         return r
987
988     def insert_interface_stats(self, pps_list):
989         """Insert interface stats to a list of packet path stats.
990
991         pps_list: a list of packet path stats instances indexed by chain index
992
993         This function will insert the packet path stats for the traffic gen ports 0 and 1
994         with itemized per chain tx/rx counters.
995         There will be as many packet path stats as chains.
996         Each packet path stats will have exactly 2 InterfaceStats for port 0 and port 1
997         self.pps_list:
998         [
999         PacketPathStats(InterfaceStats(chain 0, port 0), InterfaceStats(chain 0, port 1)),
1000         PacketPathStats(InterfaceStats(chain 1, port 0), InterfaceStats(chain 1, port 1)),
1001         ...
1002         ]
1003         """
1004         def get_if_stats(chain_idx):
1005             return [InterfaceStats('p' + str(port), self.tool)
1006                     for port in range(2)]
1007         # keep the list of list of interface stats indexed by the chain id
1008         self.ifstats = [get_if_stats(chain_idx)
1009                         for chain_idx in range(self.config.service_chain_count)]
1010         # note that we need to make a copy of the ifs list so that any modification in the
1011         # list from pps will not change the list saved in self.ifstats
1012         self.pps_list = [PacketPathStats(list(ifs)) for ifs in self.ifstats]
1013         # insert the corresponding pps in the passed list
1014         pps_list.extend(self.pps_list)
1015
1016     def update_interface_stats(self, diff=False):
1017         """Update all interface stats.
1018
1019         diff: if False, simply refresh the interface stats values with latest values
1020               if True, diff the interface stats with the latest values
1021         Make sure that the interface stats inserted in insert_interface_stats() are updated
1022         with proper values.
1023         self.ifstats:
1024         [
1025         [InterfaceStats(chain 0, port 0), InterfaceStats(chain 0, port 1)],
1026         [InterfaceStats(chain 1, port 0), InterfaceStats(chain 1, port 1)],
1027         ...
1028         ]
1029         """
1030         if diff:
1031             stats = self.gen.get_stats()
1032             for chain_idx, ifs in enumerate(self.ifstats):
1033                 # each ifs has exactly 2 InterfaceStats and 2 Latency instances
1034                 # corresponding to the
1035                 # port 0 and port 1 for the given chain_idx
1036                 # Note that we cannot use self.pps_list[chain_idx].if_stats to pick the
1037                 # interface stats for the pps because it could have been modified to contain
1038                 # additional interface stats
1039                 self.gen.get_stream_stats(stats, ifs, self.pps_list[chain_idx].latencies, chain_idx)
1040             # special handling for vxlan
1041             # in case of vxlan, flow stats are not available so all rx counters will be
1042             # zeros when the total rx port counter is non zero.
1043             # in that case,
1044             for port in range(2):
1045                 total_rx = 0
1046                 for ifs in self.ifstats:
1047                     total_rx += ifs[port].rx
1048                 if total_rx == 0:
1049                     # check if the total port rx from Trex is also zero
1050                     port_rx = stats[port]['rx']['total_pkts']
1051                     if port_rx:
1052                         # the total rx for all chains from port level stats is non zero
1053                         # which means that the per-chain stats are not available
1054                         if len(self.ifstats) == 1:
1055                             # only one chain, simply report the port level rx to the chain rx stats
1056                             self.ifstats[0][port].rx = port_rx
1057                         else:
1058                             for ifs in self.ifstats:
1059                                 # mark this data as unavailable
1060                                 ifs[port].rx = None
1061                             # pitch in the total rx only in the last chain pps
1062                             self.ifstats[-1][port].rx_total = port_rx
1063
1064     @staticmethod
1065     def compare_tx_rates(required, actual):
1066         """Compare the actual TX rate to the required TX rate."""
1067         threshold = 0.9
1068         are_different = False
1069         try:
1070             if float(actual) / required < threshold:
1071                 are_different = True
1072         except ZeroDivisionError:
1073             are_different = True
1074
1075         if are_different:
1076             msg = "WARNING: There is a significant difference between requested TX rate ({r}) " \
1077                   "and actual TX rate ({a}). The traffic generator may not have sufficient CPU " \
1078                   "to achieve the requested TX rate.".format(r=required, a=actual)
1079             LOG.info(msg)
1080             return msg
1081
1082         return None
1083
1084     def get_per_direction_rate(self):
1085         """Get the rate for each direction."""
1086         divisor = 2 if self.run_config['bidirectional'] else 1
1087         if 'rate_percent' in self.current_total_rate:
1088             # don't split rate if it's percentage
1089             divisor = 1
1090
1091         return utils.divide_rate(self.current_total_rate, divisor)
1092
1093     def close(self):
1094         """Close this instance."""
1095         try:
1096             self.gen.stop_traffic()
1097         except Exception:
1098             pass
1099         self.gen.clear_stats()
1100         self.gen.cleanup()