Merge "Add support for VXLAN latency"
[nfvbench.git] / nfvbench / traffic_client.py
1 # Copyright 2016 Cisco Systems, Inc.  All rights reserved.
2 #
3 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
4 #    not use this file except in compliance with the License. You may obtain
5 #    a copy of the License at
6 #
7 #         http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #    Unless required by applicable law or agreed to in writing, software
10 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 #    License for the specific language governing permissions and limitations
13 #    under the License.
14
15 """Interface to the traffic generator clients including NDR/PDR binary search."""
16
17 from datetime import datetime
18 import socket
19 import struct
20 import time
21
22 from attrdict import AttrDict
23 import bitmath
24 from netaddr import IPNetwork
25 # pylint: disable=import-error
26 from trex.stl.api import Ether
27 from trex.stl.api import STLError
28 from trex.stl.api import UDP
29 # pylint: enable=import-error
30
31 from log import LOG
32 from packet_stats import InterfaceStats
33 from packet_stats import PacketPathStats
34 from stats_collector import IntervalCollector
35 from stats_collector import IterationCollector
36 import traffic_gen.traffic_utils as utils
37 from utils import cast_integer
38
39
40 class TrafficClientException(Exception):
41     """Generic traffic client exception."""
42
43     pass
44
45
46 class TrafficRunner(object):
47     """Serialize various steps required to run traffic."""
48
49     def __init__(self, client, duration_sec, interval_sec=0):
50         """Create a traffic runner."""
51         self.client = client
52         self.start_time = None
53         self.duration_sec = duration_sec
54         self.interval_sec = interval_sec
55
56     def run(self):
57         """Clear stats and instruct the traffic generator to start generating traffic."""
58         if self.is_running():
59             return None
60         LOG.info('Running traffic generator')
61         self.client.gen.clear_stats()
62         self.client.gen.start_traffic()
63         self.start_time = time.time()
64         return self.poll_stats()
65
66     def stop(self):
67         """Stop the current run and instruct the traffic generator to stop traffic."""
68         if self.is_running():
69             self.start_time = None
70             self.client.gen.stop_traffic()
71
72     def is_running(self):
73         """Check if a run is still pending."""
74         return self.start_time is not None
75
76     def time_elapsed(self):
77         """Return time elapsed since start of run."""
78         if self.is_running():
79             return time.time() - self.start_time
80         return self.duration_sec
81
82     def poll_stats(self):
83         """Poll latest stats from the traffic generator at fixed interval - sleeps if necessary.
84
85         return: latest stats or None if traffic is stopped
86         """
87         if not self.is_running():
88             return None
89         if self.client.skip_sleep():
90             self.stop()
91             return self.client.get_stats()
92         time_elapsed = self.time_elapsed()
93         if time_elapsed > self.duration_sec:
94             self.stop()
95             return None
96         time_left = self.duration_sec - time_elapsed
97         if self.interval_sec > 0.0:
98             if time_left <= self.interval_sec:
99                 time.sleep(time_left)
100                 self.stop()
101             else:
102                 time.sleep(self.interval_sec)
103         else:
104             time.sleep(self.duration_sec)
105             self.stop()
106         return self.client.get_stats()
107
108
109 class IpBlock(object):
110     """Manage a block of IP addresses."""
111
112     def __init__(self, base_ip, step_ip, count_ip):
113         """Create an IP block."""
114         self.base_ip_int = Device.ip_to_int(base_ip)
115         self.step = Device.ip_to_int(step_ip)
116         self.max_available = count_ip
117         self.next_free = 0
118
119     def get_ip(self, index=0):
120         """Return the IP address at given index."""
121         if index < 0 or index >= self.max_available:
122             raise IndexError('Index out of bounds: %d (max=%d)' % (index, self.max_available))
123         return Device.int_to_ip(self.base_ip_int + index * self.step)
124
125     def reserve_ip_range(self, count):
126         """Reserve a range of count consecutive IP addresses spaced by step."""
127         if self.next_free + count > self.max_available:
128             raise IndexError('No more IP addresses next free=%d max_available=%d requested=%d' %
129                              (self.next_free,
130                               self.max_available,
131                               count))
132         first_ip = self.get_ip(self.next_free)
133         last_ip = self.get_ip(self.next_free + count - 1)
134         self.next_free += count
135         return (first_ip, last_ip)
136
137     def reset_reservation(self):
138         """Reset all reservations and restart with a completely unused IP block."""
139         self.next_free = 0
140
141
142 class Device(object):
143     """Represent a port device and all information associated to it.
144
145     In the curent version we only support 2 port devices for the traffic generator
146     identified as port 0 or port 1.
147     """
148
149     def __init__(self, port, generator_config):
150         """Create a new device for a given port."""
151         self.generator_config = generator_config
152         self.chain_count = generator_config.service_chain_count
153         self.flow_count = generator_config.flow_count / 2
154         self.port = port
155         self.switch_port = generator_config.interfaces[port].get('switch_port', None)
156         self.vtep_vlan = None
157         self.vtep_src_mac = None
158         self.vxlan = False
159         self.pci = generator_config.interfaces[port].pci
160         self.mac = None
161         self.dest_macs = None
162         self.vtep_dst_mac = None
163         self.vtep_dst_ip = None
164         if generator_config.vteps is None:
165             self.vtep_src_ip = None
166         else:
167             self.vtep_src_ip = generator_config.vteps[port]
168         self.vnis = None
169         self.vlans = None
170         self.ip_addrs = generator_config.ip_addrs[port]
171         subnet = IPNetwork(self.ip_addrs)
172         self.ip = subnet.ip.format()
173         self.ip_addrs_step = generator_config.ip_addrs_step
174         self.ip_block = IpBlock(self.ip, self.ip_addrs_step, self.flow_count)
175         self.gw_ip_block = IpBlock(generator_config.gateway_ips[port],
176                                    generator_config.gateway_ip_addrs_step,
177                                    self.chain_count)
178         self.tg_gateway_ip_addrs = generator_config.tg_gateway_ip_addrs[port]
179         self.tg_gw_ip_block = IpBlock(self.tg_gateway_ip_addrs,
180                                       generator_config.tg_gateway_ip_addrs_step,
181                                       self.chain_count)
182         self.udp_src_port = generator_config.udp_src_port
183         self.udp_dst_port = generator_config.udp_dst_port
184
185     def set_mac(self, mac):
186         """Set the local MAC for this port device."""
187         if mac is None:
188             raise TrafficClientException('Trying to set traffic generator MAC address as None')
189         self.mac = mac
190
191     def get_peer_device(self):
192         """Get the peer device (device 0 -> device 1, or device 1 -> device 0)."""
193         return self.generator_config.devices[1 - self.port]
194
195     def set_vtep_dst_mac(self, dest_macs):
196         """Set the list of dest MACs indexed by the chain id.
197
198         This is only called in 2 cases:
199         - VM macs discovered using openstack API
200         - dest MACs provisioned in config file
201         """
202         self.vtep_dst_mac = map(str, dest_macs)
203
204     def set_dest_macs(self, dest_macs):
205         """Set the list of dest MACs indexed by the chain id.
206
207         This is only called in 2 cases:
208         - VM macs discovered using openstack API
209         - dest MACs provisioned in config file
210         """
211         self.dest_macs = map(str, dest_macs)
212
213     def get_dest_macs(self):
214         """Get the list of dest macs for this device.
215
216         If set_dest_macs was never called, assumes l2-loopback and return
217         a list of peer mac (as many as chains but normally only 1 chain)
218         """
219         if self.dest_macs:
220             return self.dest_macs
221         # assume this is l2-loopback
222         return [self.get_peer_device().mac] * self.chain_count
223
224     def set_vlans(self, vlans):
225         """Set the list of vlans to use indexed by the chain id."""
226         self.vlans = vlans
227         LOG.info("Port %d: VLANs %s", self.port, self.vlans)
228
229     def set_vtep_vlan(self, vlan):
230         """Set the vtep vlan to use indexed by specific port."""
231         self.vtep_vlan = vlan
232         self.vxlan = True
233         self.vlan_tagging = None
234         LOG.info("Port %d: VTEP VLANs %s", self.port, self.vtep_vlan)
235
236     def set_vxlan_endpoints(self, src_ip, dst_ip):
237         self.vtep_dst_ip = dst_ip
238         self.vtep_src_ip = src_ip
239         LOG.info("Port %d: src_vtep %s, dst_vtep %s", self.port,
240                  self.vtep_src_ip, self.vtep_dst_ip)
241
242     def set_vxlans(self, vnis):
243         self.vnis = vnis
244         LOG.info("Port %d: VNIs %s", self.port, self.vnis)
245
246     def set_gw_ip(self, gateway_ip):
247         self.gw_ip_block = IpBlock(gateway_ip,
248                                    self.generator_config.gateway_ip_addrs_step,
249                                    self.chain_count)
250
251     def get_gw_ip(self, chain_index):
252         """Retrieve the IP address assigned for the gateway of a given chain."""
253         return self.gw_ip_block.get_ip(chain_index)
254
255     def get_stream_configs(self):
256         """Get the stream config for a given chain on this device.
257
258         Called by the traffic generator driver to program the traffic generator properly
259         before generating traffic
260         """
261         configs = []
262         # exact flow count for each chain is calculated as follows:
263         # - all chains except the first will have the same flow count
264         #   calculated as (total_flows + chain_count - 1) / chain_count
265         # - the first chain will have the remainder
266         # example 11 flows and 3 chains => 3, 4, 4
267         flows_per_chain = (self.flow_count + self.chain_count - 1) / self.chain_count
268         cur_chain_flow_count = self.flow_count - flows_per_chain * (self.chain_count - 1)
269         peer = self.get_peer_device()
270         self.ip_block.reset_reservation()
271         peer.ip_block.reset_reservation()
272         dest_macs = self.get_dest_macs()
273
274         for chain_idx in xrange(self.chain_count):
275             src_ip_first, src_ip_last = self.ip_block.reserve_ip_range(cur_chain_flow_count)
276             dst_ip_first, dst_ip_last = peer.ip_block.reserve_ip_range(cur_chain_flow_count)
277
278             configs.append({
279                 'count': cur_chain_flow_count,
280                 'mac_src': self.mac,
281                 'mac_dst': dest_macs[chain_idx],
282                 'ip_src_addr': src_ip_first,
283                 'ip_src_addr_max': src_ip_last,
284                 'ip_src_count': cur_chain_flow_count,
285                 'ip_dst_addr': dst_ip_first,
286                 'ip_dst_addr_max': dst_ip_last,
287                 'ip_dst_count': cur_chain_flow_count,
288                 'ip_addrs_step': self.ip_addrs_step,
289                 'udp_src_port': self.udp_src_port,
290                 'udp_dst_port': self.udp_dst_port,
291                 'mac_discovery_gw': self.get_gw_ip(chain_idx),
292                 'ip_src_tg_gw': self.tg_gw_ip_block.get_ip(chain_idx),
293                 'ip_dst_tg_gw': peer.tg_gw_ip_block.get_ip(chain_idx),
294                 'vlan_tag': self.vlans[chain_idx] if self.vlans else None,
295                 'vxlan': self.vxlan,
296                 'vtep_vlan': self.vtep_vlan if self.vtep_vlan else None,
297                 'vtep_src_mac': self.mac if self.vxlan is True else None,
298                 'vtep_dst_mac': self.vtep_dst_mac if self.vxlan is True else None,
299                 'vtep_dst_ip': self.vtep_dst_ip if self.vxlan is True else None,
300                 'vtep_src_ip': self.vtep_src_ip if self.vxlan is True else None,
301                 'net_vni': self.vnis[chain_idx] if self.vxlan is True else None
302             })
303             # after first chain, fall back to the flow count for all other chains
304             cur_chain_flow_count = flows_per_chain
305         return configs
306
307     @staticmethod
308     def ip_to_int(addr):
309         """Convert an IP address from string to numeric."""
310         return struct.unpack("!I", socket.inet_aton(addr))[0]
311
312     @staticmethod
313     def int_to_ip(nvalue):
314         """Convert an IP address from numeric to string."""
315         return socket.inet_ntoa(struct.pack("!I", nvalue))
316
317
318 class GeneratorConfig(object):
319     """Represents traffic configuration for currently running traffic profile."""
320
321     DEFAULT_IP_STEP = '0.0.0.1'
322     DEFAULT_SRC_DST_IP_STEP = '0.0.0.1'
323
324     def __init__(self, config):
325         """Create a generator config."""
326         self.config = config
327         # name of the generator profile (normally trex or dummy)
328         # pick the default one if not specified explicitly from cli options
329         if not config.generator_profile:
330             config.generator_profile = config.traffic_generator.default_profile
331         # pick up the profile dict based on the name
332         gen_config = self.__match_generator_profile(config.traffic_generator,
333                                                     config.generator_profile)
334         self.gen_config = gen_config
335         # copy over fields from the dict
336         self.tool = gen_config.tool
337         self.ip = gen_config.ip
338         # overrides on config.cores and config.mbuf_factor
339         if config.cores:
340             self.cores = config.cores
341         else:
342             self.cores = gen_config.get('cores', 1)
343         self.mbuf_factor = config.mbuf_factor
344         self.mbuf_64 = config.mbuf_64
345         self.hdrh = not config.disable_hdrh
346         if gen_config.intf_speed:
347             # interface speed is overriden from config
348             self.intf_speed = bitmath.parse_string(gen_config.intf_speed.replace('ps', '')).bits
349         else:
350             # interface speed is discovered/provided by the traffic generator
351             self.intf_speed = 0
352         self.name = gen_config.name
353         self.zmq_pub_port = gen_config.get('zmq_pub_port', 4500)
354         self.zmq_rpc_port = gen_config.get('zmq_rpc_port', 4501)
355         self.limit_memory = gen_config.get('limit_memory', 1024)
356         self.software_mode = gen_config.get('software_mode', False)
357         self.interfaces = gen_config.interfaces
358         if self.interfaces[0].port != 0 or self.interfaces[1].port != 1:
359             raise TrafficClientException('Invalid port order/id in generator_profile.interfaces')
360         self.service_chain = config.service_chain
361         self.service_chain_count = config.service_chain_count
362         self.flow_count = config.flow_count
363         self.host_name = gen_config.host_name
364
365         self.tg_gateway_ip_addrs = gen_config.tg_gateway_ip_addrs
366         self.ip_addrs = gen_config.ip_addrs
367         self.ip_addrs_step = gen_config.ip_addrs_step or self.DEFAULT_SRC_DST_IP_STEP
368         self.tg_gateway_ip_addrs_step = \
369             gen_config.tg_gateway_ip_addrs_step or self.DEFAULT_IP_STEP
370         self.gateway_ip_addrs_step = gen_config.gateway_ip_addrs_step or self.DEFAULT_IP_STEP
371         self.gateway_ips = gen_config.gateway_ip_addrs
372         self.udp_src_port = gen_config.udp_src_port
373         self.udp_dst_port = gen_config.udp_dst_port
374         self.vteps = gen_config.get('vteps')
375         self.devices = [Device(port, self) for port in [0, 1]]
376         # This should normally always be [0, 1]
377         self.ports = [device.port for device in self.devices]
378
379         # check that pci is not empty
380         if not gen_config.interfaces[0].get('pci', None) or \
381            not gen_config.interfaces[1].get('pci', None):
382             raise TrafficClientException("configuration interfaces pci fields cannot be empty")
383
384         self.pcis = [tgif['pci'] for tgif in gen_config.interfaces]
385         self.vlan_tagging = config.vlan_tagging
386
387         # needed for result/summarizer
388         config['tg-name'] = gen_config.name
389         config['tg-tool'] = self.tool
390
391     def to_json(self):
392         """Get json form to display the content into the overall result dict."""
393         return dict(self.gen_config)
394
395     def set_dest_macs(self, port_index, dest_macs):
396         """Set the list of dest MACs indexed by the chain id on given port.
397
398         port_index: the port for which dest macs must be set
399         dest_macs: a list of dest MACs indexed by chain id
400         """
401         if len(dest_macs) < self.config.service_chain_count:
402             raise TrafficClientException('Dest MAC list %s must have %d entries' %
403                                          (dest_macs, self.config.service_chain_count))
404         # only pass the first scc dest MACs
405         self.devices[port_index].set_dest_macs(dest_macs[:self.config.service_chain_count])
406         LOG.info('Port %d: dst MAC %s', port_index, [str(mac) for mac in dest_macs])
407
408     def set_vtep_dest_macs(self, port_index, dest_macs):
409         """Set the list of dest MACs indexed by the chain id on given port.
410
411         port_index: the port for which dest macs must be set
412         dest_macs: a list of dest MACs indexed by chain id
413         """
414         if len(dest_macs) != self.config.service_chain_count:
415             raise TrafficClientException('Dest MAC list %s must have %d entries' %
416                                          (dest_macs, self.config.service_chain_count))
417         self.devices[port_index].set_vtep_dst_mac(dest_macs)
418         LOG.info('Port %d: vtep dst MAC %s', port_index, set([str(mac) for mac in dest_macs]))
419
420     def get_dest_macs(self):
421         """Return the list of dest macs indexed by port."""
422         return [dev.get_dest_macs() for dev in self.devices]
423
424     def set_vlans(self, port_index, vlans):
425         """Set the list of vlans to use indexed by the chain id on given port.
426
427         port_index: the port for which VLANs must be set
428         vlans: a  list of vlan lists indexed by chain id
429         """
430         if len(vlans) != self.config.service_chain_count:
431             raise TrafficClientException('VLAN list %s must have %d entries' %
432                                          (vlans, self.config.service_chain_count))
433         self.devices[port_index].set_vlans(vlans)
434
435     def set_vxlans(self, port_index, vxlans):
436         """Set the list of vxlans (VNIs) to use indexed by the chain id on given port.
437
438         port_index: the port for which VXLANs must be set
439         VXLANs: a  list of VNIs lists indexed by chain id
440         """
441         if len(vxlans) != self.config.service_chain_count:
442             raise TrafficClientException('VXLAN list %s must have %d entries' %
443                                          (vxlans, self.config.service_chain_count))
444         self.devices[port_index].set_vxlans(vxlans)
445
446     def set_vtep_vlan(self, port_index, vlan):
447         """Set the vtep vlan to use indexed by the chain id on given port.
448         port_index: the port for which VLAN must be set
449         """
450         self.devices[port_index].set_vtep_vlan(vlan)
451
452     def set_vxlan_endpoints(self, port_index, src_ip, dst_ip):
453         self.devices[port_index].set_vxlan_endpoints(src_ip, dst_ip)
454
455     @staticmethod
456     def __match_generator_profile(traffic_generator, generator_profile):
457         gen_config = AttrDict(traffic_generator)
458         gen_config.pop('default_profile')
459         gen_config.pop('generator_profile')
460         matching_profile = [profile for profile in traffic_generator.generator_profile if
461                             profile.name == generator_profile]
462         if len(matching_profile) != 1:
463             raise Exception('Traffic generator profile not found: ' + generator_profile)
464
465         gen_config.update(matching_profile[0])
466         return gen_config
467
468
469 class TrafficClient(object):
470     """Traffic generator client with NDR/PDR binary seearch."""
471
472     PORTS = [0, 1]
473
474     def __init__(self, config, notifier=None):
475         """Create a new TrafficClient instance.
476
477         config: nfvbench config
478         notifier: notifier (optional)
479
480         A new instance is created everytime the nfvbench config may have changed.
481         """
482         self.config = config
483         self.generator_config = GeneratorConfig(config)
484         self.tool = self.generator_config.tool
485         self.gen = self._get_generator()
486         self.notifier = notifier
487         self.interval_collector = None
488         self.iteration_collector = None
489         self.runner = TrafficRunner(self, self.config.duration_sec, self.config.interval_sec)
490         self.config.frame_sizes = self._get_frame_sizes()
491         self.run_config = {
492             'l2frame_size': None,
493             'duration_sec': self.config.duration_sec,
494             'bidirectional': True,
495             'rates': []  # to avoid unsbuscriptable-obj warning
496         }
497         self.current_total_rate = {'rate_percent': '10'}
498         if self.config.single_run:
499             self.current_total_rate = utils.parse_rate_str(self.config.rate)
500         self.ifstats = None
501         # Speed is either discovered when connecting to TG or set from config
502         # This variable is 0 if not yet discovered from TG or must be the speed of
503         # each interface in bits per second
504         self.intf_speed = self.generator_config.intf_speed
505
506     def _get_generator(self):
507         tool = self.tool.lower()
508         if tool == 'trex':
509             from traffic_gen import trex_gen
510             return trex_gen.TRex(self)
511         if tool == 'dummy':
512             from traffic_gen import dummy
513             return dummy.DummyTG(self)
514         raise TrafficClientException('Unsupported generator tool name:' + self.tool)
515
516     def skip_sleep(self):
517         """Skip all sleeps when doing unit testing with dummy TG.
518
519         Must be overriden using mock.patch
520         """
521         return False
522
523     def _get_frame_sizes(self):
524         traffic_profile_name = self.config.traffic.profile
525         matching_profiles = [profile for profile in self.config.traffic_profile if
526                              profile.name == traffic_profile_name]
527         if len(matching_profiles) > 1:
528             raise TrafficClientException('Multiple traffic profiles with name: ' +
529                                          traffic_profile_name)
530         elif not matching_profiles:
531             raise TrafficClientException('Cannot find traffic profile: ' + traffic_profile_name)
532         return matching_profiles[0].l2frame_size
533
534     def start_traffic_generator(self):
535         """Start the traffic generator process (traffic not started yet)."""
536         self.gen.connect()
537         # pick up the interface speed if it is not set from config
538         intf_speeds = self.gen.get_port_speed_gbps()
539         # convert Gbps unit into bps
540         tg_if_speed = bitmath.parse_string(str(intf_speeds[0]) + 'Gb').bits
541         if self.intf_speed:
542             # interface speed is overriden from config
543             if self.intf_speed != tg_if_speed:
544                 # Warn the user if the speed in the config is different
545                 LOG.warning('Interface speed provided is different from actual speed (%d Gbps)',
546                             intf_speeds[0])
547         else:
548             # interface speed not provisioned by config
549             self.intf_speed = tg_if_speed
550             # also update the speed in the tg config
551             self.generator_config.intf_speed = tg_if_speed
552
553         # Save the traffic generator local MAC
554         for mac, device in zip(self.gen.get_macs(), self.generator_config.devices):
555             device.set_mac(mac)
556
557     def setup(self):
558         """Set up the traffic client."""
559         self.gen.clear_stats()
560
561     def get_version(self):
562         """Get the traffic generator version."""
563         return self.gen.get_version()
564
565     def ensure_end_to_end(self):
566         """Ensure traffic generator receives packets it has transmitted.
567
568         This ensures end to end connectivity and also waits until VMs are ready to forward packets.
569
570         VMs that are started and in active state may not pass traffic yet. It is imperative to make
571         sure that all VMs are passing traffic in both directions before starting any benchmarking.
572         To verify this, we need to send at a low frequency bi-directional packets and make sure
573         that we receive all packets back from all VMs. The number of flows is equal to 2 times
574         the number of chains (1 per direction) and we need to make sure we receive packets coming
575         from exactly 2 x chain count different source MAC addresses.
576
577         Example:
578             PVP chain (1 VM per chain)
579             N = 10 (number of chains)
580             Flow count = 20 (number of flows)
581             If the number of unique source MAC addresses from received packets is 20 then
582             all 10 VMs 10 VMs are in operational state.
583         """
584         LOG.info('Starting traffic generator to ensure end-to-end connectivity')
585         # send 2pps on each chain and each direction
586         rate_pps = {'rate_pps': str(self.config.service_chain_count * 2)}
587         self.gen.create_traffic('64', [rate_pps, rate_pps], bidirectional=True, latency=False,
588                                 e2e=True)
589         # ensures enough traffic is coming back
590         retry_count = (self.config.check_traffic_time_sec +
591                        self.config.generic_poll_sec - 1) / self.config.generic_poll_sec
592
593         # we expect to see packets coming from 2 unique MAC per chain
594         # because there can be flooding in the case of shared net
595         # we must verify that packets from the right VMs are received
596         # and not just count unique src MAC
597         # create a dict of (port, chain) tuples indexed by dest mac
598         mac_map = {}
599         for port, dest_macs in enumerate(self.generator_config.get_dest_macs()):
600             for chain, mac in enumerate(dest_macs):
601                 mac_map[mac] = (port, chain)
602         unique_src_mac_count = len(mac_map)
603         if self.config.vxlan and self.config.traffic_generator.vtep_vlan:
604             get_mac_id = lambda packet: packet['binary'][60:66]
605         elif self.config.vxlan:
606             get_mac_id = lambda packet: packet['binary'][56:62]
607         else:
608             get_mac_id = lambda packet: packet['binary'][6:12]
609         for it in xrange(retry_count):
610             self.gen.clear_stats()
611             self.gen.start_traffic()
612             self.gen.start_capture()
613             LOG.info('Captured unique src mac %d/%d, capturing return packets (retry %d/%d)...',
614                      unique_src_mac_count - len(mac_map), unique_src_mac_count,
615                      it + 1, retry_count)
616             if not self.skip_sleep():
617                 time.sleep(self.config.generic_poll_sec)
618             self.gen.stop_traffic()
619             self.gen.fetch_capture_packets()
620             self.gen.stop_capture()
621             for packet in self.gen.packet_list:
622                 mac_id = get_mac_id(packet)
623                 src_mac = ':'.join(["%02x" % ord(x) for x in mac_id])
624                 if src_mac in mac_map and self.is_udp(packet):
625                     port, chain = mac_map[src_mac]
626                     LOG.info('Received packet from mac: %s (chain=%d, port=%d)',
627                              src_mac, chain, port)
628                     mac_map.pop(src_mac, None)
629
630                 if not mac_map:
631                     LOG.info('End-to-end connectivity established')
632                     return
633             if self.config.l3_router and not self.config.no_arp:
634                 # In case of L3 traffic mode, routers are not able to route traffic
635                 # until VM interfaces are up and ARP requests are done
636                 LOG.info('Waiting for loopback service completely started...')
637                 LOG.info('Sending ARP request to assure end-to-end connectivity established')
638                 self.ensure_arp_successful()
639         raise TrafficClientException('End-to-end connectivity cannot be ensured')
640
641     def is_udp(self, packet):
642         pkt = Ether(packet['binary'])
643         return UDP in pkt
644
645     def ensure_arp_successful(self):
646         """Resolve all IP using ARP and throw an exception in case of failure."""
647         dest_macs = self.gen.resolve_arp()
648         if dest_macs:
649             # all dest macs are discovered, saved them into the generator config
650             if self.config.vxlan:
651                 self.generator_config.set_vtep_dest_macs(0, dest_macs[0])
652                 self.generator_config.set_vtep_dest_macs(1, dest_macs[1])
653             else:
654                 self.generator_config.set_dest_macs(0, dest_macs[0])
655                 self.generator_config.set_dest_macs(1, dest_macs[1])
656         else:
657             raise TrafficClientException('ARP cannot be resolved')
658
659     def set_traffic(self, frame_size, bidirectional):
660         """Reconfigure the traffic generator for a new frame size."""
661         self.run_config['bidirectional'] = bidirectional
662         self.run_config['l2frame_size'] = frame_size
663         self.run_config['rates'] = [self.get_per_direction_rate()]
664         if bidirectional:
665             self.run_config['rates'].append(self.get_per_direction_rate())
666         else:
667             unidir_reverse_pps = int(self.config.unidir_reverse_traffic_pps)
668             if unidir_reverse_pps > 0:
669                 self.run_config['rates'].append({'rate_pps': str(unidir_reverse_pps)})
670         # Fix for [NFVBENCH-67], convert the rate string to PPS
671         for idx, rate in enumerate(self.run_config['rates']):
672             if 'rate_pps' not in rate:
673                 self.run_config['rates'][idx] = {'rate_pps': self.__convert_rates(rate)['rate_pps']}
674
675         self.gen.clear_streamblock()
676         self.gen.create_traffic(frame_size, self.run_config['rates'], bidirectional, latency=True)
677
678     def _modify_load(self, load):
679         self.current_total_rate = {'rate_percent': str(load)}
680         rate_per_direction = self.get_per_direction_rate()
681
682         self.gen.modify_rate(rate_per_direction, False)
683         self.run_config['rates'][0] = rate_per_direction
684         if self.run_config['bidirectional']:
685             self.gen.modify_rate(rate_per_direction, True)
686             self.run_config['rates'][1] = rate_per_direction
687
688     def get_ndr_and_pdr(self):
689         """Start the NDR/PDR iteration and return the results."""
690         dst = 'Bidirectional' if self.run_config['bidirectional'] else 'Unidirectional'
691         targets = {}
692         if self.config.ndr_run:
693             LOG.info('*** Searching NDR for %s (%s)...', self.run_config['l2frame_size'], dst)
694             targets['ndr'] = self.config.measurement.NDR
695         if self.config.pdr_run:
696             LOG.info('*** Searching PDR for %s (%s)...', self.run_config['l2frame_size'], dst)
697             targets['pdr'] = self.config.measurement.PDR
698
699         self.run_config['start_time'] = time.time()
700         self.interval_collector = IntervalCollector(self.run_config['start_time'])
701         self.interval_collector.attach_notifier(self.notifier)
702         self.iteration_collector = IterationCollector(self.run_config['start_time'])
703         results = {}
704         self.__range_search(0.0, 200.0, targets, results)
705
706         results['iteration_stats'] = {
707             'ndr_pdr': self.iteration_collector.get()
708         }
709
710         if self.config.ndr_run:
711             LOG.info('NDR load: %s', results['ndr']['rate_percent'])
712             results['ndr']['time_taken_sec'] = \
713                 results['ndr']['timestamp_sec'] - self.run_config['start_time']
714             if self.config.pdr_run:
715                 LOG.info('PDR load: %s', results['pdr']['rate_percent'])
716                 results['pdr']['time_taken_sec'] = \
717                     results['pdr']['timestamp_sec'] - results['ndr']['timestamp_sec']
718         else:
719             LOG.info('PDR load: %s', results['pdr']['rate_percent'])
720             results['pdr']['time_taken_sec'] = \
721                 results['pdr']['timestamp_sec'] - self.run_config['start_time']
722         return results
723
724     def __get_dropped_rate(self, result):
725         dropped_pkts = result['rx']['dropped_pkts']
726         total_pkts = result['tx']['total_pkts']
727         if not total_pkts:
728             return float('inf')
729         return float(dropped_pkts) / total_pkts * 100
730
731     def get_stats(self):
732         """Collect final stats for previous run."""
733         stats = self.gen.get_stats()
734         retDict = {'total_tx_rate': stats['total_tx_rate']}
735         for port in self.PORTS:
736             retDict[port] = {'tx': {}, 'rx': {}}
737
738         tx_keys = ['total_pkts', 'total_pkt_bytes', 'pkt_rate', 'pkt_bit_rate']
739         rx_keys = tx_keys + ['dropped_pkts']
740
741         for port in self.PORTS:
742             for key in tx_keys:
743                 retDict[port]['tx'][key] = int(stats[port]['tx'][key])
744             for key in rx_keys:
745                 try:
746                     retDict[port]['rx'][key] = int(stats[port]['rx'][key])
747                 except ValueError:
748                     retDict[port]['rx'][key] = 0
749             retDict[port]['rx']['avg_delay_usec'] = cast_integer(
750                 stats[port]['rx']['avg_delay_usec'])
751             retDict[port]['rx']['min_delay_usec'] = cast_integer(
752                 stats[port]['rx']['min_delay_usec'])
753             retDict[port]['rx']['max_delay_usec'] = cast_integer(
754                 stats[port]['rx']['max_delay_usec'])
755             retDict[port]['drop_rate_percent'] = self.__get_dropped_rate(retDict[port])
756
757         ports = sorted(retDict.keys())
758         if self.run_config['bidirectional']:
759             retDict['overall'] = {'tx': {}, 'rx': {}}
760             for key in tx_keys:
761                 retDict['overall']['tx'][key] = \
762                     retDict[ports[0]]['tx'][key] + retDict[ports[1]]['tx'][key]
763             for key in rx_keys:
764                 retDict['overall']['rx'][key] = \
765                     retDict[ports[0]]['rx'][key] + retDict[ports[1]]['rx'][key]
766             total_pkts = [retDict[ports[0]]['rx']['total_pkts'],
767                           retDict[ports[1]]['rx']['total_pkts']]
768             avg_delays = [retDict[ports[0]]['rx']['avg_delay_usec'],
769                           retDict[ports[1]]['rx']['avg_delay_usec']]
770             max_delays = [retDict[ports[0]]['rx']['max_delay_usec'],
771                           retDict[ports[1]]['rx']['max_delay_usec']]
772             min_delays = [retDict[ports[0]]['rx']['min_delay_usec'],
773                           retDict[ports[1]]['rx']['min_delay_usec']]
774             retDict['overall']['rx']['avg_delay_usec'] = utils.weighted_avg(total_pkts, avg_delays)
775             retDict['overall']['rx']['min_delay_usec'] = min(min_delays)
776             retDict['overall']['rx']['max_delay_usec'] = max(max_delays)
777             for key in ['pkt_bit_rate', 'pkt_rate']:
778                 for dirc in ['tx', 'rx']:
779                     retDict['overall'][dirc][key] /= 2.0
780         else:
781             retDict['overall'] = retDict[ports[0]]
782         retDict['overall']['drop_rate_percent'] = self.__get_dropped_rate(retDict['overall'])
783         return retDict
784
785     def __convert_rates(self, rate):
786         return utils.convert_rates(self.run_config['l2frame_size'],
787                                    rate,
788                                    self.intf_speed)
789
790     def __ndr_pdr_found(self, tag, load):
791         rates = self.__convert_rates({'rate_percent': load})
792         self.iteration_collector.add_ndr_pdr(tag, rates['rate_pps'])
793         last_stats = self.iteration_collector.peek()
794         self.interval_collector.add_ndr_pdr(tag, last_stats)
795
796     def __format_output_stats(self, stats):
797         for key in self.PORTS + ['overall']:
798             interface = stats[key]
799             stats[key] = {
800                 'tx_pkts': interface['tx']['total_pkts'],
801                 'rx_pkts': interface['rx']['total_pkts'],
802                 'drop_percentage': interface['drop_rate_percent'],
803                 'drop_pct': interface['rx']['dropped_pkts'],
804                 'avg_delay_usec': interface['rx']['avg_delay_usec'],
805                 'max_delay_usec': interface['rx']['max_delay_usec'],
806                 'min_delay_usec': interface['rx']['min_delay_usec'],
807             }
808
809         return stats
810
811     def __targets_found(self, rate, targets, results):
812         for tag, target in targets.iteritems():
813             LOG.info('Found %s (%s) load: %s', tag, target, rate)
814             self.__ndr_pdr_found(tag, rate)
815             results[tag]['timestamp_sec'] = time.time()
816
817     def __range_search(self, left, right, targets, results):
818         """Perform a binary search for a list of targets inside a [left..right] range or rate.
819
820         left    the left side of the range to search as a % the line rate (100 = 100% line rate)
821                 indicating the rate to send on each interface
822         right   the right side of the range to search as a % of line rate
823                 indicating the rate to send on each interface
824         targets a dict of drop rates to search (0.1 = 0.1%), indexed by the DR name or "tag"
825                 ('ndr', 'pdr')
826         results a dict to store results
827         """
828         if not targets:
829             return
830         LOG.info('Range search [%s .. %s] targets: %s', left, right, targets)
831
832         # Terminate search when gap is less than load epsilon
833         if right - left < self.config.measurement.load_epsilon:
834             self.__targets_found(left, targets, results)
835             return
836
837         # Obtain the average drop rate in for middle load
838         middle = (left + right) / 2.0
839         try:
840             stats, rates = self.__run_search_iteration(middle)
841         except STLError:
842             LOG.exception("Got exception from traffic generator during binary search")
843             self.__targets_found(left, targets, results)
844             return
845         # Split target dicts based on the avg drop rate
846         left_targets = {}
847         right_targets = {}
848         for tag, target in targets.iteritems():
849             if stats['overall']['drop_rate_percent'] <= target:
850                 # record the best possible rate found for this target
851                 results[tag] = rates
852                 results[tag].update({
853                     'load_percent_per_direction': middle,
854                     'stats': self.__format_output_stats(dict(stats)),
855                     'timestamp_sec': None
856                 })
857                 right_targets[tag] = target
858             else:
859                 # initialize to 0 all fields of result for
860                 # the worst case scenario of the binary search (if ndr/pdr is not found)
861                 if tag not in results:
862                     results[tag] = dict.fromkeys(rates, 0)
863                     empty_stats = self.__format_output_stats(dict(stats))
864                     for key in empty_stats:
865                         if isinstance(empty_stats[key], dict):
866                             empty_stats[key] = dict.fromkeys(empty_stats[key], 0)
867                         else:
868                             empty_stats[key] = 0
869                     results[tag].update({
870                         'load_percent_per_direction': 0,
871                         'stats': empty_stats,
872                         'timestamp_sec': None
873                     })
874                 left_targets[tag] = target
875
876         # search lower half
877         self.__range_search(left, middle, left_targets, results)
878
879         # search upper half only if the upper rate does not exceed
880         # 100%, this only happens when the first search at 100%
881         # yields a DR that is < target DR
882         if middle >= 100:
883             self.__targets_found(100, right_targets, results)
884         else:
885             self.__range_search(middle, right, right_targets, results)
886
887     def __run_search_iteration(self, rate):
888         """Run one iteration at the given rate level.
889
890         rate: the rate to send on each port in percent (0 to 100)
891         """
892         self._modify_load(rate)
893
894         # poll interval stats and collect them
895         for stats in self.run_traffic():
896             self.interval_collector.add(stats)
897             time_elapsed_ratio = self.runner.time_elapsed() / self.run_config['duration_sec']
898             if time_elapsed_ratio >= 1:
899                 self.cancel_traffic()
900                 if not self.skip_sleep():
901                     time.sleep(self.config.pause_sec)
902         self.interval_collector.reset()
903
904         # get stats from the run
905         stats = self.runner.client.get_stats()
906         current_traffic_config = self._get_traffic_config()
907         warning = self.compare_tx_rates(current_traffic_config['direction-total']['rate_pps'],
908                                         stats['total_tx_rate'])
909         if warning is not None:
910             stats['warning'] = warning
911
912         # save reliable stats from whole iteration
913         self.iteration_collector.add(stats, current_traffic_config['direction-total']['rate_pps'])
914         LOG.info('Average drop rate: %f', stats['overall']['drop_rate_percent'])
915         return stats, current_traffic_config['direction-total']
916
917     @staticmethod
918     def log_stats(stats):
919         """Log estimated stats during run."""
920         report = {
921             'datetime': str(datetime.now()),
922             'tx_packets': stats['overall']['tx']['total_pkts'],
923             'rx_packets': stats['overall']['rx']['total_pkts'],
924             'drop_packets': stats['overall']['rx']['dropped_pkts'],
925             'drop_rate_percent': stats['overall']['drop_rate_percent']
926         }
927         LOG.info('TX: %(tx_packets)d; '
928                  'RX: %(rx_packets)d; '
929                  'Est. Dropped: %(drop_packets)d; '
930                  'Est. Drop rate: %(drop_rate_percent).4f%%',
931                  report)
932
933     def run_traffic(self):
934         """Start traffic and return intermediate stats for each interval."""
935         stats = self.runner.run()
936         while self.runner.is_running:
937             self.log_stats(stats)
938             yield stats
939             stats = self.runner.poll_stats()
940             if stats is None:
941                 return
942         self.log_stats(stats)
943         LOG.info('Drop rate: %f', stats['overall']['drop_rate_percent'])
944         yield stats
945
946     def cancel_traffic(self):
947         """Stop traffic."""
948         self.runner.stop()
949
950     def _get_traffic_config(self):
951         config = {}
952         load_total = 0.0
953         bps_total = 0.0
954         pps_total = 0.0
955         for idx, rate in enumerate(self.run_config['rates']):
956             key = 'direction-forward' if idx == 0 else 'direction-reverse'
957             config[key] = {
958                 'l2frame_size': self.run_config['l2frame_size'],
959                 'duration_sec': self.run_config['duration_sec']
960             }
961             config[key].update(rate)
962             config[key].update(self.__convert_rates(rate))
963             load_total += float(config[key]['rate_percent'])
964             bps_total += float(config[key]['rate_bps'])
965             pps_total += float(config[key]['rate_pps'])
966         config['direction-total'] = dict(config['direction-forward'])
967         config['direction-total'].update({
968             'rate_percent': load_total,
969             'rate_pps': cast_integer(pps_total),
970             'rate_bps': bps_total
971         })
972
973         return config
974
975     def get_run_config(self, results):
976         """Return configuration which was used for the last run."""
977         r = {}
978         # because we want each direction to have the far end RX rates,
979         # use the far end index (1-idx) to retrieve the RX rates
980         for idx, key in enumerate(["direction-forward", "direction-reverse"]):
981             tx_rate = results["stats"][idx]["tx"]["total_pkts"] / self.config.duration_sec
982             rx_rate = results["stats"][1 - idx]["rx"]["total_pkts"] / self.config.duration_sec
983             r[key] = {
984                 "orig": self.__convert_rates(self.run_config['rates'][idx]),
985                 "tx": self.__convert_rates({'rate_pps': tx_rate}),
986                 "rx": self.__convert_rates({'rate_pps': rx_rate})
987             }
988
989         total = {}
990         for direction in ['orig', 'tx', 'rx']:
991             total[direction] = {}
992             for unit in ['rate_percent', 'rate_bps', 'rate_pps']:
993                 total[direction][unit] = sum([float(x[direction][unit]) for x in r.values()])
994
995         r['direction-total'] = total
996         return r
997
998     def insert_interface_stats(self, pps_list):
999         """Insert interface stats to a list of packet path stats.
1000
1001         pps_list: a list of packet path stats instances indexed by chain index
1002
1003         This function will insert the packet path stats for the traffic gen ports 0 and 1
1004         with itemized per chain tx/rx counters.
1005         There will be as many packet path stats as chains.
1006         Each packet path stats will have exactly 2 InterfaceStats for port 0 and port 1
1007         self.pps_list:
1008         [
1009         PacketPathStats(InterfaceStats(chain 0, port 0), InterfaceStats(chain 0, port 1)),
1010         PacketPathStats(InterfaceStats(chain 1, port 0), InterfaceStats(chain 1, port 1)),
1011         ...
1012         ]
1013         """
1014         def get_if_stats(chain_idx):
1015             return [InterfaceStats('p' + str(port), self.tool)
1016                     for port in range(2)]
1017         # keep the list of list of interface stats indexed by the chain id
1018         self.ifstats = [get_if_stats(chain_idx)
1019                         for chain_idx in range(self.config.service_chain_count)]
1020         # note that we need to make a copy of the ifs list so that any modification in the
1021         # list from pps will not change the list saved in self.ifstats
1022         self.pps_list = [PacketPathStats(list(ifs)) for ifs in self.ifstats]
1023         # insert the corresponding pps in the passed list
1024         pps_list.extend(self.pps_list)
1025
1026     def update_interface_stats(self, diff=False):
1027         """Update all interface stats.
1028
1029         diff: if False, simply refresh the interface stats values with latest values
1030               if True, diff the interface stats with the latest values
1031         Make sure that the interface stats inserted in insert_interface_stats() are updated
1032         with proper values.
1033         self.ifstats:
1034         [
1035         [InterfaceStats(chain 0, port 0), InterfaceStats(chain 0, port 1)],
1036         [InterfaceStats(chain 1, port 0), InterfaceStats(chain 1, port 1)],
1037         ...
1038         ]
1039         """
1040         if diff:
1041             stats = self.gen.get_stats()
1042             for chain_idx, ifs in enumerate(self.ifstats):
1043                 # each ifs has exactly 2 InterfaceStats and 2 Latency instances
1044                 # corresponding to the
1045                 # port 0 and port 1 for the given chain_idx
1046                 # Note that we cannot use self.pps_list[chain_idx].if_stats to pick the
1047                 # interface stats for the pps because it could have been modified to contain
1048                 # additional interface stats
1049                 self.gen.get_stream_stats(stats, ifs, self.pps_list[chain_idx].latencies, chain_idx)
1050             # special handling for vxlan
1051             # in case of vxlan, flow stats are not available so all rx counters will be
1052             # zeros when the total rx port counter is non zero.
1053             # in that case,
1054             for port in range(2):
1055                 total_rx = 0
1056                 for ifs in self.ifstats:
1057                     total_rx += ifs[port].rx
1058                 if total_rx == 0:
1059                     # check if the total port rx from Trex is also zero
1060                     port_rx = stats[port]['rx']['total_pkts']
1061                     if port_rx:
1062                         # the total rx for all chains from port level stats is non zero
1063                         # which means that the per-chain stats are not available
1064                         if len(self.ifstats) == 1:
1065                             # only one chain, simply report the port level rx to the chain rx stats
1066                             self.ifstats[0][port].rx = port_rx
1067                         else:
1068                             for ifs in self.ifstats:
1069                                 # mark this data as unavailable
1070                                 ifs[port].rx = None
1071                             # pitch in the total rx only in the last chain pps
1072                             self.ifstats[-1][port].rx_total = port_rx
1073
1074     @staticmethod
1075     def compare_tx_rates(required, actual):
1076         """Compare the actual TX rate to the required TX rate."""
1077         threshold = 0.9
1078         are_different = False
1079         try:
1080             if float(actual) / required < threshold:
1081                 are_different = True
1082         except ZeroDivisionError:
1083             are_different = True
1084
1085         if are_different:
1086             msg = "WARNING: There is a significant difference between requested TX rate ({r}) " \
1087                   "and actual TX rate ({a}). The traffic generator may not have sufficient CPU " \
1088                   "to achieve the requested TX rate.".format(r=required, a=actual)
1089             LOG.info(msg)
1090             return msg
1091
1092         return None
1093
1094     def get_per_direction_rate(self):
1095         """Get the rate for each direction."""
1096         divisor = 2 if self.run_config['bidirectional'] else 1
1097         if 'rate_percent' in self.current_total_rate:
1098             # don't split rate if it's percentage
1099             divisor = 1
1100
1101         return utils.divide_rate(self.current_total_rate, divisor)
1102
1103     def close(self):
1104         """Close this instance."""
1105         try:
1106             self.gen.stop_traffic()
1107         except Exception:
1108             pass
1109         self.gen.clear_stats()
1110         self.gen.cleanup()