NFVBENCH-121 Add TRex parameters to tune performance and allocate ressources
[nfvbench.git] / nfvbench / traffic_client.py
1 # Copyright 2016 Cisco Systems, Inc.  All rights reserved.
2 #
3 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
4 #    not use this file except in compliance with the License. You may obtain
5 #    a copy of the License at
6 #
7 #         http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #    Unless required by applicable law or agreed to in writing, software
10 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 #    License for the specific language governing permissions and limitations
13 #    under the License.
14
15 """Interface to the traffic generator clients including NDR/PDR binary search."""
16
17 from datetime import datetime
18 import socket
19 import struct
20 import time
21
22 from attrdict import AttrDict
23 import bitmath
24 from netaddr import IPNetwork
25 # pylint: disable=import-error
26 from trex_stl_lib.api import STLError
27 # pylint: enable=import-error
28
29 from log import LOG
30 from packet_stats import InterfaceStats
31 from packet_stats import PacketPathStats
32 from stats_collector import IntervalCollector
33 from stats_collector import IterationCollector
34 import traffic_gen.traffic_utils as utils
35 from utils import cast_integer
36
37
38 class TrafficClientException(Exception):
39     """Generic traffic client exception."""
40
41     pass
42
43
44 class TrafficRunner(object):
45     """Serialize various steps required to run traffic."""
46
47     def __init__(self, client, duration_sec, interval_sec=0):
48         """Create a traffic runner."""
49         self.client = client
50         self.start_time = None
51         self.duration_sec = duration_sec
52         self.interval_sec = interval_sec
53
54     def run(self):
55         """Clear stats and instruct the traffic generator to start generating traffic."""
56         if self.is_running():
57             return None
58         LOG.info('Running traffic generator')
59         self.client.gen.clear_stats()
60         self.client.gen.start_traffic()
61         self.start_time = time.time()
62         return self.poll_stats()
63
64     def stop(self):
65         """Stop the current run and instruct the traffic generator to stop traffic."""
66         if self.is_running():
67             self.start_time = None
68             self.client.gen.stop_traffic()
69
70     def is_running(self):
71         """Check if a run is still pending."""
72         return self.start_time is not None
73
74     def time_elapsed(self):
75         """Return time elapsed since start of run."""
76         if self.is_running():
77             return time.time() - self.start_time
78         return self.duration_sec
79
80     def poll_stats(self):
81         """Poll latest stats from the traffic generator at fixed interval - sleeps if necessary.
82
83         return: latest stats or None if traffic is stopped
84         """
85         if not self.is_running():
86             return None
87         if self.client.skip_sleep():
88             self.stop()
89             return self.client.get_stats()
90         time_elapsed = self.time_elapsed()
91         if time_elapsed > self.duration_sec:
92             self.stop()
93             return None
94         time_left = self.duration_sec - time_elapsed
95         if self.interval_sec > 0.0:
96             if time_left <= self.interval_sec:
97                 time.sleep(time_left)
98                 self.stop()
99             else:
100                 time.sleep(self.interval_sec)
101         else:
102             time.sleep(self.duration_sec)
103             self.stop()
104         return self.client.get_stats()
105
106
107 class IpBlock(object):
108     """Manage a block of IP addresses."""
109
110     def __init__(self, base_ip, step_ip, count_ip):
111         """Create an IP block."""
112         self.base_ip_int = Device.ip_to_int(base_ip)
113         self.step = Device.ip_to_int(step_ip)
114         self.max_available = count_ip
115         self.next_free = 0
116
117     def get_ip(self, index=0):
118         """Return the IP address at given index."""
119         if index < 0 or index >= self.max_available:
120             raise IndexError('Index out of bounds: %d (max=%d)' % (index, self.max_available))
121         return Device.int_to_ip(self.base_ip_int + index * self.step)
122
123     def reserve_ip_range(self, count):
124         """Reserve a range of count consecutive IP addresses spaced by step."""
125         if self.next_free + count > self.max_available:
126             raise IndexError('No more IP addresses next free=%d max_available=%d requested=%d' %
127                              (self.next_free,
128                               self.max_available,
129                               count))
130         first_ip = self.get_ip(self.next_free)
131         last_ip = self.get_ip(self.next_free + count - 1)
132         self.next_free += count
133         return (first_ip, last_ip)
134
135     def reset_reservation(self):
136         """Reset all reservations and restart with a completely unused IP block."""
137         self.next_free = 0
138
139
140 class Device(object):
141     """Represent a port device and all information associated to it.
142
143     In the curent version we only support 2 port devices for the traffic generator
144     identified as port 0 or port 1.
145     """
146
147     def __init__(self, port, generator_config):
148         """Create a new device for a given port."""
149         self.generator_config = generator_config
150         self.chain_count = generator_config.service_chain_count
151         self.flow_count = generator_config.flow_count / 2
152         self.port = port
153         self.switch_port = generator_config.interfaces[port].get('switch_port', None)
154         self.vtep_vlan = None
155         self.vtep_src_mac = None
156         self.vxlan = False
157         self.pci = generator_config.interfaces[port].pci
158         self.mac = None
159         self.dest_macs = None
160         self.vtep_dst_mac = None
161         self.vtep_dst_ip = None
162         if generator_config.vteps is None:
163             self.vtep_src_ip = None
164         else:
165             self.vtep_src_ip = generator_config.vteps[port]
166         self.vnis = None
167         self.vlans = None
168         self.ip_addrs = generator_config.ip_addrs[port]
169         subnet = IPNetwork(self.ip_addrs)
170         self.ip = subnet.ip.format()
171         self.ip_addrs_step = generator_config.ip_addrs_step
172         self.ip_block = IpBlock(self.ip, self.ip_addrs_step, self.flow_count)
173         self.gw_ip_block = IpBlock(generator_config.gateway_ips[port],
174                                    generator_config.gateway_ip_addrs_step,
175                                    self.chain_count)
176         self.tg_gateway_ip_addrs = generator_config.tg_gateway_ip_addrs[port]
177         self.tg_gw_ip_block = IpBlock(self.tg_gateway_ip_addrs,
178                                       generator_config.tg_gateway_ip_addrs_step,
179                                       self.chain_count)
180         self.udp_src_port = generator_config.udp_src_port
181         self.udp_dst_port = generator_config.udp_dst_port
182
183     def set_mac(self, mac):
184         """Set the local MAC for this port device."""
185         if mac is None:
186             raise TrafficClientException('Trying to set traffic generator MAC address as None')
187         self.mac = mac
188
189     def get_peer_device(self):
190         """Get the peer device (device 0 -> device 1, or device 1 -> device 0)."""
191         return self.generator_config.devices[1 - self.port]
192
193     def set_vtep_dst_mac(self, dest_macs):
194         """Set the list of dest MACs indexed by the chain id.
195
196         This is only called in 2 cases:
197         - VM macs discovered using openstack API
198         - dest MACs provisioned in config file
199         """
200         self.vtep_dst_mac = map(str, dest_macs)
201
202     def set_dest_macs(self, dest_macs):
203         """Set the list of dest MACs indexed by the chain id.
204
205         This is only called in 2 cases:
206         - VM macs discovered using openstack API
207         - dest MACs provisioned in config file
208         """
209         self.dest_macs = map(str, dest_macs)
210
211     def get_dest_macs(self):
212         """Get the list of dest macs for this device.
213
214         If set_dest_macs was never called, assumes l2-loopback and return
215         a list of peer mac (as many as chains but normally only 1 chain)
216         """
217         if self.dest_macs:
218             return self.dest_macs
219         # assume this is l2-loopback
220         return [self.get_peer_device().mac] * self.chain_count
221
222     def set_vlans(self, vlans):
223         """Set the list of vlans to use indexed by the chain id."""
224         self.vlans = vlans
225         LOG.info("Port %d: VLANs %s", self.port, self.vlans)
226
227     def set_vtep_vlan(self, vlan):
228         """Set the vtep vlan to use indexed by specific port."""
229         self.vtep_vlan = vlan
230         self.vxlan = True
231         self.vlan_tagging = None
232         LOG.info("Port %d: VTEP VLANs %s", self.port, self.vtep_vlan)
233
234     def set_vxlan_endpoints(self, src_ip, dst_ip):
235         self.vtep_dst_ip = dst_ip
236         self.vtep_src_ip = src_ip
237         LOG.info("Port %d: src_vtep %s, dst_vtep %s", self.port,
238                  self.vtep_src_ip, self.vtep_dst_ip)
239
240     def set_vxlans(self, vnis):
241         self.vnis = vnis
242         LOG.info("Port %d: VNIs %s", self.port, self.vnis)
243
244     def get_gw_ip(self, chain_index):
245         """Retrieve the IP address assigned for the gateway of a given chain."""
246         return self.gw_ip_block.get_ip(chain_index)
247
248     def get_stream_configs(self):
249         """Get the stream config for a given chain on this device.
250
251         Called by the traffic generator driver to program the traffic generator properly
252         before generating traffic
253         """
254         configs = []
255         # exact flow count for each chain is calculated as follows:
256         # - all chains except the first will have the same flow count
257         #   calculated as (total_flows + chain_count - 1) / chain_count
258         # - the first chain will have the remainder
259         # example 11 flows and 3 chains => 3, 4, 4
260         flows_per_chain = (self.flow_count + self.chain_count - 1) / self.chain_count
261         cur_chain_flow_count = self.flow_count - flows_per_chain * (self.chain_count - 1)
262         peer = self.get_peer_device()
263         self.ip_block.reset_reservation()
264         peer.ip_block.reset_reservation()
265         dest_macs = self.get_dest_macs()
266
267         for chain_idx in xrange(self.chain_count):
268             src_ip_first, src_ip_last = self.ip_block.reserve_ip_range(cur_chain_flow_count)
269             dst_ip_first, dst_ip_last = peer.ip_block.reserve_ip_range(cur_chain_flow_count)
270
271             configs.append({
272                 'count': cur_chain_flow_count,
273                 'mac_src': self.mac,
274                 'mac_dst': dest_macs[chain_idx],
275                 'ip_src_addr': src_ip_first,
276                 'ip_src_addr_max': src_ip_last,
277                 'ip_src_count': cur_chain_flow_count,
278                 'ip_dst_addr': dst_ip_first,
279                 'ip_dst_addr_max': dst_ip_last,
280                 'ip_dst_count': cur_chain_flow_count,
281                 'ip_addrs_step': self.ip_addrs_step,
282                 'udp_src_port': self.udp_src_port,
283                 'udp_dst_port': self.udp_dst_port,
284                 'mac_discovery_gw': self.get_gw_ip(chain_idx),
285                 'ip_src_tg_gw': self.tg_gw_ip_block.get_ip(chain_idx),
286                 'ip_dst_tg_gw': peer.tg_gw_ip_block.get_ip(chain_idx),
287                 'vlan_tag': self.vlans[chain_idx] if self.vlans else None,
288                 'vxlan': self.vxlan,
289                 'vtep_vlan': self.vtep_vlan if self.vtep_vlan else None,
290                 'vtep_src_mac': self.mac if self.vxlan is True else None,
291                 'vtep_dst_mac': self.vtep_dst_mac if self.vxlan is True else None,
292                 'vtep_dst_ip': self.vtep_dst_ip if self.vxlan is True else None,
293                 'vtep_src_ip': self.vtep_src_ip if self.vxlan is True else None,
294                 'net_vni': self.vnis[chain_idx] if self.vxlan is True else None
295             })
296             # after first chain, fall back to the flow count for all other chains
297             cur_chain_flow_count = flows_per_chain
298         return configs
299
300     @staticmethod
301     def ip_to_int(addr):
302         """Convert an IP address from string to numeric."""
303         return struct.unpack("!I", socket.inet_aton(addr))[0]
304
305     @staticmethod
306     def int_to_ip(nvalue):
307         """Convert an IP address from numeric to string."""
308         return socket.inet_ntoa(struct.pack("!I", nvalue))
309
310
311 class GeneratorConfig(object):
312     """Represents traffic configuration for currently running traffic profile."""
313
314     DEFAULT_IP_STEP = '0.0.0.1'
315     DEFAULT_SRC_DST_IP_STEP = '0.0.0.1'
316
317     def __init__(self, config):
318         """Create a generator config."""
319         self.config = config
320         # name of the generator profile (normally trex or dummy)
321         # pick the default one if not specified explicitly from cli options
322         if not config.generator_profile:
323             config.generator_profile = config.traffic_generator.default_profile
324         # pick up the profile dict based on the name
325         gen_config = self.__match_generator_profile(config.traffic_generator,
326                                                     config.generator_profile)
327         self.gen_config = gen_config
328         # copy over fields from the dict
329         self.tool = gen_config.tool
330         self.ip = gen_config.ip
331         # overrides on config.cores and config.mbuf_factor
332         if config.cores:
333             self.cores = config.cores
334         else:
335             self.cores = gen_config.get('cores', 1)
336         self.mbuf_factor = config.mbuf_factor
337         if gen_config.intf_speed:
338             # interface speed is overriden from config
339             self.intf_speed = bitmath.parse_string(gen_config.intf_speed.replace('ps', '')).bits
340         else:
341             # interface speed is discovered/provided by the traffic generator
342             self.intf_speed = 0
343         self.name = gen_config.name
344         self.zmq_pub_port = gen_config.get('zmq_pub_port', 4500)
345         self.zmq_rpc_port = gen_config.get('zmq_rpc_port', 4501)
346         self.limit_memory = gen_config.get('limit_memory', 1024)
347         self.software_mode = gen_config.get('software_mode', False)
348         self.interfaces = gen_config.interfaces
349         if self.interfaces[0].port != 0 or self.interfaces[1].port != 1:
350             raise TrafficClientException('Invalid port order/id in generator_profile.interfaces')
351         if hasattr(gen_config, 'platform'):
352             self.platform = gen_config.platform
353         self.service_chain = config.service_chain
354         self.service_chain_count = config.service_chain_count
355         self.flow_count = config.flow_count
356         self.host_name = gen_config.host_name
357
358         self.tg_gateway_ip_addrs = gen_config.tg_gateway_ip_addrs
359         self.ip_addrs = gen_config.ip_addrs
360         self.ip_addrs_step = gen_config.ip_addrs_step or self.DEFAULT_SRC_DST_IP_STEP
361         self.tg_gateway_ip_addrs_step = \
362             gen_config.tg_gateway_ip_addrs_step or self.DEFAULT_IP_STEP
363         self.gateway_ip_addrs_step = gen_config.gateway_ip_addrs_step or self.DEFAULT_IP_STEP
364         self.gateway_ips = gen_config.gateway_ip_addrs
365         self.udp_src_port = gen_config.udp_src_port
366         self.udp_dst_port = gen_config.udp_dst_port
367         self.vteps = gen_config.get('vteps')
368         self.devices = [Device(port, self) for port in [0, 1]]
369         # This should normally always be [0, 1]
370         self.ports = [device.port for device in self.devices]
371
372         # check that pci is not empty
373         if not gen_config.interfaces[0].get('pci', None) or \
374            not gen_config.interfaces[1].get('pci', None):
375             raise TrafficClientException("configuration interfaces pci fields cannot be empty")
376
377         self.pcis = [tgif['pci'] for tgif in gen_config.interfaces]
378         self.vlan_tagging = config.vlan_tagging
379
380         # needed for result/summarizer
381         config['tg-name'] = gen_config.name
382         config['tg-tool'] = self.tool
383
384     def to_json(self):
385         """Get json form to display the content into the overall result dict."""
386         return dict(self.gen_config)
387
388     def set_dest_macs(self, port_index, dest_macs):
389         """Set the list of dest MACs indexed by the chain id on given port.
390
391         port_index: the port for which dest macs must be set
392         dest_macs: a list of dest MACs indexed by chain id
393         """
394         if len(dest_macs) != self.config.service_chain_count:
395             raise TrafficClientException('Dest MAC list %s must have %d entries' %
396                                          (dest_macs, self.config.service_chain_count))
397         self.devices[port_index].set_dest_macs(dest_macs)
398         LOG.info('Port %d: dst MAC %s', port_index, [str(mac) for mac in dest_macs])
399
400     def set_vtep_dest_macs(self, port_index, dest_macs):
401         """Set the list of dest MACs indexed by the chain id on given port.
402
403         port_index: the port for which dest macs must be set
404         dest_macs: a list of dest MACs indexed by chain id
405         """
406         if len(dest_macs) != self.config.service_chain_count:
407             raise TrafficClientException('Dest MAC list %s must have %d entries' %
408                                          (dest_macs, self.config.service_chain_count))
409         self.devices[port_index].set_vtep_dst_mac(dest_macs)
410         LOG.info('Port %d: vtep dst MAC %s', port_index, set([str(mac) for mac in dest_macs]))
411
412     def get_dest_macs(self):
413         """Return the list of dest macs indexed by port."""
414         return [dev.get_dest_macs() for dev in self.devices]
415
416     def set_vlans(self, port_index, vlans):
417         """Set the list of vlans to use indexed by the chain id on given port.
418
419         port_index: the port for which VLANs must be set
420         vlans: a  list of vlan lists indexed by chain id
421         """
422         if len(vlans) != self.config.service_chain_count:
423             raise TrafficClientException('VLAN list %s must have %d entries' %
424                                          (vlans, self.config.service_chain_count))
425         self.devices[port_index].set_vlans(vlans)
426
427     def set_vxlans(self, port_index, vxlans):
428         """Set the list of vxlans (VNIs) to use indexed by the chain id on given port.
429
430         port_index: the port for which VXLANs must be set
431         VXLANs: a  list of VNIs lists indexed by chain id
432         """
433         if len(vxlans) != self.config.service_chain_count:
434             raise TrafficClientException('VXLAN list %s must have %d entries' %
435                                          (vxlans, self.config.service_chain_count))
436         self.devices[port_index].set_vxlans(vxlans)
437
438     def set_vtep_vlan(self, port_index, vlan):
439         """Set the vtep vlan to use indexed by the chain id on given port.
440         port_index: the port for which VLAN must be set
441         """
442         self.devices[port_index].set_vtep_vlan(vlan)
443
444     def set_vxlan_endpoints(self, port_index, src_ip, dst_ip):
445         self.devices[port_index].set_vxlan_endpoints(src_ip, dst_ip)
446
447     @staticmethod
448     def __match_generator_profile(traffic_generator, generator_profile):
449         gen_config = AttrDict(traffic_generator)
450         gen_config.pop('default_profile')
451         gen_config.pop('generator_profile')
452         matching_profile = [profile for profile in traffic_generator.generator_profile if
453                             profile.name == generator_profile]
454         if len(matching_profile) != 1:
455             raise Exception('Traffic generator profile not found: ' + generator_profile)
456
457         gen_config.update(matching_profile[0])
458         return gen_config
459
460
461 class TrafficClient(object):
462     """Traffic generator client with NDR/PDR binary seearch."""
463
464     PORTS = [0, 1]
465
466     def __init__(self, config, notifier=None):
467         """Create a new TrafficClient instance.
468
469         config: nfvbench config
470         notifier: notifier (optional)
471
472         A new instance is created everytime the nfvbench config may have changed.
473         """
474         self.config = config
475         self.generator_config = GeneratorConfig(config)
476         self.tool = self.generator_config.tool
477         self.gen = self._get_generator()
478         self.notifier = notifier
479         self.interval_collector = None
480         self.iteration_collector = None
481         self.runner = TrafficRunner(self, self.config.duration_sec, self.config.interval_sec)
482         self.config.frame_sizes = self._get_frame_sizes()
483         self.run_config = {
484             'l2frame_size': None,
485             'duration_sec': self.config.duration_sec,
486             'bidirectional': True,
487             'rates': []  # to avoid unsbuscriptable-obj warning
488         }
489         self.current_total_rate = {'rate_percent': '10'}
490         if self.config.single_run:
491             self.current_total_rate = utils.parse_rate_str(self.config.rate)
492         self.ifstats = None
493         # Speed is either discovered when connecting to TG or set from config
494         # This variable is 0 if not yet discovered from TG or must be the speed of
495         # each interface in bits per second
496         self.intf_speed = self.generator_config.intf_speed
497
498     def _get_generator(self):
499         tool = self.tool.lower()
500         if tool == 'trex':
501             from traffic_gen import trex
502             return trex.TRex(self)
503         if tool == 'dummy':
504             from traffic_gen import dummy
505             return dummy.DummyTG(self)
506         raise TrafficClientException('Unsupported generator tool name:' + self.tool)
507
508     def skip_sleep(self):
509         """Skip all sleeps when doing unit testing with dummy TG.
510
511         Must be overriden using mock.patch
512         """
513         return False
514
515     def _get_frame_sizes(self):
516         traffic_profile_name = self.config.traffic.profile
517         matching_profiles = [profile for profile in self.config.traffic_profile if
518                              profile.name == traffic_profile_name]
519         if len(matching_profiles) > 1:
520             raise TrafficClientException('Multiple traffic profiles with name: ' +
521                                          traffic_profile_name)
522         elif not matching_profiles:
523             raise TrafficClientException('Cannot find traffic profile: ' + traffic_profile_name)
524         return matching_profiles[0].l2frame_size
525
526     def start_traffic_generator(self):
527         """Start the traffic generator process (traffic not started yet)."""
528         self.gen.connect()
529         # pick up the interface speed if it is not set from config
530         intf_speeds = self.gen.get_port_speed_gbps()
531         # convert Gbps unit into bps
532         tg_if_speed = bitmath.parse_string(str(intf_speeds[0]) + 'Gb').bits
533         if self.intf_speed:
534             # interface speed is overriden from config
535             if self.intf_speed != tg_if_speed:
536                 # Warn the user if the speed in the config is different
537                 LOG.warning('Interface speed provided is different from actual speed (%d Gbps)',
538                             intf_speeds[0])
539         else:
540             # interface speed not provisioned by config
541             self.intf_speed = tg_if_speed
542             # also update the speed in the tg config
543             self.generator_config.intf_speed = tg_if_speed
544
545         # Save the traffic generator local MAC
546         for mac, device in zip(self.gen.get_macs(), self.generator_config.devices):
547             device.set_mac(mac)
548
549     def setup(self):
550         """Set up the traffic client."""
551         self.gen.clear_stats()
552
553     def get_version(self):
554         """Get the traffic generator version."""
555         return self.gen.get_version()
556
557     def ensure_end_to_end(self):
558         """Ensure traffic generator receives packets it has transmitted.
559
560         This ensures end to end connectivity and also waits until VMs are ready to forward packets.
561
562         VMs that are started and in active state may not pass traffic yet. It is imperative to make
563         sure that all VMs are passing traffic in both directions before starting any benchmarking.
564         To verify this, we need to send at a low frequency bi-directional packets and make sure
565         that we receive all packets back from all VMs. The number of flows is equal to 2 times
566         the number of chains (1 per direction) and we need to make sure we receive packets coming
567         from exactly 2 x chain count different source MAC addresses.
568
569         Example:
570             PVP chain (1 VM per chain)
571             N = 10 (number of chains)
572             Flow count = 20 (number of flows)
573             If the number of unique source MAC addresses from received packets is 20 then
574             all 10 VMs 10 VMs are in operational state.
575         """
576         LOG.info('Starting traffic generator to ensure end-to-end connectivity')
577         # send 2pps on each chain and each direction
578         rate_pps = {'rate_pps': str(self.config.service_chain_count * 2)}
579         self.gen.create_traffic('64', [rate_pps, rate_pps], bidirectional=True, latency=False)
580
581         # ensures enough traffic is coming back
582         retry_count = (self.config.check_traffic_time_sec +
583                        self.config.generic_poll_sec - 1) / self.config.generic_poll_sec
584
585         # we expect to see packets coming from 2 unique MAC per chain
586         # because there can be flooding in the case of shared net
587         # we must verify that packets from the right VMs are received
588         # and not just count unique src MAC
589         # create a dict of (port, chain) tuples indexed by dest mac
590         mac_map = {}
591         for port, dest_macs in enumerate(self.generator_config.get_dest_macs()):
592             for chain, mac in enumerate(dest_macs):
593                 mac_map[mac] = (port, chain)
594         unique_src_mac_count = len(mac_map)
595         if self.config.vxlan and self.config.traffic_generator.vtep_vlan:
596             get_mac_id = lambda packet: packet['binary'][60:66]
597         elif self.config.vxlan:
598             get_mac_id = lambda packet: packet['binary'][56:62]
599         else:
600             get_mac_id = lambda packet: packet['binary'][6:12]
601         for it in xrange(retry_count):
602             self.gen.clear_stats()
603             self.gen.start_traffic()
604             self.gen.start_capture()
605             LOG.info('Captured unique src mac %d/%d, capturing return packets (retry %d/%d)...',
606                      unique_src_mac_count - len(mac_map), unique_src_mac_count,
607                      it + 1, retry_count)
608             if not self.skip_sleep():
609                 time.sleep(self.config.generic_poll_sec)
610             self.gen.stop_traffic()
611             self.gen.fetch_capture_packets()
612             self.gen.stop_capture()
613
614             for packet in self.gen.packet_list:
615                 mac_id = get_mac_id(packet)
616                 src_mac = ':'.join(["%02x" % ord(x) for x in mac_id])
617                 if src_mac in mac_map:
618                     port, chain = mac_map[src_mac]
619                     LOG.info('Received packet from mac: %s (chain=%d, port=%d)',
620                              src_mac, chain, port)
621                     mac_map.pop(src_mac, None)
622
623                 if not mac_map:
624                     LOG.info('End-to-end connectivity established')
625                     return
626
627         raise TrafficClientException('End-to-end connectivity cannot be ensured')
628
629     def ensure_arp_successful(self):
630         """Resolve all IP using ARP and throw an exception in case of failure."""
631         dest_macs = self.gen.resolve_arp()
632         if dest_macs:
633             # all dest macs are discovered, saved them into the generator config
634             if self.config.vxlan:
635                 self.generator_config.set_vtep_dest_macs(0, dest_macs[0])
636                 self.generator_config.set_vtep_dest_macs(1, dest_macs[1])
637             else:
638                 self.generator_config.set_dest_macs(0, dest_macs[0])
639                 self.generator_config.set_dest_macs(1, dest_macs[1])
640         else:
641             raise TrafficClientException('ARP cannot be resolved')
642
643     def set_traffic(self, frame_size, bidirectional):
644         """Reconfigure the traffic generator for a new frame size."""
645         self.run_config['bidirectional'] = bidirectional
646         self.run_config['l2frame_size'] = frame_size
647         self.run_config['rates'] = [self.get_per_direction_rate()]
648         if bidirectional:
649             self.run_config['rates'].append(self.get_per_direction_rate())
650         else:
651             unidir_reverse_pps = int(self.config.unidir_reverse_traffic_pps)
652             if unidir_reverse_pps > 0:
653                 self.run_config['rates'].append({'rate_pps': str(unidir_reverse_pps)})
654         # Fix for [NFVBENCH-67], convert the rate string to PPS
655         for idx, rate in enumerate(self.run_config['rates']):
656             if 'rate_pps' not in rate:
657                 self.run_config['rates'][idx] = {'rate_pps': self.__convert_rates(rate)['rate_pps']}
658
659         self.gen.clear_streamblock()
660         self.gen.create_traffic(frame_size, self.run_config['rates'], bidirectional, latency=True)
661
662     def _modify_load(self, load):
663         self.current_total_rate = {'rate_percent': str(load)}
664         rate_per_direction = self.get_per_direction_rate()
665
666         self.gen.modify_rate(rate_per_direction, False)
667         self.run_config['rates'][0] = rate_per_direction
668         if self.run_config['bidirectional']:
669             self.gen.modify_rate(rate_per_direction, True)
670             self.run_config['rates'][1] = rate_per_direction
671
672     def get_ndr_and_pdr(self):
673         """Start the NDR/PDR iteration and return the results."""
674         dst = 'Bidirectional' if self.run_config['bidirectional'] else 'Unidirectional'
675         targets = {}
676         if self.config.ndr_run:
677             LOG.info('*** Searching NDR for %s (%s)...', self.run_config['l2frame_size'], dst)
678             targets['ndr'] = self.config.measurement.NDR
679         if self.config.pdr_run:
680             LOG.info('*** Searching PDR for %s (%s)...', self.run_config['l2frame_size'], dst)
681             targets['pdr'] = self.config.measurement.PDR
682
683         self.run_config['start_time'] = time.time()
684         self.interval_collector = IntervalCollector(self.run_config['start_time'])
685         self.interval_collector.attach_notifier(self.notifier)
686         self.iteration_collector = IterationCollector(self.run_config['start_time'])
687         results = {}
688         self.__range_search(0.0, 200.0, targets, results)
689
690         results['iteration_stats'] = {
691             'ndr_pdr': self.iteration_collector.get()
692         }
693
694         if self.config.ndr_run:
695             LOG.info('NDR load: %s', results['ndr']['rate_percent'])
696             results['ndr']['time_taken_sec'] = \
697                 results['ndr']['timestamp_sec'] - self.run_config['start_time']
698             if self.config.pdr_run:
699                 LOG.info('PDR load: %s', results['pdr']['rate_percent'])
700                 results['pdr']['time_taken_sec'] = \
701                     results['pdr']['timestamp_sec'] - results['ndr']['timestamp_sec']
702         else:
703             LOG.info('PDR load: %s', results['pdr']['rate_percent'])
704             results['pdr']['time_taken_sec'] = \
705                 results['pdr']['timestamp_sec'] - self.run_config['start_time']
706         return results
707
708     def __get_dropped_rate(self, result):
709         dropped_pkts = result['rx']['dropped_pkts']
710         total_pkts = result['tx']['total_pkts']
711         if not total_pkts:
712             return float('inf')
713         return float(dropped_pkts) / total_pkts * 100
714
715     def get_stats(self):
716         """Collect final stats for previous run."""
717         stats = self.gen.get_stats()
718         retDict = {'total_tx_rate': stats['total_tx_rate']}
719         for port in self.PORTS:
720             retDict[port] = {'tx': {}, 'rx': {}}
721
722         tx_keys = ['total_pkts', 'total_pkt_bytes', 'pkt_rate', 'pkt_bit_rate']
723         rx_keys = tx_keys + ['dropped_pkts']
724
725         for port in self.PORTS:
726             for key in tx_keys:
727                 retDict[port]['tx'][key] = int(stats[port]['tx'][key])
728             for key in rx_keys:
729                 try:
730                     retDict[port]['rx'][key] = int(stats[port]['rx'][key])
731                 except ValueError:
732                     retDict[port]['rx'][key] = 0
733             retDict[port]['rx']['avg_delay_usec'] = cast_integer(
734                 stats[port]['rx']['avg_delay_usec'])
735             retDict[port]['rx']['min_delay_usec'] = cast_integer(
736                 stats[port]['rx']['min_delay_usec'])
737             retDict[port]['rx']['max_delay_usec'] = cast_integer(
738                 stats[port]['rx']['max_delay_usec'])
739             retDict[port]['drop_rate_percent'] = self.__get_dropped_rate(retDict[port])
740
741         ports = sorted(retDict.keys())
742         if self.run_config['bidirectional']:
743             retDict['overall'] = {'tx': {}, 'rx': {}}
744             for key in tx_keys:
745                 retDict['overall']['tx'][key] = \
746                     retDict[ports[0]]['tx'][key] + retDict[ports[1]]['tx'][key]
747             for key in rx_keys:
748                 retDict['overall']['rx'][key] = \
749                     retDict[ports[0]]['rx'][key] + retDict[ports[1]]['rx'][key]
750             total_pkts = [retDict[ports[0]]['rx']['total_pkts'],
751                           retDict[ports[1]]['rx']['total_pkts']]
752             avg_delays = [retDict[ports[0]]['rx']['avg_delay_usec'],
753                           retDict[ports[1]]['rx']['avg_delay_usec']]
754             max_delays = [retDict[ports[0]]['rx']['max_delay_usec'],
755                           retDict[ports[1]]['rx']['max_delay_usec']]
756             min_delays = [retDict[ports[0]]['rx']['min_delay_usec'],
757                           retDict[ports[1]]['rx']['min_delay_usec']]
758             retDict['overall']['rx']['avg_delay_usec'] = utils.weighted_avg(total_pkts, avg_delays)
759             retDict['overall']['rx']['min_delay_usec'] = min(min_delays)
760             retDict['overall']['rx']['max_delay_usec'] = max(max_delays)
761             for key in ['pkt_bit_rate', 'pkt_rate']:
762                 for dirc in ['tx', 'rx']:
763                     retDict['overall'][dirc][key] /= 2.0
764         else:
765             retDict['overall'] = retDict[ports[0]]
766         retDict['overall']['drop_rate_percent'] = self.__get_dropped_rate(retDict['overall'])
767         return retDict
768
769     def __convert_rates(self, rate):
770         return utils.convert_rates(self.run_config['l2frame_size'],
771                                    rate,
772                                    self.intf_speed)
773
774     def __ndr_pdr_found(self, tag, load):
775         rates = self.__convert_rates({'rate_percent': load})
776         self.iteration_collector.add_ndr_pdr(tag, rates['rate_pps'])
777         last_stats = self.iteration_collector.peek()
778         self.interval_collector.add_ndr_pdr(tag, last_stats)
779
780     def __format_output_stats(self, stats):
781         for key in self.PORTS + ['overall']:
782             interface = stats[key]
783             stats[key] = {
784                 'tx_pkts': interface['tx']['total_pkts'],
785                 'rx_pkts': interface['rx']['total_pkts'],
786                 'drop_percentage': interface['drop_rate_percent'],
787                 'drop_pct': interface['rx']['dropped_pkts'],
788                 'avg_delay_usec': interface['rx']['avg_delay_usec'],
789                 'max_delay_usec': interface['rx']['max_delay_usec'],
790                 'min_delay_usec': interface['rx']['min_delay_usec'],
791             }
792
793         return stats
794
795     def __targets_found(self, rate, targets, results):
796         for tag, target in targets.iteritems():
797             LOG.info('Found %s (%s) load: %s', tag, target, rate)
798             self.__ndr_pdr_found(tag, rate)
799             results[tag]['timestamp_sec'] = time.time()
800
801     def __range_search(self, left, right, targets, results):
802         """Perform a binary search for a list of targets inside a [left..right] range or rate.
803
804         left    the left side of the range to search as a % the line rate (100 = 100% line rate)
805                 indicating the rate to send on each interface
806         right   the right side of the range to search as a % of line rate
807                 indicating the rate to send on each interface
808         targets a dict of drop rates to search (0.1 = 0.1%), indexed by the DR name or "tag"
809                 ('ndr', 'pdr')
810         results a dict to store results
811         """
812         if not targets:
813             return
814         LOG.info('Range search [%s .. %s] targets: %s', left, right, targets)
815
816         # Terminate search when gap is less than load epsilon
817         if right - left < self.config.measurement.load_epsilon:
818             self.__targets_found(left, targets, results)
819             return
820
821         # Obtain the average drop rate in for middle load
822         middle = (left + right) / 2.0
823         try:
824             stats, rates = self.__run_search_iteration(middle)
825         except STLError:
826             LOG.exception("Got exception from traffic generator during binary search")
827             self.__targets_found(left, targets, results)
828             return
829         # Split target dicts based on the avg drop rate
830         left_targets = {}
831         right_targets = {}
832         for tag, target in targets.iteritems():
833             if stats['overall']['drop_rate_percent'] <= target:
834                 # record the best possible rate found for this target
835                 results[tag] = rates
836                 results[tag].update({
837                     'load_percent_per_direction': middle,
838                     'stats': self.__format_output_stats(dict(stats)),
839                     'timestamp_sec': None
840                 })
841                 right_targets[tag] = target
842             else:
843                 # initialize to 0 all fields of result for
844                 # the worst case scenario of the binary search (if ndr/pdr is not found)
845                 if tag not in results:
846                     results[tag] = dict.fromkeys(rates, 0)
847                     empty_stats = self.__format_output_stats(dict(stats))
848                     for key in empty_stats:
849                         if isinstance(empty_stats[key], dict):
850                             empty_stats[key] = dict.fromkeys(empty_stats[key], 0)
851                         else:
852                             empty_stats[key] = 0
853                     results[tag].update({
854                         'load_percent_per_direction': 0,
855                         'stats': empty_stats,
856                         'timestamp_sec': None
857                     })
858                 left_targets[tag] = target
859
860         # search lower half
861         self.__range_search(left, middle, left_targets, results)
862
863         # search upper half only if the upper rate does not exceed
864         # 100%, this only happens when the first search at 100%
865         # yields a DR that is < target DR
866         if middle >= 100:
867             self.__targets_found(100, right_targets, results)
868         else:
869             self.__range_search(middle, right, right_targets, results)
870
871     def __run_search_iteration(self, rate):
872         """Run one iteration at the given rate level.
873
874         rate: the rate to send on each port in percent (0 to 100)
875         """
876         self._modify_load(rate)
877
878         # poll interval stats and collect them
879         for stats in self.run_traffic():
880             self.interval_collector.add(stats)
881             time_elapsed_ratio = self.runner.time_elapsed() / self.run_config['duration_sec']
882             if time_elapsed_ratio >= 1:
883                 self.cancel_traffic()
884                 if not self.skip_sleep():
885                     time.sleep(self.config.pause_sec)
886         self.interval_collector.reset()
887
888         # get stats from the run
889         stats = self.runner.client.get_stats()
890         current_traffic_config = self._get_traffic_config()
891         warning = self.compare_tx_rates(current_traffic_config['direction-total']['rate_pps'],
892                                         stats['total_tx_rate'])
893         if warning is not None:
894             stats['warning'] = warning
895
896         # save reliable stats from whole iteration
897         self.iteration_collector.add(stats, current_traffic_config['direction-total']['rate_pps'])
898         LOG.info('Average drop rate: %f', stats['overall']['drop_rate_percent'])
899         return stats, current_traffic_config['direction-total']
900
901     @staticmethod
902     def log_stats(stats):
903         """Log estimated stats during run."""
904         report = {
905             'datetime': str(datetime.now()),
906             'tx_packets': stats['overall']['tx']['total_pkts'],
907             'rx_packets': stats['overall']['rx']['total_pkts'],
908             'drop_packets': stats['overall']['rx']['dropped_pkts'],
909             'drop_rate_percent': stats['overall']['drop_rate_percent']
910         }
911         LOG.info('TX: %(tx_packets)d; '
912                  'RX: %(rx_packets)d; '
913                  'Est. Dropped: %(drop_packets)d; '
914                  'Est. Drop rate: %(drop_rate_percent).4f%%',
915                  report)
916
917     def run_traffic(self):
918         """Start traffic and return intermediate stats for each interval."""
919         stats = self.runner.run()
920         while self.runner.is_running:
921             self.log_stats(stats)
922             yield stats
923             stats = self.runner.poll_stats()
924             if stats is None:
925                 return
926         self.log_stats(stats)
927         LOG.info('Drop rate: %f', stats['overall']['drop_rate_percent'])
928         yield stats
929
930     def cancel_traffic(self):
931         """Stop traffic."""
932         self.runner.stop()
933
934     def _get_traffic_config(self):
935         config = {}
936         load_total = 0.0
937         bps_total = 0.0
938         pps_total = 0.0
939         for idx, rate in enumerate(self.run_config['rates']):
940             key = 'direction-forward' if idx == 0 else 'direction-reverse'
941             config[key] = {
942                 'l2frame_size': self.run_config['l2frame_size'],
943                 'duration_sec': self.run_config['duration_sec']
944             }
945             config[key].update(rate)
946             config[key].update(self.__convert_rates(rate))
947             load_total += float(config[key]['rate_percent'])
948             bps_total += float(config[key]['rate_bps'])
949             pps_total += float(config[key]['rate_pps'])
950         config['direction-total'] = dict(config['direction-forward'])
951         config['direction-total'].update({
952             'rate_percent': load_total,
953             'rate_pps': cast_integer(pps_total),
954             'rate_bps': bps_total
955         })
956
957         return config
958
959     def get_run_config(self, results):
960         """Return configuration which was used for the last run."""
961         r = {}
962         # because we want each direction to have the far end RX rates,
963         # use the far end index (1-idx) to retrieve the RX rates
964         for idx, key in enumerate(["direction-forward", "direction-reverse"]):
965             tx_rate = results["stats"][idx]["tx"]["total_pkts"] / self.config.duration_sec
966             rx_rate = results["stats"][1 - idx]["rx"]["total_pkts"] / self.config.duration_sec
967             r[key] = {
968                 "orig": self.__convert_rates(self.run_config['rates'][idx]),
969                 "tx": self.__convert_rates({'rate_pps': tx_rate}),
970                 "rx": self.__convert_rates({'rate_pps': rx_rate})
971             }
972
973         total = {}
974         for direction in ['orig', 'tx', 'rx']:
975             total[direction] = {}
976             for unit in ['rate_percent', 'rate_bps', 'rate_pps']:
977                 total[direction][unit] = sum([float(x[direction][unit]) for x in r.values()])
978
979         r['direction-total'] = total
980         return r
981
982     def insert_interface_stats(self, pps_list):
983         """Insert interface stats to a list of packet path stats.
984
985         pps_list: a list of packet path stats instances indexed by chain index
986
987         This function will insert the packet path stats for the traffic gen ports 0 and 1
988         with itemized per chain tx/rx counters.
989         There will be as many packet path stats as chains.
990         Each packet path stats will have exactly 2 InterfaceStats for port 0 and port 1
991         self.pps_list:
992         [
993         PacketPathStats(InterfaceStats(chain 0, port 0), InterfaceStats(chain 0, port 1)),
994         PacketPathStats(InterfaceStats(chain 1, port 0), InterfaceStats(chain 1, port 1)),
995         ...
996         ]
997         """
998         def get_if_stats(chain_idx):
999             return [InterfaceStats('p' + str(port), self.tool)
1000                     for port in range(2)]
1001         # keep the list of list of interface stats indexed by the chain id
1002         self.ifstats = [get_if_stats(chain_idx)
1003                         for chain_idx in range(self.config.service_chain_count)]
1004         # note that we need to make a copy of the ifs list so that any modification in the
1005         # list from pps will not change the list saved in self.ifstats
1006         self.pps_list = [PacketPathStats(list(ifs)) for ifs in self.ifstats]
1007         # insert the corresponding pps in the passed list
1008         pps_list.extend(self.pps_list)
1009
1010     def update_interface_stats(self, diff=False):
1011         """Update all interface stats.
1012
1013         diff: if False, simply refresh the interface stats values with latest values
1014               if True, diff the interface stats with the latest values
1015         Make sure that the interface stats inserted in insert_interface_stats() are updated
1016         with proper values.
1017         self.ifstats:
1018         [
1019         [InterfaceStats(chain 0, port 0), InterfaceStats(chain 0, port 1)],
1020         [InterfaceStats(chain 1, port 0), InterfaceStats(chain 1, port 1)],
1021         ...
1022         ]
1023         """
1024         if diff:
1025             stats = self.gen.get_stats()
1026             for chain_idx, ifs in enumerate(self.ifstats):
1027                 # each ifs has exactly 2 InterfaceStats and 2 Latency instances
1028                 # corresponding to the
1029                 # port 0 and port 1 for the given chain_idx
1030                 # Note that we cannot use self.pps_list[chain_idx].if_stats to pick the
1031                 # interface stats for the pps because it could have been modified to contain
1032                 # additional interface stats
1033                 self.gen.get_stream_stats(stats, ifs, self.pps_list[chain_idx].latencies, chain_idx)
1034             # special handling for vxlan
1035             # in case of vxlan, flow stats are not available so all rx counters will be
1036             # zeros when the total rx port counter is non zero.
1037             # in that case,
1038             for port in range(2):
1039                 total_rx = 0
1040                 for ifs in self.ifstats:
1041                     total_rx += ifs[port].rx
1042                 if total_rx == 0:
1043                     # check if the total port rx from Trex is also zero
1044                     port_rx = stats[port]['rx']['total_pkts']
1045                     if port_rx:
1046                         # the total rx for all chains from port level stats is non zero
1047                         # which means that the per-chain stats are not available
1048                         if len(self.ifstats) == 1:
1049                             # only one chain, simply report the port level rx to the chain rx stats
1050                             self.ifstats[0][port].rx = port_rx
1051                         else:
1052                             for ifs in self.ifstats:
1053                                 # mark this data as unavailable
1054                                 ifs[port].rx = None
1055                             # pitch in the total rx only in the last chain pps
1056                             self.ifstats[-1][port].rx_total = port_rx
1057
1058     @staticmethod
1059     def compare_tx_rates(required, actual):
1060         """Compare the actual TX rate to the required TX rate."""
1061         threshold = 0.9
1062         are_different = False
1063         try:
1064             if float(actual) / required < threshold:
1065                 are_different = True
1066         except ZeroDivisionError:
1067             are_different = True
1068
1069         if are_different:
1070             msg = "WARNING: There is a significant difference between requested TX rate ({r}) " \
1071                   "and actual TX rate ({a}). The traffic generator may not have sufficient CPU " \
1072                   "to achieve the requested TX rate.".format(r=required, a=actual)
1073             LOG.info(msg)
1074             return msg
1075
1076         return None
1077
1078     def get_per_direction_rate(self):
1079         """Get the rate for each direction."""
1080         divisor = 2 if self.run_config['bidirectional'] else 1
1081         if 'rate_percent' in self.current_total_rate:
1082             # don't split rate if it's percentage
1083             divisor = 1
1084
1085         return utils.divide_rate(self.current_total_rate, divisor)
1086
1087     def close(self):
1088         """Close this instance."""
1089         try:
1090             self.gen.stop_traffic()
1091         except Exception:
1092             pass
1093         self.gen.clear_stats()
1094         self.gen.cleanup()