Add support for VXLAN latency
[nfvbench.git] / nfvbench / traffic_client.py
1 # Copyright 2016 Cisco Systems, Inc.  All rights reserved.
2 #
3 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
4 #    not use this file except in compliance with the License. You may obtain
5 #    a copy of the License at
6 #
7 #         http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #    Unless required by applicable law or agreed to in writing, software
10 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 #    License for the specific language governing permissions and limitations
13 #    under the License.
14
15 """Interface to the traffic generator clients including NDR/PDR binary search."""
16
17 from datetime import datetime
18 import socket
19 import struct
20 import time
21
22 from attrdict import AttrDict
23 import bitmath
24 from netaddr import IPNetwork
25 # pylint: disable=import-error
26 from trex.stl.api import Ether
27 from trex.stl.api import STLError
28 from trex.stl.api import UDP
29 # pylint: enable=import-error
30
31 from log import LOG
32 from packet_stats import InterfaceStats
33 from packet_stats import PacketPathStats
34 from stats_collector import IntervalCollector
35 from stats_collector import IterationCollector
36 import traffic_gen.traffic_utils as utils
37 from utils import cast_integer
38
39
40 class TrafficClientException(Exception):
41     """Generic traffic client exception."""
42
43     pass
44
45
46 class TrafficRunner(object):
47     """Serialize various steps required to run traffic."""
48
49     def __init__(self, client, duration_sec, interval_sec=0):
50         """Create a traffic runner."""
51         self.client = client
52         self.start_time = None
53         self.duration_sec = duration_sec
54         self.interval_sec = interval_sec
55
56     def run(self):
57         """Clear stats and instruct the traffic generator to start generating traffic."""
58         if self.is_running():
59             return None
60         LOG.info('Running traffic generator')
61         self.client.gen.clear_stats()
62         self.client.gen.start_traffic()
63         self.start_time = time.time()
64         return self.poll_stats()
65
66     def stop(self):
67         """Stop the current run and instruct the traffic generator to stop traffic."""
68         if self.is_running():
69             self.start_time = None
70             self.client.gen.stop_traffic()
71
72     def is_running(self):
73         """Check if a run is still pending."""
74         return self.start_time is not None
75
76     def time_elapsed(self):
77         """Return time elapsed since start of run."""
78         if self.is_running():
79             return time.time() - self.start_time
80         return self.duration_sec
81
82     def poll_stats(self):
83         """Poll latest stats from the traffic generator at fixed interval - sleeps if necessary.
84
85         return: latest stats or None if traffic is stopped
86         """
87         if not self.is_running():
88             return None
89         if self.client.skip_sleep():
90             self.stop()
91             return self.client.get_stats()
92         time_elapsed = self.time_elapsed()
93         if time_elapsed > self.duration_sec:
94             self.stop()
95             return None
96         time_left = self.duration_sec - time_elapsed
97         if self.interval_sec > 0.0:
98             if time_left <= self.interval_sec:
99                 time.sleep(time_left)
100                 self.stop()
101             else:
102                 time.sleep(self.interval_sec)
103         else:
104             time.sleep(self.duration_sec)
105             self.stop()
106         return self.client.get_stats()
107
108
109 class IpBlock(object):
110     """Manage a block of IP addresses."""
111
112     def __init__(self, base_ip, step_ip, count_ip):
113         """Create an IP block."""
114         self.base_ip_int = Device.ip_to_int(base_ip)
115         self.step = Device.ip_to_int(step_ip)
116         self.max_available = count_ip
117         self.next_free = 0
118
119     def get_ip(self, index=0):
120         """Return the IP address at given index."""
121         if index < 0 or index >= self.max_available:
122             raise IndexError('Index out of bounds: %d (max=%d)' % (index, self.max_available))
123         return Device.int_to_ip(self.base_ip_int + index * self.step)
124
125     def reserve_ip_range(self, count):
126         """Reserve a range of count consecutive IP addresses spaced by step."""
127         if self.next_free + count > self.max_available:
128             raise IndexError('No more IP addresses next free=%d max_available=%d requested=%d' %
129                              (self.next_free,
130                               self.max_available,
131                               count))
132         first_ip = self.get_ip(self.next_free)
133         last_ip = self.get_ip(self.next_free + count - 1)
134         self.next_free += count
135         return (first_ip, last_ip)
136
137     def reset_reservation(self):
138         """Reset all reservations and restart with a completely unused IP block."""
139         self.next_free = 0
140
141
142 class Device(object):
143     """Represent a port device and all information associated to it.
144
145     In the curent version we only support 2 port devices for the traffic generator
146     identified as port 0 or port 1.
147     """
148
149     def __init__(self, port, generator_config):
150         """Create a new device for a given port."""
151         self.generator_config = generator_config
152         self.chain_count = generator_config.service_chain_count
153         self.flow_count = generator_config.flow_count / 2
154         self.port = port
155         self.switch_port = generator_config.interfaces[port].get('switch_port', None)
156         self.vtep_vlan = None
157         self.vtep_src_mac = None
158         self.vxlan = False
159         self.pci = generator_config.interfaces[port].pci
160         self.mac = None
161         self.dest_macs = None
162         self.vtep_dst_mac = None
163         self.vtep_dst_ip = None
164         if generator_config.vteps is None:
165             self.vtep_src_ip = None
166         else:
167             self.vtep_src_ip = generator_config.vteps[port]
168         self.vnis = None
169         self.vlans = None
170         self.ip_addrs = generator_config.ip_addrs[port]
171         subnet = IPNetwork(self.ip_addrs)
172         self.ip = subnet.ip.format()
173         self.ip_addrs_step = generator_config.ip_addrs_step
174         self.ip_block = IpBlock(self.ip, self.ip_addrs_step, self.flow_count)
175         self.gw_ip_block = IpBlock(generator_config.gateway_ips[port],
176                                    generator_config.gateway_ip_addrs_step,
177                                    self.chain_count)
178         self.tg_gateway_ip_addrs = generator_config.tg_gateway_ip_addrs[port]
179         self.tg_gw_ip_block = IpBlock(self.tg_gateway_ip_addrs,
180                                       generator_config.tg_gateway_ip_addrs_step,
181                                       self.chain_count)
182         self.udp_src_port = generator_config.udp_src_port
183         self.udp_dst_port = generator_config.udp_dst_port
184
185     def set_mac(self, mac):
186         """Set the local MAC for this port device."""
187         if mac is None:
188             raise TrafficClientException('Trying to set traffic generator MAC address as None')
189         self.mac = mac
190
191     def get_peer_device(self):
192         """Get the peer device (device 0 -> device 1, or device 1 -> device 0)."""
193         return self.generator_config.devices[1 - self.port]
194
195     def set_vtep_dst_mac(self, dest_macs):
196         """Set the list of dest MACs indexed by the chain id.
197
198         This is only called in 2 cases:
199         - VM macs discovered using openstack API
200         - dest MACs provisioned in config file
201         """
202         self.vtep_dst_mac = map(str, dest_macs)
203
204     def set_dest_macs(self, dest_macs):
205         """Set the list of dest MACs indexed by the chain id.
206
207         This is only called in 2 cases:
208         - VM macs discovered using openstack API
209         - dest MACs provisioned in config file
210         """
211         self.dest_macs = map(str, dest_macs)
212
213     def get_dest_macs(self):
214         """Get the list of dest macs for this device.
215
216         If set_dest_macs was never called, assumes l2-loopback and return
217         a list of peer mac (as many as chains but normally only 1 chain)
218         """
219         if self.dest_macs:
220             return self.dest_macs
221         # assume this is l2-loopback
222         return [self.get_peer_device().mac] * self.chain_count
223
224     def set_vlans(self, vlans):
225         """Set the list of vlans to use indexed by the chain id."""
226         self.vlans = vlans
227         LOG.info("Port %d: VLANs %s", self.port, self.vlans)
228
229     def set_vtep_vlan(self, vlan):
230         """Set the vtep vlan to use indexed by specific port."""
231         self.vtep_vlan = vlan
232         self.vxlan = True
233         self.vlan_tagging = None
234         LOG.info("Port %d: VTEP VLANs %s", self.port, self.vtep_vlan)
235
236     def set_vxlan_endpoints(self, src_ip, dst_ip):
237         self.vtep_dst_ip = dst_ip
238         self.vtep_src_ip = src_ip
239         LOG.info("Port %d: src_vtep %s, dst_vtep %s", self.port,
240                  self.vtep_src_ip, self.vtep_dst_ip)
241
242     def set_vxlans(self, vnis):
243         self.vnis = vnis
244         LOG.info("Port %d: VNIs %s", self.port, self.vnis)
245
246     def set_gw_ip(self, gateway_ip):
247         self.gw_ip_block = IpBlock(gateway_ip,
248                                    self.generator_config.gateway_ip_addrs_step,
249                                    self.chain_count)
250
251     def get_gw_ip(self, chain_index):
252         """Retrieve the IP address assigned for the gateway of a given chain."""
253         return self.gw_ip_block.get_ip(chain_index)
254
255     def get_stream_configs(self):
256         """Get the stream config for a given chain on this device.
257
258         Called by the traffic generator driver to program the traffic generator properly
259         before generating traffic
260         """
261         configs = []
262         # exact flow count for each chain is calculated as follows:
263         # - all chains except the first will have the same flow count
264         #   calculated as (total_flows + chain_count - 1) / chain_count
265         # - the first chain will have the remainder
266         # example 11 flows and 3 chains => 3, 4, 4
267         flows_per_chain = (self.flow_count + self.chain_count - 1) / self.chain_count
268         cur_chain_flow_count = self.flow_count - flows_per_chain * (self.chain_count - 1)
269         peer = self.get_peer_device()
270         self.ip_block.reset_reservation()
271         peer.ip_block.reset_reservation()
272         dest_macs = self.get_dest_macs()
273
274         for chain_idx in xrange(self.chain_count):
275             src_ip_first, src_ip_last = self.ip_block.reserve_ip_range(cur_chain_flow_count)
276             dst_ip_first, dst_ip_last = peer.ip_block.reserve_ip_range(cur_chain_flow_count)
277
278             configs.append({
279                 'count': cur_chain_flow_count,
280                 'mac_src': self.mac,
281                 'mac_dst': dest_macs[chain_idx],
282                 'ip_src_addr': src_ip_first,
283                 'ip_src_addr_max': src_ip_last,
284                 'ip_src_count': cur_chain_flow_count,
285                 'ip_dst_addr': dst_ip_first,
286                 'ip_dst_addr_max': dst_ip_last,
287                 'ip_dst_count': cur_chain_flow_count,
288                 'ip_addrs_step': self.ip_addrs_step,
289                 'udp_src_port': self.udp_src_port,
290                 'udp_dst_port': self.udp_dst_port,
291                 'mac_discovery_gw': self.get_gw_ip(chain_idx),
292                 'ip_src_tg_gw': self.tg_gw_ip_block.get_ip(chain_idx),
293                 'ip_dst_tg_gw': peer.tg_gw_ip_block.get_ip(chain_idx),
294                 'vlan_tag': self.vlans[chain_idx] if self.vlans else None,
295                 'vxlan': self.vxlan,
296                 'vtep_vlan': self.vtep_vlan if self.vtep_vlan else None,
297                 'vtep_src_mac': self.mac if self.vxlan is True else None,
298                 'vtep_dst_mac': self.vtep_dst_mac if self.vxlan is True else None,
299                 'vtep_dst_ip': self.vtep_dst_ip if self.vxlan is True else None,
300                 'vtep_src_ip': self.vtep_src_ip if self.vxlan is True else None,
301                 'net_vni': self.vnis[chain_idx] if self.vxlan is True else None
302             })
303             # after first chain, fall back to the flow count for all other chains
304             cur_chain_flow_count = flows_per_chain
305         return configs
306
307     @staticmethod
308     def ip_to_int(addr):
309         """Convert an IP address from string to numeric."""
310         return struct.unpack("!I", socket.inet_aton(addr))[0]
311
312     @staticmethod
313     def int_to_ip(nvalue):
314         """Convert an IP address from numeric to string."""
315         return socket.inet_ntoa(struct.pack("!I", nvalue))
316
317
318 class GeneratorConfig(object):
319     """Represents traffic configuration for currently running traffic profile."""
320
321     DEFAULT_IP_STEP = '0.0.0.1'
322     DEFAULT_SRC_DST_IP_STEP = '0.0.0.1'
323
324     def __init__(self, config):
325         """Create a generator config."""
326         self.config = config
327         # name of the generator profile (normally trex or dummy)
328         # pick the default one if not specified explicitly from cli options
329         if not config.generator_profile:
330             config.generator_profile = config.traffic_generator.default_profile
331         # pick up the profile dict based on the name
332         gen_config = self.__match_generator_profile(config.traffic_generator,
333                                                     config.generator_profile)
334         self.gen_config = gen_config
335         # copy over fields from the dict
336         self.tool = gen_config.tool
337         self.ip = gen_config.ip
338         # overrides on config.cores and config.mbuf_factor
339         if config.cores:
340             self.cores = config.cores
341         else:
342             self.cores = gen_config.get('cores', 1)
343         self.mbuf_factor = config.mbuf_factor
344         self.hdrh = not config.disable_hdrh
345         if gen_config.intf_speed:
346             # interface speed is overriden from config
347             self.intf_speed = bitmath.parse_string(gen_config.intf_speed.replace('ps', '')).bits
348         else:
349             # interface speed is discovered/provided by the traffic generator
350             self.intf_speed = 0
351         self.name = gen_config.name
352         self.zmq_pub_port = gen_config.get('zmq_pub_port', 4500)
353         self.zmq_rpc_port = gen_config.get('zmq_rpc_port', 4501)
354         self.limit_memory = gen_config.get('limit_memory', 1024)
355         self.software_mode = gen_config.get('software_mode', False)
356         self.interfaces = gen_config.interfaces
357         if self.interfaces[0].port != 0 or self.interfaces[1].port != 1:
358             raise TrafficClientException('Invalid port order/id in generator_profile.interfaces')
359         if hasattr(gen_config, 'platform'):
360             self.platform = gen_config.platform
361         self.service_chain = config.service_chain
362         self.service_chain_count = config.service_chain_count
363         self.flow_count = config.flow_count
364         self.host_name = gen_config.host_name
365
366         self.tg_gateway_ip_addrs = gen_config.tg_gateway_ip_addrs
367         self.ip_addrs = gen_config.ip_addrs
368         self.ip_addrs_step = gen_config.ip_addrs_step or self.DEFAULT_SRC_DST_IP_STEP
369         self.tg_gateway_ip_addrs_step = \
370             gen_config.tg_gateway_ip_addrs_step or self.DEFAULT_IP_STEP
371         self.gateway_ip_addrs_step = gen_config.gateway_ip_addrs_step or self.DEFAULT_IP_STEP
372         self.gateway_ips = gen_config.gateway_ip_addrs
373         self.udp_src_port = gen_config.udp_src_port
374         self.udp_dst_port = gen_config.udp_dst_port
375         self.vteps = gen_config.get('vteps')
376         self.devices = [Device(port, self) for port in [0, 1]]
377         # This should normally always be [0, 1]
378         self.ports = [device.port for device in self.devices]
379
380         # check that pci is not empty
381         if not gen_config.interfaces[0].get('pci', None) or \
382            not gen_config.interfaces[1].get('pci', None):
383             raise TrafficClientException("configuration interfaces pci fields cannot be empty")
384
385         self.pcis = [tgif['pci'] for tgif in gen_config.interfaces]
386         self.vlan_tagging = config.vlan_tagging
387
388         # needed for result/summarizer
389         config['tg-name'] = gen_config.name
390         config['tg-tool'] = self.tool
391
392     def to_json(self):
393         """Get json form to display the content into the overall result dict."""
394         return dict(self.gen_config)
395
396     def set_dest_macs(self, port_index, dest_macs):
397         """Set the list of dest MACs indexed by the chain id on given port.
398
399         port_index: the port for which dest macs must be set
400         dest_macs: a list of dest MACs indexed by chain id
401         """
402         if len(dest_macs) < self.config.service_chain_count:
403             raise TrafficClientException('Dest MAC list %s must have %d entries' %
404                                          (dest_macs, self.config.service_chain_count))
405         # only pass the first scc dest MACs
406         self.devices[port_index].set_dest_macs(dest_macs[:self.config.service_chain_count])
407         LOG.info('Port %d: dst MAC %s', port_index, [str(mac) for mac in dest_macs])
408
409     def set_vtep_dest_macs(self, port_index, dest_macs):
410         """Set the list of dest MACs indexed by the chain id on given port.
411
412         port_index: the port for which dest macs must be set
413         dest_macs: a list of dest MACs indexed by chain id
414         """
415         if len(dest_macs) != self.config.service_chain_count:
416             raise TrafficClientException('Dest MAC list %s must have %d entries' %
417                                          (dest_macs, self.config.service_chain_count))
418         self.devices[port_index].set_vtep_dst_mac(dest_macs)
419         LOG.info('Port %d: vtep dst MAC %s', port_index, set([str(mac) for mac in dest_macs]))
420
421     def get_dest_macs(self):
422         """Return the list of dest macs indexed by port."""
423         return [dev.get_dest_macs() for dev in self.devices]
424
425     def set_vlans(self, port_index, vlans):
426         """Set the list of vlans to use indexed by the chain id on given port.
427
428         port_index: the port for which VLANs must be set
429         vlans: a  list of vlan lists indexed by chain id
430         """
431         if len(vlans) != self.config.service_chain_count:
432             raise TrafficClientException('VLAN list %s must have %d entries' %
433                                          (vlans, self.config.service_chain_count))
434         self.devices[port_index].set_vlans(vlans)
435
436     def set_vxlans(self, port_index, vxlans):
437         """Set the list of vxlans (VNIs) to use indexed by the chain id on given port.
438
439         port_index: the port for which VXLANs must be set
440         VXLANs: a  list of VNIs lists indexed by chain id
441         """
442         if len(vxlans) != self.config.service_chain_count:
443             raise TrafficClientException('VXLAN list %s must have %d entries' %
444                                          (vxlans, self.config.service_chain_count))
445         self.devices[port_index].set_vxlans(vxlans)
446
447     def set_vtep_vlan(self, port_index, vlan):
448         """Set the vtep vlan to use indexed by the chain id on given port.
449         port_index: the port for which VLAN must be set
450         """
451         self.devices[port_index].set_vtep_vlan(vlan)
452
453     def set_vxlan_endpoints(self, port_index, src_ip, dst_ip):
454         self.devices[port_index].set_vxlan_endpoints(src_ip, dst_ip)
455
456     @staticmethod
457     def __match_generator_profile(traffic_generator, generator_profile):
458         gen_config = AttrDict(traffic_generator)
459         gen_config.pop('default_profile')
460         gen_config.pop('generator_profile')
461         matching_profile = [profile for profile in traffic_generator.generator_profile if
462                             profile.name == generator_profile]
463         if len(matching_profile) != 1:
464             raise Exception('Traffic generator profile not found: ' + generator_profile)
465
466         gen_config.update(matching_profile[0])
467         return gen_config
468
469
470 class TrafficClient(object):
471     """Traffic generator client with NDR/PDR binary seearch."""
472
473     PORTS = [0, 1]
474
475     def __init__(self, config, notifier=None):
476         """Create a new TrafficClient instance.
477
478         config: nfvbench config
479         notifier: notifier (optional)
480
481         A new instance is created everytime the nfvbench config may have changed.
482         """
483         self.config = config
484         self.generator_config = GeneratorConfig(config)
485         self.tool = self.generator_config.tool
486         self.gen = self._get_generator()
487         self.notifier = notifier
488         self.interval_collector = None
489         self.iteration_collector = None
490         self.runner = TrafficRunner(self, self.config.duration_sec, self.config.interval_sec)
491         self.config.frame_sizes = self._get_frame_sizes()
492         self.run_config = {
493             'l2frame_size': None,
494             'duration_sec': self.config.duration_sec,
495             'bidirectional': True,
496             'rates': []  # to avoid unsbuscriptable-obj warning
497         }
498         self.current_total_rate = {'rate_percent': '10'}
499         if self.config.single_run:
500             self.current_total_rate = utils.parse_rate_str(self.config.rate)
501         self.ifstats = None
502         # Speed is either discovered when connecting to TG or set from config
503         # This variable is 0 if not yet discovered from TG or must be the speed of
504         # each interface in bits per second
505         self.intf_speed = self.generator_config.intf_speed
506
507     def _get_generator(self):
508         tool = self.tool.lower()
509         if tool == 'trex':
510             from traffic_gen import trex_gen
511             return trex_gen.TRex(self)
512         if tool == 'dummy':
513             from traffic_gen import dummy
514             return dummy.DummyTG(self)
515         raise TrafficClientException('Unsupported generator tool name:' + self.tool)
516
517     def skip_sleep(self):
518         """Skip all sleeps when doing unit testing with dummy TG.
519
520         Must be overriden using mock.patch
521         """
522         return False
523
524     def _get_frame_sizes(self):
525         traffic_profile_name = self.config.traffic.profile
526         matching_profiles = [profile for profile in self.config.traffic_profile if
527                              profile.name == traffic_profile_name]
528         if len(matching_profiles) > 1:
529             raise TrafficClientException('Multiple traffic profiles with name: ' +
530                                          traffic_profile_name)
531         elif not matching_profiles:
532             raise TrafficClientException('Cannot find traffic profile: ' + traffic_profile_name)
533         return matching_profiles[0].l2frame_size
534
535     def start_traffic_generator(self):
536         """Start the traffic generator process (traffic not started yet)."""
537         self.gen.connect()
538         # pick up the interface speed if it is not set from config
539         intf_speeds = self.gen.get_port_speed_gbps()
540         # convert Gbps unit into bps
541         tg_if_speed = bitmath.parse_string(str(intf_speeds[0]) + 'Gb').bits
542         if self.intf_speed:
543             # interface speed is overriden from config
544             if self.intf_speed != tg_if_speed:
545                 # Warn the user if the speed in the config is different
546                 LOG.warning('Interface speed provided is different from actual speed (%d Gbps)',
547                             intf_speeds[0])
548         else:
549             # interface speed not provisioned by config
550             self.intf_speed = tg_if_speed
551             # also update the speed in the tg config
552             self.generator_config.intf_speed = tg_if_speed
553
554         # Save the traffic generator local MAC
555         for mac, device in zip(self.gen.get_macs(), self.generator_config.devices):
556             device.set_mac(mac)
557
558     def setup(self):
559         """Set up the traffic client."""
560         self.gen.clear_stats()
561
562     def get_version(self):
563         """Get the traffic generator version."""
564         return self.gen.get_version()
565
566     def ensure_end_to_end(self):
567         """Ensure traffic generator receives packets it has transmitted.
568
569         This ensures end to end connectivity and also waits until VMs are ready to forward packets.
570
571         VMs that are started and in active state may not pass traffic yet. It is imperative to make
572         sure that all VMs are passing traffic in both directions before starting any benchmarking.
573         To verify this, we need to send at a low frequency bi-directional packets and make sure
574         that we receive all packets back from all VMs. The number of flows is equal to 2 times
575         the number of chains (1 per direction) and we need to make sure we receive packets coming
576         from exactly 2 x chain count different source MAC addresses.
577
578         Example:
579             PVP chain (1 VM per chain)
580             N = 10 (number of chains)
581             Flow count = 20 (number of flows)
582             If the number of unique source MAC addresses from received packets is 20 then
583             all 10 VMs 10 VMs are in operational state.
584         """
585         LOG.info('Starting traffic generator to ensure end-to-end connectivity')
586         # send 2pps on each chain and each direction
587         rate_pps = {'rate_pps': str(self.config.service_chain_count * 2)}
588         self.gen.create_traffic('64', [rate_pps, rate_pps], bidirectional=True, latency=False,
589                                 e2e=True)
590         # ensures enough traffic is coming back
591         retry_count = (self.config.check_traffic_time_sec +
592                        self.config.generic_poll_sec - 1) / self.config.generic_poll_sec
593
594         # we expect to see packets coming from 2 unique MAC per chain
595         # because there can be flooding in the case of shared net
596         # we must verify that packets from the right VMs are received
597         # and not just count unique src MAC
598         # create a dict of (port, chain) tuples indexed by dest mac
599         mac_map = {}
600         for port, dest_macs in enumerate(self.generator_config.get_dest_macs()):
601             for chain, mac in enumerate(dest_macs):
602                 mac_map[mac] = (port, chain)
603         unique_src_mac_count = len(mac_map)
604         if self.config.vxlan and self.config.traffic_generator.vtep_vlan:
605             get_mac_id = lambda packet: packet['binary'][60:66]
606         elif self.config.vxlan:
607             get_mac_id = lambda packet: packet['binary'][56:62]
608         else:
609             get_mac_id = lambda packet: packet['binary'][6:12]
610         for it in xrange(retry_count):
611             self.gen.clear_stats()
612             self.gen.start_traffic()
613             self.gen.start_capture()
614             LOG.info('Captured unique src mac %d/%d, capturing return packets (retry %d/%d)...',
615                      unique_src_mac_count - len(mac_map), unique_src_mac_count,
616                      it + 1, retry_count)
617             if not self.skip_sleep():
618                 time.sleep(self.config.generic_poll_sec)
619             self.gen.stop_traffic()
620             self.gen.fetch_capture_packets()
621             self.gen.stop_capture()
622             for packet in self.gen.packet_list:
623                 mac_id = get_mac_id(packet)
624                 src_mac = ':'.join(["%02x" % ord(x) for x in mac_id])
625                 if src_mac in mac_map and self.is_udp(packet):
626                     port, chain = mac_map[src_mac]
627                     LOG.info('Received packet from mac: %s (chain=%d, port=%d)',
628                              src_mac, chain, port)
629                     mac_map.pop(src_mac, None)
630
631                 if not mac_map:
632                     LOG.info('End-to-end connectivity established')
633                     return
634             if self.config.l3_router and not self.config.no_arp:
635                 # In case of L3 traffic mode, routers are not able to route traffic
636                 # until VM interfaces are up and ARP requests are done
637                 LOG.info('Waiting for loopback service completely started...')
638                 LOG.info('Sending ARP request to assure end-to-end connectivity established')
639                 self.ensure_arp_successful()
640         raise TrafficClientException('End-to-end connectivity cannot be ensured')
641
642     def is_udp(self, packet):
643         pkt = Ether(packet['binary'])
644         return UDP in pkt
645
646     def ensure_arp_successful(self):
647         """Resolve all IP using ARP and throw an exception in case of failure."""
648         dest_macs = self.gen.resolve_arp()
649         if dest_macs:
650             # all dest macs are discovered, saved them into the generator config
651             if self.config.vxlan:
652                 self.generator_config.set_vtep_dest_macs(0, dest_macs[0])
653                 self.generator_config.set_vtep_dest_macs(1, dest_macs[1])
654             else:
655                 self.generator_config.set_dest_macs(0, dest_macs[0])
656                 self.generator_config.set_dest_macs(1, dest_macs[1])
657         else:
658             raise TrafficClientException('ARP cannot be resolved')
659
660     def set_traffic(self, frame_size, bidirectional):
661         """Reconfigure the traffic generator for a new frame size."""
662         self.run_config['bidirectional'] = bidirectional
663         self.run_config['l2frame_size'] = frame_size
664         self.run_config['rates'] = [self.get_per_direction_rate()]
665         if bidirectional:
666             self.run_config['rates'].append(self.get_per_direction_rate())
667         else:
668             unidir_reverse_pps = int(self.config.unidir_reverse_traffic_pps)
669             if unidir_reverse_pps > 0:
670                 self.run_config['rates'].append({'rate_pps': str(unidir_reverse_pps)})
671         # Fix for [NFVBENCH-67], convert the rate string to PPS
672         for idx, rate in enumerate(self.run_config['rates']):
673             if 'rate_pps' not in rate:
674                 self.run_config['rates'][idx] = {'rate_pps': self.__convert_rates(rate)['rate_pps']}
675
676         self.gen.clear_streamblock()
677         self.gen.create_traffic(frame_size, self.run_config['rates'], bidirectional, latency=True)
678
679     def _modify_load(self, load):
680         self.current_total_rate = {'rate_percent': str(load)}
681         rate_per_direction = self.get_per_direction_rate()
682
683         self.gen.modify_rate(rate_per_direction, False)
684         self.run_config['rates'][0] = rate_per_direction
685         if self.run_config['bidirectional']:
686             self.gen.modify_rate(rate_per_direction, True)
687             self.run_config['rates'][1] = rate_per_direction
688
689     def get_ndr_and_pdr(self):
690         """Start the NDR/PDR iteration and return the results."""
691         dst = 'Bidirectional' if self.run_config['bidirectional'] else 'Unidirectional'
692         targets = {}
693         if self.config.ndr_run:
694             LOG.info('*** Searching NDR for %s (%s)...', self.run_config['l2frame_size'], dst)
695             targets['ndr'] = self.config.measurement.NDR
696         if self.config.pdr_run:
697             LOG.info('*** Searching PDR for %s (%s)...', self.run_config['l2frame_size'], dst)
698             targets['pdr'] = self.config.measurement.PDR
699
700         self.run_config['start_time'] = time.time()
701         self.interval_collector = IntervalCollector(self.run_config['start_time'])
702         self.interval_collector.attach_notifier(self.notifier)
703         self.iteration_collector = IterationCollector(self.run_config['start_time'])
704         results = {}
705         self.__range_search(0.0, 200.0, targets, results)
706
707         results['iteration_stats'] = {
708             'ndr_pdr': self.iteration_collector.get()
709         }
710
711         if self.config.ndr_run:
712             LOG.info('NDR load: %s', results['ndr']['rate_percent'])
713             results['ndr']['time_taken_sec'] = \
714                 results['ndr']['timestamp_sec'] - self.run_config['start_time']
715             if self.config.pdr_run:
716                 LOG.info('PDR load: %s', results['pdr']['rate_percent'])
717                 results['pdr']['time_taken_sec'] = \
718                     results['pdr']['timestamp_sec'] - results['ndr']['timestamp_sec']
719         else:
720             LOG.info('PDR load: %s', results['pdr']['rate_percent'])
721             results['pdr']['time_taken_sec'] = \
722                 results['pdr']['timestamp_sec'] - self.run_config['start_time']
723         return results
724
725     def __get_dropped_rate(self, result):
726         dropped_pkts = result['rx']['dropped_pkts']
727         total_pkts = result['tx']['total_pkts']
728         if not total_pkts:
729             return float('inf')
730         return float(dropped_pkts) / total_pkts * 100
731
732     def get_stats(self):
733         """Collect final stats for previous run."""
734         stats = self.gen.get_stats()
735         retDict = {'total_tx_rate': stats['total_tx_rate']}
736         for port in self.PORTS:
737             retDict[port] = {'tx': {}, 'rx': {}}
738
739         tx_keys = ['total_pkts', 'total_pkt_bytes', 'pkt_rate', 'pkt_bit_rate']
740         rx_keys = tx_keys + ['dropped_pkts']
741
742         for port in self.PORTS:
743             for key in tx_keys:
744                 retDict[port]['tx'][key] = int(stats[port]['tx'][key])
745             for key in rx_keys:
746                 try:
747                     retDict[port]['rx'][key] = int(stats[port]['rx'][key])
748                 except ValueError:
749                     retDict[port]['rx'][key] = 0
750             retDict[port]['rx']['avg_delay_usec'] = cast_integer(
751                 stats[port]['rx']['avg_delay_usec'])
752             retDict[port]['rx']['min_delay_usec'] = cast_integer(
753                 stats[port]['rx']['min_delay_usec'])
754             retDict[port]['rx']['max_delay_usec'] = cast_integer(
755                 stats[port]['rx']['max_delay_usec'])
756             retDict[port]['drop_rate_percent'] = self.__get_dropped_rate(retDict[port])
757
758         ports = sorted(retDict.keys())
759         if self.run_config['bidirectional']:
760             retDict['overall'] = {'tx': {}, 'rx': {}}
761             for key in tx_keys:
762                 retDict['overall']['tx'][key] = \
763                     retDict[ports[0]]['tx'][key] + retDict[ports[1]]['tx'][key]
764             for key in rx_keys:
765                 retDict['overall']['rx'][key] = \
766                     retDict[ports[0]]['rx'][key] + retDict[ports[1]]['rx'][key]
767             total_pkts = [retDict[ports[0]]['rx']['total_pkts'],
768                           retDict[ports[1]]['rx']['total_pkts']]
769             avg_delays = [retDict[ports[0]]['rx']['avg_delay_usec'],
770                           retDict[ports[1]]['rx']['avg_delay_usec']]
771             max_delays = [retDict[ports[0]]['rx']['max_delay_usec'],
772                           retDict[ports[1]]['rx']['max_delay_usec']]
773             min_delays = [retDict[ports[0]]['rx']['min_delay_usec'],
774                           retDict[ports[1]]['rx']['min_delay_usec']]
775             retDict['overall']['rx']['avg_delay_usec'] = utils.weighted_avg(total_pkts, avg_delays)
776             retDict['overall']['rx']['min_delay_usec'] = min(min_delays)
777             retDict['overall']['rx']['max_delay_usec'] = max(max_delays)
778             for key in ['pkt_bit_rate', 'pkt_rate']:
779                 for dirc in ['tx', 'rx']:
780                     retDict['overall'][dirc][key] /= 2.0
781         else:
782             retDict['overall'] = retDict[ports[0]]
783         retDict['overall']['drop_rate_percent'] = self.__get_dropped_rate(retDict['overall'])
784         return retDict
785
786     def __convert_rates(self, rate):
787         return utils.convert_rates(self.run_config['l2frame_size'],
788                                    rate,
789                                    self.intf_speed)
790
791     def __ndr_pdr_found(self, tag, load):
792         rates = self.__convert_rates({'rate_percent': load})
793         self.iteration_collector.add_ndr_pdr(tag, rates['rate_pps'])
794         last_stats = self.iteration_collector.peek()
795         self.interval_collector.add_ndr_pdr(tag, last_stats)
796
797     def __format_output_stats(self, stats):
798         for key in self.PORTS + ['overall']:
799             interface = stats[key]
800             stats[key] = {
801                 'tx_pkts': interface['tx']['total_pkts'],
802                 'rx_pkts': interface['rx']['total_pkts'],
803                 'drop_percentage': interface['drop_rate_percent'],
804                 'drop_pct': interface['rx']['dropped_pkts'],
805                 'avg_delay_usec': interface['rx']['avg_delay_usec'],
806                 'max_delay_usec': interface['rx']['max_delay_usec'],
807                 'min_delay_usec': interface['rx']['min_delay_usec'],
808             }
809
810         return stats
811
812     def __targets_found(self, rate, targets, results):
813         for tag, target in targets.iteritems():
814             LOG.info('Found %s (%s) load: %s', tag, target, rate)
815             self.__ndr_pdr_found(tag, rate)
816             results[tag]['timestamp_sec'] = time.time()
817
818     def __range_search(self, left, right, targets, results):
819         """Perform a binary search for a list of targets inside a [left..right] range or rate.
820
821         left    the left side of the range to search as a % the line rate (100 = 100% line rate)
822                 indicating the rate to send on each interface
823         right   the right side of the range to search as a % of line rate
824                 indicating the rate to send on each interface
825         targets a dict of drop rates to search (0.1 = 0.1%), indexed by the DR name or "tag"
826                 ('ndr', 'pdr')
827         results a dict to store results
828         """
829         if not targets:
830             return
831         LOG.info('Range search [%s .. %s] targets: %s', left, right, targets)
832
833         # Terminate search when gap is less than load epsilon
834         if right - left < self.config.measurement.load_epsilon:
835             self.__targets_found(left, targets, results)
836             return
837
838         # Obtain the average drop rate in for middle load
839         middle = (left + right) / 2.0
840         try:
841             stats, rates = self.__run_search_iteration(middle)
842         except STLError:
843             LOG.exception("Got exception from traffic generator during binary search")
844             self.__targets_found(left, targets, results)
845             return
846         # Split target dicts based on the avg drop rate
847         left_targets = {}
848         right_targets = {}
849         for tag, target in targets.iteritems():
850             if stats['overall']['drop_rate_percent'] <= target:
851                 # record the best possible rate found for this target
852                 results[tag] = rates
853                 results[tag].update({
854                     'load_percent_per_direction': middle,
855                     'stats': self.__format_output_stats(dict(stats)),
856                     'timestamp_sec': None
857                 })
858                 right_targets[tag] = target
859             else:
860                 # initialize to 0 all fields of result for
861                 # the worst case scenario of the binary search (if ndr/pdr is not found)
862                 if tag not in results:
863                     results[tag] = dict.fromkeys(rates, 0)
864                     empty_stats = self.__format_output_stats(dict(stats))
865                     for key in empty_stats:
866                         if isinstance(empty_stats[key], dict):
867                             empty_stats[key] = dict.fromkeys(empty_stats[key], 0)
868                         else:
869                             empty_stats[key] = 0
870                     results[tag].update({
871                         'load_percent_per_direction': 0,
872                         'stats': empty_stats,
873                         'timestamp_sec': None
874                     })
875                 left_targets[tag] = target
876
877         # search lower half
878         self.__range_search(left, middle, left_targets, results)
879
880         # search upper half only if the upper rate does not exceed
881         # 100%, this only happens when the first search at 100%
882         # yields a DR that is < target DR
883         if middle >= 100:
884             self.__targets_found(100, right_targets, results)
885         else:
886             self.__range_search(middle, right, right_targets, results)
887
888     def __run_search_iteration(self, rate):
889         """Run one iteration at the given rate level.
890
891         rate: the rate to send on each port in percent (0 to 100)
892         """
893         self._modify_load(rate)
894
895         # poll interval stats and collect them
896         for stats in self.run_traffic():
897             self.interval_collector.add(stats)
898             time_elapsed_ratio = self.runner.time_elapsed() / self.run_config['duration_sec']
899             if time_elapsed_ratio >= 1:
900                 self.cancel_traffic()
901                 if not self.skip_sleep():
902                     time.sleep(self.config.pause_sec)
903         self.interval_collector.reset()
904
905         # get stats from the run
906         stats = self.runner.client.get_stats()
907         current_traffic_config = self._get_traffic_config()
908         warning = self.compare_tx_rates(current_traffic_config['direction-total']['rate_pps'],
909                                         stats['total_tx_rate'])
910         if warning is not None:
911             stats['warning'] = warning
912
913         # save reliable stats from whole iteration
914         self.iteration_collector.add(stats, current_traffic_config['direction-total']['rate_pps'])
915         LOG.info('Average drop rate: %f', stats['overall']['drop_rate_percent'])
916         return stats, current_traffic_config['direction-total']
917
918     @staticmethod
919     def log_stats(stats):
920         """Log estimated stats during run."""
921         report = {
922             'datetime': str(datetime.now()),
923             'tx_packets': stats['overall']['tx']['total_pkts'],
924             'rx_packets': stats['overall']['rx']['total_pkts'],
925             'drop_packets': stats['overall']['rx']['dropped_pkts'],
926             'drop_rate_percent': stats['overall']['drop_rate_percent']
927         }
928         LOG.info('TX: %(tx_packets)d; '
929                  'RX: %(rx_packets)d; '
930                  'Est. Dropped: %(drop_packets)d; '
931                  'Est. Drop rate: %(drop_rate_percent).4f%%',
932                  report)
933
934     def run_traffic(self):
935         """Start traffic and return intermediate stats for each interval."""
936         stats = self.runner.run()
937         while self.runner.is_running:
938             self.log_stats(stats)
939             yield stats
940             stats = self.runner.poll_stats()
941             if stats is None:
942                 return
943         self.log_stats(stats)
944         LOG.info('Drop rate: %f', stats['overall']['drop_rate_percent'])
945         yield stats
946
947     def cancel_traffic(self):
948         """Stop traffic."""
949         self.runner.stop()
950
951     def _get_traffic_config(self):
952         config = {}
953         load_total = 0.0
954         bps_total = 0.0
955         pps_total = 0.0
956         for idx, rate in enumerate(self.run_config['rates']):
957             key = 'direction-forward' if idx == 0 else 'direction-reverse'
958             config[key] = {
959                 'l2frame_size': self.run_config['l2frame_size'],
960                 'duration_sec': self.run_config['duration_sec']
961             }
962             config[key].update(rate)
963             config[key].update(self.__convert_rates(rate))
964             load_total += float(config[key]['rate_percent'])
965             bps_total += float(config[key]['rate_bps'])
966             pps_total += float(config[key]['rate_pps'])
967         config['direction-total'] = dict(config['direction-forward'])
968         config['direction-total'].update({
969             'rate_percent': load_total,
970             'rate_pps': cast_integer(pps_total),
971             'rate_bps': bps_total
972         })
973
974         return config
975
976     def get_run_config(self, results):
977         """Return configuration which was used for the last run."""
978         r = {}
979         # because we want each direction to have the far end RX rates,
980         # use the far end index (1-idx) to retrieve the RX rates
981         for idx, key in enumerate(["direction-forward", "direction-reverse"]):
982             tx_rate = results["stats"][idx]["tx"]["total_pkts"] / self.config.duration_sec
983             rx_rate = results["stats"][1 - idx]["rx"]["total_pkts"] / self.config.duration_sec
984             r[key] = {
985                 "orig": self.__convert_rates(self.run_config['rates'][idx]),
986                 "tx": self.__convert_rates({'rate_pps': tx_rate}),
987                 "rx": self.__convert_rates({'rate_pps': rx_rate})
988             }
989
990         total = {}
991         for direction in ['orig', 'tx', 'rx']:
992             total[direction] = {}
993             for unit in ['rate_percent', 'rate_bps', 'rate_pps']:
994                 total[direction][unit] = sum([float(x[direction][unit]) for x in r.values()])
995
996         r['direction-total'] = total
997         return r
998
999     def insert_interface_stats(self, pps_list):
1000         """Insert interface stats to a list of packet path stats.
1001
1002         pps_list: a list of packet path stats instances indexed by chain index
1003
1004         This function will insert the packet path stats for the traffic gen ports 0 and 1
1005         with itemized per chain tx/rx counters.
1006         There will be as many packet path stats as chains.
1007         Each packet path stats will have exactly 2 InterfaceStats for port 0 and port 1
1008         self.pps_list:
1009         [
1010         PacketPathStats(InterfaceStats(chain 0, port 0), InterfaceStats(chain 0, port 1)),
1011         PacketPathStats(InterfaceStats(chain 1, port 0), InterfaceStats(chain 1, port 1)),
1012         ...
1013         ]
1014         """
1015         def get_if_stats(chain_idx):
1016             return [InterfaceStats('p' + str(port), self.tool)
1017                     for port in range(2)]
1018         # keep the list of list of interface stats indexed by the chain id
1019         self.ifstats = [get_if_stats(chain_idx)
1020                         for chain_idx in range(self.config.service_chain_count)]
1021         # note that we need to make a copy of the ifs list so that any modification in the
1022         # list from pps will not change the list saved in self.ifstats
1023         self.pps_list = [PacketPathStats(list(ifs)) for ifs in self.ifstats]
1024         # insert the corresponding pps in the passed list
1025         pps_list.extend(self.pps_list)
1026
1027     def update_interface_stats(self, diff=False):
1028         """Update all interface stats.
1029
1030         diff: if False, simply refresh the interface stats values with latest values
1031               if True, diff the interface stats with the latest values
1032         Make sure that the interface stats inserted in insert_interface_stats() are updated
1033         with proper values.
1034         self.ifstats:
1035         [
1036         [InterfaceStats(chain 0, port 0), InterfaceStats(chain 0, port 1)],
1037         [InterfaceStats(chain 1, port 0), InterfaceStats(chain 1, port 1)],
1038         ...
1039         ]
1040         """
1041         if diff:
1042             stats = self.gen.get_stats()
1043             for chain_idx, ifs in enumerate(self.ifstats):
1044                 # each ifs has exactly 2 InterfaceStats and 2 Latency instances
1045                 # corresponding to the
1046                 # port 0 and port 1 for the given chain_idx
1047                 # Note that we cannot use self.pps_list[chain_idx].if_stats to pick the
1048                 # interface stats for the pps because it could have been modified to contain
1049                 # additional interface stats
1050                 self.gen.get_stream_stats(stats, ifs, self.pps_list[chain_idx].latencies, chain_idx)
1051             # special handling for vxlan
1052             # in case of vxlan, flow stats are not available so all rx counters will be
1053             # zeros when the total rx port counter is non zero.
1054             # in that case,
1055             for port in range(2):
1056                 total_rx = 0
1057                 for ifs in self.ifstats:
1058                     total_rx += ifs[port].rx
1059                 if total_rx == 0:
1060                     # check if the total port rx from Trex is also zero
1061                     port_rx = stats[port]['rx']['total_pkts']
1062                     if port_rx:
1063                         # the total rx for all chains from port level stats is non zero
1064                         # which means that the per-chain stats are not available
1065                         if len(self.ifstats) == 1:
1066                             # only one chain, simply report the port level rx to the chain rx stats
1067                             self.ifstats[0][port].rx = port_rx
1068                         else:
1069                             for ifs in self.ifstats:
1070                                 # mark this data as unavailable
1071                                 ifs[port].rx = None
1072                             # pitch in the total rx only in the last chain pps
1073                             self.ifstats[-1][port].rx_total = port_rx
1074
1075     @staticmethod
1076     def compare_tx_rates(required, actual):
1077         """Compare the actual TX rate to the required TX rate."""
1078         threshold = 0.9
1079         are_different = False
1080         try:
1081             if float(actual) / required < threshold:
1082                 are_different = True
1083         except ZeroDivisionError:
1084             are_different = True
1085
1086         if are_different:
1087             msg = "WARNING: There is a significant difference between requested TX rate ({r}) " \
1088                   "and actual TX rate ({a}). The traffic generator may not have sufficient CPU " \
1089                   "to achieve the requested TX rate.".format(r=required, a=actual)
1090             LOG.info(msg)
1091             return msg
1092
1093         return None
1094
1095     def get_per_direction_rate(self):
1096         """Get the rate for each direction."""
1097         divisor = 2 if self.run_config['bidirectional'] else 1
1098         if 'rate_percent' in self.current_total_rate:
1099             # don't split rate if it's percentage
1100             divisor = 1
1101
1102         return utils.divide_rate(self.current_total_rate, divisor)
1103
1104     def close(self):
1105         """Close this instance."""
1106         try:
1107             self.gen.stop_traffic()
1108         except Exception:
1109             pass
1110         self.gen.clear_stats()
1111         self.gen.cleanup()