NFVBENCH-155 Add options to disable extra stats, latency stats and latency streams
[nfvbench.git] / nfvbench / traffic_client.py
1 # Copyright 2016 Cisco Systems, Inc.  All rights reserved.
2 #
3 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
4 #    not use this file except in compliance with the License. You may obtain
5 #    a copy of the License at
6 #
7 #         http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #    Unless required by applicable law or agreed to in writing, software
10 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 #    License for the specific language governing permissions and limitations
13 #    under the License.
14
15 """Interface to the traffic generator clients including NDR/PDR binary search."""
16
17 from datetime import datetime
18 import socket
19 import struct
20 import time
21
22 from attrdict import AttrDict
23 import bitmath
24 from netaddr import IPNetwork
25 # pylint: disable=import-error
26 from trex.stl.api import Ether
27 from trex.stl.api import STLError
28 from trex.stl.api import UDP
29 # pylint: enable=import-error
30
31 from log import LOG
32 from packet_stats import InterfaceStats
33 from packet_stats import PacketPathStats
34 from stats_collector import IntervalCollector
35 from stats_collector import IterationCollector
36 import traffic_gen.traffic_utils as utils
37 from utils import cast_integer
38
39
40 class TrafficClientException(Exception):
41     """Generic traffic client exception."""
42
43     pass
44
45
46 class TrafficRunner(object):
47     """Serialize various steps required to run traffic."""
48
49     def __init__(self, client, duration_sec, interval_sec=0, service_mode=False):
50         """Create a traffic runner."""
51         self.client = client
52         self.start_time = None
53         self.duration_sec = duration_sec
54         self.interval_sec = interval_sec
55         self.service_mode = service_mode
56
57     def run(self):
58         """Clear stats and instruct the traffic generator to start generating traffic."""
59         if self.is_running():
60             return None
61         LOG.info('Running traffic generator')
62         self.client.gen.clear_stats()
63         # Debug use only : new '--service-mode' option available for the NFVBench command line.
64         # A read-only mode TRex console would be able to capture the generated traffic.
65         self.client.gen.set_service_mode(enabled=self.service_mode)
66         LOG.info('Service mode is %sabled', 'en' if self.service_mode else 'dis')
67         self.client.gen.start_traffic()
68         self.start_time = time.time()
69         return self.poll_stats()
70
71     def stop(self):
72         """Stop the current run and instruct the traffic generator to stop traffic."""
73         if self.is_running():
74             self.start_time = None
75             self.client.gen.stop_traffic()
76
77     def is_running(self):
78         """Check if a run is still pending."""
79         return self.start_time is not None
80
81     def time_elapsed(self):
82         """Return time elapsed since start of run."""
83         if self.is_running():
84             return time.time() - self.start_time
85         return self.duration_sec
86
87     def poll_stats(self):
88         """Poll latest stats from the traffic generator at fixed interval - sleeps if necessary.
89
90         return: latest stats or None if traffic is stopped
91         """
92         if not self.is_running():
93             return None
94         if self.client.skip_sleep():
95             self.stop()
96             return self.client.get_stats()
97         time_elapsed = self.time_elapsed()
98         if time_elapsed > self.duration_sec:
99             self.stop()
100             return None
101         time_left = self.duration_sec - time_elapsed
102         if self.interval_sec > 0.0:
103             if time_left <= self.interval_sec:
104                 time.sleep(time_left)
105                 self.stop()
106             else:
107                 time.sleep(self.interval_sec)
108         else:
109             time.sleep(self.duration_sec)
110             self.stop()
111         return self.client.get_stats()
112
113
114 class IpBlock(object):
115     """Manage a block of IP addresses."""
116
117     def __init__(self, base_ip, step_ip, count_ip):
118         """Create an IP block."""
119         self.base_ip_int = Device.ip_to_int(base_ip)
120         self.step = Device.ip_to_int(step_ip)
121         self.max_available = count_ip
122         self.next_free = 0
123
124     def get_ip(self, index=0):
125         """Return the IP address at given index."""
126         if index < 0 or index >= self.max_available:
127             raise IndexError('Index out of bounds: %d (max=%d)' % (index, self.max_available))
128         return Device.int_to_ip(self.base_ip_int + index * self.step)
129
130     def reserve_ip_range(self, count):
131         """Reserve a range of count consecutive IP addresses spaced by step."""
132         if self.next_free + count > self.max_available:
133             raise IndexError('No more IP addresses next free=%d max_available=%d requested=%d' %
134                              (self.next_free,
135                               self.max_available,
136                               count))
137         first_ip = self.get_ip(self.next_free)
138         last_ip = self.get_ip(self.next_free + count - 1)
139         self.next_free += count
140         return (first_ip, last_ip)
141
142     def reset_reservation(self):
143         """Reset all reservations and restart with a completely unused IP block."""
144         self.next_free = 0
145
146
147 class Device(object):
148     """Represent a port device and all information associated to it.
149
150     In the curent version we only support 2 port devices for the traffic generator
151     identified as port 0 or port 1.
152     """
153
154     def __init__(self, port, generator_config):
155         """Create a new device for a given port."""
156         self.generator_config = generator_config
157         self.chain_count = generator_config.service_chain_count
158         self.flow_count = generator_config.flow_count / 2
159         self.port = port
160         self.switch_port = generator_config.interfaces[port].get('switch_port', None)
161         self.vtep_vlan = None
162         self.vtep_src_mac = None
163         self.vxlan = False
164         self.pci = generator_config.interfaces[port].pci
165         self.mac = None
166         self.dest_macs = None
167         self.vtep_dst_mac = None
168         self.vtep_dst_ip = None
169         if generator_config.vteps is None:
170             self.vtep_src_ip = None
171         else:
172             self.vtep_src_ip = generator_config.vteps[port]
173         self.vnis = None
174         self.vlans = None
175         self.ip_addrs = generator_config.ip_addrs[port]
176         subnet = IPNetwork(self.ip_addrs)
177         self.ip = subnet.ip.format()
178         self.ip_addrs_step = generator_config.ip_addrs_step
179         self.ip_block = IpBlock(self.ip, self.ip_addrs_step, self.flow_count)
180         self.gw_ip_block = IpBlock(generator_config.gateway_ips[port],
181                                    generator_config.gateway_ip_addrs_step,
182                                    self.chain_count)
183         self.tg_gateway_ip_addrs = generator_config.tg_gateway_ip_addrs[port]
184         self.tg_gw_ip_block = IpBlock(self.tg_gateway_ip_addrs,
185                                       generator_config.tg_gateway_ip_addrs_step,
186                                       self.chain_count)
187         self.udp_src_port = generator_config.udp_src_port
188         self.udp_dst_port = generator_config.udp_dst_port
189
190     def set_mac(self, mac):
191         """Set the local MAC for this port device."""
192         if mac is None:
193             raise TrafficClientException('Trying to set traffic generator MAC address as None')
194         self.mac = mac
195
196     def get_peer_device(self):
197         """Get the peer device (device 0 -> device 1, or device 1 -> device 0)."""
198         return self.generator_config.devices[1 - self.port]
199
200     def set_vtep_dst_mac(self, dest_macs):
201         """Set the list of dest MACs indexed by the chain id.
202
203         This is only called in 2 cases:
204         - VM macs discovered using openstack API
205         - dest MACs provisioned in config file
206         """
207         self.vtep_dst_mac = map(str, dest_macs)
208
209     def set_dest_macs(self, dest_macs):
210         """Set the list of dest MACs indexed by the chain id.
211
212         This is only called in 2 cases:
213         - VM macs discovered using openstack API
214         - dest MACs provisioned in config file
215         """
216         self.dest_macs = map(str, dest_macs)
217
218     def get_dest_macs(self):
219         """Get the list of dest macs for this device.
220
221         If set_dest_macs was never called, assumes l2-loopback and return
222         a list of peer mac (as many as chains but normally only 1 chain)
223         """
224         if self.dest_macs:
225             return self.dest_macs
226         # assume this is l2-loopback
227         return [self.get_peer_device().mac] * self.chain_count
228
229     def set_vlans(self, vlans):
230         """Set the list of vlans to use indexed by the chain id."""
231         self.vlans = vlans
232         LOG.info("Port %d: VLANs %s", self.port, self.vlans)
233
234     def set_vtep_vlan(self, vlan):
235         """Set the vtep vlan to use indexed by specific port."""
236         self.vtep_vlan = vlan
237         self.vxlan = True
238         self.vlan_tagging = None
239         LOG.info("Port %d: VTEP VLANs %s", self.port, self.vtep_vlan)
240
241     def set_vxlan_endpoints(self, src_ip, dst_ip):
242         self.vtep_dst_ip = dst_ip
243         self.vtep_src_ip = src_ip
244         LOG.info("Port %d: src_vtep %s, dst_vtep %s", self.port,
245                  self.vtep_src_ip, self.vtep_dst_ip)
246
247     def set_vxlans(self, vnis):
248         self.vnis = vnis
249         LOG.info("Port %d: VNIs %s", self.port, self.vnis)
250
251     def set_gw_ip(self, gateway_ip):
252         self.gw_ip_block = IpBlock(gateway_ip,
253                                    self.generator_config.gateway_ip_addrs_step,
254                                    self.chain_count)
255
256     def get_gw_ip(self, chain_index):
257         """Retrieve the IP address assigned for the gateway of a given chain."""
258         return self.gw_ip_block.get_ip(chain_index)
259
260     def get_stream_configs(self):
261         """Get the stream config for a given chain on this device.
262
263         Called by the traffic generator driver to program the traffic generator properly
264         before generating traffic
265         """
266         configs = []
267         # exact flow count for each chain is calculated as follows:
268         # - all chains except the first will have the same flow count
269         #   calculated as (total_flows + chain_count - 1) / chain_count
270         # - the first chain will have the remainder
271         # example 11 flows and 3 chains => 3, 4, 4
272         flows_per_chain = (self.flow_count + self.chain_count - 1) / self.chain_count
273         cur_chain_flow_count = self.flow_count - flows_per_chain * (self.chain_count - 1)
274         peer = self.get_peer_device()
275         self.ip_block.reset_reservation()
276         peer.ip_block.reset_reservation()
277         dest_macs = self.get_dest_macs()
278
279         for chain_idx in xrange(self.chain_count):
280             src_ip_first, src_ip_last = self.ip_block.reserve_ip_range(cur_chain_flow_count)
281             dst_ip_first, dst_ip_last = peer.ip_block.reserve_ip_range(cur_chain_flow_count)
282
283             configs.append({
284                 'count': cur_chain_flow_count,
285                 'mac_src': self.mac,
286                 'mac_dst': dest_macs[chain_idx],
287                 'ip_src_addr': src_ip_first,
288                 'ip_src_addr_max': src_ip_last,
289                 'ip_src_count': cur_chain_flow_count,
290                 'ip_dst_addr': dst_ip_first,
291                 'ip_dst_addr_max': dst_ip_last,
292                 'ip_dst_count': cur_chain_flow_count,
293                 'ip_addrs_step': self.ip_addrs_step,
294                 'udp_src_port': self.udp_src_port,
295                 'udp_dst_port': self.udp_dst_port,
296                 'mac_discovery_gw': self.get_gw_ip(chain_idx),
297                 'ip_src_tg_gw': self.tg_gw_ip_block.get_ip(chain_idx),
298                 'ip_dst_tg_gw': peer.tg_gw_ip_block.get_ip(chain_idx),
299                 'vlan_tag': self.vlans[chain_idx] if self.vlans else None,
300                 'vxlan': self.vxlan,
301                 'vtep_vlan': self.vtep_vlan if self.vtep_vlan else None,
302                 'vtep_src_mac': self.mac if self.vxlan is True else None,
303                 'vtep_dst_mac': self.vtep_dst_mac if self.vxlan is True else None,
304                 'vtep_dst_ip': self.vtep_dst_ip if self.vxlan is True else None,
305                 'vtep_src_ip': self.vtep_src_ip if self.vxlan is True else None,
306                 'net_vni': self.vnis[chain_idx] if self.vxlan is True else None
307             })
308             # after first chain, fall back to the flow count for all other chains
309             cur_chain_flow_count = flows_per_chain
310         return configs
311
312     @staticmethod
313     def ip_to_int(addr):
314         """Convert an IP address from string to numeric."""
315         return struct.unpack("!I", socket.inet_aton(addr))[0]
316
317     @staticmethod
318     def int_to_ip(nvalue):
319         """Convert an IP address from numeric to string."""
320         return socket.inet_ntoa(struct.pack("!I", nvalue))
321
322
323 class GeneratorConfig(object):
324     """Represents traffic configuration for currently running traffic profile."""
325
326     DEFAULT_IP_STEP = '0.0.0.1'
327     DEFAULT_SRC_DST_IP_STEP = '0.0.0.1'
328
329     def __init__(self, config):
330         """Create a generator config."""
331         self.config = config
332         # name of the generator profile (normally trex or dummy)
333         # pick the default one if not specified explicitly from cli options
334         if not config.generator_profile:
335             config.generator_profile = config.traffic_generator.default_profile
336         # pick up the profile dict based on the name
337         gen_config = self.__match_generator_profile(config.traffic_generator,
338                                                     config.generator_profile)
339         self.gen_config = gen_config
340         # copy over fields from the dict
341         self.tool = gen_config.tool
342         self.ip = gen_config.ip
343         # overrides on config.cores and config.mbuf_factor
344         if config.cores:
345             self.cores = config.cores
346         else:
347             self.cores = gen_config.get('cores', 1)
348         self.mbuf_factor = config.mbuf_factor
349         self.mbuf_64 = config.mbuf_64
350         self.hdrh = not config.disable_hdrh
351         if gen_config.intf_speed:
352             # interface speed is overriden from config
353             self.intf_speed = bitmath.parse_string(gen_config.intf_speed.replace('ps', '')).bits
354         else:
355             # interface speed is discovered/provided by the traffic generator
356             self.intf_speed = 0
357         self.name = gen_config.name
358         self.zmq_pub_port = gen_config.get('zmq_pub_port', 4500)
359         self.zmq_rpc_port = gen_config.get('zmq_rpc_port', 4501)
360         self.limit_memory = gen_config.get('limit_memory', 1024)
361         self.software_mode = gen_config.get('software_mode', False)
362         self.interfaces = gen_config.interfaces
363         if self.interfaces[0].port != 0 or self.interfaces[1].port != 1:
364             raise TrafficClientException('Invalid port order/id in generator_profile.interfaces')
365         self.service_chain = config.service_chain
366         self.service_chain_count = config.service_chain_count
367         self.flow_count = config.flow_count
368         self.host_name = gen_config.host_name
369
370         self.tg_gateway_ip_addrs = gen_config.tg_gateway_ip_addrs
371         self.ip_addrs = gen_config.ip_addrs
372         self.ip_addrs_step = gen_config.ip_addrs_step or self.DEFAULT_SRC_DST_IP_STEP
373         self.tg_gateway_ip_addrs_step = \
374             gen_config.tg_gateway_ip_addrs_step or self.DEFAULT_IP_STEP
375         self.gateway_ip_addrs_step = gen_config.gateway_ip_addrs_step or self.DEFAULT_IP_STEP
376         self.gateway_ips = gen_config.gateway_ip_addrs
377         self.udp_src_port = gen_config.udp_src_port
378         self.udp_dst_port = gen_config.udp_dst_port
379         self.vteps = gen_config.get('vteps')
380         self.devices = [Device(port, self) for port in [0, 1]]
381         # This should normally always be [0, 1]
382         self.ports = [device.port for device in self.devices]
383
384         # check that pci is not empty
385         if not gen_config.interfaces[0].get('pci', None) or \
386            not gen_config.interfaces[1].get('pci', None):
387             raise TrafficClientException("configuration interfaces pci fields cannot be empty")
388
389         self.pcis = [tgif['pci'] for tgif in gen_config.interfaces]
390         self.vlan_tagging = config.vlan_tagging
391
392         # needed for result/summarizer
393         config['tg-name'] = gen_config.name
394         config['tg-tool'] = self.tool
395
396     def to_json(self):
397         """Get json form to display the content into the overall result dict."""
398         return dict(self.gen_config)
399
400     def set_dest_macs(self, port_index, dest_macs):
401         """Set the list of dest MACs indexed by the chain id on given port.
402
403         port_index: the port for which dest macs must be set
404         dest_macs: a list of dest MACs indexed by chain id
405         """
406         if len(dest_macs) < self.config.service_chain_count:
407             raise TrafficClientException('Dest MAC list %s must have %d entries' %
408                                          (dest_macs, self.config.service_chain_count))
409         # only pass the first scc dest MACs
410         self.devices[port_index].set_dest_macs(dest_macs[:self.config.service_chain_count])
411         LOG.info('Port %d: dst MAC %s', port_index, [str(mac) for mac in dest_macs])
412
413     def set_vtep_dest_macs(self, port_index, dest_macs):
414         """Set the list of dest MACs indexed by the chain id on given port.
415
416         port_index: the port for which dest macs must be set
417         dest_macs: a list of dest MACs indexed by chain id
418         """
419         if len(dest_macs) != self.config.service_chain_count:
420             raise TrafficClientException('Dest MAC list %s must have %d entries' %
421                                          (dest_macs, self.config.service_chain_count))
422         self.devices[port_index].set_vtep_dst_mac(dest_macs)
423         LOG.info('Port %d: vtep dst MAC %s', port_index, set([str(mac) for mac in dest_macs]))
424
425     def get_dest_macs(self):
426         """Return the list of dest macs indexed by port."""
427         return [dev.get_dest_macs() for dev in self.devices]
428
429     def set_vlans(self, port_index, vlans):
430         """Set the list of vlans to use indexed by the chain id on given port.
431
432         port_index: the port for which VLANs must be set
433         vlans: a  list of vlan lists indexed by chain id
434         """
435         if len(vlans) != self.config.service_chain_count:
436             raise TrafficClientException('VLAN list %s must have %d entries' %
437                                          (vlans, self.config.service_chain_count))
438         self.devices[port_index].set_vlans(vlans)
439
440     def set_vxlans(self, port_index, vxlans):
441         """Set the list of vxlans (VNIs) to use indexed by the chain id on given port.
442
443         port_index: the port for which VXLANs must be set
444         VXLANs: a  list of VNIs lists indexed by chain id
445         """
446         if len(vxlans) != self.config.service_chain_count:
447             raise TrafficClientException('VXLAN list %s must have %d entries' %
448                                          (vxlans, self.config.service_chain_count))
449         self.devices[port_index].set_vxlans(vxlans)
450
451     def set_vtep_vlan(self, port_index, vlan):
452         """Set the vtep vlan to use indexed by the chain id on given port.
453         port_index: the port for which VLAN must be set
454         """
455         self.devices[port_index].set_vtep_vlan(vlan)
456
457     def set_vxlan_endpoints(self, port_index, src_ip, dst_ip):
458         self.devices[port_index].set_vxlan_endpoints(src_ip, dst_ip)
459
460     @staticmethod
461     def __match_generator_profile(traffic_generator, generator_profile):
462         gen_config = AttrDict(traffic_generator)
463         gen_config.pop('default_profile')
464         gen_config.pop('generator_profile')
465         matching_profile = [profile for profile in traffic_generator.generator_profile if
466                             profile.name == generator_profile]
467         if len(matching_profile) != 1:
468             raise Exception('Traffic generator profile not found: ' + generator_profile)
469
470         gen_config.update(matching_profile[0])
471         return gen_config
472
473
474 class TrafficClient(object):
475     """Traffic generator client with NDR/PDR binary seearch."""
476
477     PORTS = [0, 1]
478
479     def __init__(self, config, notifier=None):
480         """Create a new TrafficClient instance.
481
482         config: nfvbench config
483         notifier: notifier (optional)
484
485         A new instance is created everytime the nfvbench config may have changed.
486         """
487         self.config = config
488         self.generator_config = GeneratorConfig(config)
489         self.tool = self.generator_config.tool
490         self.gen = self._get_generator()
491         self.notifier = notifier
492         self.interval_collector = None
493         self.iteration_collector = None
494         self.runner = TrafficRunner(self, self.config.duration_sec, self.config.interval_sec,
495                                     self.config.service_mode)
496         self.config.frame_sizes = self._get_frame_sizes()
497         self.run_config = {
498             'l2frame_size': None,
499             'duration_sec': self.config.duration_sec,
500             'bidirectional': True,
501             'rates': []  # to avoid unsbuscriptable-obj warning
502         }
503         self.current_total_rate = {'rate_percent': '10'}
504         if self.config.single_run:
505             self.current_total_rate = utils.parse_rate_str(self.config.rate)
506         self.ifstats = None
507         # Speed is either discovered when connecting to TG or set from config
508         # This variable is 0 if not yet discovered from TG or must be the speed of
509         # each interface in bits per second
510         self.intf_speed = self.generator_config.intf_speed
511
512     def _get_generator(self):
513         tool = self.tool.lower()
514         if tool == 'trex':
515             from traffic_gen import trex_gen
516             return trex_gen.TRex(self)
517         if tool == 'dummy':
518             from traffic_gen import dummy
519             return dummy.DummyTG(self)
520         raise TrafficClientException('Unsupported generator tool name:' + self.tool)
521
522     def skip_sleep(self):
523         """Skip all sleeps when doing unit testing with dummy TG.
524
525         Must be overriden using mock.patch
526         """
527         return False
528
529     def _get_frame_sizes(self):
530         traffic_profile_name = self.config.traffic.profile
531         matching_profiles = [profile for profile in self.config.traffic_profile if
532                              profile.name == traffic_profile_name]
533         if len(matching_profiles) > 1:
534             raise TrafficClientException('Multiple traffic profiles with name: ' +
535                                          traffic_profile_name)
536         elif not matching_profiles:
537             raise TrafficClientException('Cannot find traffic profile: ' + traffic_profile_name)
538         return matching_profiles[0].l2frame_size
539
540     def start_traffic_generator(self):
541         """Start the traffic generator process (traffic not started yet)."""
542         self.gen.connect()
543         # pick up the interface speed if it is not set from config
544         intf_speeds = self.gen.get_port_speed_gbps()
545         # convert Gbps unit into bps
546         tg_if_speed = bitmath.parse_string(str(intf_speeds[0]) + 'Gb').bits
547         if self.intf_speed:
548             # interface speed is overriden from config
549             if self.intf_speed != tg_if_speed:
550                 # Warn the user if the speed in the config is different
551                 LOG.warning('Interface speed provided is different from actual speed (%d Gbps)',
552                             intf_speeds[0])
553         else:
554             # interface speed not provisioned by config
555             self.intf_speed = tg_if_speed
556             # also update the speed in the tg config
557             self.generator_config.intf_speed = tg_if_speed
558
559         # Save the traffic generator local MAC
560         for mac, device in zip(self.gen.get_macs(), self.generator_config.devices):
561             device.set_mac(mac)
562
563     def setup(self):
564         """Set up the traffic client."""
565         self.gen.clear_stats()
566
567     def get_version(self):
568         """Get the traffic generator version."""
569         return self.gen.get_version()
570
571     def ensure_end_to_end(self):
572         """Ensure traffic generator receives packets it has transmitted.
573
574         This ensures end to end connectivity and also waits until VMs are ready to forward packets.
575
576         VMs that are started and in active state may not pass traffic yet. It is imperative to make
577         sure that all VMs are passing traffic in both directions before starting any benchmarking.
578         To verify this, we need to send at a low frequency bi-directional packets and make sure
579         that we receive all packets back from all VMs. The number of flows is equal to 2 times
580         the number of chains (1 per direction) and we need to make sure we receive packets coming
581         from exactly 2 x chain count different source MAC addresses.
582
583         Example:
584             PVP chain (1 VM per chain)
585             N = 10 (number of chains)
586             Flow count = 20 (number of flows)
587             If the number of unique source MAC addresses from received packets is 20 then
588             all 10 VMs 10 VMs are in operational state.
589         """
590         LOG.info('Starting traffic generator to ensure end-to-end connectivity')
591         # send 2pps on each chain and each direction
592         rate_pps = {'rate_pps': str(self.config.service_chain_count * 2)}
593         self.gen.create_traffic('64', [rate_pps, rate_pps], bidirectional=True, latency=False,
594                                 e2e=True)
595         # ensures enough traffic is coming back
596         retry_count = (self.config.check_traffic_time_sec +
597                        self.config.generic_poll_sec - 1) / self.config.generic_poll_sec
598
599         # we expect to see packets coming from 2 unique MAC per chain
600         # because there can be flooding in the case of shared net
601         # we must verify that packets from the right VMs are received
602         # and not just count unique src MAC
603         # create a dict of (port, chain) tuples indexed by dest mac
604         mac_map = {}
605         for port, dest_macs in enumerate(self.generator_config.get_dest_macs()):
606             for chain, mac in enumerate(dest_macs):
607                 mac_map[mac] = (port, chain)
608         unique_src_mac_count = len(mac_map)
609         if self.config.vxlan and self.config.traffic_generator.vtep_vlan:
610             get_mac_id = lambda packet: packet['binary'][60:66]
611         elif self.config.vxlan:
612             get_mac_id = lambda packet: packet['binary'][56:62]
613         else:
614             get_mac_id = lambda packet: packet['binary'][6:12]
615         for it in xrange(retry_count):
616             self.gen.clear_stats()
617             self.gen.start_traffic()
618             self.gen.start_capture()
619             LOG.info('Captured unique src mac %d/%d, capturing return packets (retry %d/%d)...',
620                      unique_src_mac_count - len(mac_map), unique_src_mac_count,
621                      it + 1, retry_count)
622             if not self.skip_sleep():
623                 time.sleep(self.config.generic_poll_sec)
624             self.gen.stop_traffic()
625             self.gen.fetch_capture_packets()
626             self.gen.stop_capture()
627             for packet in self.gen.packet_list:
628                 mac_id = get_mac_id(packet)
629                 src_mac = ':'.join(["%02x" % ord(x) for x in mac_id])
630                 if src_mac in mac_map and self.is_udp(packet):
631                     port, chain = mac_map[src_mac]
632                     LOG.info('Received packet from mac: %s (chain=%d, port=%d)',
633                              src_mac, chain, port)
634                     mac_map.pop(src_mac, None)
635
636                 if not mac_map:
637                     LOG.info('End-to-end connectivity established')
638                     return
639             if self.config.l3_router and not self.config.no_arp:
640                 # In case of L3 traffic mode, routers are not able to route traffic
641                 # until VM interfaces are up and ARP requests are done
642                 LOG.info('Waiting for loopback service completely started...')
643                 LOG.info('Sending ARP request to assure end-to-end connectivity established')
644                 self.ensure_arp_successful()
645         raise TrafficClientException('End-to-end connectivity cannot be ensured')
646
647     def is_udp(self, packet):
648         pkt = Ether(packet['binary'])
649         return UDP in pkt
650
651     def ensure_arp_successful(self):
652         """Resolve all IP using ARP and throw an exception in case of failure."""
653         dest_macs = self.gen.resolve_arp()
654         if dest_macs:
655             # all dest macs are discovered, saved them into the generator config
656             if self.config.vxlan:
657                 self.generator_config.set_vtep_dest_macs(0, dest_macs[0])
658                 self.generator_config.set_vtep_dest_macs(1, dest_macs[1])
659             else:
660                 self.generator_config.set_dest_macs(0, dest_macs[0])
661                 self.generator_config.set_dest_macs(1, dest_macs[1])
662         else:
663             raise TrafficClientException('ARP cannot be resolved')
664
665     def set_traffic(self, frame_size, bidirectional):
666         """Reconfigure the traffic generator for a new frame size."""
667         self.run_config['bidirectional'] = bidirectional
668         self.run_config['l2frame_size'] = frame_size
669         self.run_config['rates'] = [self.get_per_direction_rate()]
670         if bidirectional:
671             self.run_config['rates'].append(self.get_per_direction_rate())
672         else:
673             unidir_reverse_pps = int(self.config.unidir_reverse_traffic_pps)
674             if unidir_reverse_pps > 0:
675                 self.run_config['rates'].append({'rate_pps': str(unidir_reverse_pps)})
676         # Fix for [NFVBENCH-67], convert the rate string to PPS
677         for idx, rate in enumerate(self.run_config['rates']):
678             if 'rate_pps' not in rate:
679                 self.run_config['rates'][idx] = {'rate_pps': self.__convert_rates(rate)['rate_pps']}
680
681         self.gen.clear_streamblock()
682         if self.config.no_latency_streams:
683             LOG.info("Latency streams are disabled")
684         self.gen.create_traffic(frame_size, self.run_config['rates'], bidirectional,
685                                 latency=not self.config.no_latency_streams)
686
687     def _modify_load(self, load):
688         self.current_total_rate = {'rate_percent': str(load)}
689         rate_per_direction = self.get_per_direction_rate()
690
691         self.gen.modify_rate(rate_per_direction, False)
692         self.run_config['rates'][0] = rate_per_direction
693         if self.run_config['bidirectional']:
694             self.gen.modify_rate(rate_per_direction, True)
695             self.run_config['rates'][1] = rate_per_direction
696
697     def get_ndr_and_pdr(self):
698         """Start the NDR/PDR iteration and return the results."""
699         dst = 'Bidirectional' if self.run_config['bidirectional'] else 'Unidirectional'
700         targets = {}
701         if self.config.ndr_run:
702             LOG.info('*** Searching NDR for %s (%s)...', self.run_config['l2frame_size'], dst)
703             targets['ndr'] = self.config.measurement.NDR
704         if self.config.pdr_run:
705             LOG.info('*** Searching PDR for %s (%s)...', self.run_config['l2frame_size'], dst)
706             targets['pdr'] = self.config.measurement.PDR
707
708         self.run_config['start_time'] = time.time()
709         self.interval_collector = IntervalCollector(self.run_config['start_time'])
710         self.interval_collector.attach_notifier(self.notifier)
711         self.iteration_collector = IterationCollector(self.run_config['start_time'])
712         results = {}
713         self.__range_search(0.0, 200.0, targets, results)
714
715         results['iteration_stats'] = {
716             'ndr_pdr': self.iteration_collector.get()
717         }
718
719         if self.config.ndr_run:
720             LOG.info('NDR load: %s', results['ndr']['rate_percent'])
721             results['ndr']['time_taken_sec'] = \
722                 results['ndr']['timestamp_sec'] - self.run_config['start_time']
723             if self.config.pdr_run:
724                 LOG.info('PDR load: %s', results['pdr']['rate_percent'])
725                 results['pdr']['time_taken_sec'] = \
726                     results['pdr']['timestamp_sec'] - results['ndr']['timestamp_sec']
727         else:
728             LOG.info('PDR load: %s', results['pdr']['rate_percent'])
729             results['pdr']['time_taken_sec'] = \
730                 results['pdr']['timestamp_sec'] - self.run_config['start_time']
731         return results
732
733     def __get_dropped_rate(self, result):
734         dropped_pkts = result['rx']['dropped_pkts']
735         total_pkts = result['tx']['total_pkts']
736         if not total_pkts:
737             return float('inf')
738         return float(dropped_pkts) / total_pkts * 100
739
740     def get_stats(self):
741         """Collect final stats for previous run."""
742         stats = self.gen.get_stats()
743         retDict = {'total_tx_rate': stats['total_tx_rate']}
744         for port in self.PORTS:
745             retDict[port] = {'tx': {}, 'rx': {}}
746
747         tx_keys = ['total_pkts', 'total_pkt_bytes', 'pkt_rate', 'pkt_bit_rate']
748         rx_keys = tx_keys + ['dropped_pkts']
749
750         for port in self.PORTS:
751             for key in tx_keys:
752                 retDict[port]['tx'][key] = int(stats[port]['tx'][key])
753             for key in rx_keys:
754                 try:
755                     retDict[port]['rx'][key] = int(stats[port]['rx'][key])
756                 except ValueError:
757                     retDict[port]['rx'][key] = 0
758             retDict[port]['rx']['avg_delay_usec'] = cast_integer(
759                 stats[port]['rx']['avg_delay_usec'])
760             retDict[port]['rx']['min_delay_usec'] = cast_integer(
761                 stats[port]['rx']['min_delay_usec'])
762             retDict[port]['rx']['max_delay_usec'] = cast_integer(
763                 stats[port]['rx']['max_delay_usec'])
764             retDict[port]['drop_rate_percent'] = self.__get_dropped_rate(retDict[port])
765
766         ports = sorted(retDict.keys())
767         if self.run_config['bidirectional']:
768             retDict['overall'] = {'tx': {}, 'rx': {}}
769             for key in tx_keys:
770                 retDict['overall']['tx'][key] = \
771                     retDict[ports[0]]['tx'][key] + retDict[ports[1]]['tx'][key]
772             for key in rx_keys:
773                 retDict['overall']['rx'][key] = \
774                     retDict[ports[0]]['rx'][key] + retDict[ports[1]]['rx'][key]
775             total_pkts = [retDict[ports[0]]['rx']['total_pkts'],
776                           retDict[ports[1]]['rx']['total_pkts']]
777             avg_delays = [retDict[ports[0]]['rx']['avg_delay_usec'],
778                           retDict[ports[1]]['rx']['avg_delay_usec']]
779             max_delays = [retDict[ports[0]]['rx']['max_delay_usec'],
780                           retDict[ports[1]]['rx']['max_delay_usec']]
781             min_delays = [retDict[ports[0]]['rx']['min_delay_usec'],
782                           retDict[ports[1]]['rx']['min_delay_usec']]
783             retDict['overall']['rx']['avg_delay_usec'] = utils.weighted_avg(total_pkts, avg_delays)
784             retDict['overall']['rx']['min_delay_usec'] = min(min_delays)
785             retDict['overall']['rx']['max_delay_usec'] = max(max_delays)
786             for key in ['pkt_bit_rate', 'pkt_rate']:
787                 for dirc in ['tx', 'rx']:
788                     retDict['overall'][dirc][key] /= 2.0
789         else:
790             retDict['overall'] = retDict[ports[0]]
791         retDict['overall']['drop_rate_percent'] = self.__get_dropped_rate(retDict['overall'])
792         return retDict
793
794     def __convert_rates(self, rate):
795         return utils.convert_rates(self.run_config['l2frame_size'],
796                                    rate,
797                                    self.intf_speed)
798
799     def __ndr_pdr_found(self, tag, load):
800         rates = self.__convert_rates({'rate_percent': load})
801         self.iteration_collector.add_ndr_pdr(tag, rates['rate_pps'])
802         last_stats = self.iteration_collector.peek()
803         self.interval_collector.add_ndr_pdr(tag, last_stats)
804
805     def __format_output_stats(self, stats):
806         for key in self.PORTS + ['overall']:
807             interface = stats[key]
808             stats[key] = {
809                 'tx_pkts': interface['tx']['total_pkts'],
810                 'rx_pkts': interface['rx']['total_pkts'],
811                 'drop_percentage': interface['drop_rate_percent'],
812                 'drop_pct': interface['rx']['dropped_pkts'],
813                 'avg_delay_usec': interface['rx']['avg_delay_usec'],
814                 'max_delay_usec': interface['rx']['max_delay_usec'],
815                 'min_delay_usec': interface['rx']['min_delay_usec'],
816             }
817
818         return stats
819
820     def __targets_found(self, rate, targets, results):
821         for tag, target in targets.iteritems():
822             LOG.info('Found %s (%s) load: %s', tag, target, rate)
823             self.__ndr_pdr_found(tag, rate)
824             results[tag]['timestamp_sec'] = time.time()
825
826     def __range_search(self, left, right, targets, results):
827         """Perform a binary search for a list of targets inside a [left..right] range or rate.
828
829         left    the left side of the range to search as a % the line rate (100 = 100% line rate)
830                 indicating the rate to send on each interface
831         right   the right side of the range to search as a % of line rate
832                 indicating the rate to send on each interface
833         targets a dict of drop rates to search (0.1 = 0.1%), indexed by the DR name or "tag"
834                 ('ndr', 'pdr')
835         results a dict to store results
836         """
837         if not targets:
838             return
839         LOG.info('Range search [%s .. %s] targets: %s', left, right, targets)
840
841         # Terminate search when gap is less than load epsilon
842         if right - left < self.config.measurement.load_epsilon:
843             self.__targets_found(left, targets, results)
844             return
845
846         # Obtain the average drop rate in for middle load
847         middle = (left + right) / 2.0
848         try:
849             stats, rates = self.__run_search_iteration(middle)
850         except STLError:
851             LOG.exception("Got exception from traffic generator during binary search")
852             self.__targets_found(left, targets, results)
853             return
854         # Split target dicts based on the avg drop rate
855         left_targets = {}
856         right_targets = {}
857         for tag, target in targets.iteritems():
858             if stats['overall']['drop_rate_percent'] <= target:
859                 # record the best possible rate found for this target
860                 results[tag] = rates
861                 results[tag].update({
862                     'load_percent_per_direction': middle,
863                     'stats': self.__format_output_stats(dict(stats)),
864                     'timestamp_sec': None
865                 })
866                 right_targets[tag] = target
867             else:
868                 # initialize to 0 all fields of result for
869                 # the worst case scenario of the binary search (if ndr/pdr is not found)
870                 if tag not in results:
871                     results[tag] = dict.fromkeys(rates, 0)
872                     empty_stats = self.__format_output_stats(dict(stats))
873                     for key in empty_stats:
874                         if isinstance(empty_stats[key], dict):
875                             empty_stats[key] = dict.fromkeys(empty_stats[key], 0)
876                         else:
877                             empty_stats[key] = 0
878                     results[tag].update({
879                         'load_percent_per_direction': 0,
880                         'stats': empty_stats,
881                         'timestamp_sec': None
882                     })
883                 left_targets[tag] = target
884
885         # search lower half
886         self.__range_search(left, middle, left_targets, results)
887
888         # search upper half only if the upper rate does not exceed
889         # 100%, this only happens when the first search at 100%
890         # yields a DR that is < target DR
891         if middle >= 100:
892             self.__targets_found(100, right_targets, results)
893         else:
894             self.__range_search(middle, right, right_targets, results)
895
896     def __run_search_iteration(self, rate):
897         """Run one iteration at the given rate level.
898
899         rate: the rate to send on each port in percent (0 to 100)
900         """
901         self._modify_load(rate)
902
903         # poll interval stats and collect them
904         for stats in self.run_traffic():
905             self.interval_collector.add(stats)
906             time_elapsed_ratio = self.runner.time_elapsed() / self.run_config['duration_sec']
907             if time_elapsed_ratio >= 1:
908                 self.cancel_traffic()
909                 if not self.skip_sleep():
910                     time.sleep(self.config.pause_sec)
911         self.interval_collector.reset()
912
913         # get stats from the run
914         stats = self.runner.client.get_stats()
915         current_traffic_config = self._get_traffic_config()
916         warning = self.compare_tx_rates(current_traffic_config['direction-total']['rate_pps'],
917                                         stats['total_tx_rate'])
918         if warning is not None:
919             stats['warning'] = warning
920
921         # save reliable stats from whole iteration
922         self.iteration_collector.add(stats, current_traffic_config['direction-total']['rate_pps'])
923         LOG.info('Average drop rate: %f', stats['overall']['drop_rate_percent'])
924         return stats, current_traffic_config['direction-total']
925
926     @staticmethod
927     def log_stats(stats):
928         """Log estimated stats during run."""
929         report = {
930             'datetime': str(datetime.now()),
931             'tx_packets': stats['overall']['tx']['total_pkts'],
932             'rx_packets': stats['overall']['rx']['total_pkts'],
933             'drop_packets': stats['overall']['rx']['dropped_pkts'],
934             'drop_rate_percent': stats['overall']['drop_rate_percent']
935         }
936         LOG.info('TX: %(tx_packets)d; '
937                  'RX: %(rx_packets)d; '
938                  'Est. Dropped: %(drop_packets)d; '
939                  'Est. Drop rate: %(drop_rate_percent).4f%%',
940                  report)
941
942     def run_traffic(self):
943         """Start traffic and return intermediate stats for each interval."""
944         stats = self.runner.run()
945         while self.runner.is_running:
946             self.log_stats(stats)
947             yield stats
948             stats = self.runner.poll_stats()
949             if stats is None:
950                 return
951         self.log_stats(stats)
952         LOG.info('Drop rate: %f', stats['overall']['drop_rate_percent'])
953         yield stats
954
955     def cancel_traffic(self):
956         """Stop traffic."""
957         self.runner.stop()
958
959     def _get_traffic_config(self):
960         config = {}
961         load_total = 0.0
962         bps_total = 0.0
963         pps_total = 0.0
964         for idx, rate in enumerate(self.run_config['rates']):
965             key = 'direction-forward' if idx == 0 else 'direction-reverse'
966             config[key] = {
967                 'l2frame_size': self.run_config['l2frame_size'],
968                 'duration_sec': self.run_config['duration_sec']
969             }
970             config[key].update(rate)
971             config[key].update(self.__convert_rates(rate))
972             load_total += float(config[key]['rate_percent'])
973             bps_total += float(config[key]['rate_bps'])
974             pps_total += float(config[key]['rate_pps'])
975         config['direction-total'] = dict(config['direction-forward'])
976         config['direction-total'].update({
977             'rate_percent': load_total,
978             'rate_pps': cast_integer(pps_total),
979             'rate_bps': bps_total
980         })
981
982         return config
983
984     def get_run_config(self, results):
985         """Return configuration which was used for the last run."""
986         r = {}
987         # because we want each direction to have the far end RX rates,
988         # use the far end index (1-idx) to retrieve the RX rates
989         for idx, key in enumerate(["direction-forward", "direction-reverse"]):
990             tx_rate = results["stats"][idx]["tx"]["total_pkts"] / self.config.duration_sec
991             rx_rate = results["stats"][1 - idx]["rx"]["total_pkts"] / self.config.duration_sec
992             r[key] = {
993                 "orig": self.__convert_rates(self.run_config['rates'][idx]),
994                 "tx": self.__convert_rates({'rate_pps': tx_rate}),
995                 "rx": self.__convert_rates({'rate_pps': rx_rate})
996             }
997
998         total = {}
999         for direction in ['orig', 'tx', 'rx']:
1000             total[direction] = {}
1001             for unit in ['rate_percent', 'rate_bps', 'rate_pps']:
1002                 total[direction][unit] = sum([float(x[direction][unit]) for x in r.values()])
1003
1004         r['direction-total'] = total
1005         return r
1006
1007     def insert_interface_stats(self, pps_list):
1008         """Insert interface stats to a list of packet path stats.
1009
1010         pps_list: a list of packet path stats instances indexed by chain index
1011
1012         This function will insert the packet path stats for the traffic gen ports 0 and 1
1013         with itemized per chain tx/rx counters.
1014         There will be as many packet path stats as chains.
1015         Each packet path stats will have exactly 2 InterfaceStats for port 0 and port 1
1016         self.pps_list:
1017         [
1018         PacketPathStats(InterfaceStats(chain 0, port 0), InterfaceStats(chain 0, port 1)),
1019         PacketPathStats(InterfaceStats(chain 1, port 0), InterfaceStats(chain 1, port 1)),
1020         ...
1021         ]
1022         """
1023         def get_if_stats(chain_idx):
1024             return [InterfaceStats('p' + str(port), self.tool)
1025                     for port in range(2)]
1026         # keep the list of list of interface stats indexed by the chain id
1027         self.ifstats = [get_if_stats(chain_idx)
1028                         for chain_idx in range(self.config.service_chain_count)]
1029         # note that we need to make a copy of the ifs list so that any modification in the
1030         # list from pps will not change the list saved in self.ifstats
1031         self.pps_list = [PacketPathStats(list(ifs)) for ifs in self.ifstats]
1032         # insert the corresponding pps in the passed list
1033         pps_list.extend(self.pps_list)
1034
1035     def update_interface_stats(self, diff=False):
1036         """Update all interface stats.
1037
1038         diff: if False, simply refresh the interface stats values with latest values
1039               if True, diff the interface stats with the latest values
1040         Make sure that the interface stats inserted in insert_interface_stats() are updated
1041         with proper values.
1042         self.ifstats:
1043         [
1044         [InterfaceStats(chain 0, port 0), InterfaceStats(chain 0, port 1)],
1045         [InterfaceStats(chain 1, port 0), InterfaceStats(chain 1, port 1)],
1046         ...
1047         ]
1048         """
1049         if diff:
1050             stats = self.gen.get_stats()
1051             for chain_idx, ifs in enumerate(self.ifstats):
1052                 # each ifs has exactly 2 InterfaceStats and 2 Latency instances
1053                 # corresponding to the
1054                 # port 0 and port 1 for the given chain_idx
1055                 # Note that we cannot use self.pps_list[chain_idx].if_stats to pick the
1056                 # interface stats for the pps because it could have been modified to contain
1057                 # additional interface stats
1058                 self.gen.get_stream_stats(stats, ifs, self.pps_list[chain_idx].latencies, chain_idx)
1059             # special handling for vxlan
1060             # in case of vxlan, flow stats are not available so all rx counters will be
1061             # zeros when the total rx port counter is non zero.
1062             # in that case,
1063             for port in range(2):
1064                 total_rx = 0
1065                 for ifs in self.ifstats:
1066                     total_rx += ifs[port].rx
1067                 if total_rx == 0:
1068                     # check if the total port rx from Trex is also zero
1069                     port_rx = stats[port]['rx']['total_pkts']
1070                     if port_rx:
1071                         # the total rx for all chains from port level stats is non zero
1072                         # which means that the per-chain stats are not available
1073                         if len(self.ifstats) == 1:
1074                             # only one chain, simply report the port level rx to the chain rx stats
1075                             self.ifstats[0][port].rx = port_rx
1076                         else:
1077                             for ifs in self.ifstats:
1078                                 # mark this data as unavailable
1079                                 ifs[port].rx = None
1080                             # pitch in the total rx only in the last chain pps
1081                             self.ifstats[-1][port].rx_total = port_rx
1082
1083     @staticmethod
1084     def compare_tx_rates(required, actual):
1085         """Compare the actual TX rate to the required TX rate."""
1086         threshold = 0.9
1087         are_different = False
1088         try:
1089             if float(actual) / required < threshold:
1090                 are_different = True
1091         except ZeroDivisionError:
1092             are_different = True
1093
1094         if are_different:
1095             msg = "WARNING: There is a significant difference between requested TX rate ({r}) " \
1096                   "and actual TX rate ({a}). The traffic generator may not have sufficient CPU " \
1097                   "to achieve the requested TX rate.".format(r=required, a=actual)
1098             LOG.info(msg)
1099             return msg
1100
1101         return None
1102
1103     def get_per_direction_rate(self):
1104         """Get the rate for each direction."""
1105         divisor = 2 if self.run_config['bidirectional'] else 1
1106         if 'rate_percent' in self.current_total_rate:
1107             # don't split rate if it's percentage
1108             divisor = 1
1109
1110         return utils.divide_rate(self.current_total_rate, divisor)
1111
1112     def close(self):
1113         """Close this instance."""
1114         try:
1115             self.gen.stop_traffic()
1116         except Exception:
1117             pass
1118         self.gen.clear_stats()
1119         self.gen.cleanup()