1 # Copyright 2016 Cisco Systems, Inc. All rights reserved.
3 # Licensed under the Apache License, Version 2.0 (the "License"); you may
4 # not use this file except in compliance with the License. You may obtain
5 # a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 # License for the specific language governing permissions and limitations
import os
import random
import time
import traceback

from collections import defaultdict
from itertools import count
from nfvbench.log import LOG
from nfvbench.specs import ChainType
from nfvbench.traffic_server import TRexTrafficServer
from nfvbench.utils import cast_integer
from nfvbench.utils import timeout
from nfvbench.utils import TimeoutError
from traffic_base import AbstractTrafficGenerator
from traffic_base import TrafficGeneratorException
import traffic_utils as utils

# pylint: disable=import-error
from trex_stl_lib.api import CTRexVmInsFixHwCs
from trex_stl_lib.api import Dot1Q
from trex_stl_lib.api import Ether
from trex_stl_lib.api import IP
from trex_stl_lib.api import STLClient
from trex_stl_lib.api import STLError
from trex_stl_lib.api import STLFlowLatencyStats
from trex_stl_lib.api import STLFlowStats
from trex_stl_lib.api import STLPktBuilder
from trex_stl_lib.api import STLScVmRaw
from trex_stl_lib.api import STLStream
from trex_stl_lib.api import STLTXCont
from trex_stl_lib.api import STLVmFixChecksumHw
from trex_stl_lib.api import STLVmFlowVar
from trex_stl_lib.api import STLVmFlowVarRepetableRandom
from trex_stl_lib.api import STLVmWrFlowVar
from trex_stl_lib.api import UDP
from trex_stl_lib.services.trex_stl_service_arp import STLServiceARP
# pylint: enable=import-error
54 class TRex(AbstractTrafficGenerator):
57 def __init__(self, runner):
58 AbstractTrafficGenerator.__init__(self, runner)
61 self.latencies = defaultdict(list)
62 self.stream_ids = defaultdict(list)
64 self.streamblock = defaultdict(list)
def get_version(self):
    """Return whatever version information the TRex client reports for its server."""
    server_version = self.client.get_server_version()
    return server_version
def extract_stats(self, in_stats):
    """Convert raw TRex statistics into the per-port result dictionary.

    For every port in self.port_handle the result holds a 'tx' and an 'rx'
    dictionary with packet/byte totals and rates. 'rx' additionally carries
    'dropped_pkts' (tx packets minus rx packets) and the max/min/avg delay,
    each of which is NaN when the combined latency record lacks the value.
    The top-level 'total_tx_rate' entry is the number of packets sent on all
    ports divided by the configured run duration.

    :param in_stats: statistics dictionary obtained from the TRex client; it
        is passed through utils.nan_replace before being read
    :return: dictionary keyed by port handle, plus 'total_tx_rate'
    """
    utils.nan_replace(in_stats)

    # NOTE(review): short lines are missing from the reviewed listing at this
    # point; confirm no statement other than the initialisation below was lost.
    result = {}
    for ph in self.port_handle:
        stats = self.__combine_stats(in_stats, ph)
        result[ph] = {
            'tx': {
                'total_pkts': cast_integer(stats['tx_pkts']['total']),
                'total_pkt_bytes': cast_integer(stats['tx_bytes']['total']),
                'pkt_rate': cast_integer(stats['tx_pps']['total']),
                'pkt_bit_rate': cast_integer(stats['tx_bps']['total'])
            },
            'rx': {
                'total_pkts': cast_integer(stats['rx_pkts']['total']),
                'total_pkt_bytes': cast_integer(stats['rx_bytes']['total']),
                'pkt_rate': cast_integer(stats['rx_pps']['total']),
                'pkt_bit_rate': cast_integer(stats['rx_bps']['total']),
                'dropped_pkts': cast_integer(
                    stats['tx_pkts']['total'] - stats['rx_pkts']['total'])
            }
        }

        lat = self.__combine_latencies(in_stats, ph)
        rx_result = result[ph]['rx']
        for out_key, lat_key in (('max_delay_usec', 'total_max'),
                                 ('min_delay_usec', 'total_min'),
                                 ('avg_delay_usec', 'average')):
            rx_result[out_key] = cast_integer(lat[lat_key]) if lat_key in lat else float('nan')

    # Sum over the ports actually in use instead of assuming they are 0 and 1.
    total_tx_pkts = sum(result[ph]['tx']['total_pkts'] for ph in self.port_handle)
    result["total_tx_rate"] = cast_integer(total_tx_pkts / self.config.duration_sec)
    return result
def __combine_stats(self, in_stats, port_handle):
    """Traverse the TRex result dictionary and combine stream stats.

    Used for combining latency and regular streams together: the flow stats
    of the port's regular stream id and of all its latency stream ids are
    added up per statistic type and per key of that statistic.

    :param in_stats: TRex statistics dictionary; its 'flow_stats' entry is
        indexed by packet group id
    :param port_handle: port whose stream ids are looked up in
        self.stream_ids and self.latencies
    :return: nested defaultdict, result[stat_type][key] -> summed value
        (missing entries read as 0.0)
    """
    result = defaultdict(lambda: defaultdict(float))

    pg_ids = [self.stream_ids[port_handle]] + self.latencies[port_handle]
    for pg_id in pg_ids:
        record = in_stats['flow_stats'][pg_id]
        # items() behaves the same on Python 2 and 3 for these small dicts.
        for stat_type, stat_type_values in record.items():
            for ph, value in stat_type_values.items():
                result[stat_type][ph] += value

    return result
def __combine_latencies(self, in_stats, port_handle):
    """Traverse the TRex result dictionary and combine chosen latency stats.

    :param in_stats: TRex statistics dictionary; its 'latency' entry is
        indexed by latency stream id
    :param port_handle: port whose latency stream ids are read from
        self.latencies
    :return: empty dict when the port has no latency stream (callers test
        for key presence), otherwise a dict with the summed 'dropped_pkts',
        the overall 'total_max' and 'total_min', and 'average' as the mean
        of the per-stream averages
    """
    lat_ids = self.latencies[port_handle]
    if not lat_ids:
        return {}

    result = defaultdict(float)
    # Start from +inf so the first stream's minimum always wins.
    result['total_min'] = float("inf")
    for lat_id in lat_ids:
        lat = in_stats['latency'][lat_id]
        result['dropped_pkts'] += lat['err_cntrs']['dropped']
        result['total_max'] = max(lat['latency']['total_max'], result['total_max'])
        result['total_min'] = min(lat['latency']['total_min'], result['total_min'])
        result['average'] += lat['latency']['average']

    result['average'] /= len(lat_ids)

    return result
138 def create_pkt(self, stream_cfg, l2frame_size):
140 pkt_base = Ether(src=stream_cfg['mac_src'], dst=stream_cfg['mac_dst'])
142 if stream_cfg['vlan_tag'] is not None:
143 # 50 = 14 (Ethernet II) + 4 (Vlan tag) + 4 (CRC Checksum) + 20 (IPv4) + 8 (UDP)
144 pkt_base /= Dot1Q(vlan=stream_cfg['vlan_tag'])
145 payload = 'x' * (max(64, int(l2frame_size)) - 50)
147 # 46 = 14 (Ethernet II) + 4 (CRC Checksum) + 20 (IPv4) + 8 (UDP)
148 payload = 'x' * (max(64, int(l2frame_size)) - 46)
151 if stream_cfg['udp_src_port']:
152 udp_args['sport'] = int(stream_cfg['udp_src_port'])
153 if stream_cfg['udp_dst_port']:
154 udp_args['dport'] = int(stream_cfg['udp_dst_port'])
155 pkt_base /= IP() / UDP(**udp_args)
157 if stream_cfg['ip_addrs_step'] == 'random':
158 src_fv = STLVmFlowVarRepetableRandom(
160 min_value=stream_cfg['ip_src_addr'],
161 max_value=stream_cfg['ip_src_addr_max'],
163 seed=random.randint(0, 32767),
164 limit=stream_cfg['ip_src_count'])
165 dst_fv = STLVmFlowVarRepetableRandom(
167 min_value=stream_cfg['ip_dst_addr'],
168 max_value=stream_cfg['ip_dst_addr_max'],
170 seed=random.randint(0, 32767),
171 limit=stream_cfg['ip_dst_count'])
173 src_fv = STLVmFlowVar(
175 min_value=stream_cfg['ip_src_addr'],
176 max_value=stream_cfg['ip_src_addr'],
179 step=stream_cfg['ip_addrs_step'])
180 dst_fv = STLVmFlowVar(
182 min_value=stream_cfg['ip_dst_addr'],
183 max_value=stream_cfg['ip_dst_addr_max'],
186 step=stream_cfg['ip_addrs_step'])
190 STLVmWrFlowVar(fv_name="ip_src", pkt_offset="IP.src"),
192 STLVmWrFlowVar(fv_name="ip_dst", pkt_offset="IP.dst"),
193 STLVmFixChecksumHw(l3_offset="IP",
195 l4_type=CTRexVmInsFixHwCs.L4_TYPE_UDP)
198 return STLPktBuilder(pkt=pkt_base / payload, vm=STLScVmRaw(vm_param))
200 def generate_streams(self, port_handle, stream_cfg, l2frame, isg=0.0, latency=True):
203 if l2frame == 'IMIX':
204 for t, (ratio, l2_frame_size) in enumerate(zip(self.imix_ratios, self.imix_l2_sizes)):
205 pkt = self.create_pkt(stream_cfg, l2_frame_size)
206 streams.append(STLStream(packet=pkt,
208 flow_stats=STLFlowStats(
209 pg_id=self.stream_ids[port_handle]),
210 mode=STLTXCont(pps=ratio)))
213 idx_lat = self.id.next()
214 sl = STLStream(packet=pkt,
216 flow_stats=STLFlowLatencyStats(pg_id=idx_lat),
217 mode=STLTXCont(pps=self.LATENCY_PPS))
220 pkt = self.create_pkt(stream_cfg, l2frame)
221 streams.append(STLStream(packet=pkt,
222 flow_stats=STLFlowStats(pg_id=self.stream_ids[port_handle]),
226 idx_lat = self.id.next()
227 streams.append(STLStream(packet=pkt,
228 flow_stats=STLFlowLatencyStats(pg_id=idx_lat),
229 mode=STLTXCont(pps=self.LATENCY_PPS)))
232 self.latencies[port_handle].append(idx_lat)
240 def __connect(self, client):
243 def __connect_after_start(self):
244 # after start, Trex may take a bit of time to initialize
245 # so we need to retry a few times
246 for it in xrange(self.config.generic_retry_count):
249 self.client.connect()
251 except Exception as ex:
252 if it == (self.config.generic_retry_count - 1):
254 LOG.info("Retrying connection to TRex (%s)...", ex.message)
257 LOG.info("Connecting to TRex...")
258 server_ip = self.config.generator_config.ip
260 # Connect to TRex server
261 self.client = STLClient(server=server_ip)
263 self.__connect(self.client)
264 except (TimeoutError, STLError) as e:
265 if server_ip == '127.0.0.1':
267 self.__start_server()
268 self.__connect_after_start()
269 except (TimeoutError, STLError) as e:
270 LOG.error('Cannot connect to TRex')
271 LOG.error(traceback.format_exc())
272 logpath = '/tmp/trex.log'
273 if os.path.isfile(logpath):
274 # Wait for TRex to finish writing error message
276 for _ in xrange(self.config.generic_retry_count):
277 size = os.path.getsize(logpath)
278 if size == last_size:
279 # probably not writing anymore
283 with open(logpath, 'r') as f:
287 raise TrafficGeneratorException(message)
289 raise TrafficGeneratorException(e.message)
291 ports = list(self.config.generator_config.ports)
292 self.port_handle = ports
294 self.client.reset(ports)
297 if self.config.service_chain == ChainType.EXT and not self.config.no_arp:
def __set_l3_mode(self):
    """Switch every generator port to L3 mode.

    Service mode is enabled on all ports for the duration of the change and
    disabled again afterwards. Each port is paired with the device at the
    same position in generator_config.devices; the device's tg_gateway_ip is
    used as source address, device.dst.gateway_ip as destination address and
    the VLAN tag is passed only when the device has VLAN tagging enabled.
    """
    self.client.set_service_mode(ports=self.port_handle, enabled=True)
    for port, device in zip(self.port_handle, self.config.generator_config.devices):
        try:
            self.client.set_l3_mode(port=port,
                                    src_ipv4=device.tg_gateway_ip,
                                    dst_ipv4=device.dst.gateway_ip,
                                    vlan=device.vlan_tag if device.vlan_tagging else None)
        except STLError:
            # TRex tries to resolve ARP already, doesn't have to be successful yet
            # NOTE(review): handler restored from the comment above; confirm
            # STLError is the exception type the original tolerated.
            pass
    self.client.set_service_mode(ports=self.port_handle, enabled=False)
def __set_l2_mode(self):
    """Switch every generator port to L2 mode.

    Service mode is enabled on all ports first and disabled at the end. For
    each port/device pair, set_l2_mode is called once per stream config of
    the device, using that config's 'mac_dst' as destination MAC.
    """
    all_ports = self.port_handle
    self.client.set_service_mode(ports=all_ports, enabled=True)
    gen_cfg = self.config.generator_config
    for ph, dev in zip(all_ports, gen_cfg.devices):
        stream_cfgs = dev.get_stream_configs(gen_cfg.service_chain)
        for stream_cfg in stream_cfgs:
            self.client.set_l2_mode(port=ph, dst_mac=stream_cfg['mac_dst'])
    self.client.set_service_mode(ports=self.port_handle, enabled=False)
def __start_server(self):
    """Create a TRexTrafficServer and run it with the generator configuration."""
    TRexTrafficServer().run_server(self.config.generator_config)
326 def resolve_arp(self):
327 self.client.set_service_mode(ports=self.port_handle)
328 LOG.info('Polling ARP until successful')
331 for port, device in zip(self.port_handle, self.config.generator_config.devices):
332 ctx = self.client.create_service_ctx(port=port)
336 src_ip=cfg['ip_src_tg_gw'],
337 dst_ip=cfg['mac_discovery_gw'],
338 vlan=device.vlan_tag if device.vlan_tagging else None)
339 for cfg in device.get_stream_configs(self.config.generator_config.service_chain)
342 for _ in xrange(self.config.generic_retry_count):
347 LOG.error(traceback.format_exc())
350 self.arps[port] = [arp.get_record().dst_mac for arp in arps
351 if arp.get_record().dst_mac is not None]
353 if len(self.arps[port]) == self.config.service_chain_count:
355 LOG.info('ARP resolved successfully for port %s', port)
358 failed = [arp.get_record().dst_ip for arp in arps
359 if arp.get_record().dst_mac is None]
360 LOG.info('Retrying ARP for: %s (%d / %d)',
361 failed, attempt, self.config.generic_retry_count)
362 time.sleep(self.config.generic_poll_sec)
364 self.client.set_service_mode(ports=self.port_handle, enabled=False)
365 return resolved == len(self.port_handle)
def config_interface(self):
    """Interface configuration hook; this generator does nothing here.

    NOTE(review): the body is not visible in the reviewed listing; a no-op
    is assumed - confirm against the original file.
    """
    pass
def __is_rate_enough(self, l2frame_size, rates, bidirectional, latency):
    """Check if rate provided by user is above requirements. Applies only if latency is True.

    With latency enabled, every chain needs LATENCY_PPS for its latency
    stream plus at least 1 pps for its base stream; the requirement doubles
    when traffic is bidirectional.

    :param l2frame_size: frame size handed to utils.convert_rates
    :param rates: rate specifications per direction; only rates[0] is
        counted when traffic is unidirectional
    :param bidirectional: True when both directions carry traffic
    :param latency: True when latency streams are requested
    :return: {'result': True} when latency is off; otherwise the required
        rate as converted by utils.convert_rates, extended with a boolean
        'result' telling whether the offered rate is sufficient
    """
    intf_speed = self.config.generator_config.intf_speed
    if not latency:
        return {'result': True}

    # NOTE(review): the multiplier values are not visible in the reviewed
    # listing; 2 for bidirectional and 1 otherwise is assumed - confirm.
    mult = 2 if bidirectional else 1
    counted_rates = rates if bidirectional else [rates[0]]

    # Compare packets per second in both cases. The unidirectional path used
    # to keep the whole convert_rates() dictionary, so the comparison below
    # never evaluated the actual rate.
    total_rate = sum(int(utils.convert_rates(l2frame_size, rate, intf_speed)['rate_pps'])
                     for rate in counted_rates)

    # rate must be enough for latency stream and at least 1 pps for base stream per chain
    required_rate = (self.LATENCY_PPS + 1) * self.config.service_chain_count * mult
    # NOTE(review): the third argument is not visible in the reviewed listing;
    # the aggregate speed of the directions in use is assumed - confirm.
    result = utils.convert_rates(l2frame_size,
                                 {'rate_pps': required_rate},
                                 intf_speed * mult)
    result['result'] = total_rate >= required_rate
    return result
393 def create_traffic(self, l2frame_size, rates, bidirectional, latency=True):
394 r = self.__is_rate_enough(l2frame_size, rates, bidirectional, latency)
396 raise TrafficGeneratorException(
397 'Required rate in total is at least one of: \n{pps}pps \n{bps}bps \n{load}%.'
398 .format(pps=r['rate_pps'],
400 load=r['rate_percent']))
402 stream_cfgs = [d.get_stream_configs(self.config.generator_config.service_chain)
403 for d in self.config.generator_config.devices]
404 self.rates = [utils.to_rate_str(rate) for rate in rates]
406 for ph in self.port_handle:
407 # generate one pg_id for each direction
408 self.stream_ids[ph] = self.id.next()
410 for i, (fwd_stream_cfg, rev_stream_cfg) in enumerate(zip(*stream_cfgs)):
411 if self.config.service_chain == ChainType.EXT and not self.config.no_arp:
412 fwd_stream_cfg['mac_dst'] = self.arps[self.port_handle[0]][i]
413 rev_stream_cfg['mac_dst'] = self.arps[self.port_handle[1]][i]
415 self.streamblock[0].extend(self.generate_streams(self.port_handle[0],
419 if len(self.rates) > 1:
420 self.streamblock[1].extend(self.generate_streams(self.port_handle[1],
424 latency=bidirectional and latency))
426 for ph in self.port_handle:
427 self.client.add_streams(self.streamblock[ph], ports=ph)
428 LOG.info('Created traffic stream for port %s.', ph)
def clear_streamblock(self):
    """Forget all locally tracked streams and reset the generator ports.

    The stream blocks, latency stream ids and stream ids are replaced by
    empty defaultdicts, the per-port rates are emptied, and the ports in
    self.port_handle are reset on the TRex client.
    """
    self.streamblock = defaultdict(list)
    self.latencies = defaultdict(list)
    self.stream_ids = defaultdict(list)
    # Rates are set together with the streams in create_traffic(); drop them
    # too so start_traffic() cannot pair ports with rates of removed streams.
    self.rates = []
    self.client.reset(self.port_handle)
    LOG.info('Cleared all existing streams.')
439 stats = self.client.get_pgid_stats()
440 return self.extract_stats(stats)
443 return [self.client.get_port_attr(port=port)['src_mac'] for port in self.port_handle]
def clear_stats(self):
    """Clear the statistics held by the TRex client.

    Does nothing when no client object has been created yet, so the method
    is safe to call before a connection exists.
    """
    if self.client:
        self.client.clear_stats()
def start_traffic(self):
    """Start transmission on each port, pairing ports with self.rates by position.

    Every port is started with its rate as multiplier, the configured
    duration_sec as duration, and force=True.
    """
    port_rate_pairs = zip(self.port_handle, self.rates)
    for ph, port_rate in port_rate_pairs:
        self.client.start(ports=ph,
                          mult=port_rate,
                          duration=self.config.duration_sec,
                          force=True)
def stop_traffic(self):
    """Ask the TRex client to stop transmission on all ports in self.port_handle."""
    active_ports = self.port_handle
    self.client.stop(ports=active_ports)
459 self.client.reset(self.port_handle)
460 self.client.disconnect()
462 # TRex does not like a reset while in disconnected state