NFVBENCH-113 Add direct support for trex cores as an cli/config option
[nfvbench.git] / nfvbench / traffic_client.py
1 # Copyright 2016 Cisco Systems, Inc.  All rights reserved.
2 #
3 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
4 #    not use this file except in compliance with the License. You may obtain
5 #    a copy of the License at
6 #
7 #         http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #    Unless required by applicable law or agreed to in writing, software
10 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 #    License for the specific language governing permissions and limitations
13 #    under the License.
14
15 """Interface to the traffic generator clients including NDR/PDR binary search."""
16
17 from datetime import datetime
18 import socket
19 import struct
20 import time
21
22 from attrdict import AttrDict
23 import bitmath
24 from netaddr import IPNetwork
25 # pylint: disable=import-error
26 from trex_stl_lib.api import STLError
27 # pylint: enable=import-error
28
29 from log import LOG
30 from packet_stats import InterfaceStats
31 from packet_stats import PacketPathStats
32 from stats_collector import IntervalCollector
33 from stats_collector import IterationCollector
34 import traffic_gen.traffic_utils as utils
35 from utils import cast_integer
36
37
38 class TrafficClientException(Exception):
39     """Generic traffic client exception."""
40
41     pass
42
43
44 class TrafficRunner(object):
45     """Serialize various steps required to run traffic."""
46
47     def __init__(self, client, duration_sec, interval_sec=0):
48         """Create a traffic runner."""
49         self.client = client
50         self.start_time = None
51         self.duration_sec = duration_sec
52         self.interval_sec = interval_sec
53
54     def run(self):
55         """Clear stats and instruct the traffic generator to start generating traffic."""
56         if self.is_running():
57             return None
58         LOG.info('Running traffic generator')
59         self.client.gen.clear_stats()
60         self.client.gen.start_traffic()
61         self.start_time = time.time()
62         return self.poll_stats()
63
64     def stop(self):
65         """Stop the current run and instruct the traffic generator to stop traffic."""
66         if self.is_running():
67             self.start_time = None
68             self.client.gen.stop_traffic()
69
70     def is_running(self):
71         """Check if a run is still pending."""
72         return self.start_time is not None
73
74     def time_elapsed(self):
75         """Return time elapsed since start of run."""
76         if self.is_running():
77             return time.time() - self.start_time
78         return self.duration_sec
79
80     def poll_stats(self):
81         """Poll latest stats from the traffic generator at fixed interval - sleeps if necessary.
82
83         return: latest stats or None if traffic is stopped
84         """
85         if not self.is_running():
86             return None
87         if self.client.skip_sleep():
88             self.stop()
89             return self.client.get_stats()
90         time_elapsed = self.time_elapsed()
91         if time_elapsed > self.duration_sec:
92             self.stop()
93             return None
94         time_left = self.duration_sec - time_elapsed
95         if self.interval_sec > 0.0:
96             if time_left <= self.interval_sec:
97                 time.sleep(time_left)
98                 self.stop()
99             else:
100                 time.sleep(self.interval_sec)
101         else:
102             time.sleep(self.duration_sec)
103             self.stop()
104         return self.client.get_stats()
105
106
107 class IpBlock(object):
108     """Manage a block of IP addresses."""
109
110     def __init__(self, base_ip, step_ip, count_ip):
111         """Create an IP block."""
112         self.base_ip_int = Device.ip_to_int(base_ip)
113         self.step = Device.ip_to_int(step_ip)
114         self.max_available = count_ip
115         self.next_free = 0
116
117     def get_ip(self, index=0):
118         """Return the IP address at given index."""
119         if index < 0 or index >= self.max_available:
120             raise IndexError('Index out of bounds: %d (max=%d)' % (index, self.max_available))
121         return Device.int_to_ip(self.base_ip_int + index * self.step)
122
123     def reserve_ip_range(self, count):
124         """Reserve a range of count consecutive IP addresses spaced by step."""
125         if self.next_free + count > self.max_available:
126             raise IndexError('No more IP addresses next free=%d max_available=%d requested=%d' %
127                              (self.next_free,
128                               self.max_available,
129                               count))
130         first_ip = self.get_ip(self.next_free)
131         last_ip = self.get_ip(self.next_free + count - 1)
132         self.next_free += count
133         return (first_ip, last_ip)
134
135     def reset_reservation(self):
136         """Reset all reservations and restart with a completely unused IP block."""
137         self.next_free = 0
138
139
140 class Device(object):
141     """Represent a port device and all information associated to it.
142
143     In the curent version we only support 2 port devices for the traffic generator
144     identified as port 0 or port 1.
145     """
146
147     def __init__(self, port, generator_config):
148         """Create a new device for a given port."""
149         self.generator_config = generator_config
150         self.chain_count = generator_config.service_chain_count
151         self.flow_count = generator_config.flow_count / 2
152         self.port = port
153         self.switch_port = generator_config.interfaces[port].get('switch_port', None)
154         self.vtep_vlan = None
155         self.vtep_src_mac = None
156         self.vxlan = False
157         self.pci = generator_config.interfaces[port].pci
158         self.mac = None
159         self.dest_macs = None
160         self.vtep_dst_mac = None
161         self.vtep_dst_ip = None
162         if generator_config.vteps is None:
163             self.vtep_src_ip = None
164         else:
165             self.vtep_src_ip = generator_config.vteps[port]
166         self.vnis = None
167         self.vlans = None
168         self.ip_addrs = generator_config.ip_addrs[port]
169         subnet = IPNetwork(self.ip_addrs)
170         self.ip = subnet.ip.format()
171         self.ip_addrs_step = generator_config.ip_addrs_step
172         self.ip_block = IpBlock(self.ip, self.ip_addrs_step, self.flow_count)
173         self.gw_ip_block = IpBlock(generator_config.gateway_ips[port],
174                                    generator_config.gateway_ip_addrs_step,
175                                    self.chain_count)
176         self.tg_gateway_ip_addrs = generator_config.tg_gateway_ip_addrs[port]
177         self.tg_gw_ip_block = IpBlock(self.tg_gateway_ip_addrs,
178                                       generator_config.tg_gateway_ip_addrs_step,
179                                       self.chain_count)
180         self.udp_src_port = generator_config.udp_src_port
181         self.udp_dst_port = generator_config.udp_dst_port
182
183     def set_mac(self, mac):
184         """Set the local MAC for this port device."""
185         if mac is None:
186             raise TrafficClientException('Trying to set traffic generator MAC address as None')
187         self.mac = mac
188
189     def get_peer_device(self):
190         """Get the peer device (device 0 -> device 1, or device 1 -> device 0)."""
191         return self.generator_config.devices[1 - self.port]
192
193     def set_vtep_dst_mac(self, dest_macs):
194         """Set the list of dest MACs indexed by the chain id.
195
196         This is only called in 2 cases:
197         - VM macs discovered using openstack API
198         - dest MACs provisioned in config file
199         """
200         self.vtep_dst_mac = map(str, dest_macs)
201
202     def set_dest_macs(self, dest_macs):
203         """Set the list of dest MACs indexed by the chain id.
204
205         This is only called in 2 cases:
206         - VM macs discovered using openstack API
207         - dest MACs provisioned in config file
208         """
209         self.dest_macs = map(str, dest_macs)
210
211     def get_dest_macs(self):
212         """Get the list of dest macs for this device.
213
214         If set_dest_macs was never called, assumes l2-loopback and return
215         a list of peer mac (as many as chains but normally only 1 chain)
216         """
217         if self.dest_macs:
218             return self.dest_macs
219         # assume this is l2-loopback
220         return [self.get_peer_device().mac] * self.chain_count
221
222     def set_vlans(self, vlans):
223         """Set the list of vlans to use indexed by the chain id."""
224         self.vlans = vlans
225         LOG.info("Port %d: VLANs %s", self.port, self.vlans)
226
227     def set_vtep_vlan(self, vlan):
228         """Set the vtep vlan to use indexed by specific port."""
229         self.vtep_vlan = vlan
230         self.vxlan = True
231         self.vlan_tagging = None
232         LOG.info("Port %d: VTEP VLANs %s", self.port, self.vtep_vlan)
233
234     def set_vxlan_endpoints(self, src_ip, dst_ip):
235         self.vtep_dst_ip = dst_ip
236         self.vtep_src_ip = src_ip
237         LOG.info("Port %d: src_vtep %s, dst_vtep %s", self.port,
238                  self.vtep_src_ip, self.vtep_dst_ip)
239
240     def set_vxlans(self, vnis):
241         self.vnis = vnis
242         LOG.info("Port %d: VNIs %s", self.port, self.vnis)
243
244     def get_gw_ip(self, chain_index):
245         """Retrieve the IP address assigned for the gateway of a given chain."""
246         return self.gw_ip_block.get_ip(chain_index)
247
248     def get_stream_configs(self):
249         """Get the stream config for a given chain on this device.
250
251         Called by the traffic generator driver to program the traffic generator properly
252         before generating traffic
253         """
254         configs = []
255         # exact flow count for each chain is calculated as follows:
256         # - all chains except the first will have the same flow count
257         #   calculated as (total_flows + chain_count - 1) / chain_count
258         # - the first chain will have the remainder
259         # example 11 flows and 3 chains => 3, 4, 4
260         flows_per_chain = (self.flow_count + self.chain_count - 1) / self.chain_count
261         cur_chain_flow_count = self.flow_count - flows_per_chain * (self.chain_count - 1)
262         peer = self.get_peer_device()
263         self.ip_block.reset_reservation()
264         peer.ip_block.reset_reservation()
265         dest_macs = self.get_dest_macs()
266
267         for chain_idx in xrange(self.chain_count):
268             src_ip_first, src_ip_last = self.ip_block.reserve_ip_range(cur_chain_flow_count)
269             dst_ip_first, dst_ip_last = peer.ip_block.reserve_ip_range(cur_chain_flow_count)
270
271             configs.append({
272                 'count': cur_chain_flow_count,
273                 'mac_src': self.mac,
274                 'mac_dst': dest_macs[chain_idx],
275                 'ip_src_addr': src_ip_first,
276                 'ip_src_addr_max': src_ip_last,
277                 'ip_src_count': cur_chain_flow_count,
278                 'ip_dst_addr': dst_ip_first,
279                 'ip_dst_addr_max': dst_ip_last,
280                 'ip_dst_count': cur_chain_flow_count,
281                 'ip_addrs_step': self.ip_addrs_step,
282                 'udp_src_port': self.udp_src_port,
283                 'udp_dst_port': self.udp_dst_port,
284                 'mac_discovery_gw': self.get_gw_ip(chain_idx),
285                 'ip_src_tg_gw': self.tg_gw_ip_block.get_ip(chain_idx),
286                 'ip_dst_tg_gw': peer.tg_gw_ip_block.get_ip(chain_idx),
287                 'vlan_tag': self.vlans[chain_idx] if self.vlans else None,
288                 'vxlan': self.vxlan,
289                 'vtep_vlan': self.vtep_vlan if self.vtep_vlan else None,
290                 'vtep_src_mac': self.mac if self.vxlan is True else None,
291                 'vtep_dst_mac': self.vtep_dst_mac if self.vxlan is True else None,
292                 'vtep_dst_ip': self.vtep_dst_ip if self.vxlan is True else None,
293                 'vtep_src_ip': self.vtep_src_ip if self.vxlan is True else None,
294                 'net_vni': self.vnis[chain_idx] if self.vxlan is True else None
295             })
296             # after first chain, fall back to the flow count for all other chains
297             cur_chain_flow_count = flows_per_chain
298         return configs
299
300     @staticmethod
301     def ip_to_int(addr):
302         """Convert an IP address from string to numeric."""
303         return struct.unpack("!I", socket.inet_aton(addr))[0]
304
305     @staticmethod
306     def int_to_ip(nvalue):
307         """Convert an IP address from numeric to string."""
308         return socket.inet_ntoa(struct.pack("!I", nvalue))
309
310
311 class GeneratorConfig(object):
312     """Represents traffic configuration for currently running traffic profile."""
313
314     DEFAULT_IP_STEP = '0.0.0.1'
315     DEFAULT_SRC_DST_IP_STEP = '0.0.0.1'
316
317     def __init__(self, config):
318         """Create a generator config."""
319         self.config = config
320         # name of the generator profile (normally trex or dummy)
321         # pick the default one if not specified explicitly from cli options
322         if not config.generator_profile:
323             config.generator_profile = config.traffic_generator.default_profile
324         # pick up the profile dict based on the name
325         gen_config = self.__match_generator_profile(config.traffic_generator,
326                                                     config.generator_profile)
327         self.gen_config = gen_config
328         # copy over fields from the dict
329         self.tool = gen_config.tool
330         self.ip = gen_config.ip
331         # overrides on config.cores and config.mbuf_factor
332         if config.cores:
333             self.cores = config.cores
334         else:
335             self.cores = gen_config.get('cores', 1)
336         self.mbuf_factor = config.mbuf_factor
337         if gen_config.intf_speed:
338             # interface speed is overriden from config
339             self.intf_speed = bitmath.parse_string(gen_config.intf_speed.replace('ps', '')).bits
340         else:
341             # interface speed is discovered/provided by the traffic generator
342             self.intf_speed = 0
343         self.software_mode = gen_config.get('software_mode', False)
344         self.interfaces = gen_config.interfaces
345         if self.interfaces[0].port != 0 or self.interfaces[1].port != 1:
346             raise TrafficClientException('Invalid port order/id in generator_profile.interfaces')
347
348         self.service_chain = config.service_chain
349         self.service_chain_count = config.service_chain_count
350         self.flow_count = config.flow_count
351         self.host_name = gen_config.host_name
352
353         self.tg_gateway_ip_addrs = gen_config.tg_gateway_ip_addrs
354         self.ip_addrs = gen_config.ip_addrs
355         self.ip_addrs_step = gen_config.ip_addrs_step or self.DEFAULT_SRC_DST_IP_STEP
356         self.tg_gateway_ip_addrs_step = \
357             gen_config.tg_gateway_ip_addrs_step or self.DEFAULT_IP_STEP
358         self.gateway_ip_addrs_step = gen_config.gateway_ip_addrs_step or self.DEFAULT_IP_STEP
359         self.gateway_ips = gen_config.gateway_ip_addrs
360         self.udp_src_port = gen_config.udp_src_port
361         self.udp_dst_port = gen_config.udp_dst_port
362         self.vteps = gen_config.get('vteps')
363         self.vnis = gen_config.get('vnis')
364         self.devices = [Device(port, self) for port in [0, 1]]
365         # This should normally always be [0, 1]
366         self.ports = [device.port for device in self.devices]
367
368         # check that pci is not empty
369         if not gen_config.interfaces[0].get('pci', None) or \
370            not gen_config.interfaces[1].get('pci', None):
371             raise TrafficClientException("configuration interfaces pci fields cannot be empty")
372
373         self.pcis = [tgif['pci'] for tgif in gen_config.interfaces]
374         self.vlan_tagging = config.vlan_tagging
375
376         # needed for result/summarizer
377         config['tg-name'] = gen_config.name
378         config['tg-tool'] = self.tool
379
380     def to_json(self):
381         """Get json form to display the content into the overall result dict."""
382         return dict(self.gen_config)
383
384     def set_dest_macs(self, port_index, dest_macs):
385         """Set the list of dest MACs indexed by the chain id on given port.
386
387         port_index: the port for which dest macs must be set
388         dest_macs: a list of dest MACs indexed by chain id
389         """
390         if len(dest_macs) != self.config.service_chain_count:
391             raise TrafficClientException('Dest MAC list %s must have %d entries' %
392                                          (dest_macs, self.config.service_chain_count))
393         self.devices[port_index].set_dest_macs(dest_macs)
394         LOG.info('Port %d: dst MAC %s', port_index, [str(mac) for mac in dest_macs])
395
396     def set_vtep_dest_macs(self, port_index, dest_macs):
397         """Set the list of dest MACs indexed by the chain id on given port.
398
399         port_index: the port for which dest macs must be set
400         dest_macs: a list of dest MACs indexed by chain id
401         """
402         if len(dest_macs) != self.config.service_chain_count:
403             raise TrafficClientException('Dest MAC list %s must have %d entries' %
404                                          (dest_macs, self.config.service_chain_count))
405         self.devices[port_index].set_vtep_dst_mac(dest_macs)
406         LOG.info('Port %d: vtep dst MAC %s', port_index, set([str(mac) for mac in dest_macs]))
407
408     def get_dest_macs(self):
409         """Return the list of dest macs indexed by port."""
410         return [dev.get_dest_macs() for dev in self.devices]
411
412     def set_vlans(self, port_index, vlans):
413         """Set the list of vlans to use indexed by the chain id on given port.
414
415         port_index: the port for which VLANs must be set
416         vlans: a  list of vlan lists indexed by chain id
417         """
418         if len(vlans) != self.config.service_chain_count:
419             raise TrafficClientException('VLAN list %s must have %d entries' %
420                                          (vlans, self.config.service_chain_count))
421         self.devices[port_index].set_vlans(vlans)
422
423     def set_vxlans(self, port_index, vxlans):
424         """Set the list of vxlans (VNIs) to use indexed by the chain id on given port.
425
426         port_index: the port for which VXLANs must be set
427         VXLANs: a  list of VNIs lists indexed by chain id
428         """
429         if len(vxlans) != self.config.service_chain_count:
430             raise TrafficClientException('VXLAN list %s must have %d entries' %
431                                          (vxlans, self.config.service_chain_count))
432         self.devices[port_index].set_vxlans(vxlans)
433
434     def set_vtep_vlan(self, port_index, vlan):
435         """Set the vtep vlan to use indexed by the chain id on given port.
436         port_index: the port for which VLAN must be set
437         """
438         self.devices[port_index].set_vtep_vlan(vlan)
439
440     def set_vxlan_endpoints(self, port_index, src_ip, dst_ip):
441         self.devices[port_index].set_vxlan_endpoints(src_ip, dst_ip)
442
443     @staticmethod
444     def __match_generator_profile(traffic_generator, generator_profile):
445         gen_config = AttrDict(traffic_generator)
446         gen_config.pop('default_profile')
447         gen_config.pop('generator_profile')
448         matching_profile = [profile for profile in traffic_generator.generator_profile if
449                             profile.name == generator_profile]
450         if len(matching_profile) != 1:
451             raise Exception('Traffic generator profile not found: ' + generator_profile)
452
453         gen_config.update(matching_profile[0])
454         return gen_config
455
456
457 class TrafficClient(object):
458     """Traffic generator client with NDR/PDR binary seearch."""
459
460     PORTS = [0, 1]
461
462     def __init__(self, config, notifier=None):
463         """Create a new TrafficClient instance.
464
465         config: nfvbench config
466         notifier: notifier (optional)
467
468         A new instance is created everytime the nfvbench config may have changed.
469         """
470         self.config = config
471         self.generator_config = GeneratorConfig(config)
472         self.tool = self.generator_config.tool
473         self.gen = self._get_generator()
474         self.notifier = notifier
475         self.interval_collector = None
476         self.iteration_collector = None
477         self.runner = TrafficRunner(self, self.config.duration_sec, self.config.interval_sec)
478         self.config.frame_sizes = self._get_frame_sizes()
479         self.run_config = {
480             'l2frame_size': None,
481             'duration_sec': self.config.duration_sec,
482             'bidirectional': True,
483             'rates': []  # to avoid unsbuscriptable-obj warning
484         }
485         self.current_total_rate = {'rate_percent': '10'}
486         if self.config.single_run:
487             self.current_total_rate = utils.parse_rate_str(self.config.rate)
488         self.ifstats = None
489         # Speed is either discovered when connecting to TG or set from config
490         # This variable is 0 if not yet discovered from TG or must be the speed of
491         # each interface in bits per second
492         self.intf_speed = self.generator_config.intf_speed
493
494     def _get_generator(self):
495         tool = self.tool.lower()
496         if tool == 'trex':
497             from traffic_gen import trex
498             return trex.TRex(self)
499         if tool == 'dummy':
500             from traffic_gen import dummy
501             return dummy.DummyTG(self)
502         raise TrafficClientException('Unsupported generator tool name:' + self.tool)
503
504     def skip_sleep(self):
505         """Skip all sleeps when doing unit testing with dummy TG.
506
507         Must be overriden using mock.patch
508         """
509         return False
510
511     def _get_frame_sizes(self):
512         traffic_profile_name = self.config.traffic.profile
513         matching_profiles = [profile for profile in self.config.traffic_profile if
514                              profile.name == traffic_profile_name]
515         if len(matching_profiles) > 1:
516             raise TrafficClientException('Multiple traffic profiles with name: ' +
517                                          traffic_profile_name)
518         elif not matching_profiles:
519             raise TrafficClientException('Cannot find traffic profile: ' + traffic_profile_name)
520         return matching_profiles[0].l2frame_size
521
522     def start_traffic_generator(self):
523         """Start the traffic generator process (traffic not started yet)."""
524         self.gen.connect()
525         # pick up the interface speed if it is not set from config
526         intf_speeds = self.gen.get_port_speed_gbps()
527         # convert Gbps unit into bps
528         tg_if_speed = bitmath.parse_string(str(intf_speeds[0]) + 'Gb').bits
529         if self.intf_speed:
530             # interface speed is overriden from config
531             if self.intf_speed != tg_if_speed:
532                 # Warn the user if the speed in the config is different
533                 LOG.warning('Interface speed provided is different from actual speed (%d Gbps)',
534                             intf_speeds[0])
535         else:
536             # interface speed not provisioned by config
537             self.intf_speed = tg_if_speed
538             # also update the speed in the tg config
539             self.generator_config.intf_speed = tg_if_speed
540
541         # Save the traffic generator local MAC
542         for mac, device in zip(self.gen.get_macs(), self.generator_config.devices):
543             device.set_mac(mac)
544
545     def setup(self):
546         """Set up the traffic client."""
547         self.gen.clear_stats()
548
549     def get_version(self):
550         """Get the traffic generator version."""
551         return self.gen.get_version()
552
553     def ensure_end_to_end(self):
554         """Ensure traffic generator receives packets it has transmitted.
555
556         This ensures end to end connectivity and also waits until VMs are ready to forward packets.
557
558         VMs that are started and in active state may not pass traffic yet. It is imperative to make
559         sure that all VMs are passing traffic in both directions before starting any benchmarking.
560         To verify this, we need to send at a low frequency bi-directional packets and make sure
561         that we receive all packets back from all VMs. The number of flows is equal to 2 times
562         the number of chains (1 per direction) and we need to make sure we receive packets coming
563         from exactly 2 x chain count different source MAC addresses.
564
565         Example:
566             PVP chain (1 VM per chain)
567             N = 10 (number of chains)
568             Flow count = 20 (number of flows)
569             If the number of unique source MAC addresses from received packets is 20 then
570             all 10 VMs 10 VMs are in operational state.
571         """
572         LOG.info('Starting traffic generator to ensure end-to-end connectivity')
573         # send 2pps on each chain and each direction
574         rate_pps = {'rate_pps': str(self.config.service_chain_count * 2)}
575         self.gen.create_traffic('64', [rate_pps, rate_pps], bidirectional=True, latency=False)
576
577         # ensures enough traffic is coming back
578         retry_count = (self.config.check_traffic_time_sec +
579                        self.config.generic_poll_sec - 1) / self.config.generic_poll_sec
580
581         # we expect to see packets coming from 2 unique MAC per chain
582         # because there can be flooding in the case of shared net
583         # we must verify that packets from the right VMs are received
584         # and not just count unique src MAC
585         # create a dict of (port, chain) tuples indexed by dest mac
586         mac_map = {}
587         for port, dest_macs in enumerate(self.generator_config.get_dest_macs()):
588             for chain, mac in enumerate(dest_macs):
589                 mac_map[mac] = (port, chain)
590         unique_src_mac_count = len(mac_map)
591         if self.config.vxlan and self.config.traffic_generator.vtep_vlan:
592             get_mac_id = lambda packet: packet['binary'][60:66]
593         elif self.config.vxlan:
594             get_mac_id = lambda packet: packet['binary'][56:62]
595         else:
596             get_mac_id = lambda packet: packet['binary'][6:12]
597         for it in xrange(retry_count):
598             self.gen.clear_stats()
599             self.gen.start_traffic()
600             self.gen.start_capture()
601             LOG.info('Captured unique src mac %d/%d, capturing return packets (retry %d/%d)...',
602                      unique_src_mac_count - len(mac_map), unique_src_mac_count,
603                      it + 1, retry_count)
604             if not self.skip_sleep():
605                 time.sleep(self.config.generic_poll_sec)
606             self.gen.stop_traffic()
607             self.gen.fetch_capture_packets()
608             self.gen.stop_capture()
609
610             for packet in self.gen.packet_list:
611                 mac_id = get_mac_id(packet)
612                 src_mac = ':'.join(["%02x" % ord(x) for x in mac_id])
613                 if src_mac in mac_map:
614                     port, chain = mac_map[src_mac]
615                     LOG.info('Received packet from mac: %s (chain=%d, port=%d)',
616                              src_mac, chain, port)
617                     mac_map.pop(src_mac, None)
618
619                 if not mac_map:
620                     LOG.info('End-to-end connectivity established')
621                     return
622
623         raise TrafficClientException('End-to-end connectivity cannot be ensured')
624
625     def ensure_arp_successful(self):
626         """Resolve all IP using ARP and throw an exception in case of failure."""
627         dest_macs = self.gen.resolve_arp()
628         if dest_macs:
629             # all dest macs are discovered, saved them into the generator config
630             if self.config.vxlan:
631                 self.generator_config.set_vtep_dest_macs(0, dest_macs[0])
632                 self.generator_config.set_vtep_dest_macs(1, dest_macs[1])
633             else:
634                 self.generator_config.set_dest_macs(0, dest_macs[0])
635                 self.generator_config.set_dest_macs(1, dest_macs[1])
636         else:
637             raise TrafficClientException('ARP cannot be resolved')
638
639     def set_traffic(self, frame_size, bidirectional):
640         """Reconfigure the traffic generator for a new frame size."""
641         self.run_config['bidirectional'] = bidirectional
642         self.run_config['l2frame_size'] = frame_size
643         self.run_config['rates'] = [self.get_per_direction_rate()]
644         if bidirectional:
645             self.run_config['rates'].append(self.get_per_direction_rate())
646         else:
647             unidir_reverse_pps = int(self.config.unidir_reverse_traffic_pps)
648             if unidir_reverse_pps > 0:
649                 self.run_config['rates'].append({'rate_pps': str(unidir_reverse_pps)})
650         # Fix for [NFVBENCH-67], convert the rate string to PPS
651         for idx, rate in enumerate(self.run_config['rates']):
652             if 'rate_pps' not in rate:
653                 self.run_config['rates'][idx] = {'rate_pps': self.__convert_rates(rate)['rate_pps']}
654
655         self.gen.clear_streamblock()
656         self.gen.create_traffic(frame_size, self.run_config['rates'], bidirectional, latency=True)
657
658     def _modify_load(self, load):
659         self.current_total_rate = {'rate_percent': str(load)}
660         rate_per_direction = self.get_per_direction_rate()
661
662         self.gen.modify_rate(rate_per_direction, False)
663         self.run_config['rates'][0] = rate_per_direction
664         if self.run_config['bidirectional']:
665             self.gen.modify_rate(rate_per_direction, True)
666             self.run_config['rates'][1] = rate_per_direction
667
668     def get_ndr_and_pdr(self):
669         """Start the NDR/PDR iteration and return the results."""
670         dst = 'Bidirectional' if self.run_config['bidirectional'] else 'Unidirectional'
671         targets = {}
672         if self.config.ndr_run:
673             LOG.info('*** Searching NDR for %s (%s)...', self.run_config['l2frame_size'], dst)
674             targets['ndr'] = self.config.measurement.NDR
675         if self.config.pdr_run:
676             LOG.info('*** Searching PDR for %s (%s)...', self.run_config['l2frame_size'], dst)
677             targets['pdr'] = self.config.measurement.PDR
678
679         self.run_config['start_time'] = time.time()
680         self.interval_collector = IntervalCollector(self.run_config['start_time'])
681         self.interval_collector.attach_notifier(self.notifier)
682         self.iteration_collector = IterationCollector(self.run_config['start_time'])
683         results = {}
684         self.__range_search(0.0, 200.0, targets, results)
685
686         results['iteration_stats'] = {
687             'ndr_pdr': self.iteration_collector.get()
688         }
689
690         if self.config.ndr_run:
691             LOG.info('NDR load: %s', results['ndr']['rate_percent'])
692             results['ndr']['time_taken_sec'] = \
693                 results['ndr']['timestamp_sec'] - self.run_config['start_time']
694             if self.config.pdr_run:
695                 LOG.info('PDR load: %s', results['pdr']['rate_percent'])
696                 results['pdr']['time_taken_sec'] = \
697                     results['pdr']['timestamp_sec'] - results['ndr']['timestamp_sec']
698         else:
699             LOG.info('PDR load: %s', results['pdr']['rate_percent'])
700             results['pdr']['time_taken_sec'] = \
701                 results['pdr']['timestamp_sec'] - self.run_config['start_time']
702         return results
703
704     def __get_dropped_rate(self, result):
705         dropped_pkts = result['rx']['dropped_pkts']
706         total_pkts = result['tx']['total_pkts']
707         if not total_pkts:
708             return float('inf')
709         return float(dropped_pkts) / total_pkts * 100
710
711     def get_stats(self):
712         """Collect final stats for previous run."""
713         stats = self.gen.get_stats()
714         retDict = {'total_tx_rate': stats['total_tx_rate']}
715         for port in self.PORTS:
716             retDict[port] = {'tx': {}, 'rx': {}}
717
718         tx_keys = ['total_pkts', 'total_pkt_bytes', 'pkt_rate', 'pkt_bit_rate']
719         rx_keys = tx_keys + ['dropped_pkts']
720
721         for port in self.PORTS:
722             for key in tx_keys:
723                 retDict[port]['tx'][key] = int(stats[port]['tx'][key])
724             for key in rx_keys:
725                 try:
726                     retDict[port]['rx'][key] = int(stats[port]['rx'][key])
727                 except ValueError:
728                     retDict[port]['rx'][key] = 0
729             retDict[port]['rx']['avg_delay_usec'] = cast_integer(
730                 stats[port]['rx']['avg_delay_usec'])
731             retDict[port]['rx']['min_delay_usec'] = cast_integer(
732                 stats[port]['rx']['min_delay_usec'])
733             retDict[port]['rx']['max_delay_usec'] = cast_integer(
734                 stats[port]['rx']['max_delay_usec'])
735             retDict[port]['drop_rate_percent'] = self.__get_dropped_rate(retDict[port])
736
737         ports = sorted(retDict.keys())
738         if self.run_config['bidirectional']:
739             retDict['overall'] = {'tx': {}, 'rx': {}}
740             for key in tx_keys:
741                 retDict['overall']['tx'][key] = \
742                     retDict[ports[0]]['tx'][key] + retDict[ports[1]]['tx'][key]
743             for key in rx_keys:
744                 retDict['overall']['rx'][key] = \
745                     retDict[ports[0]]['rx'][key] + retDict[ports[1]]['rx'][key]
746             total_pkts = [retDict[ports[0]]['rx']['total_pkts'],
747                           retDict[ports[1]]['rx']['total_pkts']]
748             avg_delays = [retDict[ports[0]]['rx']['avg_delay_usec'],
749                           retDict[ports[1]]['rx']['avg_delay_usec']]
750             max_delays = [retDict[ports[0]]['rx']['max_delay_usec'],
751                           retDict[ports[1]]['rx']['max_delay_usec']]
752             min_delays = [retDict[ports[0]]['rx']['min_delay_usec'],
753                           retDict[ports[1]]['rx']['min_delay_usec']]
754             retDict['overall']['rx']['avg_delay_usec'] = utils.weighted_avg(total_pkts, avg_delays)
755             retDict['overall']['rx']['min_delay_usec'] = min(min_delays)
756             retDict['overall']['rx']['max_delay_usec'] = max(max_delays)
757             for key in ['pkt_bit_rate', 'pkt_rate']:
758                 for dirc in ['tx', 'rx']:
759                     retDict['overall'][dirc][key] /= 2.0
760         else:
761             retDict['overall'] = retDict[ports[0]]
762         retDict['overall']['drop_rate_percent'] = self.__get_dropped_rate(retDict['overall'])
763         return retDict
764
765     def __convert_rates(self, rate):
766         return utils.convert_rates(self.run_config['l2frame_size'],
767                                    rate,
768                                    self.intf_speed)
769
770     def __ndr_pdr_found(self, tag, load):
771         rates = self.__convert_rates({'rate_percent': load})
772         self.iteration_collector.add_ndr_pdr(tag, rates['rate_pps'])
773         last_stats = self.iteration_collector.peek()
774         self.interval_collector.add_ndr_pdr(tag, last_stats)
775
776     def __format_output_stats(self, stats):
777         for key in self.PORTS + ['overall']:
778             interface = stats[key]
779             stats[key] = {
780                 'tx_pkts': interface['tx']['total_pkts'],
781                 'rx_pkts': interface['rx']['total_pkts'],
782                 'drop_percentage': interface['drop_rate_percent'],
783                 'drop_pct': interface['rx']['dropped_pkts'],
784                 'avg_delay_usec': interface['rx']['avg_delay_usec'],
785                 'max_delay_usec': interface['rx']['max_delay_usec'],
786                 'min_delay_usec': interface['rx']['min_delay_usec'],
787             }
788
789         return stats
790
791     def __targets_found(self, rate, targets, results):
792         for tag, target in targets.iteritems():
793             LOG.info('Found %s (%s) load: %s', tag, target, rate)
794             self.__ndr_pdr_found(tag, rate)
795             results[tag]['timestamp_sec'] = time.time()
796
797     def __range_search(self, left, right, targets, results):
798         """Perform a binary search for a list of targets inside a [left..right] range or rate.
799
800         left    the left side of the range to search as a % the line rate (100 = 100% line rate)
801                 indicating the rate to send on each interface
802         right   the right side of the range to search as a % of line rate
803                 indicating the rate to send on each interface
804         targets a dict of drop rates to search (0.1 = 0.1%), indexed by the DR name or "tag"
805                 ('ndr', 'pdr')
806         results a dict to store results
807         """
808         if not targets:
809             return
810         LOG.info('Range search [%s .. %s] targets: %s', left, right, targets)
811
812         # Terminate search when gap is less than load epsilon
813         if right - left < self.config.measurement.load_epsilon:
814             self.__targets_found(left, targets, results)
815             return
816
817         # Obtain the average drop rate in for middle load
818         middle = (left + right) / 2.0
819         try:
820             stats, rates = self.__run_search_iteration(middle)
821         except STLError:
822             LOG.exception("Got exception from traffic generator during binary search")
823             self.__targets_found(left, targets, results)
824             return
825         # Split target dicts based on the avg drop rate
826         left_targets = {}
827         right_targets = {}
828         for tag, target in targets.iteritems():
829             if stats['overall']['drop_rate_percent'] <= target:
830                 # record the best possible rate found for this target
831                 results[tag] = rates
832                 results[tag].update({
833                     'load_percent_per_direction': middle,
834                     'stats': self.__format_output_stats(dict(stats)),
835                     'timestamp_sec': None
836                 })
837                 right_targets[tag] = target
838             else:
839                 # initialize to 0 all fields of result for
840                 # the worst case scenario of the binary search (if ndr/pdr is not found)
841                 if tag not in results:
842                     results[tag] = dict.fromkeys(rates, 0)
843                     empty_stats = self.__format_output_stats(dict(stats))
844                     for key in empty_stats:
845                         if isinstance(empty_stats[key], dict):
846                             empty_stats[key] = dict.fromkeys(empty_stats[key], 0)
847                         else:
848                             empty_stats[key] = 0
849                     results[tag].update({
850                         'load_percent_per_direction': 0,
851                         'stats': empty_stats,
852                         'timestamp_sec': None
853                     })
854                 left_targets[tag] = target
855
856         # search lower half
857         self.__range_search(left, middle, left_targets, results)
858
859         # search upper half only if the upper rate does not exceed
860         # 100%, this only happens when the first search at 100%
861         # yields a DR that is < target DR
862         if middle >= 100:
863             self.__targets_found(100, right_targets, results)
864         else:
865             self.__range_search(middle, right, right_targets, results)
866
867     def __run_search_iteration(self, rate):
868         """Run one iteration at the given rate level.
869
870         rate: the rate to send on each port in percent (0 to 100)
871         """
872         self._modify_load(rate)
873
874         # poll interval stats and collect them
875         for stats in self.run_traffic():
876             self.interval_collector.add(stats)
877             time_elapsed_ratio = self.runner.time_elapsed() / self.run_config['duration_sec']
878             if time_elapsed_ratio >= 1:
879                 self.cancel_traffic()
880                 if not self.skip_sleep():
881                     time.sleep(self.config.pause_sec)
882         self.interval_collector.reset()
883
884         # get stats from the run
885         stats = self.runner.client.get_stats()
886         current_traffic_config = self._get_traffic_config()
887         warning = self.compare_tx_rates(current_traffic_config['direction-total']['rate_pps'],
888                                         stats['total_tx_rate'])
889         if warning is not None:
890             stats['warning'] = warning
891
892         # save reliable stats from whole iteration
893         self.iteration_collector.add(stats, current_traffic_config['direction-total']['rate_pps'])
894         LOG.info('Average drop rate: %f', stats['overall']['drop_rate_percent'])
895         return stats, current_traffic_config['direction-total']
896
897     @staticmethod
898     def log_stats(stats):
899         """Log estimated stats during run."""
900         report = {
901             'datetime': str(datetime.now()),
902             'tx_packets': stats['overall']['tx']['total_pkts'],
903             'rx_packets': stats['overall']['rx']['total_pkts'],
904             'drop_packets': stats['overall']['rx']['dropped_pkts'],
905             'drop_rate_percent': stats['overall']['drop_rate_percent']
906         }
907         LOG.info('TX: %(tx_packets)d; '
908                  'RX: %(rx_packets)d; '
909                  'Est. Dropped: %(drop_packets)d; '
910                  'Est. Drop rate: %(drop_rate_percent).4f%%',
911                  report)
912
913     def run_traffic(self):
914         """Start traffic and return intermediate stats for each interval."""
915         stats = self.runner.run()
916         while self.runner.is_running:
917             self.log_stats(stats)
918             yield stats
919             stats = self.runner.poll_stats()
920             if stats is None:
921                 return
922         self.log_stats(stats)
923         LOG.info('Drop rate: %f', stats['overall']['drop_rate_percent'])
924         yield stats
925
926     def cancel_traffic(self):
927         """Stop traffic."""
928         self.runner.stop()
929
930     def _get_traffic_config(self):
931         config = {}
932         load_total = 0.0
933         bps_total = 0.0
934         pps_total = 0.0
935         for idx, rate in enumerate(self.run_config['rates']):
936             key = 'direction-forward' if idx == 0 else 'direction-reverse'
937             config[key] = {
938                 'l2frame_size': self.run_config['l2frame_size'],
939                 'duration_sec': self.run_config['duration_sec']
940             }
941             config[key].update(rate)
942             config[key].update(self.__convert_rates(rate))
943             load_total += float(config[key]['rate_percent'])
944             bps_total += float(config[key]['rate_bps'])
945             pps_total += float(config[key]['rate_pps'])
946         config['direction-total'] = dict(config['direction-forward'])
947         config['direction-total'].update({
948             'rate_percent': load_total,
949             'rate_pps': cast_integer(pps_total),
950             'rate_bps': bps_total
951         })
952
953         return config
954
955     def get_run_config(self, results):
956         """Return configuration which was used for the last run."""
957         r = {}
958         # because we want each direction to have the far end RX rates,
959         # use the far end index (1-idx) to retrieve the RX rates
960         for idx, key in enumerate(["direction-forward", "direction-reverse"]):
961             tx_rate = results["stats"][idx]["tx"]["total_pkts"] / self.config.duration_sec
962             rx_rate = results["stats"][1 - idx]["rx"]["total_pkts"] / self.config.duration_sec
963             r[key] = {
964                 "orig": self.__convert_rates(self.run_config['rates'][idx]),
965                 "tx": self.__convert_rates({'rate_pps': tx_rate}),
966                 "rx": self.__convert_rates({'rate_pps': rx_rate})
967             }
968
969         total = {}
970         for direction in ['orig', 'tx', 'rx']:
971             total[direction] = {}
972             for unit in ['rate_percent', 'rate_bps', 'rate_pps']:
973                 total[direction][unit] = sum([float(x[direction][unit]) for x in r.values()])
974
975         r['direction-total'] = total
976         return r
977
978     def insert_interface_stats(self, pps_list):
979         """Insert interface stats to a list of packet path stats.
980
981         pps_list: a list of packet path stats instances indexed by chain index
982
983         This function will insert the packet path stats for the traffic gen ports 0 and 1
984         with itemized per chain tx/rx counters.
985         There will be as many packet path stats as chains.
986         Each packet path stats will have exactly 2 InterfaceStats for port 0 and port 1
987         self.pps_list:
988         [
989         PacketPathStats(InterfaceStats(chain 0, port 0), InterfaceStats(chain 0, port 1)),
990         PacketPathStats(InterfaceStats(chain 1, port 0), InterfaceStats(chain 1, port 1)),
991         ...
992         ]
993         """
994         def get_if_stats(chain_idx):
995             return [InterfaceStats('p' + str(port), self.tool)
996                     for port in range(2)]
997         # keep the list of list of interface stats indexed by the chain id
998         self.ifstats = [get_if_stats(chain_idx)
999                         for chain_idx in range(self.config.service_chain_count)]
1000         # note that we need to make a copy of the ifs list so that any modification in the
1001         # list from pps will not change the list saved in self.ifstats
1002         self.pps_list = [PacketPathStats(list(ifs)) for ifs in self.ifstats]
1003         # insert the corresponding pps in the passed list
1004         pps_list.extend(self.pps_list)
1005
1006     def update_interface_stats(self, diff=False):
1007         """Update all interface stats.
1008
1009         diff: if False, simply refresh the interface stats values with latest values
1010               if True, diff the interface stats with the latest values
1011         Make sure that the interface stats inserted in insert_interface_stats() are updated
1012         with proper values.
1013         self.ifstats:
1014         [
1015         [InterfaceStats(chain 0, port 0), InterfaceStats(chain 0, port 1)],
1016         [InterfaceStats(chain 1, port 0), InterfaceStats(chain 1, port 1)],
1017         ...
1018         ]
1019         """
1020         if diff:
1021             stats = self.gen.get_stats()
1022             for chain_idx, ifs in enumerate(self.ifstats):
1023                 # each ifs has exactly 2 InterfaceStats and 2 Latency instances
1024                 # corresponding to the
1025                 # port 0 and port 1 for the given chain_idx
1026                 # Note that we cannot use self.pps_list[chain_idx].if_stats to pick the
1027                 # interface stats for the pps because it could have been modified to contain
1028                 # additional interface stats
1029                 self.gen.get_stream_stats(stats, ifs, self.pps_list[chain_idx].latencies, chain_idx)
1030
1031
1032     @staticmethod
1033     def compare_tx_rates(required, actual):
1034         """Compare the actual TX rate to the required TX rate."""
1035         threshold = 0.9
1036         are_different = False
1037         try:
1038             if float(actual) / required < threshold:
1039                 are_different = True
1040         except ZeroDivisionError:
1041             are_different = True
1042
1043         if are_different:
1044             msg = "WARNING: There is a significant difference between requested TX rate ({r}) " \
1045                   "and actual TX rate ({a}). The traffic generator may not have sufficient CPU " \
1046                   "to achieve the requested TX rate.".format(r=required, a=actual)
1047             LOG.info(msg)
1048             return msg
1049
1050         return None
1051
1052     def get_per_direction_rate(self):
1053         """Get the rate for each direction."""
1054         divisor = 2 if self.run_config['bidirectional'] else 1
1055         if 'rate_percent' in self.current_total_rate:
1056             # don't split rate if it's percentage
1057             divisor = 1
1058
1059         return utils.divide_rate(self.current_total_rate, divisor)
1060
1061     def close(self):
1062         """Close this instance."""
1063         try:
1064             self.gen.stop_traffic()
1065         except Exception:
1066             pass
1067         self.gen.clear_stats()
1068         self.gen.cleanup()