NFVBENCH-80 latency stream for IMIX uses 1518B frames
[nfvbench.git] / nfvbench / traffic_gen / trex.py
1 # Copyright 2016 Cisco Systems, Inc.  All rights reserved.
2 #
3 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
4 #    not use this file except in compliance with the License. You may obtain
5 #    a copy of the License at
6 #
7 #         http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #    Unless required by applicable law or agreed to in writing, software
10 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 #    License for the specific language governing permissions and limitations
13 #    under the License.
14
15 import os
16 import random
17 import time
18 import traceback
19
20 from collections import defaultdict
21 from itertools import count
22 from nfvbench.log import LOG
23 from nfvbench.specs import ChainType
24 from nfvbench.traffic_server import TRexTrafficServer
25 from nfvbench.utils import cast_integer
26 from nfvbench.utils import timeout
27 from nfvbench.utils import TimeoutError
28 from traffic_base import AbstractTrafficGenerator
29 from traffic_base import TrafficGeneratorException
30 import traffic_utils as utils
31
32 # pylint: disable=import-error
33 from trex_stl_lib.api import CTRexVmInsFixHwCs
34 from trex_stl_lib.api import Dot1Q
35 from trex_stl_lib.api import Ether
36 from trex_stl_lib.api import IP
37 from trex_stl_lib.api import STLClient
38 from trex_stl_lib.api import STLError
39 from trex_stl_lib.api import STLFlowLatencyStats
40 from trex_stl_lib.api import STLFlowStats
41 from trex_stl_lib.api import STLPktBuilder
42 from trex_stl_lib.api import STLScVmRaw
43 from trex_stl_lib.api import STLStream
44 from trex_stl_lib.api import STLTXCont
45 from trex_stl_lib.api import STLVmFixChecksumHw
46 from trex_stl_lib.api import STLVmFlowVar
47 from trex_stl_lib.api import STLVmFlowVarRepetableRandom
48 from trex_stl_lib.api import STLVmWrFlowVar
49 from trex_stl_lib.api import UDP
50 from trex_stl_lib.services.trex_stl_service_arp import STLServiceARP
51
52
53 # pylint: enable=import-error
54
55
class TRex(AbstractTrafficGenerator):
    """nfvbench traffic generator driving a TRex server through the stateless STLClient API."""

    # Packet rate (pps) of every latency-measurement stream created by generate_streams().
    LATENCY_PPS = 1000

    def __init__(self, runner):
        AbstractTrafficGenerator.__init__(self, runner)
        # STLClient instance, created in connect()
        self.client = None
        # counter from which every flow-stats and latency pg_id is drawn
        self.id = count()
        # port handle -> list of latency pg_ids of the streams sent from that port
        self.latencies = defaultdict(list)
        # port handle -> flow-stats pg_id shared by the base streams of that port
        self.stream_ids = defaultdict(list)
        # TRex port handles, taken from generator_config.ports in connect()
        self.port_handle = []
        # port handle -> list of STLStream objects to add to that port
        self.streamblock = defaultdict(list)
        # per-direction rate strings passed as 'mult' to STLClient.start()
        self.rates = []
        # port handle -> list of MAC addresses resolved by resolve_arp()
        self.arps = {}
        # capture descriptor returned by STLClient.start_capture(), None when idle
        self.capture_id = None
        # packets retrieved by fetch_capture_packets()
        self.packet_list = []

    def get_version(self):
        """Return the version information reported by the connected TRex server."""
        return self.client.get_server_version()

    def extract_stats(self, in_stats):
        """Convert raw TRex statistics into the nfvbench per-port result format.

        :param in_stats: statistics dict returned by STLClient.get_stats(); it is
                         indexed by port handle and, when latency streams exist,
                         holds a 'latency' section indexed by latency pg_id.
                         NaN values are replaced in place by utils.nan_replace().
        :return: dict indexed by port handle, each entry holding 'tx' and 'rx'
                 counters ('rx' also carries min/max/avg delay, or NaN when the
                 port has no latency stream), plus a 'total_tx_rate' entry: the
                 packets transmitted by all ports divided by duration_sec.
        """
        utils.nan_replace(in_stats)
        LOG.debug(in_stats)

        result = {}
        for ph in self.port_handle:
            stats = in_stats[ph]
            result[ph] = {
                'tx': {
                    'total_pkts': cast_integer(stats['opackets']),
                    'total_pkt_bytes': cast_integer(stats['obytes']),
                    'pkt_rate': cast_integer(stats['tx_pps']),
                    'pkt_bit_rate': cast_integer(stats['tx_bps'])
                },
                'rx': {
                    'total_pkts': cast_integer(stats['ipackets']),
                    'total_pkt_bytes': cast_integer(stats['ibytes']),
                    'pkt_rate': cast_integer(stats['rx_pps']),
                    'pkt_bit_rate': cast_integer(stats['rx_bps']),
                    # NOTE(review): computed from this port's own tx and rx counters,
                    # not from the peer port's tx - confirm this is the intended metric
                    'dropped_pkts': cast_integer(
                        stats['opackets'] - stats['ipackets'])
                }
            }

            lat = self.__combine_latencies(in_stats, ph)
            rx = result[ph]['rx']
            rx['max_delay_usec'] = cast_integer(
                lat['total_max']) if 'total_max' in lat else float('nan')
            rx['min_delay_usec'] = cast_integer(
                lat['total_min']) if 'total_min' in lat else float('nan')
            rx['avg_delay_usec'] = cast_integer(
                lat['average']) if 'average' in lat else float('nan')

        # sum over the configured port handles instead of assuming they are 0 and 1
        total_tx_pkts = sum(result[ph]['tx']['total_pkts'] for ph in self.port_handle)
        result["total_tx_rate"] = cast_integer(total_tx_pkts / self.config.duration_sec)
        return result

    def __combine_latencies(self, in_stats, port_handle):
        """Traverses TRex result dictionary and combines chosen latency stats.

        All latency pg_ids registered for ``port_handle`` are merged: dropped
        counts are summed, 'total_max'/'total_min' are the extremes over all
        streams and 'average' is the unweighted mean of the per-stream averages.

        :return: dict with 'dropped_pkts', 'total_max', 'total_min' and 'average'
                 keys, or an empty dict when the port has no latency stream.
        """
        lat_ids = self.latencies[port_handle]
        if not lat_ids:
            return {}

        result = defaultdict(float)
        result['total_min'] = float("inf")
        for lat_id in lat_ids:
            lat = in_stats['latency'][lat_id]
            result['dropped_pkts'] += lat['err_cntrs']['dropped']
            result['total_max'] = max(lat['latency']['total_max'], result['total_max'])
            result['total_min'] = min(lat['latency']['total_min'], result['total_min'])
            result['average'] += lat['latency']['average']

        result['average'] /= len(lat_ids)

        return result

    def create_pkt(self, stream_cfg, l2frame_size):
        """Build the packet template and field-engine program of one stream.

        The template is Ether[/Dot1Q]/IP/UDP followed by an 'x' payload whose
        size is derived from ``l2frame_size`` (header and CRC sizes below).
        Source and destination IPv4 addresses are rewritten per packet by the
        TRex field engine within [ip_src_addr, ip_src_addr_max] and
        [ip_dst_addr, ip_dst_addr_max]: randomly when ip_addrs_step is
        'random', otherwise incrementing by ip_addrs_step. IP and UDP
        checksums are then fixed with STLVmFixChecksumHw.

        :param stream_cfg: per-chain stream configuration dict
        :param l2frame_size: L2 frame size in bytes (int or numeric string)
        :return: STLPktBuilder for the stream
        """
        pkt_base = Ether(src=stream_cfg['mac_src'], dst=stream_cfg['mac_dst'])
        if stream_cfg['vlan_tag'] is not None:
            # 50 = 14 (Ethernet II) + 4 (Vlan tag) + 4 (CRC Checksum) + 20 (IPv4) + 8 (UDP)
            pkt_base /= Dot1Q(vlan=stream_cfg['vlan_tag'])
            l2payload_size = int(l2frame_size) - 50
        else:
            # 46 = 14 (Ethernet II) + 4 (CRC Checksum) + 20 (IPv4) + 8 (UDP)
            l2payload_size = int(l2frame_size) - 46
        payload = 'x' * l2payload_size
        udp_args = {}
        if stream_cfg['udp_src_port']:
            udp_args['sport'] = int(stream_cfg['udp_src_port'])
        if stream_cfg['udp_dst_port']:
            udp_args['dport'] = int(stream_cfg['udp_dst_port'])
        pkt_base /= IP() / UDP(**udp_args)

        if stream_cfg['ip_addrs_step'] == 'random':
            src_fv = STLVmFlowVarRepetableRandom(
                name="ip_src",
                min_value=stream_cfg['ip_src_addr'],
                max_value=stream_cfg['ip_src_addr_max'],
                size=4,
                seed=random.randint(0, 32767),
                limit=stream_cfg['ip_src_count'])
            dst_fv = STLVmFlowVarRepetableRandom(
                name="ip_dst",
                min_value=stream_cfg['ip_dst_addr'],
                max_value=stream_cfg['ip_dst_addr_max'],
                size=4,
                seed=random.randint(0, 32767),
                limit=stream_cfg['ip_dst_count'])
        else:
            # honor the configured source range like the random branch and the
            # destination var do; using ip_src_addr as max pins the source IP
            src_fv = STLVmFlowVar(
                name="ip_src",
                min_value=stream_cfg['ip_src_addr'],
                max_value=stream_cfg['ip_src_addr_max'],
                size=4,
                op="inc",
                step=stream_cfg['ip_addrs_step'])
            dst_fv = STLVmFlowVar(
                name="ip_dst",
                min_value=stream_cfg['ip_dst_addr'],
                max_value=stream_cfg['ip_dst_addr_max'],
                size=4,
                op="inc",
                step=stream_cfg['ip_addrs_step'])

        vm_param = [
            src_fv,
            STLVmWrFlowVar(fv_name="ip_src", pkt_offset="IP.src"),
            dst_fv,
            STLVmWrFlowVar(fv_name="ip_dst", pkt_offset="IP.dst"),
            STLVmFixChecksumHw(l3_offset="IP",
                               l4_offset="UDP",
                               l4_type=CTRexVmInsFixHwCs.L4_TYPE_UDP)
        ]

        return STLPktBuilder(pkt=pkt_base / payload, vm=STLScVmRaw(vm_param))

    def generate_streams(self, port_handle, stream_cfg, l2frame, isg=0.0, latency=True):
        """Create the TRex streams of one chain on one port.

        For 'IMIX', one continuous stream is created per IMIX frame size (rate
        set by its IMIX ratio), and the optional latency stream uses the
        average IMIX L2 size. Otherwise a single continuous stream of
        ``l2frame`` bytes is created, and the optional latency stream reuses
        its packet.

        :param port_handle: TRex port the streams belong to
        :param stream_cfg: per-chain stream configuration dict
        :param l2frame: 'IMIX' or an L2 frame size in bytes
        :param isg: inter-stream gap, only applied to the IMIX latency stream
        :param latency: when True, add a latency stream sent at LATENCY_PPS and
                        record its pg_id in self.latencies[port_handle]
        :return: list of STLStream
        """
        idx_lat = None
        streams = []
        if l2frame == 'IMIX':
            min_size = 64 if stream_cfg['vlan_tag'] is None else 68
            self.adjust_imix_min_size(min_size)
            for t, (ratio, l2_frame_size) in enumerate(zip(self.imix_ratios, self.imix_l2_sizes)):
                pkt = self.create_pkt(stream_cfg, l2_frame_size)
                streams.append(STLStream(packet=pkt,
                                         isg=0.1 * t,
                                         flow_stats=STLFlowStats(
                                             pg_id=self.stream_ids[port_handle]),
                                         mode=STLTXCont(pps=ratio)))

            if latency:
                idx_lat = next(self.id)
                # latency packets use the average IMIX size, not the largest frame
                pkt = self.create_pkt(stream_cfg, self.imix_avg_l2_size)
                sl = STLStream(packet=pkt,
                               isg=isg,
                               flow_stats=STLFlowLatencyStats(pg_id=idx_lat),
                               mode=STLTXCont(pps=self.LATENCY_PPS))
                streams.append(sl)
        else:
            pkt = self.create_pkt(stream_cfg, l2frame)
            streams.append(STLStream(packet=pkt,
                                     flow_stats=STLFlowStats(pg_id=self.stream_ids[port_handle]),
                                     mode=STLTXCont()))

            if latency:
                idx_lat = next(self.id)
                streams.append(STLStream(packet=pkt,
                                         flow_stats=STLFlowLatencyStats(pg_id=idx_lat),
                                         mode=STLTXCont(pps=self.LATENCY_PPS)))

        if latency:
            self.latencies[port_handle].append(idx_lat)

        return streams

    def init(self):
        """No initialization is needed for TRex (connection happens in connect())."""
        pass

    @timeout(5)
    def __connect(self, client):
        """Connect ``client``, bounded by the nfvbench.utils.timeout(5) decorator."""
        client.connect()

    def __connect_after_start(self):
        """Connect to a freshly started TRex server, retrying until it is ready.

        Sleeps 1 second before each of the generic_retry_count attempts and
        re-raises the last exception if every attempt fails.
        """
        # after start, Trex may take a bit of time to initialize
        # so we need to retry a few times
        for it in xrange(self.config.generic_retry_count):
            try:
                time.sleep(1)
                self.client.connect()
                break
            except Exception as ex:
                if it == (self.config.generic_retry_count - 1):
                    # bare raise keeps the original traceback (raise ex would reset it)
                    raise
                LOG.info("Retrying connection to TRex (%s)...", ex.message)

    def connect(self):
        """Connect to the TRex server and reset the configured ports.

        If the first connection attempt fails and the server address is
        127.0.0.1, a local TRex server is started and the connection retried.

        :raises TrafficGeneratorException: when no connection can be made; the
            message is the content of /tmp/trex.log if present, otherwise the
            error message.
        """
        LOG.info("Connecting to TRex...")
        server_ip = self.config.generator_config.ip

        # Connect to TRex server
        self.client = STLClient(server=server_ip)
        try:
            self.__connect(self.client)
        except (TimeoutError, STLError) as e:
            if server_ip == '127.0.0.1':
                try:
                    self.__start_server()
                    self.__connect_after_start()
                except (TimeoutError, STLError) as e:
                    LOG.error('Cannot connect to TRex')
                    LOG.error(traceback.format_exc())
                    logpath = '/tmp/trex.log'
                    if os.path.isfile(logpath):
                        # Wait for TRex to finish writing error message
                        last_size = 0
                        for _ in xrange(self.config.generic_retry_count):
                            size = os.path.getsize(logpath)
                            if size == last_size:
                                # probably not writing anymore
                                break
                            last_size = size
                            time.sleep(1)
                        with open(logpath, 'r') as f:
                            message = f.read()
                    else:
                        message = e.message
                    raise TrafficGeneratorException(message)
            else:
                raise TrafficGeneratorException(e.message)

        ports = list(self.config.generator_config.ports)
        self.port_handle = ports
        # Prepare the ports
        self.client.reset(ports)

    def set_mode(self):
        """Put the ports in L3 mode for EXT chains with ARP enabled, in L2 mode otherwise."""
        if self.config.service_chain == ChainType.EXT and not self.config.no_arp:
            self.__set_l3_mode()
        else:
            self.__set_l2_mode()

    def __set_l3_mode(self):
        """Configure every port in L3 mode using the gateway IPs of its device."""
        self.client.set_service_mode(ports=self.port_handle, enabled=True)
        for port, device in zip(self.port_handle, self.config.generator_config.devices):
            try:
                self.client.set_l3_mode(port=port,
                                        src_ipv4=device.tg_gateway_ip,
                                        dst_ipv4=device.dst.gateway_ip,
                                        vlan=device.vlan_tag if device.vlan_tagging else None)
            except STLError:
                # TRex tries to resolve ARP already, doesn't have to be successful yet
                continue
        self.client.set_service_mode(ports=self.port_handle, enabled=False)

    def __set_l2_mode(self):
        """Configure every port in L2 mode toward the destination MACs of its streams.

        NOTE(review): set_l2_mode() is called once per chain on the same port, so
        presumably only the last MAC stays in effect - confirm with TRex semantics.
        """
        self.client.set_service_mode(ports=self.port_handle, enabled=True)
        for port, device in zip(self.port_handle, self.config.generator_config.devices):
            for cfg in device.get_stream_configs(self.config.generator_config.service_chain):
                self.client.set_l2_mode(port=port, dst_mac=cfg['mac_dst'])
        self.client.set_service_mode(ports=self.port_handle, enabled=False)

    def __start_server(self):
        """Start a local TRex server from the generator configuration."""
        server = TRexTrafficServer()
        server.run_server(self.config.generator_config, self.config.vlan_tagging)

    def resolve_arp(self):
        """Resolve through ARP the destination MAC of every chain on every port.

        Each port is retried up to generic_retry_count times, sleeping
        generic_poll_sec after an incomplete attempt. Resolved MACs are stored
        in self.arps[port]. Service mode is enabled during the resolution.

        :return: True if every port resolved one MAC per service chain.
        """
        self.client.set_service_mode(ports=self.port_handle)
        LOG.info('Polling ARP until successful')
        resolved = 0
        retry_count = self.config.generic_retry_count
        for port, device in zip(self.port_handle, self.config.generator_config.devices):
            ctx = self.client.create_service_ctx(port=port)

            arps = [
                STLServiceARP(ctx,
                              src_ip=cfg['ip_src_tg_gw'],
                              dst_ip=cfg['mac_discovery_gw'],
                              vlan=device.vlan_tag if device.vlan_tagging else None)
                for cfg in device.get_stream_configs(self.config.generator_config.service_chain)
            ]

            # the attempt counter is per port so the progress log never exceeds retry_count
            for attempt in xrange(1, retry_count + 1):
                try:
                    ctx.run(arps)
                except STLError:
                    LOG.error(traceback.format_exc())
                    continue

                self.arps[port] = [arp.get_record().dst_mac for arp in arps
                                   if arp.get_record().dst_mac is not None]

                if len(self.arps[port]) == self.config.service_chain_count:
                    resolved += 1
                    LOG.info('ARP resolved successfully for port %s', port)
                    break

                failed = [arp.get_record().dst_ip for arp in arps
                          if arp.get_record().dst_mac is None]
                LOG.info('Retrying ARP for: %s (%d / %d)',
                         failed, attempt, retry_count)
                time.sleep(self.config.generic_poll_sec)

        self.client.set_service_mode(ports=self.port_handle, enabled=False)
        return resolved == len(self.port_handle)

    def config_interface(self):
        """No interface configuration is needed for TRex."""
        pass

    def __is_rate_enough(self, l2frame_size, rates, bidirectional, latency):
        """Check if rate provided by user is above requirements. Applies only if latency is True.

        The requested rate (summed over both directions when bidirectional)
        must be at least (LATENCY_PPS + 1) pps per service chain and direction:
        room for the latency stream plus 1 pps of base traffic.

        :return: {'result': True} when latency is False; otherwise the minimum
                 required rate as returned by utils.convert_rates (with
                 'rate_pps', 'rate_bps' and 'rate_percent') plus a boolean
                 'result'.
        """
        if not latency:
            return {'result': True}

        intf_speed = self.config.generator_config.intf_speed
        if bidirectional:
            mult = 2
            checked_rates = rates
        else:
            mult = 1
            checked_rates = rates[:1]

        # always compare packet rates: convert_rates() returns a dict, which must
        # not be compared directly against an integer
        total_rate = 0
        for rate in checked_rates:
            r = utils.convert_rates(l2frame_size, rate, intf_speed)
            total_rate += int(r['rate_pps'])

        # rate must be enough for latency stream and at least 1 pps for base stream per chain
        required_rate = (self.LATENCY_PPS + 1) * self.config.service_chain_count * mult
        result = utils.convert_rates(l2frame_size,
                                     {'rate_pps': required_rate},
                                     intf_speed * mult)
        result['result'] = total_rate >= required_rate
        return result

    def create_traffic(self, l2frame_size, rates, bidirectional, latency=True):
        """Create the streams of every chain and add them to the TRex ports.

        :param l2frame_size: 'IMIX' or an L2 frame size in bytes
        :param rates: per-direction rate specifications accepted by
                      utils.convert_rates() / utils.to_rate_str(); reverse
                      streams are created only when more than one is given
        :param bidirectional: whether the reverse direction also gets latency streams
        :param latency: whether to create latency streams
        :raises TrafficGeneratorException: when the requested rate cannot carry
            the latency streams.
        """
        r = self.__is_rate_enough(l2frame_size, rates, bidirectional, latency)
        if not r['result']:
            raise TrafficGeneratorException(
                'Required rate in total is at least one of: \n{pps}pps \n{bps}bps \n{load}%.'
                .format(pps=r['rate_pps'],
                        bps=r['rate_bps'],
                        load=r['rate_percent']))

        stream_cfgs = [d.get_stream_configs(self.config.generator_config.service_chain)
                       for d in self.config.generator_config.devices]
        self.rates = [utils.to_rate_str(rate) for rate in rates]

        for ph in self.port_handle:
            # generate one pg_id for each direction
            self.stream_ids[ph] = next(self.id)

        for i, (fwd_stream_cfg, rev_stream_cfg) in enumerate(zip(*stream_cfgs)):
            if self.config.service_chain == ChainType.EXT and not self.config.no_arp:
                fwd_stream_cfg['mac_dst'] = self.arps[self.port_handle[0]][i]
                rev_stream_cfg['mac_dst'] = self.arps[self.port_handle[1]][i]

            # streams are keyed by port handle so that add_streams() below finds
            # them even when the configured ports are not 0 and 1
            fwd_port = self.port_handle[0]
            self.streamblock[fwd_port].extend(self.generate_streams(fwd_port,
                                                                    fwd_stream_cfg,
                                                                    l2frame_size,
                                                                    latency=latency))
            if len(self.rates) > 1:
                rev_port = self.port_handle[1]
                self.streamblock[rev_port].extend(self.generate_streams(rev_port,
                                                                        rev_stream_cfg,
                                                                        l2frame_size,
                                                                        isg=10.0,
                                                                        latency=bidirectional and latency))

        for ph in self.port_handle:
            self.client.add_streams(self.streamblock[ph], ports=ph)
            LOG.info('Created traffic stream for port %s.', ph)

    def clear_streamblock(self):
        """Forget all created streams, latency ids and rates, and reset the ports."""
        self.streamblock = defaultdict(list)
        self.latencies = defaultdict(list)
        self.stream_ids = defaultdict(list)
        self.rates = []
        self.client.reset(self.port_handle)
        LOG.info('Cleared all existing streams.')

    def get_stats(self):
        """Fetch the current TRex statistics in nfvbench format (see extract_stats)."""
        stats = self.client.get_stats()
        return self.extract_stats(stats)

    def get_macs(self):
        """Return the source MAC address of every configured port, in port order."""
        return [self.client.get_port_attr(port=port)['src_mac'] for port in self.port_handle]

    def clear_stats(self):
        """Clear TRex statistics, if ports have been configured."""
        if self.port_handle:
            self.client.clear_stats()

    def start_traffic(self):
        """Start each port at its rate for duration_sec.

        zip() stops at the shorter list, so with a single rate only the first
        port is started.
        """
        for port, rate in zip(self.port_handle, self.rates):
            self.client.start(ports=port, mult=rate, duration=self.config.duration_sec, force=True)

    def stop_traffic(self):
        """Stop traffic on all configured ports."""
        self.client.stop(ports=self.port_handle)

    def start_capture(self):
        """Start capturing on all ports, stopping any capture already running.

        Service mode stays enabled until stop_capture() is called.
        """
        if self.capture_id:
            self.stop_capture()
        self.client.set_service_mode(ports=self.port_handle)
        self.capture_id = self.client.start_capture(rx_ports=self.port_handle)

    def fetch_capture_packets(self):
        """Replace self.packet_list with the packets of the active capture, if any."""
        if self.capture_id:
            self.packet_list = []
            self.client.fetch_capture_packets(capture_id=self.capture_id['id'],
                                              output=self.packet_list)

    def stop_capture(self):
        """Stop the active capture, if any, and disable service mode."""
        if self.capture_id:
            self.client.stop_capture(capture_id=self.capture_id['id'])
            self.capture_id = None
            self.client.set_service_mode(ports=self.port_handle, enabled=False)

    def cleanup(self):
        """Reset the ports and disconnect from TRex (best effort)."""
        if self.client:
            try:
                self.client.reset(self.port_handle)
                self.client.disconnect()
            except STLError:
                # TRex does not like a reset while in disconnected state
                pass