1 # Copyright 2016 Cisco Systems, Inc. All rights reserved.
3 # Licensed under the Apache License, Version 2.0 (the "License"); you may
4 # not use this file except in compliance with the License. You may obtain
5 # a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 # License for the specific language governing permissions and limitations
20 from collections import defaultdict
21 from itertools import count
22 from nfvbench.log import LOG
23 from nfvbench.specs import ChainType
24 from nfvbench.traffic_server import TRexTrafficServer
25 from nfvbench.utils import cast_integer
26 from nfvbench.utils import timeout
27 from nfvbench.utils import TimeoutError
28 from traffic_base import AbstractTrafficGenerator
29 from traffic_base import TrafficGeneratorException
30 import traffic_utils as utils
32 # pylint: disable=import-error
33 from trex_stl_lib.api import CTRexVmInsFixHwCs
34 from trex_stl_lib.api import Dot1Q
35 from trex_stl_lib.api import Ether
36 from trex_stl_lib.api import IP
37 from trex_stl_lib.api import STLClient
38 from trex_stl_lib.api import STLError
39 from trex_stl_lib.api import STLFlowLatencyStats
40 from trex_stl_lib.api import STLFlowStats
41 from trex_stl_lib.api import STLPktBuilder
42 from trex_stl_lib.api import STLScVmRaw
43 from trex_stl_lib.api import STLStream
44 from trex_stl_lib.api import STLTXCont
45 from trex_stl_lib.api import STLVmFixChecksumHw
46 from trex_stl_lib.api import STLVmFlowVar
47 from trex_stl_lib.api import STLVmFlowVarRepetableRandom
48 from trex_stl_lib.api import STLVmWrFlowVar
49 from trex_stl_lib.api import UDP
50 from trex_stl_lib.services.trex_stl_service_arp import STLServiceARP
53 # pylint: enable=import-error
56 class TRex(AbstractTrafficGenerator):
59 def __init__(self, runner):
60 AbstractTrafficGenerator.__init__(self, runner)
63 self.latencies = defaultdict(list)
64 self.stream_ids = defaultdict(list)
66 self.streamblock = defaultdict(list)
69 self.capture_id = None
72 def get_version(self):
73 return self.client.get_server_version()
75 def extract_stats(self, in_stats):
76 utils.nan_replace(in_stats)
80 for ph in self.port_handle:
81 stats = self.__combine_stats(in_stats, ph)
84 'total_pkts': cast_integer(stats['tx_pkts']['total']),
85 'total_pkt_bytes': cast_integer(stats['tx_bytes']['total']),
86 'pkt_rate': cast_integer(stats['tx_pps']['total']),
87 'pkt_bit_rate': cast_integer(stats['tx_bps']['total'])
90 'total_pkts': cast_integer(stats['rx_pkts']['total']),
91 'total_pkt_bytes': cast_integer(stats['rx_bytes']['total']),
92 'pkt_rate': cast_integer(stats['rx_pps']['total']),
93 'pkt_bit_rate': cast_integer(stats['rx_bps']['total']),
94 'dropped_pkts': cast_integer(
95 stats['tx_pkts']['total'] - stats['rx_pkts']['total'])
99 lat = self.__combine_latencies(in_stats, ph)
100 result[ph]['rx']['max_delay_usec'] = cast_integer(
101 lat['total_max']) if 'total_max' in lat else float('nan')
102 result[ph]['rx']['min_delay_usec'] = cast_integer(
103 lat['total_min']) if 'total_min' in lat else float('nan')
104 result[ph]['rx']['avg_delay_usec'] = cast_integer(
105 lat['average']) if 'average' in lat else float('nan')
106 total_tx_pkts = result[0]['tx']['total_pkts'] + result[1]['tx']['total_pkts']
107 result["total_tx_rate"] = cast_integer(total_tx_pkts / self.config.duration_sec)
110 def __combine_stats(self, in_stats, port_handle):
111 """Traverses TRex result dictionary and combines stream stats. Used for combining latency
112 and regular streams together.
114 result = defaultdict(lambda: defaultdict(float))
116 for pg_id in [self.stream_ids[port_handle]] + self.latencies[port_handle]:
117 record = in_stats['flow_stats'][pg_id]
118 for stat_type, stat_type_values in record.iteritems():
119 for ph, value in stat_type_values.iteritems():
120 result[stat_type][ph] += value
124 def __combine_latencies(self, in_stats, port_handle):
125 """Traverses TRex result dictionary and combines chosen latency stats."""
126 if not self.latencies[port_handle]:
129 result = defaultdict(float)
130 result['total_min'] = float("inf")
131 for lat_id in self.latencies[port_handle]:
132 lat = in_stats['latency'][lat_id]
133 result['dropped_pkts'] += lat['err_cntrs']['dropped']
134 result['total_max'] = max(lat['latency']['total_max'], result['total_max'])
135 result['total_min'] = min(lat['latency']['total_min'], result['total_min'])
136 result['average'] += lat['latency']['average']
138 result['average'] /= len(self.latencies[port_handle])
142 def create_pkt(self, stream_cfg, l2frame_size):
144 pkt_base = Ether(src=stream_cfg['mac_src'], dst=stream_cfg['mac_dst'])
145 if stream_cfg['vlan_tag'] is not None:
146 # 50 = 14 (Ethernet II) + 4 (Vlan tag) + 4 (CRC Checksum) + 20 (IPv4) + 8 (UDP)
147 pkt_base /= Dot1Q(vlan=stream_cfg['vlan_tag'])
148 l2payload_size = int(l2frame_size) - 50
150 # 46 = 14 (Ethernet II) + 4 (CRC Checksum) + 20 (IPv4) + 8 (UDP)
151 l2payload_size = int(l2frame_size) - 46
152 payload = 'x' * l2payload_size
154 if stream_cfg['udp_src_port']:
155 udp_args['sport'] = int(stream_cfg['udp_src_port'])
156 if stream_cfg['udp_dst_port']:
157 udp_args['dport'] = int(stream_cfg['udp_dst_port'])
158 pkt_base /= IP() / UDP(**udp_args)
160 if stream_cfg['ip_addrs_step'] == 'random':
161 src_fv = STLVmFlowVarRepetableRandom(
163 min_value=stream_cfg['ip_src_addr'],
164 max_value=stream_cfg['ip_src_addr_max'],
166 seed=random.randint(0, 32767),
167 limit=stream_cfg['ip_src_count'])
168 dst_fv = STLVmFlowVarRepetableRandom(
170 min_value=stream_cfg['ip_dst_addr'],
171 max_value=stream_cfg['ip_dst_addr_max'],
173 seed=random.randint(0, 32767),
174 limit=stream_cfg['ip_dst_count'])
176 src_fv = STLVmFlowVar(
178 min_value=stream_cfg['ip_src_addr'],
179 max_value=stream_cfg['ip_src_addr'],
182 step=stream_cfg['ip_addrs_step'])
183 dst_fv = STLVmFlowVar(
185 min_value=stream_cfg['ip_dst_addr'],
186 max_value=stream_cfg['ip_dst_addr_max'],
189 step=stream_cfg['ip_addrs_step'])
193 STLVmWrFlowVar(fv_name="ip_src", pkt_offset="IP.src"),
195 STLVmWrFlowVar(fv_name="ip_dst", pkt_offset="IP.dst"),
196 STLVmFixChecksumHw(l3_offset="IP",
198 l4_type=CTRexVmInsFixHwCs.L4_TYPE_UDP)
201 return STLPktBuilder(pkt=pkt_base / payload, vm=STLScVmRaw(vm_param))
203 def generate_streams(self, port_handle, stream_cfg, l2frame, isg=0.0, latency=True):
206 if l2frame == 'IMIX':
207 min_size = 64 if stream_cfg['vlan_tag'] is None else 68
208 self.adjust_imix_min_size(min_size)
209 for t, (ratio, l2_frame_size) in enumerate(zip(self.imix_ratios, self.imix_l2_sizes)):
210 pkt = self.create_pkt(stream_cfg, l2_frame_size)
211 streams.append(STLStream(packet=pkt,
213 flow_stats=STLFlowStats(
214 pg_id=self.stream_ids[port_handle]),
215 mode=STLTXCont(pps=ratio)))
218 idx_lat = self.id.next()
219 pkt = self.create_pkt(stream_cfg, self.imix_avg_l2_size)
220 sl = STLStream(packet=pkt,
222 flow_stats=STLFlowLatencyStats(pg_id=idx_lat),
223 mode=STLTXCont(pps=self.LATENCY_PPS))
226 pkt = self.create_pkt(stream_cfg, l2frame)
227 streams.append(STLStream(packet=pkt,
228 flow_stats=STLFlowStats(pg_id=self.stream_ids[port_handle]),
232 idx_lat = self.id.next()
233 streams.append(STLStream(packet=pkt,
234 flow_stats=STLFlowLatencyStats(pg_id=idx_lat),
235 mode=STLTXCont(pps=self.LATENCY_PPS)))
238 self.latencies[port_handle].append(idx_lat)
246 def __connect(self, client):
249 def __connect_after_start(self):
250 # after start, Trex may take a bit of time to initialize
251 # so we need to retry a few times
252 for it in xrange(self.config.generic_retry_count):
255 self.client.connect()
257 except Exception as ex:
258 if it == (self.config.generic_retry_count - 1):
260 LOG.info("Retrying connection to TRex (%s)...", ex.message)
263 LOG.info("Connecting to TRex...")
264 server_ip = self.config.generator_config.ip
266 # Connect to TRex server
267 self.client = STLClient(server=server_ip)
269 self.__connect(self.client)
270 except (TimeoutError, STLError) as e:
271 if server_ip == '127.0.0.1':
273 self.__start_server()
274 self.__connect_after_start()
275 except (TimeoutError, STLError) as e:
276 LOG.error('Cannot connect to TRex')
277 LOG.error(traceback.format_exc())
278 logpath = '/tmp/trex.log'
279 if os.path.isfile(logpath):
280 # Wait for TRex to finish writing error message
282 for _ in xrange(self.config.generic_retry_count):
283 size = os.path.getsize(logpath)
284 if size == last_size:
285 # probably not writing anymore
289 with open(logpath, 'r') as f:
293 raise TrafficGeneratorException(message)
295 raise TrafficGeneratorException(e.message)
297 ports = list(self.config.generator_config.ports)
298 self.port_handle = ports
300 self.client.reset(ports)
303 if self.config.service_chain == ChainType.EXT and not self.config.no_arp:
308 def __set_l3_mode(self):
309 self.client.set_service_mode(ports=self.port_handle, enabled=True)
310 for port, device in zip(self.port_handle, self.config.generator_config.devices):
312 self.client.set_l3_mode(port=port,
313 src_ipv4=device.tg_gateway_ip,
314 dst_ipv4=device.dst.gateway_ip,
315 vlan=device.vlan_tag if device.vlan_tagging else None)
317 # TRex tries to resolve ARP already, doesn't have to be successful yet
319 self.client.set_service_mode(ports=self.port_handle, enabled=False)
321 def __set_l2_mode(self):
322 self.client.set_service_mode(ports=self.port_handle, enabled=True)
323 for port, device in zip(self.port_handle, self.config.generator_config.devices):
324 for cfg in device.get_stream_configs(self.config.generator_config.service_chain):
325 self.client.set_l2_mode(port=port, dst_mac=cfg['mac_dst'])
326 self.client.set_service_mode(ports=self.port_handle, enabled=False)
328 def __start_server(self):
329 server = TRexTrafficServer()
330 server.run_server(self.config.generator_config, self.config.vlan_tagging)
332 def resolve_arp(self):
333 self.client.set_service_mode(ports=self.port_handle)
334 LOG.info('Polling ARP until successful')
337 for port, device in zip(self.port_handle, self.config.generator_config.devices):
338 ctx = self.client.create_service_ctx(port=port)
342 src_ip=cfg['ip_src_tg_gw'],
343 dst_ip=cfg['mac_discovery_gw'],
344 vlan=device.vlan_tag if device.vlan_tagging else None)
345 for cfg in device.get_stream_configs(self.config.generator_config.service_chain)
348 for _ in xrange(self.config.generic_retry_count):
353 LOG.error(traceback.format_exc())
356 self.arps[port] = [arp.get_record().dst_mac for arp in arps
357 if arp.get_record().dst_mac is not None]
359 if len(self.arps[port]) == self.config.service_chain_count:
361 LOG.info('ARP resolved successfully for port %s', port)
364 failed = [arp.get_record().dst_ip for arp in arps
365 if arp.get_record().dst_mac is None]
366 LOG.info('Retrying ARP for: %s (%d / %d)',
367 failed, attempt, self.config.generic_retry_count)
368 time.sleep(self.config.generic_poll_sec)
370 self.client.set_service_mode(ports=self.port_handle, enabled=False)
371 return resolved == len(self.port_handle)
373 def config_interface(self):
376 def __is_rate_enough(self, l2frame_size, rates, bidirectional, latency):
377 """Check if rate provided by user is above requirements. Applies only if latency is True."""
378 intf_speed = self.config.generator_config.intf_speed
384 r = utils.convert_rates(l2frame_size, rate, intf_speed)
385 total_rate += int(r['rate_pps'])
388 total_rate = utils.convert_rates(l2frame_size, rates[0], intf_speed)
389 # rate must be enough for latency stream and at least 1 pps for base stream per chain
390 required_rate = (self.LATENCY_PPS + 1) * self.config.service_chain_count * mult
391 result = utils.convert_rates(l2frame_size,
392 {'rate_pps': required_rate},
394 result['result'] = total_rate >= required_rate
397 return {'result': True}
399 def create_traffic(self, l2frame_size, rates, bidirectional, latency=True):
400 r = self.__is_rate_enough(l2frame_size, rates, bidirectional, latency)
402 raise TrafficGeneratorException(
403 'Required rate in total is at least one of: \n{pps}pps \n{bps}bps \n{load}%.'
404 .format(pps=r['rate_pps'],
406 load=r['rate_percent']))
408 stream_cfgs = [d.get_stream_configs(self.config.generator_config.service_chain)
409 for d in self.config.generator_config.devices]
410 self.rates = [utils.to_rate_str(rate) for rate in rates]
412 for ph in self.port_handle:
413 # generate one pg_id for each direction
414 self.stream_ids[ph] = self.id.next()
416 for i, (fwd_stream_cfg, rev_stream_cfg) in enumerate(zip(*stream_cfgs)):
417 if self.config.service_chain == ChainType.EXT and not self.config.no_arp:
418 fwd_stream_cfg['mac_dst'] = self.arps[self.port_handle[0]][i]
419 rev_stream_cfg['mac_dst'] = self.arps[self.port_handle[1]][i]
421 self.streamblock[0].extend(self.generate_streams(self.port_handle[0],
425 if len(self.rates) > 1:
426 self.streamblock[1].extend(self.generate_streams(self.port_handle[1],
430 latency=bidirectional and latency))
432 for ph in self.port_handle:
433 self.client.add_streams(self.streamblock[ph], ports=ph)
434 LOG.info('Created traffic stream for port %s.', ph)
436 def clear_streamblock(self):
437 self.streamblock = defaultdict(list)
438 self.latencies = defaultdict(list)
439 self.stream_ids = defaultdict(list)
441 self.client.reset(self.port_handle)
442 LOG.info('Cleared all existing streams.')
445 stats = self.client.get_pgid_stats()
446 return self.extract_stats(stats)
449 return [self.client.get_port_attr(port=port)['src_mac'] for port in self.port_handle]
451 def clear_stats(self):
453 self.client.clear_stats()
455 def start_traffic(self):
456 for port, rate in zip(self.port_handle, self.rates):
457 self.client.start(ports=port, mult=rate, duration=self.config.duration_sec, force=True)
459 def stop_traffic(self):
460 self.client.stop(ports=self.port_handle)
462 def start_capture(self):
465 self.client.set_service_mode(ports=self.port_handle)
466 self.capture_id = self.client.start_capture(rx_ports=self.port_handle)
468 def fetch_capture_packets(self):
470 self.packet_list = []
471 self.client.fetch_capture_packets(capture_id=self.capture_id['id'],
472 output=self.packet_list)
474 def stop_capture(self):
476 self.client.stop_capture(capture_id=self.capture_id['id'])
477 self.capture_id = None
478 self.client.set_service_mode(ports=self.port_handle, enabled=False)
483 self.client.reset(self.port_handle)
484 self.client.disconnect()
486 # TRex does not like a reset while in disconnected state