NFVBENCH-78 TRex requires at least 16 payload bytes
[nfvbench.git] / nfvbench / traffic_gen / trex.py
1 # Copyright 2016 Cisco Systems, Inc.  All rights reserved.
2 #
3 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
4 #    not use this file except in compliance with the License. You may obtain
5 #    a copy of the License at
6 #
7 #         http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #    Unless required by applicable law or agreed to in writing, software
10 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 #    License for the specific language governing permissions and limitations
13 #    under the License.
14
15 import os
16 import random
17 import time
18 import traceback
19
20 from collections import defaultdict
21 from itertools import count
22 from nfvbench.log import LOG
23 from nfvbench.specs import ChainType
24 from nfvbench.traffic_server import TRexTrafficServer
25 from nfvbench.utils import cast_integer
26 from nfvbench.utils import timeout
27 from nfvbench.utils import TimeoutError
28 from traffic_base import AbstractTrafficGenerator
29 from traffic_base import TrafficGeneratorException
30 import traffic_utils as utils
31
32 # pylint: disable=import-error
33 from trex_stl_lib.api import CTRexVmInsFixHwCs
34 from trex_stl_lib.api import Dot1Q
35 from trex_stl_lib.api import Ether
36 from trex_stl_lib.api import IP
37 from trex_stl_lib.api import STLClient
38 from trex_stl_lib.api import STLError
39 from trex_stl_lib.api import STLFlowLatencyStats
40 from trex_stl_lib.api import STLFlowStats
41 from trex_stl_lib.api import STLPktBuilder
42 from trex_stl_lib.api import STLScVmRaw
43 from trex_stl_lib.api import STLStream
44 from trex_stl_lib.api import STLTXCont
45 from trex_stl_lib.api import STLVmFixChecksumHw
46 from trex_stl_lib.api import STLVmFlowVar
47 from trex_stl_lib.api import STLVmFlowVarRepetableRandom
48 from trex_stl_lib.api import STLVmWrFlowVar
49 from trex_stl_lib.api import UDP
50 from trex_stl_lib.services.trex_stl_service_arp import STLServiceARP
51
52
53 # pylint: enable=import-error
54
55
class TRex(AbstractTrafficGenerator):
    """nfvbench traffic generator backed by a TRex server (stateless STL API).

    Streams are tracked per TRex port handle: each port gets one packet-group
    id (pg_id) shared by all its regular streams, plus one extra pg_id per
    latency stream, so flow and latency statistics can be combined per port.
    """

    # Rate (packets per second) of every latency-measurement stream.
    LATENCY_PPS = 1000

    def __init__(self, runner):
        AbstractTrafficGenerator.__init__(self, runner)
        # STLClient instance, created by connect().
        self.client = None
        # Source of unique pg_ids for flow and latency statistics.
        self.id = count()
        # port handle -> pg_ids of that port's latency streams.
        self.latencies = defaultdict(list)
        # port handle -> pg_id shared by that port's regular streams
        # (assigned in create_traffic()).
        self.stream_ids = defaultdict(list)
        # TRex port handles in use, taken from generator_config.ports.
        self.port_handle = []
        # port handle -> STLStream objects to add to that port.
        self.streamblock = defaultdict(list)
        # Per-port rate strings, passed as 'mult' to STLClient.start().
        self.rates = []
        # port handle -> destination MACs resolved by resolve_arp().
        self.arps = {}

    def get_version(self):
        """Return the server version reported by the connected TRex client."""
        return self.client.get_server_version()

    def extract_stats(self, in_stats):
        """Convert TRex pg_id statistics into nfvbench's per-port result format.

        :param in_stats: statistics dict as returned by
            STLClient.get_pgid_stats(); NaN values are replaced in place
            (utils.nan_replace) before use.
        :return: dict keyed by port handle, each entry holding 'tx' and 'rx'
            counters ('rx' also carries dropped packets and min/max/avg delay,
            NaN when the port has no latency stream), plus a 'total_tx_rate'
            entry equal to all transmitted packets divided by
            config.duration_sec.
        """
        utils.nan_replace(in_stats)
        LOG.debug(in_stats)

        result = {}
        for ph in self.port_handle:
            stats = self.__combine_stats(in_stats, ph)
            result[ph] = {
                'tx': {
                    'total_pkts': cast_integer(stats['tx_pkts']['total']),
                    'total_pkt_bytes': cast_integer(stats['tx_bytes']['total']),
                    'pkt_rate': cast_integer(stats['tx_pps']['total']),
                    'pkt_bit_rate': cast_integer(stats['tx_bps']['total'])
                },
                'rx': {
                    'total_pkts': cast_integer(stats['rx_pkts']['total']),
                    'total_pkt_bytes': cast_integer(stats['rx_bytes']['total']),
                    'pkt_rate': cast_integer(stats['rx_pps']['total']),
                    'pkt_bit_rate': cast_integer(stats['rx_bps']['total']),
                    'dropped_pkts': cast_integer(
                        stats['tx_pkts']['total'] - stats['rx_pkts']['total'])
                }
            }

            lat = self.__combine_latencies(in_stats, ph)
            result[ph]['rx']['max_delay_usec'] = cast_integer(
                lat['total_max']) if 'total_max' in lat else float('nan')
            result[ph]['rx']['min_delay_usec'] = cast_integer(
                lat['total_min']) if 'total_min' in lat else float('nan')
            result[ph]['rx']['avg_delay_usec'] = cast_integer(
                lat['average']) if 'average' in lat else float('nan')

        # Sum over the configured port handles instead of hard-coding ports 0
        # and 1, which raised KeyError for any other port numbering.
        total_tx_pkts = sum(result[ph]['tx']['total_pkts'] for ph in self.port_handle)
        result["total_tx_rate"] = cast_integer(total_tx_pkts / self.config.duration_sec)
        return result

    def __combine_stats(self, in_stats, port_handle):
        """Sum the flow stats of every pg_id belonging to one port.

        The regular-stream pg_id and all latency-stream pg_ids of
        ``port_handle`` are merged, so latency packets are counted together
        with the regular traffic.

        :return: nested dict stat_type -> key -> summed value, where the keys
            are those found under each stat type in in_stats['flow_stats'].
        """
        result = defaultdict(lambda: defaultdict(float))

        for pg_id in [self.stream_ids[port_handle]] + self.latencies[port_handle]:
            record = in_stats['flow_stats'][pg_id]
            for stat_type, stat_type_values in record.iteritems():
                for ph, value in stat_type_values.iteritems():
                    result[stat_type][ph] += value

        return result

    def __combine_latencies(self, in_stats, port_handle):
        """Aggregate latency stats over all latency streams of one port.

        :return: {} when the port has no latency stream; otherwise a dict with
            'dropped_pkts' (sum), 'total_max' (max), 'total_min' (min) and
            'average' (unweighted mean of the per-stream averages).
        """
        if not self.latencies[port_handle]:
            return {}

        result = defaultdict(float)
        # Start the minimum at +inf so the first stream always replaces it.
        result['total_min'] = float("inf")
        for lat_id in self.latencies[port_handle]:
            lat = in_stats['latency'][lat_id]
            result['dropped_pkts'] += lat['err_cntrs']['dropped']
            result['total_max'] = max(lat['latency']['total_max'], result['total_max'])
            result['total_min'] = min(lat['latency']['total_min'], result['total_min'])
            result['average'] += lat['latency']['average']

        result['average'] /= len(self.latencies[port_handle])

        return result

    def create_pkt(self, stream_cfg, l2frame_size):
        """Build an Ether[/Dot1Q]/IP/UDP packet plus its field-engine program.

        The L2 frame size is raised to at least 64 bytes, and the UDP payload
        is never shorter than 16 bytes because TRex requires at least 16
        payload bytes.

        The field-engine program rewrites the IP source and destination of
        every packet. Addresses are either repeatable-random within
        [addr, addr_max] (ip_addrs_step == 'random') or incremented by
        ip_addrs_step. IP/UDP checksums are then fixed with
        STLVmFixChecksumHw.

        :param stream_cfg: stream configuration dict (MACs, optional vlan_tag,
            optional UDP ports, IP address ranges and counts).
        :param l2frame_size: requested L2 frame size in bytes.
        :return: STLPktBuilder for the packet.
        """
        pkt_base = Ether(src=stream_cfg['mac_src'], dst=stream_cfg['mac_dst'])
        # TRex requires minimum payload size 16B
        if stream_cfg['vlan_tag'] is not None:
            # 50 = 14 (Ethernet II) + 4 (Vlan tag) + 4 (CRC Checksum) + 20 (IPv4) + 8 (UDP)
            pkt_base /= Dot1Q(vlan=stream_cfg['vlan_tag'])
            l2payload_size = max(max(64, int(l2frame_size)) - 50, 16)
        else:
            # 46 = 14 (Ethernet II) + 4 (CRC Checksum) + 20 (IPv4) + 8 (UDP)
            l2payload_size = max(max(64, int(l2frame_size)) - 46, 16)
        payload = 'x' * l2payload_size
        udp_args = {}
        if stream_cfg['udp_src_port']:
            udp_args['sport'] = int(stream_cfg['udp_src_port'])
        if stream_cfg['udp_dst_port']:
            udp_args['dport'] = int(stream_cfg['udp_dst_port'])
        pkt_base /= IP() / UDP(**udp_args)

        if stream_cfg['ip_addrs_step'] == 'random':
            src_fv = STLVmFlowVarRepetableRandom(
                name="ip_src",
                min_value=stream_cfg['ip_src_addr'],
                max_value=stream_cfg['ip_src_addr_max'],
                size=4,
                seed=random.randint(0, 32767),
                limit=stream_cfg['ip_src_count'])
            dst_fv = STLVmFlowVarRepetableRandom(
                name="ip_dst",
                min_value=stream_cfg['ip_dst_addr'],
                max_value=stream_cfg['ip_dst_addr_max'],
                size=4,
                seed=random.randint(0, 32767),
                limit=stream_cfg['ip_dst_count'])
        else:
            # NOTE(review): min == max here, so in sequential mode the source
            # address stays fixed and only the destination varies. This looks
            # deliberate, but confirm it should not use ip_src_addr_max.
            src_fv = STLVmFlowVar(
                name="ip_src",
                min_value=stream_cfg['ip_src_addr'],
                max_value=stream_cfg['ip_src_addr'],
                size=4,
                op="inc",
                step=stream_cfg['ip_addrs_step'])
            dst_fv = STLVmFlowVar(
                name="ip_dst",
                min_value=stream_cfg['ip_dst_addr'],
                max_value=stream_cfg['ip_dst_addr_max'],
                size=4,
                op="inc",
                step=stream_cfg['ip_addrs_step'])

        vm_param = [
            src_fv,
            STLVmWrFlowVar(fv_name="ip_src", pkt_offset="IP.src"),
            dst_fv,
            STLVmWrFlowVar(fv_name="ip_dst", pkt_offset="IP.dst"),
            STLVmFixChecksumHw(l3_offset="IP",
                               l4_offset="UDP",
                               l4_type=CTRexVmInsFixHwCs.L4_TYPE_UDP)
        ]

        return STLPktBuilder(pkt=pkt_base / payload, vm=STLScVmRaw(vm_param))

    def generate_streams(self, port_handle, stream_cfg, l2frame, isg=0.0, latency=True):
        """Create the continuous streams for one port and one stream config.

        Regular streams share the port's pg_id (self.stream_ids[port_handle]).
        When ``latency`` is True, one extra latency stream is added at
        LATENCY_PPS under a fresh pg_id, which is recorded in
        self.latencies[port_handle].

        For l2frame == 'IMIX':
        - one stream is created per (ratio, size) pair, with pps=ratio and
          start gaps staggered by 0.1 * index;
        - the latency stream reuses the last IMIX packet and starts after
          ``isg``.

        NOTE(review): ``isg`` is not applied to non-IMIX streams.

        :return: list of STLStream objects.
        """
        idx_lat = None
        streams = []
        if l2frame == 'IMIX':
            for t, (ratio, l2_frame_size) in enumerate(zip(self.imix_ratios, self.imix_l2_sizes)):
                pkt = self.create_pkt(stream_cfg, l2_frame_size)
                streams.append(STLStream(packet=pkt,
                                         isg=0.1 * t,
                                         flow_stats=STLFlowStats(
                                             pg_id=self.stream_ids[port_handle]),
                                         mode=STLTXCont(pps=ratio)))

            if latency:
                idx_lat = next(self.id)
                sl = STLStream(packet=pkt,
                               isg=isg,
                               flow_stats=STLFlowLatencyStats(pg_id=idx_lat),
                               mode=STLTXCont(pps=self.LATENCY_PPS))
                streams.append(sl)
        else:
            pkt = self.create_pkt(stream_cfg, l2frame)
            streams.append(STLStream(packet=pkt,
                                     flow_stats=STLFlowStats(pg_id=self.stream_ids[port_handle]),
                                     mode=STLTXCont()))

            if latency:
                idx_lat = next(self.id)
                streams.append(STLStream(packet=pkt,
                                         flow_stats=STLFlowLatencyStats(pg_id=idx_lat),
                                         mode=STLTXCont(pps=self.LATENCY_PPS)))

        if latency:
            self.latencies[port_handle].append(idx_lat)

        return streams

    def init(self):
        """No initialization is needed for TRex."""
        pass

    @timeout(5)
    def __connect(self, client):
        """Connect ``client``, bounded by nfvbench.utils.timeout(5) (presumably seconds)."""
        client.connect()

    def __connect_after_start(self):
        """Retry connecting to a freshly started TRex server.

        Makes up to config.generic_retry_count attempts, sleeping 1 second
        before each one, and re-raises the last exception if all of them fail.
        """
        # after start, Trex may take a bit of time to initialize
        # so we need to retry a few times
        for it in xrange(self.config.generic_retry_count):
            try:
                time.sleep(1)
                self.client.connect()
                break
            except Exception as ex:
                if it == (self.config.generic_retry_count - 1):
                    raise ex
                LOG.info("Retrying connection to TRex (%s)...", ex.message)

    def connect(self):
        """Connect to the TRex server at generator_config.ip and reset its ports.

        If the first connection fails and the server address is 127.0.0.1, a
        local TRex server is started and the connection is retried.

        :raises TrafficGeneratorException: when no connection can be made.
            For a local server the message is the content of /tmp/trex.log if
            that file exists, otherwise the error message.
        """
        LOG.info("Connecting to TRex...")
        server_ip = self.config.generator_config.ip

        # Connect to TRex server
        self.client = STLClient(server=server_ip)
        try:
            self.__connect(self.client)
        except (TimeoutError, STLError) as e:
            if server_ip == '127.0.0.1':
                try:
                    self.__start_server()
                    self.__connect_after_start()
                except (TimeoutError, STLError) as e:
                    LOG.error('Cannot connect to TRex')
                    LOG.error(traceback.format_exc())
                    logpath = '/tmp/trex.log'
                    if os.path.isfile(logpath):
                        # Wait for TRex to finish writing error message
                        last_size = 0
                        for _ in xrange(self.config.generic_retry_count):
                            size = os.path.getsize(logpath)
                            if size == last_size:
                                # probably not writing anymore
                                break
                            last_size = size
                            time.sleep(1)
                        with open(logpath, 'r') as f:
                            message = f.read()
                    else:
                        message = e.message
                    raise TrafficGeneratorException(message)
            else:
                raise TrafficGeneratorException(e.message)

        ports = list(self.config.generator_config.ports)
        self.port_handle = ports
        # Prepare the ports
        self.client.reset(ports)

    def set_mode(self):
        """Use L3 mode for EXT chains with ARP enabled, L2 mode otherwise."""
        if self.config.service_chain == ChainType.EXT and not self.config.no_arp:
            self.__set_l3_mode()
        else:
            self.__set_l2_mode()

    def __set_l3_mode(self):
        """Configure each port in L3 mode with its device's gateway addresses."""
        self.client.set_service_mode(ports=self.port_handle, enabled=True)
        for port, device in zip(self.port_handle, self.config.generator_config.devices):
            try:
                self.client.set_l3_mode(port=port,
                                        src_ipv4=device.tg_gateway_ip,
                                        dst_ipv4=device.dst.gateway_ip,
                                        vlan=device.vlan_tag if device.vlan_tagging else None)
            except STLError:
                # TRex tries to resolve ARP already, doesn't have to be successful yet
                continue
        self.client.set_service_mode(ports=self.port_handle, enabled=False)

    def __set_l2_mode(self):
        """Configure each port in L2 mode with the destination MAC of each stream config."""
        self.client.set_service_mode(ports=self.port_handle, enabled=True)
        for port, device in zip(self.port_handle, self.config.generator_config.devices):
            for cfg in device.get_stream_configs(self.config.generator_config.service_chain):
                self.client.set_l2_mode(port=port, dst_mac=cfg['mac_dst'])
        self.client.set_service_mode(ports=self.port_handle, enabled=False)

    def __start_server(self):
        """Start a local TRex server for the current generator configuration."""
        server = TRexTrafficServer()
        server.run_server(self.config.generator_config, self.config.vlan_tagging)

    def resolve_arp(self):
        """Resolve the destination MACs of every stream config by ARP.

        For each port, ARP is retried up to config.generic_retry_count times.
        Resolved MACs are stored in self.arps[port]. Service mode is enabled
        for the duration of the resolution and always disabled on exit, even
        when an exception propagates.

        :return: True if every port resolved config.service_chain_count MACs.
        """
        self.client.set_service_mode(ports=self.port_handle)
        LOG.info('Polling ARP until successful')
        resolved = 0
        try:
            for port, device in zip(self.port_handle, self.config.generator_config.devices):
                ctx = self.client.create_service_ctx(port=port)

                arps = [
                    STLServiceARP(ctx,
                                  src_ip=cfg['ip_src_tg_gw'],
                                  dst_ip=cfg['mac_discovery_gw'],
                                  vlan=device.vlan_tag if device.vlan_tagging else None)
                    for cfg in device.get_stream_configs(self.config.generator_config.service_chain)
                ]

                # Count attempts per port so the "x / retry_count" log stays accurate.
                for attempt in xrange(1, self.config.generic_retry_count + 1):
                    try:
                        ctx.run(arps)
                    except STLError:
                        LOG.error(traceback.format_exc())
                        continue

                    self.arps[port] = [arp.get_record().dst_mac for arp in arps
                                       if arp.get_record().dst_mac is not None]

                    if len(self.arps[port]) == self.config.service_chain_count:
                        resolved += 1
                        LOG.info('ARP resolved successfully for port %s', port)
                        break

                    failed = [arp.get_record().dst_ip for arp in arps
                              if arp.get_record().dst_mac is None]
                    LOG.info('Retrying ARP for: %s (%d / %d)',
                             failed, attempt, self.config.generic_retry_count)
                    time.sleep(self.config.generic_poll_sec)
        finally:
            self.client.set_service_mode(ports=self.port_handle, enabled=False)
        return resolved == len(self.port_handle)

    def config_interface(self):
        """No interface configuration is needed for TRex."""
        pass

    def __is_rate_enough(self, l2frame_size, rates, bidirectional, latency):
        """Check that the requested rate leaves room for the latency streams.

        Only enforced when ``latency`` is True. The total requested packet
        rate must be at least (LATENCY_PPS + 1) pps per service chain and per
        direction: one latency stream plus at least 1 pps of regular traffic.

        :return: {'result': True} when latency is False. Otherwise, the
            utils.convert_rates() representation of the required rate, with an
            added boolean 'result' key.
        """
        if not latency:
            return {'result': True}

        intf_speed = self.config.generator_config.intf_speed
        if bidirectional:
            mult = 2
            total_rate = 0
            for rate in rates:
                r = utils.convert_rates(l2frame_size, rate, intf_speed)
                total_rate += int(r['rate_pps'])
        else:
            mult = 1
            # Compare packet rates. Comparing the whole convert_rates() dict
            # with an int was always True on Python 2, so the check never fired.
            r = utils.convert_rates(l2frame_size, rates[0], intf_speed)
            total_rate = int(r['rate_pps'])
        # rate must be enough for latency stream and at least 1 pps for base stream per chain
        required_rate = (self.LATENCY_PPS + 1) * self.config.service_chain_count * mult
        result = utils.convert_rates(l2frame_size,
                                     {'rate_pps': required_rate},
                                     intf_speed * mult)
        result['result'] = total_rate >= required_rate
        return result

    def create_traffic(self, l2frame_size, rates, bidirectional, latency=True):
        """Build and add the streams of every service chain to the TRex ports.

        Forward streams go on the first port handle. Reverse streams go on the
        second port handle, and only when more than one rate is given. For EXT
        chains with ARP enabled, destination MACs come from self.arps.

        :raises TrafficGeneratorException: if the requested rate is too low
            for the latency streams.
        """
        r = self.__is_rate_enough(l2frame_size, rates, bidirectional, latency)
        if not r['result']:
            raise TrafficGeneratorException(
                'Required rate in total is at least one of: \n{pps}pps \n{bps}bps \n{load}%.'
                .format(pps=r['rate_pps'],
                        bps=r['rate_bps'],
                        load=r['rate_percent']))

        stream_cfgs = [d.get_stream_configs(self.config.generator_config.service_chain)
                       for d in self.config.generator_config.devices]
        self.rates = [utils.to_rate_str(rate) for rate in rates]

        for ph in self.port_handle:
            # generate one pg_id for each direction
            self.stream_ids[ph] = next(self.id)

        fwd_port = self.port_handle[0]
        rev_port = self.port_handle[1]
        for i, (fwd_stream_cfg, rev_stream_cfg) in enumerate(zip(*stream_cfgs)):
            if self.config.service_chain == ChainType.EXT and not self.config.no_arp:
                fwd_stream_cfg['mac_dst'] = self.arps[fwd_port][i]
                rev_stream_cfg['mac_dst'] = self.arps[rev_port][i]

            # Key the stream blocks by port handle (not by 0/1) so they match
            # the add_streams() loop below for any port numbering.
            self.streamblock[fwd_port].extend(self.generate_streams(fwd_port,
                                                                    fwd_stream_cfg,
                                                                    l2frame_size,
                                                                    latency=latency))
            if len(self.rates) > 1:
                self.streamblock[rev_port].extend(self.generate_streams(rev_port,
                                                                        rev_stream_cfg,
                                                                        l2frame_size,
                                                                        isg=10.0,
                                                                        latency=bidirectional and latency))

        for ph in self.port_handle:
            self.client.add_streams(self.streamblock[ph], ports=ph)
            LOG.info('Created traffic stream for port %s.', ph)

    def clear_streamblock(self):
        """Forget all stream/pg_id bookkeeping and reset the TRex ports."""
        self.streamblock = defaultdict(list)
        self.latencies = defaultdict(list)
        self.stream_ids = defaultdict(list)
        self.rates = []
        self.client.reset(self.port_handle)
        LOG.info('Cleared all existing streams.')

    def get_stats(self):
        """Fetch pg_id statistics from TRex and convert them with extract_stats()."""
        stats = self.client.get_pgid_stats()
        return self.extract_stats(stats)

    def get_macs(self):
        """Return the 'src_mac' port attribute of every port handle, in order."""
        return [self.client.get_port_attr(port=port)['src_mac'] for port in self.port_handle]

    def clear_stats(self):
        """Clear TRex statistics, only once ports are known."""
        if self.port_handle:
            self.client.clear_stats()

    def start_traffic(self):
        """Start every port at its rate string for config.duration_sec (force=True)."""
        for port, rate in zip(self.port_handle, self.rates):
            self.client.start(ports=port, mult=rate, duration=self.config.duration_sec, force=True)

    def stop_traffic(self):
        """Stop traffic on all port handles."""
        self.client.stop(ports=self.port_handle)

    def cleanup(self):
        """Reset the ports and disconnect, ignoring STLError from TRex."""
        if self.client:
            try:
                self.client.reset(self.port_handle)
                self.client.disconnect()
            except STLError:
                # TRex does not like a reset while in disconnected state
                pass