1 # Copyright 2016 Cisco Systems, Inc. All rights reserved.
3 # Licensed under the Apache License, Version 2.0 (the "License"); you may
4 # not use this file except in compliance with the License. You may obtain
5 # a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 # License for the specific language governing permissions and limitations
15 from collections import defaultdict
16 from itertools import count
17 from nfvbench.log import LOG
18 from nfvbench.specs import ChainType
19 from nfvbench.traffic_server import TRexTrafficServer
20 from nfvbench.utils import cast_integer
21 from nfvbench.utils import timeout
22 from nfvbench.utils import TimeoutError
27 from traffic_base import AbstractTrafficGenerator
28 from traffic_base import TrafficGeneratorException
29 import traffic_utils as utils
31 from trex_stl_lib.api import CTRexVmInsFixHwCs
32 from trex_stl_lib.api import Dot1Q
33 from trex_stl_lib.api import Ether
34 from trex_stl_lib.api import IP
35 from trex_stl_lib.api import STLClient
36 from trex_stl_lib.api import STLError
37 from trex_stl_lib.api import STLFlowLatencyStats
38 from trex_stl_lib.api import STLFlowStats
39 from trex_stl_lib.api import STLPktBuilder
40 from trex_stl_lib.api import STLScVmRaw
41 from trex_stl_lib.api import STLStream
42 from trex_stl_lib.api import STLTXCont
43 from trex_stl_lib.api import STLVmFixChecksumHw
44 from trex_stl_lib.api import STLVmFlowVar
45 from trex_stl_lib.api import STLVmFlowVarRepetableRandom
46 from trex_stl_lib.api import STLVmWrFlowVar
47 from trex_stl_lib.api import UDP
48 from trex_stl_lib.services.trex_stl_service_arp import STLServiceARP
51 class TRex(AbstractTrafficGenerator):
54 def __init__(self, runner):
55 AbstractTrafficGenerator.__init__(self, runner)
58 self.latencies = defaultdict(list)
59 self.stream_ids = defaultdict(list)
61 self.streamblock = defaultdict(list)
65 def get_version(self):
66 return self.client.get_server_version()
68 def extract_stats(self, in_stats):
69 utils.nan_replace(in_stats)
73 for ph in self.port_handle:
74 stats = self.__combine_stats(in_stats, ph)
77 'total_pkts': cast_integer(stats['tx_pkts']['total']),
78 'total_pkt_bytes': cast_integer(stats['tx_bytes']['total']),
79 'pkt_rate': cast_integer(stats['tx_pps']['total']),
80 'pkt_bit_rate': cast_integer(stats['tx_bps']['total'])
83 'total_pkts': cast_integer(stats['rx_pkts']['total']),
84 'total_pkt_bytes': cast_integer(stats['rx_bytes']['total']),
85 'pkt_rate': cast_integer(stats['rx_pps']['total']),
86 'pkt_bit_rate': cast_integer(stats['rx_bps']['total']),
87 'dropped_pkts': cast_integer(
88 stats['tx_pkts']['total'] - stats['rx_pkts']['total'])
92 lat = self.__combine_latencies(in_stats, ph)
93 result[ph]['rx']['max_delay_usec'] = cast_integer(
94 lat['total_max']) if 'total_max' in lat else float('nan')
95 result[ph]['rx']['min_delay_usec'] = cast_integer(
96 lat['total_min']) if 'total_min' in lat else float('nan')
97 result[ph]['rx']['avg_delay_usec'] = cast_integer(
98 lat['average']) if 'average' in lat else float(
100 total_tx_pkts = result[0]['tx']['total_pkts'] + result[1]['tx']['total_pkts']
101 result["total_tx_rate"] = cast_integer(total_tx_pkts / self.config.duration_sec)
104 def __combine_stats(self, in_stats, port_handle):
105 """Traverses TRex result dictionary and combines stream stats. Used for combining latency
106 and regular streams together.
108 result = defaultdict(lambda: defaultdict(float))
110 for pg_id in [self.stream_ids[port_handle]] + self.latencies[port_handle]:
111 record = in_stats['flow_stats'][pg_id]
112 for stat_type, stat_type_values in record.iteritems():
113 for ph, value in stat_type_values.iteritems():
114 result[stat_type][ph] += value
118 def __combine_latencies(self, in_stats, port_handle):
119 """Traverses TRex result dictionary and combines chosen latency stats."""
120 if not len(self.latencies[port_handle]):
123 result = defaultdict(float)
124 result['total_min'] = float("inf")
125 for lat_id in self.latencies[port_handle]:
126 lat = in_stats['latency'][lat_id]
127 result['dropped_pkts'] += lat['err_cntrs']['dropped']
128 result['total_max'] = max(lat['latency']['total_max'], result['total_max'])
129 result['total_min'] = min(lat['latency']['total_min'], result['total_min'])
130 result['average'] += lat['latency']['average']
132 result['average'] /= len(self.latencies[port_handle])
136 def create_pkt(self, stream_cfg, l2frame_size):
137 # 46 = 14 (Ethernet II) + 4 (CRC Checksum) + 20 (IPv4) + 8 (UDP)
138 payload = 'x' * (max(64, int(l2frame_size)) - 46)
140 pkt_base = Ether(src=stream_cfg['mac_src'], dst=stream_cfg['mac_dst'])
142 if stream_cfg['vlan_tag'] is not None:
143 pkt_base /= Dot1Q(vlan=stream_cfg['vlan_tag'])
146 if stream_cfg['udp_src_port']:
147 udp_args['sport'] = int(stream_cfg['udp_src_port'])
148 if stream_cfg['udp_dst_port']:
149 udp_args['dport'] = int(stream_cfg['udp_dst_port'])
150 pkt_base /= IP() / UDP(**udp_args)
152 if stream_cfg['ip_addrs_step'] == 'random':
153 src_fv = STLVmFlowVarRepetableRandom(
155 min_value=stream_cfg['ip_src_addr'],
156 max_value=stream_cfg['ip_src_addr_max'],
158 seed=random.randint(0, 32767),
159 limit=stream_cfg['ip_src_count'])
160 dst_fv = STLVmFlowVarRepetableRandom(
162 min_value=stream_cfg['ip_dst_addr'],
163 max_value=stream_cfg['ip_dst_addr_max'],
165 seed=random.randint(0, 32767),
166 limit=stream_cfg['ip_dst_count'])
168 src_fv = STLVmFlowVar(
170 min_value=stream_cfg['ip_src_addr'],
171 max_value=stream_cfg['ip_src_addr'],
174 step=stream_cfg['ip_addrs_step'])
175 dst_fv = STLVmFlowVar(
177 min_value=stream_cfg['ip_dst_addr'],
178 max_value=stream_cfg['ip_dst_addr_max'],
181 step=stream_cfg['ip_addrs_step'])
185 STLVmWrFlowVar(fv_name="ip_src", pkt_offset="IP.src"),
187 STLVmWrFlowVar(fv_name="ip_dst", pkt_offset="IP.dst"),
188 STLVmFixChecksumHw(l3_offset="IP",
190 l4_type=CTRexVmInsFixHwCs.L4_TYPE_UDP)
193 return STLPktBuilder(pkt=pkt_base / payload, vm=STLScVmRaw(vm_param))
195 def generate_streams(self, port_handle, stream_cfg, l2frame, isg=0.0, latency=True):
198 if l2frame == 'IMIX':
199 for t, (ratio, l2_frame_size) in enumerate(zip(self.imix_ratios, self.imix_l2_sizes)):
200 pkt = self.create_pkt(stream_cfg, l2_frame_size)
201 streams.append(STLStream(packet=pkt,
203 flow_stats=STLFlowStats(
204 pg_id=self.stream_ids[port_handle]),
205 mode=STLTXCont(pps=ratio)))
208 idx_lat = self.id.next()
209 sl = STLStream(packet=pkt,
211 flow_stats=STLFlowLatencyStats(pg_id=idx_lat),
212 mode=STLTXCont(pps=self.LATENCY_PPS))
215 pkt = self.create_pkt(stream_cfg, l2frame)
216 streams.append(STLStream(packet=pkt,
217 flow_stats=STLFlowStats(pg_id=self.stream_ids[port_handle]),
221 idx_lat = self.id.next()
222 streams.append(STLStream(packet=pkt,
223 flow_stats=STLFlowLatencyStats(pg_id=idx_lat),
224 mode=STLTXCont(pps=self.LATENCY_PPS)))
227 self.latencies[port_handle].append(idx_lat)
235 def __connect(self, client):
238 def __connect_after_start(self):
239 # after start, Trex may take a bit of time to initialize
240 # so we need to retry a few times
241 for it in xrange(self.config.generic_retry_count):
244 self.client.connect()
246 except Exception as ex:
247 if it == (self.config.generic_retry_count - 1):
249 LOG.info("Retrying connection to TRex (%s)...", ex.message)
252 LOG.info("Connecting to TRex...")
253 server_ip = self.config.generator_config.ip
255 # Connect to TRex server
256 self.client = STLClient(server=server_ip)
258 self.__connect(self.client)
259 except (TimeoutError, STLError) as e:
260 if server_ip == '127.0.0.1':
262 self.__start_server()
263 self.__connect_after_start()
264 except (TimeoutError, STLError) as e:
265 LOG.error('Cannot connect to TRex')
266 LOG.error(traceback.format_exc())
267 logpath = '/tmp/trex.log'
268 if os.path.isfile(logpath):
269 # Wait for TRex to finish writing error message
271 for it in xrange(self.config.generic_retry_count):
272 size = os.path.getsize(logpath)
273 if size == last_size:
274 # probably not writing anymore
278 with open(logpath, 'r') as f:
282 raise TrafficGeneratorException(message)
284 raise TrafficGeneratorException(e.message)
286 ports = list(self.config.generator_config.ports)
287 self.port_handle = ports
289 self.client.reset(ports)
292 if self.config.service_chain == ChainType.EXT and not self.config.no_arp:
297 def __set_l3_mode(self):
298 self.client.set_service_mode(ports=self.port_handle, enabled=True)
299 for port, device in zip(self.port_handle, self.config.generator_config.devices):
301 self.client.set_l3_mode(port=port,
302 src_ipv4=device.tg_gateway_ip,
303 dst_ipv4=device.dst.gateway_ip,
304 vlan=device.vlan_tag if device.vlan_tagging else None)
306 # TRex tries to resolve ARP already, doesn't have to be successful yet
308 self.client.set_service_mode(ports=self.port_handle, enabled=False)
310 def __set_l2_mode(self):
311 self.client.set_service_mode(ports=self.port_handle, enabled=True)
312 for port, device in zip(self.port_handle, self.config.generator_config.devices):
313 for cfg in device.get_stream_configs(self.config.generator_config.service_chain):
314 self.client.set_l2_mode(port=port, dst_mac=cfg['mac_dst'])
315 self.client.set_service_mode(ports=self.port_handle, enabled=False)
317 def __start_server(self):
318 server = TRexTrafficServer()
319 server.run_server(self.config.generator_config)
321 def resolve_arp(self):
322 self.client.set_service_mode(ports=self.port_handle)
323 LOG.info('Polling ARP until successful')
326 for port, device in zip(self.port_handle, self.config.generator_config.devices):
327 ctx = self.client.create_service_ctx(port=port)
331 src_ip=cfg['ip_src_tg_gw'],
332 dst_ip=cfg['mac_discovery_gw'],
333 vlan=device.vlan_tag if device.vlan_tagging else None)
334 for cfg in device.get_stream_configs(self.config.generator_config.service_chain)
337 for _ in xrange(self.config.generic_retry_count):
342 LOG.error(traceback.format_exc())
345 self.arps[port] = [arp.get_record().dst_mac for arp in arps
346 if arp.get_record().dst_mac is not None]
348 if len(self.arps[port]) == self.config.service_chain_count:
350 LOG.info('ARP resolved successfully for port {}'.format(port))
353 failed = [arp.get_record().dst_ip for arp in arps
354 if arp.get_record().dst_mac is None]
355 LOG.info('Retrying ARP for: {} ({} / {})'.format(
356 failed, attempt, self.config.generic_retry_count))
357 time.sleep(self.config.generic_poll_sec)
359 self.client.set_service_mode(ports=self.port_handle, enabled=False)
360 return resolved == len(self.port_handle)
362 def config_interface(self):
365 def __is_rate_enough(self, l2frame_size, rates, bidirectional, latency):
366 """Check if rate provided by user is above requirements. Applies only if latency is True."""
367 intf_speed = self.config.generator_config.intf_speed
373 r = utils.convert_rates(l2frame_size, rate, intf_speed)
374 total_rate += int(r['rate_pps'])
377 total_rate = utils.convert_rates(l2frame_size, rates[0], intf_speed)
378 # rate must be enough for latency stream and at least 1 pps for base stream per chain
379 required_rate = (self.LATENCY_PPS + 1) * self.config.service_chain_count * mult
380 result = utils.convert_rates(l2frame_size,
381 {'rate_pps': required_rate},
383 result['result'] = total_rate >= required_rate
386 return {'result': True}
388 def create_traffic(self, l2frame_size, rates, bidirectional, latency=True):
389 r = self.__is_rate_enough(l2frame_size, rates, bidirectional, latency)
391 raise TrafficGeneratorException(
392 'Required rate in total is at least one of: \n{pps}pps \n{bps}bps \n{load}%.'
393 .format(pps=r['rate_pps'],
395 load=r['rate_percent']))
397 stream_cfgs = [d.get_stream_configs(self.config.generator_config.service_chain)
398 for d in self.config.generator_config.devices]
399 self.rates = map(lambda rate: utils.to_rate_str(rate), rates)
401 for ph in self.port_handle:
402 # generate one pg_id for each direction
403 self.stream_ids[ph] = self.id.next()
405 for i, (fwd_stream_cfg, rev_stream_cfg) in enumerate(zip(*stream_cfgs)):
406 if self.config.service_chain == ChainType.EXT and not self.config.no_arp:
407 fwd_stream_cfg['mac_dst'] = self.arps[self.port_handle[0]][i]
408 rev_stream_cfg['mac_dst'] = self.arps[self.port_handle[1]][i]
410 self.streamblock[0].extend(self.generate_streams(self.port_handle[0],
414 if len(self.rates) > 1:
415 self.streamblock[1].extend(self.generate_streams(self.port_handle[1],
419 latency=bidirectional and latency))
421 for ph in self.port_handle:
422 self.client.add_streams(self.streamblock[ph], ports=ph)
423 LOG.info('Created traffic stream for port %s.' % ph)
425 def modify_rate(self, rate, reverse):
426 port_index = int(reverse)
427 port = self.port_handle[port_index]
428 self.rates[port_index] = utils.to_rate_str(rate)
429 LOG.info('Modified traffic stream for %s, new rate=%s.' % (port, utils.to_rate_str(rate)))
431 def clear_streamblock(self):
432 self.streamblock = defaultdict(list)
433 self.latencies = defaultdict(list)
434 self.stream_ids = defaultdict(list)
436 self.client.reset(self.port_handle)
437 LOG.info('Cleared all existing streams.')
440 stats = self.client.get_pgid_stats()
441 return self.extract_stats(stats)
444 return [self.client.get_port_attr(port=port)['src_mac'] for port in self.port_handle]
446 def clear_stats(self):
448 self.client.clear_stats()
450 def start_traffic(self):
451 for port, rate in zip(self.port_handle, self.rates):
452 self.client.start(ports=port, mult=rate, duration=self.config.duration_sec, force=True)
454 def stop_traffic(self):
455 self.client.stop(ports=self.port_handle)
460 self.client.reset(self.port_handle)
461 self.client.disconnect()
463 # TRex does not like a reset while in disconnected state