[NFVBENCH-72] ARP retry log may cause exception
[nfvbench.git] / nfvbench / traffic_gen / trex.py
1 # Copyright 2016 Cisco Systems, Inc.  All rights reserved.
2 #
3 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
4 #    not use this file except in compliance with the License. You may obtain
5 #    a copy of the License at
6 #
7 #         http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #    Unless required by applicable law or agreed to in writing, software
10 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 #    License for the specific language governing permissions and limitations
13 #    under the License.
14
15 import os
16 import random
17 import time
18 import traceback
19
20 from collections import defaultdict
21 from itertools import count
22 from nfvbench.log import LOG
23 from nfvbench.specs import ChainType
24 from nfvbench.traffic_server import TRexTrafficServer
25 from nfvbench.utils import cast_integer
26 from nfvbench.utils import timeout
27 from nfvbench.utils import TimeoutError
28 from traffic_base import AbstractTrafficGenerator
29 from traffic_base import TrafficGeneratorException
30 import traffic_utils as utils
31
32 # pylint: disable=import-error
33 from trex_stl_lib.api import CTRexVmInsFixHwCs
34 from trex_stl_lib.api import Dot1Q
35 from trex_stl_lib.api import Ether
36 from trex_stl_lib.api import IP
37 from trex_stl_lib.api import STLClient
38 from trex_stl_lib.api import STLError
39 from trex_stl_lib.api import STLFlowLatencyStats
40 from trex_stl_lib.api import STLFlowStats
41 from trex_stl_lib.api import STLPktBuilder
42 from trex_stl_lib.api import STLScVmRaw
43 from trex_stl_lib.api import STLStream
44 from trex_stl_lib.api import STLTXCont
45 from trex_stl_lib.api import STLVmFixChecksumHw
46 from trex_stl_lib.api import STLVmFlowVar
47 from trex_stl_lib.api import STLVmFlowVarRepetableRandom
48 from trex_stl_lib.api import STLVmWrFlowVar
49 from trex_stl_lib.api import UDP
50 from trex_stl_lib.services.trex_stl_service_arp import STLServiceARP
51 # pylint: enable=import-error
52
53
class TRex(AbstractTrafficGenerator):
    """nfvbench traffic generator driver for a TRex server (stateless STL API).

    Streams are grouped per TRex port handle. All regular streams of a port
    share one packet-group id (pg_id) for flow stats, while every latency
    stream gets its own pg_id.
    """

    # Packet rate (pps) used for every latency-measurement stream.
    LATENCY_PPS = 1000

    def __init__(self, runner):
        AbstractTrafficGenerator.__init__(self, runner)
        self.client = None
        # Generator of unique pg_ids for flow and latency stats.
        self.id = count()
        # port handle -> list of latency-stream pg_ids
        self.latencies = defaultdict(list)
        # port handle -> pg_id shared by the regular (non-latency) streams
        self.stream_ids = defaultdict(list)
        self.port_handle = []
        # port handle -> list of STLStream objects to add on that port
        self.streamblock = defaultdict(list)
        self.rates = []
        # port handle -> list of resolved destination MACs (filled by resolve_arp)
        self.arps = {}

    @staticmethod
    def _exc_message(ex):
        """Return a readable message for exception ``ex``.

        ``BaseException.message`` is deprecated since Python 2.6 (and gone in
        Python 3) and is empty for exceptions whose ``__init__`` does not call
        the base class, so fall back to ``str(ex)`` when it is missing/empty.
        """
        return getattr(ex, 'message', None) or str(ex)

    def get_version(self):
        """Return the server version reported by the TRex client."""
        return self.client.get_server_version()

    def extract_stats(self, in_stats):
        """Convert TRex pg_id stats into nfvbench per-port results.

        Args:
            in_stats: dict returned by ``STLClient.get_pgid_stats()``; its
                'flow_stats' and 'latency' entries are indexed by pg_id.

        Returns:
            dict keyed by port handle, each with 'tx' and 'rx' sub-dicts, plus
            a 'total_tx_rate' entry (total tx packets / configured duration).
        """
        utils.nan_replace(in_stats)
        LOG.debug(in_stats)

        result = {}
        for ph in self.port_handle:
            stats = self.__combine_stats(in_stats, ph)
            result[ph] = {
                'tx': {
                    'total_pkts': cast_integer(stats['tx_pkts']['total']),
                    'total_pkt_bytes': cast_integer(stats['tx_bytes']['total']),
                    'pkt_rate': cast_integer(stats['tx_pps']['total']),
                    'pkt_bit_rate': cast_integer(stats['tx_bps']['total'])
                },
                'rx': {
                    'total_pkts': cast_integer(stats['rx_pkts']['total']),
                    'total_pkt_bytes': cast_integer(stats['rx_bytes']['total']),
                    'pkt_rate': cast_integer(stats['rx_pps']['total']),
                    'pkt_bit_rate': cast_integer(stats['rx_bps']['total']),
                    'dropped_pkts': cast_integer(
                        stats['tx_pkts']['total'] - stats['rx_pkts']['total'])
                }
            }

            lat = self.__combine_latencies(in_stats, ph)
            result[ph]['rx']['max_delay_usec'] = cast_integer(
                lat['total_max']) if 'total_max' in lat else float('nan')
            result[ph]['rx']['min_delay_usec'] = cast_integer(
                lat['total_min']) if 'total_min' in lat else float('nan')
            result[ph]['rx']['avg_delay_usec'] = cast_integer(
                lat['average']) if 'average' in lat else float('nan')
        # Results are keyed by the real port handles, which are not
        # necessarily 0 and 1, so sum over port_handle rather than 0/1.
        total_tx_pkts = sum(result[ph]['tx']['total_pkts'] for ph in self.port_handle)
        result["total_tx_rate"] = cast_integer(total_tx_pkts / self.config.duration_sec)
        return result

    def __combine_stats(self, in_stats, port_handle):
        """Sum the flow stats of all pg_ids belonging to ``port_handle``.

        Combines the regular-stream pg_id with every latency-stream pg_id of
        the port, per stat type (e.g. 'tx_pkts') and per port key / 'total'.
        """
        result = defaultdict(lambda: defaultdict(float))

        for pg_id in [self.stream_ids[port_handle]] + self.latencies[port_handle]:
            record = in_stats['flow_stats'][pg_id]
            for stat_type, stat_type_values in record.iteritems():
                for ph, value in stat_type_values.iteritems():
                    result[stat_type][ph] += value

        return result

    def __combine_latencies(self, in_stats, port_handle):
        """Combine the latency stats of all latency pg_ids of ``port_handle``.

        Returns an empty dict when the port has no latency stream; otherwise
        the summed 'dropped_pkts', overall 'total_max'/'total_min' and the
        mean of the per-stream 'average' values.
        """
        if not self.latencies[port_handle]:
            return {}

        result = defaultdict(float)
        result['total_min'] = float("inf")
        for lat_id in self.latencies[port_handle]:
            lat = in_stats['latency'][lat_id]
            result['dropped_pkts'] += lat['err_cntrs']['dropped']
            result['total_max'] = max(lat['latency']['total_max'], result['total_max'])
            result['total_min'] = min(lat['latency']['total_min'], result['total_min'])
            result['average'] += lat['latency']['average']

        result['average'] /= len(self.latencies[port_handle])

        return result

    def create_pkt(self, stream_cfg, l2frame_size):
        """Build an Ether[/Dot1Q]/IP/UDP packet template for one stream.

        The frame size is ``l2frame_size`` (at least 64 bytes). A field-engine
        program rewrites the IPv4 source/destination addresses per packet
        (random or incrementing, per stream_cfg['ip_addrs_step']) and fixes
        the IP/UDP checksums with STLVmFixChecksumHw.
        """
        # 46 = 14 (Ethernet II) + 4 (CRC Checksum) + 20 (IPv4) + 8 (UDP)
        payload = 'x' * (max(64, int(l2frame_size)) - 46)

        pkt_base = Ether(src=stream_cfg['mac_src'], dst=stream_cfg['mac_dst'])

        if stream_cfg['vlan_tag'] is not None:
            pkt_base /= Dot1Q(vlan=stream_cfg['vlan_tag'])

        udp_args = {}
        if stream_cfg['udp_src_port']:
            udp_args['sport'] = int(stream_cfg['udp_src_port'])
        if stream_cfg['udp_dst_port']:
            udp_args['dport'] = int(stream_cfg['udp_dst_port'])
        pkt_base /= IP() / UDP(**udp_args)

        if stream_cfg['ip_addrs_step'] == 'random':
            src_fv = STLVmFlowVarRepetableRandom(
                name="ip_src",
                min_value=stream_cfg['ip_src_addr'],
                max_value=stream_cfg['ip_src_addr_max'],
                size=4,
                seed=random.randint(0, 32767),
                limit=stream_cfg['ip_src_count'])
            dst_fv = STLVmFlowVarRepetableRandom(
                name="ip_dst",
                min_value=stream_cfg['ip_dst_addr'],
                max_value=stream_cfg['ip_dst_addr_max'],
                size=4,
                seed=random.randint(0, 32767),
                limit=stream_cfg['ip_dst_count'])
        else:
            # NOTE(review): unlike the random branch, the source range here is
            # pinned to ip_src_addr (min == max), so only the destination IP
            # varies -- confirm this is intended rather than ip_src_addr_max.
            src_fv = STLVmFlowVar(
                name="ip_src",
                min_value=stream_cfg['ip_src_addr'],
                max_value=stream_cfg['ip_src_addr'],
                size=4,
                op="inc",
                step=stream_cfg['ip_addrs_step'])
            dst_fv = STLVmFlowVar(
                name="ip_dst",
                min_value=stream_cfg['ip_dst_addr'],
                max_value=stream_cfg['ip_dst_addr_max'],
                size=4,
                op="inc",
                step=stream_cfg['ip_addrs_step'])

        vm_param = [
            src_fv,
            STLVmWrFlowVar(fv_name="ip_src", pkt_offset="IP.src"),
            dst_fv,
            STLVmWrFlowVar(fv_name="ip_dst", pkt_offset="IP.dst"),
            STLVmFixChecksumHw(l3_offset="IP",
                               l4_offset="UDP",
                               l4_type=CTRexVmInsFixHwCs.L4_TYPE_UDP)
        ]

        return STLPktBuilder(pkt=pkt_base / payload, vm=STLScVmRaw(vm_param))

    def generate_streams(self, port_handle, stream_cfg, l2frame, isg=0.0, latency=True):
        """Create the STLStream list for one chain on ``port_handle``.

        For l2frame == 'IMIX', one continuous stream is created per IMIX size
        (pps = its ratio); otherwise a single continuous stream of ``l2frame``
        bytes. When ``latency`` is true, an extra LATENCY_PPS latency stream is
        appended with a fresh pg_id, which is recorded in self.latencies.
        """
        idx_lat = None
        streams = []
        if l2frame == 'IMIX':
            for t, (ratio, l2_frame_size) in enumerate(zip(self.imix_ratios, self.imix_l2_sizes)):
                pkt = self.create_pkt(stream_cfg, l2_frame_size)
                streams.append(STLStream(packet=pkt,
                                         isg=0.1 * t,
                                         flow_stats=STLFlowStats(
                                             pg_id=self.stream_ids[port_handle]),
                                         mode=STLTXCont(pps=ratio)))

            if latency:
                idx_lat = next(self.id)
                # Latency stream reuses the last IMIX packet template.
                sl = STLStream(packet=pkt,
                               isg=isg,
                               flow_stats=STLFlowLatencyStats(pg_id=idx_lat),
                               mode=STLTXCont(pps=self.LATENCY_PPS))
                streams.append(sl)
        else:
            pkt = self.create_pkt(stream_cfg, l2frame)
            streams.append(STLStream(packet=pkt,
                                     flow_stats=STLFlowStats(pg_id=self.stream_ids[port_handle]),
                                     mode=STLTXCont()))

            if latency:
                idx_lat = next(self.id)
                streams.append(STLStream(packet=pkt,
                                         flow_stats=STLFlowLatencyStats(pg_id=idx_lat),
                                         mode=STLTXCont(pps=self.LATENCY_PPS)))

        if latency:
            self.latencies[port_handle].append(idx_lat)

        return streams

    def init(self):
        """No initialization needed for TRex."""
        pass

    @timeout(5)
    def __connect(self, client):
        """Connect ``client``; bounded by nfvbench.utils.timeout(5) (caller handles TimeoutError)."""
        client.connect()

    def __connect_after_start(self):
        """Retry connecting after a local server start.

        After start, TRex may take a bit of time to initialize, so try up to
        generic_retry_count times, one second apart; the last failure is
        re-raised with its original traceback.
        """
        for it in xrange(self.config.generic_retry_count):
            try:
                time.sleep(1)
                self.client.connect()
                break
            except Exception as ex:
                if it == (self.config.generic_retry_count - 1):
                    # bare raise keeps the original traceback (raise ex would not in py2)
                    raise
                LOG.info("Retrying connection to TRex (%s)...", self._exc_message(ex))

    def connect(self):
        """Connect to the TRex server and reset the configured ports.

        If the first connection fails and the server address is 127.0.0.1,
        a local TRex server is started and the connection retried.

        Raises:
            TrafficGeneratorException: connection failed. For the local case,
                the message is the content of /tmp/trex.log when that file
                exists, else the exception message.
        """
        LOG.info("Connecting to TRex...")
        server_ip = self.config.generator_config.ip

        # Connect to TRex server
        self.client = STLClient(server=server_ip)
        try:
            self.__connect(self.client)
        except (TimeoutError, STLError) as e:
            if server_ip == '127.0.0.1':
                try:
                    self.__start_server()
                    self.__connect_after_start()
                except (TimeoutError, STLError) as e:
                    LOG.error('Cannot connect to TRex')
                    LOG.error(traceback.format_exc())
                    logpath = '/tmp/trex.log'
                    if os.path.isfile(logpath):
                        # Wait for TRex to finish writing error message
                        last_size = 0
                        for _ in xrange(self.config.generic_retry_count):
                            size = os.path.getsize(logpath)
                            if size == last_size:
                                # probably not writing anymore
                                break
                            last_size = size
                            time.sleep(1)
                        with open(logpath, 'r') as f:
                            message = f.read()
                    else:
                        message = self._exc_message(e)
                    raise TrafficGeneratorException(message)
            else:
                raise TrafficGeneratorException(self._exc_message(e))

        ports = list(self.config.generator_config.ports)
        self.port_handle = ports
        # Prepare the ports
        self.client.reset(ports)

    def set_mode(self):
        """Use L3 mode (ARP-based) for EXT chains unless no_arp, else L2 mode."""
        if self.config.service_chain == ChainType.EXT and not self.config.no_arp:
            self.__set_l3_mode()
        else:
            self.__set_l2_mode()

    def __set_l3_mode(self):
        """Configure each port in L3 mode with its gateway IPs (and VLAN)."""
        self.client.set_service_mode(ports=self.port_handle, enabled=True)
        try:
            for port, device in zip(self.port_handle, self.config.generator_config.devices):
                try:
                    self.client.set_l3_mode(port=port,
                                            src_ipv4=device.tg_gateway_ip,
                                            dst_ipv4=device.dst.gateway_ip,
                                            vlan=device.vlan_tag if device.vlan_tagging else None)
                except STLError:
                    # TRex tries to resolve ARP already, doesn't have to be successful yet
                    continue
        finally:
            # always leave service mode, even if an unexpected error escapes
            self.client.set_service_mode(ports=self.port_handle, enabled=False)

    def __set_l2_mode(self):
        """Configure each port in L2 mode with the destination MACs of its streams."""
        self.client.set_service_mode(ports=self.port_handle, enabled=True)
        try:
            for port, device in zip(self.port_handle, self.config.generator_config.devices):
                for cfg in device.get_stream_configs(self.config.generator_config.service_chain):
                    self.client.set_l2_mode(port=port, dst_mac=cfg['mac_dst'])
        finally:
            # always leave service mode, even if an unexpected error escapes
            self.client.set_service_mode(ports=self.port_handle, enabled=False)

    def __start_server(self):
        """Start a local TRex server with the generator configuration."""
        server = TRexTrafficServer()
        server.run_server(self.config.generator_config)

    def resolve_arp(self):
        """Resolve, per port, the MAC of every chain's discovery gateway.

        Each port gets up to generic_retry_count attempts, sleeping
        generic_poll_sec between unsuccessful ones. Resolved MACs are stored
        in self.arps[port].

        Returns:
            True if all ports resolved service_chain_count MACs.
        """
        self.client.set_service_mode(ports=self.port_handle)
        LOG.info('Polling ARP until successful')
        resolved = 0
        retry_count = self.config.generic_retry_count
        try:
            for port, device in zip(self.port_handle, self.config.generator_config.devices):
                ctx = self.client.create_service_ctx(port=port)

                arps = [
                    STLServiceARP(ctx,
                                  src_ip=cfg['ip_src_tg_gw'],
                                  dst_ip=cfg['mac_discovery_gw'],
                                  vlan=device.vlan_tag if device.vlan_tagging else None)
                    for cfg in device.get_stream_configs(self.config.generator_config.service_chain)
                ]

                # The attempt counter is per port so that the "(n / max)" log
                # never exceeds max on the second and following ports.
                for attempt in xrange(1, retry_count + 1):
                    try:
                        ctx.run(arps)
                    except STLError:
                        LOG.error(traceback.format_exc())
                        continue

                    self.arps[port] = [arp.get_record().dst_mac for arp in arps
                                       if arp.get_record().dst_mac is not None]

                    if len(self.arps[port]) == self.config.service_chain_count:
                        resolved += 1
                        LOG.info('ARP resolved successfully for port %s', port)
                        break

                    failed = [arp.get_record().dst_ip for arp in arps
                              if arp.get_record().dst_mac is None]
                    LOG.info('Retrying ARP for: %s (%d / %d)',
                             failed, attempt, retry_count)
                    time.sleep(self.config.generic_poll_sec)
        finally:
            # never leave the ports stuck in service mode
            self.client.set_service_mode(ports=self.port_handle, enabled=False)
        return resolved == len(self.port_handle)

    def config_interface(self):
        """No interface configuration needed for TRex."""
        pass

    def __is_rate_enough(self, l2frame_size, rates, bidirectional, latency):
        """Check if rate provided by user is above requirements. Applies only if latency is True.

        The total requested rate must cover one LATENCY_PPS latency stream
        plus at least 1 pps of base traffic per chain (per direction when
        bidirectional).

        Returns:
            {'result': True} when latency is off; otherwise the
            utils.convert_rates() dict for the required rate with an added
            'result' boolean.
        """
        if not latency:
            return {'result': True}

        intf_speed = self.config.generator_config.intf_speed
        if bidirectional:
            mult = 2
            total_rate = sum(
                int(utils.convert_rates(l2frame_size, rate, intf_speed)['rate_pps'])
                for rate in rates)
        else:
            mult = 1
            # Compare the pps value, not the whole dict returned by
            # convert_rates (a dict >= int comparison is always True in py2).
            total_rate = int(
                utils.convert_rates(l2frame_size, rates[0], intf_speed)['rate_pps'])
        # rate must be enough for latency stream and at least 1 pps for base stream per chain
        required_rate = (self.LATENCY_PPS + 1) * self.config.service_chain_count * mult
        result = utils.convert_rates(l2frame_size,
                                     {'rate_pps': required_rate},
                                     intf_speed * mult)
        result['result'] = total_rate >= required_rate
        return result

    def create_traffic(self, l2frame_size, rates, bidirectional, latency=True):
        """Create and add all streams for every chain on both ports.

        Raises:
            TrafficGeneratorException: the requested rate is too low to carry
                the latency streams.
        """
        r = self.__is_rate_enough(l2frame_size, rates, bidirectional, latency)
        if not r['result']:
            raise TrafficGeneratorException(
                'Required rate in total is at least one of: \n{pps}pps \n{bps}bps \n{load}%.'
                .format(pps=r['rate_pps'],
                        bps=r['rate_bps'],
                        load=r['rate_percent']))

        stream_cfgs = [d.get_stream_configs(self.config.generator_config.service_chain)
                       for d in self.config.generator_config.devices]
        self.rates = [utils.to_rate_str(rate) for rate in rates]

        for ph in self.port_handle:
            # generate one pg_id for each direction
            self.stream_ids[ph] = next(self.id)

        for i, (fwd_stream_cfg, rev_stream_cfg) in enumerate(zip(*stream_cfgs)):
            if self.config.service_chain == ChainType.EXT and not self.config.no_arp:
                fwd_stream_cfg['mac_dst'] = self.arps[self.port_handle[0]][i]
                rev_stream_cfg['mac_dst'] = self.arps[self.port_handle[1]][i]

            # streamblock is keyed by port handle (not list index) so that the
            # add_streams loop below finds the streams for any port numbering.
            self.streamblock[self.port_handle[0]].extend(
                self.generate_streams(self.port_handle[0],
                                      fwd_stream_cfg,
                                      l2frame_size,
                                      latency=latency))
            if len(self.rates) > 1:
                self.streamblock[self.port_handle[1]].extend(
                    self.generate_streams(self.port_handle[1],
                                          rev_stream_cfg,
                                          l2frame_size,
                                          isg=10.0,
                                          latency=bidirectional and latency))

        for ph in self.port_handle:
            self.client.add_streams(self.streamblock[ph], ports=ph)
            LOG.info('Created traffic stream for port %s.', ph)

    def clear_streamblock(self):
        """Forget all stream/pg_id bookkeeping and reset the ports."""
        self.streamblock = defaultdict(list)
        self.latencies = defaultdict(list)
        self.stream_ids = defaultdict(list)
        self.rates = []
        self.client.reset(self.port_handle)
        LOG.info('Cleared all existing streams.')

    def get_stats(self):
        """Fetch pg_id stats from TRex and return them via extract_stats()."""
        stats = self.client.get_pgid_stats()
        return self.extract_stats(stats)

    def get_macs(self):
        """Return the source MAC of every port, in port_handle order."""
        return [self.client.get_port_attr(port=port)['src_mac'] for port in self.port_handle]

    def clear_stats(self):
        """Clear TRex stats once ports are known."""
        if self.port_handle:
            self.client.clear_stats()

    def start_traffic(self):
        """Start each port at its rate for the configured duration."""
        for port, rate in zip(self.port_handle, self.rates):
            self.client.start(ports=port, mult=rate, duration=self.config.duration_sec, force=True)

    def stop_traffic(self):
        """Stop traffic on all ports."""
        self.client.stop(ports=self.port_handle)

    def cleanup(self):
        """Reset the ports and disconnect (best effort)."""
        if self.client:
            try:
                self.client.reset(self.port_handle)
                self.client.disconnect()
            except STLError:
                # TRex does not like a reset while in disconnected state
                pass