1 # Copyright 2016 Cisco Systems, Inc. All rights reserved.
3 # Licensed under the Apache License, Version 2.0 (the "License"); you may
4 # not use this file except in compliance with the License. You may obtain
5 # a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 # License for the specific language governing permissions and limitations
20 from collections import defaultdict
21 from itertools import count
22 from nfvbench.log import LOG
23 from nfvbench.specs import ChainType
24 from nfvbench.traffic_server import TRexTrafficServer
25 from nfvbench.utils import cast_integer
26 from nfvbench.utils import timeout
27 from nfvbench.utils import TimeoutError
28 from traffic_base import AbstractTrafficGenerator
29 from traffic_base import TrafficGeneratorException
30 import traffic_utils as utils
32 # pylint: disable=import-error
33 from trex_stl_lib.api import CTRexVmInsFixHwCs
34 from trex_stl_lib.api import Dot1Q
35 from trex_stl_lib.api import Ether
36 from trex_stl_lib.api import IP
37 from trex_stl_lib.api import STLClient
38 from trex_stl_lib.api import STLError
39 from trex_stl_lib.api import STLFlowLatencyStats
40 from trex_stl_lib.api import STLFlowStats
41 from trex_stl_lib.api import STLPktBuilder
42 from trex_stl_lib.api import STLScVmRaw
43 from trex_stl_lib.api import STLStream
44 from trex_stl_lib.api import STLTXCont
45 from trex_stl_lib.api import STLVmFixChecksumHw
46 from trex_stl_lib.api import STLVmFlowVar
47 from trex_stl_lib.api import STLVmFlowVarRepetableRandom
48 from trex_stl_lib.api import STLVmWrFlowVar
49 from trex_stl_lib.api import UDP
50 from trex_stl_lib.services.trex_stl_service_arp import STLServiceARP
51 # pylint: enable=import-error
54 class TRex(AbstractTrafficGenerator):
def __init__(self, runner):
    """Set up the generator and its empty per-port bookkeeping tables.

    :param runner: passed through unchanged to AbstractTrafficGenerator.__init__
    """
    AbstractTrafficGenerator.__init__(self, runner)
    # Three independent tables, each handing out an empty list for an unseen key:
    # latency pg_ids, regular stream pg_ids and the generated streams themselves.
    self.latencies, self.stream_ids, self.streamblock = (
        defaultdict(list), defaultdict(list), defaultdict(list))
def get_version(self):
    """Return the server version as reported by the TRex client."""
    client = self.client
    return client.get_server_version()
def extract_stats(self, in_stats):
    """Condense raw TRex statistics into a per-port TX/RX summary.

    :param in_stats: statistics dictionary obtained from the TRex client; it is
                     first passed through utils.nan_replace()
    :return: dict with one entry per port handle, each holding a 'tx' and an 'rx'
             dict of counters (the 'rx' dict also carries the latency figures),
             plus a 'total_tx_rate' entry covering all ports
    """
    utils.nan_replace(in_stats)

    result = {}
    for ph in self.port_handle:
        stats = self.__combine_stats(in_stats, ph)
        result[ph] = {
            'tx': {
                'total_pkts': cast_integer(stats['tx_pkts']['total']),
                'total_pkt_bytes': cast_integer(stats['tx_bytes']['total']),
                'pkt_rate': cast_integer(stats['tx_pps']['total']),
                'pkt_bit_rate': cast_integer(stats['tx_bps']['total'])
            },
            'rx': {
                'total_pkts': cast_integer(stats['rx_pkts']['total']),
                'total_pkt_bytes': cast_integer(stats['rx_bytes']['total']),
                'pkt_rate': cast_integer(stats['rx_pps']['total']),
                'pkt_bit_rate': cast_integer(stats['rx_bps']['total']),
                'dropped_pkts': cast_integer(
                    stats['tx_pkts']['total'] - stats['rx_pkts']['total'])
            }
        }

        # Latency figures are only present when the port has latency streams;
        # report NaN for any figure that is missing.
        lat = self.__combine_latencies(in_stats, ph)
        for out_key, lat_key in (('max_delay_usec', 'total_max'),
                                 ('min_delay_usec', 'total_min'),
                                 ('avg_delay_usec', 'average')):
            result[ph]['rx'][out_key] = cast_integer(
                lat[lat_key]) if lat_key in lat else float('nan')

    # Sum over the configured port handles instead of the literal keys 0 and 1,
    # which only exist in the result when the generator happens to use ports 0 and 1.
    total_tx_pkts = sum(result[ph]['tx']['total_pkts'] for ph in self.port_handle)
    result["total_tx_rate"] = cast_integer(total_tx_pkts / self.config.duration_sec)
    return result
def __combine_stats(self, in_stats, port_handle):
    """Traverse the TRex result dictionary and add up the stream stats of one port.

    The regular stream of the port (self.stream_ids) and all of its latency
    streams (self.latencies) are combined counter by counter.

    :param in_stats: TRex statistics; in_stats['flow_stats'][pg_id] maps each stat
                     type to a dict of values
    :param port_handle: port whose packet group ids are combined
    :return: nested defaultdict, stat type -> key -> summed value (as float)
    """
    result = defaultdict(lambda: defaultdict(float))

    pg_ids = [self.stream_ids[port_handle]] + self.latencies[port_handle]
    for pg_id in pg_ids:
        record = in_stats['flow_stats'][pg_id]
        # .items() gives the same result as .iteritems() on Python 2 and is the
        # only spelling available on Python 3.
        for stat_type, stat_type_values in record.items():
            counters = result[stat_type]
            for key, value in stat_type_values.items():
                counters[key] += value

    return result
def __combine_latencies(self, in_stats, port_handle):
    """Merge the latency statistics of every latency stream of a port.

    :param in_stats: TRex statistics; in_stats['latency'][pg_id] holds the
                     'err_cntrs' and 'latency' sub-dictionaries of one stream
    :param port_handle: port whose latency streams are merged
    :return: {} when the port has no latency stream, otherwise a defaultdict with
             'dropped_pkts' (sum), 'total_max' (largest), 'total_min' (smallest)
             and 'average' (mean of the per-stream averages)
    """
    lat_ids = self.latencies[port_handle]
    if not lat_ids:
        return {}

    combined = defaultdict(float)
    # Start the minimum at +inf so the first stream always replaces it.
    combined['total_min'] = float("inf")
    for lat_id in lat_ids:
        entry = in_stats['latency'][lat_id]
        combined['dropped_pkts'] += entry['err_cntrs']['dropped']
        timing = entry['latency']
        combined['total_max'] = max(timing['total_max'], combined['total_max'])
        combined['total_min'] = min(timing['total_min'], combined['total_min'])
        combined['average'] += timing['average']

    combined['average'] /= len(lat_ids)
    return combined
138 def create_pkt(self, stream_cfg, l2frame_size):
139 # 46 = 14 (Ethernet II) + 4 (CRC Checksum) + 20 (IPv4) + 8 (UDP)
140 payload = 'x' * (max(64, int(l2frame_size)) - 46)
142 pkt_base = Ether(src=stream_cfg['mac_src'], dst=stream_cfg['mac_dst'])
144 if stream_cfg['vlan_tag'] is not None:
145 pkt_base /= Dot1Q(vlan=stream_cfg['vlan_tag'])
148 if stream_cfg['udp_src_port']:
149 udp_args['sport'] = int(stream_cfg['udp_src_port'])
150 if stream_cfg['udp_dst_port']:
151 udp_args['dport'] = int(stream_cfg['udp_dst_port'])
152 pkt_base /= IP() / UDP(**udp_args)
154 if stream_cfg['ip_addrs_step'] == 'random':
155 src_fv = STLVmFlowVarRepetableRandom(
157 min_value=stream_cfg['ip_src_addr'],
158 max_value=stream_cfg['ip_src_addr_max'],
160 seed=random.randint(0, 32767),
161 limit=stream_cfg['ip_src_count'])
162 dst_fv = STLVmFlowVarRepetableRandom(
164 min_value=stream_cfg['ip_dst_addr'],
165 max_value=stream_cfg['ip_dst_addr_max'],
167 seed=random.randint(0, 32767),
168 limit=stream_cfg['ip_dst_count'])
170 src_fv = STLVmFlowVar(
172 min_value=stream_cfg['ip_src_addr'],
173 max_value=stream_cfg['ip_src_addr'],
176 step=stream_cfg['ip_addrs_step'])
177 dst_fv = STLVmFlowVar(
179 min_value=stream_cfg['ip_dst_addr'],
180 max_value=stream_cfg['ip_dst_addr_max'],
183 step=stream_cfg['ip_addrs_step'])
187 STLVmWrFlowVar(fv_name="ip_src", pkt_offset="IP.src"),
189 STLVmWrFlowVar(fv_name="ip_dst", pkt_offset="IP.dst"),
190 STLVmFixChecksumHw(l3_offset="IP",
192 l4_type=CTRexVmInsFixHwCs.L4_TYPE_UDP)
195 return STLPktBuilder(pkt=pkt_base / payload, vm=STLScVmRaw(vm_param))
197 def generate_streams(self, port_handle, stream_cfg, l2frame, isg=0.0, latency=True):
200 if l2frame == 'IMIX':
201 for t, (ratio, l2_frame_size) in enumerate(zip(self.imix_ratios, self.imix_l2_sizes)):
202 pkt = self.create_pkt(stream_cfg, l2_frame_size)
203 streams.append(STLStream(packet=pkt,
205 flow_stats=STLFlowStats(
206 pg_id=self.stream_ids[port_handle]),
207 mode=STLTXCont(pps=ratio)))
210 idx_lat = self.id.next()
211 sl = STLStream(packet=pkt,
213 flow_stats=STLFlowLatencyStats(pg_id=idx_lat),
214 mode=STLTXCont(pps=self.LATENCY_PPS))
217 pkt = self.create_pkt(stream_cfg, l2frame)
218 streams.append(STLStream(packet=pkt,
219 flow_stats=STLFlowStats(pg_id=self.stream_ids[port_handle]),
223 idx_lat = self.id.next()
224 streams.append(STLStream(packet=pkt,
225 flow_stats=STLFlowLatencyStats(pg_id=idx_lat),
226 mode=STLTXCont(pps=self.LATENCY_PPS)))
229 self.latencies[port_handle].append(idx_lat)
237 def __connect(self, client):
240 def __connect_after_start(self):
241 # after start, Trex may take a bit of time to initialize
242 # so we need to retry a few times
243 for it in xrange(self.config.generic_retry_count):
246 self.client.connect()
248 except Exception as ex:
249 if it == (self.config.generic_retry_count - 1):
251 LOG.info("Retrying connection to TRex (%s)...", ex.message)
254 LOG.info("Connecting to TRex...")
255 server_ip = self.config.generator_config.ip
257 # Connect to TRex server
258 self.client = STLClient(server=server_ip)
260 self.__connect(self.client)
261 except (TimeoutError, STLError) as e:
262 if server_ip == '127.0.0.1':
264 self.__start_server()
265 self.__connect_after_start()
266 except (TimeoutError, STLError) as e:
267 LOG.error('Cannot connect to TRex')
268 LOG.error(traceback.format_exc())
269 logpath = '/tmp/trex.log'
270 if os.path.isfile(logpath):
271 # Wait for TRex to finish writing error message
273 for _ in xrange(self.config.generic_retry_count):
274 size = os.path.getsize(logpath)
275 if size == last_size:
276 # probably not writing anymore
280 with open(logpath, 'r') as f:
284 raise TrafficGeneratorException(message)
286 raise TrafficGeneratorException(e.message)
288 ports = list(self.config.generator_config.ports)
289 self.port_handle = ports
291 self.client.reset(ports)
294 if self.config.service_chain == ChainType.EXT and not self.config.no_arp:
299 def __set_l3_mode(self):
300 self.client.set_service_mode(ports=self.port_handle, enabled=True)
301 for port, device in zip(self.port_handle, self.config.generator_config.devices):
303 self.client.set_l3_mode(port=port,
304 src_ipv4=device.tg_gateway_ip,
305 dst_ipv4=device.dst.gateway_ip,
306 vlan=device.vlan_tag if device.vlan_tagging else None)
308 # TRex tries to resolve ARP already, doesn't have to be successful yet
310 self.client.set_service_mode(ports=self.port_handle, enabled=False)
def __set_l2_mode(self):
    """Switch every generator port to L2 mode.

    Service mode is enabled on all ports first, each port then gets an
    L2 configuration for the 'mac_dst' of each of its device's stream configs,
    and service mode is disabled again at the end.
    """
    gen_cfg = self.config.generator_config
    ports = self.port_handle
    self.client.set_service_mode(ports=ports, enabled=True)
    for port, device in zip(ports, gen_cfg.devices):
        stream_cfgs = device.get_stream_configs(gen_cfg.service_chain)
        for cfg in stream_cfgs:
            self.client.set_l2_mode(port=port, dst_mac=cfg['mac_dst'])
    self.client.set_service_mode(ports=ports, enabled=False)
def __start_server(self):
    """Run a TRexTrafficServer with the generator configuration."""
    generator_config = self.config.generator_config
    TRexTrafficServer().run_server(generator_config)
def resolve_arp(self):
    """Poll ARP on every port until the expected number of MACs is resolved.

    For each port one STLServiceARP request is built per stream config of the
    matching device; the requests are retried up to config.generic_retry_count
    times, sleeping config.generic_poll_sec between attempts. The resolved MACs
    are stored in self.arps[port].

    :return: True when every port resolved as many MACs as there are service
             chains (config.service_chain_count), False otherwise
    """
    self.client.set_service_mode(ports=self.port_handle)
    LOG.info('Polling ARP until successful')
    resolved = 0
    retry_count = self.config.generic_retry_count
    for port, device in zip(self.port_handle, self.config.generator_config.devices):
        ctx = self.client.create_service_ctx(port=port)

        arps = [
            STLServiceARP(ctx,
                          src_ip=cfg['ip_src_tg_gw'],
                          dst_ip=cfg['mac_discovery_gw'],
                          vlan=device.vlan_tag if device.vlan_tagging else None)
            for cfg in device.get_stream_configs(self.config.generator_config.service_chain)
        ]

        for attempt in range(1, retry_count + 1):
            try:
                ctx.run(arps)
            except STLError:
                # NOTE(review): exception type assumed from the other TRex calls
                # in this class - confirm against the service API.
                LOG.error(traceback.format_exc())
                continue

            records = [arp.get_record() for arp in arps]
            self.arps[port] = [rec.dst_mac for rec in records if rec.dst_mac is not None]

            if len(self.arps[port]) == self.config.service_chain_count:
                resolved += 1
                LOG.info('ARP resolved successfully for port %s', port)
                break

            failed = [rec.dst_ip for rec in records if rec.dst_mac is None]
            # 'failed' is a list of addresses, so it must be rendered with %s;
            # %d made the logging call fail to format the message.
            LOG.info('Retrying ARP for: %s (%d / %d)',
                     failed, attempt, retry_count)
            time.sleep(self.config.generic_poll_sec)

    self.client.set_service_mode(ports=self.port_handle, enabled=False)
    return resolved == len(self.port_handle)
364 def config_interface(self):
def __is_rate_enough(self, l2frame_size, rates, bidirectional, latency):
    """Check if rate provided by user is above requirements. Applies only if latency is True.

    :param l2frame_size: frame size handed to utils.convert_rates()
    :param rates: rate dictionaries, one per direction; only the first one is
                  used when the traffic is not bidirectional
    :param bidirectional: when true, the rates of all directions are summed and
                          the requirement is doubled
    :param latency: when false, no check is done and {'result': True} is returned
    :return: dict with a boolean 'result'; when the check is performed it also
             holds the required rate as converted by utils.convert_rates()
    """
    intf_speed = self.config.generator_config.intf_speed
    if not latency:
        return {'result': True}

    if bidirectional:
        mult = 2
        checked_rates = rates
    else:
        mult = 1
        checked_rates = [rates[0]]

    # Always compare packets per second: the unidirectional case used to keep the
    # whole convert_rates() dictionary and compare that dictionary with an integer.
    total_rate = 0
    for rate in checked_rates:
        r = utils.convert_rates(l2frame_size, rate, intf_speed)
        total_rate += int(r['rate_pps'])

    # rate must be enough for latency stream and at least 1 pps for base stream per chain
    required_rate = (self.LATENCY_PPS + 1) * self.config.service_chain_count * mult
    result = utils.convert_rates(l2frame_size,
                                 {'rate_pps': required_rate},
                                 intf_speed * mult)
    result['result'] = total_rate >= required_rate
    return result
390 def create_traffic(self, l2frame_size, rates, bidirectional, latency=True):
391 r = self.__is_rate_enough(l2frame_size, rates, bidirectional, latency)
393 raise TrafficGeneratorException(
394 'Required rate in total is at least one of: \n{pps}pps \n{bps}bps \n{load}%.'
395 .format(pps=r['rate_pps'],
397 load=r['rate_percent']))
399 stream_cfgs = [d.get_stream_configs(self.config.generator_config.service_chain)
400 for d in self.config.generator_config.devices]
401 self.rates = [utils.to_rate_str(rate) for rate in rates]
403 for ph in self.port_handle:
404 # generate one pg_id for each direction
405 self.stream_ids[ph] = self.id.next()
407 for i, (fwd_stream_cfg, rev_stream_cfg) in enumerate(zip(*stream_cfgs)):
408 if self.config.service_chain == ChainType.EXT and not self.config.no_arp:
409 fwd_stream_cfg['mac_dst'] = self.arps[self.port_handle[0]][i]
410 rev_stream_cfg['mac_dst'] = self.arps[self.port_handle[1]][i]
412 self.streamblock[0].extend(self.generate_streams(self.port_handle[0],
416 if len(self.rates) > 1:
417 self.streamblock[1].extend(self.generate_streams(self.port_handle[1],
421 latency=bidirectional and latency))
423 for ph in self.port_handle:
424 self.client.add_streams(self.streamblock[ph], ports=ph)
425 LOG.info('Created traffic stream for port %s.', ph)
def clear_streamblock(self):
    """Drop all stream bookkeeping and reset the generator ports."""
    # Replace each table with a fresh, empty one (same order as they are declared).
    for table_name in ('streamblock', 'latencies', 'stream_ids'):
        setattr(self, table_name, defaultdict(list))

    ports = self.port_handle
    self.client.reset(ports)
    LOG.info('Cleared all existing streams.')
436 stats = self.client.get_pgid_stats()
437 return self.extract_stats(stats)
440 return [self.client.get_port_attr(port=port)['src_mac'] for port in self.port_handle]
def clear_stats(self):
    """Ask the TRex client to clear its statistics."""
    client = self.client
    client.clear_stats()
def start_traffic(self):
    """Start traffic on each port, pairing ports with self.rates in order.

    Every port is started with force=True for config.duration_sec.
    """
    for port, rate in zip(self.port_handle, self.rates):
        start_args = {
            'ports': port,
            'mult': rate,
            'duration': self.config.duration_sec,
            'force': True,
        }
        self.client.start(**start_args)
def stop_traffic(self):
    """Stop traffic on all generator ports."""
    ports = self.port_handle
    self.client.stop(ports=ports)
456 self.client.reset(self.port_handle)
457 self.client.disconnect()
459 # TRex does not like a reset while in disconnected state