Perform strict src mac check on ensure end to end
[nfvbench.git] / nfvbench / traffic_client.py
1 # Copyright 2016 Cisco Systems, Inc.  All rights reserved.
2 #
3 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
4 #    not use this file except in compliance with the License. You may obtain
5 #    a copy of the License at
6 #
7 #         http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #    Unless required by applicable law or agreed to in writing, software
10 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 #    License for the specific language governing permissions and limitations
13 #    under the License.
14
15 """Interface to the traffic generator clients including NDR/PDR binary search."""
16
17 from datetime import datetime
18 import socket
19 import struct
20 import time
21
22 from attrdict import AttrDict
23 import bitmath
24 from netaddr import IPNetwork
25 # pylint: disable=import-error
26 from trex_stl_lib.api import STLError
27 # pylint: enable=import-error
28
29 from log import LOG
30 from packet_stats import InterfaceStats
31 from packet_stats import PacketPathStats
32 from stats_collector import IntervalCollector
33 from stats_collector import IterationCollector
34 import traffic_gen.traffic_utils as utils
35 from utils import cast_integer
36
37
38 class TrafficClientException(Exception):
39     """Generic traffic client exception."""
40
41     pass
42
43
44 class TrafficRunner(object):
45     """Serialize various steps required to run traffic."""
46
47     def __init__(self, client, duration_sec, interval_sec=0):
48         """Create a traffic runner."""
49         self.client = client
50         self.start_time = None
51         self.duration_sec = duration_sec
52         self.interval_sec = interval_sec
53
54     def run(self):
55         """Clear stats and instruct the traffic generator to start generating traffic."""
56         if self.is_running():
57             return None
58         LOG.info('Running traffic generator')
59         self.client.gen.clear_stats()
60         self.client.gen.start_traffic()
61         self.start_time = time.time()
62         return self.poll_stats()
63
64     def stop(self):
65         """Stop the current run and instruct the traffic generator to stop traffic."""
66         if self.is_running():
67             self.start_time = None
68             self.client.gen.stop_traffic()
69
70     def is_running(self):
71         """Check if a run is still pending."""
72         return self.start_time is not None
73
74     def time_elapsed(self):
75         """Return time elapsed since start of run."""
76         if self.is_running():
77             return time.time() - self.start_time
78         return self.duration_sec
79
80     def poll_stats(self):
81         """Poll latest stats from the traffic generator at fixed interval - sleeps if necessary.
82
83         return: latest stats or None if traffic is stopped
84         """
85         if not self.is_running():
86             return None
87         if self.client.skip_sleep():
88             self.stop()
89             return self.client.get_stats()
90         time_elapsed = self.time_elapsed()
91         if time_elapsed > self.duration_sec:
92             self.stop()
93             return None
94         time_left = self.duration_sec - time_elapsed
95         if self.interval_sec > 0.0:
96             if time_left <= self.interval_sec:
97                 time.sleep(time_left)
98                 self.stop()
99             else:
100                 time.sleep(self.interval_sec)
101         else:
102             time.sleep(self.duration_sec)
103             self.stop()
104         return self.client.get_stats()
105
106
107 class IpBlock(object):
108     """Manage a block of IP addresses."""
109
110     def __init__(self, base_ip, step_ip, count_ip):
111         """Create an IP block."""
112         self.base_ip_int = Device.ip_to_int(base_ip)
113         self.step = Device.ip_to_int(step_ip)
114         self.max_available = count_ip
115         self.next_free = 0
116
117     def get_ip(self, index=0):
118         """Return the IP address at given index."""
119         if index < 0 or index >= self.max_available:
120             raise IndexError('Index out of bounds: %d (max=%d)' % (index, self.max_available))
121         return Device.int_to_ip(self.base_ip_int + index * self.step)
122
123     def reserve_ip_range(self, count):
124         """Reserve a range of count consecutive IP addresses spaced by step."""
125         if self.next_free + count > self.max_available:
126             raise IndexError('No more IP addresses next free=%d max_available=%d requested=%d' %
127                              (self.next_free,
128                               self.max_available,
129                               count))
130         first_ip = self.get_ip(self.next_free)
131         last_ip = self.get_ip(self.next_free + count - 1)
132         self.next_free += count
133         return (first_ip, last_ip)
134
135     def reset_reservation(self):
136         """Reset all reservations and restart with a completely unused IP block."""
137         self.next_free = 0
138
139
140 class Device(object):
141     """Represent a port device and all information associated to it.
142
143     In the curent version we only support 2 port devices for the traffic generator
144     identified as port 0 or port 1.
145     """
146
147     def __init__(self, port, generator_config, vtep_vlan=None):
148         """Create a new device for a given port."""
149         self.generator_config = generator_config
150         self.chain_count = generator_config.service_chain_count
151         self.flow_count = generator_config.flow_count / 2
152         self.port = port
153         self.switch_port = generator_config.interfaces[port].get('switch_port', None)
154         self.vtep_vlan = vtep_vlan
155         self.pci = generator_config.interfaces[port].pci
156         self.mac = None
157         self.dest_macs = None
158         self.vlans = None
159         self.ip_addrs = generator_config.ip_addrs[port]
160         subnet = IPNetwork(self.ip_addrs)
161         self.ip = subnet.ip.format()
162         self.ip_addrs_step = generator_config.ip_addrs_step
163         self.ip_block = IpBlock(self.ip, self.ip_addrs_step, self.flow_count)
164         self.gw_ip_block = IpBlock(generator_config.gateway_ips[port],
165                                    generator_config.gateway_ip_addrs_step,
166                                    self.chain_count)
167         self.tg_gateway_ip_addrs = generator_config.tg_gateway_ip_addrs[port]
168         self.tg_gw_ip_block = IpBlock(self.tg_gateway_ip_addrs,
169                                       generator_config.tg_gateway_ip_addrs_step,
170                                       self.chain_count)
171         self.udp_src_port = generator_config.udp_src_port
172         self.udp_dst_port = generator_config.udp_dst_port
173
174     def set_mac(self, mac):
175         """Set the local MAC for this port device."""
176         if mac is None:
177             raise TrafficClientException('Trying to set traffic generator MAC address as None')
178         self.mac = mac
179
180     def get_peer_device(self):
181         """Get the peer device (device 0 -> device 1, or device 1 -> device 0)."""
182         return self.generator_config.devices[1 - self.port]
183
184     def set_dest_macs(self, dest_macs):
185         """Set the list of dest MACs indexed by the chain id.
186
187         This is only called in 2 cases:
188         - VM macs discovered using openstack API
189         - dest MACs provisioned in config file
190         """
191         self.dest_macs = map(str, dest_macs)
192
193     def get_dest_macs(self):
194         """Get the list of dest macs for this device.
195
196         If set_dest_macs was never called, assumes l2-loopback and return
197         a list of peer mac (as many as chains but normally only 1 chain)
198         """
199         if self.dest_macs:
200             return self.dest_macs
201         # assume this is l2-loopback
202         return [self.get_peer_device().mac] * self.chain_count
203
204     def set_vlans(self, vlans):
205         """Set the list of vlans to use indexed by the chain id."""
206         self.vlans = vlans
207         LOG.info("Port %d: VLANs %s", self.port, self.vlans)
208
209     def get_gw_ip(self, chain_index):
210         """Retrieve the IP address assigned for the gateway of a given chain."""
211         return self.gw_ip_block.get_ip(chain_index)
212
213     def get_stream_configs(self):
214         """Get the stream config for a given chain on this device.
215
216         Called by the traffic generator driver to program the traffic generator properly
217         before generating traffic
218         """
219         configs = []
220         # exact flow count for each chain is calculated as follows:
221         # - all chains except the first will have the same flow count
222         #   calculated as (total_flows + chain_count - 1) / chain_count
223         # - the first chain will have the remainder
224         # example 11 flows and 3 chains => 3, 4, 4
225         flows_per_chain = (self.flow_count + self.chain_count - 1) / self.chain_count
226         cur_chain_flow_count = self.flow_count - flows_per_chain * (self.chain_count - 1)
227         peer = self.get_peer_device()
228         self.ip_block.reset_reservation()
229         peer.ip_block.reset_reservation()
230         dest_macs = self.get_dest_macs()
231
232         for chain_idx in xrange(self.chain_count):
233             src_ip_first, src_ip_last = self.ip_block.reserve_ip_range(cur_chain_flow_count)
234             dst_ip_first, dst_ip_last = peer.ip_block.reserve_ip_range(cur_chain_flow_count)
235
236             configs.append({
237                 'count': cur_chain_flow_count,
238                 'mac_src': self.mac,
239                 'mac_dst': dest_macs[chain_idx],
240                 'ip_src_addr': src_ip_first,
241                 'ip_src_addr_max': src_ip_last,
242                 'ip_src_count': cur_chain_flow_count,
243                 'ip_dst_addr': dst_ip_first,
244                 'ip_dst_addr_max': dst_ip_last,
245                 'ip_dst_count': cur_chain_flow_count,
246                 'ip_addrs_step': self.ip_addrs_step,
247                 'udp_src_port': self.udp_src_port,
248                 'udp_dst_port': self.udp_dst_port,
249                 'mac_discovery_gw': self.get_gw_ip(chain_idx),
250                 'ip_src_tg_gw': self.tg_gw_ip_block.get_ip(chain_idx),
251                 'ip_dst_tg_gw': peer.tg_gw_ip_block.get_ip(chain_idx),
252                 'vlan_tag': self.vlans[chain_idx] if self.vlans else None
253             })
254             # after first chain, fall back to the flow count for all other chains
255             cur_chain_flow_count = flows_per_chain
256         return configs
257
258     @staticmethod
259     def ip_to_int(addr):
260         """Convert an IP address from string to numeric."""
261         return struct.unpack("!I", socket.inet_aton(addr))[0]
262
263     @staticmethod
264     def int_to_ip(nvalue):
265         """Convert an IP address from numeric to string."""
266         return socket.inet_ntoa(struct.pack("!I", nvalue))
267
268
269 class GeneratorConfig(object):
270     """Represents traffic configuration for currently running traffic profile."""
271
272     DEFAULT_IP_STEP = '0.0.0.1'
273     DEFAULT_SRC_DST_IP_STEP = '0.0.0.1'
274
275     def __init__(self, config):
276         """Create a generator config."""
277         self.config = config
278         # name of the generator profile (normally trex or dummy)
279         # pick the default one if not specified explicitly from cli options
280         if not config.generator_profile:
281             config.generator_profile = config.traffic_generator.default_profile
282         # pick up the profile dict based on the name
283         gen_config = self.__match_generator_profile(config.traffic_generator,
284                                                     config.generator_profile)
285         self.gen_config = gen_config
286         # copy over fields from the dict
287         self.tool = gen_config.tool
288         self.ip = gen_config.ip
289         self.cores = gen_config.get('cores', 1)
290         if gen_config.intf_speed:
291             # interface speed is overriden from config
292             self.intf_speed = bitmath.parse_string(gen_config.intf_speed.replace('ps', '')).bits
293         else:
294             # interface speed is discovered/provided by the traffic generator
295             self.intf_speed = 0
296         self.software_mode = gen_config.get('software_mode', False)
297         self.interfaces = gen_config.interfaces
298         if self.interfaces[0].port != 0 or self.interfaces[1].port != 1:
299             raise TrafficClientException('Invalid port order/id in generator_profile.interfaces')
300
301         self.service_chain = config.service_chain
302         self.service_chain_count = config.service_chain_count
303         self.flow_count = config.flow_count
304         self.host_name = gen_config.host_name
305
306         self.tg_gateway_ip_addrs = gen_config.tg_gateway_ip_addrs
307         self.ip_addrs = gen_config.ip_addrs
308         self.ip_addrs_step = gen_config.ip_addrs_step or self.DEFAULT_SRC_DST_IP_STEP
309         self.tg_gateway_ip_addrs_step = \
310             gen_config.tg_gateway_ip_addrs_step or self.DEFAULT_IP_STEP
311         self.gateway_ip_addrs_step = gen_config.gateway_ip_addrs_step or self.DEFAULT_IP_STEP
312         self.gateway_ips = gen_config.gateway_ip_addrs
313         self.udp_src_port = gen_config.udp_src_port
314         self.udp_dst_port = gen_config.udp_dst_port
315         self.devices = [Device(port, self) for port in [0, 1]]
316         # This should normally always be [0, 1]
317         self.ports = [device.port for device in self.devices]
318
319         # check that pci is not empty
320         if not gen_config.interfaces[0].get('pci', None) or \
321            not gen_config.interfaces[1].get('pci', None):
322             raise TrafficClientException("configuration interfaces pci fields cannot be empty")
323
324         self.pcis = [tgif['pci'] for tgif in gen_config.interfaces]
325         self.vlan_tagging = config.vlan_tagging
326
327         # needed for result/summarizer
328         config['tg-name'] = gen_config.name
329         config['tg-tool'] = self.tool
330
331     def to_json(self):
332         """Get json form to display the content into the overall result dict."""
333         return dict(self.gen_config)
334
335     def set_dest_macs(self, port_index, dest_macs):
336         """Set the list of dest MACs indexed by the chain id on given port.
337
338         port_index: the port for which dest macs must be set
339         dest_macs: a list of dest MACs indexed by chain id
340         """
341         if len(dest_macs) != self.config.service_chain_count:
342             raise TrafficClientException('Dest MAC list %s must have %d entries' %
343                                          (dest_macs, self.config.service_chain_count))
344         self.devices[port_index].set_dest_macs(dest_macs)
345         LOG.info('Port %d: dst MAC %s', port_index, [str(mac) for mac in dest_macs])
346
347     def get_dest_macs(self):
348         """Return the list of dest macs indexed by port."""
349         return [dev.get_dest_macs() for dev in self.devices]
350
351     def set_vlans(self, port_index, vlans):
352         """Set the list of vlans to use indexed by the chain id on given port.
353
354         port_index: the port for which VLANs must be set
355         vlans: a  list of vlan lists indexed by chain id
356         """
357         if len(vlans) != self.config.service_chain_count:
358             raise TrafficClientException('VLAN list %s must have %d entries' %
359                                          (vlans, self.config.service_chain_count))
360         self.devices[port_index].set_vlans(vlans)
361
362     @staticmethod
363     def __match_generator_profile(traffic_generator, generator_profile):
364         gen_config = AttrDict(traffic_generator)
365         gen_config.pop('default_profile')
366         gen_config.pop('generator_profile')
367         matching_profile = [profile for profile in traffic_generator.generator_profile if
368                             profile.name == generator_profile]
369         if len(matching_profile) != 1:
370             raise Exception('Traffic generator profile not found: ' + generator_profile)
371
372         gen_config.update(matching_profile[0])
373         return gen_config
374
375
376 class TrafficClient(object):
377     """Traffic generator client with NDR/PDR binary seearch."""
378
379     PORTS = [0, 1]
380
381     def __init__(self, config, notifier=None):
382         """Create a new TrafficClient instance.
383
384         config: nfvbench config
385         notifier: notifier (optional)
386
387         A new instance is created everytime the nfvbench config may have changed.
388         """
389         self.config = config
390         self.generator_config = GeneratorConfig(config)
391         self.tool = self.generator_config.tool
392         self.gen = self._get_generator()
393         self.notifier = notifier
394         self.interval_collector = None
395         self.iteration_collector = None
396         self.runner = TrafficRunner(self, self.config.duration_sec, self.config.interval_sec)
397         self.config.frame_sizes = self._get_frame_sizes()
398         self.run_config = {
399             'l2frame_size': None,
400             'duration_sec': self.config.duration_sec,
401             'bidirectional': True,
402             'rates': []  # to avoid unsbuscriptable-obj warning
403         }
404         self.current_total_rate = {'rate_percent': '10'}
405         if self.config.single_run:
406             self.current_total_rate = utils.parse_rate_str(self.config.rate)
407         self.ifstats = None
408         # Speed is either discovered when connecting to TG or set from config
409         # This variable is 0 if not yet discovered from TG or must be the speed of
410         # each interface in bits per second
411         self.intf_speed = self.generator_config.intf_speed
412
413     def _get_generator(self):
414         tool = self.tool.lower()
415         if tool == 'trex':
416             from traffic_gen import trex
417             return trex.TRex(self)
418         if tool == 'dummy':
419             from traffic_gen import dummy
420             return dummy.DummyTG(self)
421         raise TrafficClientException('Unsupported generator tool name:' + self.tool)
422
423     def skip_sleep(self):
424         """Skip all sleeps when doing unit testing with dummy TG.
425
426         Must be overriden using mock.patch
427         """
428         return False
429
430     def _get_frame_sizes(self):
431         traffic_profile_name = self.config.traffic.profile
432         matching_profiles = [profile for profile in self.config.traffic_profile if
433                              profile.name == traffic_profile_name]
434         if len(matching_profiles) > 1:
435             raise TrafficClientException('Multiple traffic profiles with name: ' +
436                                          traffic_profile_name)
437         elif not matching_profiles:
438             raise TrafficClientException('Cannot find traffic profile: ' + traffic_profile_name)
439         return matching_profiles[0].l2frame_size
440
441     def start_traffic_generator(self):
442         """Start the traffic generator process (traffic not started yet)."""
443         self.gen.connect()
444         # pick up the interface speed if it is not set from config
445         intf_speeds = self.gen.get_port_speed_gbps()
446         # convert Gbps unit into bps
447         tg_if_speed = bitmath.parse_string(str(intf_speeds[0]) + 'Gb').bits
448         if self.intf_speed:
449             # interface speed is overriden from config
450             if self.intf_speed != tg_if_speed:
451                 # Warn the user if the speed in the config is different
452                 LOG.warning('Interface speed provided is different from actual speed (%d Gbps)',
453                             intf_speeds[0])
454         else:
455             # interface speed not provisioned by config
456             self.intf_speed = tg_if_speed
457             # also update the speed in the tg config
458             self.generator_config.intf_speed = tg_if_speed
459
460         # Save the traffic generator local MAC
461         for mac, device in zip(self.gen.get_macs(), self.generator_config.devices):
462             device.set_mac(mac)
463
464     def setup(self):
465         """Set up the traffic client."""
466         self.gen.set_mode()
467         self.gen.clear_stats()
468
469     def get_version(self):
470         """Get the traffic generator version."""
471         return self.gen.get_version()
472
473     def ensure_end_to_end(self):
474         """Ensure traffic generator receives packets it has transmitted.
475
476         This ensures end to end connectivity and also waits until VMs are ready to forward packets.
477
478         VMs that are started and in active state may not pass traffic yet. It is imperative to make
479         sure that all VMs are passing traffic in both directions before starting any benchmarking.
480         To verify this, we need to send at a low frequency bi-directional packets and make sure
481         that we receive all packets back from all VMs. The number of flows is equal to 2 times
482         the number of chains (1 per direction) and we need to make sure we receive packets coming
483         from exactly 2 x chain count different source MAC addresses.
484
485         Example:
486             PVP chain (1 VM per chain)
487             N = 10 (number of chains)
488             Flow count = 20 (number of flows)
489             If the number of unique source MAC addresses from received packets is 20 then
490             all 10 VMs 10 VMs are in operational state.
491         """
492         LOG.info('Starting traffic generator to ensure end-to-end connectivity')
493         # send 2pps on each chain and each direction
494         rate_pps = {'rate_pps': str(self.config.service_chain_count * 2)}
495         self.gen.create_traffic('64', [rate_pps, rate_pps], bidirectional=True, latency=False)
496
497         # ensures enough traffic is coming back
498         retry_count = (self.config.check_traffic_time_sec +
499                        self.config.generic_poll_sec - 1) / self.config.generic_poll_sec
500
501         # we expect to see packets coming from 2 unique MAC per chain
502         # because there can be flooding in the case of shared net
503         # we must verify that packets from the right VMs are received
504         # and not just count unique src MAC
505         # create a dict of (port, chain) tuples indexed by dest mac
506         mac_map = {}
507         for port, dest_macs in enumerate(self.generator_config.get_dest_macs()):
508             for chain, mac in enumerate(dest_macs):
509                 mac_map[mac] = (port, chain)
510         unique_src_mac_count = len(mac_map)
511         for it in xrange(retry_count):
512             self.gen.clear_stats()
513             self.gen.start_traffic()
514             self.gen.start_capture()
515             LOG.info('Captured unique src mac %d/%d, capturing return packets (retry %d/%d)...',
516                      unique_src_mac_count - len(mac_map), unique_src_mac_count,
517                      it + 1, retry_count)
518             if not self.skip_sleep():
519                 time.sleep(self.config.generic_poll_sec)
520             self.gen.stop_traffic()
521             self.gen.fetch_capture_packets()
522             self.gen.stop_capture()
523
524             for packet in self.gen.packet_list:
525                 src_mac = packet['binary'][6:12]
526                 src_mac = ':'.join(["%02x" % ord(x) for x in src_mac])
527                 if src_mac in mac_map:
528                     port, chain = mac_map[src_mac]
529                     LOG.info('Received packet from mac: %s (chain=%d, port=%d)',
530                              src_mac, chain, port)
531                     mac_map.pop(src_mac, None)
532
533                 if not mac_map:
534                     LOG.info('End-to-end connectivity established')
535                     return
536
537         raise TrafficClientException('End-to-end connectivity cannot be ensured')
538
539     def ensure_arp_successful(self):
540         """Resolve all IP using ARP and throw an exception in case of failure."""
541         dest_macs = self.gen.resolve_arp()
542         if dest_macs:
543             # all dest macs are discovered, saved them into the generator config
544             self.generator_config.set_dest_macs(0, dest_macs[0])
545             self.generator_config.set_dest_macs(1, dest_macs[1])
546         else:
547             raise TrafficClientException('ARP cannot be resolved')
548
549     def set_traffic(self, frame_size, bidirectional):
550         """Reconfigure the traffic generator for a new frame size."""
551         self.run_config['bidirectional'] = bidirectional
552         self.run_config['l2frame_size'] = frame_size
553         self.run_config['rates'] = [self.get_per_direction_rate()]
554         if bidirectional:
555             self.run_config['rates'].append(self.get_per_direction_rate())
556         else:
557             unidir_reverse_pps = int(self.config.unidir_reverse_traffic_pps)
558             if unidir_reverse_pps > 0:
559                 self.run_config['rates'].append({'rate_pps': str(unidir_reverse_pps)})
560         # Fix for [NFVBENCH-67], convert the rate string to PPS
561         for idx, rate in enumerate(self.run_config['rates']):
562             if 'rate_pps' not in rate:
563                 self.run_config['rates'][idx] = {'rate_pps': self.__convert_rates(rate)['rate_pps']}
564
565         self.gen.clear_streamblock()
566         self.gen.create_traffic(frame_size, self.run_config['rates'], bidirectional, latency=True)
567
568     def _modify_load(self, load):
569         self.current_total_rate = {'rate_percent': str(load)}
570         rate_per_direction = self.get_per_direction_rate()
571
572         self.gen.modify_rate(rate_per_direction, False)
573         self.run_config['rates'][0] = rate_per_direction
574         if self.run_config['bidirectional']:
575             self.gen.modify_rate(rate_per_direction, True)
576             self.run_config['rates'][1] = rate_per_direction
577
578     def get_ndr_and_pdr(self):
579         """Start the NDR/PDR iteration and return the results."""
580         dst = 'Bidirectional' if self.run_config['bidirectional'] else 'Unidirectional'
581         targets = {}
582         if self.config.ndr_run:
583             LOG.info('*** Searching NDR for %s (%s)...', self.run_config['l2frame_size'], dst)
584             targets['ndr'] = self.config.measurement.NDR
585         if self.config.pdr_run:
586             LOG.info('*** Searching PDR for %s (%s)...', self.run_config['l2frame_size'], dst)
587             targets['pdr'] = self.config.measurement.PDR
588
589         self.run_config['start_time'] = time.time()
590         self.interval_collector = IntervalCollector(self.run_config['start_time'])
591         self.interval_collector.attach_notifier(self.notifier)
592         self.iteration_collector = IterationCollector(self.run_config['start_time'])
593         results = {}
594         self.__range_search(0.0, 200.0, targets, results)
595
596         results['iteration_stats'] = {
597             'ndr_pdr': self.iteration_collector.get()
598         }
599
600         if self.config.ndr_run:
601             LOG.info('NDR load: %s', results['ndr']['rate_percent'])
602             results['ndr']['time_taken_sec'] = \
603                 results['ndr']['timestamp_sec'] - self.run_config['start_time']
604             if self.config.pdr_run:
605                 LOG.info('PDR load: %s', results['pdr']['rate_percent'])
606                 results['pdr']['time_taken_sec'] = \
607                     results['pdr']['timestamp_sec'] - results['ndr']['timestamp_sec']
608         else:
609             LOG.info('PDR load: %s', results['pdr']['rate_percent'])
610             results['pdr']['time_taken_sec'] = \
611                 results['pdr']['timestamp_sec'] - self.run_config['start_time']
612         return results
613
614     def __get_dropped_rate(self, result):
615         dropped_pkts = result['rx']['dropped_pkts']
616         total_pkts = result['tx']['total_pkts']
617         if not total_pkts:
618             return float('inf')
619         return float(dropped_pkts) / total_pkts * 100
620
621     def get_stats(self):
622         """Collect final stats for previous run."""
623         stats = self.gen.get_stats()
624         retDict = {'total_tx_rate': stats['total_tx_rate']}
625         for port in self.PORTS:
626             retDict[port] = {'tx': {}, 'rx': {}}
627
628         tx_keys = ['total_pkts', 'total_pkt_bytes', 'pkt_rate', 'pkt_bit_rate']
629         rx_keys = tx_keys + ['dropped_pkts']
630
631         for port in self.PORTS:
632             for key in tx_keys:
633                 retDict[port]['tx'][key] = int(stats[port]['tx'][key])
634             for key in rx_keys:
635                 try:
636                     retDict[port]['rx'][key] = int(stats[port]['rx'][key])
637                 except ValueError:
638                     retDict[port]['rx'][key] = 0
639             retDict[port]['rx']['avg_delay_usec'] = cast_integer(
640                 stats[port]['rx']['avg_delay_usec'])
641             retDict[port]['rx']['min_delay_usec'] = cast_integer(
642                 stats[port]['rx']['min_delay_usec'])
643             retDict[port]['rx']['max_delay_usec'] = cast_integer(
644                 stats[port]['rx']['max_delay_usec'])
645             retDict[port]['drop_rate_percent'] = self.__get_dropped_rate(retDict[port])
646
647         ports = sorted(retDict.keys())
648         if self.run_config['bidirectional']:
649             retDict['overall'] = {'tx': {}, 'rx': {}}
650             for key in tx_keys:
651                 retDict['overall']['tx'][key] = \
652                     retDict[ports[0]]['tx'][key] + retDict[ports[1]]['tx'][key]
653             for key in rx_keys:
654                 retDict['overall']['rx'][key] = \
655                     retDict[ports[0]]['rx'][key] + retDict[ports[1]]['rx'][key]
656             total_pkts = [retDict[ports[0]]['rx']['total_pkts'],
657                           retDict[ports[1]]['rx']['total_pkts']]
658             avg_delays = [retDict[ports[0]]['rx']['avg_delay_usec'],
659                           retDict[ports[1]]['rx']['avg_delay_usec']]
660             max_delays = [retDict[ports[0]]['rx']['max_delay_usec'],
661                           retDict[ports[1]]['rx']['max_delay_usec']]
662             min_delays = [retDict[ports[0]]['rx']['min_delay_usec'],
663                           retDict[ports[1]]['rx']['min_delay_usec']]
664             retDict['overall']['rx']['avg_delay_usec'] = utils.weighted_avg(total_pkts, avg_delays)
665             retDict['overall']['rx']['min_delay_usec'] = min(min_delays)
666             retDict['overall']['rx']['max_delay_usec'] = max(max_delays)
667             for key in ['pkt_bit_rate', 'pkt_rate']:
668                 for dirc in ['tx', 'rx']:
669                     retDict['overall'][dirc][key] /= 2.0
670         else:
671             retDict['overall'] = retDict[ports[0]]
672         retDict['overall']['drop_rate_percent'] = self.__get_dropped_rate(retDict['overall'])
673         return retDict
674
675     def __convert_rates(self, rate):
676         return utils.convert_rates(self.run_config['l2frame_size'],
677                                    rate,
678                                    self.intf_speed)
679
680     def __ndr_pdr_found(self, tag, load):
681         rates = self.__convert_rates({'rate_percent': load})
682         self.iteration_collector.add_ndr_pdr(tag, rates['rate_pps'])
683         last_stats = self.iteration_collector.peek()
684         self.interval_collector.add_ndr_pdr(tag, last_stats)
685
686     def __format_output_stats(self, stats):
687         for key in self.PORTS + ['overall']:
688             interface = stats[key]
689             stats[key] = {
690                 'tx_pkts': interface['tx']['total_pkts'],
691                 'rx_pkts': interface['rx']['total_pkts'],
692                 'drop_percentage': interface['drop_rate_percent'],
693                 'drop_pct': interface['rx']['dropped_pkts'],
694                 'avg_delay_usec': interface['rx']['avg_delay_usec'],
695                 'max_delay_usec': interface['rx']['max_delay_usec'],
696                 'min_delay_usec': interface['rx']['min_delay_usec'],
697             }
698
699         return stats
700
701     def __targets_found(self, rate, targets, results):
702         for tag, target in targets.iteritems():
703             LOG.info('Found %s (%s) load: %s', tag, target, rate)
704             self.__ndr_pdr_found(tag, rate)
705             results[tag]['timestamp_sec'] = time.time()
706
707     def __range_search(self, left, right, targets, results):
708         """Perform a binary search for a list of targets inside a [left..right] range or rate.
709
710         left    the left side of the range to search as a % the line rate (100 = 100% line rate)
711                 indicating the rate to send on each interface
712         right   the right side of the range to search as a % of line rate
713                 indicating the rate to send on each interface
714         targets a dict of drop rates to search (0.1 = 0.1%), indexed by the DR name or "tag"
715                 ('ndr', 'pdr')
716         results a dict to store results
717         """
718         if not targets:
719             return
720         LOG.info('Range search [%s .. %s] targets: %s', left, right, targets)
721
722         # Terminate search when gap is less than load epsilon
723         if right - left < self.config.measurement.load_epsilon:
724             self.__targets_found(left, targets, results)
725             return
726
727         # Obtain the average drop rate in for middle load
728         middle = (left + right) / 2.0
729         try:
730             stats, rates = self.__run_search_iteration(middle)
731         except STLError:
732             LOG.exception("Got exception from traffic generator during binary search")
733             self.__targets_found(left, targets, results)
734             return
735         # Split target dicts based on the avg drop rate
736         left_targets = {}
737         right_targets = {}
738         for tag, target in targets.iteritems():
739             if stats['overall']['drop_rate_percent'] <= target:
740                 # record the best possible rate found for this target
741                 results[tag] = rates
742                 results[tag].update({
743                     'load_percent_per_direction': middle,
744                     'stats': self.__format_output_stats(dict(stats)),
745                     'timestamp_sec': None
746                 })
747                 right_targets[tag] = target
748             else:
749                 # initialize to 0 all fields of result for
750                 # the worst case scenario of the binary search (if ndr/pdr is not found)
751                 if tag not in results:
752                     results[tag] = dict.fromkeys(rates, 0)
753                     empty_stats = self.__format_output_stats(dict(stats))
754                     for key in empty_stats:
755                         if isinstance(empty_stats[key], dict):
756                             empty_stats[key] = dict.fromkeys(empty_stats[key], 0)
757                         else:
758                             empty_stats[key] = 0
759                     results[tag].update({
760                         'load_percent_per_direction': 0,
761                         'stats': empty_stats,
762                         'timestamp_sec': None
763                     })
764                 left_targets[tag] = target
765
766         # search lower half
767         self.__range_search(left, middle, left_targets, results)
768
769         # search upper half only if the upper rate does not exceed
770         # 100%, this only happens when the first search at 100%
771         # yields a DR that is < target DR
772         if middle >= 100:
773             self.__targets_found(100, right_targets, results)
774         else:
775             self.__range_search(middle, right, right_targets, results)
776
777     def __run_search_iteration(self, rate):
778         """Run one iteration at the given rate level.
779
780         rate: the rate to send on each port in percent (0 to 100)
781         """
782         self._modify_load(rate)
783
784         # poll interval stats and collect them
785         for stats in self.run_traffic():
786             self.interval_collector.add(stats)
787             time_elapsed_ratio = self.runner.time_elapsed() / self.run_config['duration_sec']
788             if time_elapsed_ratio >= 1:
789                 self.cancel_traffic()
790                 if not self.skip_sleep():
791                     time.sleep(self.config.pause_sec)
792         self.interval_collector.reset()
793
794         # get stats from the run
795         stats = self.runner.client.get_stats()
796         current_traffic_config = self._get_traffic_config()
797         warning = self.compare_tx_rates(current_traffic_config['direction-total']['rate_pps'],
798                                         stats['total_tx_rate'])
799         if warning is not None:
800             stats['warning'] = warning
801
802         # save reliable stats from whole iteration
803         self.iteration_collector.add(stats, current_traffic_config['direction-total']['rate_pps'])
804         LOG.info('Average drop rate: %f', stats['overall']['drop_rate_percent'])
805         return stats, current_traffic_config['direction-total']
806
807     @staticmethod
808     def log_stats(stats):
809         """Log estimated stats during run."""
810         report = {
811             'datetime': str(datetime.now()),
812             'tx_packets': stats['overall']['tx']['total_pkts'],
813             'rx_packets': stats['overall']['rx']['total_pkts'],
814             'drop_packets': stats['overall']['rx']['dropped_pkts'],
815             'drop_rate_percent': stats['overall']['drop_rate_percent']
816         }
817         LOG.info('TX: %(tx_packets)d; '
818                  'RX: %(rx_packets)d; '
819                  'Est. Dropped: %(drop_packets)d; '
820                  'Est. Drop rate: %(drop_rate_percent).4f%%',
821                  report)
822
823     def run_traffic(self):
824         """Start traffic and return intermediate stats for each interval."""
825         stats = self.runner.run()
826         while self.runner.is_running:
827             self.log_stats(stats)
828             yield stats
829             stats = self.runner.poll_stats()
830             if stats is None:
831                 return
832         self.log_stats(stats)
833         LOG.info('Drop rate: %f', stats['overall']['drop_rate_percent'])
834         yield stats
835
836     def cancel_traffic(self):
837         """Stop traffic."""
838         self.runner.stop()
839
840     def _get_traffic_config(self):
841         config = {}
842         load_total = 0.0
843         bps_total = 0.0
844         pps_total = 0.0
845         for idx, rate in enumerate(self.run_config['rates']):
846             key = 'direction-forward' if idx == 0 else 'direction-reverse'
847             config[key] = {
848                 'l2frame_size': self.run_config['l2frame_size'],
849                 'duration_sec': self.run_config['duration_sec']
850             }
851             config[key].update(rate)
852             config[key].update(self.__convert_rates(rate))
853             load_total += float(config[key]['rate_percent'])
854             bps_total += float(config[key]['rate_bps'])
855             pps_total += float(config[key]['rate_pps'])
856         config['direction-total'] = dict(config['direction-forward'])
857         config['direction-total'].update({
858             'rate_percent': load_total,
859             'rate_pps': cast_integer(pps_total),
860             'rate_bps': bps_total
861         })
862
863         return config
864
865     def get_run_config(self, results):
866         """Return configuration which was used for the last run."""
867         r = {}
868         # because we want each direction to have the far end RX rates,
869         # use the far end index (1-idx) to retrieve the RX rates
870         for idx, key in enumerate(["direction-forward", "direction-reverse"]):
871             tx_rate = results["stats"][idx]["tx"]["total_pkts"] / self.config.duration_sec
872             rx_rate = results["stats"][1 - idx]["rx"]["total_pkts"] / self.config.duration_sec
873             r[key] = {
874                 "orig": self.__convert_rates(self.run_config['rates'][idx]),
875                 "tx": self.__convert_rates({'rate_pps': tx_rate}),
876                 "rx": self.__convert_rates({'rate_pps': rx_rate})
877             }
878
879         total = {}
880         for direction in ['orig', 'tx', 'rx']:
881             total[direction] = {}
882             for unit in ['rate_percent', 'rate_bps', 'rate_pps']:
883                 total[direction][unit] = sum([float(x[direction][unit]) for x in r.values()])
884
885         r['direction-total'] = total
886         return r
887
888     def insert_interface_stats(self, pps_list):
889         """Insert interface stats to a list of packet path stats.
890
891         pps_list: a list of packet path stats instances indexed by chain index
892
893         This function will insert the packet path stats for the traffic gen ports 0 and 1
894         with itemized per chain tx/rx counters.
895         There will be as many packet path stats as chains.
896         Each packet path stats will have exactly 2 InterfaceStats for port 0 and port 1
897         self.pps_list:
898         [
899         PacketPathStats(InterfaceStats(chain 0, port 0), InterfaceStats(chain 0, port 1)),
900         PacketPathStats(InterfaceStats(chain 1, port 0), InterfaceStats(chain 1, port 1)),
901         ...
902         ]
903         """
904         def get_if_stats(chain_idx):
905             return [InterfaceStats('p' + str(port), self.tool)
906                     for port in range(2)]
907         # keep the list of list of interface stats indexed by the chain id
908         self.ifstats = [get_if_stats(chain_idx)
909                         for chain_idx in range(self.config.service_chain_count)]
910         # note that we need to make a copy of the ifs list so that any modification in the
911         # list from pps will not change the list saved in self.ifstats
912         self.pps_list = [PacketPathStats(list(ifs)) for ifs in self.ifstats]
913         # insert the corresponding pps in the passed list
914         pps_list.extend(self.pps_list)
915
916     def update_interface_stats(self, diff=False):
917         """Update all interface stats.
918
919         diff: if False, simply refresh the interface stats values with latest values
920               if True, diff the interface stats with the latest values
921         Make sure that the interface stats inserted in insert_interface_stats() are updated
922         with proper values.
923         self.ifstats:
924         [
925         [InterfaceStats(chain 0, port 0), InterfaceStats(chain 0, port 1)],
926         [InterfaceStats(chain 1, port 0), InterfaceStats(chain 1, port 1)],
927         ...
928         ]
929         """
930         if diff:
931             stats = self.gen.get_stats()
932             for chain_idx, ifs in enumerate(self.ifstats):
933                 # each ifs has exactly 2 InterfaceStats and 2 Latency instances
934                 # corresponding to the
935                 # port 0 and port 1 for the given chain_idx
936                 # Note that we cannot use self.pps_list[chain_idx].if_stats to pick the
937                 # interface stats for the pps because it could have been modified to contain
938                 # additional interface stats
939                 self.gen.get_stream_stats(stats, ifs, self.pps_list[chain_idx].latencies, chain_idx)
940
941
942     @staticmethod
943     def compare_tx_rates(required, actual):
944         """Compare the actual TX rate to the required TX rate."""
945         threshold = 0.9
946         are_different = False
947         try:
948             if float(actual) / required < threshold:
949                 are_different = True
950         except ZeroDivisionError:
951             are_different = True
952
953         if are_different:
954             msg = "WARNING: There is a significant difference between requested TX rate ({r}) " \
955                   "and actual TX rate ({a}). The traffic generator may not have sufficient CPU " \
956                   "to achieve the requested TX rate.".format(r=required, a=actual)
957             LOG.info(msg)
958             return msg
959
960         return None
961
962     def get_per_direction_rate(self):
963         """Get the rate for each direction."""
964         divisor = 2 if self.run_config['bidirectional'] else 1
965         if 'rate_percent' in self.current_total_rate:
966             # don't split rate if it's percentage
967             divisor = 1
968
969         return utils.divide_rate(self.current_total_rate, divisor)
970
971     def close(self):
972         """Close this instance."""
973         try:
974             self.gen.stop_traffic()
975         except Exception:
976             pass
977         self.gen.clear_stats()
978         self.gen.cleanup()