[NFVBENCH-67] NFVbench should have same TX rate for different rate formats
[nfvbench.git] / nfvbench / traffic_client.py
1 # Copyright 2016 Cisco Systems, Inc.  All rights reserved.
2 #
3 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
4 #    not use this file except in compliance with the License. You may obtain
5 #    a copy of the License at
6 #
7 #         http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #    Unless required by applicable law or agreed to in writing, software
10 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 #    License for the specific language governing permissions and limitations
13 #    under the License.
14
15 from datetime import datetime
16 import re
17 import socket
18 import struct
19 import time
20
21 from attrdict import AttrDict
22 import bitmath
23 from netaddr import IPNetwork
24 # pylint: disable=import-error
25 from trex_stl_lib.api import STLError
26 # pylint: enable=import-error
27
28 from log import LOG
29 from network import Interface
30 from specs import ChainType
31 from stats_collector import IntervalCollector
32 from stats_collector import IterationCollector
33 import traffic_gen.traffic_utils as utils
34 from utils import cast_integer
35
36
37 class TrafficClientException(Exception):
38     pass
39
40
41 class TrafficRunner(object):
42     def __init__(self, client, duration_sec, interval_sec=0):
43         self.client = client
44         self.start_time = None
45         self.duration_sec = duration_sec
46         self.interval_sec = interval_sec
47
48     def run(self):
49         LOG.info('Running traffic generator')
50         self.client.gen.clear_stats()
51         self.client.gen.start_traffic()
52         self.start_time = time.time()
53         return self.poll_stats()
54
55     def stop(self):
56         if self.is_running():
57             self.start_time = None
58             self.client.gen.stop_traffic()
59
60     def is_running(self):
61         return self.start_time is not None
62
63     def time_elapsed(self):
64         if self.is_running():
65             return time.time() - self.start_time
66         return self.duration_sec
67
68     def poll_stats(self):
69         if not self.is_running():
70             return None
71         if self.client.skip_sleep:
72             self.stop()
73             return self.client.get_stats()
74         time_elapsed = self.time_elapsed()
75         if time_elapsed > self.duration_sec:
76             self.stop()
77             return None
78         time_left = self.duration_sec - time_elapsed
79         if self.interval_sec > 0.0:
80             if time_left <= self.interval_sec:
81                 time.sleep(time_left)
82                 self.stop()
83             else:
84                 time.sleep(self.interval_sec)
85         else:
86             time.sleep(self.duration_sec)
87             self.stop()
88         return self.client.get_stats()
89
90
91 class IpBlock(object):
92     def __init__(self, base_ip, step_ip, count_ip):
93         self.base_ip_int = Device.ip_to_int(base_ip)
94         self.step = Device.ip_to_int(step_ip)
95         self.max_available = count_ip
96         self.next_free = 0
97
98     def get_ip(self, index=0):
99         '''Return the IP address at given index
100         '''
101         if index < 0 or index >= self.max_available:
102             raise IndexError('Index out of bounds')
103         return Device.int_to_ip(self.base_ip_int + index * self.step)
104
105     def reserve_ip_range(self, count):
106         '''Reserve a range of count consecutive IP addresses spaced by step
107         '''
108         if self.next_free + count > self.max_available:
109             raise IndexError('No more IP addresses next free=%d max_available=%d requested=%d' %
110                              (self.next_free,
111                               self.max_available,
112                               count))
113         first_ip = self.get_ip(self.next_free)
114         last_ip = self.get_ip(self.next_free + count - 1)
115         self.next_free += count
116         return (first_ip, last_ip)
117
118     def reset_reservation(self):
119         self.next_free = 0
120
121
122 class Device(object):
123     def __init__(self, port, pci, switch_port=None, vtep_vlan=None, ip=None, tg_gateway_ip=None,
124                  gateway_ip=None, ip_addrs_step=None, tg_gateway_ip_addrs_step=None,
125                  gateway_ip_addrs_step=None, udp_src_port=None, udp_dst_port=None,
126                  dst_mac=None, chain_count=1, flow_count=1, vlan_tagging=False):
127         self.chain_count = chain_count
128         self.flow_count = flow_count
129         self.dst = None
130         self.port = port
131         self.switch_port = switch_port
132         self.vtep_vlan = vtep_vlan
133         self.vlan_tag = None
134         self.vlan_tagging = vlan_tagging
135         self.pci = pci
136         self.mac = None
137         self.dst_mac = dst_mac
138         self.vm_mac_list = None
139         subnet = IPNetwork(ip)
140         self.ip = subnet.ip.format()
141         self.ip_prefixlen = subnet.prefixlen
142         self.ip_addrs_step = ip_addrs_step
143         self.tg_gateway_ip_addrs_step = tg_gateway_ip_addrs_step
144         self.gateway_ip_addrs_step = gateway_ip_addrs_step
145         self.gateway_ip = gateway_ip
146         self.tg_gateway_ip = tg_gateway_ip
147         self.ip_block = IpBlock(self.ip, ip_addrs_step, flow_count)
148         self.gw_ip_block = IpBlock(gateway_ip,
149                                    gateway_ip_addrs_step,
150                                    chain_count)
151         self.tg_gw_ip_block = IpBlock(tg_gateway_ip,
152                                       tg_gateway_ip_addrs_step,
153                                       chain_count)
154         self.udp_src_port = udp_src_port
155         self.udp_dst_port = udp_dst_port
156
157     def set_mac(self, mac):
158         if mac is None:
159             raise TrafficClientException('Trying to set traffic generator MAC address as None')
160         self.mac = mac
161
162     def set_destination(self, dst):
163         self.dst = dst
164
165     def set_vm_mac_list(self, vm_mac_list):
166         self.vm_mac_list = map(str, vm_mac_list)
167
168     def set_vlan_tag(self, vlan_tag):
169         if self.vlan_tagging and vlan_tag is None:
170             raise TrafficClientException('Trying to set VLAN tag as None')
171         self.vlan_tag = vlan_tag
172
173     def get_gw_ip(self, chain_index):
174         '''Retrieve the IP address assigned for the gateway of a given chain
175         '''
176         return self.gw_ip_block.get_ip(chain_index)
177
178     def get_stream_configs(self, service_chain):
179         configs = []
180         # exact flow count for each chain is calculated as follows:
181         # - all chains except the first will have the same flow count
182         #   calculated as (total_flows + chain_count - 1) / chain_count
183         # - the first chain will have the remainder
184         # example 11 flows and 3 chains => 3, 4, 4
185         flows_per_chain = (self.flow_count + self.chain_count - 1) / self.chain_count
186         cur_chain_flow_count = self.flow_count - flows_per_chain * (self.chain_count - 1)
187
188         self.ip_block.reset_reservation()
189         self.dst.ip_block.reset_reservation()
190
191         for chain_idx in xrange(self.chain_count):
192             src_ip_first, src_ip_last = self.ip_block.reserve_ip_range(cur_chain_flow_count)
193             dst_ip_first, dst_ip_last = self.dst.ip_block.reserve_ip_range(cur_chain_flow_count)
194
195             dst_mac = self.dst_mac[chain_idx] if self.dst_mac is not None else self.dst.mac
196             if not re.match("[0-9a-f]{2}([-:])[0-9a-f]{2}(\\1[0-9a-f]{2}){4}$", dst_mac.lower()):
197                 raise TrafficClientException("Invalid MAC address '{mac}' specified in "
198                                              "mac_addrs_left/right".format(mac=dst_mac))
199
200             configs.append({
201                 'count': cur_chain_flow_count,
202                 'mac_src': self.mac,
203                 'mac_dst': dst_mac if service_chain == ChainType.EXT else self.vm_mac_list[
204                     chain_idx],
205                 'ip_src_addr': src_ip_first,
206                 'ip_src_addr_max': src_ip_last,
207                 'ip_src_count': cur_chain_flow_count,
208                 'ip_dst_addr': dst_ip_first,
209                 'ip_dst_addr_max': dst_ip_last,
210                 'ip_dst_count': cur_chain_flow_count,
211                 'ip_addrs_step': self.ip_addrs_step,
212                 'udp_src_port': self.udp_src_port,
213                 'udp_dst_port': self.udp_dst_port,
214                 'mac_discovery_gw': self.get_gw_ip(chain_idx),
215                 'ip_src_tg_gw': self.tg_gw_ip_block.get_ip(chain_idx),
216                 'ip_dst_tg_gw': self.dst.tg_gw_ip_block.get_ip(chain_idx),
217                 'vlan_tag': self.vlan_tag if self.vlan_tagging else None
218             })
219             # after first chain, fall back to the flow count for all other chains
220             cur_chain_flow_count = flows_per_chain
221
222         return configs
223
224     def ip_range_overlaps(self):
225         '''Check if this device ip range is overlapping with the dst device ip range
226         '''
227         src_base_ip = Device.ip_to_int(self.ip)
228         dst_base_ip = Device.ip_to_int(self.dst.ip)
229         src_last_ip = src_base_ip + self.flow_count - 1
230         dst_last_ip = dst_base_ip + self.flow_count - 1
231         return dst_last_ip >= src_base_ip and src_last_ip >= dst_base_ip
232
233     @staticmethod
234     def mac_to_int(mac):
235         return int(mac.translate(None, ":.- "), 16)
236
237     @staticmethod
238     def int_to_mac(i):
239         mac = format(i, 'x').zfill(12)
240         blocks = [mac[x:x + 2] for x in xrange(0, len(mac), 2)]
241         return ':'.join(blocks)
242
243     @staticmethod
244     def ip_to_int(addr):
245         return struct.unpack("!I", socket.inet_aton(addr))[0]
246
247     @staticmethod
248     def int_to_ip(nvalue):
249         return socket.inet_ntoa(struct.pack("!I", nvalue))
250
251
252 class RunningTrafficProfile(object):
253     """Represents traffic configuration for currently running traffic profile."""
254
255     DEFAULT_IP_STEP = '0.0.0.1'
256     DEFAULT_SRC_DST_IP_STEP = '0.0.0.1'
257
258     def __init__(self, config, generator_profile):
259         generator_config = self.__match_generator_profile(config.traffic_generator,
260                                                           generator_profile)
261         self.generator_config = generator_config
262         self.service_chain = config.service_chain
263         self.service_chain_count = config.service_chain_count
264         self.flow_count = config.flow_count
265         self.host_name = generator_config.host_name
266         self.name = generator_config.name
267         self.tool = generator_config.tool
268         self.cores = generator_config.get('cores', 1)
269         self.ip_addrs_step = generator_config.ip_addrs_step or self.DEFAULT_SRC_DST_IP_STEP
270         self.tg_gateway_ip_addrs_step = \
271             generator_config.tg_gateway_ip_addrs_step or self.DEFAULT_IP_STEP
272         self.gateway_ip_addrs_step = generator_config.gateway_ip_addrs_step or self.DEFAULT_IP_STEP
273         self.gateway_ips = generator_config.gateway_ip_addrs
274         self.ip = generator_config.ip
275         self.intf_speed = bitmath.parse_string(generator_config.intf_speed.replace('ps', '')).bits
276         self.vlan_tagging = config.vlan_tagging
277         self.no_arp = config.no_arp
278         self.src_device = None
279         self.dst_device = None
280         self.vm_mac_list = None
281         self.mac_addrs_left = generator_config.mac_addrs_left
282         self.mac_addrs_right = generator_config.mac_addrs_right
283         self.__prep_interfaces(generator_config)
284
285     def to_json(self):
286         return dict(self.generator_config)
287
288     def set_vm_mac_list(self, vm_mac_list):
289         self.src_device.set_vm_mac_list(vm_mac_list[0])
290         self.dst_device.set_vm_mac_list(vm_mac_list[1])
291
292     @staticmethod
293     def __match_generator_profile(traffic_generator, generator_profile):
294         generator_config = AttrDict(traffic_generator)
295         generator_config.pop('default_profile')
296         generator_config.pop('generator_profile')
297         matching_profile = [profile for profile in traffic_generator.generator_profile if
298                             profile.name == generator_profile]
299         if len(matching_profile) != 1:
300             raise Exception('Traffic generator profile not found: ' + generator_profile)
301
302         generator_config.update(matching_profile[0])
303
304         return generator_config
305
306     def __prep_interfaces(self, generator_config):
307         src_config = {
308             'chain_count': self.service_chain_count,
309             'flow_count': self.flow_count / 2,
310             'ip': generator_config.ip_addrs[0],
311             'ip_addrs_step': self.ip_addrs_step,
312             'gateway_ip': self.gateway_ips[0],
313             'gateway_ip_addrs_step': self.gateway_ip_addrs_step,
314             'tg_gateway_ip': generator_config.tg_gateway_ip_addrs[0],
315             'tg_gateway_ip_addrs_step': self.tg_gateway_ip_addrs_step,
316             'udp_src_port': generator_config.udp_src_port,
317             'udp_dst_port': generator_config.udp_dst_port,
318             'vlan_tagging': self.vlan_tagging,
319             'dst_mac': generator_config.mac_addrs_left
320         }
321         dst_config = {
322             'chain_count': self.service_chain_count,
323             'flow_count': self.flow_count / 2,
324             'ip': generator_config.ip_addrs[1],
325             'ip_addrs_step': self.ip_addrs_step,
326             'gateway_ip': self.gateway_ips[1],
327             'gateway_ip_addrs_step': self.gateway_ip_addrs_step,
328             'tg_gateway_ip': generator_config.tg_gateway_ip_addrs[1],
329             'tg_gateway_ip_addrs_step': self.tg_gateway_ip_addrs_step,
330             'udp_src_port': generator_config.udp_src_port,
331             'udp_dst_port': generator_config.udp_dst_port,
332             'vlan_tagging': self.vlan_tagging,
333             'dst_mac': generator_config.mac_addrs_right
334         }
335
336         self.src_device = Device(**dict(src_config, **generator_config.interfaces[0]))
337         self.dst_device = Device(**dict(dst_config, **generator_config.interfaces[1]))
338         self.src_device.set_destination(self.dst_device)
339         self.dst_device.set_destination(self.src_device)
340
341         if self.service_chain == ChainType.EXT and not self.no_arp \
342                 and self.src_device.ip_range_overlaps():
343             raise Exception('Overlapping IP address ranges src=%s dst=%d flows=%d' %
344                             self.src_device.ip,
345                             self.dst_device.ip,
346                             self.flow_count)
347
348     @property
349     def devices(self):
350         return [self.src_device, self.dst_device]
351
352     @property
353     def vlans(self):
354         return [self.src_device.vtep_vlan, self.dst_device.vtep_vlan]
355
356     @property
357     def ports(self):
358         return [self.src_device.port, self.dst_device.port]
359
360     @property
361     def switch_ports(self):
362         return [self.src_device.switch_port, self.dst_device.switch_port]
363
364     @property
365     def pcis(self):
366         return [self.src_device.pci, self.dst_device.pci]
367
368
369 class TrafficGeneratorFactory(object):
370     def __init__(self, config):
371         self.config = config
372
373     def get_tool(self):
374         return self.config.generator_config.tool
375
376     def get_generator_client(self):
377         tool = self.get_tool().lower()
378         if tool == 'trex':
379             from traffic_gen import trex
380             return trex.TRex(self.config)
381         elif tool == 'dummy':
382             from traffic_gen import dummy
383             return dummy.DummyTG(self.config)
384         return None
385
386     def list_generator_profile(self):
387         return [profile.name for profile in self.config.traffic_generator.generator_profile]
388
389     def get_generator_config(self, generator_profile):
390         return RunningTrafficProfile(self.config, generator_profile)
391
392     def get_matching_profile(self, traffic_profile_name):
393         matching_profile = [profile for profile in self.config.traffic_profile if
394                             profile.name == traffic_profile_name]
395
396         if len(matching_profile) > 1:
397             raise Exception('Multiple traffic profiles with the same name found.')
398         elif not matching_profile:
399             raise Exception('No traffic profile found.')
400
401         return matching_profile[0]
402
403     def get_frame_sizes(self, traffic_profile):
404         matching_profile = self.get_matching_profile(traffic_profile)
405         return matching_profile.l2frame_size
406
407
408 class TrafficClient(object):
409     PORTS = [0, 1]
410
411     def __init__(self, config, notifier=None, skip_sleep=False):
412         generator_factory = TrafficGeneratorFactory(config)
413         self.gen = generator_factory.get_generator_client()
414         self.tool = generator_factory.get_tool()
415         self.config = config
416         self.notifier = notifier
417         self.interval_collector = None
418         self.iteration_collector = None
419         self.runner = TrafficRunner(self, self.config.duration_sec, self.config.interval_sec)
420         if self.gen is None:
421             raise TrafficClientException('%s is not a supported traffic generator' % self.tool)
422
423         self.run_config = {
424             'l2frame_size': None,
425             'duration_sec': self.config.duration_sec,
426             'bidirectional': True,
427             'rates': []  # to avoid unsbuscriptable-obj warning
428         }
429         self.current_total_rate = {'rate_percent': '10'}
430         if self.config.single_run:
431             self.current_total_rate = utils.parse_rate_str(self.config.rate)
432         # UT with dummy TG can bypass all sleeps
433         self.skip_sleep = skip_sleep
434
435     def set_macs(self):
436         for mac, device in zip(self.gen.get_macs(), self.config.generator_config.devices):
437             device.set_mac(mac)
438
439     def start_traffic_generator(self):
440         self.gen.init()
441         self.gen.connect()
442
443     def setup(self):
444         self.gen.set_mode()
445         self.gen.config_interface()
446         self.gen.clear_stats()
447
448     def get_version(self):
449         return self.gen.get_version()
450
451     def ensure_end_to_end(self):
452         """
453         Ensure traffic generator receives packets it has transmitted.
454         This ensures end to end connectivity and also waits until VMs are ready to forward packets.
455
456         At this point all VMs are in active state, but forwarding does not have to work.
457         Small amount of traffic is sent to every chain. Then total of sent and received packets
458         is compared. If ratio between received and transmitted packets is higher than (N-1)/N,
459         N being number of chains, traffic flows through every chain and real measurements can be
460         performed.
461
462         Example:
463             PVP chain (1 VM per chain)
464             N = 10 (number of chains)
465             threshold = (N-1)/N = 9/10 = 0.9 (acceptable ratio ensuring working conditions)
466             if total_received/total_sent > 0.9, traffic is flowing to more than 9 VMs meaning
467             all 10 VMs are in operational state.
468         """
469         LOG.info('Starting traffic generator to ensure end-to-end connectivity')
470         rate_pps = {'rate_pps': str(self.config.service_chain_count * 100)}
471         self.gen.create_traffic('64', [rate_pps, rate_pps], bidirectional=True, latency=False)
472
473         # ensures enough traffic is coming back
474         threshold = (self.config.service_chain_count - 1) / float(self.config.service_chain_count)
475         retry_count = (self.config.check_traffic_time_sec +
476                        self.config.generic_poll_sec - 1) / self.config.generic_poll_sec
477         for it in xrange(retry_count):
478             self.gen.clear_stats()
479             self.gen.start_traffic()
480             LOG.info('Waiting for packets to be received back... (%d / %d)', it + 1, retry_count)
481             if not self.skip_sleep:
482                 time.sleep(self.config.generic_poll_sec)
483             self.gen.stop_traffic()
484             stats = self.gen.get_stats()
485
486             # compute total sent and received traffic on both ports
487             total_rx = 0
488             total_tx = 0
489             for port in self.PORTS:
490                 total_rx += float(stats[port]['rx'].get('total_pkts', 0))
491                 total_tx += float(stats[port]['tx'].get('total_pkts', 0))
492
493             # how much of traffic came back
494             ratio = total_rx / total_tx if total_tx else 0
495
496             if ratio > threshold:
497                 self.gen.clear_stats()
498                 self.gen.clear_streamblock()
499                 LOG.info('End-to-end connectivity ensured')
500                 return
501
502             if not self.skip_sleep:
503                 time.sleep(self.config.generic_poll_sec)
504
505         raise TrafficClientException('End-to-end connectivity cannot be ensured')
506
507     def ensure_arp_successful(self):
508         if not self.gen.resolve_arp():
509             raise TrafficClientException('ARP cannot be resolved')
510
511     def set_traffic(self, frame_size, bidirectional):
512         self.run_config['bidirectional'] = bidirectional
513         self.run_config['l2frame_size'] = frame_size
514         self.run_config['rates'] = [self.get_per_direction_rate()]
515         if bidirectional:
516             self.run_config['rates'].append(self.get_per_direction_rate())
517         else:
518             unidir_reverse_pps = int(self.config.unidir_reverse_traffic_pps)
519             if unidir_reverse_pps > 0:
520                 self.run_config['rates'].append({'rate_pps': str(unidir_reverse_pps)})
521         # Fix for [NFVBENCH-67], convert the rate string to PPS
522         for idx, rate in enumerate(self.run_config['rates']):
523             if 'rate_pps' not in rate:
524                 self.run_config['rates'][idx] = {'rate_pps': self.__convert_rates(rate)['rate_pps']}
525
526         self.gen.clear_streamblock()
527         self.gen.create_traffic(frame_size, self.run_config['rates'], bidirectional, latency=True)
528
529     def modify_load(self, load):
530         self.current_total_rate = {'rate_percent': str(load)}
531         rate_per_direction = self.get_per_direction_rate()
532
533         self.gen.modify_rate(rate_per_direction, False)
534         self.run_config['rates'][0] = rate_per_direction
535         if self.run_config['bidirectional']:
536             self.gen.modify_rate(rate_per_direction, True)
537             self.run_config['rates'][1] = rate_per_direction
538
539     def get_ndr_and_pdr(self):
540         dst = 'Bidirectional' if self.run_config['bidirectional'] else 'Unidirectional'
541         targets = {}
542         if self.config.ndr_run:
543             LOG.info('*** Searching NDR for %s (%s)...', self.run_config['l2frame_size'], dst)
544             targets['ndr'] = self.config.measurement.NDR
545         if self.config.pdr_run:
546             LOG.info('*** Searching PDR for %s (%s)...', self.run_config['l2frame_size'], dst)
547             targets['pdr'] = self.config.measurement.PDR
548
549         self.run_config['start_time'] = time.time()
550         self.interval_collector = IntervalCollector(self.run_config['start_time'])
551         self.interval_collector.attach_notifier(self.notifier)
552         self.iteration_collector = IterationCollector(self.run_config['start_time'])
553         results = {}
554         self.__range_search(0.0, 200.0, targets, results)
555
556         results['iteration_stats'] = {
557             'ndr_pdr': self.iteration_collector.get()
558         }
559
560         if self.config.ndr_run:
561             LOG.info('NDR load: %s', results['ndr']['rate_percent'])
562             results['ndr']['time_taken_sec'] = \
563                 results['ndr']['timestamp_sec'] - self.run_config['start_time']
564             if self.config.pdr_run:
565                 LOG.info('PDR load: %s', results['pdr']['rate_percent'])
566                 results['pdr']['time_taken_sec'] = \
567                     results['pdr']['timestamp_sec'] - results['ndr']['timestamp_sec']
568         else:
569             LOG.info('PDR load: %s', results['pdr']['rate_percent'])
570             results['pdr']['time_taken_sec'] = \
571                 results['pdr']['timestamp_sec'] - self.run_config['start_time']
572         return results
573
574     def __get_dropped_rate(self, result):
575         dropped_pkts = result['rx']['dropped_pkts']
576         total_pkts = result['tx']['total_pkts']
577         if not total_pkts:
578             return float('inf')
579         return float(dropped_pkts) / total_pkts * 100
580
581     def get_stats(self):
582         stats = self.gen.get_stats()
583         retDict = {'total_tx_rate': stats['total_tx_rate']}
584         for port in self.PORTS:
585             retDict[port] = {'tx': {}, 'rx': {}}
586
587         tx_keys = ['total_pkts', 'total_pkt_bytes', 'pkt_rate', 'pkt_bit_rate']
588         rx_keys = tx_keys + ['dropped_pkts']
589
590         for port in self.PORTS:
591             for key in tx_keys:
592                 retDict[port]['tx'][key] = int(stats[port]['tx'][key])
593             for key in rx_keys:
594                 try:
595                     retDict[port]['rx'][key] = int(stats[port]['rx'][key])
596                 except ValueError:
597                     retDict[port]['rx'][key] = 0
598             retDict[port]['rx']['avg_delay_usec'] = cast_integer(
599                 stats[port]['rx']['avg_delay_usec'])
600             retDict[port]['rx']['min_delay_usec'] = cast_integer(
601                 stats[port]['rx']['min_delay_usec'])
602             retDict[port]['rx']['max_delay_usec'] = cast_integer(
603                 stats[port]['rx']['max_delay_usec'])
604             retDict[port]['drop_rate_percent'] = self.__get_dropped_rate(retDict[port])
605
606         ports = sorted(retDict.keys())
607         if self.run_config['bidirectional']:
608             retDict['overall'] = {'tx': {}, 'rx': {}}
609             for key in tx_keys:
610                 retDict['overall']['tx'][key] = \
611                     retDict[ports[0]]['tx'][key] + retDict[ports[1]]['tx'][key]
612             for key in rx_keys:
613                 retDict['overall']['rx'][key] = \
614                     retDict[ports[0]]['rx'][key] + retDict[ports[1]]['rx'][key]
615             total_pkts = [retDict[ports[0]]['rx']['total_pkts'],
616                           retDict[ports[1]]['rx']['total_pkts']]
617             avg_delays = [retDict[ports[0]]['rx']['avg_delay_usec'],
618                           retDict[ports[1]]['rx']['avg_delay_usec']]
619             max_delays = [retDict[ports[0]]['rx']['max_delay_usec'],
620                           retDict[ports[1]]['rx']['max_delay_usec']]
621             min_delays = [retDict[ports[0]]['rx']['min_delay_usec'],
622                           retDict[ports[1]]['rx']['min_delay_usec']]
623             retDict['overall']['rx']['avg_delay_usec'] = utils.weighted_avg(total_pkts, avg_delays)
624             retDict['overall']['rx']['min_delay_usec'] = min(min_delays)
625             retDict['overall']['rx']['max_delay_usec'] = max(max_delays)
626             for key in ['pkt_bit_rate', 'pkt_rate']:
627                 for dirc in ['tx', 'rx']:
628                     retDict['overall'][dirc][key] /= 2.0
629         else:
630             retDict['overall'] = retDict[ports[0]]
631         retDict['overall']['drop_rate_percent'] = self.__get_dropped_rate(retDict['overall'])
632         return retDict
633
634     def __convert_rates(self, rate):
635         return utils.convert_rates(self.run_config['l2frame_size'],
636                                    rate,
637                                    self.config.generator_config.intf_speed)
638
639     def __ndr_pdr_found(self, tag, load):
640         rates = self.__convert_rates({'rate_percent': load})
641         self.iteration_collector.add_ndr_pdr(tag, rates['rate_pps'])
642         last_stats = self.iteration_collector.peek()
643         self.interval_collector.add_ndr_pdr(tag, last_stats)
644
645     def __format_output_stats(self, stats):
646         for key in self.PORTS + ['overall']:
647             interface = stats[key]
648             stats[key] = {
649                 'tx_pkts': interface['tx']['total_pkts'],
650                 'rx_pkts': interface['rx']['total_pkts'],
651                 'drop_percentage': interface['drop_rate_percent'],
652                 'drop_pct': interface['rx']['dropped_pkts'],
653                 'avg_delay_usec': interface['rx']['avg_delay_usec'],
654                 'max_delay_usec': interface['rx']['max_delay_usec'],
655                 'min_delay_usec': interface['rx']['min_delay_usec'],
656             }
657
658         return stats
659
660     def __targets_found(self, rate, targets, results):
661         for tag, target in targets.iteritems():
662             LOG.info('Found %s (%s) load: %s', tag, target, rate)
663             self.__ndr_pdr_found(tag, rate)
664             results[tag]['timestamp_sec'] = time.time()
665
666     def __range_search(self, left, right, targets, results):
667         '''Perform a binary search for a list of targets inside a [left..right] range or rate
668
669         left    the left side of the range to search as a % the line rate (100 = 100% line rate)
670                 indicating the rate to send on each interface
671         right   the right side of the range to search as a % of line rate
672                 indicating the rate to send on each interface
673         targets a dict of drop rates to search (0.1 = 0.1%), indexed by the DR name or "tag"
674                 ('ndr', 'pdr')
675         results a dict to store results
676         '''
677         if not targets:
678             return
679         LOG.info('Range search [%s .. %s] targets: %s', left, right, targets)
680
681         # Terminate search when gap is less than load epsilon
682         if right - left < self.config.measurement.load_epsilon:
683             self.__targets_found(left, targets, results)
684             return
685
686         # Obtain the average drop rate in for middle load
687         middle = (left + right) / 2.0
688         try:
689             stats, rates = self.__run_search_iteration(middle)
690         except STLError:
691             LOG.exception("Got exception from traffic generator during binary search")
692             self.__targets_found(left, targets, results)
693             return
694         # Split target dicts based on the avg drop rate
695         left_targets = {}
696         right_targets = {}
697         for tag, target in targets.iteritems():
698             if stats['overall']['drop_rate_percent'] <= target:
699                 # record the best possible rate found for this target
700                 results[tag] = rates
701                 results[tag].update({
702                     'load_percent_per_direction': middle,
703                     'stats': self.__format_output_stats(dict(stats)),
704                     'timestamp_sec': None
705                 })
706                 right_targets[tag] = target
707             else:
708                 # initialize to 0 all fields of result for
709                 # the worst case scenario of the binary search (if ndr/pdr is not found)
710                 if tag not in results:
711                     results[tag] = dict.fromkeys(rates, 0)
712                     empty_stats = self.__format_output_stats(dict(stats))
713                     for key in empty_stats:
714                         if isinstance(empty_stats[key], dict):
715                             empty_stats[key] = dict.fromkeys(empty_stats[key], 0)
716                         else:
717                             empty_stats[key] = 0
718                     results[tag].update({
719                         'load_percent_per_direction': 0,
720                         'stats': empty_stats,
721                         'timestamp_sec': None
722                     })
723                 left_targets[tag] = target
724
725         # search lower half
726         self.__range_search(left, middle, left_targets, results)
727
728         # search upper half only if the upper rate does not exceed
729         # 100%, this only happens when the first search at 100%
730         # yields a DR that is < target DR
731         if middle >= 100:
732             self.__targets_found(100, right_targets, results)
733         else:
734             self.__range_search(middle, right, right_targets, results)
735
736     def __run_search_iteration(self, rate):
737         # set load
738         self.modify_load(rate)
739
740         # poll interval stats and collect them
741         for stats in self.run_traffic():
742             self.interval_collector.add(stats)
743             time_elapsed_ratio = self.runner.time_elapsed() / self.run_config['duration_sec']
744             if time_elapsed_ratio >= 1:
745                 self.cancel_traffic()
746         self.interval_collector.reset()
747
748         # get stats from the run
749         stats = self.runner.client.get_stats()
750         current_traffic_config = self.get_traffic_config()
751         warning = self.compare_tx_rates(current_traffic_config['direction-total']['rate_pps'],
752                                         stats['total_tx_rate'])
753         if warning is not None:
754             stats['warning'] = warning
755
756         # save reliable stats from whole iteration
757         self.iteration_collector.add(stats, current_traffic_config['direction-total']['rate_pps'])
758         LOG.info('Average drop rate: %f', stats['overall']['drop_rate_percent'])
759
760         return stats, current_traffic_config['direction-total']
761
762     @staticmethod
763     def log_stats(stats):
764         report = {
765             'datetime': str(datetime.now()),
766             'tx_packets': stats['overall']['tx']['total_pkts'],
767             'rx_packets': stats['overall']['rx']['total_pkts'],
768             'drop_packets': stats['overall']['rx']['dropped_pkts'],
769             'drop_rate_percent': stats['overall']['drop_rate_percent']
770         }
771         LOG.info('TX: %(tx_packets)d; '
772                  'RX: %(rx_packets)d; '
773                  'Dropped: %(drop_packets)d; '
774                  'Drop rate: %(drop_rate_percent).4f%%',
775                  report)
776
777     def run_traffic(self):
778         stats = self.runner.run()
779         while self.runner.is_running:
780             self.log_stats(stats)
781             yield stats
782             stats = self.runner.poll_stats()
783             if stats is None:
784                 return
785         self.log_stats(stats)
786         LOG.info('Drop rate: %f', stats['overall']['drop_rate_percent'])
787         yield stats
788
789     def cancel_traffic(self):
790         self.runner.stop()
791
792     def get_interface(self, port_index):
793         port = self.gen.port_handle[port_index]
794         tx, rx = 0, 0
795         if not self.config.no_traffic:
796             stats = self.get_stats()
797             if port in stats:
798                 tx, rx = int(stats[port]['tx']['total_pkts']), int(stats[port]['rx']['total_pkts'])
799         return Interface('traffic-generator', self.tool.lower(), tx, rx)
800
801     def get_traffic_config(self):
802         config = {}
803         load_total = 0.0
804         bps_total = 0.0
805         pps_total = 0.0
806         for idx, rate in enumerate(self.run_config['rates']):
807             key = 'direction-forward' if idx == 0 else 'direction-reverse'
808             config[key] = {
809                 'l2frame_size': self.run_config['l2frame_size'],
810                 'duration_sec': self.run_config['duration_sec']
811             }
812             config[key].update(rate)
813             config[key].update(self.__convert_rates(rate))
814             load_total += float(config[key]['rate_percent'])
815             bps_total += float(config[key]['rate_bps'])
816             pps_total += float(config[key]['rate_pps'])
817         config['direction-total'] = dict(config['direction-forward'])
818         config['direction-total'].update({
819             'rate_percent': load_total,
820             'rate_pps': cast_integer(pps_total),
821             'rate_bps': bps_total
822         })
823
824         return config
825
826     def get_run_config(self, results):
827         """Returns configuration which was used for the last run."""
828         r = {}
829         for idx, key in enumerate(["direction-forward", "direction-reverse"]):
830             tx_rate = results["stats"][idx]["tx"]["total_pkts"] / self.config.duration_sec
831             rx_rate = results["stats"][idx]["rx"]["total_pkts"] / self.config.duration_sec
832             r[key] = {
833                 "orig": self.__convert_rates(self.run_config['rates'][idx]),
834                 "tx": self.__convert_rates({'rate_pps': tx_rate}),
835                 "rx": self.__convert_rates({'rate_pps': rx_rate})
836             }
837
838         total = {}
839         for direction in ['orig', 'tx', 'rx']:
840             total[direction] = {}
841             for unit in ['rate_percent', 'rate_bps', 'rate_pps']:
842
843                 total[direction][unit] = sum([float(x[direction][unit]) for x in r.values()])
844
845         r['direction-total'] = total
846         return r
847
848     @staticmethod
849     def compare_tx_rates(required, actual):
850         threshold = 0.9
851         are_different = False
852         try:
853             if float(actual) / required < threshold:
854                 are_different = True
855         except ZeroDivisionError:
856             are_different = True
857
858         if are_different:
859             msg = "WARNING: There is a significant difference between requested TX rate ({r}) " \
860                   "and actual TX rate ({a}). The traffic generator may not have sufficient CPU " \
861                   "to achieve the requested TX rate.".format(r=required, a=actual)
862             LOG.info(msg)
863             return msg
864
865         return None
866
867     def get_per_direction_rate(self):
868         divisor = 2 if self.run_config['bidirectional'] else 1
869         if 'rate_percent' in self.current_total_rate:
870             # don't split rate if it's percentage
871             divisor = 1
872
873         return utils.divide_rate(self.current_total_rate, divisor)
874
875     def close(self):
876         try:
877             self.gen.stop_traffic()
878         except Exception:
879             pass
880         self.gen.clear_stats()
881         self.gen.cleanup()