188e0762d796337bb6a9b76c1d54b5a006f17f10
[nfvbench.git] / nfvbench / traffic_client.py
1 # Copyright 2016 Cisco Systems, Inc.  All rights reserved.
2 #
3 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
4 #    not use this file except in compliance with the License. You may obtain
5 #    a copy of the License at
6 #
7 #         http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #    Unless required by applicable law or agreed to in writing, software
10 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 #    License for the specific language governing permissions and limitations
13 #    under the License.
14
15 from datetime import datetime
16 import re
17 import socket
18 import struct
19 import time
20
21 from attrdict import AttrDict
22 import bitmath
23 from netaddr import IPNetwork
24 # pylint: disable=import-error
25 from trex_stl_lib.api import STLError
26 # pylint: enable=import-error
27
28 from log import LOG
29 from network import Interface
30 from specs import ChainType
31 from stats_collector import IntervalCollector
32 from stats_collector import IterationCollector
33 import traffic_gen.traffic_utils as utils
34 from utils import cast_integer
35
36
class TrafficClientException(Exception):
    """Error raised by the traffic client when an operation cannot be completed."""
41
42
class TrafficRunner(object):
    """Sequence the steps of one traffic run: start, wait, stop and collect stats.

    The runner is "running" from the moment run() records a start time until
    stop() clears it.
    """

    def __init__(self, client, duration_sec, interval_sec=0):
        """Attach the runner to a traffic client.

        client        object providing `gen` (the traffic generator), `skip_sleep`
                      and get_stats()
        duration_sec  how long traffic must be sent
        interval_sec  polling period, 0 to wait for the whole duration in one go
        """
        self.client = client
        self.duration_sec = duration_sec
        self.interval_sec = interval_sec
        # None as long as no traffic is in flight
        self.start_time = None

    def run(self):
        """Reset the generator counters, start traffic and return the first poll result."""
        LOG.info('Running traffic generator')
        generator = self.client.gen
        generator.clear_stats()
        generator.start_traffic()
        self.start_time = time.time()
        return self.poll_stats()

    def stop(self):
        """Stop traffic if it is running, do nothing otherwise."""
        if not self.is_running():
            return
        self.start_time = None
        self.client.gen.stop_traffic()

    def is_running(self):
        """Return True if traffic has been started and not stopped yet."""
        return self.start_time is not None

    def time_elapsed(self):
        """Return the seconds spent since the start, or the full duration when stopped."""
        if not self.is_running():
            return self.duration_sec
        return time.time() - self.start_time

    def poll_stats(self):
        """Wait for the next polling point and return the client stats.

        Returns None if traffic is not running or if the run duration is already
        exceeded (traffic is stopped in that case).
        Traffic is stopped once the last wait of the run has completed.
        """
        if not self.is_running():
            return None
        if self.client.skip_sleep:
            # no waiting at all: stop right away and report what we have
            self.stop()
            return self.client.get_stats()
        elapsed = self.time_elapsed()
        if elapsed > self.duration_sec:
            self.stop()
            return None
        remaining = self.duration_sec - elapsed
        if self.interval_sec > 0.0:
            # periodic polling: the last lap only waits for what is left
            last_lap = remaining <= self.interval_sec
            time.sleep(remaining if last_lap else self.interval_sec)
        else:
            # no periodic polling: a single wait covering the whole run
            last_lap = True
            time.sleep(self.duration_sec)
        if last_lap:
            self.stop()
        return self.client.get_stats()
93
94
class IpBlock(object):
    """Sequence of evenly spaced IP addresses with a sequential reservation cursor."""

    def __init__(self, base_ip, step_ip, count_ip):
        """Describe the block.

        base_ip   first address of the block (dotted string)
        step_ip   distance between 2 consecutive addresses (dotted string)
        count_ip  number of addresses available in the block
        """
        self.base_ip_int = Device.ip_to_int(base_ip)
        self.step = Device.ip_to_int(step_ip)
        self.max_available = count_ip
        # index of the first address that is not reserved yet
        self.next_free = 0

    def get_ip(self, index=0):
        """Return the address located at the given index, IndexError if outside the block."""
        if index >= self.max_available or index < 0:
            raise IndexError('Index out of bounds')
        offset = index * self.step
        return Device.int_to_ip(self.base_ip_int + offset)

    def reserve_ip_range(self, count):
        """Reserve the next count addresses and return a (first_ip, last_ip) tuple.

        Raises IndexError when the block has fewer than count addresses left.
        """
        start = self.next_free
        end = start + count
        if end > self.max_available:
            raise IndexError('No more IP addresses next free=%d max_available=%d requested=%d' %
                             (start, self.max_available, count))
        # the cursor only moves once both addresses have been computed
        reserved = (self.get_ip(start), self.get_ip(end - 1))
        self.next_free = end
        return reserved

    def reset_reservation(self):
        """Forget all reservations, the whole block becomes available again."""
        self.next_free = 0
124
125
class Device(object):
    """Represent a port device and all information associated to it.

    A device carries the addressing of one traffic generator port (MAC, base IP,
    gateway IPs, UDP ports, VLAN) and a reference to its peer device (dst) so
    that it can build the stream configuration of every chain.
    """

    # 6 hex bytes, all separated by the same ':' or '-' character
    _MAC_PATTERN = re.compile(r"[0-9a-f]{2}([-:])[0-9a-f]{2}(\1[0-9a-f]{2}){4}$")
    # separator characters ignored when converting a MAC address to an integer
    _MAC_SEPARATORS = re.compile(r"[:.\- ]")

    def __init__(self, port, pci, switch_port=None, vtep_vlan=None, ip=None, tg_gateway_ip=None,
                 gateway_ip=None, ip_addrs_step=None, tg_gateway_ip_addrs_step=None,
                 gateway_ip_addrs_step=None, udp_src_port=None, udp_dst_port=None,
                 dst_mac=None, chain_count=1, flow_count=1, vlan_tagging=False):
        """Store the port settings and allocate the IP blocks derived from them.

        ip is parsed as a network (address/prefix); only the address part is used
        as the base of the flow IP block, which holds flow_count addresses.
        The 2 gateway blocks hold one address per chain.
        """
        self.chain_count = chain_count
        self.flow_count = flow_count
        # peer device, set later with set_destination()
        self.dst = None
        self.port = port
        self.switch_port = switch_port
        self.vtep_vlan = vtep_vlan
        self.vlan_tag = None
        self.vlan_tagging = vlan_tagging
        self.pci = pci
        # MAC of this port, set later with set_mac()
        self.mac = None
        # optional list of destination MAC addresses (indexed by chain)
        self.dst_mac = dst_mac
        self.vm_mac_list = None
        subnet = IPNetwork(ip)
        self.ip = subnet.ip.format()
        self.ip_prefixlen = subnet.prefixlen
        self.ip_addrs_step = ip_addrs_step
        self.tg_gateway_ip_addrs_step = tg_gateway_ip_addrs_step
        self.gateway_ip_addrs_step = gateway_ip_addrs_step
        self.gateway_ip = gateway_ip
        self.tg_gateway_ip = tg_gateway_ip
        self.ip_block = IpBlock(self.ip, ip_addrs_step, flow_count)
        self.gw_ip_block = IpBlock(gateway_ip,
                                   gateway_ip_addrs_step,
                                   chain_count)
        self.tg_gw_ip_block = IpBlock(tg_gateway_ip,
                                      tg_gateway_ip_addrs_step,
                                      chain_count)
        self.udp_src_port = udp_src_port
        self.udp_dst_port = udp_dst_port

    def set_mac(self, mac):
        """Set the MAC address of this port, None is rejected."""
        if mac is None:
            raise TrafficClientException('Trying to set traffic generator MAC address as None')
        self.mac = mac
        LOG.info("Port %d: src MAC %s", self.port, self.mac)

    def set_destination(self, dst):
        """Set the peer device that receives the traffic sent by this device."""
        self.dst = dst

    def set_vm_mac_list(self, vm_mac_list):
        """Store the VM MAC addresses (one per chain) as a list of strings."""
        # must be a real list: it is indexed by chain in get_stream_configs()
        self.vm_mac_list = [str(mac) for mac in vm_mac_list]

    def set_vlan_tag(self, vlan_tag):
        """Set the VLAN tag, None is only accepted when VLAN tagging is disabled."""
        if self.vlan_tagging and vlan_tag is None:
            raise TrafficClientException('Trying to set VLAN tag as None')
        self.vlan_tag = vlan_tag
        # %s because the tag can legitimately be None when tagging is disabled
        LOG.info("Port %d: VLAN %s", self.port, self.vlan_tag)

    def get_gw_ip(self, chain_index):
        """Retrieve the IP address assigned for the gateway of a given chain."""
        return self.gw_ip_block.get_ip(chain_index)

    def get_stream_configs(self, service_chain):
        """Return the list of stream configurations, one dict per chain.

        service_chain  the chain type; for ChainType.EXT the destination MAC comes
                       from dst_mac (or from the peer device MAC), for all other
                       types it is the MAC of the VM of the chain

        Raises TrafficClientException if the destination MAC address is malformed.
        The peer device must have been set with set_destination() beforehand.
        """
        configs = []
        # exact flow count for each chain is calculated as follows:
        # - all chains except the first will have the same flow count
        #   calculated as (total_flows + chain_count - 1) / chain_count
        # - the first chain will have the remainder
        # example 11 flows and 3 chains => 3, 4, 4
        # (explicit floor division: these counts are used as IP block indices)
        flows_per_chain = (self.flow_count + self.chain_count - 1) // self.chain_count
        cur_chain_flow_count = self.flow_count - flows_per_chain * (self.chain_count - 1)

        self.ip_block.reset_reservation()
        self.dst.ip_block.reset_reservation()

        for chain_idx in range(self.chain_count):
            src_ip_first, src_ip_last = self.ip_block.reserve_ip_range(cur_chain_flow_count)
            dst_ip_first, dst_ip_last = self.dst.ip_block.reserve_ip_range(cur_chain_flow_count)

            dst_mac = self.dst_mac[chain_idx] if self.dst_mac is not None else self.dst.mac
            if not self._MAC_PATTERN.match(dst_mac.lower()):
                raise TrafficClientException("Invalid MAC address '{mac}' specified in "
                                             "mac_addrs_left/right".format(mac=dst_mac))

            configs.append({
                'count': cur_chain_flow_count,
                'mac_src': self.mac,
                'mac_dst': dst_mac if service_chain == ChainType.EXT else self.vm_mac_list[
                    chain_idx],
                'ip_src_addr': src_ip_first,
                'ip_src_addr_max': src_ip_last,
                'ip_src_count': cur_chain_flow_count,
                'ip_dst_addr': dst_ip_first,
                'ip_dst_addr_max': dst_ip_last,
                'ip_dst_count': cur_chain_flow_count,
                'ip_addrs_step': self.ip_addrs_step,
                'udp_src_port': self.udp_src_port,
                'udp_dst_port': self.udp_dst_port,
                'mac_discovery_gw': self.get_gw_ip(chain_idx),
                'ip_src_tg_gw': self.tg_gw_ip_block.get_ip(chain_idx),
                'ip_dst_tg_gw': self.dst.tg_gw_ip_block.get_ip(chain_idx),
                'vlan_tag': self.vlan_tag if self.vlan_tagging else None
            })
            # after first chain, fall back to the flow count for all other chains
            cur_chain_flow_count = flows_per_chain

        return configs

    def ip_range_overlaps(self):
        """Check if this device ip range is overlapping with the dst device ip range.

        Both ranges are taken as flow_count consecutive addresses starting at the
        base IP of each device.
        """
        # NOTE(review): the configured ip_addrs_step is not taken into account here,
        # confirm this is intended when a step other than 0.0.0.1 is used
        src_base_ip = Device.ip_to_int(self.ip)
        dst_base_ip = Device.ip_to_int(self.dst.ip)
        src_last_ip = src_base_ip + self.flow_count - 1
        dst_last_ip = dst_base_ip + self.flow_count - 1
        return dst_last_ip >= src_base_ip and src_last_ip >= dst_base_ip

    @staticmethod
    def mac_to_int(mac):
        """Convert a MAC address string (':', '.', '-' or space separated) to an integer."""
        return int(Device._MAC_SEPARATORS.sub('', mac), 16)

    @staticmethod
    def int_to_mac(i):
        """Convert an integer to a colon separated MAC address string (lower case hex)."""
        mac = format(i, 'x').zfill(12)
        blocks = [mac[x:x + 2] for x in range(0, len(mac), 2)]
        return ':'.join(blocks)

    @staticmethod
    def ip_to_int(addr):
        """Convert a dotted IPv4 address string to its 32-bit integer value."""
        return struct.unpack("!I", socket.inet_aton(addr))[0]

    @staticmethod
    def int_to_ip(nvalue):
        """Convert a 32-bit integer to a dotted IPv4 address string."""
        return socket.inet_ntoa(struct.pack("!I", nvalue))
256
257
class RunningTrafficProfile(object):
    """Represents traffic configuration for currently running traffic profile.

    The settings of the selected generator profile are merged on top of the
    traffic generator settings, then one Device is created for each of the 2
    interfaces and the 2 devices are set as destination of each other.
    """

    DEFAULT_IP_STEP = '0.0.0.1'
    DEFAULT_SRC_DST_IP_STEP = '0.0.0.1'

    def __init__(self, config, generator_profile):
        """Build the running profile.

        config             the benchmark configuration
        generator_profile  name of the generator profile to select

        Raises Exception if the profile name does not match exactly one profile or
        if the source and destination IP ranges overlap (EXT chain with ARP only).
        """
        generator_config = self.__match_generator_profile(config.traffic_generator,
                                                          generator_profile)
        self.generator_config = generator_config
        self.service_chain = config.service_chain
        self.service_chain_count = config.service_chain_count
        self.flow_count = config.flow_count
        self.host_name = generator_config.host_name
        self.name = generator_config.name
        self.tool = generator_config.tool
        self.cores = generator_config.get('cores', 1)
        self.ip_addrs_step = generator_config.ip_addrs_step or self.DEFAULT_SRC_DST_IP_STEP
        self.tg_gateway_ip_addrs_step = \
            generator_config.tg_gateway_ip_addrs_step or self.DEFAULT_IP_STEP
        self.gateway_ip_addrs_step = generator_config.gateway_ip_addrs_step or self.DEFAULT_IP_STEP
        self.gateway_ips = generator_config.gateway_ip_addrs
        self.ip = generator_config.ip
        # the 'ps' suffix is removed before parsing the speed string with bitmath
        self.intf_speed = bitmath.parse_string(generator_config.intf_speed.replace('ps', '')).bits
        self.vlan_tagging = config.vlan_tagging
        self.no_arp = config.no_arp
        self.src_device = None
        self.dst_device = None
        self.vm_mac_list = None
        self.mac_addrs_left = generator_config.mac_addrs_left
        self.mac_addrs_right = generator_config.mac_addrs_right
        self.__prep_interfaces(generator_config)

    def to_json(self):
        """Return the merged generator configuration as a plain dict."""
        return dict(self.generator_config)

    def set_vm_mac_list(self, vm_mac_list):
        """Pass the VM MAC lists to the devices: index 0 for source, 1 for destination."""
        self.src_device.set_vm_mac_list(vm_mac_list[0])
        self.dst_device.set_vm_mac_list(vm_mac_list[1])

    @staticmethod
    def __match_generator_profile(traffic_generator, generator_profile):
        """Return the generator settings overridden by the profile of the given name.

        Raises Exception unless exactly one profile has that name.
        """
        generator_config = AttrDict(traffic_generator)
        generator_config.pop('default_profile')
        generator_config.pop('generator_profile')
        matching_profile = [profile for profile in traffic_generator.generator_profile if
                            profile.name == generator_profile]
        if len(matching_profile) != 1:
            # str() so that a missing (None) profile name is reported instead of
            # failing on the string concatenation
            raise Exception('Traffic generator profile not found: ' + str(generator_profile))

        generator_config.update(matching_profile[0])

        return generator_config

    def __build_device(self, generator_config, index, dst_mac):
        """Create the Device of one side: index 0 is the source, 1 the destination."""
        device_config = {
            'chain_count': self.service_chain_count,
            # each direction gets half of the total flow count
            'flow_count': self.flow_count // 2,
            'ip': generator_config.ip_addrs[index],
            'ip_addrs_step': self.ip_addrs_step,
            'gateway_ip': self.gateway_ips[index],
            'gateway_ip_addrs_step': self.gateway_ip_addrs_step,
            'tg_gateway_ip': generator_config.tg_gateway_ip_addrs[index],
            'tg_gateway_ip_addrs_step': self.tg_gateway_ip_addrs_step,
            'udp_src_port': generator_config.udp_src_port,
            'udp_dst_port': generator_config.udp_dst_port,
            'vlan_tagging': self.vlan_tagging,
            'dst_mac': dst_mac
        }
        # the settings of the interface entry are merged on top of the ones above
        return Device(**dict(device_config, **generator_config.interfaces[index]))

    def __prep_interfaces(self, generator_config):
        """Create the source and destination devices and link them together.

        Raises Exception when the 2 IP ranges overlap for an EXT chain using ARP.
        """
        self.src_device = self.__build_device(generator_config, 0,
                                              generator_config.mac_addrs_left)
        self.dst_device = self.__build_device(generator_config, 1,
                                              generator_config.mac_addrs_right)
        self.src_device.set_destination(self.dst_device)
        self.dst_device.set_destination(self.src_device)

        if self.service_chain == ChainType.EXT and not self.no_arp \
                and self.src_device.ip_range_overlaps():
            # the IP addresses are strings: all arguments go in one tuple, with %s
            raise Exception('Overlapping IP address ranges src=%s dst=%s flows=%d' %
                            (self.src_device.ip,
                             self.dst_device.ip,
                             self.flow_count))

    @property
    def devices(self):
        """The [source, destination] devices."""
        return [self.src_device, self.dst_device]

    @property
    def vlans(self):
        """The vtep_vlan of the [source, destination] devices."""
        return [self.src_device.vtep_vlan, self.dst_device.vtep_vlan]

    @property
    def ports(self):
        """The port of the [source, destination] devices."""
        return [self.src_device.port, self.dst_device.port]

    @property
    def switch_ports(self):
        """The switch port of the [source, destination] devices."""
        return [self.src_device.switch_port, self.dst_device.switch_port]

    @property
    def pcis(self):
        """The PCI of the [source, destination] devices."""
        return [self.src_device.pci, self.dst_device.pci]
373
374
class TrafficGeneratorFactory(object):
    """Create traffic generator clients and look up profiles in the configuration."""

    def __init__(self, config):
        self.config = config

    def get_tool(self):
        """Return the name of the traffic generator tool set in the generator config."""
        return self.config.generator_config.tool

    def get_generator_client(self):
        """Return a client for the configured tool, None if the tool is not supported.

        The tool name is matched case insensitively. The module of a generator is
        only imported when that generator is selected.
        """
        tool_name = self.get_tool().lower()
        if tool_name == 'trex':
            from traffic_gen import trex
            client = trex.TRex(self.config)
        elif tool_name == 'dummy':
            from traffic_gen import dummy
            client = dummy.DummyTG(self.config)
        else:
            client = None
        return client

    def list_generator_profile(self):
        """Return the names of all generator profiles."""
        names = []
        for profile in self.config.traffic_generator.generator_profile:
            names.append(profile.name)
        return names

    def get_generator_config(self, generator_profile):
        """Return the running traffic profile built from the given generator profile name."""
        return RunningTrafficProfile(self.config, generator_profile)

    def get_matching_profile(self, traffic_profile_name):
        """Return the only traffic profile that has the given name.

        Raises Exception if there is no such profile or more than one.
        """
        found = []
        for candidate in self.config.traffic_profile:
            if candidate.name == traffic_profile_name:
                found.append(candidate)

        if not found:
            raise Exception('No traffic profile found.')
        if len(found) > 1:
            raise Exception('Multiple traffic profiles with the same name found.')
        return found[0]

    def get_frame_sizes(self, traffic_profile):
        """Return the l2frame_size setting of the traffic profile with the given name."""
        return self.get_matching_profile(traffic_profile).l2frame_size
414
415
416 class TrafficClient(object):
417     """Traffic generator client."""
418
419     PORTS = [0, 1]
420
421     def __init__(self, config, notifier=None, skip_sleep=False):
422         generator_factory = TrafficGeneratorFactory(config)
423         self.gen = generator_factory.get_generator_client()
424         self.tool = generator_factory.get_tool()
425         self.config = config
426         self.notifier = notifier
427         self.interval_collector = None
428         self.iteration_collector = None
429         self.runner = TrafficRunner(self, self.config.duration_sec, self.config.interval_sec)
430         if self.gen is None:
431             raise TrafficClientException('%s is not a supported traffic generator' % self.tool)
432
433         self.run_config = {
434             'l2frame_size': None,
435             'duration_sec': self.config.duration_sec,
436             'bidirectional': True,
437             'rates': []  # to avoid unsbuscriptable-obj warning
438         }
439         self.current_total_rate = {'rate_percent': '10'}
440         if self.config.single_run:
441             self.current_total_rate = utils.parse_rate_str(self.config.rate)
442         # UT with dummy TG can bypass all sleeps
443         self.skip_sleep = skip_sleep
444
445     def set_macs(self):
446         for mac, device in zip(self.gen.get_macs(), self.config.generator_config.devices):
447             device.set_mac(mac)
448
449     def start_traffic_generator(self):
450         self.gen.init()
451         self.gen.connect()
452
453     def setup(self):
454         self.gen.set_mode()
455         self.gen.config_interface()
456         self.gen.clear_stats()
457
458     def get_version(self):
459         return self.gen.get_version()
460
461     def ensure_end_to_end(self):
462         """Ensure traffic generator receives packets it has transmitted.
463
464         This ensures end to end connectivity and also waits until VMs are ready to forward packets.
465
466         VMs that are started and in active state may not pass traffic yet. It is imperative to make
467         sure that all VMs are passing traffic in both directions before starting any benchmarking.
468         To verify this, we need to send at a low frequency bi-directional packets and make sure
469         that we receive all packets back from all VMs. The number of flows is equal to 2 times
470         the number of chains (1 per direction) and we need to make sure we receive packets coming
471         from exactly 2 x chain count different source MAC addresses.
472
473         Example:
474             PVP chain (1 VM per chain)
475             N = 10 (number of chains)
476             Flow count = 20 (number of flows)
477             If the number of unique source MAC addresses from received packets is 20 then
478             all 10 VMs 10 VMs are in operational state.
479         """
480         LOG.info('Starting traffic generator to ensure end-to-end connectivity')
481         rate_pps = {'rate_pps': str(self.config.service_chain_count * 1)}
482         self.gen.create_traffic('64', [rate_pps, rate_pps], bidirectional=True, latency=False)
483
484         # ensures enough traffic is coming back
485         retry_count = (self.config.check_traffic_time_sec +
486                        self.config.generic_poll_sec - 1) / self.config.generic_poll_sec
487         mac_addresses = set()
488         ln = 0
489         # in case of l2-loopback, we will only have 2 unique src MAC regardless of the
490         # number of chains configured because there are no VM involved
491         # otherwise, we expect to see packets coming from 2 unique MAC per chain
492         unique_src_mac_count = 2 if self.config.l2_loopback else self.config.service_chain_count * 2
493         for it in xrange(retry_count):
494             self.gen.clear_stats()
495             self.gen.start_traffic()
496             self.gen.start_capture()
497             LOG.info('Waiting for packets to be received back... (%d / %d)', it + 1, retry_count)
498             if not self.skip_sleep:
499                 time.sleep(self.config.generic_poll_sec)
500             self.gen.stop_traffic()
501             self.gen.fetch_capture_packets()
502             self.gen.stop_capture()
503
504             for packet in self.gen.packet_list:
505                 mac_addresses.add(packet['binary'][6:12])
506                 if ln != len(mac_addresses):
507                     ln = len(mac_addresses)
508                     LOG.info('Received unique source MAC %d / %d', ln, unique_src_mac_count)
509                 if len(mac_addresses) == unique_src_mac_count:
510                     LOG.info('End-to-end connectivity ensured')
511                     return
512
513             if not self.skip_sleep:
514                 time.sleep(self.config.generic_poll_sec)
515
516         raise TrafficClientException('End-to-end connectivity cannot be ensured')
517
518     def ensure_arp_successful(self):
519         if not self.gen.resolve_arp():
520             raise TrafficClientException('ARP cannot be resolved')
521
522     def set_traffic(self, frame_size, bidirectional):
523         self.run_config['bidirectional'] = bidirectional
524         self.run_config['l2frame_size'] = frame_size
525         self.run_config['rates'] = [self.get_per_direction_rate()]
526         if bidirectional:
527             self.run_config['rates'].append(self.get_per_direction_rate())
528         else:
529             unidir_reverse_pps = int(self.config.unidir_reverse_traffic_pps)
530             if unidir_reverse_pps > 0:
531                 self.run_config['rates'].append({'rate_pps': str(unidir_reverse_pps)})
532         # Fix for [NFVBENCH-67], convert the rate string to PPS
533         for idx, rate in enumerate(self.run_config['rates']):
534             if 'rate_pps' not in rate:
535                 self.run_config['rates'][idx] = {'rate_pps': self.__convert_rates(rate)['rate_pps']}
536
537         self.gen.clear_streamblock()
538         self.gen.create_traffic(frame_size, self.run_config['rates'], bidirectional, latency=True)
539
540     def modify_load(self, load):
541         self.current_total_rate = {'rate_percent': str(load)}
542         rate_per_direction = self.get_per_direction_rate()
543
544         self.gen.modify_rate(rate_per_direction, False)
545         self.run_config['rates'][0] = rate_per_direction
546         if self.run_config['bidirectional']:
547             self.gen.modify_rate(rate_per_direction, True)
548             self.run_config['rates'][1] = rate_per_direction
549
550     def get_ndr_and_pdr(self):
551         dst = 'Bidirectional' if self.run_config['bidirectional'] else 'Unidirectional'
552         targets = {}
553         if self.config.ndr_run:
554             LOG.info('*** Searching NDR for %s (%s)...', self.run_config['l2frame_size'], dst)
555             targets['ndr'] = self.config.measurement.NDR
556         if self.config.pdr_run:
557             LOG.info('*** Searching PDR for %s (%s)...', self.run_config['l2frame_size'], dst)
558             targets['pdr'] = self.config.measurement.PDR
559
560         self.run_config['start_time'] = time.time()
561         self.interval_collector = IntervalCollector(self.run_config['start_time'])
562         self.interval_collector.attach_notifier(self.notifier)
563         self.iteration_collector = IterationCollector(self.run_config['start_time'])
564         results = {}
565         self.__range_search(0.0, 200.0, targets, results)
566
567         results['iteration_stats'] = {
568             'ndr_pdr': self.iteration_collector.get()
569         }
570
571         if self.config.ndr_run:
572             LOG.info('NDR load: %s', results['ndr']['rate_percent'])
573             results['ndr']['time_taken_sec'] = \
574                 results['ndr']['timestamp_sec'] - self.run_config['start_time']
575             if self.config.pdr_run:
576                 LOG.info('PDR load: %s', results['pdr']['rate_percent'])
577                 results['pdr']['time_taken_sec'] = \
578                     results['pdr']['timestamp_sec'] - results['ndr']['timestamp_sec']
579         else:
580             LOG.info('PDR load: %s', results['pdr']['rate_percent'])
581             results['pdr']['time_taken_sec'] = \
582                 results['pdr']['timestamp_sec'] - self.run_config['start_time']
583         return results
584
585     def __get_dropped_rate(self, result):
586         dropped_pkts = result['rx']['dropped_pkts']
587         total_pkts = result['tx']['total_pkts']
588         if not total_pkts:
589             return float('inf')
590         return float(dropped_pkts) / total_pkts * 100
591
592     def get_stats(self):
593         stats = self.gen.get_stats()
594         retDict = {'total_tx_rate': stats['total_tx_rate']}
595         for port in self.PORTS:
596             retDict[port] = {'tx': {}, 'rx': {}}
597
598         tx_keys = ['total_pkts', 'total_pkt_bytes', 'pkt_rate', 'pkt_bit_rate']
599         rx_keys = tx_keys + ['dropped_pkts']
600
601         for port in self.PORTS:
602             for key in tx_keys:
603                 retDict[port]['tx'][key] = int(stats[port]['tx'][key])
604             for key in rx_keys:
605                 try:
606                     retDict[port]['rx'][key] = int(stats[port]['rx'][key])
607                 except ValueError:
608                     retDict[port]['rx'][key] = 0
609             retDict[port]['rx']['avg_delay_usec'] = cast_integer(
610                 stats[port]['rx']['avg_delay_usec'])
611             retDict[port]['rx']['min_delay_usec'] = cast_integer(
612                 stats[port]['rx']['min_delay_usec'])
613             retDict[port]['rx']['max_delay_usec'] = cast_integer(
614                 stats[port]['rx']['max_delay_usec'])
615             retDict[port]['drop_rate_percent'] = self.__get_dropped_rate(retDict[port])
616
617         ports = sorted(retDict.keys())
618         if self.run_config['bidirectional']:
619             retDict['overall'] = {'tx': {}, 'rx': {}}
620             for key in tx_keys:
621                 retDict['overall']['tx'][key] = \
622                     retDict[ports[0]]['tx'][key] + retDict[ports[1]]['tx'][key]
623             for key in rx_keys:
624                 retDict['overall']['rx'][key] = \
625                     retDict[ports[0]]['rx'][key] + retDict[ports[1]]['rx'][key]
626             total_pkts = [retDict[ports[0]]['rx']['total_pkts'],
627                           retDict[ports[1]]['rx']['total_pkts']]
628             avg_delays = [retDict[ports[0]]['rx']['avg_delay_usec'],
629                           retDict[ports[1]]['rx']['avg_delay_usec']]
630             max_delays = [retDict[ports[0]]['rx']['max_delay_usec'],
631                           retDict[ports[1]]['rx']['max_delay_usec']]
632             min_delays = [retDict[ports[0]]['rx']['min_delay_usec'],
633                           retDict[ports[1]]['rx']['min_delay_usec']]
634             retDict['overall']['rx']['avg_delay_usec'] = utils.weighted_avg(total_pkts, avg_delays)
635             retDict['overall']['rx']['min_delay_usec'] = min(min_delays)
636             retDict['overall']['rx']['max_delay_usec'] = max(max_delays)
637             for key in ['pkt_bit_rate', 'pkt_rate']:
638                 for dirc in ['tx', 'rx']:
639                     retDict['overall'][dirc][key] /= 2.0
640         else:
641             retDict['overall'] = retDict[ports[0]]
642         retDict['overall']['drop_rate_percent'] = self.__get_dropped_rate(retDict['overall'])
643         return retDict
644
645     def __convert_rates(self, rate):
646         return utils.convert_rates(self.run_config['l2frame_size'],
647                                    rate,
648                                    self.config.generator_config.intf_speed)
649
650     def __ndr_pdr_found(self, tag, load):
651         rates = self.__convert_rates({'rate_percent': load})
652         self.iteration_collector.add_ndr_pdr(tag, rates['rate_pps'])
653         last_stats = self.iteration_collector.peek()
654         self.interval_collector.add_ndr_pdr(tag, last_stats)
655
656     def __format_output_stats(self, stats):
657         for key in self.PORTS + ['overall']:
658             interface = stats[key]
659             stats[key] = {
660                 'tx_pkts': interface['tx']['total_pkts'],
661                 'rx_pkts': interface['rx']['total_pkts'],
662                 'drop_percentage': interface['drop_rate_percent'],
663                 'drop_pct': interface['rx']['dropped_pkts'],
664                 'avg_delay_usec': interface['rx']['avg_delay_usec'],
665                 'max_delay_usec': interface['rx']['max_delay_usec'],
666                 'min_delay_usec': interface['rx']['min_delay_usec'],
667             }
668
669         return stats
670
671     def __targets_found(self, rate, targets, results):
672         for tag, target in targets.iteritems():
673             LOG.info('Found %s (%s) load: %s', tag, target, rate)
674             self.__ndr_pdr_found(tag, rate)
675             results[tag]['timestamp_sec'] = time.time()
676
677     def __range_search(self, left, right, targets, results):
678         """Perform a binary search for a list of targets inside a [left..right] range or rate.
679
680         left    the left side of the range to search as a % the line rate (100 = 100% line rate)
681                 indicating the rate to send on each interface
682         right   the right side of the range to search as a % of line rate
683                 indicating the rate to send on each interface
684         targets a dict of drop rates to search (0.1 = 0.1%), indexed by the DR name or "tag"
685                 ('ndr', 'pdr')
686         results a dict to store results
687         """
688         if not targets:
689             return
690         LOG.info('Range search [%s .. %s] targets: %s', left, right, targets)
691
692         # Terminate search when gap is less than load epsilon
693         if right - left < self.config.measurement.load_epsilon:
694             self.__targets_found(left, targets, results)
695             return
696
697         # Obtain the average drop rate in for middle load
698         middle = (left + right) / 2.0
699         try:
700             stats, rates = self.__run_search_iteration(middle)
701         except STLError:
702             LOG.exception("Got exception from traffic generator during binary search")
703             self.__targets_found(left, targets, results)
704             return
705         # Split target dicts based on the avg drop rate
706         left_targets = {}
707         right_targets = {}
708         for tag, target in targets.iteritems():
709             if stats['overall']['drop_rate_percent'] <= target:
710                 # record the best possible rate found for this target
711                 results[tag] = rates
712                 results[tag].update({
713                     'load_percent_per_direction': middle,
714                     'stats': self.__format_output_stats(dict(stats)),
715                     'timestamp_sec': None
716                 })
717                 right_targets[tag] = target
718             else:
719                 # initialize to 0 all fields of result for
720                 # the worst case scenario of the binary search (if ndr/pdr is not found)
721                 if tag not in results:
722                     results[tag] = dict.fromkeys(rates, 0)
723                     empty_stats = self.__format_output_stats(dict(stats))
724                     for key in empty_stats:
725                         if isinstance(empty_stats[key], dict):
726                             empty_stats[key] = dict.fromkeys(empty_stats[key], 0)
727                         else:
728                             empty_stats[key] = 0
729                     results[tag].update({
730                         'load_percent_per_direction': 0,
731                         'stats': empty_stats,
732                         'timestamp_sec': None
733                     })
734                 left_targets[tag] = target
735
736         # search lower half
737         self.__range_search(left, middle, left_targets, results)
738
739         # search upper half only if the upper rate does not exceed
740         # 100%, this only happens when the first search at 100%
741         # yields a DR that is < target DR
742         if middle >= 100:
743             self.__targets_found(100, right_targets, results)
744         else:
745             self.__range_search(middle, right, right_targets, results)
746
747     def __run_search_iteration(self, rate):
748         # set load
749         self.modify_load(rate)
750
751         # poll interval stats and collect them
752         for stats in self.run_traffic():
753             self.interval_collector.add(stats)
754             time_elapsed_ratio = self.runner.time_elapsed() / self.run_config['duration_sec']
755             if time_elapsed_ratio >= 1:
756                 self.cancel_traffic()
757         self.interval_collector.reset()
758
759         # get stats from the run
760         stats = self.runner.client.get_stats()
761         current_traffic_config = self.get_traffic_config()
762         warning = self.compare_tx_rates(current_traffic_config['direction-total']['rate_pps'],
763                                         stats['total_tx_rate'])
764         if warning is not None:
765             stats['warning'] = warning
766
767         # save reliable stats from whole iteration
768         self.iteration_collector.add(stats, current_traffic_config['direction-total']['rate_pps'])
769         LOG.info('Average drop rate: %f', stats['overall']['drop_rate_percent'])
770
771         return stats, current_traffic_config['direction-total']
772
773     @staticmethod
774     def log_stats(stats):
775         report = {
776             'datetime': str(datetime.now()),
777             'tx_packets': stats['overall']['tx']['total_pkts'],
778             'rx_packets': stats['overall']['rx']['total_pkts'],
779             'drop_packets': stats['overall']['rx']['dropped_pkts'],
780             'drop_rate_percent': stats['overall']['drop_rate_percent']
781         }
782         LOG.info('TX: %(tx_packets)d; '
783                  'RX: %(rx_packets)d; '
784                  'Dropped: %(drop_packets)d; '
785                  'Drop rate: %(drop_rate_percent).4f%%',
786                  report)
787
788     def run_traffic(self):
789         stats = self.runner.run()
790         while self.runner.is_running:
791             self.log_stats(stats)
792             yield stats
793             stats = self.runner.poll_stats()
794             if stats is None:
795                 return
796         self.log_stats(stats)
797         LOG.info('Drop rate: %f', stats['overall']['drop_rate_percent'])
798         yield stats
799
800     def cancel_traffic(self):
801         self.runner.stop()
802
803     def get_interface(self, port_index, stats):
804         port = self.gen.port_handle[port_index]
805         tx, rx = 0, 0
806         if stats and port in stats:
807             tx, rx = int(stats[port]['tx']['total_pkts']), int(stats[port]['rx']['total_pkts'])
808         return Interface('traffic-generator', self.tool.lower(), tx, rx)
809
810     def get_traffic_config(self):
811         config = {}
812         load_total = 0.0
813         bps_total = 0.0
814         pps_total = 0.0
815         for idx, rate in enumerate(self.run_config['rates']):
816             key = 'direction-forward' if idx == 0 else 'direction-reverse'
817             config[key] = {
818                 'l2frame_size': self.run_config['l2frame_size'],
819                 'duration_sec': self.run_config['duration_sec']
820             }
821             config[key].update(rate)
822             config[key].update(self.__convert_rates(rate))
823             load_total += float(config[key]['rate_percent'])
824             bps_total += float(config[key]['rate_bps'])
825             pps_total += float(config[key]['rate_pps'])
826         config['direction-total'] = dict(config['direction-forward'])
827         config['direction-total'].update({
828             'rate_percent': load_total,
829             'rate_pps': cast_integer(pps_total),
830             'rate_bps': bps_total
831         })
832
833         return config
834
835     def get_run_config(self, results):
836         """Return configuration which was used for the last run."""
837         r = {}
838         # because we want each direction to have the far end RX rates,
839         # use the far end index (1-idx) to retrieve the RX rates
840         for idx, key in enumerate(["direction-forward", "direction-reverse"]):
841             tx_rate = results["stats"][idx]["tx"]["total_pkts"] / self.config.duration_sec
842             rx_rate = results["stats"][1 - idx]["rx"]["total_pkts"] / self.config.duration_sec
843             r[key] = {
844                 "orig": self.__convert_rates(self.run_config['rates'][idx]),
845                 "tx": self.__convert_rates({'rate_pps': tx_rate}),
846                 "rx": self.__convert_rates({'rate_pps': rx_rate})
847             }
848
849         total = {}
850         for direction in ['orig', 'tx', 'rx']:
851             total[direction] = {}
852             for unit in ['rate_percent', 'rate_bps', 'rate_pps']:
853                 total[direction][unit] = sum([float(x[direction][unit]) for x in r.values()])
854
855         r['direction-total'] = total
856         return r
857
858     @staticmethod
859     def compare_tx_rates(required, actual):
860         threshold = 0.9
861         are_different = False
862         try:
863             if float(actual) / required < threshold:
864                 are_different = True
865         except ZeroDivisionError:
866             are_different = True
867
868         if are_different:
869             msg = "WARNING: There is a significant difference between requested TX rate ({r}) " \
870                   "and actual TX rate ({a}). The traffic generator may not have sufficient CPU " \
871                   "to achieve the requested TX rate.".format(r=required, a=actual)
872             LOG.info(msg)
873             return msg
874
875         return None
876
877     def get_per_direction_rate(self):
878         divisor = 2 if self.run_config['bidirectional'] else 1
879         if 'rate_percent' in self.current_total_rate:
880             # don't split rate if it's percentage
881             divisor = 1
882
883         return utils.divide_rate(self.current_total_rate, divisor)
884
885     def close(self):
886         try:
887             self.gen.stop_traffic()
888         except Exception:
889             pass
890         self.gen.clear_stats()
891         self.gen.cleanup()