[NFVBENCH-81] With some Intel X710 NIC cards, NFVbench reports erroneous RX counters
[nfvbench.git] / nfvbench / traffic_gen / trex.py
1 # Copyright 2016 Cisco Systems, Inc.  All rights reserved.
2 #
3 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
4 #    not use this file except in compliance with the License. You may obtain
5 #    a copy of the License at
6 #
7 #         http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #    Unless required by applicable law or agreed to in writing, software
10 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 #    License for the specific language governing permissions and limitations
13 #    under the License.
14
15 import os
16 import random
17 import time
18 import traceback
19
20 from collections import defaultdict
21 from itertools import count
22 from nfvbench.log import LOG
23 from nfvbench.specs import ChainType
24 from nfvbench.traffic_server import TRexTrafficServer
25 from nfvbench.utils import cast_integer
26 from nfvbench.utils import timeout
27 from nfvbench.utils import TimeoutError
28 from traffic_base import AbstractTrafficGenerator
29 from traffic_base import TrafficGeneratorException
30 import traffic_utils as utils
31
32 # pylint: disable=import-error
33 from trex_stl_lib.api import CTRexVmInsFixHwCs
34 from trex_stl_lib.api import Dot1Q
35 from trex_stl_lib.api import Ether
36 from trex_stl_lib.api import IP
37 from trex_stl_lib.api import STLClient
38 from trex_stl_lib.api import STLError
39 from trex_stl_lib.api import STLFlowLatencyStats
40 from trex_stl_lib.api import STLFlowStats
41 from trex_stl_lib.api import STLPktBuilder
42 from trex_stl_lib.api import STLScVmRaw
43 from trex_stl_lib.api import STLStream
44 from trex_stl_lib.api import STLTXCont
45 from trex_stl_lib.api import STLVmFixChecksumHw
46 from trex_stl_lib.api import STLVmFlowVar
47 from trex_stl_lib.api import STLVmFlowVarRepetableRandom
48 from trex_stl_lib.api import STLVmWrFlowVar
49 from trex_stl_lib.api import UDP
50 from trex_stl_lib.services.trex_stl_service_arp import STLServiceARP
51
52
53 # pylint: enable=import-error
54
55
56 class TRex(AbstractTrafficGenerator):
57     LATENCY_PPS = 1000
58
59     def __init__(self, runner):
60         AbstractTrafficGenerator.__init__(self, runner)
61         self.client = None
62         self.id = count()
63         self.latencies = defaultdict(list)
64         self.stream_ids = defaultdict(list)
65         self.port_handle = []
66         self.streamblock = defaultdict(list)
67         self.rates = []
68         self.arps = {}
69         self.capture_id = None
70         self.packet_list = []
71
72     def get_version(self):
73         return self.client.get_server_version()
74
75     def extract_stats(self, in_stats):
76         utils.nan_replace(in_stats)
77         LOG.debug(in_stats)
78
79         result = {}
80         for ph in self.port_handle:
81             stats = in_stats[ph]
82             result[ph] = {
83                 'tx': {
84                     'total_pkts': cast_integer(stats['opackets']),
85                     'total_pkt_bytes': cast_integer(stats['obytes']),
86                     'pkt_rate': cast_integer(stats['tx_pps']),
87                     'pkt_bit_rate': cast_integer(stats['tx_bps'])
88                 },
89                 'rx': {
90                     'total_pkts': cast_integer(stats['ipackets']),
91                     'total_pkt_bytes': cast_integer(stats['ibytes']),
92                     'pkt_rate': cast_integer(stats['rx_pps']),
93                     'pkt_bit_rate': cast_integer(stats['rx_bps']),
94                     'dropped_pkts': cast_integer(
95                         stats['opackets'] - stats['ipackets'])
96                 }
97             }
98
99             lat = self.__combine_latencies(in_stats, ph)
100             result[ph]['rx']['max_delay_usec'] = cast_integer(
101                 lat['total_max']) if 'total_max' in lat else float('nan')
102             result[ph]['rx']['min_delay_usec'] = cast_integer(
103                 lat['total_min']) if 'total_min' in lat else float('nan')
104             result[ph]['rx']['avg_delay_usec'] = cast_integer(
105                 lat['average']) if 'average' in lat else float('nan')
106         total_tx_pkts = result[0]['tx']['total_pkts'] + result[1]['tx']['total_pkts']
107         result["total_tx_rate"] = cast_integer(total_tx_pkts / self.config.duration_sec)
108         return result
109
110     def __combine_latencies(self, in_stats, port_handle):
111         """Traverses TRex result dictionary and combines chosen latency stats."""
112         if not self.latencies[port_handle]:
113             return {}
114
115         result = defaultdict(float)
116         result['total_min'] = float("inf")
117         for lat_id in self.latencies[port_handle]:
118             lat = in_stats['latency'][lat_id]
119             result['dropped_pkts'] += lat['err_cntrs']['dropped']
120             result['total_max'] = max(lat['latency']['total_max'], result['total_max'])
121             result['total_min'] = min(lat['latency']['total_min'], result['total_min'])
122             result['average'] += lat['latency']['average']
123
124         result['average'] /= len(self.latencies[port_handle])
125
126         return result
127
128     def create_pkt(self, stream_cfg, l2frame_size):
129
130         pkt_base = Ether(src=stream_cfg['mac_src'], dst=stream_cfg['mac_dst'])
131         if stream_cfg['vlan_tag'] is not None:
132             # 50 = 14 (Ethernet II) + 4 (Vlan tag) + 4 (CRC Checksum) + 20 (IPv4) + 8 (UDP)
133             pkt_base /= Dot1Q(vlan=stream_cfg['vlan_tag'])
134             l2payload_size = int(l2frame_size) - 50
135         else:
136             # 46 = 14 (Ethernet II) + 4 (CRC Checksum) + 20 (IPv4) + 8 (UDP)
137             l2payload_size = int(l2frame_size) - 46
138         payload = 'x' * l2payload_size
139         udp_args = {}
140         if stream_cfg['udp_src_port']:
141             udp_args['sport'] = int(stream_cfg['udp_src_port'])
142         if stream_cfg['udp_dst_port']:
143             udp_args['dport'] = int(stream_cfg['udp_dst_port'])
144         pkt_base /= IP() / UDP(**udp_args)
145
146         if stream_cfg['ip_addrs_step'] == 'random':
147             src_fv = STLVmFlowVarRepetableRandom(
148                 name="ip_src",
149                 min_value=stream_cfg['ip_src_addr'],
150                 max_value=stream_cfg['ip_src_addr_max'],
151                 size=4,
152                 seed=random.randint(0, 32767),
153                 limit=stream_cfg['ip_src_count'])
154             dst_fv = STLVmFlowVarRepetableRandom(
155                 name="ip_dst",
156                 min_value=stream_cfg['ip_dst_addr'],
157                 max_value=stream_cfg['ip_dst_addr_max'],
158                 size=4,
159                 seed=random.randint(0, 32767),
160                 limit=stream_cfg['ip_dst_count'])
161         else:
162             src_fv = STLVmFlowVar(
163                 name="ip_src",
164                 min_value=stream_cfg['ip_src_addr'],
165                 max_value=stream_cfg['ip_src_addr'],
166                 size=4,
167                 op="inc",
168                 step=stream_cfg['ip_addrs_step'])
169             dst_fv = STLVmFlowVar(
170                 name="ip_dst",
171                 min_value=stream_cfg['ip_dst_addr'],
172                 max_value=stream_cfg['ip_dst_addr_max'],
173                 size=4,
174                 op="inc",
175                 step=stream_cfg['ip_addrs_step'])
176
177         vm_param = [
178             src_fv,
179             STLVmWrFlowVar(fv_name="ip_src", pkt_offset="IP.src"),
180             dst_fv,
181             STLVmWrFlowVar(fv_name="ip_dst", pkt_offset="IP.dst"),
182             STLVmFixChecksumHw(l3_offset="IP",
183                                l4_offset="UDP",
184                                l4_type=CTRexVmInsFixHwCs.L4_TYPE_UDP)
185         ]
186
187         return STLPktBuilder(pkt=pkt_base / payload, vm=STLScVmRaw(vm_param))
188
189     def generate_streams(self, port_handle, stream_cfg, l2frame, isg=0.0, latency=True):
190         idx_lat = None
191         streams = []
192         if l2frame == 'IMIX':
193             min_size = 64 if stream_cfg['vlan_tag'] is None else 68
194             self.adjust_imix_min_size(min_size)
195             for t, (ratio, l2_frame_size) in enumerate(zip(self.imix_ratios, self.imix_l2_sizes)):
196                 pkt = self.create_pkt(stream_cfg, l2_frame_size)
197                 streams.append(STLStream(packet=pkt,
198                                          isg=0.1 * t,
199                                          flow_stats=STLFlowStats(
200                                              pg_id=self.stream_ids[port_handle]),
201                                          mode=STLTXCont(pps=ratio)))
202
203             if latency:
204                 idx_lat = self.id.next()
205                 sl = STLStream(packet=pkt,
206                                isg=isg,
207                                flow_stats=STLFlowLatencyStats(pg_id=idx_lat),
208                                mode=STLTXCont(pps=self.LATENCY_PPS))
209                 streams.append(sl)
210         else:
211             pkt = self.create_pkt(stream_cfg, l2frame)
212             streams.append(STLStream(packet=pkt,
213                                      flow_stats=STLFlowStats(pg_id=self.stream_ids[port_handle]),
214                                      mode=STLTXCont()))
215
216             if latency:
217                 idx_lat = self.id.next()
218                 streams.append(STLStream(packet=pkt,
219                                          flow_stats=STLFlowLatencyStats(pg_id=idx_lat),
220                                          mode=STLTXCont(pps=self.LATENCY_PPS)))
221
222         if latency:
223             self.latencies[port_handle].append(idx_lat)
224
225         return streams
226
227     def init(self):
228         pass
229
    @timeout(5)
    def __connect(self, client):
        """Connect the given TRex client, wrapped by the timeout(5) decorator.

        NOTE(review): presumably the decorator raises TimeoutError after 5
        seconds (connect() catches that exception) -- confirm in nfvbench.utils.
        """
        client.connect()
233
234     def __connect_after_start(self):
235         # after start, Trex may take a bit of time to initialize
236         # so we need to retry a few times
237         for it in xrange(self.config.generic_retry_count):
238             try:
239                 time.sleep(1)
240                 self.client.connect()
241                 break
242             except Exception as ex:
243                 if it == (self.config.generic_retry_count - 1):
244                     raise ex
245                 LOG.info("Retrying connection to TRex (%s)...", ex.message)
246
247     def connect(self):
248         LOG.info("Connecting to TRex...")
249         server_ip = self.config.generator_config.ip
250
251         # Connect to TRex server
252         self.client = STLClient(server=server_ip)
253         try:
254             self.__connect(self.client)
255         except (TimeoutError, STLError) as e:
256             if server_ip == '127.0.0.1':
257                 try:
258                     self.__start_server()
259                     self.__connect_after_start()
260                 except (TimeoutError, STLError) as e:
261                     LOG.error('Cannot connect to TRex')
262                     LOG.error(traceback.format_exc())
263                     logpath = '/tmp/trex.log'
264                     if os.path.isfile(logpath):
265                         # Wait for TRex to finish writing error message
266                         last_size = 0
267                         for _ in xrange(self.config.generic_retry_count):
268                             size = os.path.getsize(logpath)
269                             if size == last_size:
270                                 # probably not writing anymore
271                                 break
272                             last_size = size
273                             time.sleep(1)
274                         with open(logpath, 'r') as f:
275                             message = f.read()
276                     else:
277                         message = e.message
278                     raise TrafficGeneratorException(message)
279             else:
280                 raise TrafficGeneratorException(e.message)
281
282         ports = list(self.config.generator_config.ports)
283         self.port_handle = ports
284         # Prepare the ports
285         self.client.reset(ports)
286
287     def set_mode(self):
288         if self.config.service_chain == ChainType.EXT and not self.config.no_arp:
289             self.__set_l3_mode()
290         else:
291             self.__set_l2_mode()
292
293     def __set_l3_mode(self):
294         self.client.set_service_mode(ports=self.port_handle, enabled=True)
295         for port, device in zip(self.port_handle, self.config.generator_config.devices):
296             try:
297                 self.client.set_l3_mode(port=port,
298                                         src_ipv4=device.tg_gateway_ip,
299                                         dst_ipv4=device.dst.gateway_ip,
300                                         vlan=device.vlan_tag if device.vlan_tagging else None)
301             except STLError:
302                 # TRex tries to resolve ARP already, doesn't have to be successful yet
303                 continue
304         self.client.set_service_mode(ports=self.port_handle, enabled=False)
305
306     def __set_l2_mode(self):
307         self.client.set_service_mode(ports=self.port_handle, enabled=True)
308         for port, device in zip(self.port_handle, self.config.generator_config.devices):
309             for cfg in device.get_stream_configs(self.config.generator_config.service_chain):
310                 self.client.set_l2_mode(port=port, dst_mac=cfg['mac_dst'])
311         self.client.set_service_mode(ports=self.port_handle, enabled=False)
312
313     def __start_server(self):
314         server = TRexTrafficServer()
315         server.run_server(self.config.generator_config, self.config.vlan_tagging)
316
317     def resolve_arp(self):
318         self.client.set_service_mode(ports=self.port_handle)
319         LOG.info('Polling ARP until successful')
320         resolved = 0
321         attempt = 0
322         for port, device in zip(self.port_handle, self.config.generator_config.devices):
323             ctx = self.client.create_service_ctx(port=port)
324
325             arps = [
326                 STLServiceARP(ctx,
327                               src_ip=cfg['ip_src_tg_gw'],
328                               dst_ip=cfg['mac_discovery_gw'],
329                               vlan=device.vlan_tag if device.vlan_tagging else None)
330                 for cfg in device.get_stream_configs(self.config.generator_config.service_chain)
331             ]
332
333             for _ in xrange(self.config.generic_retry_count):
334                 attempt += 1
335                 try:
336                     ctx.run(arps)
337                 except STLError:
338                     LOG.error(traceback.format_exc())
339                     continue
340
341                 self.arps[port] = [arp.get_record().dst_mac for arp in arps
342                                    if arp.get_record().dst_mac is not None]
343
344                 if len(self.arps[port]) == self.config.service_chain_count:
345                     resolved += 1
346                     LOG.info('ARP resolved successfully for port %s', port)
347                     break
348                 else:
349                     failed = [arp.get_record().dst_ip for arp in arps
350                               if arp.get_record().dst_mac is None]
351                     LOG.info('Retrying ARP for: %s (%d / %d)',
352                              failed, attempt, self.config.generic_retry_count)
353                     time.sleep(self.config.generic_poll_sec)
354
355         self.client.set_service_mode(ports=self.port_handle, enabled=False)
356         return resolved == len(self.port_handle)
357
358     def config_interface(self):
359         pass
360
361     def __is_rate_enough(self, l2frame_size, rates, bidirectional, latency):
362         """Check if rate provided by user is above requirements. Applies only if latency is True."""
363         intf_speed = self.config.generator_config.intf_speed
364         if latency:
365             if bidirectional:
366                 mult = 2
367                 total_rate = 0
368                 for rate in rates:
369                     r = utils.convert_rates(l2frame_size, rate, intf_speed)
370                     total_rate += int(r['rate_pps'])
371             else:
372                 mult = 1
373                 total_rate = utils.convert_rates(l2frame_size, rates[0], intf_speed)
374             # rate must be enough for latency stream and at least 1 pps for base stream per chain
375             required_rate = (self.LATENCY_PPS + 1) * self.config.service_chain_count * mult
376             result = utils.convert_rates(l2frame_size,
377                                          {'rate_pps': required_rate},
378                                          intf_speed * mult)
379             result['result'] = total_rate >= required_rate
380             return result
381
382         return {'result': True}
383
384     def create_traffic(self, l2frame_size, rates, bidirectional, latency=True):
385         r = self.__is_rate_enough(l2frame_size, rates, bidirectional, latency)
386         if not r['result']:
387             raise TrafficGeneratorException(
388                 'Required rate in total is at least one of: \n{pps}pps \n{bps}bps \n{load}%.'
389                 .format(pps=r['rate_pps'],
390                         bps=r['rate_bps'],
391                         load=r['rate_percent']))
392
393         stream_cfgs = [d.get_stream_configs(self.config.generator_config.service_chain)
394                        for d in self.config.generator_config.devices]
395         self.rates = [utils.to_rate_str(rate) for rate in rates]
396
397         for ph in self.port_handle:
398             # generate one pg_id for each direction
399             self.stream_ids[ph] = self.id.next()
400
401         for i, (fwd_stream_cfg, rev_stream_cfg) in enumerate(zip(*stream_cfgs)):
402             if self.config.service_chain == ChainType.EXT and not self.config.no_arp:
403                 fwd_stream_cfg['mac_dst'] = self.arps[self.port_handle[0]][i]
404                 rev_stream_cfg['mac_dst'] = self.arps[self.port_handle[1]][i]
405
406             self.streamblock[0].extend(self.generate_streams(self.port_handle[0],
407                                                              fwd_stream_cfg,
408                                                              l2frame_size,
409                                                              latency=latency))
410             if len(self.rates) > 1:
411                 self.streamblock[1].extend(self.generate_streams(self.port_handle[1],
412                                                                  rev_stream_cfg,
413                                                                  l2frame_size,
414                                                                  isg=10.0,
415                                                                  latency=bidirectional and latency))
416
417         for ph in self.port_handle:
418             self.client.add_streams(self.streamblock[ph], ports=ph)
419             LOG.info('Created traffic stream for port %s.', ph)
420
421     def clear_streamblock(self):
422         self.streamblock = defaultdict(list)
423         self.latencies = defaultdict(list)
424         self.stream_ids = defaultdict(list)
425         self.rates = []
426         self.client.reset(self.port_handle)
427         LOG.info('Cleared all existing streams.')
428
429     def get_stats(self):
430         stats = self.client.get_stats()
431         return self.extract_stats(stats)
432
433     def get_macs(self):
434         return [self.client.get_port_attr(port=port)['src_mac'] for port in self.port_handle]
435
436     def clear_stats(self):
437         if self.port_handle:
438             self.client.clear_stats()
439
440     def start_traffic(self):
441         for port, rate in zip(self.port_handle, self.rates):
442             self.client.start(ports=port, mult=rate, duration=self.config.duration_sec, force=True)
443
444     def stop_traffic(self):
445         self.client.stop(ports=self.port_handle)
446
447     def start_capture(self):
448         if self.capture_id:
449             self.stop_capture()
450         self.client.set_service_mode(ports=self.port_handle)
451         self.capture_id = self.client.start_capture(rx_ports=self.port_handle)
452
453     def fetch_capture_packets(self):
454         if self.capture_id:
455             self.packet_list = []
456             self.client.fetch_capture_packets(capture_id=self.capture_id['id'],
457                                               output=self.packet_list)
458
459     def stop_capture(self):
460         if self.capture_id:
461             self.client.stop_capture(capture_id=self.capture_id['id'])
462             self.capture_id = None
463             self.client.set_service_mode(ports=self.port_handle, enabled=False)
464
465     def cleanup(self):
466         if self.client:
467             try:
468                 self.client.reset(self.port_handle)
469                 self.client.disconnect()
470             except STLError:
471                 # TRex does not like a reset while in disconnected state
472                 pass