498ff50a60b73f3d0771ec6d0864868c28df5c32
[nfvbench.git] / nfvbench / traffic_gen / trex.py
1 # Copyright 2016 Cisco Systems, Inc.  All rights reserved.
2 #
3 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
4 #    not use this file except in compliance with the License. You may obtain
5 #    a copy of the License at
6 #
7 #         http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #    Unless required by applicable law or agreed to in writing, software
10 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 #    License for the specific language governing permissions and limitations
13 #    under the License.
14
15 from collections import defaultdict
16 from itertools import count
17 from nfvbench.log import LOG
18 from nfvbench.specs import ChainType
19 from nfvbench.traffic_server import TRexTrafficServer
20 from nfvbench.utils import cast_integer
21 from nfvbench.utils import timeout
22 from nfvbench.utils import TimeoutError
23 import os
24 import random
25 import time
26 import traceback
27 from traffic_base import AbstractTrafficGenerator
28 from traffic_base import TrafficGeneratorException
29 import traffic_utils as utils
30
31 from trex_stl_lib.api import CTRexVmInsFixHwCs
32 from trex_stl_lib.api import Dot1Q
33 from trex_stl_lib.api import Ether
34 from trex_stl_lib.api import IP
35 from trex_stl_lib.api import STLClient
36 from trex_stl_lib.api import STLError
37 from trex_stl_lib.api import STLFlowLatencyStats
38 from trex_stl_lib.api import STLFlowStats
39 from trex_stl_lib.api import STLPktBuilder
40 from trex_stl_lib.api import STLScVmRaw
41 from trex_stl_lib.api import STLStream
42 from trex_stl_lib.api import STLTXCont
43 from trex_stl_lib.api import STLVmFixChecksumHw
44 from trex_stl_lib.api import STLVmFlowVar
45 from trex_stl_lib.api import STLVmFlowVarRepetableRandom
46 from trex_stl_lib.api import STLVmWrFlowVar
47 from trex_stl_lib.api import UDP
48 from trex_stl_lib.services.trex_stl_service_arp import STLServiceARP
49
50
class TRex(AbstractTrafficGenerator):
    """Traffic generator driver for a TRex server, using trex_stl_lib's STLClient.

    Flow-stats pg_ids, latency pg_ids and streams are tracked per port handle so that
    the raw per-pg_id statistics returned by TRex can be recombined per port in
    extract_stats().
    """

    # Packet rate (pps) used for every latency-measurement stream.
    LATENCY_PPS = 1000

    def __init__(self, runner):
        AbstractTrafficGenerator.__init__(self, runner)
        self.client = None
        # Shared source of pg_ids for both flow stats and latency streams.
        self.id = count()
        # port handle -> list of latency pg_ids created for that port
        self.latencies = defaultdict(list)
        # port handle -> flow-stats pg_id (an int, assigned in create_traffic)
        self.stream_ids = defaultdict(list)
        self.port_handle = []
        # port handle -> list of STLStream objects to add on that port
        self.streamblock = defaultdict(list)
        # rate strings; index i is applied to self.port_handle[i] in start_traffic()
        self.rates = []
        # port handle -> list of resolved destination MACs (filled by resolve_arp)
        self.arps = {}

    def get_version(self):
        """Return the server version reported by the connected TRex client."""
        return self.client.get_server_version()

    def extract_stats(self, in_stats):
        """Convert raw TRex pg_id statistics into per-port nfvbench result dicts.

        :param in_stats: dict as returned by STLClient.get_pgid_stats(); NaN values
                         are replaced in place by utils.nan_replace().
        :return: dict keyed by port handle, each with 'tx' and 'rx' counters (the rx
                 side also carries min/max/avg latency or NaN when no latency stream
                 exists), plus 'total_tx_rate' = packets sent on all ports divided by
                 config.duration_sec.
        """
        utils.nan_replace(in_stats)
        LOG.debug(in_stats)

        result = {}
        for ph in self.port_handle:
            stats = self.__combine_stats(in_stats, ph)
            result[ph] = {
                'tx': {
                    'total_pkts': cast_integer(stats['tx_pkts']['total']),
                    'total_pkt_bytes': cast_integer(stats['tx_bytes']['total']),
                    'pkt_rate': cast_integer(stats['tx_pps']['total']),
                    'pkt_bit_rate': cast_integer(stats['tx_bps']['total'])
                },
                'rx': {
                    'total_pkts': cast_integer(stats['rx_pkts']['total']),
                    'total_pkt_bytes': cast_integer(stats['rx_bytes']['total']),
                    'pkt_rate': cast_integer(stats['rx_pps']['total']),
                    'pkt_bit_rate': cast_integer(stats['rx_bps']['total']),
                    # packets of this port's streams that were sent but not received
                    'dropped_pkts': cast_integer(
                        stats['tx_pkts']['total'] - stats['rx_pkts']['total'])
                }
            }

            lat = self.__combine_latencies(in_stats, ph)
            rx = result[ph]['rx']
            rx['max_delay_usec'] = cast_integer(
                lat['total_max']) if 'total_max' in lat else float('nan')
            rx['min_delay_usec'] = cast_integer(
                lat['total_min']) if 'total_min' in lat else float('nan')
            rx['avg_delay_usec'] = cast_integer(
                lat['average']) if 'average' in lat else float('nan')

        # Sum over the configured port handles rather than assuming they are 0 and 1.
        total_tx_pkts = sum(result[ph]['tx']['total_pkts'] for ph in self.port_handle)
        result["total_tx_rate"] = cast_integer(total_tx_pkts / self.config.duration_sec)
        return result

    def __combine_stats(self, in_stats, port_handle):
        """Traverse the TRex result dictionary and combine stream stats.

        The flow-stats pg_id of the port and all of its latency pg_ids are summed,
        so latency streams and regular streams are reported together.

        :return: nested defaultdict stat_type -> key -> summed value, where the keys
                 are those found under each stat type in in_stats['flow_stats']
                 (the 'total' key is read by extract_stats()).
        """
        result = defaultdict(lambda: defaultdict(float))

        for pg_id in [self.stream_ids[port_handle]] + self.latencies[port_handle]:
            record = in_stats['flow_stats'][pg_id]
            for stat_type, stat_type_values in record.iteritems():
                for ph, value in stat_type_values.iteritems():
                    result[stat_type][ph] += value

        return result

    def __combine_latencies(self, in_stats, port_handle):
        """Traverse the TRex result dictionary and combine the chosen latency stats.

        :return: {} when the port has no latency stream, otherwise a dict with the
                 summed 'dropped_pkts', the overall 'total_max' and 'total_min', and
                 'average' = arithmetic mean of the per-stream averages.
        """
        if not self.latencies[port_handle]:
            return {}

        result = defaultdict(float)
        result['total_min'] = float("inf")
        for lat_id in self.latencies[port_handle]:
            lat = in_stats['latency'][lat_id]
            result['dropped_pkts'] += lat['err_cntrs']['dropped']
            result['total_max'] = max(lat['latency']['total_max'], result['total_max'])
            result['total_min'] = min(lat['latency']['total_min'], result['total_min'])
            result['average'] += lat['latency']['average']

        result['average'] /= len(self.latencies[port_handle])

        return result

    def create_pkt(self, stream_cfg, l2frame_size):
        """Build an Ethernet/[Dot1Q]/IPv4/UDP packet template with an IP-rewriting VM.

        :param stream_cfg: dict of stream settings (MACs, optional VLAN, UDP ports,
                           IP ranges/counts and 'ip_addrs_step').
        :param l2frame_size: requested L2 frame size; clamped to at least 64.
        :return: STLPktBuilder whose field engine rewrites IP src/dst and fixes the
                 IP/UDP checksums in hardware.
        """
        # 46 = 14 (Ethernet II) + 4 (CRC Checksum) + 20 (IPv4) + 8 (UDP)
        # NOTE(review): the optional 4-byte Dot1Q header is not subtracted here, so
        # VLAN-tagged frames come out 4 bytes larger than requested -- confirm intent.
        payload = 'x' * (max(64, int(l2frame_size)) - 46)

        pkt_base = Ether(src=stream_cfg['mac_src'], dst=stream_cfg['mac_dst'])

        if stream_cfg['vlan_tag'] is not None:
            pkt_base /= Dot1Q(vlan=stream_cfg['vlan_tag'])

        udp_args = {}
        if stream_cfg['udp_src_port']:
            udp_args['sport'] = int(stream_cfg['udp_src_port'])
        if stream_cfg['udp_dst_port']:
            udp_args['dport'] = int(stream_cfg['udp_dst_port'])
        pkt_base /= IP() / UDP(**udp_args)

        if stream_cfg['ip_addrs_step'] == 'random':
            src_fv = STLVmFlowVarRepetableRandom(
                name="ip_src",
                min_value=stream_cfg['ip_src_addr'],
                max_value=stream_cfg['ip_src_addr_max'],
                size=4,
                seed=random.randint(0, 32767),
                limit=stream_cfg['ip_src_count'])
            dst_fv = STLVmFlowVarRepetableRandom(
                name="ip_dst",
                min_value=stream_cfg['ip_dst_addr'],
                max_value=stream_cfg['ip_dst_addr_max'],
                size=4,
                seed=random.randint(0, 32767),
                limit=stream_cfg['ip_dst_count'])
        else:
            # NOTE(review): unlike the random branch, the source range here has
            # min == max ('ip_src_addr'), so the source IP never increments and only
            # the destination IP varies -- confirm this is intended.
            src_fv = STLVmFlowVar(
                name="ip_src",
                min_value=stream_cfg['ip_src_addr'],
                max_value=stream_cfg['ip_src_addr'],
                size=4,
                op="inc",
                step=stream_cfg['ip_addrs_step'])
            dst_fv = STLVmFlowVar(
                name="ip_dst",
                min_value=stream_cfg['ip_dst_addr'],
                max_value=stream_cfg['ip_dst_addr_max'],
                size=4,
                op="inc",
                step=stream_cfg['ip_addrs_step'])

        vm_param = [
            src_fv,
            STLVmWrFlowVar(fv_name="ip_src", pkt_offset="IP.src"),
            dst_fv,
            STLVmWrFlowVar(fv_name="ip_dst", pkt_offset="IP.dst"),
            STLVmFixChecksumHw(l3_offset="IP",
                               l4_offset="UDP",
                               l4_type=CTRexVmInsFixHwCs.L4_TYPE_UDP)
        ]

        return STLPktBuilder(pkt=pkt_base / payload, vm=STLScVmRaw(vm_param))

    def generate_streams(self, port_handle, stream_cfg, l2frame, isg=0.0, latency=True):
        """Create the continuous streams for one port and one chain.

        For 'IMIX' one stream per IMIX size is created (pps = IMIX ratio); otherwise a
        single stream of size l2frame. When latency is True, an extra latency stream
        at LATENCY_PPS is added and its new pg_id is recorded in self.latencies.

        :param isg: inter-stream gap of the IMIX latency stream.
                    NOTE(review): ignored for non-IMIX frame sizes -- confirm intent.
        :return: list of STLStream.
        """
        idx_lat = None
        streams = []
        if l2frame == 'IMIX':
            for t, (ratio, l2_frame_size) in enumerate(zip(self.imix_ratios, self.imix_l2_sizes)):
                pkt = self.create_pkt(stream_cfg, l2_frame_size)
                streams.append(STLStream(packet=pkt,
                                         isg=0.1 * t,
                                         flow_stats=STLFlowStats(
                                             pg_id=self.stream_ids[port_handle]),
                                         mode=STLTXCont(pps=ratio)))

            if latency:
                idx_lat = next(self.id)
                # latency stream reuses the packet of the last IMIX size
                sl = STLStream(packet=pkt,
                               isg=isg,
                               flow_stats=STLFlowLatencyStats(pg_id=idx_lat),
                               mode=STLTXCont(pps=self.LATENCY_PPS))
                streams.append(sl)
        else:
            pkt = self.create_pkt(stream_cfg, l2frame)
            streams.append(STLStream(packet=pkt,
                                     flow_stats=STLFlowStats(pg_id=self.stream_ids[port_handle]),
                                     mode=STLTXCont()))

            if latency:
                idx_lat = next(self.id)
                streams.append(STLStream(packet=pkt,
                                         flow_stats=STLFlowLatencyStats(pg_id=idx_lat),
                                         mode=STLTXCont(pps=self.LATENCY_PPS)))

        if latency:
            self.latencies[port_handle].append(idx_lat)

        return streams

    def init(self):
        pass

    @timeout(5)
    def __connect(self, client):
        """Connect the given client, bounded by the @timeout(5) decorator."""
        client.connect()

    def __connect_after_start(self):
        """Retry connecting to a freshly started TRex server.

        Tries up to config.generic_retry_count times, sleeping 1 second before each
        attempt; the last failure is re-raised.
        """
        # after start, Trex may take a bit of time to initialize
        # so we need to retry a few times
        for it in xrange(self.config.generic_retry_count):
            try:
                time.sleep(1)
                self.client.connect()
                break
            except Exception as ex:
                if it == (self.config.generic_retry_count - 1):
                    # bare raise keeps the original traceback (raise ex drops it in py2)
                    raise
                LOG.info("Retrying connection to TRex (%s)...", ex.message)

    def connect(self):
        """Connect to the TRex server and reset the configured ports.

        When the server IP is 127.0.0.1 and the first attempt fails, a local TRex
        server is started and the connection retried.

        :raises TrafficGeneratorException: if no connection can be made; when
            /tmp/trex.log exists its content is used as the message.
        """
        LOG.info("Connecting to TRex...")
        server_ip = self.config.generator_config.ip

        # Connect to TRex server
        self.client = STLClient(server=server_ip)
        try:
            self.__connect(self.client)
        except (TimeoutError, STLError) as e:
            if server_ip == '127.0.0.1':
                try:
                    self.__start_server()
                    self.__connect_after_start()
                except (TimeoutError, STLError) as e:
                    LOG.error('Cannot connect to TRex')
                    LOG.error(traceback.format_exc())
                    logpath = '/tmp/trex.log'
                    if os.path.isfile(logpath):
                        # Wait for TRex to finish writing error message
                        last_size = 0
                        for _ in xrange(self.config.generic_retry_count):
                            size = os.path.getsize(logpath)
                            if size == last_size:
                                # probably not writing anymore
                                break
                            last_size = size
                            time.sleep(1)
                        with open(logpath, 'r') as f:
                            message = f.read()
                    else:
                        message = e.message
                    raise TrafficGeneratorException(message)
            else:
                raise TrafficGeneratorException(e.message)

        ports = list(self.config.generator_config.ports)
        self.port_handle = ports
        # Prepare the ports
        self.client.reset(ports)

    def set_mode(self):
        """Use L3 mode for EXT chains with ARP enabled, L2 mode otherwise."""
        if self.config.service_chain == ChainType.EXT and not self.config.no_arp:
            self.__set_l3_mode()
        else:
            self.__set_l2_mode()

    def __set_l3_mode(self):
        """Configure each port in L3 mode with its device's gateway IPs (and VLAN)."""
        self.client.set_service_mode(ports=self.port_handle, enabled=True)
        for port, device in zip(self.port_handle, self.config.generator_config.devices):
            try:
                self.client.set_l3_mode(port=port,
                                        src_ipv4=device.tg_gateway_ip,
                                        dst_ipv4=device.dst.gateway_ip,
                                        vlan=device.vlan_tag if device.vlan_tagging else None)
            except STLError:
                # TRex tries to resolve ARP already, doesn't have to be successful yet
                continue
        self.client.set_service_mode(ports=self.port_handle, enabled=False)

    def __set_l2_mode(self):
        """Configure each port in L2 mode with the destination MAC of its stream configs."""
        self.client.set_service_mode(ports=self.port_handle, enabled=True)
        for port, device in zip(self.port_handle, self.config.generator_config.devices):
            for cfg in device.get_stream_configs(self.config.generator_config.service_chain):
                self.client.set_l2_mode(port=port, dst_mac=cfg['mac_dst'])
        self.client.set_service_mode(ports=self.port_handle, enabled=False)

    def __start_server(self):
        """Launch a TRex server using the generator configuration."""
        server = TRexTrafficServer()
        server.run_server(self.config.generator_config)

    def resolve_arp(self):
        """Resolve the gateway MAC of every chain on every port via TRex's ARP service.

        Resolved MACs are stored in self.arps[port]. Each port is retried up to
        config.generic_retry_count times, sleeping generic_poll_sec between
        incomplete attempts.

        :return: True if every port resolved service_chain_count MACs.
        """
        self.client.set_service_mode(ports=self.port_handle)
        LOG.info('Polling ARP until successful')
        resolved = 0
        for port, device in zip(self.port_handle, self.config.generator_config.devices):
            ctx = self.client.create_service_ctx(port=port)

            arps = [
                STLServiceARP(ctx,
                              src_ip=cfg['ip_src_tg_gw'],
                              dst_ip=cfg['mac_discovery_gw'],
                              vlan=device.vlan_tag if device.vlan_tagging else None)
                for cfg in device.get_stream_configs(self.config.generator_config.service_chain)
            ]

            # The attempt counter restarts for each port so the progress log is accurate.
            for attempt in xrange(1, self.config.generic_retry_count + 1):
                try:
                    ctx.run(arps)
                except STLError:
                    LOG.error(traceback.format_exc())
                    continue

                self.arps[port] = [arp.get_record().dst_mac for arp in arps
                                   if arp.get_record().dst_mac is not None]

                if len(self.arps[port]) == self.config.service_chain_count:
                    resolved += 1
                    LOG.info('ARP resolved successfully for port {}'.format(port))
                    break
                else:
                    failed = [arp.get_record().dst_ip for arp in arps
                              if arp.get_record().dst_mac is None]
                    LOG.info('Retrying ARP for: {} ({} / {})'.format(
                        failed, attempt, self.config.generic_retry_count))
                    time.sleep(self.config.generic_poll_sec)

        self.client.set_service_mode(ports=self.port_handle, enabled=False)
        return resolved == len(self.port_handle)

    def config_interface(self):
        pass

    def __is_rate_enough(self, l2frame_size, rates, bidirectional, latency):
        """Check whether the requested rate can carry the latency streams.

        Applies only when latency is True: the total requested pps must be at least
        (LATENCY_PPS + 1) per chain per direction.

        :return: {'result': True} when latency is False; otherwise the
                 utils.convert_rates() dict for the required rate with an added
                 boolean 'result' key.
        """
        intf_speed = self.config.generator_config.intf_speed
        if latency:
            if bidirectional:
                mult = 2
                total_rate = 0
                for rate in rates:
                    r = utils.convert_rates(l2frame_size, rate, intf_speed)
                    total_rate += int(r['rate_pps'])
            else:
                mult = 1
                # convert_rates() returns a dict; compare its pps value, not the dict
                # itself (a dict always compares greater than a number in Python 2).
                r = utils.convert_rates(l2frame_size, rates[0], intf_speed)
                total_rate = int(r['rate_pps'])
            # rate must be enough for latency stream and at least 1 pps for base stream per chain
            required_rate = (self.LATENCY_PPS + 1) * self.config.service_chain_count * mult
            result = utils.convert_rates(l2frame_size,
                                         {'rate_pps': required_rate},
                                         intf_speed * mult)
            result['result'] = total_rate >= required_rate
            return result

        return {'result': True}

    def create_traffic(self, l2frame_size, rates, bidirectional, latency=True):
        """Create and add the streams of all chains to the TRex ports.

        Forward streams go on port_handle[0]; reverse streams go on port_handle[1]
        only when more than one rate is given.

        :raises TrafficGeneratorException: if the requested rate is too low for the
            latency streams.
        """
        r = self.__is_rate_enough(l2frame_size, rates, bidirectional, latency)
        if not r['result']:
            raise TrafficGeneratorException(
                'Required rate in total is at least one of: \n{pps}pps \n{bps}bps \n{load}%.'
                .format(pps=r['rate_pps'],
                        bps=r['rate_bps'],
                        load=r['rate_percent']))

        stream_cfgs = [d.get_stream_configs(self.config.generator_config.service_chain)
                       for d in self.config.generator_config.devices]
        self.rates = [utils.to_rate_str(rate) for rate in rates]

        for ph in self.port_handle:
            # generate one pg_id for each direction
            self.stream_ids[ph] = next(self.id)

        fwd_port = self.port_handle[0]
        rev_port = self.port_handle[1]
        for i, (fwd_stream_cfg, rev_stream_cfg) in enumerate(zip(*stream_cfgs)):
            if self.config.service_chain == ChainType.EXT and not self.config.no_arp:
                fwd_stream_cfg['mac_dst'] = self.arps[fwd_port][i]
                rev_stream_cfg['mac_dst'] = self.arps[rev_port][i]

            # streamblock is keyed by port handle so it matches add_streams() below
            self.streamblock[fwd_port].extend(self.generate_streams(fwd_port,
                                                                    fwd_stream_cfg,
                                                                    l2frame_size,
                                                                    latency=latency))
            if len(self.rates) > 1:
                self.streamblock[rev_port].extend(
                    self.generate_streams(rev_port,
                                          rev_stream_cfg,
                                          l2frame_size,
                                          isg=10.0,
                                          latency=bidirectional and latency))

        for ph in self.port_handle:
            self.client.add_streams(self.streamblock[ph], ports=ph)
            LOG.info('Created traffic stream for port %s.', ph)

    def modify_rate(self, rate, reverse):
        """Change the rate used for the forward (reverse=False) or reverse port."""
        port_index = int(reverse)
        port = self.port_handle[port_index]
        rate_str = utils.to_rate_str(rate)
        self.rates[port_index] = rate_str
        LOG.info('Modified traffic stream for %s, new rate=%s.', port, rate_str)

    def clear_streamblock(self):
        """Forget all streams, pg_ids and rates, and reset the TRex ports."""
        self.streamblock = defaultdict(list)
        self.latencies = defaultdict(list)
        self.stream_ids = defaultdict(list)
        self.rates = []
        self.client.reset(self.port_handle)
        LOG.info('Cleared all existing streams.')

    def get_stats(self):
        """Fetch pg_id stats from TRex and return them via extract_stats()."""
        stats = self.client.get_pgid_stats()
        return self.extract_stats(stats)

    def get_macs(self):
        """Return the source MAC of each port, in port_handle order."""
        return [self.client.get_port_attr(port=port)['src_mac'] for port in self.port_handle]

    def clear_stats(self):
        """Clear TRex statistics, only once ports have been set up."""
        if self.port_handle:
            self.client.clear_stats()

    def start_traffic(self):
        """Start every port at its rate for config.duration_sec seconds."""
        for port, rate in zip(self.port_handle, self.rates):
            self.client.start(ports=port, mult=rate, duration=self.config.duration_sec, force=True)

    def stop_traffic(self):
        """Stop traffic on all ports."""
        self.client.stop(ports=self.port_handle)

    def cleanup(self):
        """Reset the ports and disconnect, ignoring STLError (best effort)."""
        if self.client:
            try:
                self.client.reset(self.port_handle)
                self.client.disconnect()
            except STLError:
                # TRex does not like a reset while in disconnected state
                pass
463                 # TRex does not like a reset while in disconnected state
464                 pass