NFVBENCH-152 Add service_mode method for debugging purpose
[nfvbench.git] / nfvbench / traffic_client.py
1 # Copyright 2016 Cisco Systems, Inc.  All rights reserved.
2 #
3 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
4 #    not use this file except in compliance with the License. You may obtain
5 #    a copy of the License at
6 #
7 #         http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #    Unless required by applicable law or agreed to in writing, software
10 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 #    License for the specific language governing permissions and limitations
13 #    under the License.
14
15 """Interface to the traffic generator clients including NDR/PDR binary search."""
16
17 from datetime import datetime
18 import socket
19 import struct
20 import time
21
22 from attrdict import AttrDict
23 import bitmath
24 from netaddr import IPNetwork
25 # pylint: disable=import-error
26 from trex.stl.api import Ether
27 from trex.stl.api import STLError
28 from trex.stl.api import UDP
29 # pylint: enable=import-error
30
31 from log import LOG
32 from packet_stats import InterfaceStats
33 from packet_stats import PacketPathStats
34 from stats_collector import IntervalCollector
35 from stats_collector import IterationCollector
36 import traffic_gen.traffic_utils as utils
37 from utils import cast_integer
38
39
40 class TrafficClientException(Exception):
41     """Generic traffic client exception."""
42
43     pass
44
45
46 class TrafficRunner(object):
47     """Serialize various steps required to run traffic."""
48
49     def __init__(self, client, duration_sec, interval_sec=0, service_mode=False):
50         """Create a traffic runner."""
51         self.client = client
52         self.start_time = None
53         self.duration_sec = duration_sec
54         self.interval_sec = interval_sec
55         self.service_mode = service_mode
56
57     def run(self):
58         """Clear stats and instruct the traffic generator to start generating traffic."""
59         if self.is_running():
60             return None
61         LOG.info('Running traffic generator')
62         self.client.gen.clear_stats()
63         # Debug use only : new '--service-mode' option available for the NFVBench command line.
64         # A read-only mode TRex console would be able to capture the generated traffic.
65         self.client.gen.set_service_mode(enabled=self.service_mode)
66         LOG.info('Service mode is %sabled', 'en' if self.service_mode else 'dis')
67         self.client.gen.start_traffic()
68         self.start_time = time.time()
69         return self.poll_stats()
70
71     def stop(self):
72         """Stop the current run and instruct the traffic generator to stop traffic."""
73         if self.is_running():
74             self.start_time = None
75             self.client.gen.stop_traffic()
76
77     def is_running(self):
78         """Check if a run is still pending."""
79         return self.start_time is not None
80
81     def time_elapsed(self):
82         """Return time elapsed since start of run."""
83         if self.is_running():
84             return time.time() - self.start_time
85         return self.duration_sec
86
87     def poll_stats(self):
88         """Poll latest stats from the traffic generator at fixed interval - sleeps if necessary.
89
90         return: latest stats or None if traffic is stopped
91         """
92         if not self.is_running():
93             return None
94         if self.client.skip_sleep():
95             self.stop()
96             return self.client.get_stats()
97         time_elapsed = self.time_elapsed()
98         if time_elapsed > self.duration_sec:
99             self.stop()
100             return None
101         time_left = self.duration_sec - time_elapsed
102         if self.interval_sec > 0.0:
103             if time_left <= self.interval_sec:
104                 time.sleep(time_left)
105                 self.stop()
106             else:
107                 time.sleep(self.interval_sec)
108         else:
109             time.sleep(self.duration_sec)
110             self.stop()
111         return self.client.get_stats()
112
113
114 class IpBlock(object):
115     """Manage a block of IP addresses."""
116
117     def __init__(self, base_ip, step_ip, count_ip):
118         """Create an IP block."""
119         self.base_ip_int = Device.ip_to_int(base_ip)
120         self.step = Device.ip_to_int(step_ip)
121         self.max_available = count_ip
122         self.next_free = 0
123
124     def get_ip(self, index=0):
125         """Return the IP address at given index."""
126         if index < 0 or index >= self.max_available:
127             raise IndexError('Index out of bounds: %d (max=%d)' % (index, self.max_available))
128         return Device.int_to_ip(self.base_ip_int + index * self.step)
129
130     def reserve_ip_range(self, count):
131         """Reserve a range of count consecutive IP addresses spaced by step."""
132         if self.next_free + count > self.max_available:
133             raise IndexError('No more IP addresses next free=%d max_available=%d requested=%d' %
134                              (self.next_free,
135                               self.max_available,
136                               count))
137         first_ip = self.get_ip(self.next_free)
138         last_ip = self.get_ip(self.next_free + count - 1)
139         self.next_free += count
140         return (first_ip, last_ip)
141
142     def reset_reservation(self):
143         """Reset all reservations and restart with a completely unused IP block."""
144         self.next_free = 0
145
146
147 class Device(object):
148     """Represent a port device and all information associated to it.
149
150     In the curent version we only support 2 port devices for the traffic generator
151     identified as port 0 or port 1.
152     """
153
154     def __init__(self, port, generator_config):
155         """Create a new device for a given port."""
156         self.generator_config = generator_config
157         self.chain_count = generator_config.service_chain_count
158         self.flow_count = generator_config.flow_count / 2
159         self.port = port
160         self.switch_port = generator_config.interfaces[port].get('switch_port', None)
161         self.vtep_vlan = None
162         self.vtep_src_mac = None
163         self.vxlan = False
164         self.pci = generator_config.interfaces[port].pci
165         self.mac = None
166         self.dest_macs = None
167         self.vtep_dst_mac = None
168         self.vtep_dst_ip = None
169         if generator_config.vteps is None:
170             self.vtep_src_ip = None
171         else:
172             self.vtep_src_ip = generator_config.vteps[port]
173         self.vnis = None
174         self.vlans = None
175         self.ip_addrs = generator_config.ip_addrs[port]
176         subnet = IPNetwork(self.ip_addrs)
177         self.ip = subnet.ip.format()
178         self.ip_addrs_step = generator_config.ip_addrs_step
179         self.ip_block = IpBlock(self.ip, self.ip_addrs_step, self.flow_count)
180         self.gw_ip_block = IpBlock(generator_config.gateway_ips[port],
181                                    generator_config.gateway_ip_addrs_step,
182                                    self.chain_count)
183         self.tg_gateway_ip_addrs = generator_config.tg_gateway_ip_addrs[port]
184         self.tg_gw_ip_block = IpBlock(self.tg_gateway_ip_addrs,
185                                       generator_config.tg_gateway_ip_addrs_step,
186                                       self.chain_count)
187         self.udp_src_port = generator_config.udp_src_port
188         self.udp_dst_port = generator_config.udp_dst_port
189
190     def set_mac(self, mac):
191         """Set the local MAC for this port device."""
192         if mac is None:
193             raise TrafficClientException('Trying to set traffic generator MAC address as None')
194         self.mac = mac
195
196     def get_peer_device(self):
197         """Get the peer device (device 0 -> device 1, or device 1 -> device 0)."""
198         return self.generator_config.devices[1 - self.port]
199
200     def set_vtep_dst_mac(self, dest_macs):
201         """Set the list of dest MACs indexed by the chain id.
202
203         This is only called in 2 cases:
204         - VM macs discovered using openstack API
205         - dest MACs provisioned in config file
206         """
207         self.vtep_dst_mac = map(str, dest_macs)
208
209     def set_dest_macs(self, dest_macs):
210         """Set the list of dest MACs indexed by the chain id.
211
212         This is only called in 2 cases:
213         - VM macs discovered using openstack API
214         - dest MACs provisioned in config file
215         """
216         self.dest_macs = map(str, dest_macs)
217
218     def get_dest_macs(self):
219         """Get the list of dest macs for this device.
220
221         If set_dest_macs was never called, assumes l2-loopback and return
222         a list of peer mac (as many as chains but normally only 1 chain)
223         """
224         if self.dest_macs:
225             return self.dest_macs
226         # assume this is l2-loopback
227         return [self.get_peer_device().mac] * self.chain_count
228
229     def set_vlans(self, vlans):
230         """Set the list of vlans to use indexed by the chain id."""
231         self.vlans = vlans
232         LOG.info("Port %d: VLANs %s", self.port, self.vlans)
233
234     def set_vtep_vlan(self, vlan):
235         """Set the vtep vlan to use indexed by specific port."""
236         self.vtep_vlan = vlan
237         self.vxlan = True
238         self.vlan_tagging = None
239         LOG.info("Port %d: VTEP VLANs %s", self.port, self.vtep_vlan)
240
241     def set_vxlan_endpoints(self, src_ip, dst_ip):
242         self.vtep_dst_ip = dst_ip
243         self.vtep_src_ip = src_ip
244         LOG.info("Port %d: src_vtep %s, dst_vtep %s", self.port,
245                  self.vtep_src_ip, self.vtep_dst_ip)
246
247     def set_vxlans(self, vnis):
248         self.vnis = vnis
249         LOG.info("Port %d: VNIs %s", self.port, self.vnis)
250
251     def set_gw_ip(self, gateway_ip):
252         self.gw_ip_block = IpBlock(gateway_ip,
253                                    self.generator_config.gateway_ip_addrs_step,
254                                    self.chain_count)
255
256     def get_gw_ip(self, chain_index):
257         """Retrieve the IP address assigned for the gateway of a given chain."""
258         return self.gw_ip_block.get_ip(chain_index)
259
260     def get_stream_configs(self):
261         """Get the stream config for a given chain on this device.
262
263         Called by the traffic generator driver to program the traffic generator properly
264         before generating traffic
265         """
266         configs = []
267         # exact flow count for each chain is calculated as follows:
268         # - all chains except the first will have the same flow count
269         #   calculated as (total_flows + chain_count - 1) / chain_count
270         # - the first chain will have the remainder
271         # example 11 flows and 3 chains => 3, 4, 4
272         flows_per_chain = (self.flow_count + self.chain_count - 1) / self.chain_count
273         cur_chain_flow_count = self.flow_count - flows_per_chain * (self.chain_count - 1)
274         peer = self.get_peer_device()
275         self.ip_block.reset_reservation()
276         peer.ip_block.reset_reservation()
277         dest_macs = self.get_dest_macs()
278
279         for chain_idx in xrange(self.chain_count):
280             src_ip_first, src_ip_last = self.ip_block.reserve_ip_range(cur_chain_flow_count)
281             dst_ip_first, dst_ip_last = peer.ip_block.reserve_ip_range(cur_chain_flow_count)
282
283             configs.append({
284                 'count': cur_chain_flow_count,
285                 'mac_src': self.mac,
286                 'mac_dst': dest_macs[chain_idx],
287                 'ip_src_addr': src_ip_first,
288                 'ip_src_addr_max': src_ip_last,
289                 'ip_src_count': cur_chain_flow_count,
290                 'ip_dst_addr': dst_ip_first,
291                 'ip_dst_addr_max': dst_ip_last,
292                 'ip_dst_count': cur_chain_flow_count,
293                 'ip_addrs_step': self.ip_addrs_step,
294                 'udp_src_port': self.udp_src_port,
295                 'udp_dst_port': self.udp_dst_port,
296                 'mac_discovery_gw': self.get_gw_ip(chain_idx),
297                 'ip_src_tg_gw': self.tg_gw_ip_block.get_ip(chain_idx),
298                 'ip_dst_tg_gw': peer.tg_gw_ip_block.get_ip(chain_idx),
299                 'vlan_tag': self.vlans[chain_idx] if self.vlans else None,
300                 'vxlan': self.vxlan,
301                 'vtep_vlan': self.vtep_vlan if self.vtep_vlan else None,
302                 'vtep_src_mac': self.mac if self.vxlan is True else None,
303                 'vtep_dst_mac': self.vtep_dst_mac if self.vxlan is True else None,
304                 'vtep_dst_ip': self.vtep_dst_ip if self.vxlan is True else None,
305                 'vtep_src_ip': self.vtep_src_ip if self.vxlan is True else None,
306                 'net_vni': self.vnis[chain_idx] if self.vxlan is True else None
307             })
308             # after first chain, fall back to the flow count for all other chains
309             cur_chain_flow_count = flows_per_chain
310         return configs
311
312     @staticmethod
313     def ip_to_int(addr):
314         """Convert an IP address from string to numeric."""
315         return struct.unpack("!I", socket.inet_aton(addr))[0]
316
317     @staticmethod
318     def int_to_ip(nvalue):
319         """Convert an IP address from numeric to string."""
320         return socket.inet_ntoa(struct.pack("!I", nvalue))
321
322
323 class GeneratorConfig(object):
324     """Represents traffic configuration for currently running traffic profile."""
325
326     DEFAULT_IP_STEP = '0.0.0.1'
327     DEFAULT_SRC_DST_IP_STEP = '0.0.0.1'
328
329     def __init__(self, config):
330         """Create a generator config."""
331         self.config = config
332         # name of the generator profile (normally trex or dummy)
333         # pick the default one if not specified explicitly from cli options
334         if not config.generator_profile:
335             config.generator_profile = config.traffic_generator.default_profile
336         # pick up the profile dict based on the name
337         gen_config = self.__match_generator_profile(config.traffic_generator,
338                                                     config.generator_profile)
339         self.gen_config = gen_config
340         # copy over fields from the dict
341         self.tool = gen_config.tool
342         self.ip = gen_config.ip
343         # overrides on config.cores and config.mbuf_factor
344         if config.cores:
345             self.cores = config.cores
346         else:
347             self.cores = gen_config.get('cores', 1)
348         self.mbuf_factor = config.mbuf_factor
349         self.mbuf_64 = config.mbuf_64
350         self.hdrh = not config.disable_hdrh
351         if gen_config.intf_speed:
352             # interface speed is overriden from config
353             self.intf_speed = bitmath.parse_string(gen_config.intf_speed.replace('ps', '')).bits
354         else:
355             # interface speed is discovered/provided by the traffic generator
356             self.intf_speed = 0
357         self.name = gen_config.name
358         self.zmq_pub_port = gen_config.get('zmq_pub_port', 4500)
359         self.zmq_rpc_port = gen_config.get('zmq_rpc_port', 4501)
360         self.limit_memory = gen_config.get('limit_memory', 1024)
361         self.software_mode = gen_config.get('software_mode', False)
362         self.interfaces = gen_config.interfaces
363         if self.interfaces[0].port != 0 or self.interfaces[1].port != 1:
364             raise TrafficClientException('Invalid port order/id in generator_profile.interfaces')
365         self.service_chain = config.service_chain
366         self.service_chain_count = config.service_chain_count
367         self.flow_count = config.flow_count
368         self.host_name = gen_config.host_name
369
370         self.tg_gateway_ip_addrs = gen_config.tg_gateway_ip_addrs
371         self.ip_addrs = gen_config.ip_addrs
372         self.ip_addrs_step = gen_config.ip_addrs_step or self.DEFAULT_SRC_DST_IP_STEP
373         self.tg_gateway_ip_addrs_step = \
374             gen_config.tg_gateway_ip_addrs_step or self.DEFAULT_IP_STEP
375         self.gateway_ip_addrs_step = gen_config.gateway_ip_addrs_step or self.DEFAULT_IP_STEP
376         self.gateway_ips = gen_config.gateway_ip_addrs
377         self.udp_src_port = gen_config.udp_src_port
378         self.udp_dst_port = gen_config.udp_dst_port
379         self.vteps = gen_config.get('vteps')
380         self.devices = [Device(port, self) for port in [0, 1]]
381         # This should normally always be [0, 1]
382         self.ports = [device.port for device in self.devices]
383
384         # check that pci is not empty
385         if not gen_config.interfaces[0].get('pci', None) or \
386            not gen_config.interfaces[1].get('pci', None):
387             raise TrafficClientException("configuration interfaces pci fields cannot be empty")
388
389         self.pcis = [tgif['pci'] for tgif in gen_config.interfaces]
390         self.vlan_tagging = config.vlan_tagging
391
392         # needed for result/summarizer
393         config['tg-name'] = gen_config.name
394         config['tg-tool'] = self.tool
395
396     def to_json(self):
397         """Get json form to display the content into the overall result dict."""
398         return dict(self.gen_config)
399
400     def set_dest_macs(self, port_index, dest_macs):
401         """Set the list of dest MACs indexed by the chain id on given port.
402
403         port_index: the port for which dest macs must be set
404         dest_macs: a list of dest MACs indexed by chain id
405         """
406         if len(dest_macs) < self.config.service_chain_count:
407             raise TrafficClientException('Dest MAC list %s must have %d entries' %
408                                          (dest_macs, self.config.service_chain_count))
409         # only pass the first scc dest MACs
410         self.devices[port_index].set_dest_macs(dest_macs[:self.config.service_chain_count])
411         LOG.info('Port %d: dst MAC %s', port_index, [str(mac) for mac in dest_macs])
412
413     def set_vtep_dest_macs(self, port_index, dest_macs):
414         """Set the list of dest MACs indexed by the chain id on given port.
415
416         port_index: the port for which dest macs must be set
417         dest_macs: a list of dest MACs indexed by chain id
418         """
419         if len(dest_macs) != self.config.service_chain_count:
420             raise TrafficClientException('Dest MAC list %s must have %d entries' %
421                                          (dest_macs, self.config.service_chain_count))
422         self.devices[port_index].set_vtep_dst_mac(dest_macs)
423         LOG.info('Port %d: vtep dst MAC %s', port_index, set([str(mac) for mac in dest_macs]))
424
425     def get_dest_macs(self):
426         """Return the list of dest macs indexed by port."""
427         return [dev.get_dest_macs() for dev in self.devices]
428
429     def set_vlans(self, port_index, vlans):
430         """Set the list of vlans to use indexed by the chain id on given port.
431
432         port_index: the port for which VLANs must be set
433         vlans: a  list of vlan lists indexed by chain id
434         """
435         if len(vlans) != self.config.service_chain_count:
436             raise TrafficClientException('VLAN list %s must have %d entries' %
437                                          (vlans, self.config.service_chain_count))
438         self.devices[port_index].set_vlans(vlans)
439
440     def set_vxlans(self, port_index, vxlans):
441         """Set the list of vxlans (VNIs) to use indexed by the chain id on given port.
442
443         port_index: the port for which VXLANs must be set
444         VXLANs: a  list of VNIs lists indexed by chain id
445         """
446         if len(vxlans) != self.config.service_chain_count:
447             raise TrafficClientException('VXLAN list %s must have %d entries' %
448                                          (vxlans, self.config.service_chain_count))
449         self.devices[port_index].set_vxlans(vxlans)
450
451     def set_vtep_vlan(self, port_index, vlan):
452         """Set the vtep vlan to use indexed by the chain id on given port.
453         port_index: the port for which VLAN must be set
454         """
455         self.devices[port_index].set_vtep_vlan(vlan)
456
457     def set_vxlan_endpoints(self, port_index, src_ip, dst_ip):
458         self.devices[port_index].set_vxlan_endpoints(src_ip, dst_ip)
459
460     @staticmethod
461     def __match_generator_profile(traffic_generator, generator_profile):
462         gen_config = AttrDict(traffic_generator)
463         gen_config.pop('default_profile')
464         gen_config.pop('generator_profile')
465         matching_profile = [profile for profile in traffic_generator.generator_profile if
466                             profile.name == generator_profile]
467         if len(matching_profile) != 1:
468             raise Exception('Traffic generator profile not found: ' + generator_profile)
469
470         gen_config.update(matching_profile[0])
471         return gen_config
472
473
474 class TrafficClient(object):
475     """Traffic generator client with NDR/PDR binary seearch."""
476
477     PORTS = [0, 1]
478
479     def __init__(self, config, notifier=None):
480         """Create a new TrafficClient instance.
481
482         config: nfvbench config
483         notifier: notifier (optional)
484
485         A new instance is created everytime the nfvbench config may have changed.
486         """
487         self.config = config
488         self.generator_config = GeneratorConfig(config)
489         self.tool = self.generator_config.tool
490         self.gen = self._get_generator()
491         self.notifier = notifier
492         self.interval_collector = None
493         self.iteration_collector = None
494         self.runner = TrafficRunner(self, self.config.duration_sec, self.config.interval_sec,
495                                     self.config.service_mode)
496         self.config.frame_sizes = self._get_frame_sizes()
497         self.run_config = {
498             'l2frame_size': None,
499             'duration_sec': self.config.duration_sec,
500             'bidirectional': True,
501             'rates': []  # to avoid unsbuscriptable-obj warning
502         }
503         self.current_total_rate = {'rate_percent': '10'}
504         if self.config.single_run:
505             self.current_total_rate = utils.parse_rate_str(self.config.rate)
506         self.ifstats = None
507         # Speed is either discovered when connecting to TG or set from config
508         # This variable is 0 if not yet discovered from TG or must be the speed of
509         # each interface in bits per second
510         self.intf_speed = self.generator_config.intf_speed
511
512     def _get_generator(self):
513         tool = self.tool.lower()
514         if tool == 'trex':
515             from traffic_gen import trex_gen
516             return trex_gen.TRex(self)
517         if tool == 'dummy':
518             from traffic_gen import dummy
519             return dummy.DummyTG(self)
520         raise TrafficClientException('Unsupported generator tool name:' + self.tool)
521
522     def skip_sleep(self):
523         """Skip all sleeps when doing unit testing with dummy TG.
524
525         Must be overriden using mock.patch
526         """
527         return False
528
529     def _get_frame_sizes(self):
530         traffic_profile_name = self.config.traffic.profile
531         matching_profiles = [profile for profile in self.config.traffic_profile if
532                              profile.name == traffic_profile_name]
533         if len(matching_profiles) > 1:
534             raise TrafficClientException('Multiple traffic profiles with name: ' +
535                                          traffic_profile_name)
536         elif not matching_profiles:
537             raise TrafficClientException('Cannot find traffic profile: ' + traffic_profile_name)
538         return matching_profiles[0].l2frame_size
539
540     def start_traffic_generator(self):
541         """Start the traffic generator process (traffic not started yet)."""
542         self.gen.connect()
543         # pick up the interface speed if it is not set from config
544         intf_speeds = self.gen.get_port_speed_gbps()
545         # convert Gbps unit into bps
546         tg_if_speed = bitmath.parse_string(str(intf_speeds[0]) + 'Gb').bits
547         if self.intf_speed:
548             # interface speed is overriden from config
549             if self.intf_speed != tg_if_speed:
550                 # Warn the user if the speed in the config is different
551                 LOG.warning('Interface speed provided is different from actual speed (%d Gbps)',
552                             intf_speeds[0])
553         else:
554             # interface speed not provisioned by config
555             self.intf_speed = tg_if_speed
556             # also update the speed in the tg config
557             self.generator_config.intf_speed = tg_if_speed
558
559         # Save the traffic generator local MAC
560         for mac, device in zip(self.gen.get_macs(), self.generator_config.devices):
561             device.set_mac(mac)
562
563     def setup(self):
564         """Set up the traffic client."""
565         self.gen.clear_stats()
566
567     def get_version(self):
568         """Get the traffic generator version."""
569         return self.gen.get_version()
570
571     def ensure_end_to_end(self):
572         """Ensure traffic generator receives packets it has transmitted.
573
574         This ensures end to end connectivity and also waits until VMs are ready to forward packets.
575
576         VMs that are started and in active state may not pass traffic yet. It is imperative to make
577         sure that all VMs are passing traffic in both directions before starting any benchmarking.
578         To verify this, we need to send at a low frequency bi-directional packets and make sure
579         that we receive all packets back from all VMs. The number of flows is equal to 2 times
580         the number of chains (1 per direction) and we need to make sure we receive packets coming
581         from exactly 2 x chain count different source MAC addresses.
582
583         Example:
584             PVP chain (1 VM per chain)
585             N = 10 (number of chains)
586             Flow count = 20 (number of flows)
587             If the number of unique source MAC addresses from received packets is 20 then
588             all 10 VMs 10 VMs are in operational state.
589         """
590         LOG.info('Starting traffic generator to ensure end-to-end connectivity')
591         # send 2pps on each chain and each direction
592         rate_pps = {'rate_pps': str(self.config.service_chain_count * 2)}
593         self.gen.create_traffic('64', [rate_pps, rate_pps], bidirectional=True, latency=False,
594                                 e2e=True)
595         # ensures enough traffic is coming back
596         retry_count = (self.config.check_traffic_time_sec +
597                        self.config.generic_poll_sec - 1) / self.config.generic_poll_sec
598
599         # we expect to see packets coming from 2 unique MAC per chain
600         # because there can be flooding in the case of shared net
601         # we must verify that packets from the right VMs are received
602         # and not just count unique src MAC
603         # create a dict of (port, chain) tuples indexed by dest mac
604         mac_map = {}
605         for port, dest_macs in enumerate(self.generator_config.get_dest_macs()):
606             for chain, mac in enumerate(dest_macs):
607                 mac_map[mac] = (port, chain)
608         unique_src_mac_count = len(mac_map)
609         if self.config.vxlan and self.config.traffic_generator.vtep_vlan:
610             get_mac_id = lambda packet: packet['binary'][60:66]
611         elif self.config.vxlan:
612             get_mac_id = lambda packet: packet['binary'][56:62]
613         else:
614             get_mac_id = lambda packet: packet['binary'][6:12]
615         for it in xrange(retry_count):
616             self.gen.clear_stats()
617             self.gen.start_traffic()
618             self.gen.start_capture()
619             LOG.info('Captured unique src mac %d/%d, capturing return packets (retry %d/%d)...',
620                      unique_src_mac_count - len(mac_map), unique_src_mac_count,
621                      it + 1, retry_count)
622             if not self.skip_sleep():
623                 time.sleep(self.config.generic_poll_sec)
624             self.gen.stop_traffic()
625             self.gen.fetch_capture_packets()
626             self.gen.stop_capture()
627             for packet in self.gen.packet_list:
628                 mac_id = get_mac_id(packet)
629                 src_mac = ':'.join(["%02x" % ord(x) for x in mac_id])
630                 if src_mac in mac_map and self.is_udp(packet):
631                     port, chain = mac_map[src_mac]
632                     LOG.info('Received packet from mac: %s (chain=%d, port=%d)',
633                              src_mac, chain, port)
634                     mac_map.pop(src_mac, None)
635
636                 if not mac_map:
637                     LOG.info('End-to-end connectivity established')
638                     return
639             if self.config.l3_router and not self.config.no_arp:
640                 # In case of L3 traffic mode, routers are not able to route traffic
641                 # until VM interfaces are up and ARP requests are done
642                 LOG.info('Waiting for loopback service completely started...')
643                 LOG.info('Sending ARP request to assure end-to-end connectivity established')
644                 self.ensure_arp_successful()
645         raise TrafficClientException('End-to-end connectivity cannot be ensured')
646
647     def is_udp(self, packet):
648         pkt = Ether(packet['binary'])
649         return UDP in pkt
650
651     def ensure_arp_successful(self):
652         """Resolve all IP using ARP and throw an exception in case of failure."""
653         dest_macs = self.gen.resolve_arp()
654         if dest_macs:
655             # all dest macs are discovered, saved them into the generator config
656             if self.config.vxlan:
657                 self.generator_config.set_vtep_dest_macs(0, dest_macs[0])
658                 self.generator_config.set_vtep_dest_macs(1, dest_macs[1])
659             else:
660                 self.generator_config.set_dest_macs(0, dest_macs[0])
661                 self.generator_config.set_dest_macs(1, dest_macs[1])
662         else:
663             raise TrafficClientException('ARP cannot be resolved')
664
665     def set_traffic(self, frame_size, bidirectional):
666         """Reconfigure the traffic generator for a new frame size."""
667         self.run_config['bidirectional'] = bidirectional
668         self.run_config['l2frame_size'] = frame_size
669         self.run_config['rates'] = [self.get_per_direction_rate()]
670         if bidirectional:
671             self.run_config['rates'].append(self.get_per_direction_rate())
672         else:
673             unidir_reverse_pps = int(self.config.unidir_reverse_traffic_pps)
674             if unidir_reverse_pps > 0:
675                 self.run_config['rates'].append({'rate_pps': str(unidir_reverse_pps)})
676         # Fix for [NFVBENCH-67], convert the rate string to PPS
677         for idx, rate in enumerate(self.run_config['rates']):
678             if 'rate_pps' not in rate:
679                 self.run_config['rates'][idx] = {'rate_pps': self.__convert_rates(rate)['rate_pps']}
680
681         self.gen.clear_streamblock()
682         self.gen.create_traffic(frame_size, self.run_config['rates'], bidirectional, latency=True)
683
684     def _modify_load(self, load):
685         self.current_total_rate = {'rate_percent': str(load)}
686         rate_per_direction = self.get_per_direction_rate()
687
688         self.gen.modify_rate(rate_per_direction, False)
689         self.run_config['rates'][0] = rate_per_direction
690         if self.run_config['bidirectional']:
691             self.gen.modify_rate(rate_per_direction, True)
692             self.run_config['rates'][1] = rate_per_direction
693
694     def get_ndr_and_pdr(self):
695         """Start the NDR/PDR iteration and return the results."""
696         dst = 'Bidirectional' if self.run_config['bidirectional'] else 'Unidirectional'
697         targets = {}
698         if self.config.ndr_run:
699             LOG.info('*** Searching NDR for %s (%s)...', self.run_config['l2frame_size'], dst)
700             targets['ndr'] = self.config.measurement.NDR
701         if self.config.pdr_run:
702             LOG.info('*** Searching PDR for %s (%s)...', self.run_config['l2frame_size'], dst)
703             targets['pdr'] = self.config.measurement.PDR
704
705         self.run_config['start_time'] = time.time()
706         self.interval_collector = IntervalCollector(self.run_config['start_time'])
707         self.interval_collector.attach_notifier(self.notifier)
708         self.iteration_collector = IterationCollector(self.run_config['start_time'])
709         results = {}
710         self.__range_search(0.0, 200.0, targets, results)
711
712         results['iteration_stats'] = {
713             'ndr_pdr': self.iteration_collector.get()
714         }
715
716         if self.config.ndr_run:
717             LOG.info('NDR load: %s', results['ndr']['rate_percent'])
718             results['ndr']['time_taken_sec'] = \
719                 results['ndr']['timestamp_sec'] - self.run_config['start_time']
720             if self.config.pdr_run:
721                 LOG.info('PDR load: %s', results['pdr']['rate_percent'])
722                 results['pdr']['time_taken_sec'] = \
723                     results['pdr']['timestamp_sec'] - results['ndr']['timestamp_sec']
724         else:
725             LOG.info('PDR load: %s', results['pdr']['rate_percent'])
726             results['pdr']['time_taken_sec'] = \
727                 results['pdr']['timestamp_sec'] - self.run_config['start_time']
728         return results
729
730     def __get_dropped_rate(self, result):
731         dropped_pkts = result['rx']['dropped_pkts']
732         total_pkts = result['tx']['total_pkts']
733         if not total_pkts:
734             return float('inf')
735         return float(dropped_pkts) / total_pkts * 100
736
737     def get_stats(self):
738         """Collect final stats for previous run."""
739         stats = self.gen.get_stats()
740         retDict = {'total_tx_rate': stats['total_tx_rate']}
741         for port in self.PORTS:
742             retDict[port] = {'tx': {}, 'rx': {}}
743
744         tx_keys = ['total_pkts', 'total_pkt_bytes', 'pkt_rate', 'pkt_bit_rate']
745         rx_keys = tx_keys + ['dropped_pkts']
746
747         for port in self.PORTS:
748             for key in tx_keys:
749                 retDict[port]['tx'][key] = int(stats[port]['tx'][key])
750             for key in rx_keys:
751                 try:
752                     retDict[port]['rx'][key] = int(stats[port]['rx'][key])
753                 except ValueError:
754                     retDict[port]['rx'][key] = 0
755             retDict[port]['rx']['avg_delay_usec'] = cast_integer(
756                 stats[port]['rx']['avg_delay_usec'])
757             retDict[port]['rx']['min_delay_usec'] = cast_integer(
758                 stats[port]['rx']['min_delay_usec'])
759             retDict[port]['rx']['max_delay_usec'] = cast_integer(
760                 stats[port]['rx']['max_delay_usec'])
761             retDict[port]['drop_rate_percent'] = self.__get_dropped_rate(retDict[port])
762
763         ports = sorted(retDict.keys())
764         if self.run_config['bidirectional']:
765             retDict['overall'] = {'tx': {}, 'rx': {}}
766             for key in tx_keys:
767                 retDict['overall']['tx'][key] = \
768                     retDict[ports[0]]['tx'][key] + retDict[ports[1]]['tx'][key]
769             for key in rx_keys:
770                 retDict['overall']['rx'][key] = \
771                     retDict[ports[0]]['rx'][key] + retDict[ports[1]]['rx'][key]
772             total_pkts = [retDict[ports[0]]['rx']['total_pkts'],
773                           retDict[ports[1]]['rx']['total_pkts']]
774             avg_delays = [retDict[ports[0]]['rx']['avg_delay_usec'],
775                           retDict[ports[1]]['rx']['avg_delay_usec']]
776             max_delays = [retDict[ports[0]]['rx']['max_delay_usec'],
777                           retDict[ports[1]]['rx']['max_delay_usec']]
778             min_delays = [retDict[ports[0]]['rx']['min_delay_usec'],
779                           retDict[ports[1]]['rx']['min_delay_usec']]
780             retDict['overall']['rx']['avg_delay_usec'] = utils.weighted_avg(total_pkts, avg_delays)
781             retDict['overall']['rx']['min_delay_usec'] = min(min_delays)
782             retDict['overall']['rx']['max_delay_usec'] = max(max_delays)
783             for key in ['pkt_bit_rate', 'pkt_rate']:
784                 for dirc in ['tx', 'rx']:
785                     retDict['overall'][dirc][key] /= 2.0
786         else:
787             retDict['overall'] = retDict[ports[0]]
788         retDict['overall']['drop_rate_percent'] = self.__get_dropped_rate(retDict['overall'])
789         return retDict
790
791     def __convert_rates(self, rate):
792         return utils.convert_rates(self.run_config['l2frame_size'],
793                                    rate,
794                                    self.intf_speed)
795
796     def __ndr_pdr_found(self, tag, load):
797         rates = self.__convert_rates({'rate_percent': load})
798         self.iteration_collector.add_ndr_pdr(tag, rates['rate_pps'])
799         last_stats = self.iteration_collector.peek()
800         self.interval_collector.add_ndr_pdr(tag, last_stats)
801
802     def __format_output_stats(self, stats):
803         for key in self.PORTS + ['overall']:
804             interface = stats[key]
805             stats[key] = {
806                 'tx_pkts': interface['tx']['total_pkts'],
807                 'rx_pkts': interface['rx']['total_pkts'],
808                 'drop_percentage': interface['drop_rate_percent'],
809                 'drop_pct': interface['rx']['dropped_pkts'],
810                 'avg_delay_usec': interface['rx']['avg_delay_usec'],
811                 'max_delay_usec': interface['rx']['max_delay_usec'],
812                 'min_delay_usec': interface['rx']['min_delay_usec'],
813             }
814
815         return stats
816
817     def __targets_found(self, rate, targets, results):
818         for tag, target in targets.iteritems():
819             LOG.info('Found %s (%s) load: %s', tag, target, rate)
820             self.__ndr_pdr_found(tag, rate)
821             results[tag]['timestamp_sec'] = time.time()
822
823     def __range_search(self, left, right, targets, results):
824         """Perform a binary search for a list of targets inside a [left..right] range or rate.
825
826         left    the left side of the range to search as a % the line rate (100 = 100% line rate)
827                 indicating the rate to send on each interface
828         right   the right side of the range to search as a % of line rate
829                 indicating the rate to send on each interface
830         targets a dict of drop rates to search (0.1 = 0.1%), indexed by the DR name or "tag"
831                 ('ndr', 'pdr')
832         results a dict to store results
833         """
834         if not targets:
835             return
836         LOG.info('Range search [%s .. %s] targets: %s', left, right, targets)
837
838         # Terminate search when gap is less than load epsilon
839         if right - left < self.config.measurement.load_epsilon:
840             self.__targets_found(left, targets, results)
841             return
842
843         # Obtain the average drop rate in for middle load
844         middle = (left + right) / 2.0
845         try:
846             stats, rates = self.__run_search_iteration(middle)
847         except STLError:
848             LOG.exception("Got exception from traffic generator during binary search")
849             self.__targets_found(left, targets, results)
850             return
851         # Split target dicts based on the avg drop rate
852         left_targets = {}
853         right_targets = {}
854         for tag, target in targets.iteritems():
855             if stats['overall']['drop_rate_percent'] <= target:
856                 # record the best possible rate found for this target
857                 results[tag] = rates
858                 results[tag].update({
859                     'load_percent_per_direction': middle,
860                     'stats': self.__format_output_stats(dict(stats)),
861                     'timestamp_sec': None
862                 })
863                 right_targets[tag] = target
864             else:
865                 # initialize to 0 all fields of result for
866                 # the worst case scenario of the binary search (if ndr/pdr is not found)
867                 if tag not in results:
868                     results[tag] = dict.fromkeys(rates, 0)
869                     empty_stats = self.__format_output_stats(dict(stats))
870                     for key in empty_stats:
871                         if isinstance(empty_stats[key], dict):
872                             empty_stats[key] = dict.fromkeys(empty_stats[key], 0)
873                         else:
874                             empty_stats[key] = 0
875                     results[tag].update({
876                         'load_percent_per_direction': 0,
877                         'stats': empty_stats,
878                         'timestamp_sec': None
879                     })
880                 left_targets[tag] = target
881
882         # search lower half
883         self.__range_search(left, middle, left_targets, results)
884
885         # search upper half only if the upper rate does not exceed
886         # 100%, this only happens when the first search at 100%
887         # yields a DR that is < target DR
888         if middle >= 100:
889             self.__targets_found(100, right_targets, results)
890         else:
891             self.__range_search(middle, right, right_targets, results)
892
893     def __run_search_iteration(self, rate):
894         """Run one iteration at the given rate level.
895
896         rate: the rate to send on each port in percent (0 to 100)
897         """
898         self._modify_load(rate)
899
900         # poll interval stats and collect them
901         for stats in self.run_traffic():
902             self.interval_collector.add(stats)
903             time_elapsed_ratio = self.runner.time_elapsed() / self.run_config['duration_sec']
904             if time_elapsed_ratio >= 1:
905                 self.cancel_traffic()
906                 if not self.skip_sleep():
907                     time.sleep(self.config.pause_sec)
908         self.interval_collector.reset()
909
910         # get stats from the run
911         stats = self.runner.client.get_stats()
912         current_traffic_config = self._get_traffic_config()
913         warning = self.compare_tx_rates(current_traffic_config['direction-total']['rate_pps'],
914                                         stats['total_tx_rate'])
915         if warning is not None:
916             stats['warning'] = warning
917
918         # save reliable stats from whole iteration
919         self.iteration_collector.add(stats, current_traffic_config['direction-total']['rate_pps'])
920         LOG.info('Average drop rate: %f', stats['overall']['drop_rate_percent'])
921         return stats, current_traffic_config['direction-total']
922
923     @staticmethod
924     def log_stats(stats):
925         """Log estimated stats during run."""
926         report = {
927             'datetime': str(datetime.now()),
928             'tx_packets': stats['overall']['tx']['total_pkts'],
929             'rx_packets': stats['overall']['rx']['total_pkts'],
930             'drop_packets': stats['overall']['rx']['dropped_pkts'],
931             'drop_rate_percent': stats['overall']['drop_rate_percent']
932         }
933         LOG.info('TX: %(tx_packets)d; '
934                  'RX: %(rx_packets)d; '
935                  'Est. Dropped: %(drop_packets)d; '
936                  'Est. Drop rate: %(drop_rate_percent).4f%%',
937                  report)
938
939     def run_traffic(self):
940         """Start traffic and return intermediate stats for each interval."""
941         stats = self.runner.run()
942         while self.runner.is_running:
943             self.log_stats(stats)
944             yield stats
945             stats = self.runner.poll_stats()
946             if stats is None:
947                 return
948         self.log_stats(stats)
949         LOG.info('Drop rate: %f', stats['overall']['drop_rate_percent'])
950         yield stats
951
952     def cancel_traffic(self):
953         """Stop traffic."""
954         self.runner.stop()
955
956     def _get_traffic_config(self):
957         config = {}
958         load_total = 0.0
959         bps_total = 0.0
960         pps_total = 0.0
961         for idx, rate in enumerate(self.run_config['rates']):
962             key = 'direction-forward' if idx == 0 else 'direction-reverse'
963             config[key] = {
964                 'l2frame_size': self.run_config['l2frame_size'],
965                 'duration_sec': self.run_config['duration_sec']
966             }
967             config[key].update(rate)
968             config[key].update(self.__convert_rates(rate))
969             load_total += float(config[key]['rate_percent'])
970             bps_total += float(config[key]['rate_bps'])
971             pps_total += float(config[key]['rate_pps'])
972         config['direction-total'] = dict(config['direction-forward'])
973         config['direction-total'].update({
974             'rate_percent': load_total,
975             'rate_pps': cast_integer(pps_total),
976             'rate_bps': bps_total
977         })
978
979         return config
980
981     def get_run_config(self, results):
982         """Return configuration which was used for the last run."""
983         r = {}
984         # because we want each direction to have the far end RX rates,
985         # use the far end index (1-idx) to retrieve the RX rates
986         for idx, key in enumerate(["direction-forward", "direction-reverse"]):
987             tx_rate = results["stats"][idx]["tx"]["total_pkts"] / self.config.duration_sec
988             rx_rate = results["stats"][1 - idx]["rx"]["total_pkts"] / self.config.duration_sec
989             r[key] = {
990                 "orig": self.__convert_rates(self.run_config['rates'][idx]),
991                 "tx": self.__convert_rates({'rate_pps': tx_rate}),
992                 "rx": self.__convert_rates({'rate_pps': rx_rate})
993             }
994
995         total = {}
996         for direction in ['orig', 'tx', 'rx']:
997             total[direction] = {}
998             for unit in ['rate_percent', 'rate_bps', 'rate_pps']:
999                 total[direction][unit] = sum([float(x[direction][unit]) for x in r.values()])
1000
1001         r['direction-total'] = total
1002         return r
1003
1004     def insert_interface_stats(self, pps_list):
1005         """Insert interface stats to a list of packet path stats.
1006
1007         pps_list: a list of packet path stats instances indexed by chain index
1008
1009         This function will insert the packet path stats for the traffic gen ports 0 and 1
1010         with itemized per chain tx/rx counters.
1011         There will be as many packet path stats as chains.
1012         Each packet path stats will have exactly 2 InterfaceStats for port 0 and port 1
1013         self.pps_list:
1014         [
1015         PacketPathStats(InterfaceStats(chain 0, port 0), InterfaceStats(chain 0, port 1)),
1016         PacketPathStats(InterfaceStats(chain 1, port 0), InterfaceStats(chain 1, port 1)),
1017         ...
1018         ]
1019         """
1020         def get_if_stats(chain_idx):
1021             return [InterfaceStats('p' + str(port), self.tool)
1022                     for port in range(2)]
1023         # keep the list of list of interface stats indexed by the chain id
1024         self.ifstats = [get_if_stats(chain_idx)
1025                         for chain_idx in range(self.config.service_chain_count)]
1026         # note that we need to make a copy of the ifs list so that any modification in the
1027         # list from pps will not change the list saved in self.ifstats
1028         self.pps_list = [PacketPathStats(list(ifs)) for ifs in self.ifstats]
1029         # insert the corresponding pps in the passed list
1030         pps_list.extend(self.pps_list)
1031
1032     def update_interface_stats(self, diff=False):
1033         """Update all interface stats.
1034
1035         diff: if False, simply refresh the interface stats values with latest values
1036               if True, diff the interface stats with the latest values
1037         Make sure that the interface stats inserted in insert_interface_stats() are updated
1038         with proper values.
1039         self.ifstats:
1040         [
1041         [InterfaceStats(chain 0, port 0), InterfaceStats(chain 0, port 1)],
1042         [InterfaceStats(chain 1, port 0), InterfaceStats(chain 1, port 1)],
1043         ...
1044         ]
1045         """
1046         if diff:
1047             stats = self.gen.get_stats()
1048             for chain_idx, ifs in enumerate(self.ifstats):
1049                 # each ifs has exactly 2 InterfaceStats and 2 Latency instances
1050                 # corresponding to the
1051                 # port 0 and port 1 for the given chain_idx
1052                 # Note that we cannot use self.pps_list[chain_idx].if_stats to pick the
1053                 # interface stats for the pps because it could have been modified to contain
1054                 # additional interface stats
1055                 self.gen.get_stream_stats(stats, ifs, self.pps_list[chain_idx].latencies, chain_idx)
1056             # special handling for vxlan
1057             # in case of vxlan, flow stats are not available so all rx counters will be
1058             # zeros when the total rx port counter is non zero.
1059             # in that case,
1060             for port in range(2):
1061                 total_rx = 0
1062                 for ifs in self.ifstats:
1063                     total_rx += ifs[port].rx
1064                 if total_rx == 0:
1065                     # check if the total port rx from Trex is also zero
1066                     port_rx = stats[port]['rx']['total_pkts']
1067                     if port_rx:
1068                         # the total rx for all chains from port level stats is non zero
1069                         # which means that the per-chain stats are not available
1070                         if len(self.ifstats) == 1:
1071                             # only one chain, simply report the port level rx to the chain rx stats
1072                             self.ifstats[0][port].rx = port_rx
1073                         else:
1074                             for ifs in self.ifstats:
1075                                 # mark this data as unavailable
1076                                 ifs[port].rx = None
1077                             # pitch in the total rx only in the last chain pps
1078                             self.ifstats[-1][port].rx_total = port_rx
1079
1080     @staticmethod
1081     def compare_tx_rates(required, actual):
1082         """Compare the actual TX rate to the required TX rate."""
1083         threshold = 0.9
1084         are_different = False
1085         try:
1086             if float(actual) / required < threshold:
1087                 are_different = True
1088         except ZeroDivisionError:
1089             are_different = True
1090
1091         if are_different:
1092             msg = "WARNING: There is a significant difference between requested TX rate ({r}) " \
1093                   "and actual TX rate ({a}). The traffic generator may not have sufficient CPU " \
1094                   "to achieve the requested TX rate.".format(r=required, a=actual)
1095             LOG.info(msg)
1096             return msg
1097
1098         return None
1099
1100     def get_per_direction_rate(self):
1101         """Get the rate for each direction."""
1102         divisor = 2 if self.run_config['bidirectional'] else 1
1103         if 'rate_percent' in self.current_total_rate:
1104             # don't split rate if it's percentage
1105             divisor = 1
1106
1107         return utils.divide_rate(self.current_total_rate, divisor)
1108
1109     def close(self):
1110         """Close this instance."""
1111         try:
1112             self.gen.stop_traffic()
1113         except Exception:
1114             pass
1115         self.gen.clear_stats()
1116         self.gen.cleanup()