2.0 beta NFVBENCH-91 Allow multi-chaining with separate edge networks
[nfvbench.git] / nfvbench / traffic_client.py
1 # Copyright 2016 Cisco Systems, Inc.  All rights reserved.
2 #
3 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
4 #    not use this file except in compliance with the License. You may obtain
5 #    a copy of the License at
6 #
7 #         http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #    Unless required by applicable law or agreed to in writing, software
10 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 #    License for the specific language governing permissions and limitations
13 #    under the License.
14
15 """Interface to the traffic generator clients including NDR/PDR binary search."""
16
17 from datetime import datetime
18 import socket
19 import struct
20 import time
21
22 from attrdict import AttrDict
23 import bitmath
24 from netaddr import IPNetwork
25 # pylint: disable=import-error
26 from trex_stl_lib.api import STLError
27 # pylint: enable=import-error
28
29 from log import LOG
30 from packet_stats import InterfaceStats
31 from packet_stats import PacketPathStats
32 from stats_collector import IntervalCollector
33 from stats_collector import IterationCollector
34 import traffic_gen.traffic_utils as utils
35 from utils import cast_integer
36
37
38 class TrafficClientException(Exception):
39     """Generic traffic client exception."""
40
41     pass
42
43
44 class TrafficRunner(object):
45     """Serialize various steps required to run traffic."""
46
47     def __init__(self, client, duration_sec, interval_sec=0):
48         """Create a traffic runner."""
49         self.client = client
50         self.start_time = None
51         self.duration_sec = duration_sec
52         self.interval_sec = interval_sec
53
54     def run(self):
55         """Clear stats and instruct the traffic generator to start generating traffic."""
56         if self.is_running():
57             return None
58         LOG.info('Running traffic generator')
59         self.client.gen.clear_stats()
60         self.client.gen.start_traffic()
61         self.start_time = time.time()
62         return self.poll_stats()
63
64     def stop(self):
65         """Stop the current run and instruct the traffic generator to stop traffic."""
66         if self.is_running():
67             self.start_time = None
68             self.client.gen.stop_traffic()
69
70     def is_running(self):
71         """Check if a run is still pending."""
72         return self.start_time is not None
73
74     def time_elapsed(self):
75         """Return time elapsed since start of run."""
76         if self.is_running():
77             return time.time() - self.start_time
78         return self.duration_sec
79
80     def poll_stats(self):
81         """Poll latest stats from the traffic generator at fixed interval - sleeps if necessary.
82
83         return: latest stats or None if traffic is stopped
84         """
85         if not self.is_running():
86             return None
87         if self.client.skip_sleep():
88             self.stop()
89             return self.client.get_stats()
90         time_elapsed = self.time_elapsed()
91         if time_elapsed > self.duration_sec:
92             self.stop()
93             return None
94         time_left = self.duration_sec - time_elapsed
95         if self.interval_sec > 0.0:
96             if time_left <= self.interval_sec:
97                 time.sleep(time_left)
98                 self.stop()
99             else:
100                 time.sleep(self.interval_sec)
101         else:
102             time.sleep(self.duration_sec)
103             self.stop()
104         return self.client.get_stats()
105
106
107 class IpBlock(object):
108     """Manage a block of IP addresses."""
109
110     def __init__(self, base_ip, step_ip, count_ip):
111         """Create an IP block."""
112         self.base_ip_int = Device.ip_to_int(base_ip)
113         self.step = Device.ip_to_int(step_ip)
114         self.max_available = count_ip
115         self.next_free = 0
116
117     def get_ip(self, index=0):
118         """Return the IP address at given index."""
119         if index < 0 or index >= self.max_available:
120             raise IndexError('Index out of bounds: %d (max=%d)' % (index, self.max_available))
121         return Device.int_to_ip(self.base_ip_int + index * self.step)
122
123     def reserve_ip_range(self, count):
124         """Reserve a range of count consecutive IP addresses spaced by step."""
125         if self.next_free + count > self.max_available:
126             raise IndexError('No more IP addresses next free=%d max_available=%d requested=%d' %
127                              (self.next_free,
128                               self.max_available,
129                               count))
130         first_ip = self.get_ip(self.next_free)
131         last_ip = self.get_ip(self.next_free + count - 1)
132         self.next_free += count
133         return (first_ip, last_ip)
134
135     def reset_reservation(self):
136         """Reset all reservations and restart with a completely unused IP block."""
137         self.next_free = 0
138
139
140 class Device(object):
141     """Represent a port device and all information associated to it.
142
143     In the curent version we only support 2 port devices for the traffic generator
144     identified as port 0 or port 1.
145     """
146
147     def __init__(self, port, generator_config, vtep_vlan=None):
148         """Create a new device for a given port."""
149         self.generator_config = generator_config
150         self.chain_count = generator_config.service_chain_count
151         self.flow_count = generator_config.flow_count / 2
152         self.port = port
153         self.switch_port = generator_config.interfaces[port].get('switch_port', None)
154         self.vtep_vlan = vtep_vlan
155         self.pci = generator_config.interfaces[port].pci
156         self.mac = None
157         self.dest_macs = None
158         self.vlans = None
159         self.ip_addrs = generator_config.ip_addrs[port]
160         subnet = IPNetwork(self.ip_addrs)
161         self.ip = subnet.ip.format()
162         self.ip_addrs_step = generator_config.ip_addrs_step
163         self.ip_block = IpBlock(self.ip, self.ip_addrs_step, self.flow_count)
164         self.gw_ip_block = IpBlock(generator_config.gateway_ips[port],
165                                    generator_config.gateway_ip_addrs_step,
166                                    self.chain_count)
167         self.tg_gateway_ip_addrs = generator_config.tg_gateway_ip_addrs[port]
168         self.tg_gw_ip_block = IpBlock(self.tg_gateway_ip_addrs,
169                                       generator_config.tg_gateway_ip_addrs_step,
170                                       self.chain_count)
171         self.udp_src_port = generator_config.udp_src_port
172         self.udp_dst_port = generator_config.udp_dst_port
173
174     def set_mac(self, mac):
175         """Set the local MAC for this port device."""
176         if mac is None:
177             raise TrafficClientException('Trying to set traffic generator MAC address as None')
178         self.mac = mac
179
180     def get_peer_device(self):
181         """Get the peer device (device 0 -> device 1, or device 1 -> device 0)."""
182         return self.generator_config.devices[1 - self.port]
183
184     def set_dest_macs(self, dest_macs):
185         """Set the list of dest MACs indexed by the chain id."""
186         self.dest_macs = map(str, dest_macs)
187
188     def set_vlans(self, vlans):
189         """Set the list of vlans to use indexed by the chain id."""
190         self.vlans = vlans
191         LOG.info("Port %d: VLANs %s", self.port, self.vlans)
192
193     def get_gw_ip(self, chain_index):
194         """Retrieve the IP address assigned for the gateway of a given chain."""
195         return self.gw_ip_block.get_ip(chain_index)
196
197     def get_stream_configs(self):
198         """Get the stream config for a given chain on this device.
199
200         Called by the traffic generator driver to program the traffic generator properly
201         before generating traffic
202         """
203         configs = []
204         # exact flow count for each chain is calculated as follows:
205         # - all chains except the first will have the same flow count
206         #   calculated as (total_flows + chain_count - 1) / chain_count
207         # - the first chain will have the remainder
208         # example 11 flows and 3 chains => 3, 4, 4
209         flows_per_chain = (self.flow_count + self.chain_count - 1) / self.chain_count
210         cur_chain_flow_count = self.flow_count - flows_per_chain * (self.chain_count - 1)
211         peer = self.get_peer_device()
212         self.ip_block.reset_reservation()
213         peer.ip_block.reset_reservation()
214
215         for chain_idx in xrange(self.chain_count):
216             src_ip_first, src_ip_last = self.ip_block.reserve_ip_range(cur_chain_flow_count)
217             dst_ip_first, dst_ip_last = peer.ip_block.reserve_ip_range(cur_chain_flow_count)
218
219             dest_mac = self.dest_macs[chain_idx] if self.dest_macs else peer.mac
220             configs.append({
221                 'count': cur_chain_flow_count,
222                 'mac_src': self.mac,
223                 'mac_dst': dest_mac,
224                 'ip_src_addr': src_ip_first,
225                 'ip_src_addr_max': src_ip_last,
226                 'ip_src_count': cur_chain_flow_count,
227                 'ip_dst_addr': dst_ip_first,
228                 'ip_dst_addr_max': dst_ip_last,
229                 'ip_dst_count': cur_chain_flow_count,
230                 'ip_addrs_step': self.ip_addrs_step,
231                 'udp_src_port': self.udp_src_port,
232                 'udp_dst_port': self.udp_dst_port,
233                 'mac_discovery_gw': self.get_gw_ip(chain_idx),
234                 'ip_src_tg_gw': self.tg_gw_ip_block.get_ip(chain_idx),
235                 'ip_dst_tg_gw': peer.tg_gw_ip_block.get_ip(chain_idx),
236                 'vlan_tag': self.vlans[chain_idx] if self.vlans else None
237             })
238             # after first chain, fall back to the flow count for all other chains
239             cur_chain_flow_count = flows_per_chain
240         return configs
241
242     @staticmethod
243     def ip_to_int(addr):
244         """Convert an IP address from string to numeric."""
245         return struct.unpack("!I", socket.inet_aton(addr))[0]
246
247     @staticmethod
248     def int_to_ip(nvalue):
249         """Convert an IP address from numeric to string."""
250         return socket.inet_ntoa(struct.pack("!I", nvalue))
251
252
253 class GeneratorConfig(object):
254     """Represents traffic configuration for currently running traffic profile."""
255
256     DEFAULT_IP_STEP = '0.0.0.1'
257     DEFAULT_SRC_DST_IP_STEP = '0.0.0.1'
258
259     def __init__(self, config):
260         """Create a generator config."""
261         self.config = config
262         # name of the generator profile (normally trex or dummy)
263         # pick the default one if not specified explicitly from cli options
264         if not config.generator_profile:
265             config.generator_profile = config.traffic_generator.default_profile
266         # pick up the profile dict based on the name
267         gen_config = self.__match_generator_profile(config.traffic_generator,
268                                                     config.generator_profile)
269         self.gen_config = gen_config
270         # copy over fields from the dict
271         self.tool = gen_config.tool
272         self.ip = gen_config.ip
273         self.cores = gen_config.get('cores', 1)
274         if gen_config.intf_speed:
275             # interface speed is overriden from config
276             self.intf_speed = bitmath.parse_string(gen_config.intf_speed.replace('ps', '')).bits
277         else:
278             # interface speed is discovered/provided by the traffic generator
279             self.intf_speed = 0
280         self.software_mode = gen_config.get('software_mode', False)
281         self.interfaces = gen_config.interfaces
282         if self.interfaces[0].port != 0 or self.interfaces[1].port != 1:
283             raise TrafficClientException('Invalid port order/id in generator_profile.interfaces')
284
285         self.service_chain = config.service_chain
286         self.service_chain_count = config.service_chain_count
287         self.flow_count = config.flow_count
288         self.host_name = gen_config.host_name
289
290         self.tg_gateway_ip_addrs = gen_config.tg_gateway_ip_addrs
291         self.ip_addrs = gen_config.ip_addrs
292         self.ip_addrs_step = gen_config.ip_addrs_step or self.DEFAULT_SRC_DST_IP_STEP
293         self.tg_gateway_ip_addrs_step = \
294             gen_config.tg_gateway_ip_addrs_step or self.DEFAULT_IP_STEP
295         self.gateway_ip_addrs_step = gen_config.gateway_ip_addrs_step or self.DEFAULT_IP_STEP
296         self.gateway_ips = gen_config.gateway_ip_addrs
297         self.udp_src_port = gen_config.udp_src_port
298         self.udp_dst_port = gen_config.udp_dst_port
299         self.devices = [Device(port, self) for port in [0, 1]]
300         # This should normally always be [0, 1]
301         self.ports = [device.port for device in self.devices]
302
303         # check that pci is not empty
304         if not gen_config.interfaces[0].get('pci', None) or \
305            not gen_config.interfaces[1].get('pci', None):
306             raise TrafficClientException("configuration interfaces pci fields cannot be empty")
307
308         self.pcis = [tgif['pci'] for tgif in gen_config.interfaces]
309         self.vlan_tagging = config.vlan_tagging
310
311         # needed for result/summarizer
312         config['tg-name'] = gen_config.name
313         config['tg-tool'] = self.tool
314
315     def to_json(self):
316         """Get json form to display the content into the overall result dict."""
317         return dict(self.gen_config)
318
319     def set_dest_macs(self, port_index, dest_macs):
320         """Set the list of dest MACs indexed by the chain id on given port.
321
322         port_index: the port for which dest macs must be set
323         dest_macs: a list of dest MACs indexed by chain id
324         """
325         if len(dest_macs) != self.config.service_chain_count:
326             raise TrafficClientException('Dest MAC list %s must have %d entries' %
327                                          (dest_macs, self.config.service_chain_count))
328         self.devices[port_index].set_dest_macs(dest_macs)
329         LOG.info('Port %d: dst MAC %s', port_index, [str(mac) for mac in dest_macs])
330
331     def set_vlans(self, port_index, vlans):
332         """Set the list of vlans to use indexed by the chain id on given port.
333
334         port_index: the port for which VLANs must be set
335         vlans: a  list of vlan lists indexed by chain id
336         """
337         if len(vlans) != self.config.service_chain_count:
338             raise TrafficClientException('VLAN list %s must have %d entries' %
339                                          (vlans, self.config.service_chain_count))
340         self.devices[port_index].set_vlans(vlans)
341
342     @staticmethod
343     def __match_generator_profile(traffic_generator, generator_profile):
344         gen_config = AttrDict(traffic_generator)
345         gen_config.pop('default_profile')
346         gen_config.pop('generator_profile')
347         matching_profile = [profile for profile in traffic_generator.generator_profile if
348                             profile.name == generator_profile]
349         if len(matching_profile) != 1:
350             raise Exception('Traffic generator profile not found: ' + generator_profile)
351
352         gen_config.update(matching_profile[0])
353         return gen_config
354
355
356 class TrafficClient(object):
357     """Traffic generator client with NDR/PDR binary seearch."""
358
359     PORTS = [0, 1]
360
361     def __init__(self, config, notifier=None):
362         """Create a new TrafficClient instance.
363
364         config: nfvbench config
365         notifier: notifier (optional)
366
367         A new instance is created everytime the nfvbench config may have changed.
368         """
369         self.config = config
370         self.generator_config = GeneratorConfig(config)
371         self.tool = self.generator_config.tool
372         self.gen = self._get_generator()
373         self.notifier = notifier
374         self.interval_collector = None
375         self.iteration_collector = None
376         self.runner = TrafficRunner(self, self.config.duration_sec, self.config.interval_sec)
377         self.config.frame_sizes = self._get_frame_sizes()
378         self.run_config = {
379             'l2frame_size': None,
380             'duration_sec': self.config.duration_sec,
381             'bidirectional': True,
382             'rates': []  # to avoid unsbuscriptable-obj warning
383         }
384         self.current_total_rate = {'rate_percent': '10'}
385         if self.config.single_run:
386             self.current_total_rate = utils.parse_rate_str(self.config.rate)
387         self.ifstats = None
388         # Speed is either discovered when connecting to TG or set from config
389         # This variable is 0 if not yet discovered from TG or must be the speed of
390         # each interface in bits per second
391         self.intf_speed = self.generator_config.intf_speed
392
393     def _get_generator(self):
394         tool = self.tool.lower()
395         if tool == 'trex':
396             from traffic_gen import trex
397             return trex.TRex(self)
398         if tool == 'dummy':
399             from traffic_gen import dummy
400             return dummy.DummyTG(self)
401         raise TrafficClientException('Unsupported generator tool name:' + self.tool)
402
403     def skip_sleep(self):
404         """Skip all sleeps when doing unit testing with dummy TG.
405
406         Must be overriden using mock.patch
407         """
408         return False
409
410     def _get_frame_sizes(self):
411         traffic_profile_name = self.config.traffic.profile
412         matching_profiles = [profile for profile in self.config.traffic_profile if
413                              profile.name == traffic_profile_name]
414         if len(matching_profiles) > 1:
415             raise TrafficClientException('Multiple traffic profiles with name: ' +
416                                          traffic_profile_name)
417         elif not matching_profiles:
418             raise TrafficClientException('Cannot find traffic profile: ' + traffic_profile_name)
419         return matching_profiles[0].l2frame_size
420
421     def start_traffic_generator(self):
422         """Start the traffic generator process (traffic not started yet)."""
423         self.gen.connect()
424         # pick up the interface speed if it is not set from config
425         intf_speeds = self.gen.get_port_speed_gbps()
426         # convert Gbps unit into bps
427         tg_if_speed = bitmath.parse_string(str(intf_speeds[0]) + 'Gb').bits
428         if self.intf_speed:
429             # interface speed is overriden from config
430             if self.intf_speed != tg_if_speed:
431                 # Warn the user if the speed in the config is different
432                 LOG.warning('Interface speed provided is different from actual speed (%d Gbps)',
433                             intf_speeds[0])
434         else:
435             # interface speed not provisioned by config
436             self.intf_speed = tg_if_speed
437             # also update the speed in the tg config
438             self.generator_config.intf_speed = tg_if_speed
439
440         # Save the traffic generator local MAC
441         for mac, device in zip(self.gen.get_macs(), self.generator_config.devices):
442             device.set_mac(mac)
443
444     def setup(self):
445         """Set up the traffic client."""
446         self.gen.set_mode()
447         self.gen.clear_stats()
448
449     def get_version(self):
450         """Get the traffic generator version."""
451         return self.gen.get_version()
452
453     def ensure_end_to_end(self):
454         """Ensure traffic generator receives packets it has transmitted.
455
456         This ensures end to end connectivity and also waits until VMs are ready to forward packets.
457
458         VMs that are started and in active state may not pass traffic yet. It is imperative to make
459         sure that all VMs are passing traffic in both directions before starting any benchmarking.
460         To verify this, we need to send at a low frequency bi-directional packets and make sure
461         that we receive all packets back from all VMs. The number of flows is equal to 2 times
462         the number of chains (1 per direction) and we need to make sure we receive packets coming
463         from exactly 2 x chain count different source MAC addresses.
464
465         Example:
466             PVP chain (1 VM per chain)
467             N = 10 (number of chains)
468             Flow count = 20 (number of flows)
469             If the number of unique source MAC addresses from received packets is 20 then
470             all 10 VMs 10 VMs are in operational state.
471         """
472         LOG.info('Starting traffic generator to ensure end-to-end connectivity')
473         # send 2pps on each chain and each direction
474         rate_pps = {'rate_pps': str(self.config.service_chain_count * 2)}
475         self.gen.create_traffic('64', [rate_pps, rate_pps], bidirectional=True, latency=False)
476
477         # ensures enough traffic is coming back
478         retry_count = (self.config.check_traffic_time_sec +
479                        self.config.generic_poll_sec - 1) / self.config.generic_poll_sec
480         mac_addresses = set()
481
482         # we expect to see packets coming from 2 unique MAC per chain
483         unique_src_mac_count = self.config.service_chain_count * 2
484         for it in xrange(retry_count):
485             self.gen.clear_stats()
486             self.gen.start_traffic()
487             self.gen.start_capture()
488             LOG.info('Captured unique src mac %d/%d, capturing return packets (retry %d/%d)...',
489                      len(mac_addresses), unique_src_mac_count,
490                      it + 1, retry_count)
491             if not self.skip_sleep():
492                 time.sleep(self.config.generic_poll_sec)
493             self.gen.stop_traffic()
494             self.gen.fetch_capture_packets()
495             self.gen.stop_capture()
496
497             for packet in self.gen.packet_list:
498                 src_mac = packet['binary'][6:12]
499                 if src_mac not in mac_addresses:
500                     LOG.info('Received packet from mac: %s',
501                              ':'.join(["%02x" % ord(x) for x in src_mac]))
502                     mac_addresses.add(src_mac)
503
504                 if len(mac_addresses) == unique_src_mac_count:
505                     LOG.info('End-to-end connectivity established')
506                     return
507
508         raise TrafficClientException('End-to-end connectivity cannot be ensured')
509
510     def ensure_arp_successful(self):
511         """Resolve all IP using ARP and throw an exception in case of failure."""
512         if not self.gen.resolve_arp():
513             raise TrafficClientException('ARP cannot be resolved')
514
515     def set_traffic(self, frame_size, bidirectional):
516         """Reconfigure the traffic generator for a new frame size."""
517         self.run_config['bidirectional'] = bidirectional
518         self.run_config['l2frame_size'] = frame_size
519         self.run_config['rates'] = [self.get_per_direction_rate()]
520         if bidirectional:
521             self.run_config['rates'].append(self.get_per_direction_rate())
522         else:
523             unidir_reverse_pps = int(self.config.unidir_reverse_traffic_pps)
524             if unidir_reverse_pps > 0:
525                 self.run_config['rates'].append({'rate_pps': str(unidir_reverse_pps)})
526         # Fix for [NFVBENCH-67], convert the rate string to PPS
527         for idx, rate in enumerate(self.run_config['rates']):
528             if 'rate_pps' not in rate:
529                 self.run_config['rates'][idx] = {'rate_pps': self.__convert_rates(rate)['rate_pps']}
530
531         self.gen.clear_streamblock()
532         self.gen.create_traffic(frame_size, self.run_config['rates'], bidirectional, latency=True)
533
534     def _modify_load(self, load):
535         self.current_total_rate = {'rate_percent': str(load)}
536         rate_per_direction = self.get_per_direction_rate()
537
538         self.gen.modify_rate(rate_per_direction, False)
539         self.run_config['rates'][0] = rate_per_direction
540         if self.run_config['bidirectional']:
541             self.gen.modify_rate(rate_per_direction, True)
542             self.run_config['rates'][1] = rate_per_direction
543
544     def get_ndr_and_pdr(self):
545         """Start the NDR/PDR iteration and return the results."""
546         dst = 'Bidirectional' if self.run_config['bidirectional'] else 'Unidirectional'
547         targets = {}
548         if self.config.ndr_run:
549             LOG.info('*** Searching NDR for %s (%s)...', self.run_config['l2frame_size'], dst)
550             targets['ndr'] = self.config.measurement.NDR
551         if self.config.pdr_run:
552             LOG.info('*** Searching PDR for %s (%s)...', self.run_config['l2frame_size'], dst)
553             targets['pdr'] = self.config.measurement.PDR
554
555         self.run_config['start_time'] = time.time()
556         self.interval_collector = IntervalCollector(self.run_config['start_time'])
557         self.interval_collector.attach_notifier(self.notifier)
558         self.iteration_collector = IterationCollector(self.run_config['start_time'])
559         results = {}
560         self.__range_search(0.0, 200.0, targets, results)
561
562         results['iteration_stats'] = {
563             'ndr_pdr': self.iteration_collector.get()
564         }
565
566         if self.config.ndr_run:
567             LOG.info('NDR load: %s', results['ndr']['rate_percent'])
568             results['ndr']['time_taken_sec'] = \
569                 results['ndr']['timestamp_sec'] - self.run_config['start_time']
570             if self.config.pdr_run:
571                 LOG.info('PDR load: %s', results['pdr']['rate_percent'])
572                 results['pdr']['time_taken_sec'] = \
573                     results['pdr']['timestamp_sec'] - results['ndr']['timestamp_sec']
574         else:
575             LOG.info('PDR load: %s', results['pdr']['rate_percent'])
576             results['pdr']['time_taken_sec'] = \
577                 results['pdr']['timestamp_sec'] - self.run_config['start_time']
578         return results
579
580     def __get_dropped_rate(self, result):
581         dropped_pkts = result['rx']['dropped_pkts']
582         total_pkts = result['tx']['total_pkts']
583         if not total_pkts:
584             return float('inf')
585         return float(dropped_pkts) / total_pkts * 100
586
587     def get_stats(self):
588         """Collect final stats for previous run."""
589         stats = self.gen.get_stats()
590         retDict = {'total_tx_rate': stats['total_tx_rate']}
591         for port in self.PORTS:
592             retDict[port] = {'tx': {}, 'rx': {}}
593
594         tx_keys = ['total_pkts', 'total_pkt_bytes', 'pkt_rate', 'pkt_bit_rate']
595         rx_keys = tx_keys + ['dropped_pkts']
596
597         for port in self.PORTS:
598             for key in tx_keys:
599                 retDict[port]['tx'][key] = int(stats[port]['tx'][key])
600             for key in rx_keys:
601                 try:
602                     retDict[port]['rx'][key] = int(stats[port]['rx'][key])
603                 except ValueError:
604                     retDict[port]['rx'][key] = 0
605             retDict[port]['rx']['avg_delay_usec'] = cast_integer(
606                 stats[port]['rx']['avg_delay_usec'])
607             retDict[port]['rx']['min_delay_usec'] = cast_integer(
608                 stats[port]['rx']['min_delay_usec'])
609             retDict[port]['rx']['max_delay_usec'] = cast_integer(
610                 stats[port]['rx']['max_delay_usec'])
611             retDict[port]['drop_rate_percent'] = self.__get_dropped_rate(retDict[port])
612
613         ports = sorted(retDict.keys())
614         if self.run_config['bidirectional']:
615             retDict['overall'] = {'tx': {}, 'rx': {}}
616             for key in tx_keys:
617                 retDict['overall']['tx'][key] = \
618                     retDict[ports[0]]['tx'][key] + retDict[ports[1]]['tx'][key]
619             for key in rx_keys:
620                 retDict['overall']['rx'][key] = \
621                     retDict[ports[0]]['rx'][key] + retDict[ports[1]]['rx'][key]
622             total_pkts = [retDict[ports[0]]['rx']['total_pkts'],
623                           retDict[ports[1]]['rx']['total_pkts']]
624             avg_delays = [retDict[ports[0]]['rx']['avg_delay_usec'],
625                           retDict[ports[1]]['rx']['avg_delay_usec']]
626             max_delays = [retDict[ports[0]]['rx']['max_delay_usec'],
627                           retDict[ports[1]]['rx']['max_delay_usec']]
628             min_delays = [retDict[ports[0]]['rx']['min_delay_usec'],
629                           retDict[ports[1]]['rx']['min_delay_usec']]
630             retDict['overall']['rx']['avg_delay_usec'] = utils.weighted_avg(total_pkts, avg_delays)
631             retDict['overall']['rx']['min_delay_usec'] = min(min_delays)
632             retDict['overall']['rx']['max_delay_usec'] = max(max_delays)
633             for key in ['pkt_bit_rate', 'pkt_rate']:
634                 for dirc in ['tx', 'rx']:
635                     retDict['overall'][dirc][key] /= 2.0
636         else:
637             retDict['overall'] = retDict[ports[0]]
638         retDict['overall']['drop_rate_percent'] = self.__get_dropped_rate(retDict['overall'])
639         return retDict
640
641     def __convert_rates(self, rate):
642         return utils.convert_rates(self.run_config['l2frame_size'],
643                                    rate,
644                                    self.intf_speed)
645
646     def __ndr_pdr_found(self, tag, load):
647         rates = self.__convert_rates({'rate_percent': load})
648         self.iteration_collector.add_ndr_pdr(tag, rates['rate_pps'])
649         last_stats = self.iteration_collector.peek()
650         self.interval_collector.add_ndr_pdr(tag, last_stats)
651
652     def __format_output_stats(self, stats):
653         for key in self.PORTS + ['overall']:
654             interface = stats[key]
655             stats[key] = {
656                 'tx_pkts': interface['tx']['total_pkts'],
657                 'rx_pkts': interface['rx']['total_pkts'],
658                 'drop_percentage': interface['drop_rate_percent'],
659                 'drop_pct': interface['rx']['dropped_pkts'],
660                 'avg_delay_usec': interface['rx']['avg_delay_usec'],
661                 'max_delay_usec': interface['rx']['max_delay_usec'],
662                 'min_delay_usec': interface['rx']['min_delay_usec'],
663             }
664
665         return stats
666
667     def __targets_found(self, rate, targets, results):
668         for tag, target in targets.iteritems():
669             LOG.info('Found %s (%s) load: %s', tag, target, rate)
670             self.__ndr_pdr_found(tag, rate)
671             results[tag]['timestamp_sec'] = time.time()
672
673     def __range_search(self, left, right, targets, results):
674         """Perform a binary search for a list of targets inside a [left..right] range or rate.
675
676         left    the left side of the range to search as a % the line rate (100 = 100% line rate)
677                 indicating the rate to send on each interface
678         right   the right side of the range to search as a % of line rate
679                 indicating the rate to send on each interface
680         targets a dict of drop rates to search (0.1 = 0.1%), indexed by the DR name or "tag"
681                 ('ndr', 'pdr')
682         results a dict to store results
683         """
684         if not targets:
685             return
686         LOG.info('Range search [%s .. %s] targets: %s', left, right, targets)
687
688         # Terminate search when gap is less than load epsilon
689         if right - left < self.config.measurement.load_epsilon:
690             self.__targets_found(left, targets, results)
691             return
692
693         # Obtain the average drop rate in for middle load
694         middle = (left + right) / 2.0
695         try:
696             stats, rates = self.__run_search_iteration(middle)
697         except STLError:
698             LOG.exception("Got exception from traffic generator during binary search")
699             self.__targets_found(left, targets, results)
700             return
701         # Split target dicts based on the avg drop rate
702         left_targets = {}
703         right_targets = {}
704         for tag, target in targets.iteritems():
705             if stats['overall']['drop_rate_percent'] <= target:
706                 # record the best possible rate found for this target
707                 results[tag] = rates
708                 results[tag].update({
709                     'load_percent_per_direction': middle,
710                     'stats': self.__format_output_stats(dict(stats)),
711                     'timestamp_sec': None
712                 })
713                 right_targets[tag] = target
714             else:
715                 # initialize to 0 all fields of result for
716                 # the worst case scenario of the binary search (if ndr/pdr is not found)
717                 if tag not in results:
718                     results[tag] = dict.fromkeys(rates, 0)
719                     empty_stats = self.__format_output_stats(dict(stats))
720                     for key in empty_stats:
721                         if isinstance(empty_stats[key], dict):
722                             empty_stats[key] = dict.fromkeys(empty_stats[key], 0)
723                         else:
724                             empty_stats[key] = 0
725                     results[tag].update({
726                         'load_percent_per_direction': 0,
727                         'stats': empty_stats,
728                         'timestamp_sec': None
729                     })
730                 left_targets[tag] = target
731
732         # search lower half
733         self.__range_search(left, middle, left_targets, results)
734
735         # search upper half only if the upper rate does not exceed
736         # 100%, this only happens when the first search at 100%
737         # yields a DR that is < target DR
738         if middle >= 100:
739             self.__targets_found(100, right_targets, results)
740         else:
741             self.__range_search(middle, right, right_targets, results)
742
743     def __run_search_iteration(self, rate):
744         """Run one iteration at the given rate level.
745
746         rate: the rate to send on each port in percent (0 to 100)
747         """
748         self._modify_load(rate)
749
750         # poll interval stats and collect them
751         for stats in self.run_traffic():
752             self.interval_collector.add(stats)
753             time_elapsed_ratio = self.runner.time_elapsed() / self.run_config['duration_sec']
754             if time_elapsed_ratio >= 1:
755                 self.cancel_traffic()
756                 if not self.skip_sleep():
757                     time.sleep(self.config.pause_sec)
758         self.interval_collector.reset()
759
760         # get stats from the run
761         stats = self.runner.client.get_stats()
762         current_traffic_config = self._get_traffic_config()
763         warning = self.compare_tx_rates(current_traffic_config['direction-total']['rate_pps'],
764                                         stats['total_tx_rate'])
765         if warning is not None:
766             stats['warning'] = warning
767
768         # save reliable stats from whole iteration
769         self.iteration_collector.add(stats, current_traffic_config['direction-total']['rate_pps'])
770         LOG.info('Average drop rate: %f', stats['overall']['drop_rate_percent'])
771         return stats, current_traffic_config['direction-total']
772
773     @staticmethod
774     def log_stats(stats):
775         """Log estimated stats during run."""
776         report = {
777             'datetime': str(datetime.now()),
778             'tx_packets': stats['overall']['tx']['total_pkts'],
779             'rx_packets': stats['overall']['rx']['total_pkts'],
780             'drop_packets': stats['overall']['rx']['dropped_pkts'],
781             'drop_rate_percent': stats['overall']['drop_rate_percent']
782         }
783         LOG.info('TX: %(tx_packets)d; '
784                  'RX: %(rx_packets)d; '
785                  'Est. Dropped: %(drop_packets)d; '
786                  'Est. Drop rate: %(drop_rate_percent).4f%%',
787                  report)
788
789     def run_traffic(self):
790         """Start traffic and return intermediate stats for each interval."""
791         stats = self.runner.run()
792         while self.runner.is_running:
793             self.log_stats(stats)
794             yield stats
795             stats = self.runner.poll_stats()
796             if stats is None:
797                 return
798         self.log_stats(stats)
799         LOG.info('Drop rate: %f', stats['overall']['drop_rate_percent'])
800         yield stats
801
802     def cancel_traffic(self):
803         """Stop traffic."""
804         self.runner.stop()
805
806     def _get_traffic_config(self):
807         config = {}
808         load_total = 0.0
809         bps_total = 0.0
810         pps_total = 0.0
811         for idx, rate in enumerate(self.run_config['rates']):
812             key = 'direction-forward' if idx == 0 else 'direction-reverse'
813             config[key] = {
814                 'l2frame_size': self.run_config['l2frame_size'],
815                 'duration_sec': self.run_config['duration_sec']
816             }
817             config[key].update(rate)
818             config[key].update(self.__convert_rates(rate))
819             load_total += float(config[key]['rate_percent'])
820             bps_total += float(config[key]['rate_bps'])
821             pps_total += float(config[key]['rate_pps'])
822         config['direction-total'] = dict(config['direction-forward'])
823         config['direction-total'].update({
824             'rate_percent': load_total,
825             'rate_pps': cast_integer(pps_total),
826             'rate_bps': bps_total
827         })
828
829         return config
830
831     def get_run_config(self, results):
832         """Return configuration which was used for the last run."""
833         r = {}
834         # because we want each direction to have the far end RX rates,
835         # use the far end index (1-idx) to retrieve the RX rates
836         for idx, key in enumerate(["direction-forward", "direction-reverse"]):
837             tx_rate = results["stats"][idx]["tx"]["total_pkts"] / self.config.duration_sec
838             rx_rate = results["stats"][1 - idx]["rx"]["total_pkts"] / self.config.duration_sec
839             r[key] = {
840                 "orig": self.__convert_rates(self.run_config['rates'][idx]),
841                 "tx": self.__convert_rates({'rate_pps': tx_rate}),
842                 "rx": self.__convert_rates({'rate_pps': rx_rate})
843             }
844
845         total = {}
846         for direction in ['orig', 'tx', 'rx']:
847             total[direction] = {}
848             for unit in ['rate_percent', 'rate_bps', 'rate_pps']:
849                 total[direction][unit] = sum([float(x[direction][unit]) for x in r.values()])
850
851         r['direction-total'] = total
852         return r
853
854     def insert_interface_stats(self, pps_list):
855         """Insert interface stats to a list of packet path stats.
856
857         pps_list: a list of packet path stats instances indexed by chain index
858
859         This function will insert the packet path stats for the traffic gen ports 0 and 1
860         with itemized per chain tx/rx counters.
861         There will be as many packet path stats as chains.
862         Each packet path stats will have exactly 2 InterfaceStats for port 0 and port 1
863         self.pps_list:
864         [
865         PacketPathStats(InterfaceStats(chain 0, port 0), InterfaceStats(chain 0, port 1)),
866         PacketPathStats(InterfaceStats(chain 1, port 0), InterfaceStats(chain 1, port 1)),
867         ...
868         ]
869         """
870         def get_if_stats(chain_idx):
871             return [InterfaceStats('p' + str(port), self.tool)
872                     for port in range(2)]
873         # keep the list of list of interface stats indexed by the chain id
874         self.ifstats = [get_if_stats(chain_idx)
875                         for chain_idx in range(self.config.service_chain_count)]
876         # note that we need to make a copy of the ifs list so that any modification in the
877         # list from pps will not change the list saved in self.ifstats
878         self.pps_list = [PacketPathStats(list(ifs)) for ifs in self.ifstats]
879         # insert the corresponding pps in the passed list
880         pps_list.extend(self.pps_list)
881
882     def update_interface_stats(self, diff=False):
883         """Update all interface stats.
884
885         diff: if False, simply refresh the interface stats values with latest values
886               if True, diff the interface stats with the latest values
887         Make sure that the interface stats inserted in insert_interface_stats() are updated
888         with proper values.
889         self.ifstats:
890         [
891         [InterfaceStats(chain 0, port 0), InterfaceStats(chain 0, port 1)],
892         [InterfaceStats(chain 1, port 0), InterfaceStats(chain 1, port 1)],
893         ...
894         ]
895         """
896         if diff:
897             stats = self.gen.get_stats()
898             for chain_idx, ifs in enumerate(self.ifstats):
899                 # each ifs has exactly 2 InterfaceStats and 2 Latency instances
900                 # corresponding to the
901                 # port 0 and port 1 for the given chain_idx
902                 # Note that we cannot use self.pps_list[chain_idx].if_stats to pick the
903                 # interface stats for the pps because it could have been modified to contain
904                 # additional interface stats
905                 self.gen.get_stream_stats(stats, ifs, self.pps_list[chain_idx].latencies, chain_idx)
906
907
908     @staticmethod
909     def compare_tx_rates(required, actual):
910         """Compare the actual TX rate to the required TX rate."""
911         threshold = 0.9
912         are_different = False
913         try:
914             if float(actual) / required < threshold:
915                 are_different = True
916         except ZeroDivisionError:
917             are_different = True
918
919         if are_different:
920             msg = "WARNING: There is a significant difference between requested TX rate ({r}) " \
921                   "and actual TX rate ({a}). The traffic generator may not have sufficient CPU " \
922                   "to achieve the requested TX rate.".format(r=required, a=actual)
923             LOG.info(msg)
924             return msg
925
926         return None
927
928     def get_per_direction_rate(self):
929         """Get the rate for each direction."""
930         divisor = 2 if self.run_config['bidirectional'] else 1
931         if 'rate_percent' in self.current_total_rate:
932             # don't split rate if it's percentage
933             divisor = 1
934
935         return utils.divide_rate(self.current_total_rate, divisor)
936
937     def close(self):
938         """Close this instance."""
939         try:
940             self.gen.stop_traffic()
941         except Exception:
942             pass
943         self.gen.clear_stats()
944         self.gen.cleanup()