1 # Copyright 2016 Cisco Systems, Inc. All rights reserved.
3 # Licensed under the Apache License, Version 2.0 (the "License"); you may
4 # not use this file except in compliance with the License. You may obtain
5 # a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 # License for the specific language governing permissions and limitations
20 from collections import defaultdict
21 from itertools import count
22 from nfvbench.log import LOG
23 from nfvbench.specs import ChainType
24 from nfvbench.traffic_server import TRexTrafficServer
25 from nfvbench.utils import cast_integer
26 from nfvbench.utils import timeout
27 from nfvbench.utils import TimeoutError
28 from traffic_base import AbstractTrafficGenerator
29 from traffic_base import TrafficGeneratorException
30 import traffic_utils as utils
32 # pylint: disable=import-error
33 from trex_stl_lib.api import CTRexVmInsFixHwCs
34 from trex_stl_lib.api import Dot1Q
35 from trex_stl_lib.api import Ether
36 from trex_stl_lib.api import IP
37 from trex_stl_lib.api import STLClient
38 from trex_stl_lib.api import STLError
39 from trex_stl_lib.api import STLFlowLatencyStats
40 from trex_stl_lib.api import STLFlowStats
41 from trex_stl_lib.api import STLPktBuilder
42 from trex_stl_lib.api import STLScVmRaw
43 from trex_stl_lib.api import STLStream
44 from trex_stl_lib.api import STLTXCont
45 from trex_stl_lib.api import STLVmFixChecksumHw
46 from trex_stl_lib.api import STLVmFlowVar
47 from trex_stl_lib.api import STLVmFlowVarRepetableRandom
48 from trex_stl_lib.api import STLVmWrFlowVar
49 from trex_stl_lib.api import UDP
50 from trex_stl_lib.services.trex_stl_service_arp import STLServiceARP
53 # pylint: enable=import-error
56 class TRex(AbstractTrafficGenerator):
59 def __init__(self, runner):
60 AbstractTrafficGenerator.__init__(self, runner)
63 self.latencies = defaultdict(list)
64 self.stream_ids = defaultdict(list)
66 self.streamblock = defaultdict(list)
70 def get_version(self):
71 return self.client.get_server_version()
73 def extract_stats(self, in_stats):
74 utils.nan_replace(in_stats)
78 for ph in self.port_handle:
79 stats = self.__combine_stats(in_stats, ph)
82 'total_pkts': cast_integer(stats['tx_pkts']['total']),
83 'total_pkt_bytes': cast_integer(stats['tx_bytes']['total']),
84 'pkt_rate': cast_integer(stats['tx_pps']['total']),
85 'pkt_bit_rate': cast_integer(stats['tx_bps']['total'])
88 'total_pkts': cast_integer(stats['rx_pkts']['total']),
89 'total_pkt_bytes': cast_integer(stats['rx_bytes']['total']),
90 'pkt_rate': cast_integer(stats['rx_pps']['total']),
91 'pkt_bit_rate': cast_integer(stats['rx_bps']['total']),
92 'dropped_pkts': cast_integer(
93 stats['tx_pkts']['total'] - stats['rx_pkts']['total'])
97 lat = self.__combine_latencies(in_stats, ph)
98 result[ph]['rx']['max_delay_usec'] = cast_integer(
99 lat['total_max']) if 'total_max' in lat else float('nan')
100 result[ph]['rx']['min_delay_usec'] = cast_integer(
101 lat['total_min']) if 'total_min' in lat else float('nan')
102 result[ph]['rx']['avg_delay_usec'] = cast_integer(
103 lat['average']) if 'average' in lat else float('nan')
104 total_tx_pkts = result[0]['tx']['total_pkts'] + result[1]['tx']['total_pkts']
105 result["total_tx_rate"] = cast_integer(total_tx_pkts / self.config.duration_sec)
108 def __combine_stats(self, in_stats, port_handle):
109 """Traverses TRex result dictionary and combines stream stats. Used for combining latency
110 and regular streams together.
112 result = defaultdict(lambda: defaultdict(float))
114 for pg_id in [self.stream_ids[port_handle]] + self.latencies[port_handle]:
115 record = in_stats['flow_stats'][pg_id]
116 for stat_type, stat_type_values in record.iteritems():
117 for ph, value in stat_type_values.iteritems():
118 result[stat_type][ph] += value
122 def __combine_latencies(self, in_stats, port_handle):
123 """Traverses TRex result dictionary and combines chosen latency stats."""
124 if not self.latencies[port_handle]:
127 result = defaultdict(float)
128 result['total_min'] = float("inf")
129 for lat_id in self.latencies[port_handle]:
130 lat = in_stats['latency'][lat_id]
131 result['dropped_pkts'] += lat['err_cntrs']['dropped']
132 result['total_max'] = max(lat['latency']['total_max'], result['total_max'])
133 result['total_min'] = min(lat['latency']['total_min'], result['total_min'])
134 result['average'] += lat['latency']['average']
136 result['average'] /= len(self.latencies[port_handle])
140 def create_pkt(self, stream_cfg, l2frame_size):
142 pkt_base = Ether(src=stream_cfg['mac_src'], dst=stream_cfg['mac_dst'])
143 # TRex requires minimum payload size 16B
144 if stream_cfg['vlan_tag'] is not None:
145 # 50 = 14 (Ethernet II) + 4 (Vlan tag) + 4 (CRC Checksum) + 20 (IPv4) + 8 (UDP)
146 pkt_base /= Dot1Q(vlan=stream_cfg['vlan_tag'])
147 l2payload_size = max(max(64, int(l2frame_size)) - 50, 16)
149 # 46 = 14 (Ethernet II) + 4 (CRC Checksum) + 20 (IPv4) + 8 (UDP)
150 l2payload_size = max(max(64, int(l2frame_size)) - 46, 16)
151 payload = 'x' * l2payload_size
153 if stream_cfg['udp_src_port']:
154 udp_args['sport'] = int(stream_cfg['udp_src_port'])
155 if stream_cfg['udp_dst_port']:
156 udp_args['dport'] = int(stream_cfg['udp_dst_port'])
157 pkt_base /= IP() / UDP(**udp_args)
159 if stream_cfg['ip_addrs_step'] == 'random':
160 src_fv = STLVmFlowVarRepetableRandom(
162 min_value=stream_cfg['ip_src_addr'],
163 max_value=stream_cfg['ip_src_addr_max'],
165 seed=random.randint(0, 32767),
166 limit=stream_cfg['ip_src_count'])
167 dst_fv = STLVmFlowVarRepetableRandom(
169 min_value=stream_cfg['ip_dst_addr'],
170 max_value=stream_cfg['ip_dst_addr_max'],
172 seed=random.randint(0, 32767),
173 limit=stream_cfg['ip_dst_count'])
175 src_fv = STLVmFlowVar(
177 min_value=stream_cfg['ip_src_addr'],
178 max_value=stream_cfg['ip_src_addr'],
181 step=stream_cfg['ip_addrs_step'])
182 dst_fv = STLVmFlowVar(
184 min_value=stream_cfg['ip_dst_addr'],
185 max_value=stream_cfg['ip_dst_addr_max'],
188 step=stream_cfg['ip_addrs_step'])
192 STLVmWrFlowVar(fv_name="ip_src", pkt_offset="IP.src"),
194 STLVmWrFlowVar(fv_name="ip_dst", pkt_offset="IP.dst"),
195 STLVmFixChecksumHw(l3_offset="IP",
197 l4_type=CTRexVmInsFixHwCs.L4_TYPE_UDP)
200 return STLPktBuilder(pkt=pkt_base / payload, vm=STLScVmRaw(vm_param))
202 def generate_streams(self, port_handle, stream_cfg, l2frame, isg=0.0, latency=True):
205 if l2frame == 'IMIX':
206 for t, (ratio, l2_frame_size) in enumerate(zip(self.imix_ratios, self.imix_l2_sizes)):
207 pkt = self.create_pkt(stream_cfg, l2_frame_size)
208 streams.append(STLStream(packet=pkt,
210 flow_stats=STLFlowStats(
211 pg_id=self.stream_ids[port_handle]),
212 mode=STLTXCont(pps=ratio)))
215 idx_lat = self.id.next()
216 sl = STLStream(packet=pkt,
218 flow_stats=STLFlowLatencyStats(pg_id=idx_lat),
219 mode=STLTXCont(pps=self.LATENCY_PPS))
222 pkt = self.create_pkt(stream_cfg, l2frame)
223 streams.append(STLStream(packet=pkt,
224 flow_stats=STLFlowStats(pg_id=self.stream_ids[port_handle]),
228 idx_lat = self.id.next()
229 streams.append(STLStream(packet=pkt,
230 flow_stats=STLFlowLatencyStats(pg_id=idx_lat),
231 mode=STLTXCont(pps=self.LATENCY_PPS)))
234 self.latencies[port_handle].append(idx_lat)
242 def __connect(self, client):
245 def __connect_after_start(self):
246 # after start, Trex may take a bit of time to initialize
247 # so we need to retry a few times
248 for it in xrange(self.config.generic_retry_count):
251 self.client.connect()
253 except Exception as ex:
254 if it == (self.config.generic_retry_count - 1):
256 LOG.info("Retrying connection to TRex (%s)...", ex.message)
259 LOG.info("Connecting to TRex...")
260 server_ip = self.config.generator_config.ip
262 # Connect to TRex server
263 self.client = STLClient(server=server_ip)
265 self.__connect(self.client)
266 except (TimeoutError, STLError) as e:
267 if server_ip == '127.0.0.1':
269 self.__start_server()
270 self.__connect_after_start()
271 except (TimeoutError, STLError) as e:
272 LOG.error('Cannot connect to TRex')
273 LOG.error(traceback.format_exc())
274 logpath = '/tmp/trex.log'
275 if os.path.isfile(logpath):
276 # Wait for TRex to finish writing error message
278 for _ in xrange(self.config.generic_retry_count):
279 size = os.path.getsize(logpath)
280 if size == last_size:
281 # probably not writing anymore
285 with open(logpath, 'r') as f:
289 raise TrafficGeneratorException(message)
291 raise TrafficGeneratorException(e.message)
293 ports = list(self.config.generator_config.ports)
294 self.port_handle = ports
296 self.client.reset(ports)
299 if self.config.service_chain == ChainType.EXT and not self.config.no_arp:
304 def __set_l3_mode(self):
305 self.client.set_service_mode(ports=self.port_handle, enabled=True)
306 for port, device in zip(self.port_handle, self.config.generator_config.devices):
308 self.client.set_l3_mode(port=port,
309 src_ipv4=device.tg_gateway_ip,
310 dst_ipv4=device.dst.gateway_ip,
311 vlan=device.vlan_tag if device.vlan_tagging else None)
313 # TRex tries to resolve ARP already, doesn't have to be successful yet
315 self.client.set_service_mode(ports=self.port_handle, enabled=False)
317 def __set_l2_mode(self):
318 self.client.set_service_mode(ports=self.port_handle, enabled=True)
319 for port, device in zip(self.port_handle, self.config.generator_config.devices):
320 for cfg in device.get_stream_configs(self.config.generator_config.service_chain):
321 self.client.set_l2_mode(port=port, dst_mac=cfg['mac_dst'])
322 self.client.set_service_mode(ports=self.port_handle, enabled=False)
324 def __start_server(self):
325 server = TRexTrafficServer()
326 server.run_server(self.config.generator_config, self.config.vlan_tagging)
328 def resolve_arp(self):
329 self.client.set_service_mode(ports=self.port_handle)
330 LOG.info('Polling ARP until successful')
333 for port, device in zip(self.port_handle, self.config.generator_config.devices):
334 ctx = self.client.create_service_ctx(port=port)
338 src_ip=cfg['ip_src_tg_gw'],
339 dst_ip=cfg['mac_discovery_gw'],
340 vlan=device.vlan_tag if device.vlan_tagging else None)
341 for cfg in device.get_stream_configs(self.config.generator_config.service_chain)
344 for _ in xrange(self.config.generic_retry_count):
349 LOG.error(traceback.format_exc())
352 self.arps[port] = [arp.get_record().dst_mac for arp in arps
353 if arp.get_record().dst_mac is not None]
355 if len(self.arps[port]) == self.config.service_chain_count:
357 LOG.info('ARP resolved successfully for port %s', port)
360 failed = [arp.get_record().dst_ip for arp in arps
361 if arp.get_record().dst_mac is None]
362 LOG.info('Retrying ARP for: %s (%d / %d)',
363 failed, attempt, self.config.generic_retry_count)
364 time.sleep(self.config.generic_poll_sec)
366 self.client.set_service_mode(ports=self.port_handle, enabled=False)
367 return resolved == len(self.port_handle)
369 def config_interface(self):
372 def __is_rate_enough(self, l2frame_size, rates, bidirectional, latency):
373 """Check if rate provided by user is above requirements. Applies only if latency is True."""
374 intf_speed = self.config.generator_config.intf_speed
380 r = utils.convert_rates(l2frame_size, rate, intf_speed)
381 total_rate += int(r['rate_pps'])
384 total_rate = utils.convert_rates(l2frame_size, rates[0], intf_speed)
385 # rate must be enough for latency stream and at least 1 pps for base stream per chain
386 required_rate = (self.LATENCY_PPS + 1) * self.config.service_chain_count * mult
387 result = utils.convert_rates(l2frame_size,
388 {'rate_pps': required_rate},
390 result['result'] = total_rate >= required_rate
393 return {'result': True}
395 def create_traffic(self, l2frame_size, rates, bidirectional, latency=True):
396 r = self.__is_rate_enough(l2frame_size, rates, bidirectional, latency)
398 raise TrafficGeneratorException(
399 'Required rate in total is at least one of: \n{pps}pps \n{bps}bps \n{load}%.'
400 .format(pps=r['rate_pps'],
402 load=r['rate_percent']))
404 stream_cfgs = [d.get_stream_configs(self.config.generator_config.service_chain)
405 for d in self.config.generator_config.devices]
406 self.rates = [utils.to_rate_str(rate) for rate in rates]
408 for ph in self.port_handle:
409 # generate one pg_id for each direction
410 self.stream_ids[ph] = self.id.next()
412 for i, (fwd_stream_cfg, rev_stream_cfg) in enumerate(zip(*stream_cfgs)):
413 if self.config.service_chain == ChainType.EXT and not self.config.no_arp:
414 fwd_stream_cfg['mac_dst'] = self.arps[self.port_handle[0]][i]
415 rev_stream_cfg['mac_dst'] = self.arps[self.port_handle[1]][i]
417 self.streamblock[0].extend(self.generate_streams(self.port_handle[0],
421 if len(self.rates) > 1:
422 self.streamblock[1].extend(self.generate_streams(self.port_handle[1],
426 latency=bidirectional and latency))
428 for ph in self.port_handle:
429 self.client.add_streams(self.streamblock[ph], ports=ph)
430 LOG.info('Created traffic stream for port %s.', ph)
432 def clear_streamblock(self):
433 self.streamblock = defaultdict(list)
434 self.latencies = defaultdict(list)
435 self.stream_ids = defaultdict(list)
437 self.client.reset(self.port_handle)
438 LOG.info('Cleared all existing streams.')
441 stats = self.client.get_pgid_stats()
442 return self.extract_stats(stats)
445 return [self.client.get_port_attr(port=port)['src_mac'] for port in self.port_handle]
447 def clear_stats(self):
449 self.client.clear_stats()
451 def start_traffic(self):
452 for port, rate in zip(self.port_handle, self.rates):
453 self.client.start(ports=port, mult=rate, duration=self.config.duration_sec, force=True)
455 def stop_traffic(self):
456 self.client.stop(ports=self.port_handle)
461 self.client.reset(self.port_handle)
462 self.client.disconnect()
464 # TRex does not like a reset while in disconnected state