1 # Copyright 2016 Cisco Systems, Inc. All rights reserved.
3 # Licensed under the Apache License, Version 2.0 (the "License"); you may
4 # not use this file except in compliance with the License. You may obtain
5 # a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 # License for the specific language governing permissions and limitations
15 from collections import defaultdict
16 from itertools import count
17 from nfvbench.log import LOG
18 from nfvbench.specs import ChainType
19 from nfvbench.traffic_server import TRexTrafficServer
20 from nfvbench.utils import timeout
21 from nfvbench.utils import TimeoutError
26 from traffic_base import AbstractTrafficGenerator
27 from traffic_base import TrafficGeneratorException
28 import traffic_utils as utils
30 from trex_stl_lib.api import CTRexVmInsFixHwCs
31 from trex_stl_lib.api import Dot1Q
32 from trex_stl_lib.api import Ether
33 from trex_stl_lib.api import IP
34 from trex_stl_lib.api import STLClient
35 from trex_stl_lib.api import STLError
36 from trex_stl_lib.api import STLFlowLatencyStats
37 from trex_stl_lib.api import STLFlowStats
38 from trex_stl_lib.api import STLPktBuilder
39 from trex_stl_lib.api import STLScVmRaw
40 from trex_stl_lib.api import STLStream
41 from trex_stl_lib.api import STLTXCont
42 from trex_stl_lib.api import STLVmFixChecksumHw
43 from trex_stl_lib.api import STLVmFlowVar
44 from trex_stl_lib.api import STLVmFlowVarRepetableRandom
45 from trex_stl_lib.api import STLVmWrFlowVar
46 from trex_stl_lib.api import UDP
47 from trex_stl_lib.services.trex_stl_service_arp import STLServiceARP
50 class TRex(AbstractTrafficGenerator):
53 def __init__(self, runner):
54 AbstractTrafficGenerator.__init__(self, runner)
57 self.latencies = defaultdict(list)
58 self.stream_ids = defaultdict(list)
60 self.streamblock = defaultdict(list)
def get_version(self):
    """Return whatever the connected client reports as the server version."""
    server_version = self.client.get_server_version()
    return server_version
67 def extract_stats(self, in_stats):
68 utils.nan_replace(in_stats)
72 for ph in self.port_handle:
73 stats = self.__combine_stats(in_stats, ph)
76 'total_pkts': int(stats['tx_pkts']['total']),
77 'total_pkt_bytes': int(stats['tx_bytes']['total']),
78 'pkt_rate': int(stats['tx_pps']['total']),
79 'pkt_bit_rate': int(stats['tx_bps']['total'])
82 'total_pkts': int(stats['rx_pkts']['total']),
83 'total_pkt_bytes': int(stats['rx_bytes']['total']),
84 'pkt_rate': int(stats['rx_pps']['total']),
85 'pkt_bit_rate': int(stats['rx_bps']['total']),
86 'dropped_pkts': int(stats['tx_pkts']['total'] - stats['rx_pkts']['total'])
90 lat = self.__combine_latencies(in_stats, ph)
91 result[ph]['rx']['max_delay_usec'] = int(
92 lat['total_max']) if 'total_max' in lat else float('nan')
93 result[ph]['rx']['min_delay_usec'] = int(
94 lat['total_min']) if 'total_min' in lat else float('nan')
95 result[ph]['rx']['avg_delay_usec'] = int(
96 lat['average']) if 'average' in lat else float(
98 total_tx_pkts = result[0]['tx']['total_pkts'] + result[1]['tx']['total_pkts']
99 result["total_tx_rate"] = int(total_tx_pkts / self.config.duration_sec)
102 def __combine_stats(self, in_stats, port_handle):
103 """Traverses TRex result dictionary and combines stream stats. Used for combining latency
104 and regular streams together.
106 result = defaultdict(lambda: defaultdict(float))
108 for pg_id in [self.stream_ids[port_handle]] + self.latencies[port_handle]:
109 record = in_stats['flow_stats'][pg_id]
110 for stat_type, stat_type_values in record.iteritems():
111 for ph, value in stat_type_values.iteritems():
112 result[stat_type][ph] += value
116 def __combine_latencies(self, in_stats, port_handle):
117 """Traverses TRex result dictionary and combines chosen latency stats."""
118 if not len(self.latencies[port_handle]):
121 result = defaultdict(float)
122 result['total_min'] = float("inf")
123 for lat_id in self.latencies[port_handle]:
124 lat = in_stats['latency'][lat_id]
125 result['dropped_pkts'] += lat['err_cntrs']['dropped']
126 result['total_max'] = max(lat['latency']['total_max'], result['total_max'])
127 result['total_min'] = min(lat['latency']['total_min'], result['total_min'])
128 result['average'] += lat['latency']['average']
130 result['average'] /= len(self.latencies[port_handle])
134 def create_pkt(self, stream_cfg, l2frame_size):
135 # 46 = 14 (Ethernet II) + 4 (CRC Checksum) + 20 (IPv4) + 8 (UDP)
136 payload = 'x' * (max(64, int(l2frame_size)) - 46)
138 pkt_base = Ether(src=stream_cfg['mac_src'], dst=stream_cfg['mac_dst'])
140 if stream_cfg['vlan_tag'] is not None:
141 pkt_base /= Dot1Q(vlan=stream_cfg['vlan_tag'])
144 if stream_cfg['udp_src_port']:
145 udp_args['sport'] = int(stream_cfg['udp_src_port'])
146 if stream_cfg['udp_dst_port']:
147 udp_args['dport'] = int(stream_cfg['udp_dst_port'])
148 pkt_base /= IP() / UDP(**udp_args)
150 if stream_cfg['ip_addrs_step'] == 'random':
151 src_fv = STLVmFlowVarRepetableRandom(
153 min_value=stream_cfg['ip_src_addr'],
154 max_value=stream_cfg['ip_src_addr_max'],
156 seed=random.randint(0, 32767),
157 limit=stream_cfg['ip_src_count'])
158 dst_fv = STLVmFlowVarRepetableRandom(
160 min_value=stream_cfg['ip_dst_addr'],
161 max_value=stream_cfg['ip_dst_addr_max'],
163 seed=random.randint(0, 32767),
164 limit=stream_cfg['ip_dst_count'])
166 src_fv = STLVmFlowVar(
168 min_value=stream_cfg['ip_src_addr'],
169 max_value=stream_cfg['ip_src_addr'],
172 step=stream_cfg['ip_addrs_step'])
173 dst_fv = STLVmFlowVar(
175 min_value=stream_cfg['ip_dst_addr'],
176 max_value=stream_cfg['ip_dst_addr_max'],
179 step=stream_cfg['ip_addrs_step'])
183 STLVmWrFlowVar(fv_name="ip_src", pkt_offset="IP.src"),
185 STLVmWrFlowVar(fv_name="ip_dst", pkt_offset="IP.dst"),
186 STLVmFixChecksumHw(l3_offset="IP",
188 l4_type=CTRexVmInsFixHwCs.L4_TYPE_UDP)
191 return STLPktBuilder(pkt=pkt_base / payload, vm=STLScVmRaw(vm_param))
193 def generate_streams(self, port_handle, stream_cfg, l2frame, isg=0.0, latency=True):
196 if l2frame == 'IMIX':
197 for t, (ratio, l2_frame_size) in enumerate(zip(self.imix_ratios, self.imix_l2_sizes)):
198 pkt = self.create_pkt(stream_cfg, l2_frame_size)
199 streams.append(STLStream(packet=pkt,
201 flow_stats=STLFlowStats(
202 pg_id=self.stream_ids[port_handle]),
203 mode=STLTXCont(pps=ratio)))
206 idx_lat = self.id.next()
207 sl = STLStream(packet=pkt,
209 flow_stats=STLFlowLatencyStats(pg_id=idx_lat),
210 mode=STLTXCont(pps=self.LATENCY_PPS))
213 pkt = self.create_pkt(stream_cfg, l2frame)
214 streams.append(STLStream(packet=pkt,
215 flow_stats=STLFlowStats(pg_id=self.stream_ids[port_handle]),
219 idx_lat = self.id.next()
220 streams.append(STLStream(packet=pkt,
221 flow_stats=STLFlowLatencyStats(pg_id=idx_lat),
222 mode=STLTXCont(pps=self.LATENCY_PPS)))
225 self.latencies[port_handle].append(idx_lat)
233 def __connect(self, client):
236 def __connect_after_start(self):
237 # after start, Trex may take a bit of time to initialize
238 # so we need to retry a few times
239 for it in xrange(self.config.generic_retry_count):
242 self.client.connect()
244 except Exception as ex:
245 if it == (self.config.generic_retry_count - 1):
247 LOG.info("Retrying connection to TRex (%s)...", ex.message)
250 LOG.info("Connecting to TRex...")
251 server_ip = self.config.generator_config.ip
253 # Connect to TRex server
254 self.client = STLClient(server=server_ip)
256 self.__connect(self.client)
257 except (TimeoutError, STLError) as e:
258 if server_ip == '127.0.0.1':
260 self.__start_server()
261 self.__connect_after_start()
262 except (TimeoutError, STLError) as e:
263 LOG.error('Cannot connect to TRex')
264 LOG.error(traceback.format_exc())
265 logpath = '/tmp/trex.log'
266 if os.path.isfile(logpath):
267 # Wait for TRex to finish writing error message
269 for it in xrange(self.config.generic_retry_count):
270 size = os.path.getsize(logpath)
271 if size == last_size:
272 # probably not writing anymore
276 with open(logpath, 'r') as f:
280 raise TrafficGeneratorException(message)
282 raise TrafficGeneratorException(e.message)
284 ports = list(self.config.generator_config.ports)
285 self.port_handle = ports
287 self.client.reset(ports)
290 if self.config.service_chain == ChainType.EXT and not self.config.no_arp:
295 def __set_l3_mode(self):
296 self.client.set_service_mode(ports=self.port_handle, enabled=True)
297 for port, device in zip(self.port_handle, self.config.generator_config.devices):
299 self.client.set_l3_mode(port=port,
300 src_ipv4=device.tg_gateway_ip,
301 dst_ipv4=device.dst.gateway_ip,
302 vlan=device.vlan_tag if device.vlan_tagging else None)
304 # TRex tries to resolve ARP already, doesn't have to be successful yet
306 self.client.set_service_mode(ports=self.port_handle, enabled=False)
def __set_l2_mode(self):
    """Switch the generator ports to L2 mode.

    Service mode is enabled on all ports in self.port_handle, then for each
    (port, device) pair set_l2_mode() is called once per stream config of that
    device with the config's 'mac_dst' as destination MAC. Service mode is
    disabled again once all ports have been processed.
    """
    client = self.client
    client.set_service_mode(ports=self.port_handle, enabled=True)

    port_device_pairs = zip(self.port_handle, self.config.generator_config.devices)
    for tg_port, tg_device in port_device_pairs:
        chain_type = self.config.generator_config.service_chain
        for stream_cfg in tg_device.get_stream_configs(chain_type):
            dst_mac = stream_cfg['mac_dst']
            client.set_l2_mode(port=tg_port, dst_mac=dst_mac)

    client.set_service_mode(ports=self.port_handle, enabled=False)
def __start_server(self):
    """Create a TRexTrafficServer and run it with the generator configuration."""
    TRexTrafficServer().run_server(self.config.generator_config)
319 def resolve_arp(self):
320 self.client.set_service_mode(ports=self.port_handle)
321 LOG.info('Polling ARP until successful')
324 for port, device in zip(self.port_handle, self.config.generator_config.devices):
325 ctx = self.client.create_service_ctx(port=port)
329 src_ip=cfg['ip_src_tg_gw'],
330 dst_ip=cfg['mac_discovery_gw'],
331 vlan=device.vlan_tag if device.vlan_tagging else None)
332 for cfg in device.get_stream_configs(self.config.generator_config.service_chain)
335 for _ in xrange(self.config.generic_retry_count):
340 LOG.error(traceback.format_exc())
343 self.arps[port] = [arp.get_record().dst_mac for arp in arps
344 if arp.get_record().dst_mac is not None]
346 if len(self.arps[port]) == self.config.service_chain_count:
348 LOG.info('ARP resolved successfully for port {}'.format(port))
351 failed = [arp.get_record().dst_ip for arp in arps
352 if arp.get_record().dst_mac is None]
353 LOG.info('Retrying ARP for: {} ({} / {})'.format(
354 failed, attempt, self.config.generic_retry_count))
355 time.sleep(self.config.generic_poll_sec)
357 self.client.set_service_mode(ports=self.port_handle, enabled=False)
358 return resolved == len(self.port_handle)
360 def config_interface(self):
363 def __is_rate_enough(self, l2frame_size, rates, bidirectional, latency):
364 """Check if rate provided by user is above requirements. Applies only if latency is True."""
365 intf_speed = self.config.generator_config.intf_speed
371 r = utils.convert_rates(l2frame_size, rate, intf_speed)
372 total_rate += int(r['rate_pps'])
375 total_rate = utils.convert_rates(l2frame_size, rates[0], intf_speed)
376 # rate must be enough for latency stream and at least 1 pps for base stream per chain
377 required_rate = (self.LATENCY_PPS + 1) * self.config.service_chain_count * mult
378 result = utils.convert_rates(l2frame_size,
379 {'rate_pps': required_rate},
381 result['result'] = total_rate >= required_rate
384 return {'result': True}
386 def create_traffic(self, l2frame_size, rates, bidirectional, latency=True):
387 r = self.__is_rate_enough(l2frame_size, rates, bidirectional, latency)
389 raise TrafficGeneratorException(
390 'Required rate in total is at least one of: \n{pps}pps \n{bps}bps \n{load}%.'
391 .format(pps=r['rate_pps'],
393 load=r['rate_percent']))
395 stream_cfgs = [d.get_stream_configs(self.config.generator_config.service_chain)
396 for d in self.config.generator_config.devices]
397 self.rates = map(lambda rate: utils.to_rate_str(rate), rates)
399 for ph in self.port_handle:
400 # generate one pg_id for each direction
401 self.stream_ids[ph] = self.id.next()
403 for i, (fwd_stream_cfg, rev_stream_cfg) in enumerate(zip(*stream_cfgs)):
404 if self.config.service_chain == ChainType.EXT and not self.config.no_arp:
405 fwd_stream_cfg['mac_dst'] = self.arps[self.port_handle[0]][i]
406 rev_stream_cfg['mac_dst'] = self.arps[self.port_handle[1]][i]
408 self.streamblock[0].extend(self.generate_streams(self.port_handle[0],
412 if len(self.rates) > 1:
413 self.streamblock[1].extend(self.generate_streams(self.port_handle[1],
417 latency=bidirectional and latency))
419 for ph in self.port_handle:
420 self.client.add_streams(self.streamblock[ph], ports=ph)
421 LOG.info('Created traffic stream for port %s.' % ph)
def modify_rate(self, rate, reverse):
    """Replace the stored rate for one traffic direction.

    :param rate: new rate; it is passed through utils.to_rate_str() and the
                 result is stored in self.rates
    :param reverse: converted with int() and used as the index into both
                    self.port_handle and self.rates (0 for False, 1 for True)
    """
    port_index = int(reverse)
    port = self.port_handle[port_index]
    # Convert once and reuse the result for both the stored value and the log line.
    rate_str = utils.to_rate_str(rate)
    self.rates[port_index] = rate_str
    # Let the logger do the formatting instead of pre-formatting with '%'.
    LOG.info('Modified traffic stream for %s, new rate=%s.', port, rate_str)
429 def clear_streamblock(self):
430 self.streamblock = defaultdict(list)
431 self.latencies = defaultdict(list)
432 self.stream_ids = defaultdict(list)
434 self.client.reset(self.port_handle)
435 LOG.info('Cleared all existing streams.')
438 stats = self.client.get_pgid_stats()
439 return self.extract_stats(stats)
442 return [self.client.get_port_attr(port=port)['src_mac'] for port in self.port_handle]
444 def clear_stats(self):
446 self.client.clear_stats()
def start_traffic(self):
    """Start transmission on each port with its stored rate.

    Ports from self.port_handle are paired positionally with the entries of
    self.rates; each port is started with that rate as multiplier,
    self.config.duration_sec as duration and force=True.
    """
    for port_and_rate in zip(self.port_handle, self.rates):
        tx_port, tx_rate = port_and_rate
        self.client.start(ports=tx_port,
                          mult=tx_rate,
                          duration=self.config.duration_sec,
                          force=True)
def stop_traffic(self):
    """Ask the client to stop transmission on all ports in self.port_handle."""
    client = self.client
    client.stop(ports=self.port_handle)
458 self.client.reset(self.port_handle)
459 self.client.disconnect()
461 # TRex does not like a reset while in disconnected state