1e7c1335996361e5c564c38e6a953fde67733b8b
[nfvbench.git] / nfvbench / traffic_gen / trex.py
1 # Copyright 2016 Cisco Systems, Inc.  All rights reserved.
2 #
3 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
4 #    not use this file except in compliance with the License. You may obtain
5 #    a copy of the License at
6 #
7 #         http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #    Unless required by applicable law or agreed to in writing, software
10 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 #    License for the specific language governing permissions and limitations
13 #    under the License.
14
15 import os
16 import random
17 import time
18 import traceback
19
20 from collections import defaultdict
21 from itertools import count
22 from nfvbench.log import LOG
23 from nfvbench.specs import ChainType
24 from nfvbench.traffic_server import TRexTrafficServer
25 from nfvbench.utils import cast_integer
26 from nfvbench.utils import timeout
27 from nfvbench.utils import TimeoutError
28 from traffic_base import AbstractTrafficGenerator
29 from traffic_base import TrafficGeneratorException
30 import traffic_utils as utils
31
32 # pylint: disable=import-error
33 from trex_stl_lib.api import CTRexVmInsFixHwCs
34 from trex_stl_lib.api import Dot1Q
35 from trex_stl_lib.api import Ether
36 from trex_stl_lib.api import IP
37 from trex_stl_lib.api import STLClient
38 from trex_stl_lib.api import STLError
39 from trex_stl_lib.api import STLFlowLatencyStats
40 from trex_stl_lib.api import STLFlowStats
41 from trex_stl_lib.api import STLPktBuilder
42 from trex_stl_lib.api import STLScVmRaw
43 from trex_stl_lib.api import STLStream
44 from trex_stl_lib.api import STLTXCont
45 from trex_stl_lib.api import STLVmFixChecksumHw
46 from trex_stl_lib.api import STLVmFlowVar
47 from trex_stl_lib.api import STLVmFlowVarRepetableRandom
48 from trex_stl_lib.api import STLVmWrFlowVar
49 from trex_stl_lib.api import UDP
50 from trex_stl_lib.services.trex_stl_service_arp import STLServiceARP
51 # pylint: enable=import-error
52
53
54 class TRex(AbstractTrafficGenerator):
55     LATENCY_PPS = 1000
56
57     def __init__(self, runner):
58         AbstractTrafficGenerator.__init__(self, runner)
59         self.client = None
60         self.id = count()
61         self.latencies = defaultdict(list)
62         self.stream_ids = defaultdict(list)
63         self.port_handle = []
64         self.streamblock = defaultdict(list)
65         self.rates = []
66         self.arps = {}
67
68     def get_version(self):
69         return self.client.get_server_version()
70
71     def extract_stats(self, in_stats):
72         utils.nan_replace(in_stats)
73         LOG.debug(in_stats)
74
75         result = {}
76         for ph in self.port_handle:
77             stats = self.__combine_stats(in_stats, ph)
78             result[ph] = {
79                 'tx': {
80                     'total_pkts': cast_integer(stats['tx_pkts']['total']),
81                     'total_pkt_bytes': cast_integer(stats['tx_bytes']['total']),
82                     'pkt_rate': cast_integer(stats['tx_pps']['total']),
83                     'pkt_bit_rate': cast_integer(stats['tx_bps']['total'])
84                 },
85                 'rx': {
86                     'total_pkts': cast_integer(stats['rx_pkts']['total']),
87                     'total_pkt_bytes': cast_integer(stats['rx_bytes']['total']),
88                     'pkt_rate': cast_integer(stats['rx_pps']['total']),
89                     'pkt_bit_rate': cast_integer(stats['rx_bps']['total']),
90                     'dropped_pkts': cast_integer(
91                         stats['tx_pkts']['total'] - stats['rx_pkts']['total'])
92                 }
93             }
94
95             lat = self.__combine_latencies(in_stats, ph)
96             result[ph]['rx']['max_delay_usec'] = cast_integer(
97                 lat['total_max']) if 'total_max' in lat else float('nan')
98             result[ph]['rx']['min_delay_usec'] = cast_integer(
99                 lat['total_min']) if 'total_min' in lat else float('nan')
100             result[ph]['rx']['avg_delay_usec'] = cast_integer(
101                 lat['average']) if 'average' in lat else float('nan')
102         total_tx_pkts = result[0]['tx']['total_pkts'] + result[1]['tx']['total_pkts']
103         result["total_tx_rate"] = cast_integer(total_tx_pkts / self.config.duration_sec)
104         return result
105
106     def __combine_stats(self, in_stats, port_handle):
107         """Traverses TRex result dictionary and combines stream stats. Used for combining latency
108         and regular streams together.
109         """
110         result = defaultdict(lambda: defaultdict(float))
111
112         for pg_id in [self.stream_ids[port_handle]] + self.latencies[port_handle]:
113             record = in_stats['flow_stats'][pg_id]
114             for stat_type, stat_type_values in record.iteritems():
115                 for ph, value in stat_type_values.iteritems():
116                     result[stat_type][ph] += value
117
118         return result
119
120     def __combine_latencies(self, in_stats, port_handle):
121         """Traverses TRex result dictionary and combines chosen latency stats."""
122         if not self.latencies[port_handle]:
123             return {}
124
125         result = defaultdict(float)
126         result['total_min'] = float("inf")
127         for lat_id in self.latencies[port_handle]:
128             lat = in_stats['latency'][lat_id]
129             result['dropped_pkts'] += lat['err_cntrs']['dropped']
130             result['total_max'] = max(lat['latency']['total_max'], result['total_max'])
131             result['total_min'] = min(lat['latency']['total_min'], result['total_min'])
132             result['average'] += lat['latency']['average']
133
134         result['average'] /= len(self.latencies[port_handle])
135
136         return result
137
138     def create_pkt(self, stream_cfg, l2frame_size):
139
140         pkt_base = Ether(src=stream_cfg['mac_src'], dst=stream_cfg['mac_dst'])
141
142         if stream_cfg['vlan_tag'] is not None:
143             # 50 = 14 (Ethernet II) + 4 (Vlan tag) + 4 (CRC Checksum) + 20 (IPv4) + 8 (UDP)
144             pkt_base /= Dot1Q(vlan=stream_cfg['vlan_tag'])
145             payload = 'x' * (max(64, int(l2frame_size)) - 50)
146         else:
147             # 46 = 14 (Ethernet II) + 4 (CRC Checksum) + 20 (IPv4) + 8 (UDP)
148             payload = 'x' * (max(64, int(l2frame_size)) - 46)
149
150         udp_args = {}
151         if stream_cfg['udp_src_port']:
152             udp_args['sport'] = int(stream_cfg['udp_src_port'])
153         if stream_cfg['udp_dst_port']:
154             udp_args['dport'] = int(stream_cfg['udp_dst_port'])
155         pkt_base /= IP() / UDP(**udp_args)
156
157         if stream_cfg['ip_addrs_step'] == 'random':
158             src_fv = STLVmFlowVarRepetableRandom(
159                 name="ip_src",
160                 min_value=stream_cfg['ip_src_addr'],
161                 max_value=stream_cfg['ip_src_addr_max'],
162                 size=4,
163                 seed=random.randint(0, 32767),
164                 limit=stream_cfg['ip_src_count'])
165             dst_fv = STLVmFlowVarRepetableRandom(
166                 name="ip_dst",
167                 min_value=stream_cfg['ip_dst_addr'],
168                 max_value=stream_cfg['ip_dst_addr_max'],
169                 size=4,
170                 seed=random.randint(0, 32767),
171                 limit=stream_cfg['ip_dst_count'])
172         else:
173             src_fv = STLVmFlowVar(
174                 name="ip_src",
175                 min_value=stream_cfg['ip_src_addr'],
176                 max_value=stream_cfg['ip_src_addr'],
177                 size=4,
178                 op="inc",
179                 step=stream_cfg['ip_addrs_step'])
180             dst_fv = STLVmFlowVar(
181                 name="ip_dst",
182                 min_value=stream_cfg['ip_dst_addr'],
183                 max_value=stream_cfg['ip_dst_addr_max'],
184                 size=4,
185                 op="inc",
186                 step=stream_cfg['ip_addrs_step'])
187
188         vm_param = [
189             src_fv,
190             STLVmWrFlowVar(fv_name="ip_src", pkt_offset="IP.src"),
191             dst_fv,
192             STLVmWrFlowVar(fv_name="ip_dst", pkt_offset="IP.dst"),
193             STLVmFixChecksumHw(l3_offset="IP",
194                                l4_offset="UDP",
195                                l4_type=CTRexVmInsFixHwCs.L4_TYPE_UDP)
196         ]
197
198         return STLPktBuilder(pkt=pkt_base / payload, vm=STLScVmRaw(vm_param))
199
200     def generate_streams(self, port_handle, stream_cfg, l2frame, isg=0.0, latency=True):
201         idx_lat = None
202         streams = []
203         if l2frame == 'IMIX':
204             for t, (ratio, l2_frame_size) in enumerate(zip(self.imix_ratios, self.imix_l2_sizes)):
205                 pkt = self.create_pkt(stream_cfg, l2_frame_size)
206                 streams.append(STLStream(packet=pkt,
207                                          isg=0.1 * t,
208                                          flow_stats=STLFlowStats(
209                                              pg_id=self.stream_ids[port_handle]),
210                                          mode=STLTXCont(pps=ratio)))
211
212             if latency:
213                 idx_lat = self.id.next()
214                 sl = STLStream(packet=pkt,
215                                isg=isg,
216                                flow_stats=STLFlowLatencyStats(pg_id=idx_lat),
217                                mode=STLTXCont(pps=self.LATENCY_PPS))
218                 streams.append(sl)
219         else:
220             pkt = self.create_pkt(stream_cfg, l2frame)
221             streams.append(STLStream(packet=pkt,
222                                      flow_stats=STLFlowStats(pg_id=self.stream_ids[port_handle]),
223                                      mode=STLTXCont()))
224
225             if latency:
226                 idx_lat = self.id.next()
227                 streams.append(STLStream(packet=pkt,
228                                          flow_stats=STLFlowLatencyStats(pg_id=idx_lat),
229                                          mode=STLTXCont(pps=self.LATENCY_PPS)))
230
231         if latency:
232             self.latencies[port_handle].append(idx_lat)
233
234         return streams
235
236     def init(self):
237         pass
238
239     @timeout(5)
240     def __connect(self, client):
241         client.connect()
242
243     def __connect_after_start(self):
244         # after start, Trex may take a bit of time to initialize
245         # so we need to retry a few times
246         for it in xrange(self.config.generic_retry_count):
247             try:
248                 time.sleep(1)
249                 self.client.connect()
250                 break
251             except Exception as ex:
252                 if it == (self.config.generic_retry_count - 1):
253                     raise ex
254                 LOG.info("Retrying connection to TRex (%s)...", ex.message)
255
256     def connect(self):
257         LOG.info("Connecting to TRex...")
258         server_ip = self.config.generator_config.ip
259
260         # Connect to TRex server
261         self.client = STLClient(server=server_ip)
262         try:
263             self.__connect(self.client)
264         except (TimeoutError, STLError) as e:
265             if server_ip == '127.0.0.1':
266                 try:
267                     self.__start_server()
268                     self.__connect_after_start()
269                 except (TimeoutError, STLError) as e:
270                     LOG.error('Cannot connect to TRex')
271                     LOG.error(traceback.format_exc())
272                     logpath = '/tmp/trex.log'
273                     if os.path.isfile(logpath):
274                         # Wait for TRex to finish writing error message
275                         last_size = 0
276                         for _ in xrange(self.config.generic_retry_count):
277                             size = os.path.getsize(logpath)
278                             if size == last_size:
279                                 # probably not writing anymore
280                                 break
281                             last_size = size
282                             time.sleep(1)
283                         with open(logpath, 'r') as f:
284                             message = f.read()
285                     else:
286                         message = e.message
287                     raise TrafficGeneratorException(message)
288             else:
289                 raise TrafficGeneratorException(e.message)
290
291         ports = list(self.config.generator_config.ports)
292         self.port_handle = ports
293         # Prepare the ports
294         self.client.reset(ports)
295
296     def set_mode(self):
297         if self.config.service_chain == ChainType.EXT and not self.config.no_arp:
298             self.__set_l3_mode()
299         else:
300             self.__set_l2_mode()
301
302     def __set_l3_mode(self):
303         self.client.set_service_mode(ports=self.port_handle, enabled=True)
304         for port, device in zip(self.port_handle, self.config.generator_config.devices):
305             try:
306                 self.client.set_l3_mode(port=port,
307                                         src_ipv4=device.tg_gateway_ip,
308                                         dst_ipv4=device.dst.gateway_ip,
309                                         vlan=device.vlan_tag if device.vlan_tagging else None)
310             except STLError:
311                 # TRex tries to resolve ARP already, doesn't have to be successful yet
312                 continue
313         self.client.set_service_mode(ports=self.port_handle, enabled=False)
314
315     def __set_l2_mode(self):
316         self.client.set_service_mode(ports=self.port_handle, enabled=True)
317         for port, device in zip(self.port_handle, self.config.generator_config.devices):
318             for cfg in device.get_stream_configs(self.config.generator_config.service_chain):
319                 self.client.set_l2_mode(port=port, dst_mac=cfg['mac_dst'])
320         self.client.set_service_mode(ports=self.port_handle, enabled=False)
321
322     def __start_server(self):
323         server = TRexTrafficServer()
324         server.run_server(self.config.generator_config)
325
326     def resolve_arp(self):
327         self.client.set_service_mode(ports=self.port_handle)
328         LOG.info('Polling ARP until successful')
329         resolved = 0
330         attempt = 0
331         for port, device in zip(self.port_handle, self.config.generator_config.devices):
332             ctx = self.client.create_service_ctx(port=port)
333
334             arps = [
335                 STLServiceARP(ctx,
336                               src_ip=cfg['ip_src_tg_gw'],
337                               dst_ip=cfg['mac_discovery_gw'],
338                               vlan=device.vlan_tag if device.vlan_tagging else None)
339                 for cfg in device.get_stream_configs(self.config.generator_config.service_chain)
340             ]
341
342             for _ in xrange(self.config.generic_retry_count):
343                 attempt += 1
344                 try:
345                     ctx.run(arps)
346                 except STLError:
347                     LOG.error(traceback.format_exc())
348                     continue
349
350                 self.arps[port] = [arp.get_record().dst_mac for arp in arps
351                                    if arp.get_record().dst_mac is not None]
352
353                 if len(self.arps[port]) == self.config.service_chain_count:
354                     resolved += 1
355                     LOG.info('ARP resolved successfully for port %s', port)
356                     break
357                 else:
358                     failed = [arp.get_record().dst_ip for arp in arps
359                               if arp.get_record().dst_mac is None]
360                     LOG.info('Retrying ARP for: %s (%d / %d)',
361                              failed, attempt, self.config.generic_retry_count)
362                     time.sleep(self.config.generic_poll_sec)
363
364         self.client.set_service_mode(ports=self.port_handle, enabled=False)
365         return resolved == len(self.port_handle)
366
367     def config_interface(self):
368         pass
369
370     def __is_rate_enough(self, l2frame_size, rates, bidirectional, latency):
371         """Check if rate provided by user is above requirements. Applies only if latency is True."""
372         intf_speed = self.config.generator_config.intf_speed
373         if latency:
374             if bidirectional:
375                 mult = 2
376                 total_rate = 0
377                 for rate in rates:
378                     r = utils.convert_rates(l2frame_size, rate, intf_speed)
379                     total_rate += int(r['rate_pps'])
380             else:
381                 mult = 1
382                 total_rate = utils.convert_rates(l2frame_size, rates[0], intf_speed)
383             # rate must be enough for latency stream and at least 1 pps for base stream per chain
384             required_rate = (self.LATENCY_PPS + 1) * self.config.service_chain_count * mult
385             result = utils.convert_rates(l2frame_size,
386                                          {'rate_pps': required_rate},
387                                          intf_speed * mult)
388             result['result'] = total_rate >= required_rate
389             return result
390
391         return {'result': True}
392
393     def create_traffic(self, l2frame_size, rates, bidirectional, latency=True):
394         r = self.__is_rate_enough(l2frame_size, rates, bidirectional, latency)
395         if not r['result']:
396             raise TrafficGeneratorException(
397                 'Required rate in total is at least one of: \n{pps}pps \n{bps}bps \n{load}%.'
398                 .format(pps=r['rate_pps'],
399                         bps=r['rate_bps'],
400                         load=r['rate_percent']))
401
402         stream_cfgs = [d.get_stream_configs(self.config.generator_config.service_chain)
403                        for d in self.config.generator_config.devices]
404         self.rates = [utils.to_rate_str(rate) for rate in rates]
405
406         for ph in self.port_handle:
407             # generate one pg_id for each direction
408             self.stream_ids[ph] = self.id.next()
409
410         for i, (fwd_stream_cfg, rev_stream_cfg) in enumerate(zip(*stream_cfgs)):
411             if self.config.service_chain == ChainType.EXT and not self.config.no_arp:
412                 fwd_stream_cfg['mac_dst'] = self.arps[self.port_handle[0]][i]
413                 rev_stream_cfg['mac_dst'] = self.arps[self.port_handle[1]][i]
414
415             self.streamblock[0].extend(self.generate_streams(self.port_handle[0],
416                                                              fwd_stream_cfg,
417                                                              l2frame_size,
418                                                              latency=latency))
419             if len(self.rates) > 1:
420                 self.streamblock[1].extend(self.generate_streams(self.port_handle[1],
421                                                                  rev_stream_cfg,
422                                                                  l2frame_size,
423                                                                  isg=10.0,
424                                                                  latency=bidirectional and latency))
425
426         for ph in self.port_handle:
427             self.client.add_streams(self.streamblock[ph], ports=ph)
428             LOG.info('Created traffic stream for port %s.', ph)
429
430     def clear_streamblock(self):
431         self.streamblock = defaultdict(list)
432         self.latencies = defaultdict(list)
433         self.stream_ids = defaultdict(list)
434         self.rates = []
435         self.client.reset(self.port_handle)
436         LOG.info('Cleared all existing streams.')
437
438     def get_stats(self):
439         stats = self.client.get_pgid_stats()
440         return self.extract_stats(stats)
441
442     def get_macs(self):
443         return [self.client.get_port_attr(port=port)['src_mac'] for port in self.port_handle]
444
445     def clear_stats(self):
446         if self.port_handle:
447             self.client.clear_stats()
448
449     def start_traffic(self):
450         for port, rate in zip(self.port_handle, self.rates):
451             self.client.start(ports=port, mult=rate, duration=self.config.duration_sec, force=True)
452
453     def stop_traffic(self):
454         self.client.stop(ports=self.port_handle)
455
456     def cleanup(self):
457         if self.client:
458             try:
459                 self.client.reset(self.port_handle)
460                 self.client.disconnect()
461             except STLError:
462                 # TRex does not like a reset while in disconnected state
463                 pass