NFVBENCH-118 VxLAN fixed rate: Trex far end port Rx counters are incorrect
[nfvbench.git] / nfvbench / traffic_client.py
1 # Copyright 2016 Cisco Systems, Inc.  All rights reserved.
2 #
3 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
4 #    not use this file except in compliance with the License. You may obtain
5 #    a copy of the License at
6 #
7 #         http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #    Unless required by applicable law or agreed to in writing, software
10 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 #    License for the specific language governing permissions and limitations
13 #    under the License.
14
15 """Interface to the traffic generator clients including NDR/PDR binary search."""
16
17 from datetime import datetime
18 import socket
19 import struct
20 import time
21
22 from attrdict import AttrDict
23 import bitmath
24 from netaddr import IPNetwork
25 # pylint: disable=import-error
26 from trex_stl_lib.api import STLError
27 # pylint: enable=import-error
28
29 from log import LOG
30 from packet_stats import InterfaceStats
31 from packet_stats import PacketPathStats
32 from stats_collector import IntervalCollector
33 from stats_collector import IterationCollector
34 import traffic_gen.traffic_utils as utils
35 from utils import cast_integer
36
37
38 class TrafficClientException(Exception):
39     """Generic traffic client exception."""
40
41     pass
42
43
44 class TrafficRunner(object):
45     """Serialize various steps required to run traffic."""
46
47     def __init__(self, client, duration_sec, interval_sec=0):
48         """Create a traffic runner."""
49         self.client = client
50         self.start_time = None
51         self.duration_sec = duration_sec
52         self.interval_sec = interval_sec
53
54     def run(self):
55         """Clear stats and instruct the traffic generator to start generating traffic."""
56         if self.is_running():
57             return None
58         LOG.info('Running traffic generator')
59         self.client.gen.clear_stats()
60         self.client.gen.start_traffic()
61         self.start_time = time.time()
62         return self.poll_stats()
63
64     def stop(self):
65         """Stop the current run and instruct the traffic generator to stop traffic."""
66         if self.is_running():
67             self.start_time = None
68             self.client.gen.stop_traffic()
69
70     def is_running(self):
71         """Check if a run is still pending."""
72         return self.start_time is not None
73
74     def time_elapsed(self):
75         """Return time elapsed since start of run."""
76         if self.is_running():
77             return time.time() - self.start_time
78         return self.duration_sec
79
80     def poll_stats(self):
81         """Poll latest stats from the traffic generator at fixed interval - sleeps if necessary.
82
83         return: latest stats or None if traffic is stopped
84         """
85         if not self.is_running():
86             return None
87         if self.client.skip_sleep():
88             self.stop()
89             return self.client.get_stats()
90         time_elapsed = self.time_elapsed()
91         if time_elapsed > self.duration_sec:
92             self.stop()
93             return None
94         time_left = self.duration_sec - time_elapsed
95         if self.interval_sec > 0.0:
96             if time_left <= self.interval_sec:
97                 time.sleep(time_left)
98                 self.stop()
99             else:
100                 time.sleep(self.interval_sec)
101         else:
102             time.sleep(self.duration_sec)
103             self.stop()
104         return self.client.get_stats()
105
106
107 class IpBlock(object):
108     """Manage a block of IP addresses."""
109
110     def __init__(self, base_ip, step_ip, count_ip):
111         """Create an IP block."""
112         self.base_ip_int = Device.ip_to_int(base_ip)
113         self.step = Device.ip_to_int(step_ip)
114         self.max_available = count_ip
115         self.next_free = 0
116
117     def get_ip(self, index=0):
118         """Return the IP address at given index."""
119         if index < 0 or index >= self.max_available:
120             raise IndexError('Index out of bounds: %d (max=%d)' % (index, self.max_available))
121         return Device.int_to_ip(self.base_ip_int + index * self.step)
122
123     def reserve_ip_range(self, count):
124         """Reserve a range of count consecutive IP addresses spaced by step."""
125         if self.next_free + count > self.max_available:
126             raise IndexError('No more IP addresses next free=%d max_available=%d requested=%d' %
127                              (self.next_free,
128                               self.max_available,
129                               count))
130         first_ip = self.get_ip(self.next_free)
131         last_ip = self.get_ip(self.next_free + count - 1)
132         self.next_free += count
133         return (first_ip, last_ip)
134
135     def reset_reservation(self):
136         """Reset all reservations and restart with a completely unused IP block."""
137         self.next_free = 0
138
139
140 class Device(object):
141     """Represent a port device and all information associated to it.
142
143     In the curent version we only support 2 port devices for the traffic generator
144     identified as port 0 or port 1.
145     """
146
147     def __init__(self, port, generator_config):
148         """Create a new device for a given port."""
149         self.generator_config = generator_config
150         self.chain_count = generator_config.service_chain_count
151         self.flow_count = generator_config.flow_count / 2
152         self.port = port
153         self.switch_port = generator_config.interfaces[port].get('switch_port', None)
154         self.vtep_vlan = None
155         self.vtep_src_mac = None
156         self.vxlan = False
157         self.pci = generator_config.interfaces[port].pci
158         self.mac = None
159         self.dest_macs = None
160         self.vtep_dst_mac = None
161         self.vtep_dst_ip = None
162         if generator_config.vteps is None:
163             self.vtep_src_ip = None
164         else:
165             self.vtep_src_ip = generator_config.vteps[port]
166         self.vnis = None
167         self.vlans = None
168         self.ip_addrs = generator_config.ip_addrs[port]
169         subnet = IPNetwork(self.ip_addrs)
170         self.ip = subnet.ip.format()
171         self.ip_addrs_step = generator_config.ip_addrs_step
172         self.ip_block = IpBlock(self.ip, self.ip_addrs_step, self.flow_count)
173         self.gw_ip_block = IpBlock(generator_config.gateway_ips[port],
174                                    generator_config.gateway_ip_addrs_step,
175                                    self.chain_count)
176         self.tg_gateway_ip_addrs = generator_config.tg_gateway_ip_addrs[port]
177         self.tg_gw_ip_block = IpBlock(self.tg_gateway_ip_addrs,
178                                       generator_config.tg_gateway_ip_addrs_step,
179                                       self.chain_count)
180         self.udp_src_port = generator_config.udp_src_port
181         self.udp_dst_port = generator_config.udp_dst_port
182
183     def set_mac(self, mac):
184         """Set the local MAC for this port device."""
185         if mac is None:
186             raise TrafficClientException('Trying to set traffic generator MAC address as None')
187         self.mac = mac
188
189     def get_peer_device(self):
190         """Get the peer device (device 0 -> device 1, or device 1 -> device 0)."""
191         return self.generator_config.devices[1 - self.port]
192
193     def set_vtep_dst_mac(self, dest_macs):
194         """Set the list of dest MACs indexed by the chain id.
195
196         This is only called in 2 cases:
197         - VM macs discovered using openstack API
198         - dest MACs provisioned in config file
199         """
200         self.vtep_dst_mac = map(str, dest_macs)
201
202     def set_dest_macs(self, dest_macs):
203         """Set the list of dest MACs indexed by the chain id.
204
205         This is only called in 2 cases:
206         - VM macs discovered using openstack API
207         - dest MACs provisioned in config file
208         """
209         self.dest_macs = map(str, dest_macs)
210
211     def get_dest_macs(self):
212         """Get the list of dest macs for this device.
213
214         If set_dest_macs was never called, assumes l2-loopback and return
215         a list of peer mac (as many as chains but normally only 1 chain)
216         """
217         if self.dest_macs:
218             return self.dest_macs
219         # assume this is l2-loopback
220         return [self.get_peer_device().mac] * self.chain_count
221
222     def set_vlans(self, vlans):
223         """Set the list of vlans to use indexed by the chain id."""
224         self.vlans = vlans
225         LOG.info("Port %d: VLANs %s", self.port, self.vlans)
226
227     def set_vtep_vlan(self, vlan):
228         """Set the vtep vlan to use indexed by specific port."""
229         self.vtep_vlan = vlan
230         self.vxlan = True
231         self.vlan_tagging = None
232         LOG.info("Port %d: VTEP VLANs %s", self.port, self.vtep_vlan)
233
234     def set_vxlan_endpoints(self, src_ip, dst_ip):
235         self.vtep_dst_ip = dst_ip
236         self.vtep_src_ip = src_ip
237         LOG.info("Port %d: src_vtep %s, dst_vtep %s", self.port,
238                  self.vtep_src_ip, self.vtep_dst_ip)
239
240     def set_vxlans(self, vnis):
241         self.vnis = vnis
242         LOG.info("Port %d: VNIs %s", self.port, self.vnis)
243
244     def get_gw_ip(self, chain_index):
245         """Retrieve the IP address assigned for the gateway of a given chain."""
246         return self.gw_ip_block.get_ip(chain_index)
247
248     def get_stream_configs(self):
249         """Get the stream config for a given chain on this device.
250
251         Called by the traffic generator driver to program the traffic generator properly
252         before generating traffic
253         """
254         configs = []
255         # exact flow count for each chain is calculated as follows:
256         # - all chains except the first will have the same flow count
257         #   calculated as (total_flows + chain_count - 1) / chain_count
258         # - the first chain will have the remainder
259         # example 11 flows and 3 chains => 3, 4, 4
260         flows_per_chain = (self.flow_count + self.chain_count - 1) / self.chain_count
261         cur_chain_flow_count = self.flow_count - flows_per_chain * (self.chain_count - 1)
262         peer = self.get_peer_device()
263         self.ip_block.reset_reservation()
264         peer.ip_block.reset_reservation()
265         dest_macs = self.get_dest_macs()
266
267         for chain_idx in xrange(self.chain_count):
268             src_ip_first, src_ip_last = self.ip_block.reserve_ip_range(cur_chain_flow_count)
269             dst_ip_first, dst_ip_last = peer.ip_block.reserve_ip_range(cur_chain_flow_count)
270
271             configs.append({
272                 'count': cur_chain_flow_count,
273                 'mac_src': self.mac,
274                 'mac_dst': dest_macs[chain_idx],
275                 'ip_src_addr': src_ip_first,
276                 'ip_src_addr_max': src_ip_last,
277                 'ip_src_count': cur_chain_flow_count,
278                 'ip_dst_addr': dst_ip_first,
279                 'ip_dst_addr_max': dst_ip_last,
280                 'ip_dst_count': cur_chain_flow_count,
281                 'ip_addrs_step': self.ip_addrs_step,
282                 'udp_src_port': self.udp_src_port,
283                 'udp_dst_port': self.udp_dst_port,
284                 'mac_discovery_gw': self.get_gw_ip(chain_idx),
285                 'ip_src_tg_gw': self.tg_gw_ip_block.get_ip(chain_idx),
286                 'ip_dst_tg_gw': peer.tg_gw_ip_block.get_ip(chain_idx),
287                 'vlan_tag': self.vlans[chain_idx] if self.vlans else None,
288                 'vxlan': self.vxlan,
289                 'vtep_vlan': self.vtep_vlan if self.vtep_vlan else None,
290                 'vtep_src_mac': self.mac if self.vxlan is True else None,
291                 'vtep_dst_mac': self.vtep_dst_mac if self.vxlan is True else None,
292                 'vtep_dst_ip': self.vtep_dst_ip if self.vxlan is True else None,
293                 'vtep_src_ip': self.vtep_src_ip if self.vxlan is True else None,
294                 'net_vni': self.vnis[chain_idx] if self.vxlan is True else None
295             })
296             # after first chain, fall back to the flow count for all other chains
297             cur_chain_flow_count = flows_per_chain
298         return configs
299
300     @staticmethod
301     def ip_to_int(addr):
302         """Convert an IP address from string to numeric."""
303         return struct.unpack("!I", socket.inet_aton(addr))[0]
304
305     @staticmethod
306     def int_to_ip(nvalue):
307         """Convert an IP address from numeric to string."""
308         return socket.inet_ntoa(struct.pack("!I", nvalue))
309
310
311 class GeneratorConfig(object):
312     """Represents traffic configuration for currently running traffic profile."""
313
314     DEFAULT_IP_STEP = '0.0.0.1'
315     DEFAULT_SRC_DST_IP_STEP = '0.0.0.1'
316
317     def __init__(self, config):
318         """Create a generator config."""
319         self.config = config
320         # name of the generator profile (normally trex or dummy)
321         # pick the default one if not specified explicitly from cli options
322         if not config.generator_profile:
323             config.generator_profile = config.traffic_generator.default_profile
324         # pick up the profile dict based on the name
325         gen_config = self.__match_generator_profile(config.traffic_generator,
326                                                     config.generator_profile)
327         self.gen_config = gen_config
328         # copy over fields from the dict
329         self.tool = gen_config.tool
330         self.ip = gen_config.ip
331         # overrides on config.cores and config.mbuf_factor
332         if config.cores:
333             self.cores = config.cores
334         else:
335             self.cores = gen_config.get('cores', 1)
336         self.mbuf_factor = config.mbuf_factor
337         if gen_config.intf_speed:
338             # interface speed is overriden from config
339             self.intf_speed = bitmath.parse_string(gen_config.intf_speed.replace('ps', '')).bits
340         else:
341             # interface speed is discovered/provided by the traffic generator
342             self.intf_speed = 0
343         self.software_mode = gen_config.get('software_mode', False)
344         self.interfaces = gen_config.interfaces
345         if self.interfaces[0].port != 0 or self.interfaces[1].port != 1:
346             raise TrafficClientException('Invalid port order/id in generator_profile.interfaces')
347
348         self.service_chain = config.service_chain
349         self.service_chain_count = config.service_chain_count
350         self.flow_count = config.flow_count
351         self.host_name = gen_config.host_name
352
353         self.tg_gateway_ip_addrs = gen_config.tg_gateway_ip_addrs
354         self.ip_addrs = gen_config.ip_addrs
355         self.ip_addrs_step = gen_config.ip_addrs_step or self.DEFAULT_SRC_DST_IP_STEP
356         self.tg_gateway_ip_addrs_step = \
357             gen_config.tg_gateway_ip_addrs_step or self.DEFAULT_IP_STEP
358         self.gateway_ip_addrs_step = gen_config.gateway_ip_addrs_step or self.DEFAULT_IP_STEP
359         self.gateway_ips = gen_config.gateway_ip_addrs
360         self.udp_src_port = gen_config.udp_src_port
361         self.udp_dst_port = gen_config.udp_dst_port
362         self.vteps = gen_config.get('vteps')
363         self.devices = [Device(port, self) for port in [0, 1]]
364         # This should normally always be [0, 1]
365         self.ports = [device.port for device in self.devices]
366
367         # check that pci is not empty
368         if not gen_config.interfaces[0].get('pci', None) or \
369            not gen_config.interfaces[1].get('pci', None):
370             raise TrafficClientException("configuration interfaces pci fields cannot be empty")
371
372         self.pcis = [tgif['pci'] for tgif in gen_config.interfaces]
373         self.vlan_tagging = config.vlan_tagging
374
375         # needed for result/summarizer
376         config['tg-name'] = gen_config.name
377         config['tg-tool'] = self.tool
378
379     def to_json(self):
380         """Get json form to display the content into the overall result dict."""
381         return dict(self.gen_config)
382
383     def set_dest_macs(self, port_index, dest_macs):
384         """Set the list of dest MACs indexed by the chain id on given port.
385
386         port_index: the port for which dest macs must be set
387         dest_macs: a list of dest MACs indexed by chain id
388         """
389         if len(dest_macs) != self.config.service_chain_count:
390             raise TrafficClientException('Dest MAC list %s must have %d entries' %
391                                          (dest_macs, self.config.service_chain_count))
392         self.devices[port_index].set_dest_macs(dest_macs)
393         LOG.info('Port %d: dst MAC %s', port_index, [str(mac) for mac in dest_macs])
394
395     def set_vtep_dest_macs(self, port_index, dest_macs):
396         """Set the list of dest MACs indexed by the chain id on given port.
397
398         port_index: the port for which dest macs must be set
399         dest_macs: a list of dest MACs indexed by chain id
400         """
401         if len(dest_macs) != self.config.service_chain_count:
402             raise TrafficClientException('Dest MAC list %s must have %d entries' %
403                                          (dest_macs, self.config.service_chain_count))
404         self.devices[port_index].set_vtep_dst_mac(dest_macs)
405         LOG.info('Port %d: vtep dst MAC %s', port_index, set([str(mac) for mac in dest_macs]))
406
407     def get_dest_macs(self):
408         """Return the list of dest macs indexed by port."""
409         return [dev.get_dest_macs() for dev in self.devices]
410
411     def set_vlans(self, port_index, vlans):
412         """Set the list of vlans to use indexed by the chain id on given port.
413
414         port_index: the port for which VLANs must be set
415         vlans: a  list of vlan lists indexed by chain id
416         """
417         if len(vlans) != self.config.service_chain_count:
418             raise TrafficClientException('VLAN list %s must have %d entries' %
419                                          (vlans, self.config.service_chain_count))
420         self.devices[port_index].set_vlans(vlans)
421
422     def set_vxlans(self, port_index, vxlans):
423         """Set the list of vxlans (VNIs) to use indexed by the chain id on given port.
424
425         port_index: the port for which VXLANs must be set
426         VXLANs: a  list of VNIs lists indexed by chain id
427         """
428         if len(vxlans) != self.config.service_chain_count:
429             raise TrafficClientException('VXLAN list %s must have %d entries' %
430                                          (vxlans, self.config.service_chain_count))
431         self.devices[port_index].set_vxlans(vxlans)
432
433     def set_vtep_vlan(self, port_index, vlan):
434         """Set the vtep vlan to use indexed by the chain id on given port.
435         port_index: the port for which VLAN must be set
436         """
437         self.devices[port_index].set_vtep_vlan(vlan)
438
439     def set_vxlan_endpoints(self, port_index, src_ip, dst_ip):
440         self.devices[port_index].set_vxlan_endpoints(src_ip, dst_ip)
441
442     @staticmethod
443     def __match_generator_profile(traffic_generator, generator_profile):
444         gen_config = AttrDict(traffic_generator)
445         gen_config.pop('default_profile')
446         gen_config.pop('generator_profile')
447         matching_profile = [profile for profile in traffic_generator.generator_profile if
448                             profile.name == generator_profile]
449         if len(matching_profile) != 1:
450             raise Exception('Traffic generator profile not found: ' + generator_profile)
451
452         gen_config.update(matching_profile[0])
453         return gen_config
454
455
456 class TrafficClient(object):
457     """Traffic generator client with NDR/PDR binary seearch."""
458
459     PORTS = [0, 1]
460
461     def __init__(self, config, notifier=None):
462         """Create a new TrafficClient instance.
463
464         config: nfvbench config
465         notifier: notifier (optional)
466
467         A new instance is created everytime the nfvbench config may have changed.
468         """
469         self.config = config
470         self.generator_config = GeneratorConfig(config)
471         self.tool = self.generator_config.tool
472         self.gen = self._get_generator()
473         self.notifier = notifier
474         self.interval_collector = None
475         self.iteration_collector = None
476         self.runner = TrafficRunner(self, self.config.duration_sec, self.config.interval_sec)
477         self.config.frame_sizes = self._get_frame_sizes()
478         self.run_config = {
479             'l2frame_size': None,
480             'duration_sec': self.config.duration_sec,
481             'bidirectional': True,
482             'rates': []  # to avoid unsbuscriptable-obj warning
483         }
484         self.current_total_rate = {'rate_percent': '10'}
485         if self.config.single_run:
486             self.current_total_rate = utils.parse_rate_str(self.config.rate)
487         self.ifstats = None
488         # Speed is either discovered when connecting to TG or set from config
489         # This variable is 0 if not yet discovered from TG or must be the speed of
490         # each interface in bits per second
491         self.intf_speed = self.generator_config.intf_speed
492
493     def _get_generator(self):
494         tool = self.tool.lower()
495         if tool == 'trex':
496             from traffic_gen import trex
497             return trex.TRex(self)
498         if tool == 'dummy':
499             from traffic_gen import dummy
500             return dummy.DummyTG(self)
501         raise TrafficClientException('Unsupported generator tool name:' + self.tool)
502
503     def skip_sleep(self):
504         """Skip all sleeps when doing unit testing with dummy TG.
505
506         Must be overriden using mock.patch
507         """
508         return False
509
510     def _get_frame_sizes(self):
511         traffic_profile_name = self.config.traffic.profile
512         matching_profiles = [profile for profile in self.config.traffic_profile if
513                              profile.name == traffic_profile_name]
514         if len(matching_profiles) > 1:
515             raise TrafficClientException('Multiple traffic profiles with name: ' +
516                                          traffic_profile_name)
517         elif not matching_profiles:
518             raise TrafficClientException('Cannot find traffic profile: ' + traffic_profile_name)
519         return matching_profiles[0].l2frame_size
520
521     def start_traffic_generator(self):
522         """Start the traffic generator process (traffic not started yet)."""
523         self.gen.connect()
524         # pick up the interface speed if it is not set from config
525         intf_speeds = self.gen.get_port_speed_gbps()
526         # convert Gbps unit into bps
527         tg_if_speed = bitmath.parse_string(str(intf_speeds[0]) + 'Gb').bits
528         if self.intf_speed:
529             # interface speed is overriden from config
530             if self.intf_speed != tg_if_speed:
531                 # Warn the user if the speed in the config is different
532                 LOG.warning('Interface speed provided is different from actual speed (%d Gbps)',
533                             intf_speeds[0])
534         else:
535             # interface speed not provisioned by config
536             self.intf_speed = tg_if_speed
537             # also update the speed in the tg config
538             self.generator_config.intf_speed = tg_if_speed
539
540         # Save the traffic generator local MAC
541         for mac, device in zip(self.gen.get_macs(), self.generator_config.devices):
542             device.set_mac(mac)
543
544     def setup(self):
545         """Set up the traffic client."""
546         self.gen.clear_stats()
547
548     def get_version(self):
549         """Get the traffic generator version."""
550         return self.gen.get_version()
551
552     def ensure_end_to_end(self):
553         """Ensure traffic generator receives packets it has transmitted.
554
555         This ensures end to end connectivity and also waits until VMs are ready to forward packets.
556
557         VMs that are started and in active state may not pass traffic yet. It is imperative to make
558         sure that all VMs are passing traffic in both directions before starting any benchmarking.
559         To verify this, we need to send at a low frequency bi-directional packets and make sure
560         that we receive all packets back from all VMs. The number of flows is equal to 2 times
561         the number of chains (1 per direction) and we need to make sure we receive packets coming
562         from exactly 2 x chain count different source MAC addresses.
563
564         Example:
565             PVP chain (1 VM per chain)
566             N = 10 (number of chains)
567             Flow count = 20 (number of flows)
568             If the number of unique source MAC addresses from received packets is 20 then
569             all 10 VMs 10 VMs are in operational state.
570         """
571         LOG.info('Starting traffic generator to ensure end-to-end connectivity')
572         # send 2pps on each chain and each direction
573         rate_pps = {'rate_pps': str(self.config.service_chain_count * 2)}
574         self.gen.create_traffic('64', [rate_pps, rate_pps], bidirectional=True, latency=False)
575
576         # ensures enough traffic is coming back
577         retry_count = (self.config.check_traffic_time_sec +
578                        self.config.generic_poll_sec - 1) / self.config.generic_poll_sec
579
580         # we expect to see packets coming from 2 unique MAC per chain
581         # because there can be flooding in the case of shared net
582         # we must verify that packets from the right VMs are received
583         # and not just count unique src MAC
584         # create a dict of (port, chain) tuples indexed by dest mac
585         mac_map = {}
586         for port, dest_macs in enumerate(self.generator_config.get_dest_macs()):
587             for chain, mac in enumerate(dest_macs):
588                 mac_map[mac] = (port, chain)
589         unique_src_mac_count = len(mac_map)
590         if self.config.vxlan and self.config.traffic_generator.vtep_vlan:
591             get_mac_id = lambda packet: packet['binary'][60:66]
592         elif self.config.vxlan:
593             get_mac_id = lambda packet: packet['binary'][56:62]
594         else:
595             get_mac_id = lambda packet: packet['binary'][6:12]
596         for it in xrange(retry_count):
597             self.gen.clear_stats()
598             self.gen.start_traffic()
599             self.gen.start_capture()
600             LOG.info('Captured unique src mac %d/%d, capturing return packets (retry %d/%d)...',
601                      unique_src_mac_count - len(mac_map), unique_src_mac_count,
602                      it + 1, retry_count)
603             if not self.skip_sleep():
604                 time.sleep(self.config.generic_poll_sec)
605             self.gen.stop_traffic()
606             self.gen.fetch_capture_packets()
607             self.gen.stop_capture()
608
609             for packet in self.gen.packet_list:
610                 mac_id = get_mac_id(packet)
611                 src_mac = ':'.join(["%02x" % ord(x) for x in mac_id])
612                 if src_mac in mac_map:
613                     port, chain = mac_map[src_mac]
614                     LOG.info('Received packet from mac: %s (chain=%d, port=%d)',
615                              src_mac, chain, port)
616                     mac_map.pop(src_mac, None)
617
618                 if not mac_map:
619                     LOG.info('End-to-end connectivity established')
620                     return
621
622         raise TrafficClientException('End-to-end connectivity cannot be ensured')
623
624     def ensure_arp_successful(self):
625         """Resolve all IP using ARP and throw an exception in case of failure."""
626         dest_macs = self.gen.resolve_arp()
627         if dest_macs:
628             # all dest macs are discovered, saved them into the generator config
629             if self.config.vxlan:
630                 self.generator_config.set_vtep_dest_macs(0, dest_macs[0])
631                 self.generator_config.set_vtep_dest_macs(1, dest_macs[1])
632             else:
633                 self.generator_config.set_dest_macs(0, dest_macs[0])
634                 self.generator_config.set_dest_macs(1, dest_macs[1])
635         else:
636             raise TrafficClientException('ARP cannot be resolved')
637
638     def set_traffic(self, frame_size, bidirectional):
639         """Reconfigure the traffic generator for a new frame size."""
640         self.run_config['bidirectional'] = bidirectional
641         self.run_config['l2frame_size'] = frame_size
642         self.run_config['rates'] = [self.get_per_direction_rate()]
643         if bidirectional:
644             self.run_config['rates'].append(self.get_per_direction_rate())
645         else:
646             unidir_reverse_pps = int(self.config.unidir_reverse_traffic_pps)
647             if unidir_reverse_pps > 0:
648                 self.run_config['rates'].append({'rate_pps': str(unidir_reverse_pps)})
649         # Fix for [NFVBENCH-67], convert the rate string to PPS
650         for idx, rate in enumerate(self.run_config['rates']):
651             if 'rate_pps' not in rate:
652                 self.run_config['rates'][idx] = {'rate_pps': self.__convert_rates(rate)['rate_pps']}
653
654         self.gen.clear_streamblock()
655         self.gen.create_traffic(frame_size, self.run_config['rates'], bidirectional, latency=True)
656
657     def _modify_load(self, load):
658         self.current_total_rate = {'rate_percent': str(load)}
659         rate_per_direction = self.get_per_direction_rate()
660
661         self.gen.modify_rate(rate_per_direction, False)
662         self.run_config['rates'][0] = rate_per_direction
663         if self.run_config['bidirectional']:
664             self.gen.modify_rate(rate_per_direction, True)
665             self.run_config['rates'][1] = rate_per_direction
666
667     def get_ndr_and_pdr(self):
668         """Start the NDR/PDR iteration and return the results."""
669         dst = 'Bidirectional' if self.run_config['bidirectional'] else 'Unidirectional'
670         targets = {}
671         if self.config.ndr_run:
672             LOG.info('*** Searching NDR for %s (%s)...', self.run_config['l2frame_size'], dst)
673             targets['ndr'] = self.config.measurement.NDR
674         if self.config.pdr_run:
675             LOG.info('*** Searching PDR for %s (%s)...', self.run_config['l2frame_size'], dst)
676             targets['pdr'] = self.config.measurement.PDR
677
678         self.run_config['start_time'] = time.time()
679         self.interval_collector = IntervalCollector(self.run_config['start_time'])
680         self.interval_collector.attach_notifier(self.notifier)
681         self.iteration_collector = IterationCollector(self.run_config['start_time'])
682         results = {}
683         self.__range_search(0.0, 200.0, targets, results)
684
685         results['iteration_stats'] = {
686             'ndr_pdr': self.iteration_collector.get()
687         }
688
689         if self.config.ndr_run:
690             LOG.info('NDR load: %s', results['ndr']['rate_percent'])
691             results['ndr']['time_taken_sec'] = \
692                 results['ndr']['timestamp_sec'] - self.run_config['start_time']
693             if self.config.pdr_run:
694                 LOG.info('PDR load: %s', results['pdr']['rate_percent'])
695                 results['pdr']['time_taken_sec'] = \
696                     results['pdr']['timestamp_sec'] - results['ndr']['timestamp_sec']
697         else:
698             LOG.info('PDR load: %s', results['pdr']['rate_percent'])
699             results['pdr']['time_taken_sec'] = \
700                 results['pdr']['timestamp_sec'] - self.run_config['start_time']
701         return results
702
703     def __get_dropped_rate(self, result):
704         dropped_pkts = result['rx']['dropped_pkts']
705         total_pkts = result['tx']['total_pkts']
706         if not total_pkts:
707             return float('inf')
708         return float(dropped_pkts) / total_pkts * 100
709
710     def get_stats(self):
711         """Collect final stats for previous run."""
712         stats = self.gen.get_stats()
713         retDict = {'total_tx_rate': stats['total_tx_rate']}
714         for port in self.PORTS:
715             retDict[port] = {'tx': {}, 'rx': {}}
716
717         tx_keys = ['total_pkts', 'total_pkt_bytes', 'pkt_rate', 'pkt_bit_rate']
718         rx_keys = tx_keys + ['dropped_pkts']
719
720         for port in self.PORTS:
721             for key in tx_keys:
722                 retDict[port]['tx'][key] = int(stats[port]['tx'][key])
723             for key in rx_keys:
724                 try:
725                     retDict[port]['rx'][key] = int(stats[port]['rx'][key])
726                 except ValueError:
727                     retDict[port]['rx'][key] = 0
728             retDict[port]['rx']['avg_delay_usec'] = cast_integer(
729                 stats[port]['rx']['avg_delay_usec'])
730             retDict[port]['rx']['min_delay_usec'] = cast_integer(
731                 stats[port]['rx']['min_delay_usec'])
732             retDict[port]['rx']['max_delay_usec'] = cast_integer(
733                 stats[port]['rx']['max_delay_usec'])
734             retDict[port]['drop_rate_percent'] = self.__get_dropped_rate(retDict[port])
735
736         ports = sorted(retDict.keys())
737         if self.run_config['bidirectional']:
738             retDict['overall'] = {'tx': {}, 'rx': {}}
739             for key in tx_keys:
740                 retDict['overall']['tx'][key] = \
741                     retDict[ports[0]]['tx'][key] + retDict[ports[1]]['tx'][key]
742             for key in rx_keys:
743                 retDict['overall']['rx'][key] = \
744                     retDict[ports[0]]['rx'][key] + retDict[ports[1]]['rx'][key]
745             total_pkts = [retDict[ports[0]]['rx']['total_pkts'],
746                           retDict[ports[1]]['rx']['total_pkts']]
747             avg_delays = [retDict[ports[0]]['rx']['avg_delay_usec'],
748                           retDict[ports[1]]['rx']['avg_delay_usec']]
749             max_delays = [retDict[ports[0]]['rx']['max_delay_usec'],
750                           retDict[ports[1]]['rx']['max_delay_usec']]
751             min_delays = [retDict[ports[0]]['rx']['min_delay_usec'],
752                           retDict[ports[1]]['rx']['min_delay_usec']]
753             retDict['overall']['rx']['avg_delay_usec'] = utils.weighted_avg(total_pkts, avg_delays)
754             retDict['overall']['rx']['min_delay_usec'] = min(min_delays)
755             retDict['overall']['rx']['max_delay_usec'] = max(max_delays)
756             for key in ['pkt_bit_rate', 'pkt_rate']:
757                 for dirc in ['tx', 'rx']:
758                     retDict['overall'][dirc][key] /= 2.0
759         else:
760             retDict['overall'] = retDict[ports[0]]
761         retDict['overall']['drop_rate_percent'] = self.__get_dropped_rate(retDict['overall'])
762         return retDict
763
764     def __convert_rates(self, rate):
765         return utils.convert_rates(self.run_config['l2frame_size'],
766                                    rate,
767                                    self.intf_speed)
768
769     def __ndr_pdr_found(self, tag, load):
770         rates = self.__convert_rates({'rate_percent': load})
771         self.iteration_collector.add_ndr_pdr(tag, rates['rate_pps'])
772         last_stats = self.iteration_collector.peek()
773         self.interval_collector.add_ndr_pdr(tag, last_stats)
774
775     def __format_output_stats(self, stats):
776         for key in self.PORTS + ['overall']:
777             interface = stats[key]
778             stats[key] = {
779                 'tx_pkts': interface['tx']['total_pkts'],
780                 'rx_pkts': interface['rx']['total_pkts'],
781                 'drop_percentage': interface['drop_rate_percent'],
782                 'drop_pct': interface['rx']['dropped_pkts'],
783                 'avg_delay_usec': interface['rx']['avg_delay_usec'],
784                 'max_delay_usec': interface['rx']['max_delay_usec'],
785                 'min_delay_usec': interface['rx']['min_delay_usec'],
786             }
787
788         return stats
789
790     def __targets_found(self, rate, targets, results):
791         for tag, target in targets.iteritems():
792             LOG.info('Found %s (%s) load: %s', tag, target, rate)
793             self.__ndr_pdr_found(tag, rate)
794             results[tag]['timestamp_sec'] = time.time()
795
796     def __range_search(self, left, right, targets, results):
797         """Perform a binary search for a list of targets inside a [left..right] range or rate.
798
799         left    the left side of the range to search as a % the line rate (100 = 100% line rate)
800                 indicating the rate to send on each interface
801         right   the right side of the range to search as a % of line rate
802                 indicating the rate to send on each interface
803         targets a dict of drop rates to search (0.1 = 0.1%), indexed by the DR name or "tag"
804                 ('ndr', 'pdr')
805         results a dict to store results
806         """
807         if not targets:
808             return
809         LOG.info('Range search [%s .. %s] targets: %s', left, right, targets)
810
811         # Terminate search when gap is less than load epsilon
812         if right - left < self.config.measurement.load_epsilon:
813             self.__targets_found(left, targets, results)
814             return
815
816         # Obtain the average drop rate in for middle load
817         middle = (left + right) / 2.0
818         try:
819             stats, rates = self.__run_search_iteration(middle)
820         except STLError:
821             LOG.exception("Got exception from traffic generator during binary search")
822             self.__targets_found(left, targets, results)
823             return
824         # Split target dicts based on the avg drop rate
825         left_targets = {}
826         right_targets = {}
827         for tag, target in targets.iteritems():
828             if stats['overall']['drop_rate_percent'] <= target:
829                 # record the best possible rate found for this target
830                 results[tag] = rates
831                 results[tag].update({
832                     'load_percent_per_direction': middle,
833                     'stats': self.__format_output_stats(dict(stats)),
834                     'timestamp_sec': None
835                 })
836                 right_targets[tag] = target
837             else:
838                 # initialize to 0 all fields of result for
839                 # the worst case scenario of the binary search (if ndr/pdr is not found)
840                 if tag not in results:
841                     results[tag] = dict.fromkeys(rates, 0)
842                     empty_stats = self.__format_output_stats(dict(stats))
843                     for key in empty_stats:
844                         if isinstance(empty_stats[key], dict):
845                             empty_stats[key] = dict.fromkeys(empty_stats[key], 0)
846                         else:
847                             empty_stats[key] = 0
848                     results[tag].update({
849                         'load_percent_per_direction': 0,
850                         'stats': empty_stats,
851                         'timestamp_sec': None
852                     })
853                 left_targets[tag] = target
854
855         # search lower half
856         self.__range_search(left, middle, left_targets, results)
857
858         # search upper half only if the upper rate does not exceed
859         # 100%, this only happens when the first search at 100%
860         # yields a DR that is < target DR
861         if middle >= 100:
862             self.__targets_found(100, right_targets, results)
863         else:
864             self.__range_search(middle, right, right_targets, results)
865
866     def __run_search_iteration(self, rate):
867         """Run one iteration at the given rate level.
868
869         rate: the rate to send on each port in percent (0 to 100)
870         """
871         self._modify_load(rate)
872
873         # poll interval stats and collect them
874         for stats in self.run_traffic():
875             self.interval_collector.add(stats)
876             time_elapsed_ratio = self.runner.time_elapsed() / self.run_config['duration_sec']
877             if time_elapsed_ratio >= 1:
878                 self.cancel_traffic()
879                 if not self.skip_sleep():
880                     time.sleep(self.config.pause_sec)
881         self.interval_collector.reset()
882
883         # get stats from the run
884         stats = self.runner.client.get_stats()
885         current_traffic_config = self._get_traffic_config()
886         warning = self.compare_tx_rates(current_traffic_config['direction-total']['rate_pps'],
887                                         stats['total_tx_rate'])
888         if warning is not None:
889             stats['warning'] = warning
890
891         # save reliable stats from whole iteration
892         self.iteration_collector.add(stats, current_traffic_config['direction-total']['rate_pps'])
893         LOG.info('Average drop rate: %f', stats['overall']['drop_rate_percent'])
894         return stats, current_traffic_config['direction-total']
895
896     @staticmethod
897     def log_stats(stats):
898         """Log estimated stats during run."""
899         report = {
900             'datetime': str(datetime.now()),
901             'tx_packets': stats['overall']['tx']['total_pkts'],
902             'rx_packets': stats['overall']['rx']['total_pkts'],
903             'drop_packets': stats['overall']['rx']['dropped_pkts'],
904             'drop_rate_percent': stats['overall']['drop_rate_percent']
905         }
906         LOG.info('TX: %(tx_packets)d; '
907                  'RX: %(rx_packets)d; '
908                  'Est. Dropped: %(drop_packets)d; '
909                  'Est. Drop rate: %(drop_rate_percent).4f%%',
910                  report)
911
912     def run_traffic(self):
913         """Start traffic and return intermediate stats for each interval."""
914         stats = self.runner.run()
915         while self.runner.is_running:
916             self.log_stats(stats)
917             yield stats
918             stats = self.runner.poll_stats()
919             if stats is None:
920                 return
921         self.log_stats(stats)
922         LOG.info('Drop rate: %f', stats['overall']['drop_rate_percent'])
923         yield stats
924
925     def cancel_traffic(self):
926         """Stop traffic."""
927         self.runner.stop()
928
929     def _get_traffic_config(self):
930         config = {}
931         load_total = 0.0
932         bps_total = 0.0
933         pps_total = 0.0
934         for idx, rate in enumerate(self.run_config['rates']):
935             key = 'direction-forward' if idx == 0 else 'direction-reverse'
936             config[key] = {
937                 'l2frame_size': self.run_config['l2frame_size'],
938                 'duration_sec': self.run_config['duration_sec']
939             }
940             config[key].update(rate)
941             config[key].update(self.__convert_rates(rate))
942             load_total += float(config[key]['rate_percent'])
943             bps_total += float(config[key]['rate_bps'])
944             pps_total += float(config[key]['rate_pps'])
945         config['direction-total'] = dict(config['direction-forward'])
946         config['direction-total'].update({
947             'rate_percent': load_total,
948             'rate_pps': cast_integer(pps_total),
949             'rate_bps': bps_total
950         })
951
952         return config
953
954     def get_run_config(self, results):
955         """Return configuration which was used for the last run."""
956         r = {}
957         # because we want each direction to have the far end RX rates,
958         # use the far end index (1-idx) to retrieve the RX rates
959         for idx, key in enumerate(["direction-forward", "direction-reverse"]):
960             tx_rate = results["stats"][idx]["tx"]["total_pkts"] / self.config.duration_sec
961             rx_rate = results["stats"][1 - idx]["rx"]["total_pkts"] / self.config.duration_sec
962             r[key] = {
963                 "orig": self.__convert_rates(self.run_config['rates'][idx]),
964                 "tx": self.__convert_rates({'rate_pps': tx_rate}),
965                 "rx": self.__convert_rates({'rate_pps': rx_rate})
966             }
967
968         total = {}
969         for direction in ['orig', 'tx', 'rx']:
970             total[direction] = {}
971             for unit in ['rate_percent', 'rate_bps', 'rate_pps']:
972                 total[direction][unit] = sum([float(x[direction][unit]) for x in r.values()])
973
974         r['direction-total'] = total
975         return r
976
977     def insert_interface_stats(self, pps_list):
978         """Insert interface stats to a list of packet path stats.
979
980         pps_list: a list of packet path stats instances indexed by chain index
981
982         This function will insert the packet path stats for the traffic gen ports 0 and 1
983         with itemized per chain tx/rx counters.
984         There will be as many packet path stats as chains.
985         Each packet path stats will have exactly 2 InterfaceStats for port 0 and port 1
986         self.pps_list:
987         [
988         PacketPathStats(InterfaceStats(chain 0, port 0), InterfaceStats(chain 0, port 1)),
989         PacketPathStats(InterfaceStats(chain 1, port 0), InterfaceStats(chain 1, port 1)),
990         ...
991         ]
992         """
993         def get_if_stats(chain_idx):
994             return [InterfaceStats('p' + str(port), self.tool)
995                     for port in range(2)]
996         # keep the list of list of interface stats indexed by the chain id
997         self.ifstats = [get_if_stats(chain_idx)
998                         for chain_idx in range(self.config.service_chain_count)]
999         # note that we need to make a copy of the ifs list so that any modification in the
1000         # list from pps will not change the list saved in self.ifstats
1001         self.pps_list = [PacketPathStats(list(ifs)) for ifs in self.ifstats]
1002         # insert the corresponding pps in the passed list
1003         pps_list.extend(self.pps_list)
1004
1005     def update_interface_stats(self, diff=False):
1006         """Update all interface stats.
1007
1008         diff: if False, simply refresh the interface stats values with latest values
1009               if True, diff the interface stats with the latest values
1010         Make sure that the interface stats inserted in insert_interface_stats() are updated
1011         with proper values.
1012         self.ifstats:
1013         [
1014         [InterfaceStats(chain 0, port 0), InterfaceStats(chain 0, port 1)],
1015         [InterfaceStats(chain 1, port 0), InterfaceStats(chain 1, port 1)],
1016         ...
1017         ]
1018         """
1019         if diff:
1020             stats = self.gen.get_stats()
1021             for chain_idx, ifs in enumerate(self.ifstats):
1022                 # each ifs has exactly 2 InterfaceStats and 2 Latency instances
1023                 # corresponding to the
1024                 # port 0 and port 1 for the given chain_idx
1025                 # Note that we cannot use self.pps_list[chain_idx].if_stats to pick the
1026                 # interface stats for the pps because it could have been modified to contain
1027                 # additional interface stats
1028                 self.gen.get_stream_stats(stats, ifs, self.pps_list[chain_idx].latencies, chain_idx)
1029             # special handling for vxlan
1030             # in case of vxlan, flow stats are not available so all rx counters will be
1031             # zeros when the total rx port counter is non zero.
1032             # in that case,
1033             for port in range(2):
1034                 total_rx = 0
1035                 for ifs in self.ifstats:
1036                     total_rx += ifs[port].rx
1037                 if total_rx == 0:
1038                     # check if the total port rx from Trex is also zero
1039                     port_rx = stats[port]['rx']['total_pkts']
1040                     if port_rx:
1041                         # the total rx for all chains from port level stats is non zero
1042                         # which means that the per-chain stats are not available
1043                         if len(self.ifstats) == 1:
1044                             # only one chain, simply report the port level rx to the chain rx stats
1045                             self.ifstats[0][port].rx = port_rx
1046                         else:
1047                             for ifs in self.ifstats:
1048                                 # mark this data as unavailable
1049                                 ifs[port].rx = None
1050                             # pitch in the total rx only in the last chain pps
1051                             self.ifstats[-1][port].rx_total = port_rx
1052
1053     @staticmethod
1054     def compare_tx_rates(required, actual):
1055         """Compare the actual TX rate to the required TX rate."""
1056         threshold = 0.9
1057         are_different = False
1058         try:
1059             if float(actual) / required < threshold:
1060                 are_different = True
1061         except ZeroDivisionError:
1062             are_different = True
1063
1064         if are_different:
1065             msg = "WARNING: There is a significant difference between requested TX rate ({r}) " \
1066                   "and actual TX rate ({a}). The traffic generator may not have sufficient CPU " \
1067                   "to achieve the requested TX rate.".format(r=required, a=actual)
1068             LOG.info(msg)
1069             return msg
1070
1071         return None
1072
1073     def get_per_direction_rate(self):
1074         """Get the rate for each direction."""
1075         divisor = 2 if self.run_config['bidirectional'] else 1
1076         if 'rate_percent' in self.current_total_rate:
1077             # don't split rate if it's percentage
1078             divisor = 1
1079
1080         return utils.divide_rate(self.current_total_rate, divisor)
1081
1082     def close(self):
1083         """Close this instance."""
1084         try:
1085             self.gen.stop_traffic()
1086         except Exception:
1087             pass
1088         self.gen.clear_stats()
1089         self.gen.cleanup()