1 # Copyright 2016 Cisco Systems, Inc. All rights reserved.
3 # Licensed under the Apache License, Version 2.0 (the "License"); you may
4 # not use this file except in compliance with the License. You may obtain
5 # a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 # License for the specific language governing permissions and limitations
20 from collections import defaultdict
21 from itertools import count
22 from nfvbench.log import LOG
23 from nfvbench.specs import ChainType
24 from nfvbench.traffic_server import TRexTrafficServer
25 from nfvbench.utils import cast_integer
26 from nfvbench.utils import timeout
27 from nfvbench.utils import TimeoutError
28 from traffic_base import AbstractTrafficGenerator
29 from traffic_base import TrafficGeneratorException
30 import traffic_utils as utils
32 # pylint: disable=import-error
33 from trex_stl_lib.api import CTRexVmInsFixHwCs
34 from trex_stl_lib.api import Dot1Q
35 from trex_stl_lib.api import Ether
36 from trex_stl_lib.api import IP
37 from trex_stl_lib.api import STLClient
38 from trex_stl_lib.api import STLError
39 from trex_stl_lib.api import STLFlowLatencyStats
40 from trex_stl_lib.api import STLFlowStats
41 from trex_stl_lib.api import STLPktBuilder
42 from trex_stl_lib.api import STLScVmRaw
43 from trex_stl_lib.api import STLStream
44 from trex_stl_lib.api import STLTXCont
45 from trex_stl_lib.api import STLVmFixChecksumHw
46 from trex_stl_lib.api import STLVmFlowVar
47 from trex_stl_lib.api import STLVmFlowVarRepetableRandom
48 from trex_stl_lib.api import STLVmWrFlowVar
49 from trex_stl_lib.api import UDP
50 from trex_stl_lib.services.trex_stl_service_arp import STLServiceARP
53 # pylint: enable=import-error
56 class TRex(AbstractTrafficGenerator):
# Initialise the driver on top of AbstractTrafficGenerator and create the
# per-port bookkeeping containers used by the rest of the class.
# NOTE(review): attributes read elsewhere in this class (self.client, self.id,
# self.port_handle, self.rates, self.arps, self.packet_list) are not assigned
# on any line visible here - confirm they are set in the omitted lines.
59 def __init__(self, runner):
60 AbstractTrafficGenerator.__init__(self, runner)
# port handle -> latency pg_ids; generate_streams() appends to these lists.
63 self.latencies = defaultdict(list)
# port handle -> pg_id shared by that port's data streams; create_traffic()
# overwrites each entry with a single id taken from self.id.
64 self.stream_ids = defaultdict(list)
# Streams collected by create_traffic() before they are pushed to the client.
66 self.streamblock = defaultdict(list)
# Handle returned by client.start_capture(); None while no capture is active.
69 self.capture_id = None
def get_version(self):
    """Return whatever the TRex client reports as the server version."""
    server_version = self.client.get_server_version()
    return server_version
# Convert the raw TRex statistics dict into this driver's result layout:
# per-port 'tx' and 'rx' counters, rx latency figures, and an overall
# "total_tx_rate" entry.
# NOTE(review): the lines that create `result`, bind `stats` and open the
# nested per-port dicts are not visible in this extract.
75 def extract_stats(self, in_stats):
# Runs before any arithmetic on the counters; presumably replaces NaN values
# in place (helper lives in traffic_utils) - TODO confirm.
76 utils.nan_replace(in_stats)
80 for ph in self.port_handle:
# Transmit-side counters.
84 'total_pkts': cast_integer(stats['opackets']),
85 'total_pkt_bytes': cast_integer(stats['obytes']),
86 'pkt_rate': cast_integer(stats['tx_pps']),
87 'pkt_bit_rate': cast_integer(stats['tx_bps'])
# Receive-side counters.
90 'total_pkts': cast_integer(stats['ipackets']),
91 'total_pkt_bytes': cast_integer(stats['ibytes']),
92 'pkt_rate': cast_integer(stats['rx_pps']),
93 'pkt_bit_rate': cast_integer(stats['rx_bps']),
# Drops are computed as output minus input packets of the same `stats` entry.
94 'dropped_pkts': cast_integer(
95 stats['opackets'] - stats['ipackets'])
# Latency figures for this port; each falls back to NaN when the combined
# latency dict lacks the corresponding key.
99 lat = self.__combine_latencies(in_stats, ph)
100 result[ph]['rx']['max_delay_usec'] = cast_integer(
101 lat['total_max']) if 'total_max' in lat else float('nan')
102 result[ph]['rx']['min_delay_usec'] = cast_integer(
103 lat['total_min']) if 'total_min' in lat else float('nan')
104 result[ph]['rx']['avg_delay_usec'] = cast_integer(
105 lat['average']) if 'average' in lat else float('nan')
# NOTE(review): ports are indexed literally as 0 and 1 here although the loop
# above walks self.port_handle - confirm the handles are always [0, 1].
106 total_tx_pkts = result[0]['tx']['total_pkts'] + result[1]['tx']['total_pkts']
# Overall transmit rate: total packets sent divided by the configured duration.
107 result["total_tx_rate"] = cast_integer(total_tx_pkts / self.config.duration_sec)
110 def __combine_latencies(self, in_stats, port_handle):
111 """Traverses TRex result dictionary and combines chosen latency stats."""
# True when no latency pg_id has been recorded for this port.
# NOTE(review): the statement executed in that case is not visible here.
112 if not self.latencies[port_handle]:
# defaultdict(float) makes 'dropped_pkts', 'total_max' and 'average' start at 0.0.
115 result = defaultdict(float)
# Seed the minimum with +inf so the first real sample always replaces it.
116 result['total_min'] = float("inf")
# One entry of in_stats['latency'] per latency pg_id recorded for this port.
117 for lat_id in self.latencies[port_handle]:
118 lat = in_stats['latency'][lat_id]
119 result['dropped_pkts'] += lat['err_cntrs']['dropped']
120 result['total_max'] = max(lat['latency']['total_max'], result['total_max'])
121 result['total_min'] = min(lat['latency']['total_min'], result['total_min'])
# Accumulate the per-stream averages; the sum is divided by the count below.
122 result['average'] += lat['latency']['average']
# Plain mean of the per-stream averages (not weighted by packet count).
# NOTE(review): the return statement is not visible in this extract.
124 result['average'] /= len(self.latencies[port_handle])
# Build the STLPktBuilder for one stream: Ethernet (+ optional 802.1Q) / IPv4 /
# UDP / filler payload, together with field-engine instructions that rewrite
# the source and destination IP of each packet.
# NOTE(review): several lines (e.g. the creation of `udp_args` and `vm_param`,
# and the `else:` lines) are not visible in this extract.
128 def create_pkt(self, stream_cfg, l2frame_size):
130 pkt_base = Ether(src=stream_cfg['mac_src'], dst=stream_cfg['mac_dst'])
131 if stream_cfg['vlan_tag'] is not None:
132 # 50 = 14 (Ethernet II) + 4 (Vlan tag) + 4 (CRC Checksum) + 20 (IPv4) + 8 (UDP)
133 pkt_base /= Dot1Q(vlan=stream_cfg['vlan_tag'])
134 l2payload_size = int(l2frame_size) - 50
136 # 46 = 14 (Ethernet II) + 4 (CRC Checksum) + 20 (IPv4) + 8 (UDP)
137 l2payload_size = int(l2frame_size) - 46
# Filler bytes: whatever remains of the requested L2 frame size after the
# header/CRC overhead subtracted above.
138 payload = 'x' * l2payload_size
# UDP ports are only set when the stream config supplies a truthy value.
140 if stream_cfg['udp_src_port']:
141 udp_args['sport'] = int(stream_cfg['udp_src_port'])
142 if stream_cfg['udp_dst_port']:
143 udp_args['dport'] = int(stream_cfg['udp_dst_port'])
144 pkt_base /= IP() / UDP(**udp_args)
# 'random' step: addresses come from repeatable-random flow variables bounded
# by the configured address range, each seeded with a fresh random integer.
146 if stream_cfg['ip_addrs_step'] == 'random':
147 src_fv = STLVmFlowVarRepetableRandom(
149 min_value=stream_cfg['ip_src_addr'],
150 max_value=stream_cfg['ip_src_addr_max'],
152 seed=random.randint(0, 32767),
153 limit=stream_cfg['ip_src_count'])
154 dst_fv = STLVmFlowVarRepetableRandom(
156 min_value=stream_cfg['ip_dst_addr'],
157 max_value=stream_cfg['ip_dst_addr_max'],
159 seed=random.randint(0, 32767),
160 limit=stream_cfg['ip_dst_count'])
# Presumably the non-random branch (its `else:` line is not visible): flow
# variables that advance by the configured step.
162 src_fv = STLVmFlowVar(
164 min_value=stream_cfg['ip_src_addr'],
# NOTE(review): max_value equals min_value for the source address here, unlike
# the destination below and unlike the random branch (which uses
# 'ip_src_addr_max') - confirm a fixed source address is intended.
165 max_value=stream_cfg['ip_src_addr'],
168 step=stream_cfg['ip_addrs_step'])
169 dst_fv = STLVmFlowVar(
171 min_value=stream_cfg['ip_dst_addr'],
172 max_value=stream_cfg['ip_dst_addr_max'],
175 step=stream_cfg['ip_addrs_step'])
# Write the flow variables into the IP header, then fix the checksums for the
# rewritten IP/UDP headers.
179 STLVmWrFlowVar(fv_name="ip_src", pkt_offset="IP.src"),
181 STLVmWrFlowVar(fv_name="ip_dst", pkt_offset="IP.dst"),
182 STLVmFixChecksumHw(l3_offset="IP",
184 l4_type=CTRexVmInsFixHwCs.L4_TYPE_UDP)
187 return STLPktBuilder(pkt=pkt_base / payload, vm=STLScVmRaw(vm_param))
# Build the STLStream objects for one port: data streams tagged with the
# port's shared pg_id, plus a latency stream with its own pg_id sent at
# self.LATENCY_PPS.
# NOTE(review): the creation of `streams`, the checks on `latency`, the use of
# `isg` and the return statement are not visible in this extract.
189 def generate_streams(self, port_handle, stream_cfg, l2frame, isg=0.0, latency=True):
# IMIX: one continuous stream per (ratio, frame size) pair.
192 if l2frame == 'IMIX':
# Smallest IMIX frame is 64 bytes untagged and 68 bytes with a VLAN tag.
193 min_size = 64 if stream_cfg['vlan_tag'] is None else 68
194 self.adjust_imix_min_size(min_size)
195 for t, (ratio, l2_frame_size) in enumerate(zip(self.imix_ratios, self.imix_l2_sizes)):
196 pkt = self.create_pkt(stream_cfg, l2_frame_size)
197 streams.append(STLStream(packet=pkt,
# Every data stream of this port reports under the same pg_id.
199 flow_stats=STLFlowStats(
200 pg_id=self.stream_ids[port_handle]),
# The IMIX ratio is passed directly as the stream's pps value.
201 mode=STLTXCont(pps=ratio)))
# Fresh pg_id for the latency stream.
# NOTE(review): `.next()` is the Python 2 iterator method and does not exist
# on Python 3 iterators.
204 idx_lat = self.id.next()
# The IMIX latency stream uses the average IMIX frame size.
205 pkt = self.create_pkt(stream_cfg, self.imix_avg_l2_size)
206 sl = STLStream(packet=pkt,
208 flow_stats=STLFlowLatencyStats(pg_id=idx_lat),
209 mode=STLTXCont(pps=self.LATENCY_PPS))
# Presumably the fixed-frame-size branch (its `else:` line is not visible):
# one data stream and one latency stream built from the same packet.
212 pkt = self.create_pkt(stream_cfg, l2frame)
213 streams.append(STLStream(packet=pkt,
214 flow_stats=STLFlowStats(pg_id=self.stream_ids[port_handle]),
218 idx_lat = self.id.next()
219 streams.append(STLStream(packet=pkt,
220 flow_stats=STLFlowLatencyStats(pg_id=idx_lat),
221 mode=STLTXCont(pps=self.LATENCY_PPS)))
# Remember the latency pg_id so __combine_latencies() can look its stats up.
224 self.latencies[port_handle].append(idx_lat)
232 def __connect(self, client):
# Retry self.client.connect() up to config.generic_retry_count times.
# NOTE(review): the `try:` line, the success path and the action taken on the
# final failed attempt are not visible in this extract.
235 def __connect_after_start(self):
236 # after start, Trex may take a bit of time to initialize
237 # so we need to retry a few times
238 for it in xrange(self.config.generic_retry_count):
241 self.client.connect()
243 except Exception as ex:
# True only on the last iteration of the retry loop.
244 if it == (self.config.generic_retry_count - 1):
# NOTE(review): `ex.message` exists only on Python 2 exceptions.
246 LOG.info("Retrying connection to TRex (%s)...", ex.message)
# Connection sequence: create the STLClient, connect (starting a local server
# if needed), then take ownership of the configured ports.
# NOTE(review): the `def` line of this body, the `try:` lines and several
# statements are not visible in this extract.
249 LOG.info("Connecting to TRex...")
250 server_ip = self.config.generator_config.ip
252 # Connect to TRex server
253 self.client = STLClient(server=server_ip)
255 self.__connect(self.client)
256 except (TimeoutError, STLError) as e:
# Only when the configured server is the loopback address does this code
# start a server itself and then retry the connection.
257 if server_ip == '127.0.0.1':
259 self.__start_server()
260 self.__connect_after_start()
261 except (TimeoutError, STLError) as e:
262 LOG.error('Cannot connect to TRex')
263 LOG.error(traceback.format_exc())
# The server log is consulted to build the exception message; the lines that
# read it into `message` are not visible here.
264 logpath = '/tmp/trex.log'
265 if os.path.isfile(logpath):
266 # Wait for TRex to finish writing error message
268 for _ in xrange(self.config.generic_retry_count):
269 size = os.path.getsize(logpath)
# An unchanged file size between two polls is taken as "writer is done".
270 if size == last_size:
271 # probably not writing anymore
275 with open(logpath, 'r') as f:
279 raise TrafficGeneratorException(message)
# NOTE(review): `e.message` exists only on Python 2 exceptions.
281 raise TrafficGeneratorException(e.message)
# Use every configured generator port and reset them on the client.
283 ports = list(self.config.generator_config.ports)
284 self.port_handle = ports
286 self.client.reset(ports)
# External chain with ARP enabled; the statements selected by this test are
# not visible in this extract.
289 if self.config.service_chain == ChainType.EXT and not self.config.no_arp:
# Switch every generator port to L3 mode, wrapping the change in service mode
# (enabled before the loop, disabled after it).
# NOTE(review): some lines around the set_l3_mode() call are not visible.
294 def __set_l3_mode(self):
295 self.client.set_service_mode(ports=self.port_handle, enabled=True)
# Ports and devices are paired positionally.
296 for port, device in zip(self.port_handle, self.config.generator_config.devices):
# Source is the device's traffic-generator gateway IP, destination is the
# gateway IP of device.dst; the VLAN is passed only when tagging is enabled.
298 self.client.set_l3_mode(port=port,
299 src_ipv4=device.tg_gateway_ip,
300 dst_ipv4=device.dst.gateway_ip,
301 vlan=device.vlan_tag if device.vlan_tagging else None)
303 # TRex tries to resolve ARP already, doesn't have to be successful yet
305 self.client.set_service_mode(ports=self.port_handle, enabled=False)
def __set_l2_mode(self):
    """Set L2 mode on each generator port from its stream configs.

    Service mode is enabled on all ports before the change and disabled
    again afterwards. For every (port, device) pair, one set_l2_mode() call
    is issued per stream config, using that config's 'mac_dst'.
    """
    trex = self.client
    trex.set_service_mode(ports=self.port_handle, enabled=True)
    port_device_pairs = zip(self.port_handle, self.config.generator_config.devices)
    for tg_port, tg_device in port_device_pairs:
        chain_type = self.config.generator_config.service_chain
        stream_cfgs = tg_device.get_stream_configs(chain_type)
        # Lazy generator: each MAC is read just before its own client call.
        for dst_mac in (stream_cfg['mac_dst'] for stream_cfg in stream_cfgs):
            trex.set_l2_mode(port=tg_port, dst_mac=dst_mac)
    trex.set_service_mode(ports=self.port_handle, enabled=False)
def __start_server(self):
    """Create a TRexTrafficServer and run it with the generator config and VLAN tagging flag."""
    trex_server = TRexTrafficServer()
    run_config = self.config
    trex_server.run_server(run_config.generator_config, run_config.vlan_tagging)
# Resolve destination MACs via ARP for every port and store them in
# self.arps[port]. Returns True only when `resolved` equals the number of
# port handles.
# NOTE(review): several lines (initialisation of `resolved`/`attempt`, the
# construction of `arps`, the call that runs the services) are not visible.
318 def resolve_arp(self):
# Service mode is requested without an explicit `enabled` argument here,
# unlike the other call sites in this class which pass enabled=True.
319 self.client.set_service_mode(ports=self.port_handle)
320 LOG.info('Polling ARP until successful')
323 for port, device in zip(self.port_handle, self.config.generator_config.devices):
324 ctx = self.client.create_service_ctx(port=port)
# One request per stream config: from the config's 'ip_src_tg_gw' address to
# its 'mac_discovery_gw' address, tagged with the VLAN only when enabled.
# Presumably these are STLServiceARP arguments (constructor line not visible).
328 src_ip=cfg['ip_src_tg_gw'],
329 dst_ip=cfg['mac_discovery_gw'],
330 vlan=device.vlan_tag if device.vlan_tagging else None)
331 for cfg in device.get_stream_configs(self.config.generator_config.service_chain)
# Bounded retry loop, at most config.generic_retry_count iterations.
334 for _ in xrange(self.config.generic_retry_count):
339 LOG.error(traceback.format_exc())
# Keep only the MACs that were actually resolved.
342 self.arps[port] = [arp.get_record().dst_mac for arp in arps
343 if arp.get_record().dst_mac is not None]
# A port counts as done once it has one MAC per service chain.
345 if len(self.arps[port]) == self.config.service_chain_count:
347 LOG.info('ARP resolved successfully for port %s', port)
# Destination IPs still without a MAC, reported in the retry message.
350 failed = [arp.get_record().dst_ip for arp in arps
351 if arp.get_record().dst_mac is None]
# NOTE(review): `attempt` is logged here but the visible retry loop binds `_`;
# confirm it is maintained in the omitted lines.
352 LOG.info('Retrying ARP for: %s (%d / %d)',
353 failed, attempt, self.config.generic_retry_count)
354 time.sleep(self.config.generic_poll_sec)
356 self.client.set_service_mode(ports=self.port_handle, enabled=False)
357 return resolved == len(self.port_handle)
359 def config_interface(self):
# Returns a dict whose 'result' key says whether the requested rate suffices;
# when it is computed, the dict also carries the required rate as converted by
# utils.convert_rates().
# NOTE(review): the branch conditions and the assignment of `mult` are not
# visible in this extract.
362 def __is_rate_enough(self, l2frame_size, rates, bidirectional, latency):
363 """Check if rate provided by user is above requirements. Applies only if latency is True."""
364 intf_speed = self.config.generator_config.intf_speed
# Sums the packet rate (pps) of every entry in `rates`.
370 r = utils.convert_rates(l2frame_size, rate, intf_speed)
371 total_rate += int(r['rate_pps'])
# NOTE(review): here total_rate is bound to the whole convert_rates() result,
# not to int(result['rate_pps']) as in the summing lines above, and is then
# compared with the integer required_rate below. Under Python 2 a dict-vs-int
# ordering comparison does not raise, so the check may pass unconditionally -
# confirm what convert_rates() returns and whether ['rate_pps'] is missing.
374 total_rate = utils.convert_rates(l2frame_size, rates[0], intf_speed)
375 # rate must be enough for latency stream and at least 1 pps for base stream per chain
376 required_rate = (self.LATENCY_PPS + 1) * self.config.service_chain_count * mult
# Convert the required pps figure so it can be reported to the user.
377 result = utils.convert_rates(l2frame_size,
378 {'rate_pps': required_rate},
380 result['result'] = total_rate >= required_rate
# Returned on the path where no rate check is performed (condition not visible).
383 return {'result': True}
# Validate the requested rates, build the streams for each direction and push
# them to the TRex client.
# NOTE(review): the rate-check condition and the remaining arguments of the
# generate_streams() calls are not visible in this extract.
385 def create_traffic(self, l2frame_size, rates, bidirectional, latency=True):
386 r = self.__is_rate_enough(l2frame_size, rates, bidirectional, latency)
# Raised with the minimum rate expressed in the units returned by
# __is_rate_enough().
388 raise TrafficGeneratorException(
389 'Required rate in total is at least one of: \n{pps}pps \n{bps}bps \n{load}%.'
390 .format(pps=r['rate_pps'],
392 load=r['rate_percent']))
# One list of stream configs per device.
394 stream_cfgs = [d.get_stream_configs(self.config.generator_config.service_chain)
395 for d in self.config.generator_config.devices]
# Rates are stored as strings; start_traffic() later passes them as `mult`.
396 self.rates = [utils.to_rate_str(rate) for rate in rates]
398 for ph in self.port_handle:
399 # generate one pg_id for each direction
400 self.stream_ids[ph] = self.id.next()
# zip(*stream_cfgs) transposes the per-device lists, so each iteration sees
# the forward and reverse stream config at the same position.
402 for i, (fwd_stream_cfg, rev_stream_cfg) in enumerate(zip(*stream_cfgs)):
# External chain with ARP enabled: override the destination MACs with the
# ones stored by resolve_arp() for each port.
403 if self.config.service_chain == ChainType.EXT and not self.config.no_arp:
404 fwd_stream_cfg['mac_dst'] = self.arps[self.port_handle[0]][i]
405 rev_stream_cfg['mac_dst'] = self.arps[self.port_handle[1]][i]
# NOTE(review): streamblock is keyed by the literals 0 and 1 here but read
# back by port handle below - confirm the handles are always 0 and 1.
407 self.streamblock[0].extend(self.generate_streams(self.port_handle[0],
# Reverse-direction streams are only built when more than one rate was given.
411 if len(self.rates) > 1:
412 self.streamblock[1].extend(self.generate_streams(self.port_handle[1],
# Reverse-direction latency stream only when both flags are set.
416 latency=bidirectional and latency))
418 for ph in self.port_handle:
419 self.client.add_streams(self.streamblock[ph], ports=ph)
420 LOG.info('Created traffic stream for port %s.', ph)
# Drop all locally tracked stream state and reset the ports on the client.
# NOTE(review): one line between the resets below is not visible in this
# extract.
422 def clear_streamblock(self):
# Fresh containers, matching the ones created in __init__.
423 self.streamblock = defaultdict(list)
424 self.latencies = defaultdict(list)
425 self.stream_ids = defaultdict(list)
427 self.client.reset(self.port_handle)
428 LOG.info('Cleared all existing streams.')
431 stats = self.client.get_stats()
432 return self.extract_stats(stats)
435 return [self.client.get_port_attr(port=port)['src_mac'] for port in self.port_handle]
# Clear the statistics held by the TRex client.
# NOTE(review): a line between the def and the call (possibly a guard) is not
# visible in this extract.
437 def clear_stats(self):
439 self.client.clear_stats()
def start_traffic(self):
    """Start transmission on each port at its own rate.

    Ports and rates are paired positionally, so a port without a matching
    entry in self.rates is not started. Each start is forced and limited to
    the configured duration.
    """
    port_rate_pairs = zip(self.port_handle, self.rates)
    for tx_port, tx_rate in port_rate_pairs:
        self.client.start(ports=tx_port,
                          mult=tx_rate,
                          duration=self.config.duration_sec,
                          force=True)
def stop_traffic(self):
    """Stop transmission on all ports in self.port_handle with a single client call."""
    halt = self.client.stop
    halt(ports=self.port_handle)
# Request service mode on all ports, then start a capture that receives on
# all of them.
# NOTE(review): two lines after the def are not visible in this extract.
448 def start_capture(self):
451 self.client.set_service_mode(ports=self.port_handle)
# Keep the returned handle; the other capture methods read its 'id' entry.
452 self.capture_id = self.client.start_capture(rx_ports=self.port_handle)
# Fetch the packets of the active capture into self.packet_list.
# NOTE(review): a line after the def (possibly a guard on self.capture_id) is
# not visible in this extract.
454 def fetch_capture_packets(self):
# Start from an empty list; it is handed to the client as the output target.
456 self.packet_list = []
457 self.client.fetch_capture_packets(capture_id=self.capture_id['id'],
458 output=self.packet_list)
# Stop the active capture, forget its handle and disable service mode on all
# ports.
# NOTE(review): a line after the def (possibly a guard on self.capture_id) is
# not visible in this extract.
460 def stop_capture(self):
462 self.client.stop_capture(capture_id=self.capture_id['id'])
# None marks "no capture active", matching the initial value set in __init__.
463 self.capture_id = None
464 self.client.set_service_mode(ports=self.port_handle, enabled=False)
469 self.client.reset(self.port_handle)
470 self.client.disconnect()
472 # TRex does not like a reset while in disconnected state