Add L3 traffic management with Neutron routers
[nfvbench.git] / nfvbench / traffic_client.py
1 # Copyright 2016 Cisco Systems, Inc.  All rights reserved.
2 #
3 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
4 #    not use this file except in compliance with the License. You may obtain
5 #    a copy of the License at
6 #
7 #         http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #    Unless required by applicable law or agreed to in writing, software
10 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 #    License for the specific language governing permissions and limitations
13 #    under the License.
14
15 """Interface to the traffic generator clients including NDR/PDR binary search."""
16
17 from datetime import datetime
18 import socket
19 import struct
20 import time
21
22 from attrdict import AttrDict
23 import bitmath
24 from netaddr import IPNetwork
25 # pylint: disable=import-error
26 from trex.stl.api import Ether
27 from trex.stl.api import STLError
28 from trex.stl.api import UDP
29 # pylint: enable=import-error
30
31 from log import LOG
32 from packet_stats import InterfaceStats
33 from packet_stats import PacketPathStats
34 from stats_collector import IntervalCollector
35 from stats_collector import IterationCollector
36 import traffic_gen.traffic_utils as utils
37 from utils import cast_integer
38
39
40 class TrafficClientException(Exception):
41     """Generic traffic client exception."""
42
43     pass
44
45
46 class TrafficRunner(object):
47     """Serialize various steps required to run traffic."""
48
49     def __init__(self, client, duration_sec, interval_sec=0):
50         """Create a traffic runner."""
51         self.client = client
52         self.start_time = None
53         self.duration_sec = duration_sec
54         self.interval_sec = interval_sec
55
56     def run(self):
57         """Clear stats and instruct the traffic generator to start generating traffic."""
58         if self.is_running():
59             return None
60         LOG.info('Running traffic generator')
61         self.client.gen.clear_stats()
62         self.client.gen.start_traffic()
63         self.start_time = time.time()
64         return self.poll_stats()
65
66     def stop(self):
67         """Stop the current run and instruct the traffic generator to stop traffic."""
68         if self.is_running():
69             self.start_time = None
70             self.client.gen.stop_traffic()
71
72     def is_running(self):
73         """Check if a run is still pending."""
74         return self.start_time is not None
75
76     def time_elapsed(self):
77         """Return time elapsed since start of run."""
78         if self.is_running():
79             return time.time() - self.start_time
80         return self.duration_sec
81
82     def poll_stats(self):
83         """Poll latest stats from the traffic generator at fixed interval - sleeps if necessary.
84
85         return: latest stats or None if traffic is stopped
86         """
87         if not self.is_running():
88             return None
89         if self.client.skip_sleep():
90             self.stop()
91             return self.client.get_stats()
92         time_elapsed = self.time_elapsed()
93         if time_elapsed > self.duration_sec:
94             self.stop()
95             return None
96         time_left = self.duration_sec - time_elapsed
97         if self.interval_sec > 0.0:
98             if time_left <= self.interval_sec:
99                 time.sleep(time_left)
100                 self.stop()
101             else:
102                 time.sleep(self.interval_sec)
103         else:
104             time.sleep(self.duration_sec)
105             self.stop()
106         return self.client.get_stats()
107
108
109 class IpBlock(object):
110     """Manage a block of IP addresses."""
111
112     def __init__(self, base_ip, step_ip, count_ip):
113         """Create an IP block."""
114         self.base_ip_int = Device.ip_to_int(base_ip)
115         self.step = Device.ip_to_int(step_ip)
116         self.max_available = count_ip
117         self.next_free = 0
118
119     def get_ip(self, index=0):
120         """Return the IP address at given index."""
121         if index < 0 or index >= self.max_available:
122             raise IndexError('Index out of bounds: %d (max=%d)' % (index, self.max_available))
123         return Device.int_to_ip(self.base_ip_int + index * self.step)
124
125     def reserve_ip_range(self, count):
126         """Reserve a range of count consecutive IP addresses spaced by step."""
127         if self.next_free + count > self.max_available:
128             raise IndexError('No more IP addresses next free=%d max_available=%d requested=%d' %
129                              (self.next_free,
130                               self.max_available,
131                               count))
132         first_ip = self.get_ip(self.next_free)
133         last_ip = self.get_ip(self.next_free + count - 1)
134         self.next_free += count
135         return (first_ip, last_ip)
136
137     def reset_reservation(self):
138         """Reset all reservations and restart with a completely unused IP block."""
139         self.next_free = 0
140
141
142 class Device(object):
143     """Represent a port device and all information associated to it.
144
145     In the curent version we only support 2 port devices for the traffic generator
146     identified as port 0 or port 1.
147     """
148
149     def __init__(self, port, generator_config):
150         """Create a new device for a given port."""
151         self.generator_config = generator_config
152         self.chain_count = generator_config.service_chain_count
153         self.flow_count = generator_config.flow_count / 2
154         self.port = port
155         self.switch_port = generator_config.interfaces[port].get('switch_port', None)
156         self.vtep_vlan = None
157         self.vtep_src_mac = None
158         self.vxlan = False
159         self.pci = generator_config.interfaces[port].pci
160         self.mac = None
161         self.dest_macs = None
162         self.vtep_dst_mac = None
163         self.vtep_dst_ip = None
164         if generator_config.vteps is None:
165             self.vtep_src_ip = None
166         else:
167             self.vtep_src_ip = generator_config.vteps[port]
168         self.vnis = None
169         self.vlans = None
170         self.ip_addrs = generator_config.ip_addrs[port]
171         subnet = IPNetwork(self.ip_addrs)
172         self.ip = subnet.ip.format()
173         self.ip_addrs_step = generator_config.ip_addrs_step
174         self.ip_block = IpBlock(self.ip, self.ip_addrs_step, self.flow_count)
175         self.gw_ip_block = IpBlock(generator_config.gateway_ips[port],
176                                    generator_config.gateway_ip_addrs_step,
177                                    self.chain_count)
178         self.tg_gateway_ip_addrs = generator_config.tg_gateway_ip_addrs[port]
179         self.tg_gw_ip_block = IpBlock(self.tg_gateway_ip_addrs,
180                                       generator_config.tg_gateway_ip_addrs_step,
181                                       self.chain_count)
182         self.udp_src_port = generator_config.udp_src_port
183         self.udp_dst_port = generator_config.udp_dst_port
184
185     def set_mac(self, mac):
186         """Set the local MAC for this port device."""
187         if mac is None:
188             raise TrafficClientException('Trying to set traffic generator MAC address as None')
189         self.mac = mac
190
191     def get_peer_device(self):
192         """Get the peer device (device 0 -> device 1, or device 1 -> device 0)."""
193         return self.generator_config.devices[1 - self.port]
194
195     def set_vtep_dst_mac(self, dest_macs):
196         """Set the list of dest MACs indexed by the chain id.
197
198         This is only called in 2 cases:
199         - VM macs discovered using openstack API
200         - dest MACs provisioned in config file
201         """
202         self.vtep_dst_mac = map(str, dest_macs)
203
204     def set_dest_macs(self, dest_macs):
205         """Set the list of dest MACs indexed by the chain id.
206
207         This is only called in 2 cases:
208         - VM macs discovered using openstack API
209         - dest MACs provisioned in config file
210         """
211         self.dest_macs = map(str, dest_macs)
212
213     def get_dest_macs(self):
214         """Get the list of dest macs for this device.
215
216         If set_dest_macs was never called, assumes l2-loopback and return
217         a list of peer mac (as many as chains but normally only 1 chain)
218         """
219         if self.dest_macs:
220             return self.dest_macs
221         # assume this is l2-loopback
222         return [self.get_peer_device().mac] * self.chain_count
223
224     def set_vlans(self, vlans):
225         """Set the list of vlans to use indexed by the chain id."""
226         self.vlans = vlans
227         LOG.info("Port %d: VLANs %s", self.port, self.vlans)
228
229     def set_vtep_vlan(self, vlan):
230         """Set the vtep vlan to use indexed by specific port."""
231         self.vtep_vlan = vlan
232         self.vxlan = True
233         self.vlan_tagging = None
234         LOG.info("Port %d: VTEP VLANs %s", self.port, self.vtep_vlan)
235
236     def set_vxlan_endpoints(self, src_ip, dst_ip):
237         self.vtep_dst_ip = dst_ip
238         self.vtep_src_ip = src_ip
239         LOG.info("Port %d: src_vtep %s, dst_vtep %s", self.port,
240                  self.vtep_src_ip, self.vtep_dst_ip)
241
242     def set_vxlans(self, vnis):
243         self.vnis = vnis
244         LOG.info("Port %d: VNIs %s", self.port, self.vnis)
245
246     def set_gw_ip(self, gateway_ip):
247         self.gw_ip_block = IpBlock(gateway_ip,
248                                    self.generator_config.gateway_ip_addrs_step,
249                                    self.chain_count)
250
251     def get_gw_ip(self, chain_index):
252         """Retrieve the IP address assigned for the gateway of a given chain."""
253         return self.gw_ip_block.get_ip(chain_index)
254
255     def get_stream_configs(self):
256         """Get the stream config for a given chain on this device.
257
258         Called by the traffic generator driver to program the traffic generator properly
259         before generating traffic
260         """
261         configs = []
262         # exact flow count for each chain is calculated as follows:
263         # - all chains except the first will have the same flow count
264         #   calculated as (total_flows + chain_count - 1) / chain_count
265         # - the first chain will have the remainder
266         # example 11 flows and 3 chains => 3, 4, 4
267         flows_per_chain = (self.flow_count + self.chain_count - 1) / self.chain_count
268         cur_chain_flow_count = self.flow_count - flows_per_chain * (self.chain_count - 1)
269         peer = self.get_peer_device()
270         self.ip_block.reset_reservation()
271         peer.ip_block.reset_reservation()
272         dest_macs = self.get_dest_macs()
273
274         for chain_idx in xrange(self.chain_count):
275             src_ip_first, src_ip_last = self.ip_block.reserve_ip_range(cur_chain_flow_count)
276             dst_ip_first, dst_ip_last = peer.ip_block.reserve_ip_range(cur_chain_flow_count)
277
278             configs.append({
279                 'count': cur_chain_flow_count,
280                 'mac_src': self.mac,
281                 'mac_dst': dest_macs[chain_idx],
282                 'ip_src_addr': src_ip_first,
283                 'ip_src_addr_max': src_ip_last,
284                 'ip_src_count': cur_chain_flow_count,
285                 'ip_dst_addr': dst_ip_first,
286                 'ip_dst_addr_max': dst_ip_last,
287                 'ip_dst_count': cur_chain_flow_count,
288                 'ip_addrs_step': self.ip_addrs_step,
289                 'udp_src_port': self.udp_src_port,
290                 'udp_dst_port': self.udp_dst_port,
291                 'mac_discovery_gw': self.get_gw_ip(chain_idx),
292                 'ip_src_tg_gw': self.tg_gw_ip_block.get_ip(chain_idx),
293                 'ip_dst_tg_gw': peer.tg_gw_ip_block.get_ip(chain_idx),
294                 'vlan_tag': self.vlans[chain_idx] if self.vlans else None,
295                 'vxlan': self.vxlan,
296                 'vtep_vlan': self.vtep_vlan if self.vtep_vlan else None,
297                 'vtep_src_mac': self.mac if self.vxlan is True else None,
298                 'vtep_dst_mac': self.vtep_dst_mac if self.vxlan is True else None,
299                 'vtep_dst_ip': self.vtep_dst_ip if self.vxlan is True else None,
300                 'vtep_src_ip': self.vtep_src_ip if self.vxlan is True else None,
301                 'net_vni': self.vnis[chain_idx] if self.vxlan is True else None
302             })
303             # after first chain, fall back to the flow count for all other chains
304             cur_chain_flow_count = flows_per_chain
305         return configs
306
307     @staticmethod
308     def ip_to_int(addr):
309         """Convert an IP address from string to numeric."""
310         return struct.unpack("!I", socket.inet_aton(addr))[0]
311
312     @staticmethod
313     def int_to_ip(nvalue):
314         """Convert an IP address from numeric to string."""
315         return socket.inet_ntoa(struct.pack("!I", nvalue))
316
317
318 class GeneratorConfig(object):
319     """Represents traffic configuration for currently running traffic profile."""
320
321     DEFAULT_IP_STEP = '0.0.0.1'
322     DEFAULT_SRC_DST_IP_STEP = '0.0.0.1'
323
324     def __init__(self, config):
325         """Create a generator config."""
326         self.config = config
327         # name of the generator profile (normally trex or dummy)
328         # pick the default one if not specified explicitly from cli options
329         if not config.generator_profile:
330             config.generator_profile = config.traffic_generator.default_profile
331         # pick up the profile dict based on the name
332         gen_config = self.__match_generator_profile(config.traffic_generator,
333                                                     config.generator_profile)
334         self.gen_config = gen_config
335         # copy over fields from the dict
336         self.tool = gen_config.tool
337         self.ip = gen_config.ip
338         # overrides on config.cores and config.mbuf_factor
339         if config.cores:
340             self.cores = config.cores
341         else:
342             self.cores = gen_config.get('cores', 1)
343         self.mbuf_factor = config.mbuf_factor
344         if gen_config.intf_speed:
345             # interface speed is overriden from config
346             self.intf_speed = bitmath.parse_string(gen_config.intf_speed.replace('ps', '')).bits
347         else:
348             # interface speed is discovered/provided by the traffic generator
349             self.intf_speed = 0
350         self.name = gen_config.name
351         self.zmq_pub_port = gen_config.get('zmq_pub_port', 4500)
352         self.zmq_rpc_port = gen_config.get('zmq_rpc_port', 4501)
353         self.limit_memory = gen_config.get('limit_memory', 1024)
354         self.software_mode = gen_config.get('software_mode', False)
355         self.interfaces = gen_config.interfaces
356         if self.interfaces[0].port != 0 or self.interfaces[1].port != 1:
357             raise TrafficClientException('Invalid port order/id in generator_profile.interfaces')
358         if hasattr(gen_config, 'platform'):
359             self.platform = gen_config.platform
360         self.service_chain = config.service_chain
361         self.service_chain_count = config.service_chain_count
362         self.flow_count = config.flow_count
363         self.host_name = gen_config.host_name
364
365         self.tg_gateway_ip_addrs = gen_config.tg_gateway_ip_addrs
366         self.ip_addrs = gen_config.ip_addrs
367         self.ip_addrs_step = gen_config.ip_addrs_step or self.DEFAULT_SRC_DST_IP_STEP
368         self.tg_gateway_ip_addrs_step = \
369             gen_config.tg_gateway_ip_addrs_step or self.DEFAULT_IP_STEP
370         self.gateway_ip_addrs_step = gen_config.gateway_ip_addrs_step or self.DEFAULT_IP_STEP
371         self.gateway_ips = gen_config.gateway_ip_addrs
372         self.udp_src_port = gen_config.udp_src_port
373         self.udp_dst_port = gen_config.udp_dst_port
374         self.vteps = gen_config.get('vteps')
375         self.devices = [Device(port, self) for port in [0, 1]]
376         # This should normally always be [0, 1]
377         self.ports = [device.port for device in self.devices]
378
379         # check that pci is not empty
380         if not gen_config.interfaces[0].get('pci', None) or \
381            not gen_config.interfaces[1].get('pci', None):
382             raise TrafficClientException("configuration interfaces pci fields cannot be empty")
383
384         self.pcis = [tgif['pci'] for tgif in gen_config.interfaces]
385         self.vlan_tagging = config.vlan_tagging
386
387         # needed for result/summarizer
388         config['tg-name'] = gen_config.name
389         config['tg-tool'] = self.tool
390
391     def to_json(self):
392         """Get json form to display the content into the overall result dict."""
393         return dict(self.gen_config)
394
395     def set_dest_macs(self, port_index, dest_macs):
396         """Set the list of dest MACs indexed by the chain id on given port.
397
398         port_index: the port for which dest macs must be set
399         dest_macs: a list of dest MACs indexed by chain id
400         """
401         if len(dest_macs) < self.config.service_chain_count:
402             raise TrafficClientException('Dest MAC list %s must have %d entries' %
403                                          (dest_macs, self.config.service_chain_count))
404         # only pass the first scc dest MACs
405         self.devices[port_index].set_dest_macs(dest_macs[:self.config.service_chain_count])
406         LOG.info('Port %d: dst MAC %s', port_index, [str(mac) for mac in dest_macs])
407
408     def set_vtep_dest_macs(self, port_index, dest_macs):
409         """Set the list of dest MACs indexed by the chain id on given port.
410
411         port_index: the port for which dest macs must be set
412         dest_macs: a list of dest MACs indexed by chain id
413         """
414         if len(dest_macs) != self.config.service_chain_count:
415             raise TrafficClientException('Dest MAC list %s must have %d entries' %
416                                          (dest_macs, self.config.service_chain_count))
417         self.devices[port_index].set_vtep_dst_mac(dest_macs)
418         LOG.info('Port %d: vtep dst MAC %s', port_index, set([str(mac) for mac in dest_macs]))
419
420     def get_dest_macs(self):
421         """Return the list of dest macs indexed by port."""
422         return [dev.get_dest_macs() for dev in self.devices]
423
424     def set_vlans(self, port_index, vlans):
425         """Set the list of vlans to use indexed by the chain id on given port.
426
427         port_index: the port for which VLANs must be set
428         vlans: a  list of vlan lists indexed by chain id
429         """
430         if len(vlans) != self.config.service_chain_count:
431             raise TrafficClientException('VLAN list %s must have %d entries' %
432                                          (vlans, self.config.service_chain_count))
433         self.devices[port_index].set_vlans(vlans)
434
435     def set_vxlans(self, port_index, vxlans):
436         """Set the list of vxlans (VNIs) to use indexed by the chain id on given port.
437
438         port_index: the port for which VXLANs must be set
439         VXLANs: a  list of VNIs lists indexed by chain id
440         """
441         if len(vxlans) != self.config.service_chain_count:
442             raise TrafficClientException('VXLAN list %s must have %d entries' %
443                                          (vxlans, self.config.service_chain_count))
444         self.devices[port_index].set_vxlans(vxlans)
445
446     def set_vtep_vlan(self, port_index, vlan):
447         """Set the vtep vlan to use indexed by the chain id on given port.
448         port_index: the port for which VLAN must be set
449         """
450         self.devices[port_index].set_vtep_vlan(vlan)
451
452     def set_vxlan_endpoints(self, port_index, src_ip, dst_ip):
453         self.devices[port_index].set_vxlan_endpoints(src_ip, dst_ip)
454
455     @staticmethod
456     def __match_generator_profile(traffic_generator, generator_profile):
457         gen_config = AttrDict(traffic_generator)
458         gen_config.pop('default_profile')
459         gen_config.pop('generator_profile')
460         matching_profile = [profile for profile in traffic_generator.generator_profile if
461                             profile.name == generator_profile]
462         if len(matching_profile) != 1:
463             raise Exception('Traffic generator profile not found: ' + generator_profile)
464
465         gen_config.update(matching_profile[0])
466         return gen_config
467
468
469 class TrafficClient(object):
470     """Traffic generator client with NDR/PDR binary seearch."""
471
472     PORTS = [0, 1]
473
474     def __init__(self, config, notifier=None):
475         """Create a new TrafficClient instance.
476
477         config: nfvbench config
478         notifier: notifier (optional)
479
480         A new instance is created everytime the nfvbench config may have changed.
481         """
482         self.config = config
483         self.generator_config = GeneratorConfig(config)
484         self.tool = self.generator_config.tool
485         self.gen = self._get_generator()
486         self.notifier = notifier
487         self.interval_collector = None
488         self.iteration_collector = None
489         self.runner = TrafficRunner(self, self.config.duration_sec, self.config.interval_sec)
490         self.config.frame_sizes = self._get_frame_sizes()
491         self.run_config = {
492             'l2frame_size': None,
493             'duration_sec': self.config.duration_sec,
494             'bidirectional': True,
495             'rates': []  # to avoid unsbuscriptable-obj warning
496         }
497         self.current_total_rate = {'rate_percent': '10'}
498         if self.config.single_run:
499             self.current_total_rate = utils.parse_rate_str(self.config.rate)
500         self.ifstats = None
501         # Speed is either discovered when connecting to TG or set from config
502         # This variable is 0 if not yet discovered from TG or must be the speed of
503         # each interface in bits per second
504         self.intf_speed = self.generator_config.intf_speed
505
506     def _get_generator(self):
507         tool = self.tool.lower()
508         if tool == 'trex':
509             from traffic_gen import trex_gen
510             return trex_gen.TRex(self)
511         if tool == 'dummy':
512             from traffic_gen import dummy
513             return dummy.DummyTG(self)
514         raise TrafficClientException('Unsupported generator tool name:' + self.tool)
515
516     def skip_sleep(self):
517         """Skip all sleeps when doing unit testing with dummy TG.
518
519         Must be overriden using mock.patch
520         """
521         return False
522
523     def _get_frame_sizes(self):
524         traffic_profile_name = self.config.traffic.profile
525         matching_profiles = [profile for profile in self.config.traffic_profile if
526                              profile.name == traffic_profile_name]
527         if len(matching_profiles) > 1:
528             raise TrafficClientException('Multiple traffic profiles with name: ' +
529                                          traffic_profile_name)
530         elif not matching_profiles:
531             raise TrafficClientException('Cannot find traffic profile: ' + traffic_profile_name)
532         return matching_profiles[0].l2frame_size
533
534     def start_traffic_generator(self):
535         """Start the traffic generator process (traffic not started yet)."""
536         self.gen.connect()
537         # pick up the interface speed if it is not set from config
538         intf_speeds = self.gen.get_port_speed_gbps()
539         # convert Gbps unit into bps
540         tg_if_speed = bitmath.parse_string(str(intf_speeds[0]) + 'Gb').bits
541         if self.intf_speed:
542             # interface speed is overriden from config
543             if self.intf_speed != tg_if_speed:
544                 # Warn the user if the speed in the config is different
545                 LOG.warning('Interface speed provided is different from actual speed (%d Gbps)',
546                             intf_speeds[0])
547         else:
548             # interface speed not provisioned by config
549             self.intf_speed = tg_if_speed
550             # also update the speed in the tg config
551             self.generator_config.intf_speed = tg_if_speed
552
553         # Save the traffic generator local MAC
554         for mac, device in zip(self.gen.get_macs(), self.generator_config.devices):
555             device.set_mac(mac)
556
557     def setup(self):
558         """Set up the traffic client."""
559         self.gen.clear_stats()
560
561     def get_version(self):
562         """Get the traffic generator version."""
563         return self.gen.get_version()
564
565     def ensure_end_to_end(self):
566         """Ensure traffic generator receives packets it has transmitted.
567
568         This ensures end to end connectivity and also waits until VMs are ready to forward packets.
569
570         VMs that are started and in active state may not pass traffic yet. It is imperative to make
571         sure that all VMs are passing traffic in both directions before starting any benchmarking.
572         To verify this, we need to send at a low frequency bi-directional packets and make sure
573         that we receive all packets back from all VMs. The number of flows is equal to 2 times
574         the number of chains (1 per direction) and we need to make sure we receive packets coming
575         from exactly 2 x chain count different source MAC addresses.
576
577         Example:
578             PVP chain (1 VM per chain)
579             N = 10 (number of chains)
580             Flow count = 20 (number of flows)
581             If the number of unique source MAC addresses from received packets is 20 then
582             all 10 VMs 10 VMs are in operational state.
583         """
584         LOG.info('Starting traffic generator to ensure end-to-end connectivity')
585         # send 2pps on each chain and each direction
586         rate_pps = {'rate_pps': str(self.config.service_chain_count * 2)}
587         self.gen.create_traffic('64', [rate_pps, rate_pps], bidirectional=True, latency=False,
588                                 e2e=True)
589         # ensures enough traffic is coming back
590         retry_count = (self.config.check_traffic_time_sec +
591                        self.config.generic_poll_sec - 1) / self.config.generic_poll_sec
592
593         # we expect to see packets coming from 2 unique MAC per chain
594         # because there can be flooding in the case of shared net
595         # we must verify that packets from the right VMs are received
596         # and not just count unique src MAC
597         # create a dict of (port, chain) tuples indexed by dest mac
598         mac_map = {}
599         for port, dest_macs in enumerate(self.generator_config.get_dest_macs()):
600             for chain, mac in enumerate(dest_macs):
601                 mac_map[mac] = (port, chain)
602         unique_src_mac_count = len(mac_map)
603         if self.config.vxlan and self.config.traffic_generator.vtep_vlan:
604             get_mac_id = lambda packet: packet['binary'][60:66]
605         elif self.config.vxlan:
606             get_mac_id = lambda packet: packet['binary'][56:62]
607         else:
608             get_mac_id = lambda packet: packet['binary'][6:12]
609         for it in xrange(retry_count):
610             self.gen.clear_stats()
611             self.gen.start_traffic()
612             self.gen.start_capture()
613             LOG.info('Captured unique src mac %d/%d, capturing return packets (retry %d/%d)...',
614                      unique_src_mac_count - len(mac_map), unique_src_mac_count,
615                      it + 1, retry_count)
616             if not self.skip_sleep():
617                 time.sleep(self.config.generic_poll_sec)
618             self.gen.stop_traffic()
619             self.gen.fetch_capture_packets()
620             self.gen.stop_capture()
621             for packet in self.gen.packet_list:
622                 mac_id = get_mac_id(packet)
623                 src_mac = ':'.join(["%02x" % ord(x) for x in mac_id])
624                 if src_mac in mac_map and self.is_udp(packet):
625                     port, chain = mac_map[src_mac]
626                     LOG.info('Received packet from mac: %s (chain=%d, port=%d)',
627                              src_mac, chain, port)
628                     mac_map.pop(src_mac, None)
629
630                 if not mac_map:
631                     LOG.info('End-to-end connectivity established')
632                     return
633             if self.config.l3_router and not self.config.no_arp:
634                 # In case of L3 traffic mode, routers are not able to route traffic
635                 # until VM interfaces are up and ARP requests are done
636                 LOG.info('Waiting for loopback service completely started...')
637                 LOG.info('Sending ARP request to assure end-to-end connectivity established')
638                 self.ensure_arp_successful()
639         raise TrafficClientException('End-to-end connectivity cannot be ensured')
640
641     def is_udp(self, packet):
642         pkt = Ether(packet['binary'])
643         return UDP in pkt
644
645     def ensure_arp_successful(self):
646         """Resolve all IP using ARP and throw an exception in case of failure."""
647         dest_macs = self.gen.resolve_arp()
648         if dest_macs:
649             # all dest macs are discovered, saved them into the generator config
650             if self.config.vxlan:
651                 self.generator_config.set_vtep_dest_macs(0, dest_macs[0])
652                 self.generator_config.set_vtep_dest_macs(1, dest_macs[1])
653             else:
654                 self.generator_config.set_dest_macs(0, dest_macs[0])
655                 self.generator_config.set_dest_macs(1, dest_macs[1])
656         else:
657             raise TrafficClientException('ARP cannot be resolved')
658
659     def set_traffic(self, frame_size, bidirectional):
660         """Reconfigure the traffic generator for a new frame size."""
661         self.run_config['bidirectional'] = bidirectional
662         self.run_config['l2frame_size'] = frame_size
663         self.run_config['rates'] = [self.get_per_direction_rate()]
664         if bidirectional:
665             self.run_config['rates'].append(self.get_per_direction_rate())
666         else:
667             unidir_reverse_pps = int(self.config.unidir_reverse_traffic_pps)
668             if unidir_reverse_pps > 0:
669                 self.run_config['rates'].append({'rate_pps': str(unidir_reverse_pps)})
670         # Fix for [NFVBENCH-67], convert the rate string to PPS
671         for idx, rate in enumerate(self.run_config['rates']):
672             if 'rate_pps' not in rate:
673                 self.run_config['rates'][idx] = {'rate_pps': self.__convert_rates(rate)['rate_pps']}
674
675         self.gen.clear_streamblock()
676         if not self.config.vxlan:
677             self.gen.create_traffic(frame_size, self.run_config['rates'], bidirectional,
678                                     latency=True)
679         else:
680             self.gen.create_traffic(frame_size, self.run_config['rates'], bidirectional,
681                                     latency=False)
682
683     def _modify_load(self, load):
684         self.current_total_rate = {'rate_percent': str(load)}
685         rate_per_direction = self.get_per_direction_rate()
686
687         self.gen.modify_rate(rate_per_direction, False)
688         self.run_config['rates'][0] = rate_per_direction
689         if self.run_config['bidirectional']:
690             self.gen.modify_rate(rate_per_direction, True)
691             self.run_config['rates'][1] = rate_per_direction
692
693     def get_ndr_and_pdr(self):
694         """Start the NDR/PDR iteration and return the results."""
695         dst = 'Bidirectional' if self.run_config['bidirectional'] else 'Unidirectional'
696         targets = {}
697         if self.config.ndr_run:
698             LOG.info('*** Searching NDR for %s (%s)...', self.run_config['l2frame_size'], dst)
699             targets['ndr'] = self.config.measurement.NDR
700         if self.config.pdr_run:
701             LOG.info('*** Searching PDR for %s (%s)...', self.run_config['l2frame_size'], dst)
702             targets['pdr'] = self.config.measurement.PDR
703
704         self.run_config['start_time'] = time.time()
705         self.interval_collector = IntervalCollector(self.run_config['start_time'])
706         self.interval_collector.attach_notifier(self.notifier)
707         self.iteration_collector = IterationCollector(self.run_config['start_time'])
708         results = {}
709         self.__range_search(0.0, 200.0, targets, results)
710
711         results['iteration_stats'] = {
712             'ndr_pdr': self.iteration_collector.get()
713         }
714
715         if self.config.ndr_run:
716             LOG.info('NDR load: %s', results['ndr']['rate_percent'])
717             results['ndr']['time_taken_sec'] = \
718                 results['ndr']['timestamp_sec'] - self.run_config['start_time']
719             if self.config.pdr_run:
720                 LOG.info('PDR load: %s', results['pdr']['rate_percent'])
721                 results['pdr']['time_taken_sec'] = \
722                     results['pdr']['timestamp_sec'] - results['ndr']['timestamp_sec']
723         else:
724             LOG.info('PDR load: %s', results['pdr']['rate_percent'])
725             results['pdr']['time_taken_sec'] = \
726                 results['pdr']['timestamp_sec'] - self.run_config['start_time']
727         return results
728
729     def __get_dropped_rate(self, result):
730         dropped_pkts = result['rx']['dropped_pkts']
731         total_pkts = result['tx']['total_pkts']
732         if not total_pkts:
733             return float('inf')
734         return float(dropped_pkts) / total_pkts * 100
735
736     def get_stats(self):
737         """Collect final stats for previous run."""
738         stats = self.gen.get_stats()
739         retDict = {'total_tx_rate': stats['total_tx_rate']}
740         for port in self.PORTS:
741             retDict[port] = {'tx': {}, 'rx': {}}
742
743         tx_keys = ['total_pkts', 'total_pkt_bytes', 'pkt_rate', 'pkt_bit_rate']
744         rx_keys = tx_keys + ['dropped_pkts']
745
746         for port in self.PORTS:
747             for key in tx_keys:
748                 retDict[port]['tx'][key] = int(stats[port]['tx'][key])
749             for key in rx_keys:
750                 try:
751                     retDict[port]['rx'][key] = int(stats[port]['rx'][key])
752                 except ValueError:
753                     retDict[port]['rx'][key] = 0
754             retDict[port]['rx']['avg_delay_usec'] = cast_integer(
755                 stats[port]['rx']['avg_delay_usec'])
756             retDict[port]['rx']['min_delay_usec'] = cast_integer(
757                 stats[port]['rx']['min_delay_usec'])
758             retDict[port]['rx']['max_delay_usec'] = cast_integer(
759                 stats[port]['rx']['max_delay_usec'])
760             retDict[port]['drop_rate_percent'] = self.__get_dropped_rate(retDict[port])
761
762         ports = sorted(retDict.keys())
763         if self.run_config['bidirectional']:
764             retDict['overall'] = {'tx': {}, 'rx': {}}
765             for key in tx_keys:
766                 retDict['overall']['tx'][key] = \
767                     retDict[ports[0]]['tx'][key] + retDict[ports[1]]['tx'][key]
768             for key in rx_keys:
769                 retDict['overall']['rx'][key] = \
770                     retDict[ports[0]]['rx'][key] + retDict[ports[1]]['rx'][key]
771             total_pkts = [retDict[ports[0]]['rx']['total_pkts'],
772                           retDict[ports[1]]['rx']['total_pkts']]
773             avg_delays = [retDict[ports[0]]['rx']['avg_delay_usec'],
774                           retDict[ports[1]]['rx']['avg_delay_usec']]
775             max_delays = [retDict[ports[0]]['rx']['max_delay_usec'],
776                           retDict[ports[1]]['rx']['max_delay_usec']]
777             min_delays = [retDict[ports[0]]['rx']['min_delay_usec'],
778                           retDict[ports[1]]['rx']['min_delay_usec']]
779             retDict['overall']['rx']['avg_delay_usec'] = utils.weighted_avg(total_pkts, avg_delays)
780             retDict['overall']['rx']['min_delay_usec'] = min(min_delays)
781             retDict['overall']['rx']['max_delay_usec'] = max(max_delays)
782             for key in ['pkt_bit_rate', 'pkt_rate']:
783                 for dirc in ['tx', 'rx']:
784                     retDict['overall'][dirc][key] /= 2.0
785         else:
786             retDict['overall'] = retDict[ports[0]]
787         retDict['overall']['drop_rate_percent'] = self.__get_dropped_rate(retDict['overall'])
788         return retDict
789
790     def __convert_rates(self, rate):
791         return utils.convert_rates(self.run_config['l2frame_size'],
792                                    rate,
793                                    self.intf_speed)
794
795     def __ndr_pdr_found(self, tag, load):
796         rates = self.__convert_rates({'rate_percent': load})
797         self.iteration_collector.add_ndr_pdr(tag, rates['rate_pps'])
798         last_stats = self.iteration_collector.peek()
799         self.interval_collector.add_ndr_pdr(tag, last_stats)
800
801     def __format_output_stats(self, stats):
802         for key in self.PORTS + ['overall']:
803             interface = stats[key]
804             stats[key] = {
805                 'tx_pkts': interface['tx']['total_pkts'],
806                 'rx_pkts': interface['rx']['total_pkts'],
807                 'drop_percentage': interface['drop_rate_percent'],
808                 'drop_pct': interface['rx']['dropped_pkts'],
809                 'avg_delay_usec': interface['rx']['avg_delay_usec'],
810                 'max_delay_usec': interface['rx']['max_delay_usec'],
811                 'min_delay_usec': interface['rx']['min_delay_usec'],
812             }
813
814         return stats
815
816     def __targets_found(self, rate, targets, results):
817         for tag, target in targets.iteritems():
818             LOG.info('Found %s (%s) load: %s', tag, target, rate)
819             self.__ndr_pdr_found(tag, rate)
820             results[tag]['timestamp_sec'] = time.time()
821
822     def __range_search(self, left, right, targets, results):
823         """Perform a binary search for a list of targets inside a [left..right] range or rate.
824
825         left    the left side of the range to search as a % the line rate (100 = 100% line rate)
826                 indicating the rate to send on each interface
827         right   the right side of the range to search as a % of line rate
828                 indicating the rate to send on each interface
829         targets a dict of drop rates to search (0.1 = 0.1%), indexed by the DR name or "tag"
830                 ('ndr', 'pdr')
831         results a dict to store results
832         """
833         if not targets:
834             return
835         LOG.info('Range search [%s .. %s] targets: %s', left, right, targets)
836
837         # Terminate search when gap is less than load epsilon
838         if right - left < self.config.measurement.load_epsilon:
839             self.__targets_found(left, targets, results)
840             return
841
842         # Obtain the average drop rate in for middle load
843         middle = (left + right) / 2.0
844         try:
845             stats, rates = self.__run_search_iteration(middle)
846         except STLError:
847             LOG.exception("Got exception from traffic generator during binary search")
848             self.__targets_found(left, targets, results)
849             return
850         # Split target dicts based on the avg drop rate
851         left_targets = {}
852         right_targets = {}
853         for tag, target in targets.iteritems():
854             if stats['overall']['drop_rate_percent'] <= target:
855                 # record the best possible rate found for this target
856                 results[tag] = rates
857                 results[tag].update({
858                     'load_percent_per_direction': middle,
859                     'stats': self.__format_output_stats(dict(stats)),
860                     'timestamp_sec': None
861                 })
862                 right_targets[tag] = target
863             else:
864                 # initialize to 0 all fields of result for
865                 # the worst case scenario of the binary search (if ndr/pdr is not found)
866                 if tag not in results:
867                     results[tag] = dict.fromkeys(rates, 0)
868                     empty_stats = self.__format_output_stats(dict(stats))
869                     for key in empty_stats:
870                         if isinstance(empty_stats[key], dict):
871                             empty_stats[key] = dict.fromkeys(empty_stats[key], 0)
872                         else:
873                             empty_stats[key] = 0
874                     results[tag].update({
875                         'load_percent_per_direction': 0,
876                         'stats': empty_stats,
877                         'timestamp_sec': None
878                     })
879                 left_targets[tag] = target
880
881         # search lower half
882         self.__range_search(left, middle, left_targets, results)
883
884         # search upper half only if the upper rate does not exceed
885         # 100%, this only happens when the first search at 100%
886         # yields a DR that is < target DR
887         if middle >= 100:
888             self.__targets_found(100, right_targets, results)
889         else:
890             self.__range_search(middle, right, right_targets, results)
891
892     def __run_search_iteration(self, rate):
893         """Run one iteration at the given rate level.
894
895         rate: the rate to send on each port in percent (0 to 100)
896         """
897         self._modify_load(rate)
898
899         # poll interval stats and collect them
900         for stats in self.run_traffic():
901             self.interval_collector.add(stats)
902             time_elapsed_ratio = self.runner.time_elapsed() / self.run_config['duration_sec']
903             if time_elapsed_ratio >= 1:
904                 self.cancel_traffic()
905                 if not self.skip_sleep():
906                     time.sleep(self.config.pause_sec)
907         self.interval_collector.reset()
908
909         # get stats from the run
910         stats = self.runner.client.get_stats()
911         current_traffic_config = self._get_traffic_config()
912         warning = self.compare_tx_rates(current_traffic_config['direction-total']['rate_pps'],
913                                         stats['total_tx_rate'])
914         if warning is not None:
915             stats['warning'] = warning
916
917         # save reliable stats from whole iteration
918         self.iteration_collector.add(stats, current_traffic_config['direction-total']['rate_pps'])
919         LOG.info('Average drop rate: %f', stats['overall']['drop_rate_percent'])
920         return stats, current_traffic_config['direction-total']
921
922     @staticmethod
923     def log_stats(stats):
924         """Log estimated stats during run."""
925         report = {
926             'datetime': str(datetime.now()),
927             'tx_packets': stats['overall']['tx']['total_pkts'],
928             'rx_packets': stats['overall']['rx']['total_pkts'],
929             'drop_packets': stats['overall']['rx']['dropped_pkts'],
930             'drop_rate_percent': stats['overall']['drop_rate_percent']
931         }
932         LOG.info('TX: %(tx_packets)d; '
933                  'RX: %(rx_packets)d; '
934                  'Est. Dropped: %(drop_packets)d; '
935                  'Est. Drop rate: %(drop_rate_percent).4f%%',
936                  report)
937
938     def run_traffic(self):
939         """Start traffic and return intermediate stats for each interval."""
940         stats = self.runner.run()
941         while self.runner.is_running:
942             self.log_stats(stats)
943             yield stats
944             stats = self.runner.poll_stats()
945             if stats is None:
946                 return
947         self.log_stats(stats)
948         LOG.info('Drop rate: %f', stats['overall']['drop_rate_percent'])
949         yield stats
950
951     def cancel_traffic(self):
952         """Stop traffic."""
953         self.runner.stop()
954
955     def _get_traffic_config(self):
956         config = {}
957         load_total = 0.0
958         bps_total = 0.0
959         pps_total = 0.0
960         for idx, rate in enumerate(self.run_config['rates']):
961             key = 'direction-forward' if idx == 0 else 'direction-reverse'
962             config[key] = {
963                 'l2frame_size': self.run_config['l2frame_size'],
964                 'duration_sec': self.run_config['duration_sec']
965             }
966             config[key].update(rate)
967             config[key].update(self.__convert_rates(rate))
968             load_total += float(config[key]['rate_percent'])
969             bps_total += float(config[key]['rate_bps'])
970             pps_total += float(config[key]['rate_pps'])
971         config['direction-total'] = dict(config['direction-forward'])
972         config['direction-total'].update({
973             'rate_percent': load_total,
974             'rate_pps': cast_integer(pps_total),
975             'rate_bps': bps_total
976         })
977
978         return config
979
980     def get_run_config(self, results):
981         """Return configuration which was used for the last run."""
982         r = {}
983         # because we want each direction to have the far end RX rates,
984         # use the far end index (1-idx) to retrieve the RX rates
985         for idx, key in enumerate(["direction-forward", "direction-reverse"]):
986             tx_rate = results["stats"][idx]["tx"]["total_pkts"] / self.config.duration_sec
987             rx_rate = results["stats"][1 - idx]["rx"]["total_pkts"] / self.config.duration_sec
988             r[key] = {
989                 "orig": self.__convert_rates(self.run_config['rates'][idx]),
990                 "tx": self.__convert_rates({'rate_pps': tx_rate}),
991                 "rx": self.__convert_rates({'rate_pps': rx_rate})
992             }
993
994         total = {}
995         for direction in ['orig', 'tx', 'rx']:
996             total[direction] = {}
997             for unit in ['rate_percent', 'rate_bps', 'rate_pps']:
998                 total[direction][unit] = sum([float(x[direction][unit]) for x in r.values()])
999
1000         r['direction-total'] = total
1001         return r
1002
1003     def insert_interface_stats(self, pps_list):
1004         """Insert interface stats to a list of packet path stats.
1005
1006         pps_list: a list of packet path stats instances indexed by chain index
1007
1008         This function will insert the packet path stats for the traffic gen ports 0 and 1
1009         with itemized per chain tx/rx counters.
1010         There will be as many packet path stats as chains.
1011         Each packet path stats will have exactly 2 InterfaceStats for port 0 and port 1
1012         self.pps_list:
1013         [
1014         PacketPathStats(InterfaceStats(chain 0, port 0), InterfaceStats(chain 0, port 1)),
1015         PacketPathStats(InterfaceStats(chain 1, port 0), InterfaceStats(chain 1, port 1)),
1016         ...
1017         ]
1018         """
1019         def get_if_stats(chain_idx):
1020             return [InterfaceStats('p' + str(port), self.tool)
1021                     for port in range(2)]
1022         # keep the list of list of interface stats indexed by the chain id
1023         self.ifstats = [get_if_stats(chain_idx)
1024                         for chain_idx in range(self.config.service_chain_count)]
1025         # note that we need to make a copy of the ifs list so that any modification in the
1026         # list from pps will not change the list saved in self.ifstats
1027         self.pps_list = [PacketPathStats(list(ifs)) for ifs in self.ifstats]
1028         # insert the corresponding pps in the passed list
1029         pps_list.extend(self.pps_list)
1030
1031     def update_interface_stats(self, diff=False):
1032         """Update all interface stats.
1033
1034         diff: if False, simply refresh the interface stats values with latest values
1035               if True, diff the interface stats with the latest values
1036         Make sure that the interface stats inserted in insert_interface_stats() are updated
1037         with proper values.
1038         self.ifstats:
1039         [
1040         [InterfaceStats(chain 0, port 0), InterfaceStats(chain 0, port 1)],
1041         [InterfaceStats(chain 1, port 0), InterfaceStats(chain 1, port 1)],
1042         ...
1043         ]
1044         """
1045         if diff:
1046             stats = self.gen.get_stats()
1047             for chain_idx, ifs in enumerate(self.ifstats):
1048                 # each ifs has exactly 2 InterfaceStats and 2 Latency instances
1049                 # corresponding to the
1050                 # port 0 and port 1 for the given chain_idx
1051                 # Note that we cannot use self.pps_list[chain_idx].if_stats to pick the
1052                 # interface stats for the pps because it could have been modified to contain
1053                 # additional interface stats
1054                 self.gen.get_stream_stats(stats, ifs, self.pps_list[chain_idx].latencies, chain_idx)
1055             # special handling for vxlan
1056             # in case of vxlan, flow stats are not available so all rx counters will be
1057             # zeros when the total rx port counter is non zero.
1058             # in that case,
1059             for port in range(2):
1060                 total_rx = 0
1061                 for ifs in self.ifstats:
1062                     total_rx += ifs[port].rx
1063                 if total_rx == 0:
1064                     # check if the total port rx from Trex is also zero
1065                     port_rx = stats[port]['rx']['total_pkts']
1066                     if port_rx:
1067                         # the total rx for all chains from port level stats is non zero
1068                         # which means that the per-chain stats are not available
1069                         if len(self.ifstats) == 1:
1070                             # only one chain, simply report the port level rx to the chain rx stats
1071                             self.ifstats[0][port].rx = port_rx
1072                         else:
1073                             for ifs in self.ifstats:
1074                                 # mark this data as unavailable
1075                                 ifs[port].rx = None
1076                             # pitch in the total rx only in the last chain pps
1077                             self.ifstats[-1][port].rx_total = port_rx
1078
1079     @staticmethod
1080     def compare_tx_rates(required, actual):
1081         """Compare the actual TX rate to the required TX rate."""
1082         threshold = 0.9
1083         are_different = False
1084         try:
1085             if float(actual) / required < threshold:
1086                 are_different = True
1087         except ZeroDivisionError:
1088             are_different = True
1089
1090         if are_different:
1091             msg = "WARNING: There is a significant difference between requested TX rate ({r}) " \
1092                   "and actual TX rate ({a}). The traffic generator may not have sufficient CPU " \
1093                   "to achieve the requested TX rate.".format(r=required, a=actual)
1094             LOG.info(msg)
1095             return msg
1096
1097         return None
1098
1099     def get_per_direction_rate(self):
1100         """Get the rate for each direction."""
1101         divisor = 2 if self.run_config['bidirectional'] else 1
1102         if 'rate_percent' in self.current_total_rate:
1103             # don't split rate if it's percentage
1104             divisor = 1
1105
1106         return utils.divide_rate(self.current_total_rate, divisor)
1107
1108     def close(self):
1109         """Close this instance."""
1110         try:
1111             self.gen.stop_traffic()
1112         except Exception:
1113             pass
1114         self.gen.clear_stats()
1115         self.gen.cleanup()