NFVBENCH-140 Retrieve High Dynamic Range latency histograms with TRex v2.59
[nfvbench.git] / nfvbench / traffic_client.py
1 # Copyright 2016 Cisco Systems, Inc.  All rights reserved.
2 #
3 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
4 #    not use this file except in compliance with the License. You may obtain
5 #    a copy of the License at
6 #
7 #         http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #    Unless required by applicable law or agreed to in writing, software
10 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 #    License for the specific language governing permissions and limitations
13 #    under the License.
14
15 """Interface to the traffic generator clients including NDR/PDR binary search."""
16
17 from datetime import datetime
18 import socket
19 import struct
20 import time
21
22 from attrdict import AttrDict
23 import bitmath
24 from netaddr import IPNetwork
25 # pylint: disable=import-error
26 from trex.stl.api import Ether
27 from trex.stl.api import STLError
28 from trex.stl.api import UDP
29 # pylint: enable=import-error
30
31 from log import LOG
32 from packet_stats import InterfaceStats
33 from packet_stats import PacketPathStats
34 from stats_collector import IntervalCollector
35 from stats_collector import IterationCollector
36 import traffic_gen.traffic_utils as utils
37 from utils import cast_integer
38
39
40 class TrafficClientException(Exception):
41     """Generic traffic client exception."""
42
43     pass
44
45
46 class TrafficRunner(object):
47     """Serialize various steps required to run traffic."""
48
49     def __init__(self, client, duration_sec, interval_sec=0):
50         """Create a traffic runner."""
51         self.client = client
52         self.start_time = None
53         self.duration_sec = duration_sec
54         self.interval_sec = interval_sec
55
56     def run(self):
57         """Clear stats and instruct the traffic generator to start generating traffic."""
58         if self.is_running():
59             return None
60         LOG.info('Running traffic generator')
61         self.client.gen.clear_stats()
62         self.client.gen.start_traffic()
63         self.start_time = time.time()
64         return self.poll_stats()
65
66     def stop(self):
67         """Stop the current run and instruct the traffic generator to stop traffic."""
68         if self.is_running():
69             self.start_time = None
70             self.client.gen.stop_traffic()
71
72     def is_running(self):
73         """Check if a run is still pending."""
74         return self.start_time is not None
75
76     def time_elapsed(self):
77         """Return time elapsed since start of run."""
78         if self.is_running():
79             return time.time() - self.start_time
80         return self.duration_sec
81
82     def poll_stats(self):
83         """Poll latest stats from the traffic generator at fixed interval - sleeps if necessary.
84
85         return: latest stats or None if traffic is stopped
86         """
87         if not self.is_running():
88             return None
89         if self.client.skip_sleep():
90             self.stop()
91             return self.client.get_stats()
92         time_elapsed = self.time_elapsed()
93         if time_elapsed > self.duration_sec:
94             self.stop()
95             return None
96         time_left = self.duration_sec - time_elapsed
97         if self.interval_sec > 0.0:
98             if time_left <= self.interval_sec:
99                 time.sleep(time_left)
100                 self.stop()
101             else:
102                 time.sleep(self.interval_sec)
103         else:
104             time.sleep(self.duration_sec)
105             self.stop()
106         return self.client.get_stats()
107
108
109 class IpBlock(object):
110     """Manage a block of IP addresses."""
111
112     def __init__(self, base_ip, step_ip, count_ip):
113         """Create an IP block."""
114         self.base_ip_int = Device.ip_to_int(base_ip)
115         self.step = Device.ip_to_int(step_ip)
116         self.max_available = count_ip
117         self.next_free = 0
118
119     def get_ip(self, index=0):
120         """Return the IP address at given index."""
121         if index < 0 or index >= self.max_available:
122             raise IndexError('Index out of bounds: %d (max=%d)' % (index, self.max_available))
123         return Device.int_to_ip(self.base_ip_int + index * self.step)
124
125     def reserve_ip_range(self, count):
126         """Reserve a range of count consecutive IP addresses spaced by step."""
127         if self.next_free + count > self.max_available:
128             raise IndexError('No more IP addresses next free=%d max_available=%d requested=%d' %
129                              (self.next_free,
130                               self.max_available,
131                               count))
132         first_ip = self.get_ip(self.next_free)
133         last_ip = self.get_ip(self.next_free + count - 1)
134         self.next_free += count
135         return (first_ip, last_ip)
136
137     def reset_reservation(self):
138         """Reset all reservations and restart with a completely unused IP block."""
139         self.next_free = 0
140
141
142 class Device(object):
143     """Represent a port device and all information associated to it.
144
145     In the curent version we only support 2 port devices for the traffic generator
146     identified as port 0 or port 1.
147     """
148
149     def __init__(self, port, generator_config):
150         """Create a new device for a given port."""
151         self.generator_config = generator_config
152         self.chain_count = generator_config.service_chain_count
153         self.flow_count = generator_config.flow_count / 2
154         self.port = port
155         self.switch_port = generator_config.interfaces[port].get('switch_port', None)
156         self.vtep_vlan = None
157         self.vtep_src_mac = None
158         self.vxlan = False
159         self.pci = generator_config.interfaces[port].pci
160         self.mac = None
161         self.dest_macs = None
162         self.vtep_dst_mac = None
163         self.vtep_dst_ip = None
164         if generator_config.vteps is None:
165             self.vtep_src_ip = None
166         else:
167             self.vtep_src_ip = generator_config.vteps[port]
168         self.vnis = None
169         self.vlans = None
170         self.ip_addrs = generator_config.ip_addrs[port]
171         subnet = IPNetwork(self.ip_addrs)
172         self.ip = subnet.ip.format()
173         self.ip_addrs_step = generator_config.ip_addrs_step
174         self.ip_block = IpBlock(self.ip, self.ip_addrs_step, self.flow_count)
175         self.gw_ip_block = IpBlock(generator_config.gateway_ips[port],
176                                    generator_config.gateway_ip_addrs_step,
177                                    self.chain_count)
178         self.tg_gateway_ip_addrs = generator_config.tg_gateway_ip_addrs[port]
179         self.tg_gw_ip_block = IpBlock(self.tg_gateway_ip_addrs,
180                                       generator_config.tg_gateway_ip_addrs_step,
181                                       self.chain_count)
182         self.udp_src_port = generator_config.udp_src_port
183         self.udp_dst_port = generator_config.udp_dst_port
184
185     def set_mac(self, mac):
186         """Set the local MAC for this port device."""
187         if mac is None:
188             raise TrafficClientException('Trying to set traffic generator MAC address as None')
189         self.mac = mac
190
191     def get_peer_device(self):
192         """Get the peer device (device 0 -> device 1, or device 1 -> device 0)."""
193         return self.generator_config.devices[1 - self.port]
194
195     def set_vtep_dst_mac(self, dest_macs):
196         """Set the list of dest MACs indexed by the chain id.
197
198         This is only called in 2 cases:
199         - VM macs discovered using openstack API
200         - dest MACs provisioned in config file
201         """
202         self.vtep_dst_mac = map(str, dest_macs)
203
204     def set_dest_macs(self, dest_macs):
205         """Set the list of dest MACs indexed by the chain id.
206
207         This is only called in 2 cases:
208         - VM macs discovered using openstack API
209         - dest MACs provisioned in config file
210         """
211         self.dest_macs = map(str, dest_macs)
212
213     def get_dest_macs(self):
214         """Get the list of dest macs for this device.
215
216         If set_dest_macs was never called, assumes l2-loopback and return
217         a list of peer mac (as many as chains but normally only 1 chain)
218         """
219         if self.dest_macs:
220             return self.dest_macs
221         # assume this is l2-loopback
222         return [self.get_peer_device().mac] * self.chain_count
223
224     def set_vlans(self, vlans):
225         """Set the list of vlans to use indexed by the chain id."""
226         self.vlans = vlans
227         LOG.info("Port %d: VLANs %s", self.port, self.vlans)
228
229     def set_vtep_vlan(self, vlan):
230         """Set the vtep vlan to use indexed by specific port."""
231         self.vtep_vlan = vlan
232         self.vxlan = True
233         self.vlan_tagging = None
234         LOG.info("Port %d: VTEP VLANs %s", self.port, self.vtep_vlan)
235
236     def set_vxlan_endpoints(self, src_ip, dst_ip):
237         self.vtep_dst_ip = dst_ip
238         self.vtep_src_ip = src_ip
239         LOG.info("Port %d: src_vtep %s, dst_vtep %s", self.port,
240                  self.vtep_src_ip, self.vtep_dst_ip)
241
242     def set_vxlans(self, vnis):
243         self.vnis = vnis
244         LOG.info("Port %d: VNIs %s", self.port, self.vnis)
245
246     def set_gw_ip(self, gateway_ip):
247         self.gw_ip_block = IpBlock(gateway_ip,
248                                    self.generator_config.gateway_ip_addrs_step,
249                                    self.chain_count)
250
251     def get_gw_ip(self, chain_index):
252         """Retrieve the IP address assigned for the gateway of a given chain."""
253         return self.gw_ip_block.get_ip(chain_index)
254
255     def get_stream_configs(self):
256         """Get the stream config for a given chain on this device.
257
258         Called by the traffic generator driver to program the traffic generator properly
259         before generating traffic
260         """
261         configs = []
262         # exact flow count for each chain is calculated as follows:
263         # - all chains except the first will have the same flow count
264         #   calculated as (total_flows + chain_count - 1) / chain_count
265         # - the first chain will have the remainder
266         # example 11 flows and 3 chains => 3, 4, 4
267         flows_per_chain = (self.flow_count + self.chain_count - 1) / self.chain_count
268         cur_chain_flow_count = self.flow_count - flows_per_chain * (self.chain_count - 1)
269         peer = self.get_peer_device()
270         self.ip_block.reset_reservation()
271         peer.ip_block.reset_reservation()
272         dest_macs = self.get_dest_macs()
273
274         for chain_idx in xrange(self.chain_count):
275             src_ip_first, src_ip_last = self.ip_block.reserve_ip_range(cur_chain_flow_count)
276             dst_ip_first, dst_ip_last = peer.ip_block.reserve_ip_range(cur_chain_flow_count)
277
278             configs.append({
279                 'count': cur_chain_flow_count,
280                 'mac_src': self.mac,
281                 'mac_dst': dest_macs[chain_idx],
282                 'ip_src_addr': src_ip_first,
283                 'ip_src_addr_max': src_ip_last,
284                 'ip_src_count': cur_chain_flow_count,
285                 'ip_dst_addr': dst_ip_first,
286                 'ip_dst_addr_max': dst_ip_last,
287                 'ip_dst_count': cur_chain_flow_count,
288                 'ip_addrs_step': self.ip_addrs_step,
289                 'udp_src_port': self.udp_src_port,
290                 'udp_dst_port': self.udp_dst_port,
291                 'mac_discovery_gw': self.get_gw_ip(chain_idx),
292                 'ip_src_tg_gw': self.tg_gw_ip_block.get_ip(chain_idx),
293                 'ip_dst_tg_gw': peer.tg_gw_ip_block.get_ip(chain_idx),
294                 'vlan_tag': self.vlans[chain_idx] if self.vlans else None,
295                 'vxlan': self.vxlan,
296                 'vtep_vlan': self.vtep_vlan if self.vtep_vlan else None,
297                 'vtep_src_mac': self.mac if self.vxlan is True else None,
298                 'vtep_dst_mac': self.vtep_dst_mac if self.vxlan is True else None,
299                 'vtep_dst_ip': self.vtep_dst_ip if self.vxlan is True else None,
300                 'vtep_src_ip': self.vtep_src_ip if self.vxlan is True else None,
301                 'net_vni': self.vnis[chain_idx] if self.vxlan is True else None
302             })
303             # after first chain, fall back to the flow count for all other chains
304             cur_chain_flow_count = flows_per_chain
305         return configs
306
307     @staticmethod
308     def ip_to_int(addr):
309         """Convert an IP address from string to numeric."""
310         return struct.unpack("!I", socket.inet_aton(addr))[0]
311
312     @staticmethod
313     def int_to_ip(nvalue):
314         """Convert an IP address from numeric to string."""
315         return socket.inet_ntoa(struct.pack("!I", nvalue))
316
317
318 class GeneratorConfig(object):
319     """Represents traffic configuration for currently running traffic profile."""
320
321     DEFAULT_IP_STEP = '0.0.0.1'
322     DEFAULT_SRC_DST_IP_STEP = '0.0.0.1'
323
324     def __init__(self, config):
325         """Create a generator config."""
326         self.config = config
327         # name of the generator profile (normally trex or dummy)
328         # pick the default one if not specified explicitly from cli options
329         if not config.generator_profile:
330             config.generator_profile = config.traffic_generator.default_profile
331         # pick up the profile dict based on the name
332         gen_config = self.__match_generator_profile(config.traffic_generator,
333                                                     config.generator_profile)
334         self.gen_config = gen_config
335         # copy over fields from the dict
336         self.tool = gen_config.tool
337         self.ip = gen_config.ip
338         # overrides on config.cores and config.mbuf_factor
339         if config.cores:
340             self.cores = config.cores
341         else:
342             self.cores = gen_config.get('cores', 1)
343         self.mbuf_factor = config.mbuf_factor
344         self.hdrh = not config.disable_hdrh
345         if gen_config.intf_speed:
346             # interface speed is overriden from config
347             self.intf_speed = bitmath.parse_string(gen_config.intf_speed.replace('ps', '')).bits
348         else:
349             # interface speed is discovered/provided by the traffic generator
350             self.intf_speed = 0
351         self.name = gen_config.name
352         self.zmq_pub_port = gen_config.get('zmq_pub_port', 4500)
353         self.zmq_rpc_port = gen_config.get('zmq_rpc_port', 4501)
354         self.limit_memory = gen_config.get('limit_memory', 1024)
355         self.software_mode = gen_config.get('software_mode', False)
356         self.interfaces = gen_config.interfaces
357         if self.interfaces[0].port != 0 or self.interfaces[1].port != 1:
358             raise TrafficClientException('Invalid port order/id in generator_profile.interfaces')
359         if hasattr(gen_config, 'platform'):
360             self.platform = gen_config.platform
361         self.service_chain = config.service_chain
362         self.service_chain_count = config.service_chain_count
363         self.flow_count = config.flow_count
364         self.host_name = gen_config.host_name
365
366         self.tg_gateway_ip_addrs = gen_config.tg_gateway_ip_addrs
367         self.ip_addrs = gen_config.ip_addrs
368         self.ip_addrs_step = gen_config.ip_addrs_step or self.DEFAULT_SRC_DST_IP_STEP
369         self.tg_gateway_ip_addrs_step = \
370             gen_config.tg_gateway_ip_addrs_step or self.DEFAULT_IP_STEP
371         self.gateway_ip_addrs_step = gen_config.gateway_ip_addrs_step or self.DEFAULT_IP_STEP
372         self.gateway_ips = gen_config.gateway_ip_addrs
373         self.udp_src_port = gen_config.udp_src_port
374         self.udp_dst_port = gen_config.udp_dst_port
375         self.vteps = gen_config.get('vteps')
376         self.devices = [Device(port, self) for port in [0, 1]]
377         # This should normally always be [0, 1]
378         self.ports = [device.port for device in self.devices]
379
380         # check that pci is not empty
381         if not gen_config.interfaces[0].get('pci', None) or \
382            not gen_config.interfaces[1].get('pci', None):
383             raise TrafficClientException("configuration interfaces pci fields cannot be empty")
384
385         self.pcis = [tgif['pci'] for tgif in gen_config.interfaces]
386         self.vlan_tagging = config.vlan_tagging
387
388         # needed for result/summarizer
389         config['tg-name'] = gen_config.name
390         config['tg-tool'] = self.tool
391
392     def to_json(self):
393         """Get json form to display the content into the overall result dict."""
394         return dict(self.gen_config)
395
396     def set_dest_macs(self, port_index, dest_macs):
397         """Set the list of dest MACs indexed by the chain id on given port.
398
399         port_index: the port for which dest macs must be set
400         dest_macs: a list of dest MACs indexed by chain id
401         """
402         if len(dest_macs) < self.config.service_chain_count:
403             raise TrafficClientException('Dest MAC list %s must have %d entries' %
404                                          (dest_macs, self.config.service_chain_count))
405         # only pass the first scc dest MACs
406         self.devices[port_index].set_dest_macs(dest_macs[:self.config.service_chain_count])
407         LOG.info('Port %d: dst MAC %s', port_index, [str(mac) for mac in dest_macs])
408
409     def set_vtep_dest_macs(self, port_index, dest_macs):
410         """Set the list of dest MACs indexed by the chain id on given port.
411
412         port_index: the port for which dest macs must be set
413         dest_macs: a list of dest MACs indexed by chain id
414         """
415         if len(dest_macs) != self.config.service_chain_count:
416             raise TrafficClientException('Dest MAC list %s must have %d entries' %
417                                          (dest_macs, self.config.service_chain_count))
418         self.devices[port_index].set_vtep_dst_mac(dest_macs)
419         LOG.info('Port %d: vtep dst MAC %s', port_index, set([str(mac) for mac in dest_macs]))
420
421     def get_dest_macs(self):
422         """Return the list of dest macs indexed by port."""
423         return [dev.get_dest_macs() for dev in self.devices]
424
425     def set_vlans(self, port_index, vlans):
426         """Set the list of vlans to use indexed by the chain id on given port.
427
428         port_index: the port for which VLANs must be set
429         vlans: a  list of vlan lists indexed by chain id
430         """
431         if len(vlans) != self.config.service_chain_count:
432             raise TrafficClientException('VLAN list %s must have %d entries' %
433                                          (vlans, self.config.service_chain_count))
434         self.devices[port_index].set_vlans(vlans)
435
436     def set_vxlans(self, port_index, vxlans):
437         """Set the list of vxlans (VNIs) to use indexed by the chain id on given port.
438
439         port_index: the port for which VXLANs must be set
440         VXLANs: a  list of VNIs lists indexed by chain id
441         """
442         if len(vxlans) != self.config.service_chain_count:
443             raise TrafficClientException('VXLAN list %s must have %d entries' %
444                                          (vxlans, self.config.service_chain_count))
445         self.devices[port_index].set_vxlans(vxlans)
446
447     def set_vtep_vlan(self, port_index, vlan):
448         """Set the vtep vlan to use indexed by the chain id on given port.
449         port_index: the port for which VLAN must be set
450         """
451         self.devices[port_index].set_vtep_vlan(vlan)
452
453     def set_vxlan_endpoints(self, port_index, src_ip, dst_ip):
454         self.devices[port_index].set_vxlan_endpoints(src_ip, dst_ip)
455
456     @staticmethod
457     def __match_generator_profile(traffic_generator, generator_profile):
458         gen_config = AttrDict(traffic_generator)
459         gen_config.pop('default_profile')
460         gen_config.pop('generator_profile')
461         matching_profile = [profile for profile in traffic_generator.generator_profile if
462                             profile.name == generator_profile]
463         if len(matching_profile) != 1:
464             raise Exception('Traffic generator profile not found: ' + generator_profile)
465
466         gen_config.update(matching_profile[0])
467         return gen_config
468
469
470 class TrafficClient(object):
471     """Traffic generator client with NDR/PDR binary seearch."""
472
473     PORTS = [0, 1]
474
475     def __init__(self, config, notifier=None):
476         """Create a new TrafficClient instance.
477
478         config: nfvbench config
479         notifier: notifier (optional)
480
481         A new instance is created everytime the nfvbench config may have changed.
482         """
483         self.config = config
484         self.generator_config = GeneratorConfig(config)
485         self.tool = self.generator_config.tool
486         self.gen = self._get_generator()
487         self.notifier = notifier
488         self.interval_collector = None
489         self.iteration_collector = None
490         self.runner = TrafficRunner(self, self.config.duration_sec, self.config.interval_sec)
491         self.config.frame_sizes = self._get_frame_sizes()
492         self.run_config = {
493             'l2frame_size': None,
494             'duration_sec': self.config.duration_sec,
495             'bidirectional': True,
496             'rates': []  # to avoid unsbuscriptable-obj warning
497         }
498         self.current_total_rate = {'rate_percent': '10'}
499         if self.config.single_run:
500             self.current_total_rate = utils.parse_rate_str(self.config.rate)
501         self.ifstats = None
502         # Speed is either discovered when connecting to TG or set from config
503         # This variable is 0 if not yet discovered from TG or must be the speed of
504         # each interface in bits per second
505         self.intf_speed = self.generator_config.intf_speed
506
507     def _get_generator(self):
508         tool = self.tool.lower()
509         if tool == 'trex':
510             from traffic_gen import trex_gen
511             return trex_gen.TRex(self)
512         if tool == 'dummy':
513             from traffic_gen import dummy
514             return dummy.DummyTG(self)
515         raise TrafficClientException('Unsupported generator tool name:' + self.tool)
516
517     def skip_sleep(self):
518         """Skip all sleeps when doing unit testing with dummy TG.
519
520         Must be overriden using mock.patch
521         """
522         return False
523
524     def _get_frame_sizes(self):
525         traffic_profile_name = self.config.traffic.profile
526         matching_profiles = [profile for profile in self.config.traffic_profile if
527                              profile.name == traffic_profile_name]
528         if len(matching_profiles) > 1:
529             raise TrafficClientException('Multiple traffic profiles with name: ' +
530                                          traffic_profile_name)
531         elif not matching_profiles:
532             raise TrafficClientException('Cannot find traffic profile: ' + traffic_profile_name)
533         return matching_profiles[0].l2frame_size
534
535     def start_traffic_generator(self):
536         """Start the traffic generator process (traffic not started yet)."""
537         self.gen.connect()
538         # pick up the interface speed if it is not set from config
539         intf_speeds = self.gen.get_port_speed_gbps()
540         # convert Gbps unit into bps
541         tg_if_speed = bitmath.parse_string(str(intf_speeds[0]) + 'Gb').bits
542         if self.intf_speed:
543             # interface speed is overriden from config
544             if self.intf_speed != tg_if_speed:
545                 # Warn the user if the speed in the config is different
546                 LOG.warning('Interface speed provided is different from actual speed (%d Gbps)',
547                             intf_speeds[0])
548         else:
549             # interface speed not provisioned by config
550             self.intf_speed = tg_if_speed
551             # also update the speed in the tg config
552             self.generator_config.intf_speed = tg_if_speed
553
554         # Save the traffic generator local MAC
555         for mac, device in zip(self.gen.get_macs(), self.generator_config.devices):
556             device.set_mac(mac)
557
558     def setup(self):
559         """Set up the traffic client."""
560         self.gen.clear_stats()
561
562     def get_version(self):
563         """Get the traffic generator version."""
564         return self.gen.get_version()
565
566     def ensure_end_to_end(self):
567         """Ensure traffic generator receives packets it has transmitted.
568
569         This ensures end to end connectivity and also waits until VMs are ready to forward packets.
570
571         VMs that are started and in active state may not pass traffic yet. It is imperative to make
572         sure that all VMs are passing traffic in both directions before starting any benchmarking.
573         To verify this, we need to send at a low frequency bi-directional packets and make sure
574         that we receive all packets back from all VMs. The number of flows is equal to 2 times
575         the number of chains (1 per direction) and we need to make sure we receive packets coming
576         from exactly 2 x chain count different source MAC addresses.
577
578         Example:
579             PVP chain (1 VM per chain)
580             N = 10 (number of chains)
581             Flow count = 20 (number of flows)
582             If the number of unique source MAC addresses from received packets is 20 then
583             all 10 VMs 10 VMs are in operational state.
584         """
585         LOG.info('Starting traffic generator to ensure end-to-end connectivity')
586         # send 2pps on each chain and each direction
587         rate_pps = {'rate_pps': str(self.config.service_chain_count * 2)}
588         self.gen.create_traffic('64', [rate_pps, rate_pps], bidirectional=True, latency=False,
589                                 e2e=True)
590         # ensures enough traffic is coming back
591         retry_count = (self.config.check_traffic_time_sec +
592                        self.config.generic_poll_sec - 1) / self.config.generic_poll_sec
593
594         # we expect to see packets coming from 2 unique MAC per chain
595         # because there can be flooding in the case of shared net
596         # we must verify that packets from the right VMs are received
597         # and not just count unique src MAC
598         # create a dict of (port, chain) tuples indexed by dest mac
599         mac_map = {}
600         for port, dest_macs in enumerate(self.generator_config.get_dest_macs()):
601             for chain, mac in enumerate(dest_macs):
602                 mac_map[mac] = (port, chain)
603         unique_src_mac_count = len(mac_map)
604         if self.config.vxlan and self.config.traffic_generator.vtep_vlan:
605             get_mac_id = lambda packet: packet['binary'][60:66]
606         elif self.config.vxlan:
607             get_mac_id = lambda packet: packet['binary'][56:62]
608         else:
609             get_mac_id = lambda packet: packet['binary'][6:12]
610         for it in xrange(retry_count):
611             self.gen.clear_stats()
612             self.gen.start_traffic()
613             self.gen.start_capture()
614             LOG.info('Captured unique src mac %d/%d, capturing return packets (retry %d/%d)...',
615                      unique_src_mac_count - len(mac_map), unique_src_mac_count,
616                      it + 1, retry_count)
617             if not self.skip_sleep():
618                 time.sleep(self.config.generic_poll_sec)
619             self.gen.stop_traffic()
620             self.gen.fetch_capture_packets()
621             self.gen.stop_capture()
622             for packet in self.gen.packet_list:
623                 mac_id = get_mac_id(packet)
624                 src_mac = ':'.join(["%02x" % ord(x) for x in mac_id])
625                 if src_mac in mac_map and self.is_udp(packet):
626                     port, chain = mac_map[src_mac]
627                     LOG.info('Received packet from mac: %s (chain=%d, port=%d)',
628                              src_mac, chain, port)
629                     mac_map.pop(src_mac, None)
630
631                 if not mac_map:
632                     LOG.info('End-to-end connectivity established')
633                     return
634             if self.config.l3_router and not self.config.no_arp:
635                 # In case of L3 traffic mode, routers are not able to route traffic
636                 # until VM interfaces are up and ARP requests are done
637                 LOG.info('Waiting for loopback service completely started...')
638                 LOG.info('Sending ARP request to assure end-to-end connectivity established')
639                 self.ensure_arp_successful()
640         raise TrafficClientException('End-to-end connectivity cannot be ensured')
641
642     def is_udp(self, packet):
643         pkt = Ether(packet['binary'])
644         return UDP in pkt
645
646     def ensure_arp_successful(self):
647         """Resolve all IP using ARP and throw an exception in case of failure."""
648         dest_macs = self.gen.resolve_arp()
649         if dest_macs:
650             # all dest macs are discovered, saved them into the generator config
651             if self.config.vxlan:
652                 self.generator_config.set_vtep_dest_macs(0, dest_macs[0])
653                 self.generator_config.set_vtep_dest_macs(1, dest_macs[1])
654             else:
655                 self.generator_config.set_dest_macs(0, dest_macs[0])
656                 self.generator_config.set_dest_macs(1, dest_macs[1])
657         else:
658             raise TrafficClientException('ARP cannot be resolved')
659
660     def set_traffic(self, frame_size, bidirectional):
661         """Reconfigure the traffic generator for a new frame size."""
662         self.run_config['bidirectional'] = bidirectional
663         self.run_config['l2frame_size'] = frame_size
664         self.run_config['rates'] = [self.get_per_direction_rate()]
665         if bidirectional:
666             self.run_config['rates'].append(self.get_per_direction_rate())
667         else:
668             unidir_reverse_pps = int(self.config.unidir_reverse_traffic_pps)
669             if unidir_reverse_pps > 0:
670                 self.run_config['rates'].append({'rate_pps': str(unidir_reverse_pps)})
671         # Fix for [NFVBENCH-67], convert the rate string to PPS
672         for idx, rate in enumerate(self.run_config['rates']):
673             if 'rate_pps' not in rate:
674                 self.run_config['rates'][idx] = {'rate_pps': self.__convert_rates(rate)['rate_pps']}
675
676         self.gen.clear_streamblock()
677         if not self.config.vxlan:
678             self.gen.create_traffic(frame_size, self.run_config['rates'], bidirectional,
679                                     latency=True)
680         else:
681             self.gen.create_traffic(frame_size, self.run_config['rates'], bidirectional,
682                                     latency=False)
683
684     def _modify_load(self, load):
685         self.current_total_rate = {'rate_percent': str(load)}
686         rate_per_direction = self.get_per_direction_rate()
687
688         self.gen.modify_rate(rate_per_direction, False)
689         self.run_config['rates'][0] = rate_per_direction
690         if self.run_config['bidirectional']:
691             self.gen.modify_rate(rate_per_direction, True)
692             self.run_config['rates'][1] = rate_per_direction
693
694     def get_ndr_and_pdr(self):
695         """Start the NDR/PDR iteration and return the results."""
696         dst = 'Bidirectional' if self.run_config['bidirectional'] else 'Unidirectional'
697         targets = {}
698         if self.config.ndr_run:
699             LOG.info('*** Searching NDR for %s (%s)...', self.run_config['l2frame_size'], dst)
700             targets['ndr'] = self.config.measurement.NDR
701         if self.config.pdr_run:
702             LOG.info('*** Searching PDR for %s (%s)...', self.run_config['l2frame_size'], dst)
703             targets['pdr'] = self.config.measurement.PDR
704
705         self.run_config['start_time'] = time.time()
706         self.interval_collector = IntervalCollector(self.run_config['start_time'])
707         self.interval_collector.attach_notifier(self.notifier)
708         self.iteration_collector = IterationCollector(self.run_config['start_time'])
709         results = {}
710         self.__range_search(0.0, 200.0, targets, results)
711
712         results['iteration_stats'] = {
713             'ndr_pdr': self.iteration_collector.get()
714         }
715
716         if self.config.ndr_run:
717             LOG.info('NDR load: %s', results['ndr']['rate_percent'])
718             results['ndr']['time_taken_sec'] = \
719                 results['ndr']['timestamp_sec'] - self.run_config['start_time']
720             if self.config.pdr_run:
721                 LOG.info('PDR load: %s', results['pdr']['rate_percent'])
722                 results['pdr']['time_taken_sec'] = \
723                     results['pdr']['timestamp_sec'] - results['ndr']['timestamp_sec']
724         else:
725             LOG.info('PDR load: %s', results['pdr']['rate_percent'])
726             results['pdr']['time_taken_sec'] = \
727                 results['pdr']['timestamp_sec'] - self.run_config['start_time']
728         return results
729
730     def __get_dropped_rate(self, result):
731         dropped_pkts = result['rx']['dropped_pkts']
732         total_pkts = result['tx']['total_pkts']
733         if not total_pkts:
734             return float('inf')
735         return float(dropped_pkts) / total_pkts * 100
736
737     def get_stats(self):
738         """Collect final stats for previous run."""
739         stats = self.gen.get_stats()
740         retDict = {'total_tx_rate': stats['total_tx_rate']}
741         for port in self.PORTS:
742             retDict[port] = {'tx': {}, 'rx': {}}
743
744         tx_keys = ['total_pkts', 'total_pkt_bytes', 'pkt_rate', 'pkt_bit_rate']
745         rx_keys = tx_keys + ['dropped_pkts']
746
747         for port in self.PORTS:
748             for key in tx_keys:
749                 retDict[port]['tx'][key] = int(stats[port]['tx'][key])
750             for key in rx_keys:
751                 try:
752                     retDict[port]['rx'][key] = int(stats[port]['rx'][key])
753                 except ValueError:
754                     retDict[port]['rx'][key] = 0
755             retDict[port]['rx']['avg_delay_usec'] = cast_integer(
756                 stats[port]['rx']['avg_delay_usec'])
757             retDict[port]['rx']['min_delay_usec'] = cast_integer(
758                 stats[port]['rx']['min_delay_usec'])
759             retDict[port]['rx']['max_delay_usec'] = cast_integer(
760                 stats[port]['rx']['max_delay_usec'])
761             retDict[port]['drop_rate_percent'] = self.__get_dropped_rate(retDict[port])
762
763         ports = sorted(retDict.keys())
764         if self.run_config['bidirectional']:
765             retDict['overall'] = {'tx': {}, 'rx': {}}
766             for key in tx_keys:
767                 retDict['overall']['tx'][key] = \
768                     retDict[ports[0]]['tx'][key] + retDict[ports[1]]['tx'][key]
769             for key in rx_keys:
770                 retDict['overall']['rx'][key] = \
771                     retDict[ports[0]]['rx'][key] + retDict[ports[1]]['rx'][key]
772             total_pkts = [retDict[ports[0]]['rx']['total_pkts'],
773                           retDict[ports[1]]['rx']['total_pkts']]
774             avg_delays = [retDict[ports[0]]['rx']['avg_delay_usec'],
775                           retDict[ports[1]]['rx']['avg_delay_usec']]
776             max_delays = [retDict[ports[0]]['rx']['max_delay_usec'],
777                           retDict[ports[1]]['rx']['max_delay_usec']]
778             min_delays = [retDict[ports[0]]['rx']['min_delay_usec'],
779                           retDict[ports[1]]['rx']['min_delay_usec']]
780             retDict['overall']['rx']['avg_delay_usec'] = utils.weighted_avg(total_pkts, avg_delays)
781             retDict['overall']['rx']['min_delay_usec'] = min(min_delays)
782             retDict['overall']['rx']['max_delay_usec'] = max(max_delays)
783             for key in ['pkt_bit_rate', 'pkt_rate']:
784                 for dirc in ['tx', 'rx']:
785                     retDict['overall'][dirc][key] /= 2.0
786         else:
787             retDict['overall'] = retDict[ports[0]]
788         retDict['overall']['drop_rate_percent'] = self.__get_dropped_rate(retDict['overall'])
789         return retDict
790
791     def __convert_rates(self, rate):
792         return utils.convert_rates(self.run_config['l2frame_size'],
793                                    rate,
794                                    self.intf_speed)
795
796     def __ndr_pdr_found(self, tag, load):
797         rates = self.__convert_rates({'rate_percent': load})
798         self.iteration_collector.add_ndr_pdr(tag, rates['rate_pps'])
799         last_stats = self.iteration_collector.peek()
800         self.interval_collector.add_ndr_pdr(tag, last_stats)
801
802     def __format_output_stats(self, stats):
803         for key in self.PORTS + ['overall']:
804             interface = stats[key]
805             stats[key] = {
806                 'tx_pkts': interface['tx']['total_pkts'],
807                 'rx_pkts': interface['rx']['total_pkts'],
808                 'drop_percentage': interface['drop_rate_percent'],
809                 'drop_pct': interface['rx']['dropped_pkts'],
810                 'avg_delay_usec': interface['rx']['avg_delay_usec'],
811                 'max_delay_usec': interface['rx']['max_delay_usec'],
812                 'min_delay_usec': interface['rx']['min_delay_usec'],
813             }
814
815         return stats
816
817     def __targets_found(self, rate, targets, results):
818         for tag, target in targets.iteritems():
819             LOG.info('Found %s (%s) load: %s', tag, target, rate)
820             self.__ndr_pdr_found(tag, rate)
821             results[tag]['timestamp_sec'] = time.time()
822
823     def __range_search(self, left, right, targets, results):
824         """Perform a binary search for a list of targets inside a [left..right] range or rate.
825
826         left    the left side of the range to search as a % the line rate (100 = 100% line rate)
827                 indicating the rate to send on each interface
828         right   the right side of the range to search as a % of line rate
829                 indicating the rate to send on each interface
830         targets a dict of drop rates to search (0.1 = 0.1%), indexed by the DR name or "tag"
831                 ('ndr', 'pdr')
832         results a dict to store results
833         """
834         if not targets:
835             return
836         LOG.info('Range search [%s .. %s] targets: %s', left, right, targets)
837
838         # Terminate search when gap is less than load epsilon
839         if right - left < self.config.measurement.load_epsilon:
840             self.__targets_found(left, targets, results)
841             return
842
843         # Obtain the average drop rate in for middle load
844         middle = (left + right) / 2.0
845         try:
846             stats, rates = self.__run_search_iteration(middle)
847         except STLError:
848             LOG.exception("Got exception from traffic generator during binary search")
849             self.__targets_found(left, targets, results)
850             return
851         # Split target dicts based on the avg drop rate
852         left_targets = {}
853         right_targets = {}
854         for tag, target in targets.iteritems():
855             if stats['overall']['drop_rate_percent'] <= target:
856                 # record the best possible rate found for this target
857                 results[tag] = rates
858                 results[tag].update({
859                     'load_percent_per_direction': middle,
860                     'stats': self.__format_output_stats(dict(stats)),
861                     'timestamp_sec': None
862                 })
863                 right_targets[tag] = target
864             else:
865                 # initialize to 0 all fields of result for
866                 # the worst case scenario of the binary search (if ndr/pdr is not found)
867                 if tag not in results:
868                     results[tag] = dict.fromkeys(rates, 0)
869                     empty_stats = self.__format_output_stats(dict(stats))
870                     for key in empty_stats:
871                         if isinstance(empty_stats[key], dict):
872                             empty_stats[key] = dict.fromkeys(empty_stats[key], 0)
873                         else:
874                             empty_stats[key] = 0
875                     results[tag].update({
876                         'load_percent_per_direction': 0,
877                         'stats': empty_stats,
878                         'timestamp_sec': None
879                     })
880                 left_targets[tag] = target
881
882         # search lower half
883         self.__range_search(left, middle, left_targets, results)
884
885         # search upper half only if the upper rate does not exceed
886         # 100%, this only happens when the first search at 100%
887         # yields a DR that is < target DR
888         if middle >= 100:
889             self.__targets_found(100, right_targets, results)
890         else:
891             self.__range_search(middle, right, right_targets, results)
892
893     def __run_search_iteration(self, rate):
894         """Run one iteration at the given rate level.
895
896         rate: the rate to send on each port in percent (0 to 100)
897         """
898         self._modify_load(rate)
899
900         # poll interval stats and collect them
901         for stats in self.run_traffic():
902             self.interval_collector.add(stats)
903             time_elapsed_ratio = self.runner.time_elapsed() / self.run_config['duration_sec']
904             if time_elapsed_ratio >= 1:
905                 self.cancel_traffic()
906                 if not self.skip_sleep():
907                     time.sleep(self.config.pause_sec)
908         self.interval_collector.reset()
909
910         # get stats from the run
911         stats = self.runner.client.get_stats()
912         current_traffic_config = self._get_traffic_config()
913         warning = self.compare_tx_rates(current_traffic_config['direction-total']['rate_pps'],
914                                         stats['total_tx_rate'])
915         if warning is not None:
916             stats['warning'] = warning
917
918         # save reliable stats from whole iteration
919         self.iteration_collector.add(stats, current_traffic_config['direction-total']['rate_pps'])
920         LOG.info('Average drop rate: %f', stats['overall']['drop_rate_percent'])
921         return stats, current_traffic_config['direction-total']
922
923     @staticmethod
924     def log_stats(stats):
925         """Log estimated stats during run."""
926         report = {
927             'datetime': str(datetime.now()),
928             'tx_packets': stats['overall']['tx']['total_pkts'],
929             'rx_packets': stats['overall']['rx']['total_pkts'],
930             'drop_packets': stats['overall']['rx']['dropped_pkts'],
931             'drop_rate_percent': stats['overall']['drop_rate_percent']
932         }
933         LOG.info('TX: %(tx_packets)d; '
934                  'RX: %(rx_packets)d; '
935                  'Est. Dropped: %(drop_packets)d; '
936                  'Est. Drop rate: %(drop_rate_percent).4f%%',
937                  report)
938
939     def run_traffic(self):
940         """Start traffic and return intermediate stats for each interval."""
941         stats = self.runner.run()
942         while self.runner.is_running:
943             self.log_stats(stats)
944             yield stats
945             stats = self.runner.poll_stats()
946             if stats is None:
947                 return
948         self.log_stats(stats)
949         LOG.info('Drop rate: %f', stats['overall']['drop_rate_percent'])
950         yield stats
951
952     def cancel_traffic(self):
953         """Stop traffic."""
954         self.runner.stop()
955
956     def _get_traffic_config(self):
957         config = {}
958         load_total = 0.0
959         bps_total = 0.0
960         pps_total = 0.0
961         for idx, rate in enumerate(self.run_config['rates']):
962             key = 'direction-forward' if idx == 0 else 'direction-reverse'
963             config[key] = {
964                 'l2frame_size': self.run_config['l2frame_size'],
965                 'duration_sec': self.run_config['duration_sec']
966             }
967             config[key].update(rate)
968             config[key].update(self.__convert_rates(rate))
969             load_total += float(config[key]['rate_percent'])
970             bps_total += float(config[key]['rate_bps'])
971             pps_total += float(config[key]['rate_pps'])
972         config['direction-total'] = dict(config['direction-forward'])
973         config['direction-total'].update({
974             'rate_percent': load_total,
975             'rate_pps': cast_integer(pps_total),
976             'rate_bps': bps_total
977         })
978
979         return config
980
981     def get_run_config(self, results):
982         """Return configuration which was used for the last run."""
983         r = {}
984         # because we want each direction to have the far end RX rates,
985         # use the far end index (1-idx) to retrieve the RX rates
986         for idx, key in enumerate(["direction-forward", "direction-reverse"]):
987             tx_rate = results["stats"][idx]["tx"]["total_pkts"] / self.config.duration_sec
988             rx_rate = results["stats"][1 - idx]["rx"]["total_pkts"] / self.config.duration_sec
989             r[key] = {
990                 "orig": self.__convert_rates(self.run_config['rates'][idx]),
991                 "tx": self.__convert_rates({'rate_pps': tx_rate}),
992                 "rx": self.__convert_rates({'rate_pps': rx_rate})
993             }
994
995         total = {}
996         for direction in ['orig', 'tx', 'rx']:
997             total[direction] = {}
998             for unit in ['rate_percent', 'rate_bps', 'rate_pps']:
999                 total[direction][unit] = sum([float(x[direction][unit]) for x in r.values()])
1000
1001         r['direction-total'] = total
1002         return r
1003
1004     def insert_interface_stats(self, pps_list):
1005         """Insert interface stats to a list of packet path stats.
1006
1007         pps_list: a list of packet path stats instances indexed by chain index
1008
1009         This function will insert the packet path stats for the traffic gen ports 0 and 1
1010         with itemized per chain tx/rx counters.
1011         There will be as many packet path stats as chains.
1012         Each packet path stats will have exactly 2 InterfaceStats for port 0 and port 1
1013         self.pps_list:
1014         [
1015         PacketPathStats(InterfaceStats(chain 0, port 0), InterfaceStats(chain 0, port 1)),
1016         PacketPathStats(InterfaceStats(chain 1, port 0), InterfaceStats(chain 1, port 1)),
1017         ...
1018         ]
1019         """
1020         def get_if_stats(chain_idx):
1021             return [InterfaceStats('p' + str(port), self.tool)
1022                     for port in range(2)]
1023         # keep the list of list of interface stats indexed by the chain id
1024         self.ifstats = [get_if_stats(chain_idx)
1025                         for chain_idx in range(self.config.service_chain_count)]
1026         # note that we need to make a copy of the ifs list so that any modification in the
1027         # list from pps will not change the list saved in self.ifstats
1028         self.pps_list = [PacketPathStats(list(ifs)) for ifs in self.ifstats]
1029         # insert the corresponding pps in the passed list
1030         pps_list.extend(self.pps_list)
1031
1032     def update_interface_stats(self, diff=False):
1033         """Update all interface stats.
1034
1035         diff: if False, simply refresh the interface stats values with latest values
1036               if True, diff the interface stats with the latest values
1037         Make sure that the interface stats inserted in insert_interface_stats() are updated
1038         with proper values.
1039         self.ifstats:
1040         [
1041         [InterfaceStats(chain 0, port 0), InterfaceStats(chain 0, port 1)],
1042         [InterfaceStats(chain 1, port 0), InterfaceStats(chain 1, port 1)],
1043         ...
1044         ]
1045         """
1046         if diff:
1047             stats = self.gen.get_stats()
1048             for chain_idx, ifs in enumerate(self.ifstats):
1049                 # each ifs has exactly 2 InterfaceStats and 2 Latency instances
1050                 # corresponding to the
1051                 # port 0 and port 1 for the given chain_idx
1052                 # Note that we cannot use self.pps_list[chain_idx].if_stats to pick the
1053                 # interface stats for the pps because it could have been modified to contain
1054                 # additional interface stats
1055                 self.gen.get_stream_stats(stats, ifs, self.pps_list[chain_idx].latencies, chain_idx)
1056             # special handling for vxlan
1057             # in case of vxlan, flow stats are not available so all rx counters will be
1058             # zeros when the total rx port counter is non zero.
1059             # in that case,
1060             for port in range(2):
1061                 total_rx = 0
1062                 for ifs in self.ifstats:
1063                     total_rx += ifs[port].rx
1064                 if total_rx == 0:
1065                     # check if the total port rx from Trex is also zero
1066                     port_rx = stats[port]['rx']['total_pkts']
1067                     if port_rx:
1068                         # the total rx for all chains from port level stats is non zero
1069                         # which means that the per-chain stats are not available
1070                         if len(self.ifstats) == 1:
1071                             # only one chain, simply report the port level rx to the chain rx stats
1072                             self.ifstats[0][port].rx = port_rx
1073                         else:
1074                             for ifs in self.ifstats:
1075                                 # mark this data as unavailable
1076                                 ifs[port].rx = None
1077                             # pitch in the total rx only in the last chain pps
1078                             self.ifstats[-1][port].rx_total = port_rx
1079
1080     @staticmethod
1081     def compare_tx_rates(required, actual):
1082         """Compare the actual TX rate to the required TX rate."""
1083         threshold = 0.9
1084         are_different = False
1085         try:
1086             if float(actual) / required < threshold:
1087                 are_different = True
1088         except ZeroDivisionError:
1089             are_different = True
1090
1091         if are_different:
1092             msg = "WARNING: There is a significant difference between requested TX rate ({r}) " \
1093                   "and actual TX rate ({a}). The traffic generator may not have sufficient CPU " \
1094                   "to achieve the requested TX rate.".format(r=required, a=actual)
1095             LOG.info(msg)
1096             return msg
1097
1098         return None
1099
1100     def get_per_direction_rate(self):
1101         """Get the rate for each direction."""
1102         divisor = 2 if self.run_config['bidirectional'] else 1
1103         if 'rate_percent' in self.current_total_rate:
1104             # don't split rate if it's percentage
1105             divisor = 1
1106
1107         return utils.divide_rate(self.current_total_rate, divisor)
1108
1109     def close(self):
1110         """Close this instance."""
1111         try:
1112             self.gen.stop_traffic()
1113         except Exception:
1114             pass
1115         self.gen.clear_stats()
1116         self.gen.cleanup()