c9daf4071cec7e309c350fdb098e92f9c94d7f95
[nfvbench.git] / nfvbench / traffic_client.py
1 # Copyright 2016 Cisco Systems, Inc.  All rights reserved.
2 #
3 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
4 #    not use this file except in compliance with the License. You may obtain
5 #    a copy of the License at
6 #
7 #         http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #    Unless required by applicable law or agreed to in writing, software
10 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 #    License for the specific language governing permissions and limitations
13 #    under the License.
14
15 """Interface to the traffic generator clients including NDR/PDR binary search."""
16
17 from datetime import datetime
18 import socket
19 import struct
20 import time
21
22 from attrdict import AttrDict
23 import bitmath
24 from netaddr import IPNetwork
25 # pylint: disable=import-error
26 from trex_stl_lib.api import STLError
27 # pylint: enable=import-error
28
29 from log import LOG
30 from packet_stats import InterfaceStats
31 from packet_stats import PacketPathStats
32 from stats_collector import IntervalCollector
33 from stats_collector import IterationCollector
34 import traffic_gen.traffic_utils as utils
35 from utils import cast_integer
36
37
38 class TrafficClientException(Exception):
39     """Generic traffic client exception."""
40
41     pass
42
43
44 class TrafficRunner(object):
45     """Serialize various steps required to run traffic."""
46
47     def __init__(self, client, duration_sec, interval_sec=0):
48         """Create a traffic runner."""
49         self.client = client
50         self.start_time = None
51         self.duration_sec = duration_sec
52         self.interval_sec = interval_sec
53
54     def run(self):
55         """Clear stats and instruct the traffic generator to start generating traffic."""
56         if self.is_running():
57             return None
58         LOG.info('Running traffic generator')
59         self.client.gen.clear_stats()
60         self.client.gen.start_traffic()
61         self.start_time = time.time()
62         return self.poll_stats()
63
64     def stop(self):
65         """Stop the current run and instruct the traffic generator to stop traffic."""
66         if self.is_running():
67             self.start_time = None
68             self.client.gen.stop_traffic()
69
70     def is_running(self):
71         """Check if a run is still pending."""
72         return self.start_time is not None
73
74     def time_elapsed(self):
75         """Return time elapsed since start of run."""
76         if self.is_running():
77             return time.time() - self.start_time
78         return self.duration_sec
79
80     def poll_stats(self):
81         """Poll latest stats from the traffic generator at fixed interval - sleeps if necessary.
82
83         return: latest stats or None if traffic is stopped
84         """
85         if not self.is_running():
86             return None
87         if self.client.skip_sleep():
88             self.stop()
89             return self.client.get_stats()
90         time_elapsed = self.time_elapsed()
91         if time_elapsed > self.duration_sec:
92             self.stop()
93             return None
94         time_left = self.duration_sec - time_elapsed
95         if self.interval_sec > 0.0:
96             if time_left <= self.interval_sec:
97                 time.sleep(time_left)
98                 self.stop()
99             else:
100                 time.sleep(self.interval_sec)
101         else:
102             time.sleep(self.duration_sec)
103             self.stop()
104         return self.client.get_stats()
105
106
107 class IpBlock(object):
108     """Manage a block of IP addresses."""
109
110     def __init__(self, base_ip, step_ip, count_ip):
111         """Create an IP block."""
112         self.base_ip_int = Device.ip_to_int(base_ip)
113         self.step = Device.ip_to_int(step_ip)
114         self.max_available = count_ip
115         self.next_free = 0
116
117     def get_ip(self, index=0):
118         """Return the IP address at given index."""
119         if index < 0 or index >= self.max_available:
120             raise IndexError('Index out of bounds: %d (max=%d)' % (index, self.max_available))
121         return Device.int_to_ip(self.base_ip_int + index * self.step)
122
123     def reserve_ip_range(self, count):
124         """Reserve a range of count consecutive IP addresses spaced by step."""
125         if self.next_free + count > self.max_available:
126             raise IndexError('No more IP addresses next free=%d max_available=%d requested=%d' %
127                              (self.next_free,
128                               self.max_available,
129                               count))
130         first_ip = self.get_ip(self.next_free)
131         last_ip = self.get_ip(self.next_free + count - 1)
132         self.next_free += count
133         return (first_ip, last_ip)
134
135     def reset_reservation(self):
136         """Reset all reservations and restart with a completely unused IP block."""
137         self.next_free = 0
138
139
140 class Device(object):
141     """Represent a port device and all information associated to it.
142
143     In the curent version we only support 2 port devices for the traffic generator
144     identified as port 0 or port 1.
145     """
146
147     def __init__(self, port, generator_config):
148         """Create a new device for a given port."""
149         self.generator_config = generator_config
150         self.chain_count = generator_config.service_chain_count
151         self.flow_count = generator_config.flow_count / 2
152         self.port = port
153         self.switch_port = generator_config.interfaces[port].get('switch_port', None)
154         self.vtep_vlan = None
155         self.vtep_src_mac = None
156         self.vxlan = False
157         self.pci = generator_config.interfaces[port].pci
158         self.mac = None
159         self.dest_macs = None
160         self.vtep_dst_mac = None
161         self.vtep_dst_ip = None
162         if generator_config.vteps is None:
163             self.vtep_src_ip = None
164         else:
165             self.vtep_src_ip = generator_config.vteps[port]
166         self.vnis = None
167         self.vlans = None
168         self.ip_addrs = generator_config.ip_addrs[port]
169         subnet = IPNetwork(self.ip_addrs)
170         self.ip = subnet.ip.format()
171         self.ip_addrs_step = generator_config.ip_addrs_step
172         self.ip_block = IpBlock(self.ip, self.ip_addrs_step, self.flow_count)
173         self.gw_ip_block = IpBlock(generator_config.gateway_ips[port],
174                                    generator_config.gateway_ip_addrs_step,
175                                    self.chain_count)
176         self.tg_gateway_ip_addrs = generator_config.tg_gateway_ip_addrs[port]
177         self.tg_gw_ip_block = IpBlock(self.tg_gateway_ip_addrs,
178                                       generator_config.tg_gateway_ip_addrs_step,
179                                       self.chain_count)
180         self.udp_src_port = generator_config.udp_src_port
181         self.udp_dst_port = generator_config.udp_dst_port
182
183     def set_mac(self, mac):
184         """Set the local MAC for this port device."""
185         if mac is None:
186             raise TrafficClientException('Trying to set traffic generator MAC address as None')
187         self.mac = mac
188
189     def get_peer_device(self):
190         """Get the peer device (device 0 -> device 1, or device 1 -> device 0)."""
191         return self.generator_config.devices[1 - self.port]
192
193     def set_vtep_dst_mac(self, dest_macs):
194         """Set the list of dest MACs indexed by the chain id.
195
196         This is only called in 2 cases:
197         - VM macs discovered using openstack API
198         - dest MACs provisioned in config file
199         """
200         self.vtep_dst_mac = map(str, dest_macs)
201
202     def set_dest_macs(self, dest_macs):
203         """Set the list of dest MACs indexed by the chain id.
204
205         This is only called in 2 cases:
206         - VM macs discovered using openstack API
207         - dest MACs provisioned in config file
208         """
209         self.dest_macs = map(str, dest_macs)
210
211     def get_dest_macs(self):
212         """Get the list of dest macs for this device.
213
214         If set_dest_macs was never called, assumes l2-loopback and return
215         a list of peer mac (as many as chains but normally only 1 chain)
216         """
217         if self.dest_macs:
218             return self.dest_macs
219         # assume this is l2-loopback
220         return [self.get_peer_device().mac] * self.chain_count
221
222     def set_vlans(self, vlans):
223         """Set the list of vlans to use indexed by the chain id."""
224         self.vlans = vlans
225         LOG.info("Port %d: VLANs %s", self.port, self.vlans)
226
227     def set_vtep_vlan(self, vlan):
228         """Set the vtep vlan to use indexed by specific port."""
229         self.vtep_vlan = vlan
230         self.vxlan = True
231         self.vlan_tagging = None
232         LOG.info("Port %d: VTEP VLANs %s", self.port, self.vtep_vlan)
233
234     def set_vxlan_endpoints(self, src_ip, dst_ip):
235         self.vtep_dst_ip = dst_ip
236         self.vtep_src_ip = src_ip
237         LOG.info("Port %d: src_vtep %s, dst_vtep %s", self.port,
238                  self.vtep_src_ip, self.vtep_dst_ip)
239
240     def set_vxlans(self, vnis):
241         self.vnis = vnis
242         LOG.info("Port %d: VNIs %s", self.port, self.vnis)
243
244     def get_gw_ip(self, chain_index):
245         """Retrieve the IP address assigned for the gateway of a given chain."""
246         return self.gw_ip_block.get_ip(chain_index)
247
248     def get_stream_configs(self):
249         """Get the stream config for a given chain on this device.
250
251         Called by the traffic generator driver to program the traffic generator properly
252         before generating traffic
253         """
254         configs = []
255         # exact flow count for each chain is calculated as follows:
256         # - all chains except the first will have the same flow count
257         #   calculated as (total_flows + chain_count - 1) / chain_count
258         # - the first chain will have the remainder
259         # example 11 flows and 3 chains => 3, 4, 4
260         flows_per_chain = (self.flow_count + self.chain_count - 1) / self.chain_count
261         cur_chain_flow_count = self.flow_count - flows_per_chain * (self.chain_count - 1)
262         peer = self.get_peer_device()
263         self.ip_block.reset_reservation()
264         peer.ip_block.reset_reservation()
265         dest_macs = self.get_dest_macs()
266
267         for chain_idx in xrange(self.chain_count):
268             src_ip_first, src_ip_last = self.ip_block.reserve_ip_range(cur_chain_flow_count)
269             dst_ip_first, dst_ip_last = peer.ip_block.reserve_ip_range(cur_chain_flow_count)
270
271             configs.append({
272                 'count': cur_chain_flow_count,
273                 'mac_src': self.mac,
274                 'mac_dst': dest_macs[chain_idx],
275                 'ip_src_addr': src_ip_first,
276                 'ip_src_addr_max': src_ip_last,
277                 'ip_src_count': cur_chain_flow_count,
278                 'ip_dst_addr': dst_ip_first,
279                 'ip_dst_addr_max': dst_ip_last,
280                 'ip_dst_count': cur_chain_flow_count,
281                 'ip_addrs_step': self.ip_addrs_step,
282                 'udp_src_port': self.udp_src_port,
283                 'udp_dst_port': self.udp_dst_port,
284                 'mac_discovery_gw': self.get_gw_ip(chain_idx),
285                 'ip_src_tg_gw': self.tg_gw_ip_block.get_ip(chain_idx),
286                 'ip_dst_tg_gw': peer.tg_gw_ip_block.get_ip(chain_idx),
287                 'vlan_tag': self.vlans[chain_idx] if self.vlans else None,
288                 'vxlan': self.vxlan,
289                 'vtep_vlan': self.vtep_vlan if self.vtep_vlan else None,
290                 'vtep_src_mac': self.mac if self.vxlan is True else None,
291                 'vtep_dst_mac': self.vtep_dst_mac if self.vxlan is True else None,
292                 'vtep_dst_ip': self.vtep_dst_ip if self.vxlan is True else None,
293                 'vtep_src_ip': self.vtep_src_ip if self.vxlan is True else None,
294                 'net_vni': self.vnis[chain_idx] if self.vxlan is True else None
295             })
296             # after first chain, fall back to the flow count for all other chains
297             cur_chain_flow_count = flows_per_chain
298         return configs
299
300     @staticmethod
301     def ip_to_int(addr):
302         """Convert an IP address from string to numeric."""
303         return struct.unpack("!I", socket.inet_aton(addr))[0]
304
305     @staticmethod
306     def int_to_ip(nvalue):
307         """Convert an IP address from numeric to string."""
308         return socket.inet_ntoa(struct.pack("!I", nvalue))
309
310
311 class GeneratorConfig(object):
312     """Represents traffic configuration for currently running traffic profile."""
313
314     DEFAULT_IP_STEP = '0.0.0.1'
315     DEFAULT_SRC_DST_IP_STEP = '0.0.0.1'
316
317     def __init__(self, config):
318         """Create a generator config."""
319         self.config = config
320         # name of the generator profile (normally trex or dummy)
321         # pick the default one if not specified explicitly from cli options
322         if not config.generator_profile:
323             config.generator_profile = config.traffic_generator.default_profile
324         # pick up the profile dict based on the name
325         gen_config = self.__match_generator_profile(config.traffic_generator,
326                                                     config.generator_profile)
327         self.gen_config = gen_config
328         # copy over fields from the dict
329         self.tool = gen_config.tool
330         self.ip = gen_config.ip
331         self.cores = gen_config.get('cores', 1)
332         if gen_config.intf_speed:
333             # interface speed is overriden from config
334             self.intf_speed = bitmath.parse_string(gen_config.intf_speed.replace('ps', '')).bits
335         else:
336             # interface speed is discovered/provided by the traffic generator
337             self.intf_speed = 0
338         self.software_mode = gen_config.get('software_mode', False)
339         self.interfaces = gen_config.interfaces
340         if self.interfaces[0].port != 0 or self.interfaces[1].port != 1:
341             raise TrafficClientException('Invalid port order/id in generator_profile.interfaces')
342
343         self.service_chain = config.service_chain
344         self.service_chain_count = config.service_chain_count
345         self.flow_count = config.flow_count
346         self.host_name = gen_config.host_name
347
348         self.tg_gateway_ip_addrs = gen_config.tg_gateway_ip_addrs
349         self.ip_addrs = gen_config.ip_addrs
350         self.ip_addrs_step = gen_config.ip_addrs_step or self.DEFAULT_SRC_DST_IP_STEP
351         self.tg_gateway_ip_addrs_step = \
352             gen_config.tg_gateway_ip_addrs_step or self.DEFAULT_IP_STEP
353         self.gateway_ip_addrs_step = gen_config.gateway_ip_addrs_step or self.DEFAULT_IP_STEP
354         self.gateway_ips = gen_config.gateway_ip_addrs
355         self.udp_src_port = gen_config.udp_src_port
356         self.udp_dst_port = gen_config.udp_dst_port
357         self.vteps = gen_config.get('vteps')
358         self.vnis = gen_config.get('vnis')
359         self.devices = [Device(port, self) for port in [0, 1]]
360         # This should normally always be [0, 1]
361         self.ports = [device.port for device in self.devices]
362
363         # check that pci is not empty
364         if not gen_config.interfaces[0].get('pci', None) or \
365            not gen_config.interfaces[1].get('pci', None):
366             raise TrafficClientException("configuration interfaces pci fields cannot be empty")
367
368         self.pcis = [tgif['pci'] for tgif in gen_config.interfaces]
369         self.vlan_tagging = config.vlan_tagging
370
371         # needed for result/summarizer
372         config['tg-name'] = gen_config.name
373         config['tg-tool'] = self.tool
374
375     def to_json(self):
376         """Get json form to display the content into the overall result dict."""
377         return dict(self.gen_config)
378
379     def set_dest_macs(self, port_index, dest_macs):
380         """Set the list of dest MACs indexed by the chain id on given port.
381
382         port_index: the port for which dest macs must be set
383         dest_macs: a list of dest MACs indexed by chain id
384         """
385         if len(dest_macs) != self.config.service_chain_count:
386             raise TrafficClientException('Dest MAC list %s must have %d entries' %
387                                          (dest_macs, self.config.service_chain_count))
388         self.devices[port_index].set_dest_macs(dest_macs)
389         LOG.info('Port %d: dst MAC %s', port_index, [str(mac) for mac in dest_macs])
390
391     def set_vtep_dest_macs(self, port_index, dest_macs):
392         """Set the list of dest MACs indexed by the chain id on given port.
393
394         port_index: the port for which dest macs must be set
395         dest_macs: a list of dest MACs indexed by chain id
396         """
397         if len(dest_macs) != self.config.service_chain_count:
398             raise TrafficClientException('Dest MAC list %s must have %d entries' %
399                                          (dest_macs, self.config.service_chain_count))
400         self.devices[port_index].set_vtep_dst_mac(dest_macs)
401         LOG.info('Port %d: vtep dst MAC %s', port_index, set([str(mac) for mac in dest_macs]))
402
403     def get_dest_macs(self):
404         """Return the list of dest macs indexed by port."""
405         return [dev.get_dest_macs() for dev in self.devices]
406
407     def set_vlans(self, port_index, vlans):
408         """Set the list of vlans to use indexed by the chain id on given port.
409
410         port_index: the port for which VLANs must be set
411         vlans: a  list of vlan lists indexed by chain id
412         """
413         if len(vlans) != self.config.service_chain_count:
414             raise TrafficClientException('VLAN list %s must have %d entries' %
415                                          (vlans, self.config.service_chain_count))
416         self.devices[port_index].set_vlans(vlans)
417
418     def set_vxlans(self, port_index, vxlans):
419         """Set the list of vxlans (VNIs) to use indexed by the chain id on given port.
420
421         port_index: the port for which VXLANs must be set
422         VXLANs: a  list of VNIs lists indexed by chain id
423         """
424         if len(vxlans) != self.config.service_chain_count:
425             raise TrafficClientException('VXLAN list %s must have %d entries' %
426                                          (vxlans, self.config.service_chain_count))
427         self.devices[port_index].set_vxlans(vxlans)
428
429     def set_vtep_vlan(self, port_index, vlan):
430         """Set the vtep vlan to use indexed by the chain id on given port.
431         port_index: the port for which VLAN must be set
432         """
433         self.devices[port_index].set_vtep_vlan(vlan)
434
435     def set_vxlan_endpoints(self, port_index, src_ip, dst_ip):
436         self.devices[port_index].set_vxlan_endpoints(src_ip, dst_ip)
437
438     @staticmethod
439     def __match_generator_profile(traffic_generator, generator_profile):
440         gen_config = AttrDict(traffic_generator)
441         gen_config.pop('default_profile')
442         gen_config.pop('generator_profile')
443         matching_profile = [profile for profile in traffic_generator.generator_profile if
444                             profile.name == generator_profile]
445         if len(matching_profile) != 1:
446             raise Exception('Traffic generator profile not found: ' + generator_profile)
447
448         gen_config.update(matching_profile[0])
449         return gen_config
450
451
452 class TrafficClient(object):
453     """Traffic generator client with NDR/PDR binary seearch."""
454
455     PORTS = [0, 1]
456
457     def __init__(self, config, notifier=None):
458         """Create a new TrafficClient instance.
459
460         config: nfvbench config
461         notifier: notifier (optional)
462
463         A new instance is created everytime the nfvbench config may have changed.
464         """
465         self.config = config
466         self.generator_config = GeneratorConfig(config)
467         self.tool = self.generator_config.tool
468         self.gen = self._get_generator()
469         self.notifier = notifier
470         self.interval_collector = None
471         self.iteration_collector = None
472         self.runner = TrafficRunner(self, self.config.duration_sec, self.config.interval_sec)
473         self.config.frame_sizes = self._get_frame_sizes()
474         self.run_config = {
475             'l2frame_size': None,
476             'duration_sec': self.config.duration_sec,
477             'bidirectional': True,
478             'rates': []  # to avoid unsbuscriptable-obj warning
479         }
480         self.current_total_rate = {'rate_percent': '10'}
481         if self.config.single_run:
482             self.current_total_rate = utils.parse_rate_str(self.config.rate)
483         self.ifstats = None
484         # Speed is either discovered when connecting to TG or set from config
485         # This variable is 0 if not yet discovered from TG or must be the speed of
486         # each interface in bits per second
487         self.intf_speed = self.generator_config.intf_speed
488
489     def _get_generator(self):
490         tool = self.tool.lower()
491         if tool == 'trex':
492             from traffic_gen import trex
493             return trex.TRex(self)
494         if tool == 'dummy':
495             from traffic_gen import dummy
496             return dummy.DummyTG(self)
497         raise TrafficClientException('Unsupported generator tool name:' + self.tool)
498
499     def skip_sleep(self):
500         """Skip all sleeps when doing unit testing with dummy TG.
501
502         Must be overriden using mock.patch
503         """
504         return False
505
506     def _get_frame_sizes(self):
507         traffic_profile_name = self.config.traffic.profile
508         matching_profiles = [profile for profile in self.config.traffic_profile if
509                              profile.name == traffic_profile_name]
510         if len(matching_profiles) > 1:
511             raise TrafficClientException('Multiple traffic profiles with name: ' +
512                                          traffic_profile_name)
513         elif not matching_profiles:
514             raise TrafficClientException('Cannot find traffic profile: ' + traffic_profile_name)
515         return matching_profiles[0].l2frame_size
516
517     def start_traffic_generator(self):
518         """Start the traffic generator process (traffic not started yet)."""
519         self.gen.connect()
520         # pick up the interface speed if it is not set from config
521         intf_speeds = self.gen.get_port_speed_gbps()
522         # convert Gbps unit into bps
523         tg_if_speed = bitmath.parse_string(str(intf_speeds[0]) + 'Gb').bits
524         if self.intf_speed:
525             # interface speed is overriden from config
526             if self.intf_speed != tg_if_speed:
527                 # Warn the user if the speed in the config is different
528                 LOG.warning('Interface speed provided is different from actual speed (%d Gbps)',
529                             intf_speeds[0])
530         else:
531             # interface speed not provisioned by config
532             self.intf_speed = tg_if_speed
533             # also update the speed in the tg config
534             self.generator_config.intf_speed = tg_if_speed
535
536         # Save the traffic generator local MAC
537         for mac, device in zip(self.gen.get_macs(), self.generator_config.devices):
538             device.set_mac(mac)
539
540     def setup(self):
541         """Set up the traffic client."""
542         self.gen.clear_stats()
543
544     def get_version(self):
545         """Get the traffic generator version."""
546         return self.gen.get_version()
547
548     def ensure_end_to_end(self):
549         """Ensure traffic generator receives packets it has transmitted.
550
551         This ensures end to end connectivity and also waits until VMs are ready to forward packets.
552
553         VMs that are started and in active state may not pass traffic yet. It is imperative to make
554         sure that all VMs are passing traffic in both directions before starting any benchmarking.
555         To verify this, we need to send at a low frequency bi-directional packets and make sure
556         that we receive all packets back from all VMs. The number of flows is equal to 2 times
557         the number of chains (1 per direction) and we need to make sure we receive packets coming
558         from exactly 2 x chain count different source MAC addresses.
559
560         Example:
561             PVP chain (1 VM per chain)
562             N = 10 (number of chains)
563             Flow count = 20 (number of flows)
564             If the number of unique source MAC addresses from received packets is 20 then
565             all 10 VMs 10 VMs are in operational state.
566         """
567         LOG.info('Starting traffic generator to ensure end-to-end connectivity')
568         # send 2pps on each chain and each direction
569         rate_pps = {'rate_pps': str(self.config.service_chain_count * 2)}
570         self.gen.create_traffic('64', [rate_pps, rate_pps], bidirectional=True, latency=False)
571
572         # ensures enough traffic is coming back
573         retry_count = (self.config.check_traffic_time_sec +
574                        self.config.generic_poll_sec - 1) / self.config.generic_poll_sec
575
576         # we expect to see packets coming from 2 unique MAC per chain
577         # because there can be flooding in the case of shared net
578         # we must verify that packets from the right VMs are received
579         # and not just count unique src MAC
580         # create a dict of (port, chain) tuples indexed by dest mac
581         mac_map = {}
582         for port, dest_macs in enumerate(self.generator_config.get_dest_macs()):
583             for chain, mac in enumerate(dest_macs):
584                 mac_map[mac] = (port, chain)
585         unique_src_mac_count = len(mac_map)
586         if self.config.vxlan and self.config.traffic_generator.vtep_vlan:
587             get_mac_id = lambda packet: packet['binary'][60:66]
588         elif self.config.vxlan:
589             get_mac_id = lambda packet: packet['binary'][56:62]
590         else:
591             get_mac_id = lambda packet: packet['binary'][6:12]
592         for it in xrange(retry_count):
593             self.gen.clear_stats()
594             self.gen.start_traffic()
595             self.gen.start_capture()
596             LOG.info('Captured unique src mac %d/%d, capturing return packets (retry %d/%d)...',
597                      unique_src_mac_count - len(mac_map), unique_src_mac_count,
598                      it + 1, retry_count)
599             if not self.skip_sleep():
600                 time.sleep(self.config.generic_poll_sec)
601             self.gen.stop_traffic()
602             self.gen.fetch_capture_packets()
603             self.gen.stop_capture()
604
605             for packet in self.gen.packet_list:
606                 mac_id = get_mac_id(packet)
607                 src_mac = ':'.join(["%02x" % ord(x) for x in mac_id])
608                 if src_mac in mac_map:
609                     port, chain = mac_map[src_mac]
610                     LOG.info('Received packet from mac: %s (chain=%d, port=%d)',
611                              src_mac, chain, port)
612                     mac_map.pop(src_mac, None)
613
614                 if not mac_map:
615                     LOG.info('End-to-end connectivity established')
616                     return
617
618         raise TrafficClientException('End-to-end connectivity cannot be ensured')
619
620     def ensure_arp_successful(self):
621         """Resolve all IP using ARP and throw an exception in case of failure."""
622         dest_macs = self.gen.resolve_arp()
623         if dest_macs:
624             # all dest macs are discovered, saved them into the generator config
625             if self.config.vxlan:
626                 self.generator_config.set_vtep_dest_macs(0, dest_macs[0])
627                 self.generator_config.set_vtep_dest_macs(1, dest_macs[1])
628             else:
629                 self.generator_config.set_dest_macs(0, dest_macs[0])
630                 self.generator_config.set_dest_macs(1, dest_macs[1])
631         else:
632             raise TrafficClientException('ARP cannot be resolved')
633
634     def set_traffic(self, frame_size, bidirectional):
635         """Reconfigure the traffic generator for a new frame size."""
636         self.run_config['bidirectional'] = bidirectional
637         self.run_config['l2frame_size'] = frame_size
638         self.run_config['rates'] = [self.get_per_direction_rate()]
639         if bidirectional:
640             self.run_config['rates'].append(self.get_per_direction_rate())
641         else:
642             unidir_reverse_pps = int(self.config.unidir_reverse_traffic_pps)
643             if unidir_reverse_pps > 0:
644                 self.run_config['rates'].append({'rate_pps': str(unidir_reverse_pps)})
645         # Fix for [NFVBENCH-67], convert the rate string to PPS
646         for idx, rate in enumerate(self.run_config['rates']):
647             if 'rate_pps' not in rate:
648                 self.run_config['rates'][idx] = {'rate_pps': self.__convert_rates(rate)['rate_pps']}
649
650         self.gen.clear_streamblock()
651         self.gen.create_traffic(frame_size, self.run_config['rates'], bidirectional, latency=True)
652
653     def _modify_load(self, load):
654         self.current_total_rate = {'rate_percent': str(load)}
655         rate_per_direction = self.get_per_direction_rate()
656
657         self.gen.modify_rate(rate_per_direction, False)
658         self.run_config['rates'][0] = rate_per_direction
659         if self.run_config['bidirectional']:
660             self.gen.modify_rate(rate_per_direction, True)
661             self.run_config['rates'][1] = rate_per_direction
662
663     def get_ndr_and_pdr(self):
664         """Start the NDR/PDR iteration and return the results."""
665         dst = 'Bidirectional' if self.run_config['bidirectional'] else 'Unidirectional'
666         targets = {}
667         if self.config.ndr_run:
668             LOG.info('*** Searching NDR for %s (%s)...', self.run_config['l2frame_size'], dst)
669             targets['ndr'] = self.config.measurement.NDR
670         if self.config.pdr_run:
671             LOG.info('*** Searching PDR for %s (%s)...', self.run_config['l2frame_size'], dst)
672             targets['pdr'] = self.config.measurement.PDR
673
674         self.run_config['start_time'] = time.time()
675         self.interval_collector = IntervalCollector(self.run_config['start_time'])
676         self.interval_collector.attach_notifier(self.notifier)
677         self.iteration_collector = IterationCollector(self.run_config['start_time'])
678         results = {}
679         self.__range_search(0.0, 200.0, targets, results)
680
681         results['iteration_stats'] = {
682             'ndr_pdr': self.iteration_collector.get()
683         }
684
685         if self.config.ndr_run:
686             LOG.info('NDR load: %s', results['ndr']['rate_percent'])
687             results['ndr']['time_taken_sec'] = \
688                 results['ndr']['timestamp_sec'] - self.run_config['start_time']
689             if self.config.pdr_run:
690                 LOG.info('PDR load: %s', results['pdr']['rate_percent'])
691                 results['pdr']['time_taken_sec'] = \
692                     results['pdr']['timestamp_sec'] - results['ndr']['timestamp_sec']
693         else:
694             LOG.info('PDR load: %s', results['pdr']['rate_percent'])
695             results['pdr']['time_taken_sec'] = \
696                 results['pdr']['timestamp_sec'] - self.run_config['start_time']
697         return results
698
699     def __get_dropped_rate(self, result):
700         dropped_pkts = result['rx']['dropped_pkts']
701         total_pkts = result['tx']['total_pkts']
702         if not total_pkts:
703             return float('inf')
704         return float(dropped_pkts) / total_pkts * 100
705
706     def get_stats(self):
707         """Collect final stats for previous run."""
708         stats = self.gen.get_stats()
709         retDict = {'total_tx_rate': stats['total_tx_rate']}
710         for port in self.PORTS:
711             retDict[port] = {'tx': {}, 'rx': {}}
712
713         tx_keys = ['total_pkts', 'total_pkt_bytes', 'pkt_rate', 'pkt_bit_rate']
714         rx_keys = tx_keys + ['dropped_pkts']
715
716         for port in self.PORTS:
717             for key in tx_keys:
718                 retDict[port]['tx'][key] = int(stats[port]['tx'][key])
719             for key in rx_keys:
720                 try:
721                     retDict[port]['rx'][key] = int(stats[port]['rx'][key])
722                 except ValueError:
723                     retDict[port]['rx'][key] = 0
724             retDict[port]['rx']['avg_delay_usec'] = cast_integer(
725                 stats[port]['rx']['avg_delay_usec'])
726             retDict[port]['rx']['min_delay_usec'] = cast_integer(
727                 stats[port]['rx']['min_delay_usec'])
728             retDict[port]['rx']['max_delay_usec'] = cast_integer(
729                 stats[port]['rx']['max_delay_usec'])
730             retDict[port]['drop_rate_percent'] = self.__get_dropped_rate(retDict[port])
731
732         ports = sorted(retDict.keys())
733         if self.run_config['bidirectional']:
734             retDict['overall'] = {'tx': {}, 'rx': {}}
735             for key in tx_keys:
736                 retDict['overall']['tx'][key] = \
737                     retDict[ports[0]]['tx'][key] + retDict[ports[1]]['tx'][key]
738             for key in rx_keys:
739                 retDict['overall']['rx'][key] = \
740                     retDict[ports[0]]['rx'][key] + retDict[ports[1]]['rx'][key]
741             total_pkts = [retDict[ports[0]]['rx']['total_pkts'],
742                           retDict[ports[1]]['rx']['total_pkts']]
743             avg_delays = [retDict[ports[0]]['rx']['avg_delay_usec'],
744                           retDict[ports[1]]['rx']['avg_delay_usec']]
745             max_delays = [retDict[ports[0]]['rx']['max_delay_usec'],
746                           retDict[ports[1]]['rx']['max_delay_usec']]
747             min_delays = [retDict[ports[0]]['rx']['min_delay_usec'],
748                           retDict[ports[1]]['rx']['min_delay_usec']]
749             retDict['overall']['rx']['avg_delay_usec'] = utils.weighted_avg(total_pkts, avg_delays)
750             retDict['overall']['rx']['min_delay_usec'] = min(min_delays)
751             retDict['overall']['rx']['max_delay_usec'] = max(max_delays)
752             for key in ['pkt_bit_rate', 'pkt_rate']:
753                 for dirc in ['tx', 'rx']:
754                     retDict['overall'][dirc][key] /= 2.0
755         else:
756             retDict['overall'] = retDict[ports[0]]
757         retDict['overall']['drop_rate_percent'] = self.__get_dropped_rate(retDict['overall'])
758         return retDict
759
760     def __convert_rates(self, rate):
761         return utils.convert_rates(self.run_config['l2frame_size'],
762                                    rate,
763                                    self.intf_speed)
764
765     def __ndr_pdr_found(self, tag, load):
766         rates = self.__convert_rates({'rate_percent': load})
767         self.iteration_collector.add_ndr_pdr(tag, rates['rate_pps'])
768         last_stats = self.iteration_collector.peek()
769         self.interval_collector.add_ndr_pdr(tag, last_stats)
770
771     def __format_output_stats(self, stats):
772         for key in self.PORTS + ['overall']:
773             interface = stats[key]
774             stats[key] = {
775                 'tx_pkts': interface['tx']['total_pkts'],
776                 'rx_pkts': interface['rx']['total_pkts'],
777                 'drop_percentage': interface['drop_rate_percent'],
778                 'drop_pct': interface['rx']['dropped_pkts'],
779                 'avg_delay_usec': interface['rx']['avg_delay_usec'],
780                 'max_delay_usec': interface['rx']['max_delay_usec'],
781                 'min_delay_usec': interface['rx']['min_delay_usec'],
782             }
783
784         return stats
785
786     def __targets_found(self, rate, targets, results):
787         for tag, target in targets.iteritems():
788             LOG.info('Found %s (%s) load: %s', tag, target, rate)
789             self.__ndr_pdr_found(tag, rate)
790             results[tag]['timestamp_sec'] = time.time()
791
792     def __range_search(self, left, right, targets, results):
793         """Perform a binary search for a list of targets inside a [left..right] range or rate.
794
795         left    the left side of the range to search as a % the line rate (100 = 100% line rate)
796                 indicating the rate to send on each interface
797         right   the right side of the range to search as a % of line rate
798                 indicating the rate to send on each interface
799         targets a dict of drop rates to search (0.1 = 0.1%), indexed by the DR name or "tag"
800                 ('ndr', 'pdr')
801         results a dict to store results
802         """
803         if not targets:
804             return
805         LOG.info('Range search [%s .. %s] targets: %s', left, right, targets)
806
807         # Terminate search when gap is less than load epsilon
808         if right - left < self.config.measurement.load_epsilon:
809             self.__targets_found(left, targets, results)
810             return
811
812         # Obtain the average drop rate in for middle load
813         middle = (left + right) / 2.0
814         try:
815             stats, rates = self.__run_search_iteration(middle)
816         except STLError:
817             LOG.exception("Got exception from traffic generator during binary search")
818             self.__targets_found(left, targets, results)
819             return
820         # Split target dicts based on the avg drop rate
821         left_targets = {}
822         right_targets = {}
823         for tag, target in targets.iteritems():
824             if stats['overall']['drop_rate_percent'] <= target:
825                 # record the best possible rate found for this target
826                 results[tag] = rates
827                 results[tag].update({
828                     'load_percent_per_direction': middle,
829                     'stats': self.__format_output_stats(dict(stats)),
830                     'timestamp_sec': None
831                 })
832                 right_targets[tag] = target
833             else:
834                 # initialize to 0 all fields of result for
835                 # the worst case scenario of the binary search (if ndr/pdr is not found)
836                 if tag not in results:
837                     results[tag] = dict.fromkeys(rates, 0)
838                     empty_stats = self.__format_output_stats(dict(stats))
839                     for key in empty_stats:
840                         if isinstance(empty_stats[key], dict):
841                             empty_stats[key] = dict.fromkeys(empty_stats[key], 0)
842                         else:
843                             empty_stats[key] = 0
844                     results[tag].update({
845                         'load_percent_per_direction': 0,
846                         'stats': empty_stats,
847                         'timestamp_sec': None
848                     })
849                 left_targets[tag] = target
850
851         # search lower half
852         self.__range_search(left, middle, left_targets, results)
853
854         # search upper half only if the upper rate does not exceed
855         # 100%, this only happens when the first search at 100%
856         # yields a DR that is < target DR
857         if middle >= 100:
858             self.__targets_found(100, right_targets, results)
859         else:
860             self.__range_search(middle, right, right_targets, results)
861
862     def __run_search_iteration(self, rate):
863         """Run one iteration at the given rate level.
864
865         rate: the rate to send on each port in percent (0 to 100)
866         """
867         self._modify_load(rate)
868
869         # poll interval stats and collect them
870         for stats in self.run_traffic():
871             self.interval_collector.add(stats)
872             time_elapsed_ratio = self.runner.time_elapsed() / self.run_config['duration_sec']
873             if time_elapsed_ratio >= 1:
874                 self.cancel_traffic()
875                 if not self.skip_sleep():
876                     time.sleep(self.config.pause_sec)
877         self.interval_collector.reset()
878
879         # get stats from the run
880         stats = self.runner.client.get_stats()
881         current_traffic_config = self._get_traffic_config()
882         warning = self.compare_tx_rates(current_traffic_config['direction-total']['rate_pps'],
883                                         stats['total_tx_rate'])
884         if warning is not None:
885             stats['warning'] = warning
886
887         # save reliable stats from whole iteration
888         self.iteration_collector.add(stats, current_traffic_config['direction-total']['rate_pps'])
889         LOG.info('Average drop rate: %f', stats['overall']['drop_rate_percent'])
890         return stats, current_traffic_config['direction-total']
891
892     @staticmethod
893     def log_stats(stats):
894         """Log estimated stats during run."""
895         report = {
896             'datetime': str(datetime.now()),
897             'tx_packets': stats['overall']['tx']['total_pkts'],
898             'rx_packets': stats['overall']['rx']['total_pkts'],
899             'drop_packets': stats['overall']['rx']['dropped_pkts'],
900             'drop_rate_percent': stats['overall']['drop_rate_percent']
901         }
902         LOG.info('TX: %(tx_packets)d; '
903                  'RX: %(rx_packets)d; '
904                  'Est. Dropped: %(drop_packets)d; '
905                  'Est. Drop rate: %(drop_rate_percent).4f%%',
906                  report)
907
908     def run_traffic(self):
909         """Start traffic and return intermediate stats for each interval."""
910         stats = self.runner.run()
911         while self.runner.is_running:
912             self.log_stats(stats)
913             yield stats
914             stats = self.runner.poll_stats()
915             if stats is None:
916                 return
917         self.log_stats(stats)
918         LOG.info('Drop rate: %f', stats['overall']['drop_rate_percent'])
919         yield stats
920
921     def cancel_traffic(self):
922         """Stop traffic."""
923         self.runner.stop()
924
925     def _get_traffic_config(self):
926         config = {}
927         load_total = 0.0
928         bps_total = 0.0
929         pps_total = 0.0
930         for idx, rate in enumerate(self.run_config['rates']):
931             key = 'direction-forward' if idx == 0 else 'direction-reverse'
932             config[key] = {
933                 'l2frame_size': self.run_config['l2frame_size'],
934                 'duration_sec': self.run_config['duration_sec']
935             }
936             config[key].update(rate)
937             config[key].update(self.__convert_rates(rate))
938             load_total += float(config[key]['rate_percent'])
939             bps_total += float(config[key]['rate_bps'])
940             pps_total += float(config[key]['rate_pps'])
941         config['direction-total'] = dict(config['direction-forward'])
942         config['direction-total'].update({
943             'rate_percent': load_total,
944             'rate_pps': cast_integer(pps_total),
945             'rate_bps': bps_total
946         })
947
948         return config
949
950     def get_run_config(self, results):
951         """Return configuration which was used for the last run."""
952         r = {}
953         # because we want each direction to have the far end RX rates,
954         # use the far end index (1-idx) to retrieve the RX rates
955         for idx, key in enumerate(["direction-forward", "direction-reverse"]):
956             tx_rate = results["stats"][idx]["tx"]["total_pkts"] / self.config.duration_sec
957             rx_rate = results["stats"][1 - idx]["rx"]["total_pkts"] / self.config.duration_sec
958             r[key] = {
959                 "orig": self.__convert_rates(self.run_config['rates'][idx]),
960                 "tx": self.__convert_rates({'rate_pps': tx_rate}),
961                 "rx": self.__convert_rates({'rate_pps': rx_rate})
962             }
963
964         total = {}
965         for direction in ['orig', 'tx', 'rx']:
966             total[direction] = {}
967             for unit in ['rate_percent', 'rate_bps', 'rate_pps']:
968                 total[direction][unit] = sum([float(x[direction][unit]) for x in r.values()])
969
970         r['direction-total'] = total
971         return r
972
973     def insert_interface_stats(self, pps_list):
974         """Insert interface stats to a list of packet path stats.
975
976         pps_list: a list of packet path stats instances indexed by chain index
977
978         This function will insert the packet path stats for the traffic gen ports 0 and 1
979         with itemized per chain tx/rx counters.
980         There will be as many packet path stats as chains.
981         Each packet path stats will have exactly 2 InterfaceStats for port 0 and port 1
982         self.pps_list:
983         [
984         PacketPathStats(InterfaceStats(chain 0, port 0), InterfaceStats(chain 0, port 1)),
985         PacketPathStats(InterfaceStats(chain 1, port 0), InterfaceStats(chain 1, port 1)),
986         ...
987         ]
988         """
989         def get_if_stats(chain_idx):
990             return [InterfaceStats('p' + str(port), self.tool)
991                     for port in range(2)]
992         # keep the list of list of interface stats indexed by the chain id
993         self.ifstats = [get_if_stats(chain_idx)
994                         for chain_idx in range(self.config.service_chain_count)]
995         # note that we need to make a copy of the ifs list so that any modification in the
996         # list from pps will not change the list saved in self.ifstats
997         self.pps_list = [PacketPathStats(list(ifs)) for ifs in self.ifstats]
998         # insert the corresponding pps in the passed list
999         pps_list.extend(self.pps_list)
1000
1001     def update_interface_stats(self, diff=False):
1002         """Update all interface stats.
1003
1004         diff: if False, simply refresh the interface stats values with latest values
1005               if True, diff the interface stats with the latest values
1006         Make sure that the interface stats inserted in insert_interface_stats() are updated
1007         with proper values.
1008         self.ifstats:
1009         [
1010         [InterfaceStats(chain 0, port 0), InterfaceStats(chain 0, port 1)],
1011         [InterfaceStats(chain 1, port 0), InterfaceStats(chain 1, port 1)],
1012         ...
1013         ]
1014         """
1015         if diff:
1016             stats = self.gen.get_stats()
1017             for chain_idx, ifs in enumerate(self.ifstats):
1018                 # each ifs has exactly 2 InterfaceStats and 2 Latency instances
1019                 # corresponding to the
1020                 # port 0 and port 1 for the given chain_idx
1021                 # Note that we cannot use self.pps_list[chain_idx].if_stats to pick the
1022                 # interface stats for the pps because it could have been modified to contain
1023                 # additional interface stats
1024                 self.gen.get_stream_stats(stats, ifs, self.pps_list[chain_idx].latencies, chain_idx)
1025
1026
1027     @staticmethod
1028     def compare_tx_rates(required, actual):
1029         """Compare the actual TX rate to the required TX rate."""
1030         threshold = 0.9
1031         are_different = False
1032         try:
1033             if float(actual) / required < threshold:
1034                 are_different = True
1035         except ZeroDivisionError:
1036             are_different = True
1037
1038         if are_different:
1039             msg = "WARNING: There is a significant difference between requested TX rate ({r}) " \
1040                   "and actual TX rate ({a}). The traffic generator may not have sufficient CPU " \
1041                   "to achieve the requested TX rate.".format(r=required, a=actual)
1042             LOG.info(msg)
1043             return msg
1044
1045         return None
1046
1047     def get_per_direction_rate(self):
1048         """Get the rate for each direction."""
1049         divisor = 2 if self.run_config['bidirectional'] else 1
1050         if 'rate_percent' in self.current_total_rate:
1051             # don't split rate if it's percentage
1052             divisor = 1
1053
1054         return utils.divide_rate(self.current_total_rate, divisor)
1055
1056     def close(self):
1057         """Close this instance."""
1058         try:
1059             self.gen.stop_traffic()
1060         except Exception:
1061             pass
1062         self.gen.clear_stats()
1063         self.gen.cleanup()