58ae345f1709f2497199abea632d7f848b7410bf
[nfvbench.git] / nfvbench / traffic_client.py
1 # Copyright 2016 Cisco Systems, Inc.  All rights reserved.
2 #
3 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
4 #    not use this file except in compliance with the License. You may obtain
5 #    a copy of the License at
6 #
7 #         http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #    Unless required by applicable law or agreed to in writing, software
10 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 #    License for the specific language governing permissions and limitations
13 #    under the License.
14
15 """Interface to the traffic generator clients including NDR/PDR binary search."""
16
17 from datetime import datetime
18 import socket
19 import struct
20 import time
21
22 from attrdict import AttrDict
23 import bitmath
24 from netaddr import IPNetwork
25 # pylint: disable=import-error
26 from trex_stl_lib.api import STLError
27 # pylint: enable=import-error
28
29 from log import LOG
30 from packet_stats import InterfaceStats
31 from packet_stats import PacketPathStats
32 from stats_collector import IntervalCollector
33 from stats_collector import IterationCollector
34 import traffic_gen.traffic_utils as utils
35 from utils import cast_integer
36
37
38 class TrafficClientException(Exception):
39     """Generic traffic client exception."""
40
41     pass
42
43
44 class TrafficRunner(object):
45     """Serialize various steps required to run traffic."""
46
47     def __init__(self, client, duration_sec, interval_sec=0):
48         """Create a traffic runner."""
49         self.client = client
50         self.start_time = None
51         self.duration_sec = duration_sec
52         self.interval_sec = interval_sec
53
54     def run(self):
55         """Clear stats and instruct the traffic generator to start generating traffic."""
56         if self.is_running():
57             return None
58         LOG.info('Running traffic generator')
59         self.client.gen.clear_stats()
60         self.client.gen.start_traffic()
61         self.start_time = time.time()
62         return self.poll_stats()
63
64     def stop(self):
65         """Stop the current run and instruct the traffic generator to stop traffic."""
66         if self.is_running():
67             self.start_time = None
68             self.client.gen.stop_traffic()
69
70     def is_running(self):
71         """Check if a run is still pending."""
72         return self.start_time is not None
73
74     def time_elapsed(self):
75         """Return time elapsed since start of run."""
76         if self.is_running():
77             return time.time() - self.start_time
78         return self.duration_sec
79
80     def poll_stats(self):
81         """Poll latest stats from the traffic generator at fixed interval - sleeps if necessary.
82
83         return: latest stats or None if traffic is stopped
84         """
85         if not self.is_running():
86             return None
87         if self.client.skip_sleep():
88             self.stop()
89             return self.client.get_stats()
90         time_elapsed = self.time_elapsed()
91         if time_elapsed > self.duration_sec:
92             self.stop()
93             return None
94         time_left = self.duration_sec - time_elapsed
95         if self.interval_sec > 0.0:
96             if time_left <= self.interval_sec:
97                 time.sleep(time_left)
98                 self.stop()
99             else:
100                 time.sleep(self.interval_sec)
101         else:
102             time.sleep(self.duration_sec)
103             self.stop()
104         return self.client.get_stats()
105
106
107 class IpBlock(object):
108     """Manage a block of IP addresses."""
109
110     def __init__(self, base_ip, step_ip, count_ip):
111         """Create an IP block."""
112         self.base_ip_int = Device.ip_to_int(base_ip)
113         self.step = Device.ip_to_int(step_ip)
114         self.max_available = count_ip
115         self.next_free = 0
116
117     def get_ip(self, index=0):
118         """Return the IP address at given index."""
119         if index < 0 or index >= self.max_available:
120             raise IndexError('Index out of bounds: %d (max=%d)' % (index, self.max_available))
121         return Device.int_to_ip(self.base_ip_int + index * self.step)
122
123     def reserve_ip_range(self, count):
124         """Reserve a range of count consecutive IP addresses spaced by step."""
125         if self.next_free + count > self.max_available:
126             raise IndexError('No more IP addresses next free=%d max_available=%d requested=%d' %
127                              (self.next_free,
128                               self.max_available,
129                               count))
130         first_ip = self.get_ip(self.next_free)
131         last_ip = self.get_ip(self.next_free + count - 1)
132         self.next_free += count
133         return (first_ip, last_ip)
134
135     def reset_reservation(self):
136         """Reset all reservations and restart with a completely unused IP block."""
137         self.next_free = 0
138
139
140 class Device(object):
141     """Represent a port device and all information associated to it.
142
143     In the curent version we only support 2 port devices for the traffic generator
144     identified as port 0 or port 1.
145     """
146
147     def __init__(self, port, generator_config):
148         """Create a new device for a given port."""
149         self.generator_config = generator_config
150         self.chain_count = generator_config.service_chain_count
151         self.flow_count = generator_config.flow_count / 2
152         self.port = port
153         self.switch_port = generator_config.interfaces[port].get('switch_port', None)
154         self.vtep_vlan = None
155         self.vtep_src_mac = None
156         self.vxlan = False
157         self.pci = generator_config.interfaces[port].pci
158         self.mac = None
159         self.dest_macs = None
160         self.vtep_dst_mac = None
161         self.vtep_dst_ip = None
162         if generator_config.vteps is None:
163             self.vtep_src_ip = None
164         else:
165             self.vtep_src_ip = generator_config.vteps[port]
166         self.vnis = None
167         self.vlans = None
168         self.ip_addrs = generator_config.ip_addrs[port]
169         subnet = IPNetwork(self.ip_addrs)
170         self.ip = subnet.ip.format()
171         self.ip_addrs_step = generator_config.ip_addrs_step
172         self.ip_block = IpBlock(self.ip, self.ip_addrs_step, self.flow_count)
173         self.gw_ip_block = IpBlock(generator_config.gateway_ips[port],
174                                    generator_config.gateway_ip_addrs_step,
175                                    self.chain_count)
176         self.tg_gateway_ip_addrs = generator_config.tg_gateway_ip_addrs[port]
177         self.tg_gw_ip_block = IpBlock(self.tg_gateway_ip_addrs,
178                                       generator_config.tg_gateway_ip_addrs_step,
179                                       self.chain_count)
180         self.udp_src_port = generator_config.udp_src_port
181         self.udp_dst_port = generator_config.udp_dst_port
182
183     def set_mac(self, mac):
184         """Set the local MAC for this port device."""
185         if mac is None:
186             raise TrafficClientException('Trying to set traffic generator MAC address as None')
187         self.mac = mac
188
189     def get_peer_device(self):
190         """Get the peer device (device 0 -> device 1, or device 1 -> device 0)."""
191         return self.generator_config.devices[1 - self.port]
192
193     def set_vtep_dst_mac(self, dest_macs):
194         """Set the list of dest MACs indexed by the chain id.
195
196         This is only called in 2 cases:
197         - VM macs discovered using openstack API
198         - dest MACs provisioned in config file
199         """
200         self.vtep_dst_mac = map(str, dest_macs)
201
202     def set_dest_macs(self, dest_macs):
203         """Set the list of dest MACs indexed by the chain id.
204
205         This is only called in 2 cases:
206         - VM macs discovered using openstack API
207         - dest MACs provisioned in config file
208         """
209         self.dest_macs = map(str, dest_macs)
210
211     def get_dest_macs(self):
212         """Get the list of dest macs for this device.
213
214         If set_dest_macs was never called, assumes l2-loopback and return
215         a list of peer mac (as many as chains but normally only 1 chain)
216         """
217         if self.dest_macs:
218             return self.dest_macs
219         # assume this is l2-loopback
220         return [self.get_peer_device().mac] * self.chain_count
221
222     def set_vlans(self, vlans):
223         """Set the list of vlans to use indexed by the chain id."""
224         self.vlans = vlans
225         LOG.info("Port %d: VLANs %s", self.port, self.vlans)
226
227     def set_vtep_vlan(self, vlan):
228         """Set the vtep vlan to use indexed by specific port."""
229         self.vtep_vlan = vlan
230         self.vxlan = True
231         self.vlan_tagging = None
232         LOG.info("Port %d: VTEP VLANs %s", self.port, self.vtep_vlan)
233
234     def set_vxlan_endpoints(self, src_ip, dst_ip):
235         self.vtep_dst_ip = dst_ip
236         self.vtep_src_ip = src_ip
237         LOG.info("Port %d: src_vtep %s, dst_vtep %s", self.port,
238                  self.vtep_src_ip, self.vtep_dst_ip)
239
240     def set_vxlans(self, vnis):
241         self.vnis = vnis
242         LOG.info("Port %d: VNIs %s", self.port, self.vnis)
243
244     def get_gw_ip(self, chain_index):
245         """Retrieve the IP address assigned for the gateway of a given chain."""
246         return self.gw_ip_block.get_ip(chain_index)
247
248     def get_stream_configs(self):
249         """Get the stream config for a given chain on this device.
250
251         Called by the traffic generator driver to program the traffic generator properly
252         before generating traffic
253         """
254         configs = []
255         # exact flow count for each chain is calculated as follows:
256         # - all chains except the first will have the same flow count
257         #   calculated as (total_flows + chain_count - 1) / chain_count
258         # - the first chain will have the remainder
259         # example 11 flows and 3 chains => 3, 4, 4
260         flows_per_chain = (self.flow_count + self.chain_count - 1) / self.chain_count
261         cur_chain_flow_count = self.flow_count - flows_per_chain * (self.chain_count - 1)
262         peer = self.get_peer_device()
263         self.ip_block.reset_reservation()
264         peer.ip_block.reset_reservation()
265         dest_macs = self.get_dest_macs()
266
267         for chain_idx in xrange(self.chain_count):
268             src_ip_first, src_ip_last = self.ip_block.reserve_ip_range(cur_chain_flow_count)
269             dst_ip_first, dst_ip_last = peer.ip_block.reserve_ip_range(cur_chain_flow_count)
270
271             configs.append({
272                 'count': cur_chain_flow_count,
273                 'mac_src': self.mac,
274                 'mac_dst': dest_macs[chain_idx],
275                 'ip_src_addr': src_ip_first,
276                 'ip_src_addr_max': src_ip_last,
277                 'ip_src_count': cur_chain_flow_count,
278                 'ip_dst_addr': dst_ip_first,
279                 'ip_dst_addr_max': dst_ip_last,
280                 'ip_dst_count': cur_chain_flow_count,
281                 'ip_addrs_step': self.ip_addrs_step,
282                 'udp_src_port': self.udp_src_port,
283                 'udp_dst_port': self.udp_dst_port,
284                 'mac_discovery_gw': self.get_gw_ip(chain_idx),
285                 'ip_src_tg_gw': self.tg_gw_ip_block.get_ip(chain_idx),
286                 'ip_dst_tg_gw': peer.tg_gw_ip_block.get_ip(chain_idx),
287                 'vlan_tag': self.vlans[chain_idx] if self.vlans else None,
288                 'vxlan': self.vxlan,
289                 'vtep_vlan': self.vtep_vlan if self.vtep_vlan else None,
290                 'vtep_src_mac': self.mac if self.vxlan is True else None,
291                 'vtep_dst_mac': self.vtep_dst_mac if self.vxlan is True else None,
292                 'vtep_dst_ip': self.vtep_dst_ip if self.vxlan is True else None,
293                 'vtep_src_ip': self.vtep_src_ip if self.vxlan is True else None,
294                 'net_vni': self.vnis[chain_idx] if self.vxlan is True else None
295             })
296             # after first chain, fall back to the flow count for all other chains
297             cur_chain_flow_count = flows_per_chain
298         return configs
299
300     @staticmethod
301     def ip_to_int(addr):
302         """Convert an IP address from string to numeric."""
303         return struct.unpack("!I", socket.inet_aton(addr))[0]
304
305     @staticmethod
306     def int_to_ip(nvalue):
307         """Convert an IP address from numeric to string."""
308         return socket.inet_ntoa(struct.pack("!I", nvalue))
309
310
311 class GeneratorConfig(object):
312     """Represents traffic configuration for currently running traffic profile."""
313
314     DEFAULT_IP_STEP = '0.0.0.1'
315     DEFAULT_SRC_DST_IP_STEP = '0.0.0.1'
316
317     def __init__(self, config):
318         """Create a generator config."""
319         self.config = config
320         # name of the generator profile (normally trex or dummy)
321         # pick the default one if not specified explicitly from cli options
322         if not config.generator_profile:
323             config.generator_profile = config.traffic_generator.default_profile
324         # pick up the profile dict based on the name
325         gen_config = self.__match_generator_profile(config.traffic_generator,
326                                                     config.generator_profile)
327         self.gen_config = gen_config
328         # copy over fields from the dict
329         self.tool = gen_config.tool
330         self.ip = gen_config.ip
331         # overrides on config.cores and config.mbuf_factor
332         if config.cores:
333             self.cores = config.cores
334         else:
335             self.cores = gen_config.get('cores', 1)
336         self.mbuf_factor = config.mbuf_factor
337         if gen_config.intf_speed:
338             # interface speed is overriden from config
339             self.intf_speed = bitmath.parse_string(gen_config.intf_speed.replace('ps', '')).bits
340         else:
341             # interface speed is discovered/provided by the traffic generator
342             self.intf_speed = 0
343         self.name = gen_config.name
344         self.zmq_pub_port = gen_config.get('zmq_pub_port', 4500)
345         self.zmq_rpc_port = gen_config.get('zmq_rpc_port', 4501)
346         self.limit_memory = gen_config.get('limit_memory', 1024)
347         self.software_mode = gen_config.get('software_mode', False)
348         self.interfaces = gen_config.interfaces
349         if self.interfaces[0].port != 0 or self.interfaces[1].port != 1:
350             raise TrafficClientException('Invalid port order/id in generator_profile.interfaces')
351         if hasattr(gen_config, 'platform'):
352             self.platform = gen_config.platform
353         self.service_chain = config.service_chain
354         self.service_chain_count = config.service_chain_count
355         self.flow_count = config.flow_count
356         self.host_name = gen_config.host_name
357
358         self.tg_gateway_ip_addrs = gen_config.tg_gateway_ip_addrs
359         self.ip_addrs = gen_config.ip_addrs
360         self.ip_addrs_step = gen_config.ip_addrs_step or self.DEFAULT_SRC_DST_IP_STEP
361         self.tg_gateway_ip_addrs_step = \
362             gen_config.tg_gateway_ip_addrs_step or self.DEFAULT_IP_STEP
363         self.gateway_ip_addrs_step = gen_config.gateway_ip_addrs_step or self.DEFAULT_IP_STEP
364         self.gateway_ips = gen_config.gateway_ip_addrs
365         self.udp_src_port = gen_config.udp_src_port
366         self.udp_dst_port = gen_config.udp_dst_port
367         self.vteps = gen_config.get('vteps')
368         self.devices = [Device(port, self) for port in [0, 1]]
369         # This should normally always be [0, 1]
370         self.ports = [device.port for device in self.devices]
371
372         # check that pci is not empty
373         if not gen_config.interfaces[0].get('pci', None) or \
374            not gen_config.interfaces[1].get('pci', None):
375             raise TrafficClientException("configuration interfaces pci fields cannot be empty")
376
377         self.pcis = [tgif['pci'] for tgif in gen_config.interfaces]
378         self.vlan_tagging = config.vlan_tagging
379
380         # needed for result/summarizer
381         config['tg-name'] = gen_config.name
382         config['tg-tool'] = self.tool
383
384     def to_json(self):
385         """Get json form to display the content into the overall result dict."""
386         return dict(self.gen_config)
387
388     def set_dest_macs(self, port_index, dest_macs):
389         """Set the list of dest MACs indexed by the chain id on given port.
390
391         port_index: the port for which dest macs must be set
392         dest_macs: a list of dest MACs indexed by chain id
393         """
394         if len(dest_macs) < self.config.service_chain_count:
395             raise TrafficClientException('Dest MAC list %s must have %d entries' %
396                                          (dest_macs, self.config.service_chain_count))
397         # only pass the first scc dest MACs
398         self.devices[port_index].set_dest_macs(dest_macs[:self.config.service_chain_count])
399         LOG.info('Port %d: dst MAC %s', port_index, [str(mac) for mac in dest_macs])
400
401     def set_vtep_dest_macs(self, port_index, dest_macs):
402         """Set the list of dest MACs indexed by the chain id on given port.
403
404         port_index: the port for which dest macs must be set
405         dest_macs: a list of dest MACs indexed by chain id
406         """
407         if len(dest_macs) != self.config.service_chain_count:
408             raise TrafficClientException('Dest MAC list %s must have %d entries' %
409                                          (dest_macs, self.config.service_chain_count))
410         self.devices[port_index].set_vtep_dst_mac(dest_macs)
411         LOG.info('Port %d: vtep dst MAC %s', port_index, set([str(mac) for mac in dest_macs]))
412
413     def get_dest_macs(self):
414         """Return the list of dest macs indexed by port."""
415         return [dev.get_dest_macs() for dev in self.devices]
416
417     def set_vlans(self, port_index, vlans):
418         """Set the list of vlans to use indexed by the chain id on given port.
419
420         port_index: the port for which VLANs must be set
421         vlans: a  list of vlan lists indexed by chain id
422         """
423         if len(vlans) != self.config.service_chain_count:
424             raise TrafficClientException('VLAN list %s must have %d entries' %
425                                          (vlans, self.config.service_chain_count))
426         self.devices[port_index].set_vlans(vlans)
427
428     def set_vxlans(self, port_index, vxlans):
429         """Set the list of vxlans (VNIs) to use indexed by the chain id on given port.
430
431         port_index: the port for which VXLANs must be set
432         VXLANs: a  list of VNIs lists indexed by chain id
433         """
434         if len(vxlans) != self.config.service_chain_count:
435             raise TrafficClientException('VXLAN list %s must have %d entries' %
436                                          (vxlans, self.config.service_chain_count))
437         self.devices[port_index].set_vxlans(vxlans)
438
439     def set_vtep_vlan(self, port_index, vlan):
440         """Set the vtep vlan to use indexed by the chain id on given port.
441         port_index: the port for which VLAN must be set
442         """
443         self.devices[port_index].set_vtep_vlan(vlan)
444
445     def set_vxlan_endpoints(self, port_index, src_ip, dst_ip):
446         self.devices[port_index].set_vxlan_endpoints(src_ip, dst_ip)
447
448     @staticmethod
449     def __match_generator_profile(traffic_generator, generator_profile):
450         gen_config = AttrDict(traffic_generator)
451         gen_config.pop('default_profile')
452         gen_config.pop('generator_profile')
453         matching_profile = [profile for profile in traffic_generator.generator_profile if
454                             profile.name == generator_profile]
455         if len(matching_profile) != 1:
456             raise Exception('Traffic generator profile not found: ' + generator_profile)
457
458         gen_config.update(matching_profile[0])
459         return gen_config
460
461
462 class TrafficClient(object):
463     """Traffic generator client with NDR/PDR binary seearch."""
464
465     PORTS = [0, 1]
466
467     def __init__(self, config, notifier=None):
468         """Create a new TrafficClient instance.
469
470         config: nfvbench config
471         notifier: notifier (optional)
472
473         A new instance is created everytime the nfvbench config may have changed.
474         """
475         self.config = config
476         self.generator_config = GeneratorConfig(config)
477         self.tool = self.generator_config.tool
478         self.gen = self._get_generator()
479         self.notifier = notifier
480         self.interval_collector = None
481         self.iteration_collector = None
482         self.runner = TrafficRunner(self, self.config.duration_sec, self.config.interval_sec)
483         self.config.frame_sizes = self._get_frame_sizes()
484         self.run_config = {
485             'l2frame_size': None,
486             'duration_sec': self.config.duration_sec,
487             'bidirectional': True,
488             'rates': []  # to avoid unsbuscriptable-obj warning
489         }
490         self.current_total_rate = {'rate_percent': '10'}
491         if self.config.single_run:
492             self.current_total_rate = utils.parse_rate_str(self.config.rate)
493         self.ifstats = None
494         # Speed is either discovered when connecting to TG or set from config
495         # This variable is 0 if not yet discovered from TG or must be the speed of
496         # each interface in bits per second
497         self.intf_speed = self.generator_config.intf_speed
498
499     def _get_generator(self):
500         tool = self.tool.lower()
501         if tool == 'trex':
502             from traffic_gen import trex
503             return trex.TRex(self)
504         if tool == 'dummy':
505             from traffic_gen import dummy
506             return dummy.DummyTG(self)
507         raise TrafficClientException('Unsupported generator tool name:' + self.tool)
508
509     def skip_sleep(self):
510         """Skip all sleeps when doing unit testing with dummy TG.
511
512         Must be overriden using mock.patch
513         """
514         return False
515
516     def _get_frame_sizes(self):
517         traffic_profile_name = self.config.traffic.profile
518         matching_profiles = [profile for profile in self.config.traffic_profile if
519                              profile.name == traffic_profile_name]
520         if len(matching_profiles) > 1:
521             raise TrafficClientException('Multiple traffic profiles with name: ' +
522                                          traffic_profile_name)
523         elif not matching_profiles:
524             raise TrafficClientException('Cannot find traffic profile: ' + traffic_profile_name)
525         return matching_profiles[0].l2frame_size
526
527     def start_traffic_generator(self):
528         """Start the traffic generator process (traffic not started yet)."""
529         self.gen.connect()
530         # pick up the interface speed if it is not set from config
531         intf_speeds = self.gen.get_port_speed_gbps()
532         # convert Gbps unit into bps
533         tg_if_speed = bitmath.parse_string(str(intf_speeds[0]) + 'Gb').bits
534         if self.intf_speed:
535             # interface speed is overriden from config
536             if self.intf_speed != tg_if_speed:
537                 # Warn the user if the speed in the config is different
538                 LOG.warning('Interface speed provided is different from actual speed (%d Gbps)',
539                             intf_speeds[0])
540         else:
541             # interface speed not provisioned by config
542             self.intf_speed = tg_if_speed
543             # also update the speed in the tg config
544             self.generator_config.intf_speed = tg_if_speed
545
546         # Save the traffic generator local MAC
547         for mac, device in zip(self.gen.get_macs(), self.generator_config.devices):
548             device.set_mac(mac)
549
550     def setup(self):
551         """Set up the traffic client."""
552         self.gen.clear_stats()
553
554     def get_version(self):
555         """Get the traffic generator version."""
556         return self.gen.get_version()
557
558     def ensure_end_to_end(self):
559         """Ensure traffic generator receives packets it has transmitted.
560
561         This ensures end to end connectivity and also waits until VMs are ready to forward packets.
562
563         VMs that are started and in active state may not pass traffic yet. It is imperative to make
564         sure that all VMs are passing traffic in both directions before starting any benchmarking.
565         To verify this, we need to send at a low frequency bi-directional packets and make sure
566         that we receive all packets back from all VMs. The number of flows is equal to 2 times
567         the number of chains (1 per direction) and we need to make sure we receive packets coming
568         from exactly 2 x chain count different source MAC addresses.
569
570         Example:
571             PVP chain (1 VM per chain)
572             N = 10 (number of chains)
573             Flow count = 20 (number of flows)
574             If the number of unique source MAC addresses from received packets is 20 then
575             all 10 VMs 10 VMs are in operational state.
576         """
577         LOG.info('Starting traffic generator to ensure end-to-end connectivity')
578         # send 2pps on each chain and each direction
579         rate_pps = {'rate_pps': str(self.config.service_chain_count * 2)}
580         self.gen.create_traffic('64', [rate_pps, rate_pps], bidirectional=True, latency=False)
581
582         # ensures enough traffic is coming back
583         retry_count = (self.config.check_traffic_time_sec +
584                        self.config.generic_poll_sec - 1) / self.config.generic_poll_sec
585
586         # we expect to see packets coming from 2 unique MAC per chain
587         # because there can be flooding in the case of shared net
588         # we must verify that packets from the right VMs are received
589         # and not just count unique src MAC
590         # create a dict of (port, chain) tuples indexed by dest mac
591         mac_map = {}
592         for port, dest_macs in enumerate(self.generator_config.get_dest_macs()):
593             for chain, mac in enumerate(dest_macs):
594                 mac_map[mac] = (port, chain)
595         unique_src_mac_count = len(mac_map)
596         if self.config.vxlan and self.config.traffic_generator.vtep_vlan:
597             get_mac_id = lambda packet: packet['binary'][60:66]
598         elif self.config.vxlan:
599             get_mac_id = lambda packet: packet['binary'][56:62]
600         else:
601             get_mac_id = lambda packet: packet['binary'][6:12]
602         for it in xrange(retry_count):
603             self.gen.clear_stats()
604             self.gen.start_traffic()
605             self.gen.start_capture()
606             LOG.info('Captured unique src mac %d/%d, capturing return packets (retry %d/%d)...',
607                      unique_src_mac_count - len(mac_map), unique_src_mac_count,
608                      it + 1, retry_count)
609             if not self.skip_sleep():
610                 time.sleep(self.config.generic_poll_sec)
611             self.gen.stop_traffic()
612             self.gen.fetch_capture_packets()
613             self.gen.stop_capture()
614
615             for packet in self.gen.packet_list:
616                 mac_id = get_mac_id(packet)
617                 src_mac = ':'.join(["%02x" % ord(x) for x in mac_id])
618                 if src_mac in mac_map:
619                     port, chain = mac_map[src_mac]
620                     LOG.info('Received packet from mac: %s (chain=%d, port=%d)',
621                              src_mac, chain, port)
622                     mac_map.pop(src_mac, None)
623
624                 if not mac_map:
625                     LOG.info('End-to-end connectivity established')
626                     return
627
628         raise TrafficClientException('End-to-end connectivity cannot be ensured')
629
630     def ensure_arp_successful(self):
631         """Resolve all IP using ARP and throw an exception in case of failure."""
632         dest_macs = self.gen.resolve_arp()
633         if dest_macs:
634             # all dest macs are discovered, saved them into the generator config
635             if self.config.vxlan:
636                 self.generator_config.set_vtep_dest_macs(0, dest_macs[0])
637                 self.generator_config.set_vtep_dest_macs(1, dest_macs[1])
638             else:
639                 self.generator_config.set_dest_macs(0, dest_macs[0])
640                 self.generator_config.set_dest_macs(1, dest_macs[1])
641         else:
642             raise TrafficClientException('ARP cannot be resolved')
643
644     def set_traffic(self, frame_size, bidirectional):
645         """Reconfigure the traffic generator for a new frame size."""
646         self.run_config['bidirectional'] = bidirectional
647         self.run_config['l2frame_size'] = frame_size
648         self.run_config['rates'] = [self.get_per_direction_rate()]
649         if bidirectional:
650             self.run_config['rates'].append(self.get_per_direction_rate())
651         else:
652             unidir_reverse_pps = int(self.config.unidir_reverse_traffic_pps)
653             if unidir_reverse_pps > 0:
654                 self.run_config['rates'].append({'rate_pps': str(unidir_reverse_pps)})
655         # Fix for [NFVBENCH-67], convert the rate string to PPS
656         for idx, rate in enumerate(self.run_config['rates']):
657             if 'rate_pps' not in rate:
658                 self.run_config['rates'][idx] = {'rate_pps': self.__convert_rates(rate)['rate_pps']}
659
660         self.gen.clear_streamblock()
661         self.gen.create_traffic(frame_size, self.run_config['rates'], bidirectional, latency=True)
662
663     def _modify_load(self, load):
664         self.current_total_rate = {'rate_percent': str(load)}
665         rate_per_direction = self.get_per_direction_rate()
666
667         self.gen.modify_rate(rate_per_direction, False)
668         self.run_config['rates'][0] = rate_per_direction
669         if self.run_config['bidirectional']:
670             self.gen.modify_rate(rate_per_direction, True)
671             self.run_config['rates'][1] = rate_per_direction
672
673     def get_ndr_and_pdr(self):
674         """Start the NDR/PDR iteration and return the results."""
675         dst = 'Bidirectional' if self.run_config['bidirectional'] else 'Unidirectional'
676         targets = {}
677         if self.config.ndr_run:
678             LOG.info('*** Searching NDR for %s (%s)...', self.run_config['l2frame_size'], dst)
679             targets['ndr'] = self.config.measurement.NDR
680         if self.config.pdr_run:
681             LOG.info('*** Searching PDR for %s (%s)...', self.run_config['l2frame_size'], dst)
682             targets['pdr'] = self.config.measurement.PDR
683
684         self.run_config['start_time'] = time.time()
685         self.interval_collector = IntervalCollector(self.run_config['start_time'])
686         self.interval_collector.attach_notifier(self.notifier)
687         self.iteration_collector = IterationCollector(self.run_config['start_time'])
688         results = {}
689         self.__range_search(0.0, 200.0, targets, results)
690
691         results['iteration_stats'] = {
692             'ndr_pdr': self.iteration_collector.get()
693         }
694
695         if self.config.ndr_run:
696             LOG.info('NDR load: %s', results['ndr']['rate_percent'])
697             results['ndr']['time_taken_sec'] = \
698                 results['ndr']['timestamp_sec'] - self.run_config['start_time']
699             if self.config.pdr_run:
700                 LOG.info('PDR load: %s', results['pdr']['rate_percent'])
701                 results['pdr']['time_taken_sec'] = \
702                     results['pdr']['timestamp_sec'] - results['ndr']['timestamp_sec']
703         else:
704             LOG.info('PDR load: %s', results['pdr']['rate_percent'])
705             results['pdr']['time_taken_sec'] = \
706                 results['pdr']['timestamp_sec'] - self.run_config['start_time']
707         return results
708
709     def __get_dropped_rate(self, result):
710         dropped_pkts = result['rx']['dropped_pkts']
711         total_pkts = result['tx']['total_pkts']
712         if not total_pkts:
713             return float('inf')
714         return float(dropped_pkts) / total_pkts * 100
715
716     def get_stats(self):
717         """Collect final stats for previous run."""
718         stats = self.gen.get_stats()
719         retDict = {'total_tx_rate': stats['total_tx_rate']}
720         for port in self.PORTS:
721             retDict[port] = {'tx': {}, 'rx': {}}
722
723         tx_keys = ['total_pkts', 'total_pkt_bytes', 'pkt_rate', 'pkt_bit_rate']
724         rx_keys = tx_keys + ['dropped_pkts']
725
726         for port in self.PORTS:
727             for key in tx_keys:
728                 retDict[port]['tx'][key] = int(stats[port]['tx'][key])
729             for key in rx_keys:
730                 try:
731                     retDict[port]['rx'][key] = int(stats[port]['rx'][key])
732                 except ValueError:
733                     retDict[port]['rx'][key] = 0
734             retDict[port]['rx']['avg_delay_usec'] = cast_integer(
735                 stats[port]['rx']['avg_delay_usec'])
736             retDict[port]['rx']['min_delay_usec'] = cast_integer(
737                 stats[port]['rx']['min_delay_usec'])
738             retDict[port]['rx']['max_delay_usec'] = cast_integer(
739                 stats[port]['rx']['max_delay_usec'])
740             retDict[port]['drop_rate_percent'] = self.__get_dropped_rate(retDict[port])
741
742         ports = sorted(retDict.keys())
743         if self.run_config['bidirectional']:
744             retDict['overall'] = {'tx': {}, 'rx': {}}
745             for key in tx_keys:
746                 retDict['overall']['tx'][key] = \
747                     retDict[ports[0]]['tx'][key] + retDict[ports[1]]['tx'][key]
748             for key in rx_keys:
749                 retDict['overall']['rx'][key] = \
750                     retDict[ports[0]]['rx'][key] + retDict[ports[1]]['rx'][key]
751             total_pkts = [retDict[ports[0]]['rx']['total_pkts'],
752                           retDict[ports[1]]['rx']['total_pkts']]
753             avg_delays = [retDict[ports[0]]['rx']['avg_delay_usec'],
754                           retDict[ports[1]]['rx']['avg_delay_usec']]
755             max_delays = [retDict[ports[0]]['rx']['max_delay_usec'],
756                           retDict[ports[1]]['rx']['max_delay_usec']]
757             min_delays = [retDict[ports[0]]['rx']['min_delay_usec'],
758                           retDict[ports[1]]['rx']['min_delay_usec']]
759             retDict['overall']['rx']['avg_delay_usec'] = utils.weighted_avg(total_pkts, avg_delays)
760             retDict['overall']['rx']['min_delay_usec'] = min(min_delays)
761             retDict['overall']['rx']['max_delay_usec'] = max(max_delays)
762             for key in ['pkt_bit_rate', 'pkt_rate']:
763                 for dirc in ['tx', 'rx']:
764                     retDict['overall'][dirc][key] /= 2.0
765         else:
766             retDict['overall'] = retDict[ports[0]]
767         retDict['overall']['drop_rate_percent'] = self.__get_dropped_rate(retDict['overall'])
768         return retDict
769
770     def __convert_rates(self, rate):
771         return utils.convert_rates(self.run_config['l2frame_size'],
772                                    rate,
773                                    self.intf_speed)
774
775     def __ndr_pdr_found(self, tag, load):
776         rates = self.__convert_rates({'rate_percent': load})
777         self.iteration_collector.add_ndr_pdr(tag, rates['rate_pps'])
778         last_stats = self.iteration_collector.peek()
779         self.interval_collector.add_ndr_pdr(tag, last_stats)
780
781     def __format_output_stats(self, stats):
782         for key in self.PORTS + ['overall']:
783             interface = stats[key]
784             stats[key] = {
785                 'tx_pkts': interface['tx']['total_pkts'],
786                 'rx_pkts': interface['rx']['total_pkts'],
787                 'drop_percentage': interface['drop_rate_percent'],
788                 'drop_pct': interface['rx']['dropped_pkts'],
789                 'avg_delay_usec': interface['rx']['avg_delay_usec'],
790                 'max_delay_usec': interface['rx']['max_delay_usec'],
791                 'min_delay_usec': interface['rx']['min_delay_usec'],
792             }
793
794         return stats
795
796     def __targets_found(self, rate, targets, results):
797         for tag, target in targets.iteritems():
798             LOG.info('Found %s (%s) load: %s', tag, target, rate)
799             self.__ndr_pdr_found(tag, rate)
800             results[tag]['timestamp_sec'] = time.time()
801
802     def __range_search(self, left, right, targets, results):
803         """Perform a binary search for a list of targets inside a [left..right] range or rate.
804
805         left    the left side of the range to search as a % the line rate (100 = 100% line rate)
806                 indicating the rate to send on each interface
807         right   the right side of the range to search as a % of line rate
808                 indicating the rate to send on each interface
809         targets a dict of drop rates to search (0.1 = 0.1%), indexed by the DR name or "tag"
810                 ('ndr', 'pdr')
811         results a dict to store results
812         """
813         if not targets:
814             return
815         LOG.info('Range search [%s .. %s] targets: %s', left, right, targets)
816
817         # Terminate search when gap is less than load epsilon
818         if right - left < self.config.measurement.load_epsilon:
819             self.__targets_found(left, targets, results)
820             return
821
822         # Obtain the average drop rate in for middle load
823         middle = (left + right) / 2.0
824         try:
825             stats, rates = self.__run_search_iteration(middle)
826         except STLError:
827             LOG.exception("Got exception from traffic generator during binary search")
828             self.__targets_found(left, targets, results)
829             return
830         # Split target dicts based on the avg drop rate
831         left_targets = {}
832         right_targets = {}
833         for tag, target in targets.iteritems():
834             if stats['overall']['drop_rate_percent'] <= target:
835                 # record the best possible rate found for this target
836                 results[tag] = rates
837                 results[tag].update({
838                     'load_percent_per_direction': middle,
839                     'stats': self.__format_output_stats(dict(stats)),
840                     'timestamp_sec': None
841                 })
842                 right_targets[tag] = target
843             else:
844                 # initialize to 0 all fields of result for
845                 # the worst case scenario of the binary search (if ndr/pdr is not found)
846                 if tag not in results:
847                     results[tag] = dict.fromkeys(rates, 0)
848                     empty_stats = self.__format_output_stats(dict(stats))
849                     for key in empty_stats:
850                         if isinstance(empty_stats[key], dict):
851                             empty_stats[key] = dict.fromkeys(empty_stats[key], 0)
852                         else:
853                             empty_stats[key] = 0
854                     results[tag].update({
855                         'load_percent_per_direction': 0,
856                         'stats': empty_stats,
857                         'timestamp_sec': None
858                     })
859                 left_targets[tag] = target
860
861         # search lower half
862         self.__range_search(left, middle, left_targets, results)
863
864         # search upper half only if the upper rate does not exceed
865         # 100%, this only happens when the first search at 100%
866         # yields a DR that is < target DR
867         if middle >= 100:
868             self.__targets_found(100, right_targets, results)
869         else:
870             self.__range_search(middle, right, right_targets, results)
871
872     def __run_search_iteration(self, rate):
873         """Run one iteration at the given rate level.
874
875         rate: the rate to send on each port in percent (0 to 100)
876         """
877         self._modify_load(rate)
878
879         # poll interval stats and collect them
880         for stats in self.run_traffic():
881             self.interval_collector.add(stats)
882             time_elapsed_ratio = self.runner.time_elapsed() / self.run_config['duration_sec']
883             if time_elapsed_ratio >= 1:
884                 self.cancel_traffic()
885                 if not self.skip_sleep():
886                     time.sleep(self.config.pause_sec)
887         self.interval_collector.reset()
888
889         # get stats from the run
890         stats = self.runner.client.get_stats()
891         current_traffic_config = self._get_traffic_config()
892         warning = self.compare_tx_rates(current_traffic_config['direction-total']['rate_pps'],
893                                         stats['total_tx_rate'])
894         if warning is not None:
895             stats['warning'] = warning
896
897         # save reliable stats from whole iteration
898         self.iteration_collector.add(stats, current_traffic_config['direction-total']['rate_pps'])
899         LOG.info('Average drop rate: %f', stats['overall']['drop_rate_percent'])
900         return stats, current_traffic_config['direction-total']
901
902     @staticmethod
903     def log_stats(stats):
904         """Log estimated stats during run."""
905         report = {
906             'datetime': str(datetime.now()),
907             'tx_packets': stats['overall']['tx']['total_pkts'],
908             'rx_packets': stats['overall']['rx']['total_pkts'],
909             'drop_packets': stats['overall']['rx']['dropped_pkts'],
910             'drop_rate_percent': stats['overall']['drop_rate_percent']
911         }
912         LOG.info('TX: %(tx_packets)d; '
913                  'RX: %(rx_packets)d; '
914                  'Est. Dropped: %(drop_packets)d; '
915                  'Est. Drop rate: %(drop_rate_percent).4f%%',
916                  report)
917
918     def run_traffic(self):
919         """Start traffic and return intermediate stats for each interval."""
920         stats = self.runner.run()
921         while self.runner.is_running:
922             self.log_stats(stats)
923             yield stats
924             stats = self.runner.poll_stats()
925             if stats is None:
926                 return
927         self.log_stats(stats)
928         LOG.info('Drop rate: %f', stats['overall']['drop_rate_percent'])
929         yield stats
930
931     def cancel_traffic(self):
932         """Stop traffic."""
933         self.runner.stop()
934
935     def _get_traffic_config(self):
936         config = {}
937         load_total = 0.0
938         bps_total = 0.0
939         pps_total = 0.0
940         for idx, rate in enumerate(self.run_config['rates']):
941             key = 'direction-forward' if idx == 0 else 'direction-reverse'
942             config[key] = {
943                 'l2frame_size': self.run_config['l2frame_size'],
944                 'duration_sec': self.run_config['duration_sec']
945             }
946             config[key].update(rate)
947             config[key].update(self.__convert_rates(rate))
948             load_total += float(config[key]['rate_percent'])
949             bps_total += float(config[key]['rate_bps'])
950             pps_total += float(config[key]['rate_pps'])
951         config['direction-total'] = dict(config['direction-forward'])
952         config['direction-total'].update({
953             'rate_percent': load_total,
954             'rate_pps': cast_integer(pps_total),
955             'rate_bps': bps_total
956         })
957
958         return config
959
960     def get_run_config(self, results):
961         """Return configuration which was used for the last run."""
962         r = {}
963         # because we want each direction to have the far end RX rates,
964         # use the far end index (1-idx) to retrieve the RX rates
965         for idx, key in enumerate(["direction-forward", "direction-reverse"]):
966             tx_rate = results["stats"][idx]["tx"]["total_pkts"] / self.config.duration_sec
967             rx_rate = results["stats"][1 - idx]["rx"]["total_pkts"] / self.config.duration_sec
968             r[key] = {
969                 "orig": self.__convert_rates(self.run_config['rates'][idx]),
970                 "tx": self.__convert_rates({'rate_pps': tx_rate}),
971                 "rx": self.__convert_rates({'rate_pps': rx_rate})
972             }
973
974         total = {}
975         for direction in ['orig', 'tx', 'rx']:
976             total[direction] = {}
977             for unit in ['rate_percent', 'rate_bps', 'rate_pps']:
978                 total[direction][unit] = sum([float(x[direction][unit]) for x in r.values()])
979
980         r['direction-total'] = total
981         return r
982
983     def insert_interface_stats(self, pps_list):
984         """Insert interface stats to a list of packet path stats.
985
986         pps_list: a list of packet path stats instances indexed by chain index
987
988         This function will insert the packet path stats for the traffic gen ports 0 and 1
989         with itemized per chain tx/rx counters.
990         There will be as many packet path stats as chains.
991         Each packet path stats will have exactly 2 InterfaceStats for port 0 and port 1
992         self.pps_list:
993         [
994         PacketPathStats(InterfaceStats(chain 0, port 0), InterfaceStats(chain 0, port 1)),
995         PacketPathStats(InterfaceStats(chain 1, port 0), InterfaceStats(chain 1, port 1)),
996         ...
997         ]
998         """
999         def get_if_stats(chain_idx):
1000             return [InterfaceStats('p' + str(port), self.tool)
1001                     for port in range(2)]
1002         # keep the list of list of interface stats indexed by the chain id
1003         self.ifstats = [get_if_stats(chain_idx)
1004                         for chain_idx in range(self.config.service_chain_count)]
1005         # note that we need to make a copy of the ifs list so that any modification in the
1006         # list from pps will not change the list saved in self.ifstats
1007         self.pps_list = [PacketPathStats(list(ifs)) for ifs in self.ifstats]
1008         # insert the corresponding pps in the passed list
1009         pps_list.extend(self.pps_list)
1010
1011     def update_interface_stats(self, diff=False):
1012         """Update all interface stats.
1013
1014         diff: if False, simply refresh the interface stats values with latest values
1015               if True, diff the interface stats with the latest values
1016         Make sure that the interface stats inserted in insert_interface_stats() are updated
1017         with proper values.
1018         self.ifstats:
1019         [
1020         [InterfaceStats(chain 0, port 0), InterfaceStats(chain 0, port 1)],
1021         [InterfaceStats(chain 1, port 0), InterfaceStats(chain 1, port 1)],
1022         ...
1023         ]
1024         """
1025         if diff:
1026             stats = self.gen.get_stats()
1027             for chain_idx, ifs in enumerate(self.ifstats):
1028                 # each ifs has exactly 2 InterfaceStats and 2 Latency instances
1029                 # corresponding to the
1030                 # port 0 and port 1 for the given chain_idx
1031                 # Note that we cannot use self.pps_list[chain_idx].if_stats to pick the
1032                 # interface stats for the pps because it could have been modified to contain
1033                 # additional interface stats
1034                 self.gen.get_stream_stats(stats, ifs, self.pps_list[chain_idx].latencies, chain_idx)
1035             # special handling for vxlan
1036             # in case of vxlan, flow stats are not available so all rx counters will be
1037             # zeros when the total rx port counter is non zero.
1038             # in that case,
1039             for port in range(2):
1040                 total_rx = 0
1041                 for ifs in self.ifstats:
1042                     total_rx += ifs[port].rx
1043                 if total_rx == 0:
1044                     # check if the total port rx from Trex is also zero
1045                     port_rx = stats[port]['rx']['total_pkts']
1046                     if port_rx:
1047                         # the total rx for all chains from port level stats is non zero
1048                         # which means that the per-chain stats are not available
1049                         if len(self.ifstats) == 1:
1050                             # only one chain, simply report the port level rx to the chain rx stats
1051                             self.ifstats[0][port].rx = port_rx
1052                         else:
1053                             for ifs in self.ifstats:
1054                                 # mark this data as unavailable
1055                                 ifs[port].rx = None
1056                             # pitch in the total rx only in the last chain pps
1057                             self.ifstats[-1][port].rx_total = port_rx
1058
1059     @staticmethod
1060     def compare_tx_rates(required, actual):
1061         """Compare the actual TX rate to the required TX rate."""
1062         threshold = 0.9
1063         are_different = False
1064         try:
1065             if float(actual) / required < threshold:
1066                 are_different = True
1067         except ZeroDivisionError:
1068             are_different = True
1069
1070         if are_different:
1071             msg = "WARNING: There is a significant difference between requested TX rate ({r}) " \
1072                   "and actual TX rate ({a}). The traffic generator may not have sufficient CPU " \
1073                   "to achieve the requested TX rate.".format(r=required, a=actual)
1074             LOG.info(msg)
1075             return msg
1076
1077         return None
1078
1079     def get_per_direction_rate(self):
1080         """Get the rate for each direction."""
1081         divisor = 2 if self.run_config['bidirectional'] else 1
1082         if 'rate_percent' in self.current_total_rate:
1083             # don't split rate if it's percentage
1084             divisor = 1
1085
1086         return utils.divide_rate(self.current_total_rate, divisor)
1087
1088     def close(self):
1089         """Close this instance."""
1090         try:
1091             self.gen.stop_traffic()
1092         except Exception:
1093             pass
1094         self.gen.clear_stats()
1095         self.gen.cleanup()