1 # Copyright 2016 Cisco Systems, Inc. All rights reserved.
3 # Licensed under the Apache License, Version 2.0 (the "License"); you may
4 # not use this file except in compliance with the License. You may obtain
5 # a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 # License for the specific language governing permissions and limitations
15 from collections import defaultdict
16 from itertools import count
17 from nfvbench.log import LOG
18 from nfvbench.specs import ChainType
19 from nfvbench.traffic_server import TRexTrafficServer
20 from nfvbench.utils import timeout
21 from nfvbench.utils import TimeoutError
26 from traffic_base import AbstractTrafficGenerator
27 from traffic_base import TrafficGeneratorException
28 import traffic_utils as utils
31 from trex_stl_lib.api import CTRexVmInsFixHwCs
32 from trex_stl_lib.api import Dot1Q
33 from trex_stl_lib.api import Ether
34 from trex_stl_lib.api import IP
35 from trex_stl_lib.api import STLClient
36 from trex_stl_lib.api import STLError
37 from trex_stl_lib.api import STLFlowLatencyStats
38 from trex_stl_lib.api import STLFlowStats
39 from trex_stl_lib.api import STLPktBuilder
40 from trex_stl_lib.api import STLScVmRaw
41 from trex_stl_lib.api import STLStream
42 from trex_stl_lib.api import STLTXCont
43 from trex_stl_lib.api import STLVmFixChecksumHw
44 from trex_stl_lib.api import STLVmFlowVar
45 from trex_stl_lib.api import STLVmFlowVarRepetableRandom
46 from trex_stl_lib.api import STLVmWrFlowVar
47 from trex_stl_lib.api import UDP
48 from trex_stl_lib.services.trex_stl_service_arp import STLServiceARP
51 class TRex(AbstractTrafficGenerator):
# Constructor: delegates to AbstractTrafficGenerator.__init__ and creates the
# per-port bookkeeping maps (latency stream ids, stream ids, stream blocks),
# each one a defaultdict(list).
# NOTE(review): other methods in this file read self.client, self.id,
# self.port_handle, self.rates and self.arps; their initialisation is not
# visible in this listing - confirm against the full file.
55 def __init__(self, runner):
56 AbstractTrafficGenerator.__init__(self, runner)
# port handle -> list of latency stream packet-group ids
59 self.latencies = defaultdict(list)
# port handle -> packet-group id of the regular stream (assigned in create_traffic)
60 self.stream_ids = defaultdict(list)
# stream block index -> list of streams to add to the client
62 self.streamblock = defaultdict(list)
def get_version(self):
    """Return the server version as reported by the TRex client."""
    trex_client = self.client
    return trex_client.get_server_version()
def extract_stats(self, in_stats):
    """Convert raw TRex packet-group statistics into the per-port result dictionary.

    Args:
        in_stats: statistics dictionary (``get_stats`` passes the client's
            ``get_pgid_stats()`` output); it is handed to
            ``utils.nan_replace`` before being read.

    Returns:
        dict keyed by port handle. Each value holds a ``'tx'`` and an ``'rx'``
        sub-dictionary with packet/byte totals and rates; ``'rx'`` additionally
        carries ``dropped_pkts`` (tx total minus rx total) and the
        max/min/avg delay figures. A ``"total_tx_rate"`` entry holds the
        packets transmitted on all ports divided by
        ``self.config.duration_sec``.
    """
    utils.nan_replace(in_stats)
    # NOTE(review): the reviewed listing skips a few short lines around here;
    # only statements implied by the visible code were restored - confirm that
    # nothing else (e.g. logging) was dropped.

    result = {}
    for ph in self.port_handle:
        stats = self.__combine_stats(in_stats, ph)
        tx_pkts = stats['tx_pkts']['total']
        rx_pkts = stats['rx_pkts']['total']
        result[ph] = {
            'tx': {
                'total_pkts': tx_pkts,
                'total_pkt_bytes': stats['tx_bytes']['total'],
                'pkt_rate': stats['tx_pps']['total'],
                'pkt_bit_rate': stats['tx_bps']['total']
            },
            'rx': {
                'total_pkts': rx_pkts,
                'total_pkt_bytes': stats['rx_bytes']['total'],
                'pkt_rate': stats['rx_pps']['total'],
                'pkt_bit_rate': stats['rx_bps']['total'],
                'dropped_pkts': tx_pkts - rx_pkts
            }
        }

        lat = self.__combine_latencies(in_stats, ph)
        # Latency figures that are not available default to NaN.
        result[ph]['rx']['max_delay_usec'] = lat.get('total_max', float('nan'))
        result[ph]['rx']['min_delay_usec'] = lat.get('total_min', float('nan'))
        result[ph]['rx']['avg_delay_usec'] = lat.get('average', float('nan'))

    # Sum over the actual port handles instead of assuming they are 0 and 1.
    total_tx_pkts = sum(result[ph]['tx']['total_pkts'] for ph in self.port_handle)
    result["total_tx_rate"] = total_tx_pkts / self.config.duration_sec
    return result
def __combine_stats(self, in_stats, port_handle):
    """Traverse the TRex result dictionary and sum the stream stats of one port.

    Used for combining latency and regular streams together: the regular
    stream's packet-group id (``self.stream_ids[port_handle]``) and every
    latency stream id in ``self.latencies[port_handle]`` are looked up in
    ``in_stats['flow_stats']`` and their values are added up.

    Args:
        in_stats: statistics dictionary with a ``'flow_stats'`` mapping keyed
            by packet-group id; each record maps a stat type to a
            ``{key: value}`` mapping.
        port_handle: port whose streams are combined.

    Returns:
        Nested ``defaultdict`` ``{stat_type: {key: summed value}}``; entries
        that were never seen read as ``0.0``.

    Raises:
        KeyError: if a packet-group id is absent from ``in_stats['flow_stats']``.
    """
    result = defaultdict(lambda: defaultdict(float))

    pg_ids = [self.stream_ids[port_handle]] + self.latencies[port_handle]
    for pg_id in pg_ids:
        record = in_stats['flow_stats'][pg_id]
        # items() works on Python 2 and 3; iteritems() exists on Python 2 only.
        for stat_type, stat_type_values in record.items():
            for ph, value in stat_type_values.items():
                result[stat_type][ph] += value

    return result
def __combine_latencies(self, in_stats, port_handle):
    """Traverse the TRex result dictionary and combine chosen latency stats.

    Args:
        in_stats: statistics dictionary with a ``'latency'`` mapping keyed by
            latency stream id; each record provides ``['err_cntrs']['dropped']``
            and ``['latency']`` with ``total_max``, ``total_min``, ``average``.
        port_handle: port whose latency streams are combined.

    Returns:
        An empty dict when the port has no latency stream; otherwise a
        ``defaultdict(float)`` with ``dropped_pkts`` (sum), ``total_max``
        (maximum), ``total_min`` (minimum) and ``average`` (mean of the
        per-stream averages).
    """
    lat_ids = self.latencies[port_handle]
    if not lat_ids:
        # Nothing to combine: callers read the result with .get() and defaults.
        return {}

    result = defaultdict(float)
    # Start from +inf so the first stream's minimum always wins.
    result['total_min'] = float("inf")
    for lat_id in lat_ids:
        lat = in_stats['latency'][lat_id]
        result['dropped_pkts'] += lat['err_cntrs']['dropped']
        result['total_max'] = max(lat['latency']['total_max'], result['total_max'])
        result['total_min'] = min(lat['latency']['total_min'], result['total_min'])
        result['average'] += lat['latency']['average']

    # Plain mean of the per-stream averages (not weighted by packet count).
    result['average'] /= len(lat_ids)

    return result
# Build the STLPktBuilder for one stream: an Ether[/Dot1Q]/IP/UDP frame padded
# with 'x' characters, plus a raw field-engine program (STLScVmRaw) whose
# instructions write the "ip_src"/"ip_dst" flow variables into IP.src/IP.dst
# and fix the checksum.
# NOTE(review): this listing appears to be missing lines (some keyword
# arguments of the flow variables, the header of the non-random branch, the
# definition of vm_param) - confirm against the full file.
# NOTE(review): `random` is used below but its import is not visible here.
133 def create_pkt(self, stream_cfg, l2frame_size):
134 # 46 = 14 (Ethernet II) + 4 (CRC Checksum) + 20 (IPv4) + 8 (UDP)
# The frame size is clamped to at least 64 before the 46 overhead bytes are subtracted.
135 payload = 'x' * (max(64, int(l2frame_size)) - 46)
137 pkt_base = Ether(src=stream_cfg['mac_src'], dst=stream_cfg['mac_dst'])
# The VLAN header is added only when a tag is configured.
139 if stream_cfg['vlan_tag'] is not None:
140 pkt_base /= Dot1Q(vlan=stream_cfg['vlan_tag'])
142 pkt_base /= IP() / UDP()
# 'random' step: repeatable-random flow variables, each with a fresh random seed.
144 if stream_cfg['ip_addrs_step'] == 'random':
145 src_fv = STLVmFlowVarRepetableRandom(
147 min_value=stream_cfg['ip_src_addr'],
148 max_value=stream_cfg['ip_src_addr_max'],
150 seed=random.randint(0, 32767),
151 limit=stream_cfg['ip_src_count'])
152 dst_fv = STLVmFlowVarRepetableRandom(
154 min_value=stream_cfg['ip_dst_addr'],
155 max_value=stream_cfg['ip_dst_addr_max'],
157 seed=random.randint(0, 32767),
158 limit=stream_cfg['ip_dst_count'])
# Any other step value: stepped flow variables using ip_addrs_step.
# NOTE(review): the source variable below has max_value == min_value
# ('ip_src_addr'), unlike the destination which spans up to 'ip_dst_addr_max' -
# confirm this is intentional.
160 src_fv = STLVmFlowVar(
162 min_value=stream_cfg['ip_src_addr'],
163 max_value=stream_cfg['ip_src_addr'],
166 step=stream_cfg['ip_addrs_step'])
167 dst_fv = STLVmFlowVar(
169 min_value=stream_cfg['ip_dst_addr'],
170 max_value=stream_cfg['ip_dst_addr_max'],
173 step=stream_cfg['ip_addrs_step'])
177 STLVmWrFlowVar(fv_name="ip_src", pkt_offset="IP.src"),
179 STLVmWrFlowVar(fv_name="ip_dst", pkt_offset="IP.dst"),
180 STLVmFixChecksumHw(l3_offset="IP",
182 l4_type=CTRexVmInsFixHwCs.L4_TYPE_UDP)
185 return STLPktBuilder(pkt=pkt_base / payload, vm=STLScVmRaw(vm_param))
# Create the STLStream objects for one port and one stream configuration.
# For 'IMIX' one continuous stream is built per (ratio, frame size) pair taken
# from self.imix_ratios / self.imix_l2_sizes, with pps=ratio; otherwise a
# single packet of size l2frame is used. Regular streams are tagged with the
# port's packet-group id (self.stream_ids[port_handle]); latency streams get a
# fresh id from self.id and are sent at self.LATENCY_PPS.
# NOTE(review): this listing appears to be missing lines (initialisation of
# `streams`, the latency conditions, the non-IMIX branch header, the rest of
# some STLStream calls, the return) - confirm against the full file.
# NOTE(review): `self.id.next()` is the Python 2 iterator API; `next(self.id)`
# works on both Python 2 and 3.
187 def generate_streams(self, port_handle, stream_cfg, l2frame, isg=0.0, latency=True):
190 if l2frame == 'IMIX':
191 for t, (ratio, l2_frame_size) in enumerate(zip(self.imix_ratios, self.imix_l2_sizes)):
192 pkt = self.create_pkt(stream_cfg, l2_frame_size)
193 streams.append(STLStream(packet=pkt,
195 flow_stats=STLFlowStats(
196 pg_id=self.stream_ids[port_handle]),
197 mode=STLTXCont(pps=ratio)))
200 idx_lat = self.id.next()
201 sl = STLStream(packet=pkt,
203 flow_stats=STLFlowLatencyStats(pg_id=idx_lat),
204 mode=STLTXCont(pps=self.LATENCY_PPS))
207 pkt = self.create_pkt(stream_cfg, l2frame)
208 streams.append(STLStream(packet=pkt,
209 flow_stats=STLFlowStats(pg_id=self.stream_ids[port_handle]),
213 idx_lat = self.id.next()
214 streams.append(STLStream(packet=pkt,
215 flow_stats=STLFlowLatencyStats(pg_id=idx_lat),
216 mode=STLTXCont(pps=self.LATENCY_PPS)))
# Remember the latency stream id so its stats can be combined per port later.
219 self.latencies[port_handle].append(idx_lat)
# Connection helper called with the STLClient instance.
# NOTE(review): the body (and any decorator) is not visible in this listing.
# `timeout` is imported at the top of the file and the caller catches
# TimeoutError, so this is presumably a time-limited connect - confirm.
227 def __connect(self, client):
# Retry self.client.connect() up to self.config.generic_retry_count times.
# NOTE(review): this listing appears to be missing lines (the `try:` that
# pairs with the `except`, and the statement guarded by the last-attempt
# check) - confirm against the full file.
# NOTE(review): `xrange` and `ex.message` exist on Python 2 only.
230 def __connect_after_start(self):
231 # after start, Trex may take a bit of time to initialize
232 # so we need to retry a few times
233 for it in xrange(self.config.generic_retry_count):
236 self.client.connect()
238 except Exception as ex:
# True only on the final attempt of the loop.
239 if it == (self.config.generic_retry_count - 1):
241 LOG.info("Retrying connection to TRex (%s)...", ex.message)
# Fragment whose `def` line is not visible in this listing.
# Creates an STLClient for config.generator_config.ip and connects to it.
# When connecting fails and the address is 127.0.0.1, a local server is
# started and the connection is retried; failures end in
# TrafficGeneratorException.
# NOTE(review): `try:`/`else:` headers and several statements (e.g. where
# `last_size` and `message` are assigned) are missing from this listing -
# confirm against the full file.
# NOTE(review): `xrange` and `e.message` exist on Python 2 only; `os` and
# `traceback` are used but their imports are not visible here.
244 LOG.info("Connecting to TRex...")
245 server_ip = self.config.generator_config.ip
247 # Connect to TRex server
248 self.client = STLClient(server=server_ip)
250 self.__connect(self.client)
251 except (TimeoutError, STLError) as e:
# Only a local address leads to starting the server from here.
252 if server_ip == '127.0.0.1':
254 self.__start_server()
255 self.__connect_after_start()
256 except (TimeoutError, STLError) as e:
257 LOG.error('Cannot connect to TRex')
258 LOG.error(traceback.format_exc())
259 logpath = '/tmp/trex.log'
260 if os.path.isfile(logpath):
261 # Wait for TRex to finish writing error message
# Polls the log file size; an unchanged size is taken as "writing finished".
263 for it in xrange(self.config.generic_retry_count):
264 size = os.path.getsize(logpath)
265 if size == last_size:
266 # probably not writing anymore
270 with open(logpath, 'r') as f:
274 raise TrafficGeneratorException(message)
276 raise TrafficGeneratorException(e.message)
# The configured ports become the port handles and are reset on the client.
278 ports = list(self.config.generator_config.ports)
279 self.port_handle = ports
281 self.client.reset(ports)
# Fragment: neither the enclosing `def` nor the branch bodies are visible in
# this listing. The condition is true for the EXT service chain unless ARP is
# disabled (config.no_arp).
# NOTE(review): presumably selects between __set_l3_mode and __set_l2_mode -
# confirm against the full file.
284 if self.config.service_chain == ChainType.EXT and not self.config.no_arp:
# Put every port in L3 mode: service mode is enabled on all ports, each port
# is configured with its device's gateway addresses (and the VLAN tag only
# when device.vlan_tagging is truthy), then service mode is disabled again.
# NOTE(review): lines are missing from this listing around the set_l3_mode
# call; the existing comment suggests a failed ARP resolution is tolerated
# here, but the handling code is not visible - confirm against the full file.
289 def __set_l3_mode(self):
290 self.client.set_service_mode(ports=self.port_handle, enabled=True)
291 for port, device in zip(self.port_handle, self.config.generator_config.devices):
293 self.client.set_l3_mode(port=port,
294 src_ipv4=device.tg_gateway_ip,
295 dst_ipv4=device.dst.gateway_ip,
296 vlan=device.vlan_tag if device.vlan_tagging else None)
298 # TRex tries to resolve ARP already, doesn't have to be successful yet
300 self.client.set_service_mode(ports=self.port_handle, enabled=False)
def __set_l2_mode(self):
    """Put every port in L2 mode using the destination MAC of each stream config.

    Service mode is enabled on all ports while the mode is changed and
    disabled again afterwards.
    """
    trex = self.client
    trex.set_service_mode(ports=self.port_handle, enabled=True)
    gen_cfg = self.config.generator_config
    for port, device in zip(self.port_handle, gen_cfg.devices):
        stream_cfgs = device.get_stream_configs(gen_cfg.service_chain)
        for stream_cfg in stream_cfgs:
            trex.set_l2_mode(port=port, dst_mac=stream_cfg['mac_dst'])
    trex.set_service_mode(ports=self.port_handle, enabled=False)
def __start_server(self):
    """Run a TRexTrafficServer with the generator configuration."""
    trex_server = TRexTrafficServer()
    generator_config = self.config.generator_config
    trex_server.run_server(generator_config)
# Resolve destination MAC addresses through ARP, port by port.
# For each (port, device) pair a service context is created and ARP service
# requests are built from the device's stream configs; the resolved MACs are
# stored in self.arps[port]. A port counts as resolved when it has
# config.service_chain_count MACs. Returns True when `resolved` equals the
# number of port handles.
# NOTE(review): this listing appears to be missing lines (construction of
# `arps`, the statements that run the services, the assignments of `resolved`
# and `attempt`, try/except headers) - confirm against the full file.
# NOTE(review): `xrange` exists on Python 2 only; `time` and `traceback` are
# used but their imports are not visible here.
313 def resolve_arp(self):
314 self.client.set_service_mode(ports=self.port_handle)
315 LOG.info('Polling ARP until successful')
318 for port, device in zip(self.port_handle, self.config.generator_config.devices):
319 ctx = self.client.create_service_ctx(port=port)
323 src_ip=cfg['ip_src_tg_gw'],
324 dst_ip=cfg['mac_discovery_gw'],
325 vlan=device.vlan_tag if device.vlan_tagging else None)
326 for cfg in device.get_stream_configs(self.config.generator_config.service_chain)
329 for _ in xrange(self.config.generic_retry_count):
334 LOG.error(traceback.format_exc())
# Keep only the records whose destination MAC was actually resolved.
337 self.arps[port] = [arp.get_record().dst_mac for arp in arps
338 if arp.get_record().dst_mac is not None]
340 if len(self.arps[port]) == self.config.service_chain_count:
342 LOG.info('ARP resolved successfully for port {}'.format(port))
# Destination IPs still lacking a MAC, reported before the next attempt.
345 failed = [arp.get_record().dst_ip for arp in arps
346 if arp.get_record().dst_mac is None]
347 LOG.info('Retrying ARP for: {} ({} / {})'.format(
348 failed, attempt, self.config.generic_retry_count))
349 time.sleep(self.config.generic_poll_sec)
351 self.client.set_service_mode(ports=self.port_handle, enabled=False)
352 return resolved == len(self.port_handle)
# NOTE(review): the body of this method is not visible in this listing -
# presumably a no-op implementation of the base-class hook; confirm against
# the full file.
354 def config_interface(self):
# Returns a dict with a boolean 'result' key. The visible code derives a
# required rate of (LATENCY_PPS + 1) pps per service chain times `mult`,
# converts it with utils.convert_rates and compares it with the total
# requested rate; the final statement returns {'result': True}.
# NOTE(review): this listing appears to be missing lines (the branches that
# set `mult` and initialise/accumulate `total_rate`, the last argument of the
# second convert_rates call, a return of `result`) - confirm against the
# full file.
# NOTE(review): one path below stores int(r['rate_pps']) in total_rate while
# the other stores the whole convert_rates() result, which is then compared
# with the integer required_rate. It looks like that path should also take
# ['rate_pps'] - confirm and fix in the full file.
357 def __is_rate_enough(self, l2frame_size, rates, bidirectional, latency):
358 """Check if rate provided by user is above requirements. Applies only if latency is True."""
359 intf_speed = self.config.generator_config.intf_speed
365 r = utils.convert_rates(l2frame_size, rate, intf_speed)
366 total_rate += int(r['rate_pps'])
369 total_rate = utils.convert_rates(l2frame_size, rates[0], intf_speed)
370 # rate must be enough for latency stream and at least 1 pps for base stream per chain
371 required_rate = (self.LATENCY_PPS + 1) * self.config.service_chain_count * mult
372 result = utils.convert_rates(l2frame_size,
373 {'rate_pps': required_rate},
375 result['result'] = total_rate >= required_rate
378 return {'result': True}
# Build the traffic streams for all ports and add them to the TRex client.
# Steps visible here: check the requested rate (TrafficGeneratorException is
# raised with the minimum required rate figures), fetch the stream configs of
# every device, store the rates as rate strings, allocate one packet-group id
# per port, generate forward (and, with more than one rate, reverse) streams
# per chain, then add each stream block to its port.
# NOTE(review): this listing appears to be missing lines (the condition
# guarding the raise, the `bps` format argument, most arguments of the
# generate_streams calls) - confirm against the full file.
# NOTE(review): on Python 3 `map(...)` returns an iterator, so the later
# `len(self.rates)` and index assignment in modify_rate would fail;
# `self.id.next()` is also Python 2 only.
# NOTE(review): stream blocks are filled under the literal keys 0 and 1 but
# read back with the port handles - this only matches when the ports are 0
# and 1; confirm.
380 def create_traffic(self, l2frame_size, rates, bidirectional, latency=True):
381 r = self.__is_rate_enough(l2frame_size, rates, bidirectional, latency)
383 raise TrafficGeneratorException(
384 'Required rate in total is at least one of: \n{pps}pps \n{bps}bps \n{load}%.'
385 .format(pps=r['rate_pps'],
387 load=r['rate_percent']))
# One list of stream configs per device.
389 stream_cfgs = [d.get_stream_configs(self.config.generator_config.service_chain)
390 for d in self.config.generator_config.devices]
391 self.rates = map(lambda rate: utils.to_rate_str(rate), rates)
393 for ph in self.port_handle:
394 # generate one pg_id for each direction
395 self.stream_ids[ph] = self.id.next()
# zip(*stream_cfgs) pairs the i-th config of the first device with the i-th of the second.
397 for i, (fwd_stream_cfg, rev_stream_cfg) in enumerate(zip(*stream_cfgs)):
# With ARP in use (EXT chain), destination MACs come from the resolved ARP table.
398 if self.config.service_chain == ChainType.EXT and not self.config.no_arp:
399 fwd_stream_cfg['mac_dst'] = self.arps[self.port_handle[0]][i]
400 rev_stream_cfg['mac_dst'] = self.arps[self.port_handle[1]][i]
402 self.streamblock[0].extend(self.generate_streams(self.port_handle[0],
# Reverse-direction streams are created only when more than one rate was given.
406 if len(self.rates) > 1:
407 self.streamblock[1].extend(self.generate_streams(self.port_handle[1],
411 latency=bidirectional and latency))
413 for ph in self.port_handle:
414 self.client.add_streams(self.streamblock[ph], ports=ph)
415 LOG.info('Created traffic stream for port %s.' % ph)
def modify_rate(self, rate, reverse):
    """Store a new rate string for one direction and log the change.

    ``reverse`` selects the entry: a falsy value updates index 0, a truthy
    value index 1 (``int(reverse)``).
    """
    idx = int(reverse)
    target_port = self.port_handle[idx]
    self.rates[idx] = utils.to_rate_str(rate)
    message = 'Modified traffic stream for %s, new rate=%s.' % (
        target_port, utils.to_rate_str(rate))
    LOG.info(message)
# Forget all generated streams: the three bookkeeping maps are replaced by
# fresh defaultdict(list) instances, the ports are reset on the client and
# the action is logged.
# NOTE(review): one line appears to be missing from this listing before the
# client reset; self.rates (set by create_traffic) is not cleared in the
# visible lines - confirm against the full file.
423 def clear_streamblock(self):
424 self.streamblock = defaultdict(list)
425 self.latencies = defaultdict(list)
426 self.stream_ids = defaultdict(list)
428 self.client.reset(self.port_handle)
429 LOG.info('Cleared all existing streams.')
# Fragment whose `def` line is not visible in this listing: fetches the
# packet-group statistics from the client and returns them converted by
# extract_stats.
432 stats = self.client.get_pgid_stats()
433 return self.extract_stats(stats)
# Fragment whose `def` line is not visible in this listing: returns the
# 'src_mac' port attribute of every port in self.port_handle, in port order.
436 return [self.client.get_port_attr(port=port)['src_mac'] for port in self.port_handle]
# Clear the statistics held by the TRex client.
# NOTE(review): one line appears to be missing between the header and the
# call - possibly a guard condition; confirm against the full file.
438 def clear_stats(self):
440 self.client.clear_stats()
def start_traffic(self):
    """Start transmitting on each port at its configured rate.

    Ports and rates are paired positionally; every port runs for
    ``self.config.duration_sec`` with ``force=True``.
    """
    port_rates = zip(self.port_handle, self.rates)
    for port, rate in port_rates:
        start_args = {
            'ports': port,
            'mult': rate,
            'duration': self.config.duration_sec,
            'force': True,
        }
        self.client.start(**start_args)
def stop_traffic(self):
    """Stop transmission on all port handles."""
    active_ports = self.port_handle
    self.client.stop(ports=active_ports)
452 self.client.reset(self.port_handle)
453 self.client.disconnect()
455 # TRex does not like a reset while in disconnected state