27ff22764af21235f7dc5734773538bc51aac8ac
[nfvbench.git] / nfvbench / traffic_client.py
1 # Copyright 2016 Cisco Systems, Inc.  All rights reserved.
2 #
3 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
4 #    not use this file except in compliance with the License. You may obtain
5 #    a copy of the License at
6 #
7 #         http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #    Unless required by applicable law or agreed to in writing, software
10 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 #    License for the specific language governing permissions and limitations
13 #    under the License.
14
15 from attrdict import AttrDict
16 import bitmath
17 from datetime import datetime
18 from log import LOG
19 from netaddr import IPNetwork
20 from network import Interface
21 import socket
22 from specs import ChainType
23 from stats_collector import IntervalCollector
24 from stats_collector import IterationCollector
25 import struct
26 import time
27 import traffic_gen.traffic_utils as utils
28
29
class TrafficClientException(Exception):
    """Error raised by the traffic client when setup or a connectivity check fails."""
32
33
class TrafficRunner(object):
    """Drives one timed traffic run on a traffic client and polls its statistics."""

    def __init__(self, client, duration_sec, interval_sec=0):
        """Remember the client and the timing of the run.

        client        object exposing a `gen` traffic generator and a `get_stats()` method
        duration_sec  total length of a run in seconds
        interval_sec  polling interval in seconds; 0 means a single poll at the end of the run
        """
        self.client = client
        self.duration_sec = duration_sec
        self.interval_sec = interval_sec
        # Wall-clock time at which traffic was started; None while nothing is running.
        self.start_time = None

    def run(self):
        """Clear the generator stats, start traffic and return the first polled stats."""
        LOG.info('Running traffic generator')
        generator = self.client.gen
        generator.clear_stats()
        generator.start_traffic()
        self.start_time = time.time()
        return self.poll_stats()

    def stop(self):
        """Stop the generator if a run is in progress; do nothing otherwise."""
        if not self.is_running():
            return
        self.start_time = None
        self.client.gen.stop_traffic()

    def is_running(self):
        """Return True between a call to run() and the matching stop()."""
        return self.start_time is not None

    def time_elapsed(self):
        """Return the seconds spent in the current run, or the full duration when idle."""
        if not self.is_running():
            return self.duration_sec
        return time.time() - self.start_time

    def poll_stats(self):
        """Sleep until the next polling point and return the client stats.

        Returns None when no run is in progress or when the run has already exceeded its
        duration (in which case traffic is stopped). Traffic is also stopped once the last
        polling point of the run has been reached.
        """
        if not self.is_running():
            return None

        elapsed = self.time_elapsed()
        if elapsed > self.duration_sec:
            self.stop()
            return None

        remaining = self.duration_sec - elapsed
        if not (self.interval_sec > 0.0):
            # No intermediate polling: wait for the whole run, then stop.
            time.sleep(self.duration_sec)
            self.stop()
        elif remaining <= self.interval_sec:
            # Last interval of the run: wait for what is left, then stop.
            time.sleep(remaining)
            self.stop()
        else:
            time.sleep(self.interval_sec)
        return self.client.get_stats()
81
82
class Device(object):
    """One traffic generator port and the addressing used to build its streams.

    A device knows its peer (`dst`), its own MAC, the MACs of the VMs it sends to, and the
    expanded lists of flow IPs and per-chain gateway IPs.
    """

    def __init__(self, port, pci, switch_port=None, vtep_vlan=None, ip=None, tg_gateway_ip=None,
                 gateway_ip=None, ip_addrs_step=None, tg_gateway_ip_addrs_step=None,
                 gateway_ip_addrs_step=None, udp_src_port=None, udp_dst_port=None,
                 chain_count=1, flow_count=1, vlan_tagging=False):
        """Store the port settings and expand the IP ranges.

        `ip` may carry a prefix length (it is parsed with IPNetwork); the flow IP list holds
        `flow_count` addresses while both gateway lists hold `chain_count` addresses.
        """
        self.chain_count = chain_count
        self.flow_count = flow_count
        self.dst = None
        self.port = port
        self.switch_port = switch_port
        self.vtep_vlan = vtep_vlan
        self.vlan_tag = None
        self.vlan_tagging = vlan_tagging
        self.pci = pci
        self.mac = None
        self.vm_mac_list = None
        subnet = IPNetwork(ip)
        self.ip = subnet.ip.format()
        self.ip_prefixlen = subnet.prefixlen
        self.ip_addrs_step = ip_addrs_step
        self.tg_gateway_ip_addrs_step = tg_gateway_ip_addrs_step
        self.gateway_ip_addrs_step = gateway_ip_addrs_step
        self.ip_list = self.expand_ip(self.ip, self.ip_addrs_step, self.flow_count)
        self.gateway_ip = gateway_ip
        self.gateway_ip_list = self.expand_ip(self.gateway_ip,
                                              self.gateway_ip_addrs_step,
                                              self.chain_count)
        self.tg_gateway_ip = tg_gateway_ip
        self.tg_gateway_ip_list = self.expand_ip(self.tg_gateway_ip,
                                                 self.tg_gateway_ip_addrs_step,
                                                 self.chain_count)
        self.udp_src_port = udp_src_port
        self.udp_dst_port = udp_dst_port

    def set_mac(self, mac):
        """Set the MAC address of this port; raise TrafficClientException if it is None."""
        if mac is None:
            raise TrafficClientException('Trying to set traffic generator MAC address as None')
        self.mac = mac

    def set_destination(self, dst):
        """Set the peer Device that receives the traffic sent by this one."""
        self.dst = dst

    def set_vm_mac_list(self, vm_mac_list):
        """Store the destination VM MACs (one per chain) as a list of strings."""
        # A real list is required: get_stream_configs() indexes it by chain, and map()
        # only returns a one-shot iterator on Python 3.
        self.vm_mac_list = [str(mac) for mac in vm_mac_list]

    def set_vlan_tag(self, vlan_tag):
        """Set the VLAN tag; raise TrafficClientException if tagging is on and it is None."""
        if self.vlan_tagging and vlan_tag is None:
            raise TrafficClientException('Trying to set VLAN tag as None')
        self.vlan_tag = vlan_tag

    def get_stream_configs(self, service_chain):
        """Return one stream configuration dict per chain.

        The flows are spread over the chains: each chain receives the remaining flows
        divided by the remaining chains (integer division), so earlier chains never get
        more flows than later ones. For the EXT chain type the destination MAC is the
        peer device MAC, otherwise it is the VM MAC of the chain.
        """
        configs = []
        flow_idx = 0
        for chain_idx in range(self.chain_count):
            # Floor division keeps the count usable as a list index on Python 2 and 3.
            current_flow_count = (self.flow_count - flow_idx) // (self.chain_count - chain_idx)
            max_idx = flow_idx + current_flow_count - 1
            ip_src_count = self.ip_to_int(self.ip_list[max_idx]) - \
                self.ip_to_int(self.ip_list[flow_idx]) + 1
            ip_dst_count = self.ip_to_int(self.dst.ip_list[max_idx]) - \
                self.ip_to_int(self.dst.ip_list[flow_idx]) + 1

            configs.append({
                'count': current_flow_count,
                'mac_src': self.mac,
                'mac_dst': self.dst.mac if service_chain == ChainType.EXT
                else self.vm_mac_list[chain_idx],
                'ip_src_addr': self.ip_list[flow_idx],
                'ip_src_addr_max': self.ip_list[max_idx],
                'ip_src_count': ip_src_count,
                'ip_dst_addr': self.dst.ip_list[flow_idx],
                'ip_dst_addr_max': self.dst.ip_list[max_idx],
                'ip_dst_count': ip_dst_count,
                'ip_addrs_step': self.ip_addrs_step,
                'udp_src_port': self.udp_src_port,
                'udp_dst_port': self.udp_dst_port,
                'mac_discovery_gw': self.gateway_ip_list[chain_idx],
                'ip_src_tg_gw': self.tg_gateway_ip_list[chain_idx],
                'ip_dst_tg_gw': self.dst.tg_gateway_ip_list[chain_idx],
                'vlan_tag': self.vlan_tag if self.vlan_tagging else None
            })

            flow_idx += current_flow_count
        return configs

    @classmethod
    def expand_ip(cls, ip, step_ip, count):
        """Return `count` IP strings starting at `ip`, each `step_ip` after the previous.

        `step_ip` is a dotted-quad increment (e.g. '0.0.0.1') or the keyword 'random'.
        """
        if step_ip == 'random':
            # Repeatable random will be used in the stream src/dst IP pairs, but we still
            # need to expand the IP based on the number of chains and flows configured. So
            # we use "0.0.0.1" as the step to have the exact IP flow ranges for every chain.
            step_ip = '0.0.0.1'

        step_ip_in_int = cls.ip_to_int(step_ip)
        subnet = IPNetwork(ip)
        ip_list = []
        for _ in range(count):
            ip_list.append(subnet.ip.format())
            subnet = subnet.next(step_ip_in_int)
        return ip_list

    @staticmethod
    def mac_to_int(mac):
        """Convert a MAC string to an integer, ignoring ':', '.', '-' and space separators."""
        # Filtering characters works on both Python 2 and 3 string types, unlike the
        # two-argument str.translate(None, chars) form which only exists on Python 2.
        hex_digits = ''.join(char for char in mac if char not in ':.- ')
        return int(hex_digits, 16)

    @staticmethod
    def int_to_mac(i):
        """Convert an integer to a lowercase colon-separated MAC string (at least 6 bytes)."""
        mac = format(i, 'x').zfill(12)
        blocks = [mac[x:x + 2] for x in range(0, len(mac), 2)]
        return ':'.join(blocks)

    @staticmethod
    def ip_to_int(addr):
        """Convert a dotted-quad IPv4 string to its unsigned 32-bit integer value."""
        return struct.unpack("!I", socket.inet_aton(addr))[0]
197
198
class RunningTrafficProfile(object):
    """Represents traffic configuration for currently running traffic profile."""

    # Fallback steps used when the generator profile does not provide one.
    DEFAULT_IP_STEP = '0.0.0.1'
    DEFAULT_SRC_DST_IP_STEP = '0.0.0.1'

    def __init__(self, config, generator_profile):
        """Resolve the named generator profile and build the two traffic devices.

        config             run configuration (reads traffic_generator, service_chain,
                           service_chain_count, flow_count, vlan_tagging and no_arp)
        generator_profile  name of the generator profile to select

        Raises Exception when the profile name does not match exactly one profile, or when
        the computed source and destination IPs overlap for an EXT chain with ARP enabled.
        """
        generator_config = self.__match_generator_profile(config.traffic_generator,
                                                          generator_profile)
        self.generator_config = generator_config
        self.service_chain = config.service_chain
        self.service_chain_count = config.service_chain_count
        self.flow_count = config.flow_count
        self.host_name = generator_config.host_name
        self.name = generator_config.name
        self.tool = generator_config.tool
        self.cores = generator_config.get('cores', 1)
        self.ip_addrs_step = generator_config.ip_addrs_step or self.DEFAULT_SRC_DST_IP_STEP
        self.tg_gateway_ip_addrs_step = \
            generator_config.tg_gateway_ip_addrs_step or self.DEFAULT_IP_STEP
        self.gateway_ip_addrs_step = generator_config.gateway_ip_addrs_step or self.DEFAULT_IP_STEP
        self.gateway_ips = generator_config.gateway_ip_addrs
        self.ip = generator_config.ip
        # The 'ps' suffix is dropped before parsing (presumably turning e.g. '10Gbps' into
        # '10Gb' for bitmath - confirm against the configuration format).
        self.intf_speed = bitmath.parse_string(generator_config.intf_speed.replace('ps', '')).bits
        self.vlan_tagging = config.vlan_tagging
        self.no_arp = config.no_arp
        self.src_device = None
        self.dst_device = None
        self.vm_mac_list = None
        self.__prep_interfaces(generator_config)

    def to_json(self):
        """Return the resolved generator configuration as a plain dict."""
        return dict(self.generator_config)

    def set_vm_mac_list(self, vm_mac_list):
        """Assign VM MACs: element 0 goes to the source device, element 1 to the destination."""
        self.src_device.set_vm_mac_list(vm_mac_list[0])
        self.dst_device.set_vm_mac_list(vm_mac_list[1])

    @staticmethod
    def __match_generator_profile(traffic_generator, generator_profile):
        """Return the generator settings merged with the profile named `generator_profile`.

        The 'default_profile' and 'generator_profile' entries are removed from the copy
        before the selected profile is merged in. Raises Exception unless exactly one
        profile carries the requested name.
        """
        generator_config = AttrDict(traffic_generator)
        generator_config.pop('default_profile')
        generator_config.pop('generator_profile')
        # A list (not a lazy filter object) is needed for len() and indexing below.
        matching_profile = [profile for profile in traffic_generator.generator_profile
                            if profile.name == generator_profile]
        if len(matching_profile) != 1:
            # %-formatting keeps the message working even if the name is not a string.
            raise Exception('Traffic generator profile not found: %s' % generator_profile)

        generator_config.update(matching_profile[0])

        return generator_config

    def __device_config(self, generator_config, index):
        """Return the Device keyword arguments for side `index` (0 = source, 1 = destination)."""
        return {
            'chain_count': self.service_chain_count,
            # Flows are split evenly between the two sides; floor division keeps an int.
            'flow_count': self.flow_count // 2,
            'ip': generator_config.ip_addrs[index],
            'ip_addrs_step': self.ip_addrs_step,
            'gateway_ip': self.gateway_ips[index],
            'gateway_ip_addrs_step': self.gateway_ip_addrs_step,
            'tg_gateway_ip': generator_config.tg_gateway_ip_addrs[index],
            'tg_gateway_ip_addrs_step': self.tg_gateway_ip_addrs_step,
            'udp_src_port': generator_config.udp_src_port,
            'udp_dst_port': generator_config.udp_dst_port,
            'vlan_tagging': self.vlan_tagging
        }

    def __prep_interfaces(self, generator_config):
        """Create the source and destination devices and link them to each other.

        Per-interface settings from `generator_config.interfaces` override the common ones.
        Raises Exception if, for an EXT chain with ARP enabled, both devices share an IP.
        """
        src_config = self.__device_config(generator_config, 0)
        dst_config = self.__device_config(generator_config, 1)

        self.src_device = Device(**dict(src_config, **generator_config.interfaces[0]))
        self.dst_device = Device(**dict(dst_config, **generator_config.interfaces[1]))
        self.src_device.set_destination(self.dst_device)
        self.dst_device.set_destination(self.src_device)

        if self.service_chain == ChainType.EXT and not self.no_arp \
                and not self.__are_unique(self.src_device.ip_list, self.dst_device.ip_list):
            raise Exception('Computed IP addresses are not unique, choose different base. '
                            'Start IPs: {start}. End IPs: {end}'
                            .format(start=self.src_device.ip_list,
                                    end=self.dst_device.ip_list))

    def __are_unique(self, list1, list2):
        """Return True when the two lists have no element in common."""
        return set(list1).isdisjoint(set(list2))

    @property
    def devices(self):
        """Both devices, source first."""
        return [self.src_device, self.dst_device]

    @property
    def vlans(self):
        """The vtep_vlan of both devices, source first."""
        return [self.src_device.vtep_vlan, self.dst_device.vtep_vlan]

    @property
    def ports(self):
        """The port of both devices, source first."""
        return [self.src_device.port, self.dst_device.port]

    @property
    def switch_ports(self):
        """The switch_port of both devices, source first."""
        return [self.src_device.switch_port, self.dst_device.switch_port]

    @property
    def pcis(self):
        """The PCI address of both devices, source first."""
        return [self.src_device.pci, self.dst_device.pci]
313
314
class TrafficGeneratorFactory(object):
    """Creates the traffic generator client and looks up profiles from the configuration."""

    def __init__(self, config):
        """Keep the run configuration used by all lookups."""
        self.config = config

    def get_tool(self):
        """Return the traffic generator tool name from the generator configuration."""
        return self.config.generator_config.tool

    def get_generator_client(self):
        """Return a generator client for the configured tool, or None if it is unsupported.

        The tool name is matched case-insensitively against 'trex' and 'dummy'. The
        generator modules are imported lazily, only for the tool that is selected.
        """
        tool = self.get_tool().lower()
        if tool == 'trex':
            from traffic_gen import trex
            return trex.TRex(self.config)
        elif tool == 'dummy':
            from traffic_gen import dummy
            return dummy.DummyTG(self.config)
        else:
            return None

    def list_generator_profile(self):
        """Return the names of all configured generator profiles."""
        return [profile.name for profile in self.config.traffic_generator.generator_profile]

    def get_generator_config(self, generator_profile):
        """Return a RunningTrafficProfile built for the named generator profile."""
        return RunningTrafficProfile(self.config, generator_profile)

    def get_matching_profile(self, traffic_profile_name):
        """Return the single traffic profile with the given name.

        Raises Exception when no profile or more than one profile has that name.
        """
        # Build a real list: len() and indexing do not work on a lazy filter object.
        matching_profile = [profile for profile in self.config.traffic_profile
                            if profile.name == traffic_profile_name]

        if len(matching_profile) > 1:
            raise Exception('Multiple traffic profiles with the same name found.')
        elif not matching_profile:
            raise Exception('No traffic profile found.')

        return matching_profile[0]

    def get_frame_sizes(self, traffic_profile):
        """Return the l2frame_size value of the named traffic profile."""
        matching_profile = self.get_matching_profile(traffic_profile)
        return matching_profile.l2frame_size
354
355
class TrafficClient(object):
    """Drives a traffic generator: setup, connectivity checks, runs and NDR/PDR search."""

    # Indexes of the two traffic generator ports used as keys in the stats dicts.
    PORTS = [0, 1]

    def __init__(self, config, notifier=None):
        """Create the generator client for the configured tool.

        config    run configuration (reads duration_sec, interval_sec, single_run, rate, ...)
        notifier  optional object attached to the interval collector during NDR/PDR search

        Raises TrafficClientException when the configured tool is not supported.
        """
        generator_factory = TrafficGeneratorFactory(config)
        self.gen = generator_factory.get_generator_client()
        self.tool = generator_factory.get_tool()
        self.config = config
        self.notifier = notifier
        self.interval_collector = None
        self.iteration_collector = None
        self.runner = TrafficRunner(self, self.config.duration_sec, self.config.interval_sec)
        if self.gen is None:
            raise TrafficClientException('%s is not a supported traffic generator' % self.tool)

        self.run_config = {
            'l2frame_size': None,
            'duration_sec': self.config.duration_sec,
            'bidirectional': True,
            'rates': None
        }
        self.current_total_rate = {'rate_percent': '10'}
        if self.config.single_run:
            self.current_total_rate = utils.parse_rate_str(self.config.rate)

    def set_macs(self):
        """Assign the MACs reported by the generator to the configured devices, in order."""
        for mac, device in zip(self.gen.get_macs(), self.config.generator_config.devices):
            device.set_mac(mac)

    def start_traffic_generator(self):
        """Initialize the generator and connect to it."""
        self.gen.init()
        self.gen.connect()

    def setup(self):
        """Set the generator mode, configure its interfaces and clear its stats."""
        self.gen.set_mode()
        self.gen.config_interface()
        self.gen.clear_stats()

    def get_version(self):
        """Return the version reported by the generator."""
        return self.gen.get_version()

    def ensure_end_to_end(self):
        """
        Ensure traffic generator receives packets it has transmitted.
        This ensures end to end connectivity and also waits until VMs are ready to forward packets.

        At this point all VMs are in active state, but forwarding does not have to work.
        Small amount of traffic is sent to every chain. Then total of sent and received packets
        is compared. If ratio between received and transmitted packets is higher than (N-1)/N,
        N being number of chains, traffic flows through every chain and real measurements can be
        performed.

        Example:
            PVP chain (1 VM per chain)
            N = 10 (number of chains)
            threshold = (N-1)/N = 9/10 = 0.9 (acceptable ratio ensuring working conditions)
            if total_received/total_sent > 0.9, traffic is flowing to more than 9 VMs meaning
            all 10 VMs are in operational state.

        Raises TrafficClientException when the ratio is not reached within
        config.generic_retry_count attempts.
        """
        LOG.info('Starting traffic generator to ensure end-to-end connectivity')
        rate_pps = {'rate_pps': str(self.config.service_chain_count * 100)}
        self.gen.create_traffic('64', [rate_pps, rate_pps], bidirectional=True, latency=False)

        # ensures enough traffic is coming back
        threshold = (self.config.service_chain_count - 1) / float(self.config.service_chain_count)

        for it in range(self.config.generic_retry_count):
            self.gen.clear_stats()
            self.gen.start_traffic()
            LOG.info('Waiting for packets to be received back... ({} / {})'.format(it + 1,
                     self.config.generic_retry_count))
            time.sleep(self.config.generic_poll_sec)
            self.gen.stop_traffic()
            stats = self.gen.get_stats()

            # compute total sent and received traffic on both ports
            total_rx = 0
            total_tx = 0
            for port in self.PORTS:
                total_rx += float(stats[port]['rx'].get('total_pkts', 0))
                total_tx += float(stats[port]['tx'].get('total_pkts', 0))

            # how much of traffic came back
            ratio = total_rx / total_tx if total_tx else 0

            if ratio > threshold:
                self.gen.clear_stats()
                self.gen.clear_streamblock()
                LOG.info('End-to-end connectivity ensured')
                return

            time.sleep(self.config.generic_poll_sec)

        raise TrafficClientException('End-to-end connectivity cannot be ensured')

    def ensure_arp_successful(self):
        """Raise TrafficClientException unless the generator resolves ARP."""
        if not self.gen.resolve_arp():
            raise TrafficClientException('ARP cannot be resolved')

    def set_traffic(self, frame_size, bidirectional):
        """Record the run settings and (re)create the generator streams.

        In unidirectional mode a reverse rate entry is only added when
        config.unidir_reverse_traffic_pps is positive.
        """
        self.run_config['bidirectional'] = bidirectional
        self.run_config['l2frame_size'] = frame_size
        self.run_config['rates'] = [self.get_per_direction_rate()]
        if bidirectional:
            self.run_config['rates'].append(self.get_per_direction_rate())
        else:
            unidir_reverse_pps = int(self.config.unidir_reverse_traffic_pps)
            if unidir_reverse_pps > 0:
                self.run_config['rates'].append({'rate_pps': str(unidir_reverse_pps)})

        self.gen.clear_streamblock()
        self.gen.create_traffic(frame_size, self.run_config['rates'], bidirectional, latency=True)

    def modify_load(self, load):
        """Set the total rate to `load` percent and push it to the generator.

        The reverse direction is only updated when the run is bidirectional.
        """
        self.current_total_rate = {'rate_percent': str(load)}
        rate_per_direction = self.get_per_direction_rate()

        self.gen.modify_rate(rate_per_direction, False)
        self.run_config['rates'][0] = rate_per_direction
        if self.run_config['bidirectional']:
            self.gen.modify_rate(rate_per_direction, True)
            self.run_config['rates'][1] = rate_per_direction

    def get_ndr_and_pdr(self):
        """Search the NDR and/or PDR loads (per config.ndr_run / config.pdr_run).

        Returns a dict holding one entry per searched tag ('ndr', 'pdr') plus
        'iteration_stats'. Each tag entry gets a 'time_taken_sec' value: measured from the
        start of the search, except for PDR when NDR is also searched, where it is measured
        from the NDR timestamp.
        """
        dst = 'Bidirectional' if self.run_config['bidirectional'] else 'Unidirectional'
        targets = {}
        if self.config.ndr_run:
            LOG.info('*** Searching NDR for %s (%s)...', self.run_config['l2frame_size'], dst)
            targets['ndr'] = self.config.measurement.NDR
        if self.config.pdr_run:
            LOG.info('*** Searching PDR for %s (%s)...', self.run_config['l2frame_size'], dst)
            targets['pdr'] = self.config.measurement.PDR

        self.run_config['start_time'] = time.time()
        self.interval_collector = IntervalCollector(self.run_config['start_time'])
        self.interval_collector.attach_notifier(self.notifier)
        self.iteration_collector = IterationCollector(self.run_config['start_time'])
        results = {}
        self.__range_search(0.0, 200.0, targets, results)

        results['iteration_stats'] = {
            'ndr_pdr': self.iteration_collector.get()
        }

        if self.config.ndr_run:
            LOG.info('NDR load: %s', results['ndr']['rate_percent'])
            results['ndr']['time_taken_sec'] = \
                results['ndr']['timestamp_sec'] - self.run_config['start_time']
            if self.config.pdr_run:
                LOG.info('PDR load: %s', results['pdr']['rate_percent'])
                results['pdr']['time_taken_sec'] = \
                    results['pdr']['timestamp_sec'] - results['ndr']['timestamp_sec']
        elif self.config.pdr_run:
            # Only report PDR when it was actually searched; with neither search enabled
            # there is no 'pdr' entry to read.
            LOG.info('PDR load: %s', results['pdr']['rate_percent'])
            results['pdr']['time_taken_sec'] = \
                results['pdr']['timestamp_sec'] - self.run_config['start_time']
        return results

    def __get_dropped_rate(self, result):
        """Return dropped RX packets as a percentage of TX packets (inf when nothing sent)."""
        dropped_pkts = result['rx']['dropped_pkts']
        total_pkts = result['tx']['total_pkts']
        if not total_pkts:
            return float('inf')
        else:
            return float(dropped_pkts) / total_pkts * 100

    def get_stats(self):
        """Return the generator stats normalized to ints, with drop rates and an overall view.

        The result has one entry per port in PORTS, 'total_tx_rate' and 'overall'. RX
        counters that cannot be converted to int are reported as 0. In bidirectional mode
        'overall' sums the counters of both ports, halves the summed pkt_rate/pkt_bit_rate,
        and combines the delays (packet-weighted average, min of mins, max of maxes). In
        unidirectional mode 'overall' is the very same dict as the first port.
        """
        stats = self.gen.get_stats()
        retDict = {'total_tx_rate': stats['total_tx_rate']}
        for port in self.PORTS:
            retDict[port] = {'tx': {}, 'rx': {}}

        tx_keys = ['total_pkts', 'total_pkt_bytes', 'pkt_rate', 'pkt_bit_rate']
        rx_keys = tx_keys + ['dropped_pkts']

        for port in self.PORTS:
            for key in tx_keys:
                retDict[port]['tx'][key] = int(stats[port]['tx'][key])
            for key in rx_keys:
                try:
                    retDict[port]['rx'][key] = int(stats[port]['rx'][key])
                except ValueError:
                    retDict[port]['rx'][key] = 0
            retDict[port]['rx']['avg_delay_usec'] = int(stats[port]['rx']['avg_delay_usec'])
            retDict[port]['rx']['min_delay_usec'] = int(stats[port]['rx']['min_delay_usec'])
            retDict[port]['rx']['max_delay_usec'] = int(stats[port]['rx']['max_delay_usec'])
            retDict[port]['drop_rate_percent'] = self.__get_dropped_rate(retDict[port])

        # Use the known port indexes directly: sorting the dict keys would mix ints with the
        # 'total_tx_rate' string, which only happens to order correctly on Python 2.
        ports = self.PORTS
        if self.run_config['bidirectional']:
            retDict['overall'] = {'tx': {}, 'rx': {}}
            for key in tx_keys:
                retDict['overall']['tx'][key] = \
                    retDict[ports[0]]['tx'][key] + retDict[ports[1]]['tx'][key]
            for key in rx_keys:
                retDict['overall']['rx'][key] = \
                    retDict[ports[0]]['rx'][key] + retDict[ports[1]]['rx'][key]
            total_pkts = [retDict[ports[0]]['rx']['total_pkts'],
                          retDict[ports[1]]['rx']['total_pkts']]
            avg_delays = [retDict[ports[0]]['rx']['avg_delay_usec'],
                          retDict[ports[1]]['rx']['avg_delay_usec']]
            max_delays = [retDict[ports[0]]['rx']['max_delay_usec'],
                          retDict[ports[1]]['rx']['max_delay_usec']]
            min_delays = [retDict[ports[0]]['rx']['min_delay_usec'],
                          retDict[ports[1]]['rx']['min_delay_usec']]
            retDict['overall']['rx']['avg_delay_usec'] = utils.weighted_avg(total_pkts, avg_delays)
            retDict['overall']['rx']['min_delay_usec'] = min(min_delays)
            retDict['overall']['rx']['max_delay_usec'] = max(max_delays)
            for key in ['pkt_bit_rate', 'pkt_rate']:
                for dirc in ['tx', 'rx']:
                    retDict['overall'][dirc][key] /= 2.0
        else:
            retDict['overall'] = retDict[ports[0]]
        retDict['overall']['drop_rate_percent'] = self.__get_dropped_rate(retDict['overall'])
        return retDict

    def __convert_rates(self, rate):
        """Convert a rate dict using the current frame size and the interface speed."""
        return utils.convert_rates(self.run_config['l2frame_size'],
                                   rate,
                                   self.config.generator_config.intf_speed)

    def __ndr_pdr_found(self, tag, load):
        """Record the load found for `tag` in the iteration and interval collectors."""
        rates = self.__convert_rates({'rate_percent': load})
        self.iteration_collector.add_ndr_pdr(tag, rates['rate_pps'])
        last_stats = self.iteration_collector.peek()
        self.interval_collector.add_ndr_pdr(tag, last_stats)

    def __format_output_stats(self, stats):
        """Replace each port entry and 'overall' in `stats` by a flat summary dict.

        `stats` is modified in place (top-level keys only) and returned.
        """
        for key in (self.PORTS + ['overall']):
            interface = stats[key]
            stats[key] = {
                'tx_pkts': interface['tx']['total_pkts'],
                'rx_pkts': interface['rx']['total_pkts'],
                'drop_percentage': interface['drop_rate_percent'],
                # Despite its name this key carries the dropped packet count.
                'drop_pct': interface['rx']['dropped_pkts'],
                'avg_delay_usec': interface['rx']['avg_delay_usec'],
                'max_delay_usec': interface['rx']['max_delay_usec'],
                'min_delay_usec': interface['rx']['min_delay_usec'],
            }

        return stats

    def __targets_found(self, rate, targets, results):
        """Mark every target in `targets` as found at `rate` and timestamp its result.

        NOTE(review): a tag whose drop rate target was never met in any iteration has no
        entry in `results`, so the timestamp assignment would raise KeyError - confirm how
        callers want that case reported.
        """
        for tag, target in targets.items():
            LOG.info('Found {} ({}) load: {}'.format(tag, target, rate))
            self.__ndr_pdr_found(tag, rate)
            results[tag]['timestamp_sec'] = time.time()

    def __range_search(self, left, right, targets, results):
        '''Perform a binary search for a list of targets inside a [left..right] range or rate

        left    the left side of the range to search as a % the line rate (100 = 100% line rate)
                indicating the rate to send on each interface
        right   the right side of the range to search as a % of line rate
                indicating the rate to send on each interface
        targets a dict of drop rates to search (0.1 = 0.1%), indexed by the DR name or "tag"
                ('ndr', 'pdr')
        results a dict to store results
        '''
        if len(targets) == 0:
            return
        LOG.info('Range search [{} .. {}] targets: {}'.format(left, right, targets))

        # Terminate search when gap is less than load epsilon
        if right - left < self.config.measurement.load_epsilon:
            self.__targets_found(left, targets, results)
            return

        # Obtain the average drop rate in for middle load
        middle = (left + right) / 2.0
        stats, rates = self.__run_search_iteration(middle)

        # Split target dicts based on the avg drop rate
        left_targets = {}
        right_targets = {}
        for tag, target in targets.items():
            if stats['overall']['drop_rate_percent'] <= target:
                # record the best possible rate found for this target; each tag gets its
                # own copy so that NDR and PDR results (timestamps, time taken) do not
                # overwrite each other when they are satisfied by the same iteration
                results[tag] = dict(rates)
                results[tag].update({
                    'load_percent_per_direction': middle,
                    'stats': self.__format_output_stats(dict(stats)),
                    'timestamp_sec': None
                })
                right_targets[tag] = target
            else:
                left_targets[tag] = target

        # search lower half
        self.__range_search(left, middle, left_targets, results)

        # search upper half only if the upper rate does not exceed
        # 100%, this only happens when the first search at 100%
        # yields a DR that is < target DR
        if middle >= 100:
            self.__targets_found(100, right_targets, results)
        else:
            self.__range_search(middle, right, right_targets, results)

    def __run_search_iteration(self, rate):
        """Run traffic at `rate` percent and return (stats, total traffic config).

        Interval stats are fed to the interval collector while the run is in progress; the
        final stats are stored in the iteration collector. A 'warning' entry is added to
        the stats when the actual TX rate is well below the requested one.
        """
        # set load
        self.modify_load(rate)

        # poll interval stats and collect them
        for stats in self.run_traffic():
            self.interval_collector.add(stats)
            time_elapsed_ratio = self.runner.time_elapsed() / self.run_config['duration_sec']
            if time_elapsed_ratio >= 1:
                self.cancel_traffic()
        self.interval_collector.reset()

        # get stats from the run
        stats = self.runner.client.get_stats()
        current_traffic_config = self.get_traffic_config()
        warning = self.compare_tx_rates(current_traffic_config['direction-total']['rate_pps'],
                                        stats['total_tx_rate'])
        if warning is not None:
            stats['warning'] = warning

        # save reliable stats from whole iteration
        self.iteration_collector.add(stats, current_traffic_config['direction-total']['rate_pps'])
        LOG.info('Average drop rate: {}'.format(stats['overall']['drop_rate_percent']))

        return stats, current_traffic_config['direction-total']

    @staticmethod
    def log_stats(stats):
        """Log the overall TX/RX/dropped packet counts and the drop rate."""
        report = {
            'datetime': str(datetime.now()),
            'tx_packets': stats['overall']['tx']['total_pkts'],
            'rx_packets': stats['overall']['rx']['total_pkts'],
            'drop_packets': stats['overall']['rx']['dropped_pkts'],
            'drop_rate_percent': stats['overall']['drop_rate_percent']
        }
        LOG.info('TX: %(tx_packets)d; '
                 'RX: %(rx_packets)d; '
                 'Dropped: %(drop_packets)d; '
                 'Drop rate: %(drop_rate_percent).4f%%',
                 report)

    def run_traffic(self):
        """Start a run and yield the stats of every polling interval (generator).

        Stats polled while the run is still in progress are logged and yielded as
        intermediate results; the stats polled when the run stops are logged together with
        the final drop rate and yielded last. Nothing is yielded when the runner has no
        stats to report.
        """
        stats = self.runner.run()
        if stats is None:
            # The run ended before any stats could be polled.
            return
        # is_running is a method: it must be called, the bound method itself is always truthy
        while self.runner.is_running():
            self.log_stats(stats)
            yield stats
            stats = self.runner.poll_stats()
            if stats is None:
                return
        self.log_stats(stats)
        LOG.info('Drop rate: {}'.format(stats['overall']['drop_rate_percent']))
        yield stats

    def cancel_traffic(self):
        """Stop the current run, if any."""
        self.runner.stop()

    def get_interface(self, port_index):
        """Return an Interface carrying the TX/RX packet counts of a generator port.

        Counts stay at 0 when config.no_traffic is set or the port is absent from the stats.
        """
        port = self.gen.port_handle[port_index]
        tx, rx = 0, 0
        if not self.config.no_traffic:
            stats = self.get_stats()
            if port in stats:
                tx, rx = int(stats[port]['tx']['total_pkts']), int(stats[port]['rx']['total_pkts'])
        return Interface('traffic-generator', self.tool.lower(), tx, rx)

    def get_traffic_config(self):
        """Return the traffic settings per direction plus their totals.

        The first rate is reported as 'direction-forward', any further one as
        'direction-reverse'; 'direction-total' copies the forward settings with the summed
        rate_percent, rate_pps and rate_bps.
        """
        config = {}
        load_total = 0.0
        bps_total = 0.0
        pps_total = 0.0
        for idx, rate in enumerate(self.run_config['rates']):
            key = 'direction-forward' if idx == 0 else 'direction-reverse'
            config[key] = {
                'l2frame_size': self.run_config['l2frame_size'],
                'duration_sec': self.run_config['duration_sec']
            }
            config[key].update(rate)
            config[key].update(self.__convert_rates(rate))
            load_total += float(config[key]['rate_percent'])
            bps_total += float(config[key]['rate_bps'])
            pps_total += float(config[key]['rate_pps'])
        config['direction-total'] = dict(config['direction-forward'])
        config['direction-total'].update({
            'rate_percent': load_total,
            'rate_pps': pps_total,
            'rate_bps': bps_total
        })

        return config

    def get_run_config(self, results):
        """Returns configuration which was used for the last run."""
        r = {}
        # NOTE(review): both the TX and RX rates of a direction are read from the same port
        # index; if a direction is received on the opposite port this may need the far-end
        # index for RX - confirm. This also expects two entries in run_config['rates'].
        for idx, key in enumerate(["direction-forward", "direction-reverse"]):
            tx_rate = results["stats"][idx]["tx"]["total_pkts"] / self.config.duration_sec
            rx_rate = results["stats"][idx]["rx"]["total_pkts"] / self.config.duration_sec
            r[key] = {
                "orig": self.__convert_rates(self.run_config['rates'][idx]),
                "tx": self.__convert_rates({'rate_pps': tx_rate}),
                "rx": self.__convert_rates({'rate_pps': rx_rate})
            }

        total = {}
        for direction in ['orig', 'tx', 'rx']:
            total[direction] = {}
            for unit in ['rate_percent', 'rate_bps', 'rate_pps']:
                total[direction][unit] = sum(map(lambda x: float(x[direction][unit]), r.values()))

        r['direction-total'] = total
        return r

    @staticmethod
    def compare_tx_rates(required, actual):
        """Return a warning message when `actual` is below 90% of `required`, else None.

        A `required` rate of zero is also reported as a significant difference.
        """
        threshold = 0.9
        are_different = False
        try:
            if float(actual) / required < threshold:
                are_different = True
        except ZeroDivisionError:
            are_different = True

        if are_different:
            msg = "WARNING: There is a significant difference between requested TX rate ({r}) " \
                  "and actual TX rate ({a}). The traffic generator may not have sufficient CPU " \
                  "to achieve the requested TX rate.".format(r=required, a=actual)
            LOG.info(msg)
            return msg

        return None

    def get_per_direction_rate(self):
        """Return the rate to apply to each direction.

        A total rate is halved for bidirectional runs, except when it is expressed as a
        percentage, which is applied unchanged to every direction.
        """
        divisor = 2 if self.run_config['bidirectional'] else 1
        if 'rate_percent' in self.current_total_rate:
            # don't split rate if it's percentage
            divisor = 1

        return utils.divide_rate(self.current_total_rate, divisor)

    def close(self):
        """Stop traffic (best effort), then clear the stats and clean up the generator."""
        try:
            self.gen.stop_traffic()
        except Exception as exc:
            # Stopping is best effort (traffic may not be running); keep cleaning up but
            # leave a trace instead of hiding the error completely.
            LOG.info('Ignoring error while stopping traffic on close: %s', exc)
        self.gen.clear_stats()
        self.gen.cleanup()