NFVBENCH-80 latency stream for IMIX uses 1518B frames
[nfvbench.git] / nfvbench / traffic_gen / trex.py
1 # Copyright 2016 Cisco Systems, Inc.  All rights reserved.
2 #
3 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
4 #    not use this file except in compliance with the License. You may obtain
5 #    a copy of the License at
6 #
7 #         http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #    Unless required by applicable law or agreed to in writing, software
10 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 #    License for the specific language governing permissions and limitations
13 #    under the License.
14
15 import os
16 import random
17 import time
18 import traceback
19
20 from collections import defaultdict
21 from itertools import count
22 from nfvbench.log import LOG
23 from nfvbench.specs import ChainType
24 from nfvbench.traffic_server import TRexTrafficServer
25 from nfvbench.utils import cast_integer
26 from nfvbench.utils import timeout
27 from nfvbench.utils import TimeoutError
28 from traffic_base import AbstractTrafficGenerator
29 from traffic_base import TrafficGeneratorException
30 import traffic_utils as utils
31
32 # pylint: disable=import-error
33 from trex_stl_lib.api import CTRexVmInsFixHwCs
34 from trex_stl_lib.api import Dot1Q
35 from trex_stl_lib.api import Ether
36 from trex_stl_lib.api import IP
37 from trex_stl_lib.api import STLClient
38 from trex_stl_lib.api import STLError
39 from trex_stl_lib.api import STLFlowLatencyStats
40 from trex_stl_lib.api import STLFlowStats
41 from trex_stl_lib.api import STLPktBuilder
42 from trex_stl_lib.api import STLScVmRaw
43 from trex_stl_lib.api import STLStream
44 from trex_stl_lib.api import STLTXCont
45 from trex_stl_lib.api import STLVmFixChecksumHw
46 from trex_stl_lib.api import STLVmFlowVar
47 from trex_stl_lib.api import STLVmFlowVarRepetableRandom
48 from trex_stl_lib.api import STLVmWrFlowVar
49 from trex_stl_lib.api import UDP
50 from trex_stl_lib.services.trex_stl_service_arp import STLServiceARP
51
52
53 # pylint: enable=import-error
54
55
class TRex(AbstractTrafficGenerator):
    """Traffic generator driver for a TRex server, built on the TRex stateless (STL) client API."""

    # Packet rate (pps) of every latency-measurement stream created by generate_streams().
    LATENCY_PPS = 1000

    def __init__(self, runner):
        AbstractTrafficGenerator.__init__(self, runner)
        self.client = None
        # Monotonic source of TRex packet-group ids (pg_id) for all created streams.
        self.id = count()
        # port -> list of pg_ids of the latency streams created for that port
        self.latencies = defaultdict(list)
        # port -> pg_id shared by all base (non-latency) streams of that port
        # (assigned in create_traffic)
        self.stream_ids = defaultdict(list)
        self.port_handle = []
        # port -> list of STLStream objects to push to that port
        self.streamblock = defaultdict(list)
        # per-port rate strings, passed as 'mult' to client.start()
        self.rates = []
        # port -> list of destination MACs resolved by resolve_arp()
        self.arps = {}
        self.capture_id = None
        self.packet_list = []

    def get_version(self):
        """Return the TRex server version as reported by the STL client."""
        return self.client.get_server_version()

    def extract_stats(self, in_stats):
        """Convert raw TRex pg_id statistics into per-port nfvbench results.

        :param in_stats: dict returned by ``STLClient.get_pgid_stats()``; its
            'flow_stats' and 'latency' entries are read.
        :return: dict keyed by port handle. Each value holds a 'tx' and an 'rx'
            dict of counters. The 'rx' dict also carries max/min/avg delay values
            (NaN when no latency stream ran on that port). A 'total_tx_rate'
            entry holds the total transmitted packets of all ports divided by
            ``config.duration_sec``.
        """
        utils.nan_replace(in_stats)
        LOG.debug(in_stats)

        result = {}
        for ph in self.port_handle:
            stats = self.__combine_stats(in_stats, ph)
            result[ph] = {
                'tx': {
                    'total_pkts': cast_integer(stats['tx_pkts']['total']),
                    'total_pkt_bytes': cast_integer(stats['tx_bytes']['total']),
                    'pkt_rate': cast_integer(stats['tx_pps']['total']),
                    'pkt_bit_rate': cast_integer(stats['tx_bps']['total'])
                },
                'rx': {
                    'total_pkts': cast_integer(stats['rx_pkts']['total']),
                    'total_pkt_bytes': cast_integer(stats['rx_bytes']['total']),
                    'pkt_rate': cast_integer(stats['rx_pps']['total']),
                    'pkt_bit_rate': cast_integer(stats['rx_bps']['total']),
                    'dropped_pkts': cast_integer(
                        stats['tx_pkts']['total'] - stats['rx_pkts']['total'])
                }
            }

            lat = self.__combine_latencies(in_stats, ph)
            result[ph]['rx']['max_delay_usec'] = cast_integer(
                lat['total_max']) if 'total_max' in lat else float('nan')
            result[ph]['rx']['min_delay_usec'] = cast_integer(
                lat['total_min']) if 'total_min' in lat else float('nan')
            result[ph]['rx']['avg_delay_usec'] = cast_integer(
                lat['average']) if 'average' in lat else float('nan')
        # Sum over the configured port handles rather than hard-coded keys 0/1,
        # since result is keyed by port handle.
        total_tx_pkts = sum(result[ph]['tx']['total_pkts'] for ph in self.port_handle)
        result["total_tx_rate"] = cast_integer(total_tx_pkts / self.config.duration_sec)
        return result

    def __combine_stats(self, in_stats, port_handle):
        """Traverse the TRex result dictionary and sum the flow stats of one port's streams.

        Used for combining latency and regular streams together. The base
        streams of a port share one pg_id (``self.stream_ids``). Each latency
        stream has its own pg_id (``self.latencies``). Counters of all those
        pg_ids are added per stat type and per inner key; extract_stats reads
        the 'total' inner key.

        :return: nested dict ``result[stat_type][key] -> float``, defaulting to 0.0.
        """
        result = defaultdict(lambda: defaultdict(float))

        pg_ids = [self.stream_ids[port_handle]] + self.latencies[port_handle]
        for pg_id in pg_ids:
            record = in_stats['flow_stats'][pg_id]
            for stat_type, stat_type_values in record.iteritems():
                for ph, value in stat_type_values.iteritems():
                    result[stat_type][ph] += value

        return result

    def __combine_latencies(self, in_stats, port_handle):
        """Traverse the TRex result dictionary and combine the latency stats of one port.

        :return: {} if the port has no latency stream. Otherwise a dict with
            'total_max' (max over streams), 'total_min' (min over streams),
            'average' (mean of the per-stream averages) and 'dropped_pkts'
            (sum of the per-stream 'dropped' error counters).
        """
        if not self.latencies[port_handle]:
            return {}

        result = defaultdict(float)
        result['total_min'] = float("inf")
        for lat_id in self.latencies[port_handle]:
            lat = in_stats['latency'][lat_id]
            result['dropped_pkts'] += lat['err_cntrs']['dropped']
            result['total_max'] = max(lat['latency']['total_max'], result['total_max'])
            result['total_min'] = min(lat['latency']['total_min'], result['total_min'])
            result['average'] += lat['latency']['average']

        result['average'] /= len(self.latencies[port_handle])

        return result

    def create_pkt(self, stream_cfg, l2frame_size):
        """Build the STL packet template for one stream.

        The packet is Ether / [Dot1Q] / IP / UDP / padding. It is padded so that
        the L2 frame, including its 4-byte CRC, is ``l2frame_size`` bytes long.
        A field-engine program rewrites the IPv4 source and destination
        addresses from flow variables. It then asks for a hardware checksum fix
        of the IP/UDP headers (STLVmFixChecksumHw).

        :param stream_cfg: stream config dict (MACs, optional 'vlan_tag', UDP
            ports, IP address ranges and 'ip_addrs_step').
        :param l2frame_size: L2 frame size in bytes (int or numeric string).
        :return: STLPktBuilder for the stream.
        """
        pkt_base = Ether(src=stream_cfg['mac_src'], dst=stream_cfg['mac_dst'])
        if stream_cfg['vlan_tag'] is not None:
            # 50 = 14 (Ethernet II) + 4 (Vlan tag) + 4 (CRC Checksum) + 20 (IPv4) + 8 (UDP)
            pkt_base /= Dot1Q(vlan=stream_cfg['vlan_tag'])
            l2payload_size = int(l2frame_size) - 50
        else:
            # 46 = 14 (Ethernet II) + 4 (CRC Checksum) + 20 (IPv4) + 8 (UDP)
            l2payload_size = int(l2frame_size) - 46
        payload = 'x' * l2payload_size
        udp_args = {}
        if stream_cfg['udp_src_port']:
            udp_args['sport'] = int(stream_cfg['udp_src_port'])
        if stream_cfg['udp_dst_port']:
            udp_args['dport'] = int(stream_cfg['udp_dst_port'])
        pkt_base /= IP() / UDP(**udp_args)

        if stream_cfg['ip_addrs_step'] == 'random':
            src_fv = STLVmFlowVarRepetableRandom(
                name="ip_src",
                min_value=stream_cfg['ip_src_addr'],
                max_value=stream_cfg['ip_src_addr_max'],
                size=4,
                seed=random.randint(0, 32767),
                limit=stream_cfg['ip_src_count'])
            dst_fv = STLVmFlowVarRepetableRandom(
                name="ip_dst",
                min_value=stream_cfg['ip_dst_addr'],
                max_value=stream_cfg['ip_dst_addr_max'],
                size=4,
                seed=random.randint(0, 32767),
                limit=stream_cfg['ip_dst_count'])
        else:
            # NOTE(review): min and max are both 'ip_src_addr' here, so the source IP
            # stays fixed in incrementing mode while only the destination IP varies
            # (the random mode uses 'ip_src_addr_max') - confirm this is intentional.
            src_fv = STLVmFlowVar(
                name="ip_src",
                min_value=stream_cfg['ip_src_addr'],
                max_value=stream_cfg['ip_src_addr'],
                size=4,
                op="inc",
                step=stream_cfg['ip_addrs_step'])
            dst_fv = STLVmFlowVar(
                name="ip_dst",
                min_value=stream_cfg['ip_dst_addr'],
                max_value=stream_cfg['ip_dst_addr_max'],
                size=4,
                op="inc",
                step=stream_cfg['ip_addrs_step'])

        vm_param = [
            src_fv,
            STLVmWrFlowVar(fv_name="ip_src", pkt_offset="IP.src"),
            dst_fv,
            STLVmWrFlowVar(fv_name="ip_dst", pkt_offset="IP.dst"),
            STLVmFixChecksumHw(l3_offset="IP",
                               l4_offset="UDP",
                               l4_type=CTRexVmInsFixHwCs.L4_TYPE_UDP)
        ]

        return STLPktBuilder(pkt=pkt_base / payload, vm=STLScVmRaw(vm_param))

    def generate_streams(self, port_handle, stream_cfg, l2frame, isg=0.0, latency=True):
        """Create the STL streams of one chain for one port.

        For 'IMIX', one continuous stream per IMIX frame size is created, each
        with ``pps=ratio`` and an isg of 0.1 * its index. Otherwise a single
        continuous stream of ``l2frame`` bytes is created. All base streams
        share the port's pg_id (``self.stream_ids[port_handle]``).

        When ``latency`` is True, one extra stream at LATENCY_PPS is added with
        a fresh pg_id, which is recorded in ``self.latencies[port_handle]``.
        For IMIX, that latency stream uses the average IMIX frame size.

        NOTE(review): ``isg`` is only applied to the IMIX latency stream; the
        fixed-size streams ignore it.

        :return: list of STLStream objects.
        """
        idx_lat = None
        streams = []
        if l2frame == 'IMIX':
            min_size = 64 if stream_cfg['vlan_tag'] is None else 68
            self.adjust_imix_min_size(min_size)
            for t, (ratio, l2_frame_size) in enumerate(zip(self.imix_ratios, self.imix_l2_sizes)):
                pkt = self.create_pkt(stream_cfg, l2_frame_size)
                streams.append(STLStream(packet=pkt,
                                         isg=0.1 * t,
                                         flow_stats=STLFlowStats(
                                             pg_id=self.stream_ids[port_handle]),
                                         mode=STLTXCont(pps=ratio)))

            if latency:
                idx_lat = next(self.id)
                pkt = self.create_pkt(stream_cfg, self.imix_avg_l2_size)
                sl = STLStream(packet=pkt,
                               isg=isg,
                               flow_stats=STLFlowLatencyStats(pg_id=idx_lat),
                               mode=STLTXCont(pps=self.LATENCY_PPS))
                streams.append(sl)
        else:
            pkt = self.create_pkt(stream_cfg, l2frame)
            streams.append(STLStream(packet=pkt,
                                     flow_stats=STLFlowStats(pg_id=self.stream_ids[port_handle]),
                                     mode=STLTXCont()))

            if latency:
                idx_lat = next(self.id)
                streams.append(STLStream(packet=pkt,
                                         flow_stats=STLFlowLatencyStats(pg_id=idx_lat),
                                         mode=STLTXCont(pps=self.LATENCY_PPS)))

        if latency:
            self.latencies[port_handle].append(idx_lat)

        return streams

    def init(self):
        """No initialization needed for TRex (no-op)."""
        pass

    @timeout(5)
    def __connect(self, client):
        """Connect ``client`` to its server, bounded by the @timeout(5) decorator."""
        client.connect()

    def __connect_after_start(self):
        """Connect to a freshly started TRex server, retrying once per second.

        Up to ``config.generic_retry_count`` attempts are made. The exception of
        the last attempt is re-raised.
        """
        # after start, Trex may take a bit of time to initialize
        # so we need to retry a few times
        for it in xrange(self.config.generic_retry_count):
            try:
                time.sleep(1)
                self.client.connect()
                break
            except Exception as ex:
                if it == (self.config.generic_retry_count - 1):
                    # bare raise keeps the original traceback ('raise ex' resets it in Python 2)
                    raise
                LOG.info("Retrying connection to TRex (%s)...", ex.message)

    def connect(self):
        """Connect to the TRex server and reset the configured ports.

        If the server is local (127.0.0.1) and unreachable, the server is
        started and the connection retried. On failure, the content of
        /tmp/trex.log (when present) is used as the error message.

        :raises TrafficGeneratorException: if the connection cannot be established.
        """
        LOG.info("Connecting to TRex...")
        server_ip = self.config.generator_config.ip

        # Connect to TRex server
        self.client = STLClient(server=server_ip)
        try:
            self.__connect(self.client)
        except (TimeoutError, STLError) as e:
            if server_ip == '127.0.0.1':
                try:
                    self.__start_server()
                    self.__connect_after_start()
                except (TimeoutError, STLError) as e:
                    LOG.error('Cannot connect to TRex')
                    LOG.error(traceback.format_exc())
                    logpath = '/tmp/trex.log'
                    if os.path.isfile(logpath):
                        # Wait for TRex to finish writing error message
                        last_size = 0
                        for _ in xrange(self.config.generic_retry_count):
                            size = os.path.getsize(logpath)
                            if size == last_size:
                                # probably not writing anymore
                                break
                            last_size = size
                            time.sleep(1)
                        with open(logpath, 'r') as f:
                            message = f.read()
                    else:
                        message = e.message
                    raise TrafficGeneratorException(message)
            else:
                raise TrafficGeneratorException(e.message)

        ports = list(self.config.generator_config.ports)
        self.port_handle = ports
        # Prepare the ports
        self.client.reset(ports)

    def set_mode(self):
        """Use L3 mode for EXT chains with ARP enabled, L2 mode otherwise."""
        if self.config.service_chain == ChainType.EXT and not self.config.no_arp:
            self.__set_l3_mode()
        else:
            self.__set_l2_mode()

    def __set_l3_mode(self):
        """Configure every port in L3 mode (source/gateway IPv4, optional VLAN) under service mode."""
        self.client.set_service_mode(ports=self.port_handle, enabled=True)
        for port, device in zip(self.port_handle, self.config.generator_config.devices):
            try:
                self.client.set_l3_mode(port=port,
                                        src_ipv4=device.tg_gateway_ip,
                                        dst_ipv4=device.dst.gateway_ip,
                                        vlan=device.vlan_tag if device.vlan_tagging else None)
            except STLError:
                # TRex tries to resolve ARP already, doesn't have to be successful yet
                continue
        self.client.set_service_mode(ports=self.port_handle, enabled=False)

    def __set_l2_mode(self):
        """Configure every port in L2 mode with the destination MAC of its stream configs."""
        self.client.set_service_mode(ports=self.port_handle, enabled=True)
        for port, device in zip(self.port_handle, self.config.generator_config.devices):
            for cfg in device.get_stream_configs(self.config.generator_config.service_chain):
                self.client.set_l2_mode(port=port, dst_mac=cfg['mac_dst'])
        self.client.set_service_mode(ports=self.port_handle, enabled=False)

    def __start_server(self):
        """Start a TRex server through TRexTrafficServer with the generator config."""
        server = TRexTrafficServer()
        server.run_server(self.config.generator_config, self.config.vlan_tagging)

    def resolve_arp(self):
        """Resolve, by ARP, the destination MAC of every stream config on every port.

        Runs with service mode enabled on all ports. For each port, one
        STLServiceARP request per stream config is run. Requests are retried
        up to ``config.generic_retry_count`` times until
        ``config.service_chain_count`` MACs are resolved. Resolved MACs are
        stored in ``self.arps[port]``.

        :return: True if every port was fully resolved.
        """
        self.client.set_service_mode(ports=self.port_handle)
        LOG.info('Polling ARP until successful')
        resolved = 0
        for port, device in zip(self.port_handle, self.config.generator_config.devices):
            ctx = self.client.create_service_ctx(port=port)

            arps = [
                STLServiceARP(ctx,
                              src_ip=cfg['ip_src_tg_gw'],
                              dst_ip=cfg['mac_discovery_gw'],
                              vlan=device.vlan_tag if device.vlan_tagging else None)
                for cfg in device.get_stream_configs(self.config.generator_config.service_chain)
            ]

            # attempt is counted per port so the "(n / max)" log stays within bounds
            for attempt in xrange(1, self.config.generic_retry_count + 1):
                try:
                    ctx.run(arps)
                except STLError:
                    LOG.error(traceback.format_exc())
                    continue

                self.arps[port] = [arp.get_record().dst_mac for arp in arps
                                   if arp.get_record().dst_mac is not None]

                if len(self.arps[port]) == self.config.service_chain_count:
                    resolved += 1
                    LOG.info('ARP resolved successfully for port %s', port)
                    break

                failed = [arp.get_record().dst_ip for arp in arps
                          if arp.get_record().dst_mac is None]
                LOG.info('Retrying ARP for: %s (%d / %d)',
                         failed, attempt, self.config.generic_retry_count)
                time.sleep(self.config.generic_poll_sec)

        self.client.set_service_mode(ports=self.port_handle, enabled=False)
        return resolved == len(self.port_handle)

    def config_interface(self):
        """No interface configuration needed for TRex (no-op)."""
        pass

    def __is_rate_enough(self, l2frame_size, rates, bidirectional, latency):
        """Check if the rate provided by the user is above requirements.

        The check applies only if ``latency`` is True. The total requested rate
        must then be at least (LATENCY_PPS + 1) pps per chain and per direction:
        enough for the latency stream plus at least 1 pps of base traffic.

        :return: dict with a boolean 'result'. When latency is enabled it also
            holds the minimum required rate as returned by
            ``utils.convert_rates`` (e.g. 'rate_pps', 'rate_bps', 'rate_percent').
        """
        if not latency:
            return {'result': True}

        intf_speed = self.config.generator_config.intf_speed
        if bidirectional:
            mult = 2
            directions = rates
        else:
            mult = 1
            directions = rates[:1]
        # convert_rates() returns a dict: compare its 'rate_pps' entry, not the dict itself
        total_rate = 0
        for rate in directions:
            r = utils.convert_rates(l2frame_size, rate, intf_speed)
            total_rate += int(r['rate_pps'])
        # rate must be enough for latency stream and at least 1 pps for base stream per chain
        required_rate = (self.LATENCY_PPS + 1) * self.config.service_chain_count * mult
        result = utils.convert_rates(l2frame_size,
                                     {'rate_pps': required_rate},
                                     intf_speed * mult)
        result['result'] = total_rate >= required_rate
        return result

    def create_traffic(self, l2frame_size, rates, bidirectional, latency=True):
        """Build the streams of all chains and push them to the TRex ports.

        :param l2frame_size: L2 frame size in bytes, or 'IMIX'.
        :param rates: list of per-direction rates. Reverse-direction streams are
            only generated when it has more than one entry.
        :param bidirectional: when False, no latency stream is added in the
            reverse direction.
        :param latency: add latency streams.
        :raises TrafficGeneratorException: if the requested rate is too low for
            the latency streams.
        """
        r = self.__is_rate_enough(l2frame_size, rates, bidirectional, latency)
        if not r['result']:
            raise TrafficGeneratorException(
                'Required rate in total is at least one of: \n{pps}pps \n{bps}bps \n{load}%.'
                .format(pps=r['rate_pps'],
                        bps=r['rate_bps'],
                        load=r['rate_percent']))

        stream_cfgs = [d.get_stream_configs(self.config.generator_config.service_chain)
                       for d in self.config.generator_config.devices]
        self.rates = [utils.to_rate_str(rate) for rate in rates]

        for ph in self.port_handle:
            # generate one pg_id for each direction
            self.stream_ids[ph] = next(self.id)

        for i, (fwd_stream_cfg, rev_stream_cfg) in enumerate(zip(*stream_cfgs)):
            fwd_ph = self.port_handle[0]
            rev_ph = self.port_handle[1]
            if self.config.service_chain == ChainType.EXT and not self.config.no_arp:
                fwd_stream_cfg['mac_dst'] = self.arps[fwd_ph][i]
                rev_stream_cfg['mac_dst'] = self.arps[rev_ph][i]

            # streamblock must be keyed by port handle: add_streams() below reads it that way
            self.streamblock[fwd_ph].extend(self.generate_streams(fwd_ph,
                                                                  fwd_stream_cfg,
                                                                  l2frame_size,
                                                                  latency=latency))
            if len(self.rates) > 1:
                self.streamblock[rev_ph].extend(self.generate_streams(rev_ph,
                                                                      rev_stream_cfg,
                                                                      l2frame_size,
                                                                      isg=10.0,
                                                                      latency=bidirectional and latency))

        for ph in self.port_handle:
            self.client.add_streams(self.streamblock[ph], ports=ph)
            LOG.info('Created traffic stream for port %s.', ph)

    def clear_streamblock(self):
        """Forget all created streams, pg_ids and rates, and reset the TRex ports."""
        self.streamblock = defaultdict(list)
        self.latencies = defaultdict(list)
        self.stream_ids = defaultdict(list)
        self.rates = []
        self.client.reset(self.port_handle)
        LOG.info('Cleared all existing streams.')

    def get_stats(self):
        """Fetch pg_id stats from TRex and return them as extract_stats() results."""
        stats = self.client.get_pgid_stats()
        return self.extract_stats(stats)

    def get_macs(self):
        """Return the 'src_mac' port attribute of every port, in port order."""
        return [self.client.get_port_attr(port=port)['src_mac'] for port in self.port_handle]

    def clear_stats(self):
        """Clear TRex stats; no-op while no port handle is configured."""
        if self.port_handle:
            self.client.clear_stats()

    def start_traffic(self):
        """Start each port at its rate for config.duration_sec.

        Ports and rates are paired with zip, so ports without a rate are not started.
        """
        for port, rate in zip(self.port_handle, self.rates):
            self.client.start(ports=port, mult=rate, duration=self.config.duration_sec, force=True)

    def stop_traffic(self):
        """Stop traffic on all ports."""
        self.client.stop(ports=self.port_handle)

    def start_capture(self):
        """Enter service mode and start capturing on all ports.

        Any capture already in progress is stopped first.
        """
        if self.capture_id:
            self.stop_capture()
        self.client.set_service_mode(ports=self.port_handle)
        self.capture_id = self.client.start_capture(rx_ports=self.port_handle)

    def fetch_capture_packets(self):
        """Replace self.packet_list with the packets of the active capture, if any."""
        if self.capture_id:
            self.packet_list = []
            self.client.fetch_capture_packets(capture_id=self.capture_id['id'],
                                              output=self.packet_list)

    def stop_capture(self):
        """Stop the active capture, if any, and leave service mode."""
        if self.capture_id:
            self.client.stop_capture(capture_id=self.capture_id['id'])
            self.capture_id = None
            self.client.set_service_mode(ports=self.port_handle, enabled=False)

    def cleanup(self):
        """Reset the ports and disconnect, ignoring STLError."""
        if self.client:
            try:
                self.client.reset(self.port_handle)
                self.client.disconnect()
            except STLError:
                # TRex does not like a reset while in disconnected state
                pass