Merge "NFVBENCH-25 Send run results to fluentd"
[nfvbench.git] / nfvbench / traffic_gen / trex.py
1 # Copyright 2016 Cisco Systems, Inc.  All rights reserved.
2 #
3 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
4 #    not use this file except in compliance with the License. You may obtain
5 #    a copy of the License at
6 #
7 #         http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #    Unless required by applicable law or agreed to in writing, software
10 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 #    License for the specific language governing permissions and limitations
13 #    under the License.
14
15 from collections import defaultdict
16 from itertools import count
17 from nfvbench.log import LOG
18 from nfvbench.specs import ChainType
19 from nfvbench.traffic_server import TRexTrafficServer
20 from nfvbench.utils import timeout
21 from nfvbench.utils import TimeoutError
22 import os
23 import random
24 import time
25 import traceback
26 from traffic_base import AbstractTrafficGenerator
27 from traffic_base import TrafficGeneratorException
28 import traffic_utils as utils
29
30 from trex_stl_lib.api import CTRexVmInsFixHwCs
31 from trex_stl_lib.api import Dot1Q
32 from trex_stl_lib.api import Ether
33 from trex_stl_lib.api import IP
34 from trex_stl_lib.api import STLClient
35 from trex_stl_lib.api import STLError
36 from trex_stl_lib.api import STLFlowLatencyStats
37 from trex_stl_lib.api import STLFlowStats
38 from trex_stl_lib.api import STLPktBuilder
39 from trex_stl_lib.api import STLScVmRaw
40 from trex_stl_lib.api import STLStream
41 from trex_stl_lib.api import STLTXCont
42 from trex_stl_lib.api import STLVmFixChecksumHw
43 from trex_stl_lib.api import STLVmFlowVar
44 from trex_stl_lib.api import STLVmFlowVarRepetableRandom
45 from trex_stl_lib.api import STLVmWrFlowVar
46 from trex_stl_lib.api import UDP
47 from trex_stl_lib.services.trex_stl_service_arp import STLServiceARP
48
49
class TRex(AbstractTrafficGenerator):
    """Traffic generator that drives a TRex server through the TRex stateless client (STLClient)."""

    # rate, in packets per second, given to every latency stream (see generate_streams())
    LATENCY_PPS = 1000
52
53     def __init__(self, runner):
54         AbstractTrafficGenerator.__init__(self, runner)
55         self.client = None
56         self.id = count()
57         self.latencies = defaultdict(list)
58         self.stream_ids = defaultdict(list)
59         self.port_handle = []
60         self.streamblock = defaultdict(list)
61         self.rates = []
62         self.arps = {}
63
64     def get_version(self):
65         return self.client.get_server_version()
66
67     def extract_stats(self, in_stats):
68         utils.nan_replace(in_stats)
69         LOG.debug(in_stats)
70
71         result = {}
72         for ph in self.port_handle:
73             stats = self.__combine_stats(in_stats, ph)
74             result[ph] = {
75                 'tx': {
76                     'total_pkts': int(stats['tx_pkts']['total']),
77                     'total_pkt_bytes': int(stats['tx_bytes']['total']),
78                     'pkt_rate': int(stats['tx_pps']['total']),
79                     'pkt_bit_rate': int(stats['tx_bps']['total'])
80                 },
81                 'rx': {
82                     'total_pkts': int(stats['rx_pkts']['total']),
83                     'total_pkt_bytes': int(stats['rx_bytes']['total']),
84                     'pkt_rate': int(stats['rx_pps']['total']),
85                     'pkt_bit_rate': int(stats['rx_bps']['total']),
86                     'dropped_pkts': int(stats['tx_pkts']['total'] - stats['rx_pkts']['total'])
87                 }
88             }
89
90             lat = self.__combine_latencies(in_stats, ph)
91             result[ph]['rx']['max_delay_usec'] = int(
92                 lat['total_max']) if 'total_max' in lat else float('nan')
93             result[ph]['rx']['min_delay_usec'] = int(
94                 lat['total_min']) if 'total_min' in lat else float('nan')
95             result[ph]['rx']['avg_delay_usec'] = int(
96                 lat['average']) if 'average' in lat else float(
97                 'nan')
98         total_tx_pkts = result[0]['tx']['total_pkts'] + result[1]['tx']['total_pkts']
99         result["total_tx_rate"] = int(total_tx_pkts / self.config.duration_sec)
100         return result
101
102     def __combine_stats(self, in_stats, port_handle):
103         """Traverses TRex result dictionary and combines stream stats. Used for combining latency
104         and regular streams together.
105         """
106         result = defaultdict(lambda: defaultdict(float))
107
108         for pg_id in [self.stream_ids[port_handle]] + self.latencies[port_handle]:
109             record = in_stats['flow_stats'][pg_id]
110             for stat_type, stat_type_values in record.iteritems():
111                 for ph, value in stat_type_values.iteritems():
112                     result[stat_type][ph] += value
113
114         return result
115
116     def __combine_latencies(self, in_stats, port_handle):
117         """Traverses TRex result dictionary and combines chosen latency stats."""
118         if not len(self.latencies[port_handle]):
119             return {}
120
121         result = defaultdict(float)
122         result['total_min'] = float("inf")
123         for lat_id in self.latencies[port_handle]:
124             lat = in_stats['latency'][lat_id]
125             result['dropped_pkts'] += lat['err_cntrs']['dropped']
126             result['total_max'] = max(lat['latency']['total_max'], result['total_max'])
127             result['total_min'] = min(lat['latency']['total_min'], result['total_min'])
128             result['average'] += lat['latency']['average']
129
130         result['average'] /= len(self.latencies[port_handle])
131
132         return result
133
134     def create_pkt(self, stream_cfg, l2frame_size):
135         # 46 = 14 (Ethernet II) + 4 (CRC Checksum) + 20 (IPv4) + 8 (UDP)
136         payload = 'x' * (max(64, int(l2frame_size)) - 46)
137
138         pkt_base = Ether(src=stream_cfg['mac_src'], dst=stream_cfg['mac_dst'])
139
140         if stream_cfg['vlan_tag'] is not None:
141             pkt_base /= Dot1Q(vlan=stream_cfg['vlan_tag'])
142
143         udp_args = {}
144         if stream_cfg['udp_src_port']:
145             udp_args['sport'] = int(stream_cfg['udp_src_port'])
146         if stream_cfg['udp_dst_port']:
147             udp_args['dport'] = int(stream_cfg['udp_dst_port'])
148         pkt_base /= IP() / UDP(**udp_args)
149
150         if stream_cfg['ip_addrs_step'] == 'random':
151             src_fv = STLVmFlowVarRepetableRandom(
152                 name="ip_src",
153                 min_value=stream_cfg['ip_src_addr'],
154                 max_value=stream_cfg['ip_src_addr_max'],
155                 size=4,
156                 seed=random.randint(0, 32767),
157                 limit=stream_cfg['ip_src_count'])
158             dst_fv = STLVmFlowVarRepetableRandom(
159                 name="ip_dst",
160                 min_value=stream_cfg['ip_dst_addr'],
161                 max_value=stream_cfg['ip_dst_addr_max'],
162                 size=4,
163                 seed=random.randint(0, 32767),
164                 limit=stream_cfg['ip_dst_count'])
165         else:
166             src_fv = STLVmFlowVar(
167                 name="ip_src",
168                 min_value=stream_cfg['ip_src_addr'],
169                 max_value=stream_cfg['ip_src_addr'],
170                 size=4,
171                 op="inc",
172                 step=stream_cfg['ip_addrs_step'])
173             dst_fv = STLVmFlowVar(
174                 name="ip_dst",
175                 min_value=stream_cfg['ip_dst_addr'],
176                 max_value=stream_cfg['ip_dst_addr_max'],
177                 size=4,
178                 op="inc",
179                 step=stream_cfg['ip_addrs_step'])
180
181         vm_param = [
182             src_fv,
183             STLVmWrFlowVar(fv_name="ip_src", pkt_offset="IP.src"),
184             dst_fv,
185             STLVmWrFlowVar(fv_name="ip_dst", pkt_offset="IP.dst"),
186             STLVmFixChecksumHw(l3_offset="IP",
187                                l4_offset="UDP",
188                                l4_type=CTRexVmInsFixHwCs.L4_TYPE_UDP)
189         ]
190
191         return STLPktBuilder(pkt=pkt_base / payload, vm=STLScVmRaw(vm_param))
192
193     def generate_streams(self, port_handle, stream_cfg, l2frame, isg=0.0, latency=True):
194         idx_lat = None
195         streams = []
196         if l2frame == 'IMIX':
197             for t, (ratio, l2_frame_size) in enumerate(zip(self.imix_ratios, self.imix_l2_sizes)):
198                 pkt = self.create_pkt(stream_cfg, l2_frame_size)
199                 streams.append(STLStream(packet=pkt,
200                                          isg=0.1 * t,
201                                          flow_stats=STLFlowStats(
202                                              pg_id=self.stream_ids[port_handle]),
203                                          mode=STLTXCont(pps=ratio)))
204
205             if latency:
206                 idx_lat = self.id.next()
207                 sl = STLStream(packet=pkt,
208                                isg=isg,
209                                flow_stats=STLFlowLatencyStats(pg_id=idx_lat),
210                                mode=STLTXCont(pps=self.LATENCY_PPS))
211                 streams.append(sl)
212         else:
213             pkt = self.create_pkt(stream_cfg, l2frame)
214             streams.append(STLStream(packet=pkt,
215                                      flow_stats=STLFlowStats(pg_id=self.stream_ids[port_handle]),
216                                      mode=STLTXCont()))
217
218             if latency:
219                 idx_lat = self.id.next()
220                 streams.append(STLStream(packet=pkt,
221                                          flow_stats=STLFlowLatencyStats(pg_id=idx_lat),
222                                          mode=STLTXCont(pps=self.LATENCY_PPS)))
223
224         if latency:
225             self.latencies[port_handle].append(idx_lat)
226
227         return streams
228
229     def init(self):
230         pass
231
    @timeout(5)
    def __connect(self, client):
        """Call client.connect() under the timeout(5) decorator.

        NOTE(review): timeout comes from nfvbench.utils; presumably it raises
        TimeoutError when the call takes longer than 5 seconds -- confirm.
        """
        client.connect()
235
236     def __connect_after_start(self):
237         # after start, Trex may take a bit of time to initialize
238         # so we need to retry a few times
239         for it in xrange(self.config.generic_retry_count):
240             try:
241                 time.sleep(1)
242                 self.client.connect()
243                 break
244             except Exception as ex:
245                 if it == (self.config.generic_retry_count - 1):
246                     raise ex
247                 LOG.info("Retrying connection to TRex (%s)...", ex.message)
248
249     def connect(self):
250         LOG.info("Connecting to TRex...")
251         server_ip = self.config.generator_config.ip
252
253         # Connect to TRex server
254         self.client = STLClient(server=server_ip)
255         try:
256             self.__connect(self.client)
257         except (TimeoutError, STLError) as e:
258             if server_ip == '127.0.0.1':
259                 try:
260                     self.__start_server()
261                     self.__connect_after_start()
262                 except (TimeoutError, STLError) as e:
263                     LOG.error('Cannot connect to TRex')
264                     LOG.error(traceback.format_exc())
265                     logpath = '/tmp/trex.log'
266                     if os.path.isfile(logpath):
267                         # Wait for TRex to finish writing error message
268                         last_size = 0
269                         for it in xrange(self.config.generic_retry_count):
270                             size = os.path.getsize(logpath)
271                             if size == last_size:
272                                 # probably not writing anymore
273                                 break
274                             last_size = size
275                             time.sleep(1)
276                         with open(logpath, 'r') as f:
277                             message = f.read()
278                     else:
279                         message = e.message
280                     raise TrafficGeneratorException(message)
281             else:
282                 raise TrafficGeneratorException(e.message)
283
284         ports = list(self.config.generator_config.ports)
285         self.port_handle = ports
286         # Prepare the ports
287         self.client.reset(ports)
288
289     def set_mode(self):
290         if self.config.service_chain == ChainType.EXT and not self.config.no_arp:
291             self.__set_l3_mode()
292         else:
293             self.__set_l2_mode()
294
295     def __set_l3_mode(self):
296         self.client.set_service_mode(ports=self.port_handle, enabled=True)
297         for port, device in zip(self.port_handle, self.config.generator_config.devices):
298             try:
299                 self.client.set_l3_mode(port=port,
300                                         src_ipv4=device.tg_gateway_ip,
301                                         dst_ipv4=device.dst.gateway_ip,
302                                         vlan=device.vlan_tag if device.vlan_tagging else None)
303             except STLError:
304                 # TRex tries to resolve ARP already, doesn't have to be successful yet
305                 continue
306         self.client.set_service_mode(ports=self.port_handle, enabled=False)
307
308     def __set_l2_mode(self):
309         self.client.set_service_mode(ports=self.port_handle, enabled=True)
310         for port, device in zip(self.port_handle, self.config.generator_config.devices):
311             for cfg in device.get_stream_configs(self.config.generator_config.service_chain):
312                 self.client.set_l2_mode(port=port, dst_mac=cfg['mac_dst'])
313         self.client.set_service_mode(ports=self.port_handle, enabled=False)
314
315     def __start_server(self):
316         server = TRexTrafficServer()
317         server.run_server(self.config.generator_config)
318
319     def resolve_arp(self):
320         self.client.set_service_mode(ports=self.port_handle)
321         LOG.info('Polling ARP until successful')
322         resolved = 0
323         attempt = 0
324         for port, device in zip(self.port_handle, self.config.generator_config.devices):
325             ctx = self.client.create_service_ctx(port=port)
326
327             arps = [
328                 STLServiceARP(ctx,
329                               src_ip=cfg['ip_src_tg_gw'],
330                               dst_ip=cfg['mac_discovery_gw'],
331                               vlan=device.vlan_tag if device.vlan_tagging else None)
332                 for cfg in device.get_stream_configs(self.config.generator_config.service_chain)
333             ]
334
335             for _ in xrange(self.config.generic_retry_count):
336                 attempt += 1
337                 try:
338                     ctx.run(arps)
339                 except STLError:
340                     LOG.error(traceback.format_exc())
341                     continue
342
343                 self.arps[port] = [arp.get_record().dst_mac for arp in arps
344                                    if arp.get_record().dst_mac is not None]
345
346                 if len(self.arps[port]) == self.config.service_chain_count:
347                     resolved += 1
348                     LOG.info('ARP resolved successfully for port {}'.format(port))
349                     break
350                 else:
351                     failed = [arp.get_record().dst_ip for arp in arps
352                               if arp.get_record().dst_mac is None]
353                     LOG.info('Retrying ARP for: {} ({} / {})'.format(
354                         failed, attempt, self.config.generic_retry_count))
355                     time.sleep(self.config.generic_poll_sec)
356
357         self.client.set_service_mode(ports=self.port_handle, enabled=False)
358         return resolved == len(self.port_handle)
359
360     def config_interface(self):
361         pass
362
363     def __is_rate_enough(self, l2frame_size, rates, bidirectional, latency):
364         """Check if rate provided by user is above requirements. Applies only if latency is True."""
365         intf_speed = self.config.generator_config.intf_speed
366         if latency:
367             if bidirectional:
368                 mult = 2
369                 total_rate = 0
370                 for rate in rates:
371                     r = utils.convert_rates(l2frame_size, rate, intf_speed)
372                     total_rate += int(r['rate_pps'])
373             else:
374                 mult = 1
375                 total_rate = utils.convert_rates(l2frame_size, rates[0], intf_speed)
376             # rate must be enough for latency stream and at least 1 pps for base stream per chain
377             required_rate = (self.LATENCY_PPS + 1) * self.config.service_chain_count * mult
378             result = utils.convert_rates(l2frame_size,
379                                          {'rate_pps': required_rate},
380                                          intf_speed * mult)
381             result['result'] = total_rate >= required_rate
382             return result
383
384         return {'result': True}
385
386     def create_traffic(self, l2frame_size, rates, bidirectional, latency=True):
387         r = self.__is_rate_enough(l2frame_size, rates, bidirectional, latency)
388         if not r['result']:
389             raise TrafficGeneratorException(
390                 'Required rate in total is at least one of: \n{pps}pps \n{bps}bps \n{load}%.'
391                 .format(pps=r['rate_pps'],
392                         bps=r['rate_bps'],
393                         load=r['rate_percent']))
394
395         stream_cfgs = [d.get_stream_configs(self.config.generator_config.service_chain)
396                        for d in self.config.generator_config.devices]
397         self.rates = map(lambda rate: utils.to_rate_str(rate), rates)
398
399         for ph in self.port_handle:
400             # generate one pg_id for each direction
401             self.stream_ids[ph] = self.id.next()
402
403         for i, (fwd_stream_cfg, rev_stream_cfg) in enumerate(zip(*stream_cfgs)):
404             if self.config.service_chain == ChainType.EXT and not self.config.no_arp:
405                 fwd_stream_cfg['mac_dst'] = self.arps[self.port_handle[0]][i]
406                 rev_stream_cfg['mac_dst'] = self.arps[self.port_handle[1]][i]
407
408             self.streamblock[0].extend(self.generate_streams(self.port_handle[0],
409                                                              fwd_stream_cfg,
410                                                              l2frame_size,
411                                                              latency=latency))
412             if len(self.rates) > 1:
413                 self.streamblock[1].extend(self.generate_streams(self.port_handle[1],
414                                                                  rev_stream_cfg,
415                                                                  l2frame_size,
416                                                                  isg=10.0,
417                                                                  latency=bidirectional and latency))
418
419         for ph in self.port_handle:
420             self.client.add_streams(self.streamblock[ph], ports=ph)
421             LOG.info('Created traffic stream for port %s.' % ph)
422
423     def modify_rate(self, rate, reverse):
424         port_index = int(reverse)
425         port = self.port_handle[port_index]
426         self.rates[port_index] = utils.to_rate_str(rate)
427         LOG.info('Modified traffic stream for %s, new rate=%s.' % (port, utils.to_rate_str(rate)))
428
429     def clear_streamblock(self):
430         self.streamblock = defaultdict(list)
431         self.latencies = defaultdict(list)
432         self.stream_ids = defaultdict(list)
433         self.rates = []
434         self.client.reset(self.port_handle)
435         LOG.info('Cleared all existing streams.')
436
437     def get_stats(self):
438         stats = self.client.get_pgid_stats()
439         return self.extract_stats(stats)
440
441     def get_macs(self):
442         return [self.client.get_port_attr(port=port)['src_mac'] for port in self.port_handle]
443
444     def clear_stats(self):
445         if self.port_handle:
446             self.client.clear_stats()
447
448     def start_traffic(self):
449         for port, rate in zip(self.port_handle, self.rates):
450             self.client.start(ports=port, mult=rate, duration=self.config.duration_sec, force=True)
451
452     def stop_traffic(self):
453         self.client.stop(ports=self.port_handle)
454
455     def cleanup(self):
456         if self.client:
457             try:
458                 self.client.reset(self.port_handle)
459                 self.client.disconnect()
460             except STLError:
461                 # TRex does not like a reset while in disconnected state
462                 pass