1 # Copyright 2016 Cisco Systems, Inc. All rights reserved.
3 # Licensed under the Apache License, Version 2.0 (the "License"); you may
4 # not use this file except in compliance with the License. You may obtain
5 # a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 # License for the specific language governing permissions and limitations
20 from collections import defaultdict
21 from itertools import count
22 from nfvbench.log import LOG
23 from nfvbench.specs import ChainType
24 from nfvbench.traffic_server import TRexTrafficServer
25 from nfvbench.utils import cast_integer
26 from nfvbench.utils import timeout
27 from nfvbench.utils import TimeoutError
28 from traffic_base import AbstractTrafficGenerator
29 from traffic_base import TrafficGeneratorException
30 import traffic_utils as utils
32 # pylint: disable=import-error
33 from trex_stl_lib.api import CTRexVmInsFixHwCs
34 from trex_stl_lib.api import Dot1Q
35 from trex_stl_lib.api import Ether
36 from trex_stl_lib.api import IP
37 from trex_stl_lib.api import STLClient
38 from trex_stl_lib.api import STLError
39 from trex_stl_lib.api import STLFlowLatencyStats
40 from trex_stl_lib.api import STLFlowStats
41 from trex_stl_lib.api import STLPktBuilder
42 from trex_stl_lib.api import STLScVmRaw
43 from trex_stl_lib.api import STLStream
44 from trex_stl_lib.api import STLTXCont
45 from trex_stl_lib.api import STLVmFixChecksumHw
46 from trex_stl_lib.api import STLVmFlowVar
47 from trex_stl_lib.api import STLVmFlowVarRepetableRandom
48 from trex_stl_lib.api import STLVmWrFlowVar
49 from trex_stl_lib.api import UDP
50 from trex_stl_lib.services.trex_stl_service_arp import STLServiceARP
53 # pylint: enable=import-error
56 class TRex(AbstractTrafficGenerator):
59 def __init__(self, runner):
60 AbstractTrafficGenerator.__init__(self, runner)
63 self.latencies = defaultdict(list)
64 self.stream_ids = defaultdict(list)
66 self.streamblock = defaultdict(list)
69 self.capture_id = None
def get_version(self):
    """Return the server version as reported by the TRex client.

    The value is passed through unchanged from
    ``self.client.get_server_version()``.
    """
    server_version = self.client.get_server_version()
    return server_version
75 def extract_stats(self, in_stats):
76 utils.nan_replace(in_stats)
80 for ph in self.port_handle:
84 'total_pkts': cast_integer(stats['opackets']),
85 'total_pkt_bytes': cast_integer(stats['obytes']),
86 'pkt_rate': cast_integer(stats['tx_pps']),
87 'pkt_bit_rate': cast_integer(stats['tx_bps'])
90 'total_pkts': cast_integer(stats['ipackets']),
91 'total_pkt_bytes': cast_integer(stats['ibytes']),
92 'pkt_rate': cast_integer(stats['rx_pps']),
93 'pkt_bit_rate': cast_integer(stats['rx_bps']),
94 'dropped_pkts': cast_integer(
95 stats['opackets'] - stats['ipackets'])
99 lat = self.__combine_latencies(in_stats, ph)
100 result[ph]['rx']['max_delay_usec'] = cast_integer(
101 lat['total_max']) if 'total_max' in lat else float('nan')
102 result[ph]['rx']['min_delay_usec'] = cast_integer(
103 lat['total_min']) if 'total_min' in lat else float('nan')
104 result[ph]['rx']['avg_delay_usec'] = cast_integer(
105 lat['average']) if 'average' in lat else float('nan')
106 total_tx_pkts = result[0]['tx']['total_pkts'] + result[1]['tx']['total_pkts']
107 result["total_tx_rate"] = cast_integer(total_tx_pkts / self.config.duration_sec)
110 def __combine_latencies(self, in_stats, port_handle):
111 """Traverses TRex result dictionary and combines chosen latency stats."""
112 if not self.latencies[port_handle]:
115 result = defaultdict(float)
116 result['total_min'] = float("inf")
117 for lat_id in self.latencies[port_handle]:
118 lat = in_stats['latency'][lat_id]
119 result['dropped_pkts'] += lat['err_cntrs']['dropped']
120 result['total_max'] = max(lat['latency']['total_max'], result['total_max'])
121 result['total_min'] = min(lat['latency']['total_min'], result['total_min'])
122 result['average'] += lat['latency']['average']
124 result['average'] /= len(self.latencies[port_handle])
128 def create_pkt(self, stream_cfg, l2frame_size):
130 pkt_base = Ether(src=stream_cfg['mac_src'], dst=stream_cfg['mac_dst'])
131 if stream_cfg['vlan_tag'] is not None:
132 # 50 = 14 (Ethernet II) + 4 (Vlan tag) + 4 (CRC Checksum) + 20 (IPv4) + 8 (UDP)
133 pkt_base /= Dot1Q(vlan=stream_cfg['vlan_tag'])
134 l2payload_size = int(l2frame_size) - 50
136 # 46 = 14 (Ethernet II) + 4 (CRC Checksum) + 20 (IPv4) + 8 (UDP)
137 l2payload_size = int(l2frame_size) - 46
138 payload = 'x' * l2payload_size
140 if stream_cfg['udp_src_port']:
141 udp_args['sport'] = int(stream_cfg['udp_src_port'])
142 if stream_cfg['udp_dst_port']:
143 udp_args['dport'] = int(stream_cfg['udp_dst_port'])
144 pkt_base /= IP() / UDP(**udp_args)
146 if stream_cfg['ip_addrs_step'] == 'random':
147 src_fv = STLVmFlowVarRepetableRandom(
149 min_value=stream_cfg['ip_src_addr'],
150 max_value=stream_cfg['ip_src_addr_max'],
152 seed=random.randint(0, 32767),
153 limit=stream_cfg['ip_src_count'])
154 dst_fv = STLVmFlowVarRepetableRandom(
156 min_value=stream_cfg['ip_dst_addr'],
157 max_value=stream_cfg['ip_dst_addr_max'],
159 seed=random.randint(0, 32767),
160 limit=stream_cfg['ip_dst_count'])
162 src_fv = STLVmFlowVar(
164 min_value=stream_cfg['ip_src_addr'],
165 max_value=stream_cfg['ip_src_addr'],
168 step=stream_cfg['ip_addrs_step'])
169 dst_fv = STLVmFlowVar(
171 min_value=stream_cfg['ip_dst_addr'],
172 max_value=stream_cfg['ip_dst_addr_max'],
175 step=stream_cfg['ip_addrs_step'])
179 STLVmWrFlowVar(fv_name="ip_src", pkt_offset="IP.src"),
181 STLVmWrFlowVar(fv_name="ip_dst", pkt_offset="IP.dst"),
182 STLVmFixChecksumHw(l3_offset="IP",
184 l4_type=CTRexVmInsFixHwCs.L4_TYPE_UDP)
187 return STLPktBuilder(pkt=pkt_base / payload, vm=STLScVmRaw(vm_param))
189 def generate_streams(self, port_handle, stream_cfg, l2frame, isg=0.0, latency=True):
192 if l2frame == 'IMIX':
193 min_size = 64 if stream_cfg['vlan_tag'] is None else 68
194 self.adjust_imix_min_size(min_size)
195 for t, (ratio, l2_frame_size) in enumerate(zip(self.imix_ratios, self.imix_l2_sizes)):
196 pkt = self.create_pkt(stream_cfg, l2_frame_size)
197 streams.append(STLStream(packet=pkt,
199 flow_stats=STLFlowStats(
200 pg_id=self.stream_ids[port_handle]),
201 mode=STLTXCont(pps=ratio)))
204 idx_lat = self.id.next()
205 sl = STLStream(packet=pkt,
207 flow_stats=STLFlowLatencyStats(pg_id=idx_lat),
208 mode=STLTXCont(pps=self.LATENCY_PPS))
211 pkt = self.create_pkt(stream_cfg, l2frame)
212 streams.append(STLStream(packet=pkt,
213 flow_stats=STLFlowStats(pg_id=self.stream_ids[port_handle]),
217 idx_lat = self.id.next()
218 streams.append(STLStream(packet=pkt,
219 flow_stats=STLFlowLatencyStats(pg_id=idx_lat),
220 mode=STLTXCont(pps=self.LATENCY_PPS)))
223 self.latencies[port_handle].append(idx_lat)
231 def __connect(self, client):
234 def __connect_after_start(self):
235 # after start, Trex may take a bit of time to initialize
236 # so we need to retry a few times
237 for it in xrange(self.config.generic_retry_count):
240 self.client.connect()
242 except Exception as ex:
243 if it == (self.config.generic_retry_count - 1):
245 LOG.info("Retrying connection to TRex (%s)...", ex.message)
248 LOG.info("Connecting to TRex...")
249 server_ip = self.config.generator_config.ip
251 # Connect to TRex server
252 self.client = STLClient(server=server_ip)
254 self.__connect(self.client)
255 except (TimeoutError, STLError) as e:
256 if server_ip == '127.0.0.1':
258 self.__start_server()
259 self.__connect_after_start()
260 except (TimeoutError, STLError) as e:
261 LOG.error('Cannot connect to TRex')
262 LOG.error(traceback.format_exc())
263 logpath = '/tmp/trex.log'
264 if os.path.isfile(logpath):
265 # Wait for TRex to finish writing error message
267 for _ in xrange(self.config.generic_retry_count):
268 size = os.path.getsize(logpath)
269 if size == last_size:
270 # probably not writing anymore
274 with open(logpath, 'r') as f:
278 raise TrafficGeneratorException(message)
280 raise TrafficGeneratorException(e.message)
282 ports = list(self.config.generator_config.ports)
283 self.port_handle = ports
285 self.client.reset(ports)
288 if self.config.service_chain == ChainType.EXT and not self.config.no_arp:
293 def __set_l3_mode(self):
294 self.client.set_service_mode(ports=self.port_handle, enabled=True)
295 for port, device in zip(self.port_handle, self.config.generator_config.devices):
297 self.client.set_l3_mode(port=port,
298 src_ipv4=device.tg_gateway_ip,
299 dst_ipv4=device.dst.gateway_ip,
300 vlan=device.vlan_tag if device.vlan_tagging else None)
302 # TRex tries to resolve ARP already, doesn't have to be successful yet
304 self.client.set_service_mode(ports=self.port_handle, enabled=False)
def __set_l2_mode(self):
    """Put the generator ports in L2 mode using each stream's destination MAC.

    Service mode is enabled on all port handles first, then for every
    (port, device) pair ``set_l2_mode`` is called once per stream config
    with that config's ``mac_dst``. Service mode is disabled again at the end.
    """
    self.client.set_service_mode(ports=self.port_handle, enabled=True)
    generator_config = self.config.generator_config
    # zip() pairs each port handle with the device at the same position
    for handle, dev in zip(self.port_handle, generator_config.devices):
        stream_configs = dev.get_stream_configs(generator_config.service_chain)
        for stream_config in stream_configs:
            target_mac = stream_config['mac_dst']
            self.client.set_l2_mode(port=handle, dst_mac=target_mac)
    self.client.set_service_mode(ports=self.port_handle, enabled=False)
def __start_server(self):
    """Create a TRexTrafficServer and run it.

    The server is given the generator configuration and the
    ``vlan_tagging`` setting taken from ``self.config``.
    """
    TRexTrafficServer().run_server(self.config.generator_config,
                                   self.config.vlan_tagging)
317 def resolve_arp(self):
318 self.client.set_service_mode(ports=self.port_handle)
319 LOG.info('Polling ARP until successful')
322 for port, device in zip(self.port_handle, self.config.generator_config.devices):
323 ctx = self.client.create_service_ctx(port=port)
327 src_ip=cfg['ip_src_tg_gw'],
328 dst_ip=cfg['mac_discovery_gw'],
329 vlan=device.vlan_tag if device.vlan_tagging else None)
330 for cfg in device.get_stream_configs(self.config.generator_config.service_chain)
333 for _ in xrange(self.config.generic_retry_count):
338 LOG.error(traceback.format_exc())
341 self.arps[port] = [arp.get_record().dst_mac for arp in arps
342 if arp.get_record().dst_mac is not None]
344 if len(self.arps[port]) == self.config.service_chain_count:
346 LOG.info('ARP resolved successfully for port %s', port)
349 failed = [arp.get_record().dst_ip for arp in arps
350 if arp.get_record().dst_mac is None]
351 LOG.info('Retrying ARP for: %s (%d / %d)',
352 failed, attempt, self.config.generic_retry_count)
353 time.sleep(self.config.generic_poll_sec)
355 self.client.set_service_mode(ports=self.port_handle, enabled=False)
356 return resolved == len(self.port_handle)
358 def config_interface(self):
361 def __is_rate_enough(self, l2frame_size, rates, bidirectional, latency):
362 """Check if rate provided by user is above requirements. Applies only if latency is True."""
363 intf_speed = self.config.generator_config.intf_speed
369 r = utils.convert_rates(l2frame_size, rate, intf_speed)
370 total_rate += int(r['rate_pps'])
373 total_rate = utils.convert_rates(l2frame_size, rates[0], intf_speed)
374 # rate must be enough for latency stream and at least 1 pps for base stream per chain
375 required_rate = (self.LATENCY_PPS + 1) * self.config.service_chain_count * mult
376 result = utils.convert_rates(l2frame_size,
377 {'rate_pps': required_rate},
379 result['result'] = total_rate >= required_rate
382 return {'result': True}
384 def create_traffic(self, l2frame_size, rates, bidirectional, latency=True):
385 r = self.__is_rate_enough(l2frame_size, rates, bidirectional, latency)
387 raise TrafficGeneratorException(
388 'Required rate in total is at least one of: \n{pps}pps \n{bps}bps \n{load}%.'
389 .format(pps=r['rate_pps'],
391 load=r['rate_percent']))
393 stream_cfgs = [d.get_stream_configs(self.config.generator_config.service_chain)
394 for d in self.config.generator_config.devices]
395 self.rates = [utils.to_rate_str(rate) for rate in rates]
397 for ph in self.port_handle:
398 # generate one pg_id for each direction
399 self.stream_ids[ph] = self.id.next()
401 for i, (fwd_stream_cfg, rev_stream_cfg) in enumerate(zip(*stream_cfgs)):
402 if self.config.service_chain == ChainType.EXT and not self.config.no_arp:
403 fwd_stream_cfg['mac_dst'] = self.arps[self.port_handle[0]][i]
404 rev_stream_cfg['mac_dst'] = self.arps[self.port_handle[1]][i]
406 self.streamblock[0].extend(self.generate_streams(self.port_handle[0],
410 if len(self.rates) > 1:
411 self.streamblock[1].extend(self.generate_streams(self.port_handle[1],
415 latency=bidirectional and latency))
417 for ph in self.port_handle:
418 self.client.add_streams(self.streamblock[ph], ports=ph)
419 LOG.info('Created traffic stream for port %s.', ph)
421 def clear_streamblock(self):
422 self.streamblock = defaultdict(list)
423 self.latencies = defaultdict(list)
424 self.stream_ids = defaultdict(list)
426 self.client.reset(self.port_handle)
427 LOG.info('Cleared all existing streams.')
430 stats = self.client.get_stats()
431 return self.extract_stats(stats)
434 return [self.client.get_port_attr(port=port)['src_mac'] for port in self.port_handle]
436 def clear_stats(self):
438 self.client.clear_stats()
def start_traffic(self):
    """Start transmission on each port with its matching rate.

    Port handles and rates are paired positionally; because zip() stops at
    the shorter sequence, a port without a corresponding rate is not started.
    Every start request uses ``self.config.duration_sec`` as duration and
    is issued with ``force=True``.
    """
    port_rate_pairs = zip(self.port_handle, self.rates)
    for handle, multiplier in port_rate_pairs:
        begin = self.client.start
        begin(ports=handle,
              mult=multiplier,
              duration=self.config.duration_sec,
              force=True)
def stop_traffic(self):
    """Ask the TRex client to stop traffic on all known port handles."""
    active_ports = self.port_handle
    self.client.stop(ports=active_ports)
447 def start_capture(self):
450 self.client.set_service_mode(ports=self.port_handle)
451 self.capture_id = self.client.start_capture(rx_ports=self.port_handle)
453 def fetch_capture_packets(self):
455 self.packet_list = []
456 self.client.fetch_capture_packets(capture_id=self.capture_id['id'],
457 output=self.packet_list)
459 def stop_capture(self):
461 self.client.stop_capture(capture_id=self.capture_id['id'])
462 self.capture_id = None
463 self.client.set_service_mode(ports=self.port_handle, enabled=False)
468 self.client.reset(self.port_handle)
469 self.client.disconnect()
471 # TRex does not like a reset while in disconnected state