NFVBENCH-105 ARP not working with NFVbench 2.0
[nfvbench.git] / nfvbench / traffic_client.py
1 # Copyright 2016 Cisco Systems, Inc.  All rights reserved.
2 #
3 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
4 #    not use this file except in compliance with the License. You may obtain
5 #    a copy of the License at
6 #
7 #         http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #    Unless required by applicable law or agreed to in writing, software
10 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 #    License for the specific language governing permissions and limitations
13 #    under the License.
14
15 """Interface to the traffic generator clients including NDR/PDR binary search."""
16
17 from datetime import datetime
18 import socket
19 import struct
20 import time
21
22 from attrdict import AttrDict
23 import bitmath
24 from netaddr import IPNetwork
25 # pylint: disable=import-error
26 from trex_stl_lib.api import STLError
27 # pylint: enable=import-error
28
29 from log import LOG
30 from packet_stats import InterfaceStats
31 from packet_stats import PacketPathStats
32 from stats_collector import IntervalCollector
33 from stats_collector import IterationCollector
34 import traffic_gen.traffic_utils as utils
35 from utils import cast_integer
36
37
38 class TrafficClientException(Exception):
39     """Generic traffic client exception."""
40
41     pass
42
43
44 class TrafficRunner(object):
45     """Serialize various steps required to run traffic."""
46
47     def __init__(self, client, duration_sec, interval_sec=0):
48         """Create a traffic runner."""
49         self.client = client
50         self.start_time = None
51         self.duration_sec = duration_sec
52         self.interval_sec = interval_sec
53
54     def run(self):
55         """Clear stats and instruct the traffic generator to start generating traffic."""
56         if self.is_running():
57             return None
58         LOG.info('Running traffic generator')
59         self.client.gen.clear_stats()
60         self.client.gen.start_traffic()
61         self.start_time = time.time()
62         return self.poll_stats()
63
64     def stop(self):
65         """Stop the current run and instruct the traffic generator to stop traffic."""
66         if self.is_running():
67             self.start_time = None
68             self.client.gen.stop_traffic()
69
70     def is_running(self):
71         """Check if a run is still pending."""
72         return self.start_time is not None
73
74     def time_elapsed(self):
75         """Return time elapsed since start of run."""
76         if self.is_running():
77             return time.time() - self.start_time
78         return self.duration_sec
79
80     def poll_stats(self):
81         """Poll latest stats from the traffic generator at fixed interval - sleeps if necessary.
82
83         return: latest stats or None if traffic is stopped
84         """
85         if not self.is_running():
86             return None
87         if self.client.skip_sleep():
88             self.stop()
89             return self.client.get_stats()
90         time_elapsed = self.time_elapsed()
91         if time_elapsed > self.duration_sec:
92             self.stop()
93             return None
94         time_left = self.duration_sec - time_elapsed
95         if self.interval_sec > 0.0:
96             if time_left <= self.interval_sec:
97                 time.sleep(time_left)
98                 self.stop()
99             else:
100                 time.sleep(self.interval_sec)
101         else:
102             time.sleep(self.duration_sec)
103             self.stop()
104         return self.client.get_stats()
105
106
107 class IpBlock(object):
108     """Manage a block of IP addresses."""
109
110     def __init__(self, base_ip, step_ip, count_ip):
111         """Create an IP block."""
112         self.base_ip_int = Device.ip_to_int(base_ip)
113         self.step = Device.ip_to_int(step_ip)
114         self.max_available = count_ip
115         self.next_free = 0
116
117     def get_ip(self, index=0):
118         """Return the IP address at given index."""
119         if index < 0 or index >= self.max_available:
120             raise IndexError('Index out of bounds: %d (max=%d)' % (index, self.max_available))
121         return Device.int_to_ip(self.base_ip_int + index * self.step)
122
123     def reserve_ip_range(self, count):
124         """Reserve a range of count consecutive IP addresses spaced by step."""
125         if self.next_free + count > self.max_available:
126             raise IndexError('No more IP addresses next free=%d max_available=%d requested=%d' %
127                              (self.next_free,
128                               self.max_available,
129                               count))
130         first_ip = self.get_ip(self.next_free)
131         last_ip = self.get_ip(self.next_free + count - 1)
132         self.next_free += count
133         return (first_ip, last_ip)
134
135     def reset_reservation(self):
136         """Reset all reservations and restart with a completely unused IP block."""
137         self.next_free = 0
138
139
140 class Device(object):
141     """Represent a port device and all information associated to it.
142
143     In the curent version we only support 2 port devices for the traffic generator
144     identified as port 0 or port 1.
145     """
146
147     def __init__(self, port, generator_config, vtep_vlan=None):
148         """Create a new device for a given port."""
149         self.generator_config = generator_config
150         self.chain_count = generator_config.service_chain_count
151         self.flow_count = generator_config.flow_count / 2
152         self.port = port
153         self.switch_port = generator_config.interfaces[port].get('switch_port', None)
154         self.vtep_vlan = vtep_vlan
155         self.pci = generator_config.interfaces[port].pci
156         self.mac = None
157         self.dest_macs = None
158         self.vlans = None
159         self.ip_addrs = generator_config.ip_addrs[port]
160         subnet = IPNetwork(self.ip_addrs)
161         self.ip = subnet.ip.format()
162         self.ip_addrs_step = generator_config.ip_addrs_step
163         self.ip_block = IpBlock(self.ip, self.ip_addrs_step, self.flow_count)
164         self.gw_ip_block = IpBlock(generator_config.gateway_ips[port],
165                                    generator_config.gateway_ip_addrs_step,
166                                    self.chain_count)
167         self.tg_gateway_ip_addrs = generator_config.tg_gateway_ip_addrs[port]
168         self.tg_gw_ip_block = IpBlock(self.tg_gateway_ip_addrs,
169                                       generator_config.tg_gateway_ip_addrs_step,
170                                       self.chain_count)
171         self.udp_src_port = generator_config.udp_src_port
172         self.udp_dst_port = generator_config.udp_dst_port
173
174     def set_mac(self, mac):
175         """Set the local MAC for this port device."""
176         if mac is None:
177             raise TrafficClientException('Trying to set traffic generator MAC address as None')
178         self.mac = mac
179
180     def get_peer_device(self):
181         """Get the peer device (device 0 -> device 1, or device 1 -> device 0)."""
182         return self.generator_config.devices[1 - self.port]
183
184     def set_dest_macs(self, dest_macs):
185         """Set the list of dest MACs indexed by the chain id.
186
187         This is only called in 2 cases:
188         - VM macs discovered using openstack API
189         - dest MACs provisioned in config file
190         """
191         self.dest_macs = map(str, dest_macs)
192
193     def get_dest_macs(self):
194         """Get the list of dest macs for this device.
195
196         If set_dest_macs was never called, assumes l2-loopback and return
197         a list of peer mac (as many as chains but normally only 1 chain)
198         """
199         if self.dest_macs:
200             return self.dest_macs
201         # assume this is l2-loopback
202         return [self.get_peer_device().mac] * self.chain_count
203
204     def set_vlans(self, vlans):
205         """Set the list of vlans to use indexed by the chain id."""
206         self.vlans = vlans
207         LOG.info("Port %d: VLANs %s", self.port, self.vlans)
208
209     def get_gw_ip(self, chain_index):
210         """Retrieve the IP address assigned for the gateway of a given chain."""
211         return self.gw_ip_block.get_ip(chain_index)
212
213     def get_stream_configs(self):
214         """Get the stream config for a given chain on this device.
215
216         Called by the traffic generator driver to program the traffic generator properly
217         before generating traffic
218         """
219         configs = []
220         # exact flow count for each chain is calculated as follows:
221         # - all chains except the first will have the same flow count
222         #   calculated as (total_flows + chain_count - 1) / chain_count
223         # - the first chain will have the remainder
224         # example 11 flows and 3 chains => 3, 4, 4
225         flows_per_chain = (self.flow_count + self.chain_count - 1) / self.chain_count
226         cur_chain_flow_count = self.flow_count - flows_per_chain * (self.chain_count - 1)
227         peer = self.get_peer_device()
228         self.ip_block.reset_reservation()
229         peer.ip_block.reset_reservation()
230         dest_macs = self.get_dest_macs()
231
232         for chain_idx in xrange(self.chain_count):
233             src_ip_first, src_ip_last = self.ip_block.reserve_ip_range(cur_chain_flow_count)
234             dst_ip_first, dst_ip_last = peer.ip_block.reserve_ip_range(cur_chain_flow_count)
235
236             configs.append({
237                 'count': cur_chain_flow_count,
238                 'mac_src': self.mac,
239                 'mac_dst': dest_macs[chain_idx],
240                 'ip_src_addr': src_ip_first,
241                 'ip_src_addr_max': src_ip_last,
242                 'ip_src_count': cur_chain_flow_count,
243                 'ip_dst_addr': dst_ip_first,
244                 'ip_dst_addr_max': dst_ip_last,
245                 'ip_dst_count': cur_chain_flow_count,
246                 'ip_addrs_step': self.ip_addrs_step,
247                 'udp_src_port': self.udp_src_port,
248                 'udp_dst_port': self.udp_dst_port,
249                 'mac_discovery_gw': self.get_gw_ip(chain_idx),
250                 'ip_src_tg_gw': self.tg_gw_ip_block.get_ip(chain_idx),
251                 'ip_dst_tg_gw': peer.tg_gw_ip_block.get_ip(chain_idx),
252                 'vlan_tag': self.vlans[chain_idx] if self.vlans else None
253             })
254             # after first chain, fall back to the flow count for all other chains
255             cur_chain_flow_count = flows_per_chain
256         return configs
257
258     @staticmethod
259     def ip_to_int(addr):
260         """Convert an IP address from string to numeric."""
261         return struct.unpack("!I", socket.inet_aton(addr))[0]
262
263     @staticmethod
264     def int_to_ip(nvalue):
265         """Convert an IP address from numeric to string."""
266         return socket.inet_ntoa(struct.pack("!I", nvalue))
267
268
269 class GeneratorConfig(object):
270     """Represents traffic configuration for currently running traffic profile."""
271
272     DEFAULT_IP_STEP = '0.0.0.1'
273     DEFAULT_SRC_DST_IP_STEP = '0.0.0.1'
274
275     def __init__(self, config):
276         """Create a generator config."""
277         self.config = config
278         # name of the generator profile (normally trex or dummy)
279         # pick the default one if not specified explicitly from cli options
280         if not config.generator_profile:
281             config.generator_profile = config.traffic_generator.default_profile
282         # pick up the profile dict based on the name
283         gen_config = self.__match_generator_profile(config.traffic_generator,
284                                                     config.generator_profile)
285         self.gen_config = gen_config
286         # copy over fields from the dict
287         self.tool = gen_config.tool
288         self.ip = gen_config.ip
289         self.cores = gen_config.get('cores', 1)
290         if gen_config.intf_speed:
291             # interface speed is overriden from config
292             self.intf_speed = bitmath.parse_string(gen_config.intf_speed.replace('ps', '')).bits
293         else:
294             # interface speed is discovered/provided by the traffic generator
295             self.intf_speed = 0
296         self.software_mode = gen_config.get('software_mode', False)
297         self.interfaces = gen_config.interfaces
298         if self.interfaces[0].port != 0 or self.interfaces[1].port != 1:
299             raise TrafficClientException('Invalid port order/id in generator_profile.interfaces')
300
301         self.service_chain = config.service_chain
302         self.service_chain_count = config.service_chain_count
303         self.flow_count = config.flow_count
304         self.host_name = gen_config.host_name
305
306         self.tg_gateway_ip_addrs = gen_config.tg_gateway_ip_addrs
307         self.ip_addrs = gen_config.ip_addrs
308         self.ip_addrs_step = gen_config.ip_addrs_step or self.DEFAULT_SRC_DST_IP_STEP
309         self.tg_gateway_ip_addrs_step = \
310             gen_config.tg_gateway_ip_addrs_step or self.DEFAULT_IP_STEP
311         self.gateway_ip_addrs_step = gen_config.gateway_ip_addrs_step or self.DEFAULT_IP_STEP
312         self.gateway_ips = gen_config.gateway_ip_addrs
313         self.udp_src_port = gen_config.udp_src_port
314         self.udp_dst_port = gen_config.udp_dst_port
315         self.devices = [Device(port, self) for port in [0, 1]]
316         # This should normally always be [0, 1]
317         self.ports = [device.port for device in self.devices]
318
319         # check that pci is not empty
320         if not gen_config.interfaces[0].get('pci', None) or \
321            not gen_config.interfaces[1].get('pci', None):
322             raise TrafficClientException("configuration interfaces pci fields cannot be empty")
323
324         self.pcis = [tgif['pci'] for tgif in gen_config.interfaces]
325         self.vlan_tagging = config.vlan_tagging
326
327         # needed for result/summarizer
328         config['tg-name'] = gen_config.name
329         config['tg-tool'] = self.tool
330
331     def to_json(self):
332         """Get json form to display the content into the overall result dict."""
333         return dict(self.gen_config)
334
335     def set_dest_macs(self, port_index, dest_macs):
336         """Set the list of dest MACs indexed by the chain id on given port.
337
338         port_index: the port for which dest macs must be set
339         dest_macs: a list of dest MACs indexed by chain id
340         """
341         if len(dest_macs) != self.config.service_chain_count:
342             raise TrafficClientException('Dest MAC list %s must have %d entries' %
343                                          (dest_macs, self.config.service_chain_count))
344         self.devices[port_index].set_dest_macs(dest_macs)
345         LOG.info('Port %d: dst MAC %s', port_index, [str(mac) for mac in dest_macs])
346
347     def get_dest_macs(self):
348         """Return the list of dest macs indexed by port."""
349         return [dev.get_dest_macs() for dev in self.devices]
350
351     def set_vlans(self, port_index, vlans):
352         """Set the list of vlans to use indexed by the chain id on given port.
353
354         port_index: the port for which VLANs must be set
355         vlans: a  list of vlan lists indexed by chain id
356         """
357         if len(vlans) != self.config.service_chain_count:
358             raise TrafficClientException('VLAN list %s must have %d entries' %
359                                          (vlans, self.config.service_chain_count))
360         self.devices[port_index].set_vlans(vlans)
361
362     @staticmethod
363     def __match_generator_profile(traffic_generator, generator_profile):
364         gen_config = AttrDict(traffic_generator)
365         gen_config.pop('default_profile')
366         gen_config.pop('generator_profile')
367         matching_profile = [profile for profile in traffic_generator.generator_profile if
368                             profile.name == generator_profile]
369         if len(matching_profile) != 1:
370             raise Exception('Traffic generator profile not found: ' + generator_profile)
371
372         gen_config.update(matching_profile[0])
373         return gen_config
374
375
376 class TrafficClient(object):
377     """Traffic generator client with NDR/PDR binary seearch."""
378
379     PORTS = [0, 1]
380
381     def __init__(self, config, notifier=None):
382         """Create a new TrafficClient instance.
383
384         config: nfvbench config
385         notifier: notifier (optional)
386
387         A new instance is created everytime the nfvbench config may have changed.
388         """
389         self.config = config
390         self.generator_config = GeneratorConfig(config)
391         self.tool = self.generator_config.tool
392         self.gen = self._get_generator()
393         self.notifier = notifier
394         self.interval_collector = None
395         self.iteration_collector = None
396         self.runner = TrafficRunner(self, self.config.duration_sec, self.config.interval_sec)
397         self.config.frame_sizes = self._get_frame_sizes()
398         self.run_config = {
399             'l2frame_size': None,
400             'duration_sec': self.config.duration_sec,
401             'bidirectional': True,
402             'rates': []  # to avoid unsbuscriptable-obj warning
403         }
404         self.current_total_rate = {'rate_percent': '10'}
405         if self.config.single_run:
406             self.current_total_rate = utils.parse_rate_str(self.config.rate)
407         self.ifstats = None
408         # Speed is either discovered when connecting to TG or set from config
409         # This variable is 0 if not yet discovered from TG or must be the speed of
410         # each interface in bits per second
411         self.intf_speed = self.generator_config.intf_speed
412
413     def _get_generator(self):
414         tool = self.tool.lower()
415         if tool == 'trex':
416             from traffic_gen import trex
417             return trex.TRex(self)
418         if tool == 'dummy':
419             from traffic_gen import dummy
420             return dummy.DummyTG(self)
421         raise TrafficClientException('Unsupported generator tool name:' + self.tool)
422
423     def skip_sleep(self):
424         """Skip all sleeps when doing unit testing with dummy TG.
425
426         Must be overriden using mock.patch
427         """
428         return False
429
430     def _get_frame_sizes(self):
431         traffic_profile_name = self.config.traffic.profile
432         matching_profiles = [profile for profile in self.config.traffic_profile if
433                              profile.name == traffic_profile_name]
434         if len(matching_profiles) > 1:
435             raise TrafficClientException('Multiple traffic profiles with name: ' +
436                                          traffic_profile_name)
437         elif not matching_profiles:
438             raise TrafficClientException('Cannot find traffic profile: ' + traffic_profile_name)
439         return matching_profiles[0].l2frame_size
440
441     def start_traffic_generator(self):
442         """Start the traffic generator process (traffic not started yet)."""
443         self.gen.connect()
444         # pick up the interface speed if it is not set from config
445         intf_speeds = self.gen.get_port_speed_gbps()
446         # convert Gbps unit into bps
447         tg_if_speed = bitmath.parse_string(str(intf_speeds[0]) + 'Gb').bits
448         if self.intf_speed:
449             # interface speed is overriden from config
450             if self.intf_speed != tg_if_speed:
451                 # Warn the user if the speed in the config is different
452                 LOG.warning('Interface speed provided is different from actual speed (%d Gbps)',
453                             intf_speeds[0])
454         else:
455             # interface speed not provisioned by config
456             self.intf_speed = tg_if_speed
457             # also update the speed in the tg config
458             self.generator_config.intf_speed = tg_if_speed
459
460         # Save the traffic generator local MAC
461         for mac, device in zip(self.gen.get_macs(), self.generator_config.devices):
462             device.set_mac(mac)
463
464     def setup(self):
465         """Set up the traffic client."""
466         self.gen.clear_stats()
467
468     def get_version(self):
469         """Get the traffic generator version."""
470         return self.gen.get_version()
471
472     def ensure_end_to_end(self):
473         """Ensure traffic generator receives packets it has transmitted.
474
475         This ensures end to end connectivity and also waits until VMs are ready to forward packets.
476
477         VMs that are started and in active state may not pass traffic yet. It is imperative to make
478         sure that all VMs are passing traffic in both directions before starting any benchmarking.
479         To verify this, we need to send at a low frequency bi-directional packets and make sure
480         that we receive all packets back from all VMs. The number of flows is equal to 2 times
481         the number of chains (1 per direction) and we need to make sure we receive packets coming
482         from exactly 2 x chain count different source MAC addresses.
483
484         Example:
485             PVP chain (1 VM per chain)
486             N = 10 (number of chains)
487             Flow count = 20 (number of flows)
488             If the number of unique source MAC addresses from received packets is 20 then
489             all 10 VMs 10 VMs are in operational state.
490         """
491         LOG.info('Starting traffic generator to ensure end-to-end connectivity')
492         # send 2pps on each chain and each direction
493         rate_pps = {'rate_pps': str(self.config.service_chain_count * 2)}
494         self.gen.create_traffic('64', [rate_pps, rate_pps], bidirectional=True, latency=False)
495
496         # ensures enough traffic is coming back
497         retry_count = (self.config.check_traffic_time_sec +
498                        self.config.generic_poll_sec - 1) / self.config.generic_poll_sec
499
500         # we expect to see packets coming from 2 unique MAC per chain
501         # because there can be flooding in the case of shared net
502         # we must verify that packets from the right VMs are received
503         # and not just count unique src MAC
504         # create a dict of (port, chain) tuples indexed by dest mac
505         mac_map = {}
506         for port, dest_macs in enumerate(self.generator_config.get_dest_macs()):
507             for chain, mac in enumerate(dest_macs):
508                 mac_map[mac] = (port, chain)
509         unique_src_mac_count = len(mac_map)
510         for it in xrange(retry_count):
511             self.gen.clear_stats()
512             self.gen.start_traffic()
513             self.gen.start_capture()
514             LOG.info('Captured unique src mac %d/%d, capturing return packets (retry %d/%d)...',
515                      unique_src_mac_count - len(mac_map), unique_src_mac_count,
516                      it + 1, retry_count)
517             if not self.skip_sleep():
518                 time.sleep(self.config.generic_poll_sec)
519             self.gen.stop_traffic()
520             self.gen.fetch_capture_packets()
521             self.gen.stop_capture()
522
523             for packet in self.gen.packet_list:
524                 src_mac = packet['binary'][6:12]
525                 src_mac = ':'.join(["%02x" % ord(x) for x in src_mac])
526                 if src_mac in mac_map:
527                     port, chain = mac_map[src_mac]
528                     LOG.info('Received packet from mac: %s (chain=%d, port=%d)',
529                              src_mac, chain, port)
530                     mac_map.pop(src_mac, None)
531
532                 if not mac_map:
533                     LOG.info('End-to-end connectivity established')
534                     return
535
536         raise TrafficClientException('End-to-end connectivity cannot be ensured')
537
538     def ensure_arp_successful(self):
539         """Resolve all IP using ARP and throw an exception in case of failure."""
540         dest_macs = self.gen.resolve_arp()
541         if dest_macs:
542             # all dest macs are discovered, saved them into the generator config
543             self.generator_config.set_dest_macs(0, dest_macs[0])
544             self.generator_config.set_dest_macs(1, dest_macs[1])
545         else:
546             raise TrafficClientException('ARP cannot be resolved')
547
548     def set_traffic(self, frame_size, bidirectional):
549         """Reconfigure the traffic generator for a new frame size."""
550         self.run_config['bidirectional'] = bidirectional
551         self.run_config['l2frame_size'] = frame_size
552         self.run_config['rates'] = [self.get_per_direction_rate()]
553         if bidirectional:
554             self.run_config['rates'].append(self.get_per_direction_rate())
555         else:
556             unidir_reverse_pps = int(self.config.unidir_reverse_traffic_pps)
557             if unidir_reverse_pps > 0:
558                 self.run_config['rates'].append({'rate_pps': str(unidir_reverse_pps)})
559         # Fix for [NFVBENCH-67], convert the rate string to PPS
560         for idx, rate in enumerate(self.run_config['rates']):
561             if 'rate_pps' not in rate:
562                 self.run_config['rates'][idx] = {'rate_pps': self.__convert_rates(rate)['rate_pps']}
563
564         self.gen.clear_streamblock()
565         self.gen.create_traffic(frame_size, self.run_config['rates'], bidirectional, latency=True)
566
567     def _modify_load(self, load):
568         self.current_total_rate = {'rate_percent': str(load)}
569         rate_per_direction = self.get_per_direction_rate()
570
571         self.gen.modify_rate(rate_per_direction, False)
572         self.run_config['rates'][0] = rate_per_direction
573         if self.run_config['bidirectional']:
574             self.gen.modify_rate(rate_per_direction, True)
575             self.run_config['rates'][1] = rate_per_direction
576
577     def get_ndr_and_pdr(self):
578         """Start the NDR/PDR iteration and return the results."""
579         dst = 'Bidirectional' if self.run_config['bidirectional'] else 'Unidirectional'
580         targets = {}
581         if self.config.ndr_run:
582             LOG.info('*** Searching NDR for %s (%s)...', self.run_config['l2frame_size'], dst)
583             targets['ndr'] = self.config.measurement.NDR
584         if self.config.pdr_run:
585             LOG.info('*** Searching PDR for %s (%s)...', self.run_config['l2frame_size'], dst)
586             targets['pdr'] = self.config.measurement.PDR
587
588         self.run_config['start_time'] = time.time()
589         self.interval_collector = IntervalCollector(self.run_config['start_time'])
590         self.interval_collector.attach_notifier(self.notifier)
591         self.iteration_collector = IterationCollector(self.run_config['start_time'])
592         results = {}
593         self.__range_search(0.0, 200.0, targets, results)
594
595         results['iteration_stats'] = {
596             'ndr_pdr': self.iteration_collector.get()
597         }
598
599         if self.config.ndr_run:
600             LOG.info('NDR load: %s', results['ndr']['rate_percent'])
601             results['ndr']['time_taken_sec'] = \
602                 results['ndr']['timestamp_sec'] - self.run_config['start_time']
603             if self.config.pdr_run:
604                 LOG.info('PDR load: %s', results['pdr']['rate_percent'])
605                 results['pdr']['time_taken_sec'] = \
606                     results['pdr']['timestamp_sec'] - results['ndr']['timestamp_sec']
607         else:
608             LOG.info('PDR load: %s', results['pdr']['rate_percent'])
609             results['pdr']['time_taken_sec'] = \
610                 results['pdr']['timestamp_sec'] - self.run_config['start_time']
611         return results
612
613     def __get_dropped_rate(self, result):
614         dropped_pkts = result['rx']['dropped_pkts']
615         total_pkts = result['tx']['total_pkts']
616         if not total_pkts:
617             return float('inf')
618         return float(dropped_pkts) / total_pkts * 100
619
620     def get_stats(self):
621         """Collect final stats for previous run."""
622         stats = self.gen.get_stats()
623         retDict = {'total_tx_rate': stats['total_tx_rate']}
624         for port in self.PORTS:
625             retDict[port] = {'tx': {}, 'rx': {}}
626
627         tx_keys = ['total_pkts', 'total_pkt_bytes', 'pkt_rate', 'pkt_bit_rate']
628         rx_keys = tx_keys + ['dropped_pkts']
629
630         for port in self.PORTS:
631             for key in tx_keys:
632                 retDict[port]['tx'][key] = int(stats[port]['tx'][key])
633             for key in rx_keys:
634                 try:
635                     retDict[port]['rx'][key] = int(stats[port]['rx'][key])
636                 except ValueError:
637                     retDict[port]['rx'][key] = 0
638             retDict[port]['rx']['avg_delay_usec'] = cast_integer(
639                 stats[port]['rx']['avg_delay_usec'])
640             retDict[port]['rx']['min_delay_usec'] = cast_integer(
641                 stats[port]['rx']['min_delay_usec'])
642             retDict[port]['rx']['max_delay_usec'] = cast_integer(
643                 stats[port]['rx']['max_delay_usec'])
644             retDict[port]['drop_rate_percent'] = self.__get_dropped_rate(retDict[port])
645
646         ports = sorted(retDict.keys())
647         if self.run_config['bidirectional']:
648             retDict['overall'] = {'tx': {}, 'rx': {}}
649             for key in tx_keys:
650                 retDict['overall']['tx'][key] = \
651                     retDict[ports[0]]['tx'][key] + retDict[ports[1]]['tx'][key]
652             for key in rx_keys:
653                 retDict['overall']['rx'][key] = \
654                     retDict[ports[0]]['rx'][key] + retDict[ports[1]]['rx'][key]
655             total_pkts = [retDict[ports[0]]['rx']['total_pkts'],
656                           retDict[ports[1]]['rx']['total_pkts']]
657             avg_delays = [retDict[ports[0]]['rx']['avg_delay_usec'],
658                           retDict[ports[1]]['rx']['avg_delay_usec']]
659             max_delays = [retDict[ports[0]]['rx']['max_delay_usec'],
660                           retDict[ports[1]]['rx']['max_delay_usec']]
661             min_delays = [retDict[ports[0]]['rx']['min_delay_usec'],
662                           retDict[ports[1]]['rx']['min_delay_usec']]
663             retDict['overall']['rx']['avg_delay_usec'] = utils.weighted_avg(total_pkts, avg_delays)
664             retDict['overall']['rx']['min_delay_usec'] = min(min_delays)
665             retDict['overall']['rx']['max_delay_usec'] = max(max_delays)
666             for key in ['pkt_bit_rate', 'pkt_rate']:
667                 for dirc in ['tx', 'rx']:
668                     retDict['overall'][dirc][key] /= 2.0
669         else:
670             retDict['overall'] = retDict[ports[0]]
671         retDict['overall']['drop_rate_percent'] = self.__get_dropped_rate(retDict['overall'])
672         return retDict
673
674     def __convert_rates(self, rate):
675         return utils.convert_rates(self.run_config['l2frame_size'],
676                                    rate,
677                                    self.intf_speed)
678
679     def __ndr_pdr_found(self, tag, load):
680         rates = self.__convert_rates({'rate_percent': load})
681         self.iteration_collector.add_ndr_pdr(tag, rates['rate_pps'])
682         last_stats = self.iteration_collector.peek()
683         self.interval_collector.add_ndr_pdr(tag, last_stats)
684
685     def __format_output_stats(self, stats):
686         for key in self.PORTS + ['overall']:
687             interface = stats[key]
688             stats[key] = {
689                 'tx_pkts': interface['tx']['total_pkts'],
690                 'rx_pkts': interface['rx']['total_pkts'],
691                 'drop_percentage': interface['drop_rate_percent'],
692                 'drop_pct': interface['rx']['dropped_pkts'],
693                 'avg_delay_usec': interface['rx']['avg_delay_usec'],
694                 'max_delay_usec': interface['rx']['max_delay_usec'],
695                 'min_delay_usec': interface['rx']['min_delay_usec'],
696             }
697
698         return stats
699
700     def __targets_found(self, rate, targets, results):
701         for tag, target in targets.iteritems():
702             LOG.info('Found %s (%s) load: %s', tag, target, rate)
703             self.__ndr_pdr_found(tag, rate)
704             results[tag]['timestamp_sec'] = time.time()
705
706     def __range_search(self, left, right, targets, results):
707         """Perform a binary search for a list of targets inside a [left..right] range or rate.
708
709         left    the left side of the range to search as a % the line rate (100 = 100% line rate)
710                 indicating the rate to send on each interface
711         right   the right side of the range to search as a % of line rate
712                 indicating the rate to send on each interface
713         targets a dict of drop rates to search (0.1 = 0.1%), indexed by the DR name or "tag"
714                 ('ndr', 'pdr')
715         results a dict to store results
716         """
717         if not targets:
718             return
719         LOG.info('Range search [%s .. %s] targets: %s', left, right, targets)
720
721         # Terminate search when gap is less than load epsilon
722         if right - left < self.config.measurement.load_epsilon:
723             self.__targets_found(left, targets, results)
724             return
725
726         # Obtain the average drop rate in for middle load
727         middle = (left + right) / 2.0
728         try:
729             stats, rates = self.__run_search_iteration(middle)
730         except STLError:
731             LOG.exception("Got exception from traffic generator during binary search")
732             self.__targets_found(left, targets, results)
733             return
734         # Split target dicts based on the avg drop rate
735         left_targets = {}
736         right_targets = {}
737         for tag, target in targets.iteritems():
738             if stats['overall']['drop_rate_percent'] <= target:
739                 # record the best possible rate found for this target
740                 results[tag] = rates
741                 results[tag].update({
742                     'load_percent_per_direction': middle,
743                     'stats': self.__format_output_stats(dict(stats)),
744                     'timestamp_sec': None
745                 })
746                 right_targets[tag] = target
747             else:
748                 # initialize to 0 all fields of result for
749                 # the worst case scenario of the binary search (if ndr/pdr is not found)
750                 if tag not in results:
751                     results[tag] = dict.fromkeys(rates, 0)
752                     empty_stats = self.__format_output_stats(dict(stats))
753                     for key in empty_stats:
754                         if isinstance(empty_stats[key], dict):
755                             empty_stats[key] = dict.fromkeys(empty_stats[key], 0)
756                         else:
757                             empty_stats[key] = 0
758                     results[tag].update({
759                         'load_percent_per_direction': 0,
760                         'stats': empty_stats,
761                         'timestamp_sec': None
762                     })
763                 left_targets[tag] = target
764
765         # search lower half
766         self.__range_search(left, middle, left_targets, results)
767
768         # search upper half only if the upper rate does not exceed
769         # 100%, this only happens when the first search at 100%
770         # yields a DR that is < target DR
771         if middle >= 100:
772             self.__targets_found(100, right_targets, results)
773         else:
774             self.__range_search(middle, right, right_targets, results)
775
776     def __run_search_iteration(self, rate):
777         """Run one iteration at the given rate level.
778
779         rate: the rate to send on each port in percent (0 to 100)
780         """
781         self._modify_load(rate)
782
783         # poll interval stats and collect them
784         for stats in self.run_traffic():
785             self.interval_collector.add(stats)
786             time_elapsed_ratio = self.runner.time_elapsed() / self.run_config['duration_sec']
787             if time_elapsed_ratio >= 1:
788                 self.cancel_traffic()
789                 if not self.skip_sleep():
790                     time.sleep(self.config.pause_sec)
791         self.interval_collector.reset()
792
793         # get stats from the run
794         stats = self.runner.client.get_stats()
795         current_traffic_config = self._get_traffic_config()
796         warning = self.compare_tx_rates(current_traffic_config['direction-total']['rate_pps'],
797                                         stats['total_tx_rate'])
798         if warning is not None:
799             stats['warning'] = warning
800
801         # save reliable stats from whole iteration
802         self.iteration_collector.add(stats, current_traffic_config['direction-total']['rate_pps'])
803         LOG.info('Average drop rate: %f', stats['overall']['drop_rate_percent'])
804         return stats, current_traffic_config['direction-total']
805
806     @staticmethod
807     def log_stats(stats):
808         """Log estimated stats during run."""
809         report = {
810             'datetime': str(datetime.now()),
811             'tx_packets': stats['overall']['tx']['total_pkts'],
812             'rx_packets': stats['overall']['rx']['total_pkts'],
813             'drop_packets': stats['overall']['rx']['dropped_pkts'],
814             'drop_rate_percent': stats['overall']['drop_rate_percent']
815         }
816         LOG.info('TX: %(tx_packets)d; '
817                  'RX: %(rx_packets)d; '
818                  'Est. Dropped: %(drop_packets)d; '
819                  'Est. Drop rate: %(drop_rate_percent).4f%%',
820                  report)
821
822     def run_traffic(self):
823         """Start traffic and return intermediate stats for each interval."""
824         stats = self.runner.run()
825         while self.runner.is_running:
826             self.log_stats(stats)
827             yield stats
828             stats = self.runner.poll_stats()
829             if stats is None:
830                 return
831         self.log_stats(stats)
832         LOG.info('Drop rate: %f', stats['overall']['drop_rate_percent'])
833         yield stats
834
835     def cancel_traffic(self):
836         """Stop traffic."""
837         self.runner.stop()
838
839     def _get_traffic_config(self):
840         config = {}
841         load_total = 0.0
842         bps_total = 0.0
843         pps_total = 0.0
844         for idx, rate in enumerate(self.run_config['rates']):
845             key = 'direction-forward' if idx == 0 else 'direction-reverse'
846             config[key] = {
847                 'l2frame_size': self.run_config['l2frame_size'],
848                 'duration_sec': self.run_config['duration_sec']
849             }
850             config[key].update(rate)
851             config[key].update(self.__convert_rates(rate))
852             load_total += float(config[key]['rate_percent'])
853             bps_total += float(config[key]['rate_bps'])
854             pps_total += float(config[key]['rate_pps'])
855         config['direction-total'] = dict(config['direction-forward'])
856         config['direction-total'].update({
857             'rate_percent': load_total,
858             'rate_pps': cast_integer(pps_total),
859             'rate_bps': bps_total
860         })
861
862         return config
863
864     def get_run_config(self, results):
865         """Return configuration which was used for the last run."""
866         r = {}
867         # because we want each direction to have the far end RX rates,
868         # use the far end index (1-idx) to retrieve the RX rates
869         for idx, key in enumerate(["direction-forward", "direction-reverse"]):
870             tx_rate = results["stats"][idx]["tx"]["total_pkts"] / self.config.duration_sec
871             rx_rate = results["stats"][1 - idx]["rx"]["total_pkts"] / self.config.duration_sec
872             r[key] = {
873                 "orig": self.__convert_rates(self.run_config['rates'][idx]),
874                 "tx": self.__convert_rates({'rate_pps': tx_rate}),
875                 "rx": self.__convert_rates({'rate_pps': rx_rate})
876             }
877
878         total = {}
879         for direction in ['orig', 'tx', 'rx']:
880             total[direction] = {}
881             for unit in ['rate_percent', 'rate_bps', 'rate_pps']:
882                 total[direction][unit] = sum([float(x[direction][unit]) for x in r.values()])
883
884         r['direction-total'] = total
885         return r
886
887     def insert_interface_stats(self, pps_list):
888         """Insert interface stats to a list of packet path stats.
889
890         pps_list: a list of packet path stats instances indexed by chain index
891
892         This function will insert the packet path stats for the traffic gen ports 0 and 1
893         with itemized per chain tx/rx counters.
894         There will be as many packet path stats as chains.
895         Each packet path stats will have exactly 2 InterfaceStats for port 0 and port 1
896         self.pps_list:
897         [
898         PacketPathStats(InterfaceStats(chain 0, port 0), InterfaceStats(chain 0, port 1)),
899         PacketPathStats(InterfaceStats(chain 1, port 0), InterfaceStats(chain 1, port 1)),
900         ...
901         ]
902         """
903         def get_if_stats(chain_idx):
904             return [InterfaceStats('p' + str(port), self.tool)
905                     for port in range(2)]
906         # keep the list of list of interface stats indexed by the chain id
907         self.ifstats = [get_if_stats(chain_idx)
908                         for chain_idx in range(self.config.service_chain_count)]
909         # note that we need to make a copy of the ifs list so that any modification in the
910         # list from pps will not change the list saved in self.ifstats
911         self.pps_list = [PacketPathStats(list(ifs)) for ifs in self.ifstats]
912         # insert the corresponding pps in the passed list
913         pps_list.extend(self.pps_list)
914
915     def update_interface_stats(self, diff=False):
916         """Update all interface stats.
917
918         diff: if False, simply refresh the interface stats values with latest values
919               if True, diff the interface stats with the latest values
920         Make sure that the interface stats inserted in insert_interface_stats() are updated
921         with proper values.
922         self.ifstats:
923         [
924         [InterfaceStats(chain 0, port 0), InterfaceStats(chain 0, port 1)],
925         [InterfaceStats(chain 1, port 0), InterfaceStats(chain 1, port 1)],
926         ...
927         ]
928         """
929         if diff:
930             stats = self.gen.get_stats()
931             for chain_idx, ifs in enumerate(self.ifstats):
932                 # each ifs has exactly 2 InterfaceStats and 2 Latency instances
933                 # corresponding to the
934                 # port 0 and port 1 for the given chain_idx
935                 # Note that we cannot use self.pps_list[chain_idx].if_stats to pick the
936                 # interface stats for the pps because it could have been modified to contain
937                 # additional interface stats
938                 self.gen.get_stream_stats(stats, ifs, self.pps_list[chain_idx].latencies, chain_idx)
939
940
941     @staticmethod
942     def compare_tx_rates(required, actual):
943         """Compare the actual TX rate to the required TX rate."""
944         threshold = 0.9
945         are_different = False
946         try:
947             if float(actual) / required < threshold:
948                 are_different = True
949         except ZeroDivisionError:
950             are_different = True
951
952         if are_different:
953             msg = "WARNING: There is a significant difference between requested TX rate ({r}) " \
954                   "and actual TX rate ({a}). The traffic generator may not have sufficient CPU " \
955                   "to achieve the requested TX rate.".format(r=required, a=actual)
956             LOG.info(msg)
957             return msg
958
959         return None
960
961     def get_per_direction_rate(self):
962         """Get the rate for each direction."""
963         divisor = 2 if self.run_config['bidirectional'] else 1
964         if 'rate_percent' in self.current_total_rate:
965             # don't split rate if it's percentage
966             divisor = 1
967
968         return utils.divide_rate(self.current_total_rate, divisor)
969
970     def close(self):
971         """Close this instance."""
972         try:
973             self.gen.stop_traffic()
974         except Exception:
975             pass
976         self.gen.clear_stats()
977         self.gen.cleanup()