Initial code drop from Cisco
[nfvbench.git] / nfvbench / traffic_client.py
1 # Copyright 2016 Cisco Systems, Inc.  All rights reserved.
2 #
3 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
4 #    not use this file except in compliance with the License. You may obtain
5 #    a copy of the License at
6 #
7 #         http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #    Unless required by applicable law or agreed to in writing, software
10 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 #    License for the specific language governing permissions and limitations
13 #    under the License.
14
15 from attrdict import AttrDict
16 import bitmath
17 from datetime import datetime
18 from log import LOG
19 from netaddr import IPNetwork
20 from network import Interface
21 import socket
22 from specs import ChainType
23 from stats_collector import IntervalCollector
24 from stats_collector import IterationCollector
25 import struct
26 import time
27 import traffic_gen.traffic_utils as utils
28
29
class TrafficClientException(Exception):
    """Error raised by the traffic client when an operation cannot proceed."""
32
33
class TrafficRunner(object):
    """Run traffic on a client's generator for a bounded duration.

    The runner is "running" between a call to run() and the moment stop() is
    invoked (explicitly, or implicitly by poll_stats() once the duration is
    exhausted). `start_time` doubles as the running flag: None means stopped.
    """

    def __init__(self, client, duration_sec, interval_sec=0):
        """Remember the client and timing parameters; nothing is started yet.

        client        object exposing `gen` (the generator) and `get_stats()`
        duration_sec  total time traffic should be sent
        interval_sec  polling period; 0 means a single poll at the end
        """
        self.client = client
        self.start_time = None
        self.duration_sec = duration_sec
        self.interval_sec = interval_sec

    def run(self):
        """Reset generator stats, start traffic and return the first polled stats."""
        LOG.info('Running traffic generator')
        generator = self.client.gen
        generator.clear_stats()
        generator.start_traffic()
        self.start_time = time.time()
        return self.poll_stats()

    def stop(self):
        """Stop the generator if traffic is running; otherwise do nothing."""
        if not self.is_running():
            return
        # clear the flag first so the runner reads as stopped from here on
        self.start_time = None
        self.client.gen.stop_traffic()

    def is_running(self):
        """Return True while a start time is recorded."""
        return self.start_time is not None

    def time_elapsed(self):
        """Return seconds since run(), or the full duration once stopped."""
        if not self.is_running():
            return self.duration_sec
        return time.time() - self.start_time

    def poll_stats(self):
        """Sleep until the next polling point and return the client's stats.

        Returns None when traffic is not running or the duration has already
        been exceeded (in which case traffic is stopped first). Traffic is
        stopped before returning when this poll is the last one of the run.
        """
        if not self.is_running():
            return None
        elapsed = self.time_elapsed()
        if elapsed > self.duration_sec:
            self.stop()
            return None
        remaining = self.duration_sec - elapsed
        polling = self.interval_sec > 0.0
        if polling and remaining > self.interval_sec:
            # more than one interval left: keep traffic running
            time.sleep(self.interval_sec)
        else:
            # final wait: the rest of the run when polling, else the whole duration
            time.sleep(remaining if polling else self.duration_sec)
            self.stop()
        return self.client.get_stats()
81
82
class Device(object):
    """One traffic generator port together with the addressing used for its streams.

    A device keeps a reference to its peer (`dst`, set with set_destination())
    and pre-computes the list of source IPs (one per flow) and of gateway IPs
    (one per chain) from the base addresses and steps it is given.
    """

    def __init__(self, port, pci, switch_port=None, vtep_vlan=None, ip=None, tg_gateway_ip=None,
                 gateway_ip=None, ip_addrs_step=None, tg_gateway_ip_addrs_step=None,
                 gateway_ip_addrs_step=None, chain_count=1, flow_count=1, vlan_tagging=False):
        """Store the port description and expand the IP address lists.

        `ip` may carry a prefix length; the bare address is kept in `self.ip`
        and the prefix length in `self.ip_prefixlen`.
        """
        self.chain_count = chain_count
        self.flow_count = flow_count
        self.dst = None
        self.port = port
        self.switch_port = switch_port
        self.vtep_vlan = vtep_vlan
        self.vlan_tag = None
        self.vlan_tagging = vlan_tagging
        self.pci = pci
        self.mac = None
        self.vm_mac_list = None
        subnet = IPNetwork(ip)
        self.ip = subnet.ip.format()
        self.ip_prefixlen = subnet.prefixlen
        self.ip_addrs_step = ip_addrs_step
        self.tg_gateway_ip_addrs_step = tg_gateway_ip_addrs_step
        self.gateway_ip_addrs_step = gateway_ip_addrs_step
        # one source IP per flow
        self.ip_list = self.expand_ip(self.ip, self.ip_addrs_step, self.flow_count)
        self.gateway_ip = gateway_ip
        # one gateway IP per chain
        self.gateway_ip_list = self.expand_ip(self.gateway_ip,
                                              self.gateway_ip_addrs_step,
                                              self.chain_count)
        self.tg_gateway_ip = tg_gateway_ip
        self.tg_gateway_ip_list = self.expand_ip(self.tg_gateway_ip,
                                                 self.tg_gateway_ip_addrs_step,
                                                 self.chain_count)

    def set_mac(self, mac):
        """Set the MAC address of this port; raises TrafficClientException on None."""
        if mac is None:
            raise TrafficClientException('Trying to set traffic generator MAC address as None')
        self.mac = mac

    def set_destination(self, dst):
        """Set the peer Device that traffic from this device is addressed to."""
        self.dst = dst

    def set_vm_mac_list(self, vm_mac_list):
        """Store the per-chain VM MAC addresses as a list of strings."""
        # a real list is required: entries are looked up by chain index later
        self.vm_mac_list = [str(mac) for mac in vm_mac_list]

    def set_vlan_tag(self, vlan_tag):
        """Set the VLAN tag; raises TrafficClientException if tagging is on and tag is None."""
        if self.vlan_tagging and vlan_tag is None:
            raise TrafficClientException('Trying to set VLAN tag as None')
        self.vlan_tag = vlan_tag

    def get_stream_configs(self, service_chain):
        """Return one stream configuration dict per chain.

        The flows of this device are split across the chains; each chain gets
        a contiguous slice of the source and destination IP lists. The
        destination MAC is the peer's MAC for EXT chains and the chain's VM
        MAC otherwise.
        """
        configs = []
        flow_idx = 0
        for chain_idx in range(self.chain_count):
            # spread the flows not yet assigned evenly over the remaining chains
            # (explicit floor division: the result is used as a list index)
            current_flow_count = (self.flow_count - flow_idx) // (self.chain_count - chain_idx)
            max_idx = flow_idx + current_flow_count - 1
            ip_src_count = self.ip_to_int(self.ip_list[max_idx]) - \
                self.ip_to_int(self.ip_list[flow_idx]) + 1
            ip_dst_count = self.ip_to_int(self.dst.ip_list[max_idx]) - \
                self.ip_to_int(self.dst.ip_list[flow_idx]) + 1

            configs.append({
                'count': current_flow_count,
                'mac_src': self.mac,
                'mac_dst': self.dst.mac if service_chain == ChainType.EXT
                else self.vm_mac_list[chain_idx],
                'ip_src_addr': self.ip_list[flow_idx],
                'ip_src_addr_max': self.ip_list[max_idx],
                'ip_src_count': ip_src_count,
                'ip_dst_addr': self.dst.ip_list[flow_idx],
                'ip_dst_addr_max': self.dst.ip_list[max_idx],
                'ip_dst_count': ip_dst_count,
                'ip_addrs_step': self.ip_addrs_step,
                'mac_discovery_gw': self.gateway_ip_list[chain_idx],
                'ip_src_tg_gw': self.tg_gateway_ip_list[chain_idx],
                'ip_dst_tg_gw': self.dst.tg_gateway_ip_list[chain_idx],
                'vlan_tag': self.vlan_tag if self.vlan_tagging else None
            })

            flow_idx += current_flow_count
        return configs

    @classmethod
    def expand_ip(cls, ip, step_ip, count):
        """Return `count` addresses starting at `ip`, each `step_ip` after the previous."""
        if step_ip == 'random':
            # Repeatable Random will used in the stream src/dst IP pairs, but we still need
            # to expand the IP based on the number of chains and flows configured. So we use
            # "0.0.0.1" as the step to have the exact IP flow ranges for every chain.
            step_ip = '0.0.0.1'

        step_ip_in_int = cls.ip_to_int(step_ip)
        subnet = IPNetwork(ip)
        ip_list = []
        for _ in range(count):
            ip_list.append(subnet.ip.format())
            subnet = subnet.next(step_ip_in_int)
        return ip_list

    @staticmethod
    def mac_to_int(mac):
        """Convert a MAC string (separated by ':', '.', '-' or spaces) to an integer."""
        # str.replace works for byte and unicode strings alike, unlike the
        # two-argument form of translate()
        digits = mac
        for separator in ':.- ':
            digits = digits.replace(separator, '')
        return int(digits, 16)

    @staticmethod
    def int_to_mac(i):
        """Convert an integer to a lowercase colon-separated MAC string."""
        mac = format(i, 'x').zfill(12)
        blocks = [mac[x:x + 2] for x in range(0, len(mac), 2)]
        return ':'.join(blocks)

    @staticmethod
    def ip_to_int(addr):
        """Convert a dotted-quad IPv4 string to its 32-bit unsigned integer value."""
        return struct.unpack("!I", socket.inet_aton(addr))[0]
192
193
class RunningTrafficProfile(object):
    """Represents traffic configuration for currently running traffic profile."""

    DEFAULT_IP_STEP = '0.0.0.1'
    DEFAULT_SRC_DST_IP_STEP = '0.0.0.1'

    def __init__(self, config, generator_profile):
        """Resolve the named generator profile and build the two port devices.

        config             run configuration (traffic_generator, service_chain,
                           service_chain_count, flow_count, vlan_tagging, no_arp)
        generator_profile  name of the profile to pick from
                           config.traffic_generator.generator_profile
        """
        generator_config = self.__match_generator_profile(config.traffic_generator,
                                                          generator_profile)
        self.generator_config = generator_config
        self.service_chain = config.service_chain
        self.service_chain_count = config.service_chain_count
        self.flow_count = config.flow_count
        self.host_name = generator_config.host_name
        self.name = generator_config.name
        self.tool = generator_config.tool
        self.cores = generator_config.get('cores', 1)
        self.ip_addrs_step = generator_config.ip_addrs_step or self.DEFAULT_SRC_DST_IP_STEP
        self.tg_gateway_ip_addrs_step = \
            generator_config.tg_gateway_ip_addrs_step or self.DEFAULT_IP_STEP
        self.gateway_ip_addrs_step = generator_config.gateway_ip_addrs_step or self.DEFAULT_IP_STEP
        self.gateway_ips = generator_config.gateway_ip_addrs
        self.ip = generator_config.ip
        # the 'ps' suffix is removed before parsing; the result is kept in bits
        self.intf_speed = bitmath.parse_string(generator_config.intf_speed.replace('ps', '')).bits
        self.vlan_tagging = config.vlan_tagging
        self.no_arp = config.no_arp
        self.src_device = None
        self.dst_device = None
        self.vm_mac_list = None
        self.__prep_interfaces(generator_config)

    def to_json(self):
        """Return the resolved generator configuration as a plain dict."""
        return dict(self.generator_config)

    def set_vm_mac_list(self, vm_mac_list):
        """Hand the VM MAC lists to the devices: index 0 to source, index 1 to destination."""
        self.src_device.set_vm_mac_list(vm_mac_list[0])
        self.dst_device.set_vm_mac_list(vm_mac_list[1])

    @staticmethod
    def __match_generator_profile(traffic_generator, generator_profile):
        """Return the generator settings merged with the profile named `generator_profile`.

        Raises Exception unless exactly one profile carries that name.
        """
        generator_config = AttrDict(traffic_generator)
        generator_config.pop('default_profile')
        generator_config.pop('generator_profile')
        # build a real list: it is measured and indexed below
        matching_profile = [profile for profile in traffic_generator.generator_profile
                            if profile.name == generator_profile]
        if len(matching_profile) != 1:
            # str() keeps the message usable when the requested name is not a string
            raise Exception('Traffic generator profile not found: ' + str(generator_profile))

        generator_config.update(matching_profile[0])

        return generator_config

    def __prep_interfaces(self, generator_config):
        """Create the source and destination devices and link them to each other.

        Each device gets half of the configured flows. For EXT chains with ARP
        enabled, the two devices must not share any computed IP address.
        """
        # floor division: the per-device flow count is used to size lists
        flows_per_device = self.flow_count // 2
        src_config = {
            'chain_count': self.service_chain_count,
            'flow_count': flows_per_device,
            'ip': generator_config.ip_addrs[0],
            'ip_addrs_step': self.ip_addrs_step,
            'gateway_ip': self.gateway_ips[0],
            'gateway_ip_addrs_step': self.gateway_ip_addrs_step,
            'tg_gateway_ip': generator_config.tg_gateway_ip_addrs[0],
            'tg_gateway_ip_addrs_step': self.tg_gateway_ip_addrs_step,
            'vlan_tagging': self.vlan_tagging
        }
        dst_config = {
            'chain_count': self.service_chain_count,
            'flow_count': flows_per_device,
            'ip': generator_config.ip_addrs[1],
            'ip_addrs_step': self.ip_addrs_step,
            'gateway_ip': self.gateway_ips[1],
            'gateway_ip_addrs_step': self.gateway_ip_addrs_step,
            'tg_gateway_ip': generator_config.tg_gateway_ip_addrs[1],
            'tg_gateway_ip_addrs_step': self.tg_gateway_ip_addrs_step,
            'vlan_tagging': self.vlan_tagging
        }

        # per-interface settings from the profile override the computed defaults
        self.src_device = Device(**dict(src_config, **generator_config.interfaces[0]))
        self.dst_device = Device(**dict(dst_config, **generator_config.interfaces[1]))
        self.src_device.set_destination(self.dst_device)
        self.dst_device.set_destination(self.src_device)

        if self.service_chain == ChainType.EXT and not self.no_arp \
                and not self.__are_unique(self.src_device.ip_list, self.dst_device.ip_list):
            raise Exception('Computed IP addresses are not unique, choose different base. '
                            'Start IPs: {start}. End IPs: {end}'
                            .format(start=self.src_device.ip_list,
                                    end=self.dst_device.ip_list))

    def __are_unique(self, list1, list2):
        """Return True when the two lists have no element in common."""
        return set(list1).isdisjoint(set(list2))

    @property
    def devices(self):
        """[source device, destination device]."""
        return [self.src_device, self.dst_device]

    @property
    def vlans(self):
        """VTEP VLANs of the source and destination devices."""
        return [self.src_device.vtep_vlan, self.dst_device.vtep_vlan]

    @property
    def ports(self):
        """Generator ports of the source and destination devices."""
        return [self.src_device.port, self.dst_device.port]

    @property
    def switch_ports(self):
        """Switch ports of the source and destination devices."""
        return [self.src_device.switch_port, self.dst_device.switch_port]

    @property
    def pcis(self):
        """PCI addresses of the source and destination devices."""
        return [self.src_device.pci, self.dst_device.pci]
304
305
class TrafficGeneratorFactory(object):
    """Build the traffic generator client and look up profiles from the configuration."""

    def __init__(self, config):
        """Keep a reference to the run configuration."""
        self.config = config

    def get_tool(self):
        """Return the name of the configured traffic generator tool."""
        return self.config.generator_config.tool

    def get_generator_client(self):
        """Return a client for the configured tool, or None if the tool is not supported.

        The tool name is matched case-insensitively. Generator modules are
        imported only when selected.
        """
        tool = self.get_tool().lower()
        if tool == 'trex':
            from traffic_gen import trex
            return trex.TRex(self.config)
        elif tool == 'dummy':
            from traffic_gen import dummy
            return dummy.DummyTG(self.config)
        else:
            return None

    def list_generator_profile(self):
        """Return the names of all generator profiles in the configuration."""
        return [profile.name for profile in self.config.traffic_generator.generator_profile]

    def get_generator_config(self, generator_profile):
        """Return a RunningTrafficProfile for the named generator profile."""
        return RunningTrafficProfile(self.config, generator_profile)

    def get_matching_profile(self, traffic_profile_name):
        """Return the single traffic profile called `traffic_profile_name`.

        Raises Exception when no profile, or more than one, has that name.
        """
        # build a real list: it is measured and indexed below
        matching_profile = [profile for profile in self.config.traffic_profile
                            if profile.name == traffic_profile_name]

        if len(matching_profile) > 1:
            raise Exception('Multiple traffic profiles with the same name found.')
        elif not matching_profile:
            raise Exception('No traffic profile found.')

        return matching_profile[0]

    def get_frame_sizes(self, traffic_profile):
        """Return the `l2frame_size` entry of the named traffic profile."""
        matching_profile = self.get_matching_profile(traffic_profile)
        return matching_profile.l2frame_size
345
346
class TrafficClient(object):
    """Drive the traffic generator: configure streams, run traffic, collect stats
    and search for the NDR/PDR loads."""

    PORTS = [0, 1]

    def __init__(self, config, notifier=None):
        """Create the generator client for the configured tool.

        Raises TrafficClientException when the configured tool is not supported.
        """
        generator_factory = TrafficGeneratorFactory(config)
        self.gen = generator_factory.get_generator_client()
        self.tool = generator_factory.get_tool()
        self.config = config
        self.notifier = notifier
        self.interval_collector = None
        self.iteration_collector = None
        self.runner = TrafficRunner(self, self.config.duration_sec, self.config.interval_sec)
        if self.gen is None:
            raise TrafficClientException('%s is not a supported traffic generator' % self.tool)

        self.run_config = {
            'l2frame_size': None,
            'duration_sec': self.config.duration_sec,
            'bidirectional': True,
            'rates': None
        }
        self.current_total_rate = {'rate_percent': '10'}
        if self.config.single_run:
            self.current_total_rate = utils.parse_rate_str(self.config.rate)

    def set_macs(self):
        """Assign the MAC addresses reported by the generator to the configured devices."""
        for mac, device in zip(self.gen.get_macs(), self.config.generator_config.devices):
            device.set_mac(mac)

    def start_traffic_generator(self):
        """Initialize the generator and connect to it."""
        self.gen.init()
        self.gen.connect()

    def setup(self):
        """Set the generator mode, configure its interfaces and clear its stats."""
        self.gen.set_mode()
        self.gen.config_interface()
        self.gen.clear_stats()

    def get_version(self):
        """Return the version reported by the generator."""
        return self.gen.get_version()

    def ensure_end_to_end(self):
        """
        Ensure traffic generator receives packets it has transmitted.
        This ensures end to end connectivity and also waits until VMs are ready to forward packets.

        At this point all VMs are in active state, but forwarding does not have to work.
        Small amount of traffic is sent to every chain. Then total of sent and received packets
        is compared. If ratio between received and transmitted packets is higher than (N-1)/N,
        N being number of chains, traffic flows through every chain and real measurements can be
        performed.

        Example:
            PVP chain (1 VM per chain)
            N = 10 (number of chains)
            threshold = (N-1)/N = 9/10 = 0.9 (acceptable ratio ensuring working conditions)
            if total_received/total_sent > 0.9, traffic is flowing to more than 9 VMs meaning
            all 10 VMs are in operational state.

        Raises TrafficClientException when the ratio is not reached within
        config.generic_retry_count attempts.
        """
        LOG.info('Starting traffic generator to ensure end-to-end connectivity')
        rate_pps = {'rate_pps': str(self.config.service_chain_count * 100)}
        self.gen.create_traffic('64', [rate_pps, rate_pps], bidirectional=True, latency=False)

        # ensures enough traffic is coming back
        threshold = (self.config.service_chain_count - 1) / float(self.config.service_chain_count)

        for it in range(self.config.generic_retry_count):
            self.gen.clear_stats()
            self.gen.start_traffic()
            LOG.info('Waiting for packets to be received back... ({} / {})'.format(it + 1,
                     self.config.generic_retry_count))
            time.sleep(self.config.generic_poll_sec)
            self.gen.stop_traffic()
            stats = self.gen.get_stats()

            # compute total sent and received traffic on both ports
            total_rx = 0
            total_tx = 0
            for port in self.PORTS:
                total_rx += float(stats[port]['rx'].get('total_pkts', 0))
                total_tx += float(stats[port]['tx'].get('total_pkts', 0))

            # how much of traffic came back
            ratio = total_rx / total_tx if total_tx else 0

            if ratio > threshold:
                self.gen.clear_stats()
                self.gen.clear_streamblock()
                LOG.info('End-to-end connectivity ensured')
                return

            time.sleep(self.config.generic_poll_sec)

        raise TrafficClientException('End-to-end connectivity cannot be ensured')

    def ensure_arp_successful(self):
        """Raise TrafficClientException unless the generator resolves ARP."""
        if not self.gen.resolve_arp():
            raise TrafficClientException('ARP cannot be resolved')

    def set_traffic(self, frame_size, bidirectional):
        """Record the run parameters and (re)create the generator streams.

        For unidirectional runs a reverse rate is only added when
        config.unidir_reverse_traffic_pps is positive.
        """
        self.run_config['bidirectional'] = bidirectional
        self.run_config['l2frame_size'] = frame_size
        self.run_config['rates'] = [self.get_per_direction_rate()]
        if bidirectional:
            self.run_config['rates'].append(self.get_per_direction_rate())
        else:
            unidir_reverse_pps = int(self.config.unidir_reverse_traffic_pps)
            if unidir_reverse_pps > 0:
                self.run_config['rates'].append({'rate_pps': str(unidir_reverse_pps)})

        self.gen.clear_streamblock()
        self.gen.create_traffic(frame_size, self.run_config['rates'], bidirectional, latency=True)

    def modify_load(self, load):
        """Change the current load (percent) on the generator and in run_config."""
        self.current_total_rate = {'rate_percent': str(load)}
        rate_per_direction = self.get_per_direction_rate()

        self.gen.modify_rate(rate_per_direction, False)
        self.run_config['rates'][0] = rate_per_direction
        if self.run_config['bidirectional']:
            self.gen.modify_rate(rate_per_direction, True)
            self.run_config['rates'][1] = rate_per_direction

    def get_ndr_and_pdr(self):
        """Search the loads matching the enabled NDR/PDR drop-rate targets.

        Returns a dict with one entry per enabled target ('ndr', 'pdr') plus
        'iteration_stats'. Each target entry gets a 'time_taken_sec' field.
        """
        dst = 'Bidirectional' if self.run_config['bidirectional'] else 'Unidirectional'
        targets = {}
        if self.config.ndr_run:
            LOG.info('*** Searching NDR for %s (%s)...', self.run_config['l2frame_size'], dst)
            targets['ndr'] = self.config.measurement.NDR
        if self.config.pdr_run:
            LOG.info('*** Searching PDR for %s (%s)...', self.run_config['l2frame_size'], dst)
            targets['pdr'] = self.config.measurement.PDR

        self.run_config['start_time'] = time.time()
        self.interval_collector = IntervalCollector(self.run_config['start_time'])
        self.interval_collector.attach_notifier(self.notifier)
        self.iteration_collector = IterationCollector(self.run_config['start_time'])
        results = {}
        self.__range_search(0.0, 200.0, targets, results)

        results['iteration_stats'] = {
            'ndr_pdr': self.iteration_collector.get()
        }

        if self.config.ndr_run:
            LOG.info('NDR load: %s', results['ndr']['rate_percent'])
            results['ndr']['time_taken_sec'] = \
                results['ndr']['timestamp_sec'] - self.run_config['start_time']
            if self.config.pdr_run:
                # PDR time is counted from the moment NDR was found
                LOG.info('PDR load: %s', results['pdr']['rate_percent'])
                results['pdr']['time_taken_sec'] = \
                    results['pdr']['timestamp_sec'] - results['ndr']['timestamp_sec']
        elif self.config.pdr_run:
            # only read the PDR entry when a PDR search was actually requested
            LOG.info('PDR load: %s', results['pdr']['rate_percent'])
            results['pdr']['time_taken_sec'] = \
                results['pdr']['timestamp_sec'] - self.run_config['start_time']
        return results

    def __get_dropped_rate(self, result):
        """Return dropped RX packets as a percentage of TX packets (inf when nothing was sent)."""
        dropped_pkts = result['rx']['dropped_pkts']
        total_pkts = result['tx']['total_pkts']
        if not total_pkts:
            return float('inf')
        else:
            return float(dropped_pkts) / total_pkts * 100

    def get_stats(self):
        """Return normalized generator stats.

        The result is keyed by port index (see PORTS) plus 'total_tx_rate' and
        'overall'. For bidirectional runs 'overall' sums the counters of both
        ports and averages the rates; otherwise it is the first port's entry.
        """
        stats = self.gen.get_stats()
        retDict = {'total_tx_rate': stats['total_tx_rate']}
        for port in self.PORTS:
            retDict[port] = {'tx': {}, 'rx': {}}

        tx_keys = ['total_pkts', 'total_pkt_bytes', 'pkt_rate', 'pkt_bit_rate']
        rx_keys = tx_keys + ['dropped_pkts']

        for port in self.PORTS:
            for key in tx_keys:
                retDict[port]['tx'][key] = int(stats[port]['tx'][key])
            for key in rx_keys:
                try:
                    retDict[port]['rx'][key] = int(stats[port]['rx'][key])
                except ValueError:
                    retDict[port]['rx'][key] = 0
            retDict[port]['rx']['avg_delay_usec'] = float(stats[port]['rx']['avg_delay_usec'])
            retDict[port]['rx']['min_delay_usec'] = float(stats[port]['rx']['min_delay_usec'])
            retDict[port]['rx']['max_delay_usec'] = float(stats[port]['rx']['max_delay_usec'])
            retDict[port]['drop_rate_percent'] = self.__get_dropped_rate(retDict[port])

        # use the known port ids directly instead of sorting keys that mix
        # integers with the 'total_tx_rate' string
        ports = list(self.PORTS)
        if self.run_config['bidirectional']:
            retDict['overall'] = {'tx': {}, 'rx': {}}
            for key in tx_keys:
                retDict['overall']['tx'][key] = \
                    retDict[ports[0]]['tx'][key] + retDict[ports[1]]['tx'][key]
            for key in rx_keys:
                retDict['overall']['rx'][key] = \
                    retDict[ports[0]]['rx'][key] + retDict[ports[1]]['rx'][key]
            total_pkts = [retDict[ports[0]]['rx']['total_pkts'],
                          retDict[ports[1]]['rx']['total_pkts']]
            avg_delays = [retDict[ports[0]]['rx']['avg_delay_usec'],
                          retDict[ports[1]]['rx']['avg_delay_usec']]
            max_delays = [retDict[ports[0]]['rx']['max_delay_usec'],
                          retDict[ports[1]]['rx']['max_delay_usec']]
            min_delays = [retDict[ports[0]]['rx']['min_delay_usec'],
                          retDict[ports[1]]['rx']['min_delay_usec']]
            retDict['overall']['rx']['avg_delay_usec'] = utils.weighted_avg(total_pkts, avg_delays)
            retDict['overall']['rx']['min_delay_usec'] = min(min_delays)
            retDict['overall']['rx']['max_delay_usec'] = max(max_delays)
            # rates were summed above; report the per-direction average
            for key in ['pkt_bit_rate', 'pkt_rate']:
                for dirc in ['tx', 'rx']:
                    retDict['overall'][dirc][key] /= 2.0
        else:
            retDict['overall'] = retDict[ports[0]]
        retDict['overall']['drop_rate_percent'] = self.__get_dropped_rate(retDict['overall'])
        return retDict

    def __convert_rates(self, rate):
        """Convert a rate dict using the current frame size and the interface speed."""
        return utils.convert_rates(self.run_config['l2frame_size'],
                                   rate,
                                   self.config.generator_config.intf_speed)

    def __ndr_pdr_found(self, tag, load):
        """Record in both collectors that the target `tag` was found at `load` percent."""
        rates = self.__convert_rates({'rate_percent': load})
        self.iteration_collector.add_ndr_pdr(tag, rates['rate_pps'])
        last_stats = self.iteration_collector.peek()
        self.interval_collector.add_ndr_pdr(tag, last_stats)

    def __format_output_stats(self, stats):
        """Flatten the per-port and overall entries of `stats` in place and return it."""
        for key in (self.PORTS + ['overall']):
            interface = stats[key]
            stats[key] = {
                'tx_pkts': interface['tx']['total_pkts'],
                'rx_pkts': interface['rx']['total_pkts'],
                'drop_percentage': interface['drop_rate_percent'],
                'drop_pct': interface['rx']['dropped_pkts'],
                'avg_delay_usec': interface['rx']['avg_delay_usec'],
                'max_delay_usec': interface['rx']['max_delay_usec'],
                'min_delay_usec': interface['rx']['min_delay_usec'],
            }

        return stats

    def __targets_found(self, rate, targets, results):
        """Mark every target in `targets` as found at `rate` and timestamp its result.

        NOTE(review): results[tag] only exists if some iteration met the
        target; a target never met at any load raises KeyError here.
        """
        for tag, target in targets.items():
            LOG.info('Found {} ({}) load: {}'.format(tag, target, rate))
            self.__ndr_pdr_found(tag, rate)
            results[tag]['timestamp_sec'] = time.time()

    def __range_search(self, left, right, targets, results):
        '''Perform a binary search for a list of targets inside a [left..right] range or rate

        left    the left side of the range to search as a % the line rate (100 = 100% line rate)
                indicating the rate to send on each interface
        right   the right side of the range to search as a % of line rate
                indicating the rate to send on each interface
        targets a dict of drop rates to search (0.1 = 0.1%), indexed by the DR name or "tag" ('ndr', 'pdr')
        results a dict to store results
        '''
        if len(targets) == 0:
            return
        LOG.info('Range search [{} .. {}] targets: {}'.format(left, right, targets))

        # Terminate search when gap is less than load epsilon
        if right - left < self.config.measurement.load_epsilon:
            self.__targets_found(left, targets, results)
            return

        # Obtain the average drop rate in for middle load
        middle = (left + right) / 2.0
        stats, rates = self.__run_search_iteration(middle)

        # Split target dicts based on the avg drop rate
        left_targets = {}
        right_targets = {}
        for tag, target in targets.items():
            if stats['overall']['drop_rate_percent'] <= target:
                # record the best possible rate found for this target;
                # copy so that targets met in the same iteration do not share a dict
                results[tag] = dict(rates)
                results[tag].update({
                    'load_percent_per_direction': middle,
                    'stats': self.__format_output_stats(dict(stats)),
                    'timestamp_sec': None
                })
                right_targets[tag] = target
            else:
                left_targets[tag] = target

        # search lower half
        self.__range_search(left, middle, left_targets, results)

        # search upper half only if the upper rate does not exceed
        # 100%, this only happens when the first search at 100%
        # yields a DR that is < target DR
        if middle >= 100:
            self.__targets_found(100, right_targets, results)
        else:
            self.__range_search(middle, right, right_targets, results)

    def __run_search_iteration(self, rate):
        """Run traffic once at `rate` percent and return (stats, total rate config)."""
        # set load
        self.modify_load(rate)

        # poll interval stats and collect them
        for stats in self.run_traffic():
            self.interval_collector.add(stats)
            time_elapsed_ratio = self.runner.time_elapsed() / self.run_config['duration_sec']
            if time_elapsed_ratio >= 1:
                self.cancel_traffic()
        self.interval_collector.reset()

        # get stats from the run
        stats = self.runner.client.get_stats()
        current_traffic_config = self.get_traffic_config()
        warning = self.compare_tx_rates(current_traffic_config['direction-total']['rate_pps'],
                                        stats['total_tx_rate'])
        if warning is not None:
            stats['warning'] = warning

        # save reliable stats from whole iteration
        self.iteration_collector.add(stats, current_traffic_config['direction-total']['rate_pps'])
        LOG.info('Average drop rate: {}'.format(stats['overall']['drop_rate_percent']))

        return stats, current_traffic_config['direction-total']

    @staticmethod
    def log_stats(stats):
        """Log the overall TX/RX/dropped packet counts and the drop rate."""
        report = {
            'datetime': str(datetime.now()),
            'tx_packets': stats['overall']['tx']['total_pkts'],
            'rx_packets': stats['overall']['rx']['total_pkts'],
            'drop_packets': stats['overall']['rx']['dropped_pkts'],
            'drop_rate_percent': stats['overall']['drop_rate_percent']
        }
        LOG.info('TX: %(tx_packets)d; '
                 'RX: %(rx_packets)d; '
                 'Dropped: %(drop_packets)d; '
                 'Drop rate: %(drop_rate_percent).4f%%',
                 report)

    def run_traffic(self):
        """Start traffic and yield stats for every poll until the run is over.

        Stats polled while traffic is still running are yielded from the loop;
        the stats of the final poll (taken once traffic has stopped) are
        yielded last, after logging the drop rate. Nothing more is yielded
        when the runner reports no stats.
        """
        stats = self.runner.run()
        # is_running must be called: the bound method itself is always truthy
        while self.runner.is_running():
            self.log_stats(stats)
            yield stats
            stats = self.runner.poll_stats()
            if stats is None:
                return
        if stats is None:
            # the runner had nothing to report (run already past its duration)
            return
        self.log_stats(stats)
        LOG.info('Drop rate: {}'.format(stats['overall']['drop_rate_percent']))
        yield stats

    def cancel_traffic(self):
        """Stop the current run."""
        self.runner.stop()

    def get_interface(self, port_index):
        """Return an Interface carrying the TX/RX packet counts of the given port.

        Counts stay at 0 when config.no_traffic is set or the port has no stats.
        """
        port = self.gen.port_handle[port_index]
        tx, rx = 0, 0
        if not self.config.no_traffic:
            stats = self.get_stats()
            if port in stats:
                tx, rx = int(stats[port]['tx']['total_pkts']), int(stats[port]['rx']['total_pkts'])
        return Interface('traffic-generator', self.tool.lower(), tx, rx)

    def get_traffic_config(self):
        """Return the configured rates per direction plus their total.

        Keys are 'direction-forward', 'direction-reverse' (when a second rate
        is configured) and 'direction-total'.
        """
        config = {}
        load_total = 0.0
        bps_total = 0.0
        pps_total = 0.0
        for idx, rate in enumerate(self.run_config['rates']):
            key = 'direction-forward' if idx == 0 else 'direction-reverse'
            config[key] = {
                'l2frame_size': self.run_config['l2frame_size'],
                'duration_sec': self.run_config['duration_sec']
            }
            config[key].update(rate)
            config[key].update(self.__convert_rates(rate))
            load_total += float(config[key]['rate_percent'])
            bps_total += float(config[key]['rate_bps'])
            pps_total += float(config[key]['rate_pps'])
        config['direction-total'] = dict(config['direction-forward'])
        config['direction-total'].update({
            'rate_percent': load_total,
            'rate_pps': pps_total,
            'rate_bps': bps_total
        })

        return config

    def get_run_config(self, results):
        """Returns configuration which was used for the last run.

        For each direction the requested ('orig'), transmitted ('tx') and
        received ('rx') rates are reported, plus their sums under
        'direction-total'.

        NOTE(review): both directions are read from run_config['rates'];
        confirm callers never reach this with a single configured rate.
        """
        r = {}
        # float() avoids truncating the packet rate when both operands are integers
        duration_sec = float(self.config.duration_sec)
        for idx, key in enumerate(["direction-forward", "direction-reverse"]):
            tx_rate = results["stats"][idx]["tx"]["total_pkts"] / duration_sec
            rx_rate = results["stats"][idx]["rx"]["total_pkts"] / duration_sec
            r[key] = {
                "orig": self.__convert_rates(self.run_config['rates'][idx]),
                "tx": self.__convert_rates({'rate_pps': tx_rate}),
                "rx": self.__convert_rates({'rate_pps': rx_rate})
            }

        total = {}
        for direction in ['orig', 'tx', 'rx']:
            total[direction] = {}
            for unit in ['rate_percent', 'rate_bps', 'rate_pps']:
                total[direction][unit] = sum(float(x[direction][unit]) for x in r.values())

        r['direction-total'] = total
        return r

    @staticmethod
    def compare_tx_rates(required, actual):
        """Return a warning message when the actual TX rate is below 90% of the
        required one (or the required rate is zero), else None."""
        threshold = 0.9
        are_different = False
        try:
            if float(actual) / required < threshold:
                are_different = True
        except ZeroDivisionError:
            are_different = True

        if are_different:
            msg = "WARNING: There is a significant difference between requested TX rate ({r}) " \
                  "and actual TX rate ({a}). The traffic generator may not have sufficient CPU " \
                  "to achieve the requested TX rate.".format(r=required, a=actual)
            LOG.info(msg)
            return msg

        return None

    def get_per_direction_rate(self):
        """Return the rate to apply to each direction for the current total rate."""
        divisor = 2 if self.run_config['bidirectional'] else 1
        if 'rate_percent' in self.current_total_rate:
            # don't split rate if it's percentage
            divisor = 1

        return utils.divide_rate(self.current_total_rate, divisor)

    def close(self):
        """Stop traffic (best effort), clear the stats and clean up the generator."""
        try:
            self.gen.stop_traffic()
        except Exception:
            # best effort: stopping may fail if traffic was never started
            pass
        try:
            self.gen.clear_stats()
        finally:
            # always release the generator, even if clearing the stats failed
            self.gen.cleanup()