NFVBENCH-172: Add quartiles and 99 percentile latency values
[nfvbench.git] / nfvbench / traffic_gen / trex_gen.py
1 # Copyright 2016 Cisco Systems, Inc.  All rights reserved.
2 #
3 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
4 #    not use this file except in compliance with the License. You may obtain
5 #    a copy of the License at
6 #
7 #         http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #    Unless required by applicable law or agreed to in writing, software
10 #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 #    License for the specific language governing permissions and limitations
13 #    under the License.
14 """Driver module for TRex traffic generator."""
15
16 import math
17 import os
18 import random
19 import time
20 import traceback
21 from functools import reduce
22
23 from itertools import count
24 # pylint: disable=import-error
25 from scapy.contrib.mpls import MPLS  # flake8: noqa
26 # pylint: enable=import-error
27 from nfvbench.log import LOG
28 from nfvbench.traffic_server import TRexTrafficServer
29 from nfvbench.utils import cast_integer
30 from nfvbench.utils import timeout
31 from nfvbench.utils import TimeoutError
32
33 from hdrh.histogram import HdrHistogram
34
35 # pylint: disable=import-error
36 from trex.common.services.trex_service_arp import ServiceARP
37 from trex.stl.api import bind_layers
38 from trex.stl.api import CTRexVmInsFixHwCs
39 from trex.stl.api import Dot1Q
40 from trex.stl.api import Ether
41 from trex.stl.api import FlagsField
42 from trex.stl.api import IP
43 from trex.stl.api import Packet
44 from trex.stl.api import STLClient
45 from trex.stl.api import STLError
46 from trex.stl.api import STLFlowLatencyStats
47 from trex.stl.api import STLFlowStats
48 from trex.stl.api import STLPktBuilder
49 from trex.stl.api import STLScVmRaw
50 from trex.stl.api import STLStream
51 from trex.stl.api import STLTXCont
52 from trex.stl.api import STLVmFixChecksumHw
53 from trex.stl.api import STLVmFixIpv4
54 from trex.stl.api import STLVmFlowVar
55 from trex.stl.api import STLVmFlowVarRepeatableRandom
56 from trex.stl.api import STLVmWrFlowVar
57 from trex.stl.api import ThreeBytesField
58 from trex.stl.api import UDP
59 from trex.stl.api import XByteField
60
61 # pylint: enable=import-error
62
63 from .traffic_base import AbstractTrafficGenerator
64 from .traffic_base import TrafficGeneratorException
65 from . import traffic_utils as utils
66 from .traffic_utils import IMIX_AVG_L2_FRAME_SIZE
67 from .traffic_utils import IMIX_L2_SIZES
68 from .traffic_utils import IMIX_RATIOS
69
70 class VXLAN(Packet):
71     """VxLAN class."""
72
73     _VXLAN_FLAGS = ['R' * 27] + ['I'] + ['R' * 5]
74     name = "VXLAN"
75     fields_desc = [FlagsField("flags", 0x08000000, 32, _VXLAN_FLAGS),
76                    ThreeBytesField("vni", 0),
77                    XByteField("reserved", 0x00)]
78
79     def mysummary(self):
80         """Summary."""
81         return self.sprintf("VXLAN (vni=%VXLAN.vni%)")
82
83 class TRex(AbstractTrafficGenerator):
84     """TRex traffic generator driver."""
85
86     LATENCY_PPS = 1000
87     CHAIN_PG_ID_MASK = 0x007F
88     PORT_PG_ID_MASK = 0x0080
89     LATENCY_PG_ID_MASK = 0x0100
90
91     def __init__(self, traffic_client):
92         """Trex driver."""
93         AbstractTrafficGenerator.__init__(self, traffic_client)
94         self.client = None
95         self.id = count()
96         self.port_handle = []
97         self.chain_count = self.generator_config.service_chain_count
98         self.rates = []
99         self.capture_id = None
100         self.packet_list = []
101         self.l2_frame_size = 0
102
103     def get_version(self):
104         """Get the Trex version."""
105         return self.client.get_server_version() if self.client else ''
106
107     def get_pg_id(self, port, chain_id):
108         """Calculate the packet group IDs to use for a given port/stream type/chain_id.
109
110         port: 0 or 1
111         chain_id: identifies to which chain the pg_id is associated (0 to 255)
112         return: pg_id, lat_pg_id
113
114         We use a bit mask to set up the 3 fields:
115         0x007F: chain ID (8 bits for a max of 128 chains)
116         0x0080: port bit
117         0x0100: latency bit
118         """
119         pg_id = port * TRex.PORT_PG_ID_MASK | chain_id
120         return pg_id, pg_id | TRex.LATENCY_PG_ID_MASK
121
122     def extract_stats(self, in_stats, ifstats):
123         """Extract stats from dict returned by Trex API.
124
125         :param in_stats: dict as returned by TRex api
126         """
127         utils.nan_replace(in_stats)
128         # LOG.debug(in_stats)
129
130         result = {}
131         # port_handles should have only 2 elements: [0, 1]
132         # so (1 - ph) will be the index for the far end port
133         for ph in self.port_handle:
134             stats = in_stats[ph]
135             far_end_stats = in_stats[1 - ph]
136             result[ph] = {
137                 'tx': {
138                     'total_pkts': cast_integer(stats['opackets']),
139                     'total_pkt_bytes': cast_integer(stats['obytes']),
140                     'pkt_rate': cast_integer(stats['tx_pps']),
141                     'pkt_bit_rate': cast_integer(stats['tx_bps'])
142                 },
143                 'rx': {
144                     'total_pkts': cast_integer(stats['ipackets']),
145                     'total_pkt_bytes': cast_integer(stats['ibytes']),
146                     'pkt_rate': cast_integer(stats['rx_pps']),
147                     'pkt_bit_rate': cast_integer(stats['rx_bps']),
148                     # how many pkts were dropped in RX direction
149                     # need to take the tx counter on the far end port
150                     'dropped_pkts': cast_integer(
151                         far_end_stats['opackets'] - stats['ipackets'])
152                 }
153             }
154             self.__combine_latencies(in_stats, result[ph]['rx'], ph)
155
156         total_tx_pkts = result[0]['tx']['total_pkts'] + result[1]['tx']['total_pkts']
157         result["total_tx_rate"] = cast_integer(total_tx_pkts / self.config.duration_sec)
158         # actual offered tx rate in bps
159         avg_packet_size = utils.get_average_packet_size(self.l2_frame_size)
160         total_tx_bps = utils.pps_to_bps(result["total_tx_rate"], avg_packet_size)
161         result['offered_tx_rate_bps'] = total_tx_bps
162         result["flow_stats"] = in_stats["flow_stats"]
163         result["latency"] = in_stats["latency"]
164
165         # Merge HDRHistogram to have an overall value for all chains and ports
166         try:
167             hdrh_list = []
168             if ifstats:
169                 for chain_id, _ in enumerate(ifstats):
170                     for ph in self.port_handle:
171                         _, lat_pg_id = self.get_pg_id(ph, chain_id)
172                         hdrh_list.append(
173                             HdrHistogram.decode(in_stats['latency'][lat_pg_id]['latency']['hdrh']))
174             else:
175                 for pg_id in in_stats['latency']:
176                     if pg_id != 'global':
177                         hdrh_list.append(
178                             HdrHistogram.decode(in_stats['latency'][pg_id]['latency']['hdrh']))
179
180             def add_hdrh(x, y):
181                 x.add(y)
182                 return x
183             decoded_hdrh = reduce(add_hdrh, hdrh_list)
184             result["hdrh"] = HdrHistogram.encode(decoded_hdrh).decode('utf-8')
185         except KeyError:
186             pass
187
188         return result
189
190     def get_stream_stats(self, trex_stats, if_stats, latencies, chain_idx):
191         """Extract the aggregated stats for a given chain.
192
193         trex_stats: stats as returned by get_stats()
194         if_stats: a list of 2 interface stats to update (port 0 and 1)
195         latencies: a list of 2 Latency instances to update for this chain (port 0 and 1)
196                    latencies[p] is the latency for packets sent on port p
197                    if there are no latency streams, the Latency instances are not modified
198         chain_idx: chain index of the interface stats
199
200         The packet counts include normal and latency streams.
201
202         Trex returns flows stats as follows:
203
204         'flow_stats': {0: {'rx_bps': {0: 0, 1: 0, 'total': 0},
205                    'rx_bps_l1': {0: 0.0, 1: 0.0, 'total': 0.0},
206                    'rx_bytes': {0: nan, 1: nan, 'total': nan},
207                    'rx_pkts': {0: 0, 1: 15001, 'total': 15001},
208                    'rx_pps': {0: 0, 1: 0, 'total': 0},
209                    'tx_bps': {0: 0, 1: 0, 'total': 0},
210                    'tx_bps_l1': {0: 0.0, 1: 0.0, 'total': 0.0},
211                    'tx_bytes': {0: 1020068, 1: 0, 'total': 1020068},
212                    'tx_pkts': {0: 15001, 1: 0, 'total': 15001},
213                    'tx_pps': {0: 0, 1: 0, 'total': 0}},
214                1: {'rx_bps': {0: 0, 1: 0, 'total': 0},
215                    'rx_bps_l1': {0: 0.0, 1: 0.0, 'total': 0.0},
216                    'rx_bytes': {0: nan, 1: nan, 'total': nan},
217                    'rx_pkts': {0: 0, 1: 15001, 'total': 15001},
218                    'rx_pps': {0: 0, 1: 0, 'total': 0},
219                    'tx_bps': {0: 0, 1: 0, 'total': 0},
220                    'tx_bps_l1': {0: 0.0, 1: 0.0, 'total': 0.0},
221                    'tx_bytes': {0: 1020068, 1: 0, 'total': 1020068},
222                    'tx_pkts': {0: 15001, 1: 0, 'total': 15001},
223                    'tx_pps': {0: 0, 1: 0, 'total': 0}},
224                 128: {'rx_bps': {0: 0, 1: 0, 'total': 0},
225                 'rx_bps_l1': {0: 0.0, 1: 0.0, 'total': 0.0},
226                 'rx_bytes': {0: nan, 1: nan, 'total': nan},
227                 'rx_pkts': {0: 15001, 1: 0, 'total': 15001},
228                 'rx_pps': {0: 0, 1: 0, 'total': 0},
229                 'tx_bps': {0: 0, 1: 0, 'total': 0},
230                 'tx_bps_l1': {0: 0.0, 1: 0.0, 'total': 0.0},
231                 'tx_bytes': {0: 0, 1: 1020068, 'total': 1020068},
232                 'tx_pkts': {0: 0, 1: 15001, 'total': 15001},
233                 'tx_pps': {0: 0, 1: 0, 'total': 0}},etc...
234
235         the pg_id (0, 1, 128,...) is the key of the dict and is obtained using the
236         get_pg_id() method.
237         packet counters for a given stream sent on port p are reported as:
238         - tx_pkts[p] on port p
239         - rx_pkts[1-p] on the far end port
240
241         This is a tricky/critical counter transposition operation because
242         the results are grouped by port (not by stream):
243         tx_pkts_port(p=0) comes from pg_id(port=0, chain_idx)['tx_pkts'][0]
244         rx_pkts_port(p=0) comes from pg_id(port=1, chain_idx)['rx_pkts'][0]
245         tx_pkts_port(p=1) comes from pg_id(port=1, chain_idx)['tx_pkts'][1]
246         rx_pkts_port(p=1) comes from pg_id(port=0, chain_idx)['rx_pkts'][1]
247
248         or using a more generic formula:
249         tx_pkts_port(p) comes from pg_id(port=p, chain_idx)['tx_pkts'][p]
250         rx_pkts_port(p) comes from pg_id(port=1-p, chain_idx)['rx_pkts'][p]
251
252         the second formula is equivalent to
253         rx_pkts_port(1-p) comes from pg_id(port=p, chain_idx)['rx_pkts'][1-p]
254
255         If there are latency streams, those same counters need to be added in the same way
256         """
257         def get_latency(lval):
258             try:
259                 return int(round(lval))
260             except ValueError:
261                 return 0
262
263         for ifs in if_stats:
264             ifs.tx = ifs.rx = 0
265         for port in range(2):
266             pg_id, lat_pg_id = self.get_pg_id(port, chain_idx)
267             for pid in [pg_id, lat_pg_id]:
268                 try:
269                     pg_stats = trex_stats['flow_stats'][pid]
270                     if_stats[port].tx += pg_stats['tx_pkts'][port]
271                     if_stats[1 - port].rx += pg_stats['rx_pkts'][1 - port]
272                 except KeyError:
273                     pass
274             try:
275                 lat = trex_stats['latency'][lat_pg_id]['latency']
276                 # dropped_pkts += lat['err_cntrs']['dropped']
277                 latencies[port].max_usec = get_latency(lat['total_max'])
278                 if math.isnan(lat['total_min']):
279                     latencies[port].min_usec = 0
280                     latencies[port].avg_usec = 0
281                 else:
282                     latencies[port].min_usec = get_latency(lat['total_min'])
283                     latencies[port].avg_usec = get_latency(lat['average'])
284                 # pick up the HDR histogram if present (otherwise will raise KeyError)
285                 latencies[port].hdrh = lat['hdrh']
286             except KeyError:
287                 pass
288
289     def __combine_latencies(self, in_stats, results, port_handle):
290         """Traverse TRex result dictionary and combines chosen latency stats.
291
292           example of latency dict returned by trex (2 chains):
293          'latency': {256: {'err_cntrs': {'dropped': 0,
294                                  'dup': 0,
295                                  'out_of_order': 0,
296                                  'seq_too_high': 0,
297                                  'seq_too_low': 0},
298                             'latency': {'average': 26.5,
299                                         'hdrh': u'HISTFAAAAEx4nJNpmSgz...bFRgxi',
300                                         'histogram': {20: 303,
301                                                         30: 320,
302                                                         40: 300,
303                                                         50: 73,
304                                                         60: 4,
305                                                         70: 1},
306                                         'jitter': 14,
307                                         'last_max': 63,
308                                         'total_max': 63,
309                                         'total_min': 20}},
310                     257: {'err_cntrs': {'dropped': 0,
311                                         'dup': 0,
312                                         'out_of_order': 0,
313                                         'seq_too_high': 0,
314                                         'seq_too_low': 0},
315                             'latency': {'average': 29.75,
316                                         'hdrh': u'HISTFAAAAEV4nJN...CALilDG0=',
317                                         'histogram': {20: 261,
318                                                         30: 431,
319                                                         40: 3,
320                                                         50: 80,
321                                                         60: 225},
322                                         'jitter': 23,
323                                         'last_max': 67,
324                                         'total_max': 67,
325                                         'total_min': 20}},
326                     384: {'err_cntrs': {'dropped': 0,
327                                         'dup': 0,
328                                         'out_of_order': 0,
329                                         'seq_too_high': 0,
330                                         'seq_too_low': 0},
331                             'latency': {'average': 18.0,
332                                         'hdrh': u'HISTFAAAADR4nJNpm...MjCwDDxAZG',
333                                         'histogram': {20: 987, 30: 14},
334                                         'jitter': 0,
335                                         'last_max': 34,
336                                         'total_max': 34,
337                                         'total_min': 20}},
338                     385: {'err_cntrs': {'dropped': 0,
339                                     'dup': 0,
340                                     'out_of_order': 0,
341                                     'seq_too_high': 0,
342                                     'seq_too_low': 0},
343                             'latency': {'average': 19.0,
344                                         'hdrh': u'HISTFAAAADR4nJNpm...NkYmJgDdagfK',
345                                         'histogram': {20: 989, 30: 11},
346                                         'jitter': 0,
347                                         'last_max': 38,
348                                         'total_max': 38,
349                                         'total_min': 20}},
350                     'global': {'bad_hdr': 0, 'old_flow': 0}},
351         """
352         total_max = 0
353         average = 0
354         total_min = float("inf")
355         for chain_id in range(self.chain_count):
356             try:
357                 _, lat_pg_id = self.get_pg_id(port_handle, chain_id)
358                 lat = in_stats['latency'][lat_pg_id]['latency']
359                 # dropped_pkts += lat['err_cntrs']['dropped']
360                 total_max = max(lat['total_max'], total_max)
361                 total_min = min(lat['total_min'], total_min)
362                 average += lat['average']
363             except KeyError:
364                 pass
365         if total_min == float("inf"):
366             total_min = 0
367         results['min_delay_usec'] = total_min
368         results['max_delay_usec'] = total_max
369         results['avg_delay_usec'] = int(average / self.chain_count)
370
371     def _bind_vxlan(self):
372         bind_layers(UDP, VXLAN, dport=4789)
373         bind_layers(VXLAN, Ether)
374
375     def _create_pkt(self, stream_cfg, l2frame_size):
376         """Create a packet of given size.
377
378         l2frame_size: size of the L2 frame in bytes (including the 32-bit FCS)
379         """
380         # Trex will add the FCS field, so we need to remove 4 bytes from the l2 frame size
381         frame_size = int(l2frame_size) - 4
382         vm_param = []
383         if stream_cfg['vxlan'] is True:
384             self._bind_vxlan()
385             encap_level = '1'
386             pkt_base = Ether(src=stream_cfg['vtep_src_mac'], dst=stream_cfg['vtep_dst_mac'])
387             if stream_cfg['vtep_vlan'] is not None:
388                 pkt_base /= Dot1Q(vlan=stream_cfg['vtep_vlan'])
389             pkt_base /= IP(src=stream_cfg['vtep_src_ip'], dst=stream_cfg['vtep_dst_ip'])
390             pkt_base /= UDP(sport=random.randint(1337, 32767), dport=4789)
391             pkt_base /= VXLAN(vni=stream_cfg['net_vni'])
392             pkt_base /= Ether(src=stream_cfg['mac_src'], dst=stream_cfg['mac_dst'])
393             # need to randomize the outer header UDP src port based on flow
394             vxlan_udp_src_fv = STLVmFlowVar(
395                 name="vxlan_udp_src",
396                 min_value=1337,
397                 max_value=32767,
398                 size=2,
399                 op="random")
400             vm_param = [vxlan_udp_src_fv,
401                         STLVmWrFlowVar(fv_name="vxlan_udp_src", pkt_offset="UDP.sport")]
402         elif stream_cfg['mpls'] is True:
403             encap_level = '0'
404             pkt_base = Ether(src=stream_cfg['vtep_src_mac'], dst=stream_cfg['vtep_dst_mac'])
405             if stream_cfg['vtep_vlan'] is not None:
406                 pkt_base /= Dot1Q(vlan=stream_cfg['vtep_vlan'])
407             if stream_cfg['mpls_outer_label'] is not None:
408                 pkt_base /= MPLS(label=stream_cfg['mpls_outer_label'], cos=1, s=0, ttl=255)
409             if stream_cfg['mpls_inner_label'] is not None:
410                 pkt_base /= MPLS(label=stream_cfg['mpls_inner_label'], cos=1, s=1, ttl=255)
411             #  Flow stats and MPLS labels randomization TBD
412             pkt_base /= Ether(src=stream_cfg['mac_src'], dst=stream_cfg['mac_dst'])
413         else:
414             encap_level = '0'
415             pkt_base = Ether(src=stream_cfg['mac_src'], dst=stream_cfg['mac_dst'])
416
417         if stream_cfg['vlan_tag'] is not None:
418             pkt_base /= Dot1Q(vlan=stream_cfg['vlan_tag'])
419
420         udp_args = {}
421         if stream_cfg['udp_src_port']:
422             udp_args['sport'] = int(stream_cfg['udp_src_port'])
423             if stream_cfg['udp_port_step'] == 'random':
424                 step = 1
425             else:
426                 step = stream_cfg['udp_port_step']
427             udp_args['sport_step'] = int(step)
428             udp_args['sport_max'] = int(stream_cfg['udp_src_port_max'])
429         if stream_cfg['udp_dst_port']:
430             udp_args['dport'] = int(stream_cfg['udp_dst_port'])
431             if stream_cfg['udp_port_step'] == 'random':
432                 step = 1
433             else:
434                 step = stream_cfg['udp_port_step']
435             udp_args['dport_step'] = int(step)
436             udp_args['dport_max'] = int(stream_cfg['udp_dst_port_max'])
437
438         pkt_base /= IP(src=stream_cfg['ip_src_addr'], dst=stream_cfg['ip_dst_addr']) / \
439                     UDP(dport=udp_args['dport'], sport=udp_args['sport'])
440         if stream_cfg['ip_src_static'] is True:
441             src_max_ip_value = stream_cfg['ip_src_addr']
442         else:
443             src_max_ip_value = stream_cfg['ip_src_addr_max']
444         if stream_cfg['ip_addrs_step'] == 'random':
445             src_fv_ip = STLVmFlowVarRepeatableRandom(
446                 name="ip_src",
447                 min_value=stream_cfg['ip_src_addr'],
448                 max_value=src_max_ip_value,
449                 size=4,
450                 seed=random.randint(0, 32767),
451                 limit=stream_cfg['ip_src_count'])
452             dst_fv_ip = STLVmFlowVarRepeatableRandom(
453                 name="ip_dst",
454                 min_value=stream_cfg['ip_dst_addr'],
455                 max_value=stream_cfg['ip_dst_addr_max'],
456                 size=4,
457                 seed=random.randint(0, 32767),
458                 limit=stream_cfg['ip_dst_count'])
459         else:
460             src_fv_ip = STLVmFlowVar(
461                 name="ip_src",
462                 min_value=stream_cfg['ip_src_addr'],
463                 max_value=src_max_ip_value,
464                 size=4,
465                 op="inc",
466                 step=stream_cfg['ip_addrs_step'])
467             dst_fv_ip = STLVmFlowVar(
468                 name="ip_dst",
469                 min_value=stream_cfg['ip_dst_addr'],
470                 max_value=stream_cfg['ip_dst_addr_max'],
471                 size=4,
472                 op="inc",
473                 step=stream_cfg['ip_addrs_step'])
474
475         if stream_cfg['udp_port_step'] == 'random':
476             src_fv_port = STLVmFlowVarRepeatableRandom(
477                 name="p_src",
478                 min_value=udp_args['sport'],
479                 max_value=udp_args['sport_max'],
480                 size=2,
481                 seed=random.randint(0, 32767),
482                 limit=stream_cfg['udp_src_count'])
483             dst_fv_port = STLVmFlowVarRepeatableRandom(
484                 name="p_dst",
485                 min_value=udp_args['dport'],
486                 max_value=udp_args['dport_max'],
487                 size=2,
488                 seed=random.randint(0, 32767),
489                 limit=stream_cfg['udp_dst_count'])
490         else:
491             src_fv_port = STLVmFlowVar(
492                 name="p_src",
493                 min_value=udp_args['sport'],
494                 max_value=udp_args['sport_max'],
495                 size=2,
496                 op="inc",
497                 step=udp_args['sport_step'])
498             dst_fv_port = STLVmFlowVar(
499                 name="p_dst",
500                 min_value=udp_args['dport'],
501                 max_value=udp_args['dport_max'],
502                 size=2,
503                 op="inc",
504                 step=udp_args['dport_step'])
505         vm_param = [
506             src_fv_ip,
507             STLVmWrFlowVar(fv_name="ip_src", pkt_offset="IP:{}.src".format(encap_level)),
508             src_fv_port,
509             STLVmWrFlowVar(fv_name="p_src", pkt_offset="UDP:{}.sport".format(encap_level)),
510             dst_fv_ip,
511             STLVmWrFlowVar(fv_name="ip_dst", pkt_offset="IP:{}.dst".format(encap_level)),
512             dst_fv_port,
513             STLVmWrFlowVar(fv_name="p_dst", pkt_offset="UDP:{}.dport".format(encap_level)),
514         ]
515         # Use HW Offload to calculate the outter IP/UDP packet
516         vm_param.append(STLVmFixChecksumHw(l3_offset="IP:0",
517                                            l4_offset="UDP:0",
518                                            l4_type=CTRexVmInsFixHwCs.L4_TYPE_UDP))
519         # Use software to fix the inner IP/UDP payload for VxLAN packets
520         if int(encap_level):
521             vm_param.append(STLVmFixIpv4(offset="IP:1"))
522         pad = max(0, frame_size - len(pkt_base)) * 'x'
523
524         return STLPktBuilder(pkt=pkt_base / pad,
525                              vm=STLScVmRaw(vm_param, cache_size=int(self.config.cache_size)))
526
527     def generate_streams(self, port, chain_id, stream_cfg, l2frame, latency=True,
528                          e2e=False):
529         """Create a list of streams corresponding to a given chain and stream config.
530
531         port: port where the streams originate (0 or 1)
532         chain_id: the chain to which the streams are associated to
533         stream_cfg: stream configuration
534         l2frame: L2 frame size (including 4-byte FCS) or 'IMIX'
535         latency: if True also create a latency stream
536         e2e: True if performing "end to end" connectivity check
537         """
538         streams = []
539         pg_id, lat_pg_id = self.get_pg_id(port, chain_id)
540         if self.config.no_flow_stats:
541             LOG.info("Traffic flow statistics are disabled.")
542         if l2frame == 'IMIX':
543             for ratio, l2_frame_size in zip(IMIX_RATIOS, IMIX_L2_SIZES):
544                 pkt = self._create_pkt(stream_cfg, l2_frame_size)
545                 if e2e or stream_cfg['mpls']:
546                     streams.append(STLStream(packet=pkt,
547                                              mode=STLTXCont(pps=ratio)))
548                 else:
549                     if stream_cfg['vxlan'] is True:
550                         streams.append(STLStream(packet=pkt,
551                                                  flow_stats=STLFlowStats(pg_id=pg_id,
552                                                                          vxlan=True)
553                                                  if not self.config.no_flow_stats else None,
554                                                  mode=STLTXCont(pps=ratio)))
555                     else:
556                         streams.append(STLStream(packet=pkt,
557                                                  flow_stats=STLFlowStats(pg_id=pg_id)
558                                                  if not self.config.no_flow_stats else None,
559                                                  mode=STLTXCont(pps=ratio)))
560
561             if latency:
562                 # for IMIX, the latency packets have the average IMIX packet size
563                 pkt = self._create_pkt(stream_cfg, IMIX_AVG_L2_FRAME_SIZE)
564
565         else:
566             l2frame_size = int(l2frame)
567             pkt = self._create_pkt(stream_cfg, l2frame_size)
568             if e2e or stream_cfg['mpls']:
569                 streams.append(STLStream(packet=pkt,
570                                          # Flow stats is disabled for MPLS now
571                                          # flow_stats=STLFlowStats(pg_id=pg_id),
572                                          mode=STLTXCont()))
573             else:
574                 if stream_cfg['vxlan'] is True:
575                     streams.append(STLStream(packet=pkt,
576                                              flow_stats=STLFlowStats(pg_id=pg_id,
577                                                                      vxlan=True)
578                                              if not self.config.no_flow_stats else None,
579                                              mode=STLTXCont()))
580                 else:
581                     streams.append(STLStream(packet=pkt,
582                                              flow_stats=STLFlowStats(pg_id=pg_id)
583                                              if not self.config.no_flow_stats else None,
584                                              mode=STLTXCont()))
585             # for the latency stream, the minimum payload is 16 bytes even in case of vlan tagging
586             # without vlan, the min l2 frame size is 64
587             # with vlan it is 68
588             # This only applies to the latency stream
589             if latency and stream_cfg['vlan_tag'] and l2frame_size < 68:
590                 pkt = self._create_pkt(stream_cfg, 68)
591
592         if latency:
593             if self.config.no_latency_stats:
594                 LOG.info("Latency flow statistics are disabled.")
595             if stream_cfg['vxlan'] is True:
596                 streams.append(STLStream(packet=pkt,
597                                          flow_stats=STLFlowLatencyStats(pg_id=lat_pg_id,
598                                                                         vxlan=True)
599                                          if not self.config.no_latency_stats else None,
600                                          mode=STLTXCont(pps=self.LATENCY_PPS)))
601             else:
602                 streams.append(STLStream(packet=pkt,
603                                          flow_stats=STLFlowLatencyStats(pg_id=lat_pg_id)
604                                          if not self.config.no_latency_stats else None,
605                                          mode=STLTXCont(pps=self.LATENCY_PPS)))
606         return streams
607
608     @timeout(5)
609     def __connect(self, client):
610         client.connect()
611
612     def __connect_after_start(self):
613         # after start, Trex may take a bit of time to initialize
614         # so we need to retry a few times
615         for it in range(self.config.generic_retry_count):
616             try:
617                 time.sleep(1)
618                 self.client.connect()
619                 break
620             except Exception as ex:
621                 if it == (self.config.generic_retry_count - 1):
622                     raise
623                 LOG.info("Retrying connection to TRex (%s)...", ex.msg)
624
625     def connect(self):
626         """Connect to the TRex server."""
627         server_ip = self.generator_config.ip
628         LOG.info("Connecting to TRex (%s)...", server_ip)
629
630         # Connect to TRex server
631         self.client = STLClient(server=server_ip, sync_port=self.generator_config.zmq_rpc_port,
632                                 async_port=self.generator_config.zmq_pub_port)
633         try:
634             self.__connect(self.client)
635             if server_ip == '127.0.0.1':
636                 config_updated = self.__check_config()
637                 if config_updated or self.config.restart:
638                     self.__restart()
639         except (TimeoutError, STLError) as e:
640             if server_ip == '127.0.0.1':
641                 self.__start_local_server()
642             else:
643                 raise TrafficGeneratorException(e.message)
644
645         ports = list(self.generator_config.ports)
646         self.port_handle = ports
647         # Prepare the ports
648         self.client.reset(ports)
649         # Read HW information from each port
650         # this returns an array of dict (1 per port)
651         """
652         Example of output for Intel XL710
653         [{'arp': '-', 'src_ipv4': '-', u'supp_speeds': [40000], u'is_link_supported': True,
654           'grat_arp': 'off', 'speed': 40, u'index': 0, 'link_change_supported': 'yes',
655           u'rx': {u'counters': 127, u'caps': [u'flow_stats', u'latency']},
656           u'is_virtual': 'no', 'prom': 'off', 'src_mac': u'3c:fd:fe:a8:24:48', 'status': 'IDLE',
657           u'description': u'Ethernet Controller XL710 for 40GbE QSFP+',
658           'dest': u'fa:16:3e:3c:63:04', u'is_fc_supported': False, 'vlan': '-',
659           u'driver': u'net_i40e', 'led_change_supported': 'yes', 'rx_filter_mode': 'hardware match',
660           'fc': 'none', 'link': 'UP', u'hw_mac': u'3c:fd:fe:a8:24:48', u'pci_addr': u'0000:5e:00.0',
661           'mult': 'off', 'fc_supported': 'no', u'is_led_supported': True, 'rx_queue': 'off',
662           'layer_mode': 'Ethernet', u'numa': 0}, ...]
663         """
664         self.port_info = self.client.get_port_info(ports)
665         LOG.info('Connected to TRex')
666         for id, port in enumerate(self.port_info):
667             LOG.info('   Port %d: %s speed=%dGbps mac=%s pci=%s driver=%s',
668                      id, port['description'], port['speed'], port['src_mac'],
669                      port['pci_addr'], port['driver'])
670         # Make sure the 2 ports have the same speed
671         if self.port_info[0]['speed'] != self.port_info[1]['speed']:
672             raise TrafficGeneratorException('Traffic generator ports speed mismatch: %d/%d Gbps' %
673                                             (self.port_info[0]['speed'],
674                                              self.port_info[1]['speed']))
675
676     def __start_local_server(self):
677         try:
678             LOG.info("Starting TRex ...")
679             self.__start_server()
680             self.__connect_after_start()
681         except (TimeoutError, STLError) as e:
682             LOG.error('Cannot connect to TRex')
683             LOG.error(traceback.format_exc())
684             logpath = '/tmp/trex.log'
685             if os.path.isfile(logpath):
686                 # Wait for TRex to finish writing error message
687                 last_size = 0
688                 for _ in range(self.config.generic_retry_count):
689                     size = os.path.getsize(logpath)
690                     if size == last_size:
691                         # probably not writing anymore
692                         break
693                     last_size = size
694                     time.sleep(1)
695                 with open(logpath, 'r') as f:
696                     message = f.read()
697             else:
698                 message = e.message
699             raise TrafficGeneratorException(message)
700
701     def __start_server(self):
702         server = TRexTrafficServer()
703         server.run_server(self.generator_config)
704
705     def __check_config(self):
706         server = TRexTrafficServer()
707         return server.check_config_updated(self.generator_config)
708
709     def __restart(self):
710         LOG.info("Restarting TRex ...")
711         self.__stop_server()
712         # Wait for server stopped
713         for _ in range(self.config.generic_retry_count):
714             time.sleep(1)
715             if not self.client.is_connected():
716                 LOG.info("TRex is stopped...")
717                 break
718         self.__start_local_server()
719
720     def __stop_server(self):
721         if self.generator_config.ip == '127.0.0.1':
722             ports = self.client.get_acquired_ports()
723             LOG.info('Release ports %s and stopping TRex...', ports)
724             try:
725                 if ports:
726                     self.client.release(ports=ports)
727                 self.client.server_shutdown()
728             except STLError as e:
729                 LOG.warning('Unable to stop TRex. Error: %s', e)
730         else:
731             LOG.info('Using remote TRex. Unable to stop TRex')
732
733     def resolve_arp(self):
734         """Resolve all configured remote IP addresses.
735
736         return: None if ARP failed to resolve for all IP addresses
737                 else a dict of list of dest macs indexed by port#
738                 the dest macs in the list are indexed by the chain id
739         """
740         self.client.set_service_mode(ports=self.port_handle)
741         LOG.info('Polling ARP until successful...')
742         arp_dest_macs = {}
743         for port, device in zip(self.port_handle, self.generator_config.devices):
744             # there should be 1 stream config per chain
745             stream_configs = device.get_stream_configs()
746             chain_count = len(stream_configs)
747             ctx = self.client.create_service_ctx(port=port)
748             # all dest macs on this port indexed by chain ID
749             dst_macs = [None] * chain_count
750             dst_macs_count = 0
751             # the index in the list is the chain id
752             if self.config.vxlan or self.config.mpls:
753                 arps = [
754                     ServiceARP(ctx,
755                                src_ip=device.vtep_src_ip,
756                                dst_ip=device.vtep_dst_ip,
757                                vlan=device.vtep_vlan)
758                     for cfg in stream_configs
759                 ]
760             else:
761                 arps = [
762                     ServiceARP(ctx,
763                                src_ip=cfg['ip_src_tg_gw'],
764                                dst_ip=cfg['mac_discovery_gw'],
765                                # will be None if no vlan tagging
766                                vlan=cfg['vlan_tag'])
767                     for cfg in stream_configs
768                 ]
769
770             for attempt in range(self.config.generic_retry_count):
771                 try:
772                     ctx.run(arps)
773                 except STLError:
774                     LOG.error(traceback.format_exc())
775                     continue
776
777                 unresolved = []
778                 for chain_id, mac in enumerate(dst_macs):
779                     if not mac:
780                         arp_record = arps[chain_id].get_record()
781                         if arp_record.dst_mac:
782                             dst_macs[chain_id] = arp_record.dst_mac
783                             dst_macs_count += 1
784                             LOG.info('   ARP: port=%d chain=%d src IP=%s dst IP=%s -> MAC=%s',
785                                      port, chain_id,
786                                      arp_record.src_ip,
787                                      arp_record.dst_ip, arp_record.dst_mac)
788                         else:
789                             unresolved.append(arp_record.dst_ip)
790                 if dst_macs_count == chain_count:
791                     arp_dest_macs[port] = dst_macs
792                     LOG.info('ARP resolved successfully for port %s', port)
793                     break
794
795                 retry = attempt + 1
796                 LOG.info('Retrying ARP for: %s (retry %d/%d)',
797                          unresolved, retry, self.config.generic_retry_count)
798                 if retry < self.config.generic_retry_count:
799                     time.sleep(self.config.generic_poll_sec)
800             else:
801                 LOG.error('ARP timed out for port %s (resolved %d out of %d)',
802                           port,
803                           dst_macs_count,
804                           chain_count)
805                 break
806
807         # if the capture from the TRex console was started before the arp request step,
808         # it keeps 'service_mode' enabled, otherwise, it disables the 'service_mode'
809         if not self.config.service_mode:
810             self.client.set_service_mode(ports=self.port_handle, enabled=False)
811         if len(arp_dest_macs) == len(self.port_handle):
812             return arp_dest_macs
813         return None
814
815     def __is_rate_enough(self, l2frame_size, rates, bidirectional, latency):
816         """Check if rate provided by user is above requirements. Applies only if latency is True."""
817         intf_speed = self.generator_config.intf_speed
818         if latency:
819             if bidirectional:
820                 mult = 2
821                 total_rate = 0
822                 for rate in rates:
823                     r = utils.convert_rates(l2frame_size, rate, intf_speed)
824                     total_rate += int(r['rate_pps'])
825             else:
826                 mult = 1
827                 total_rate = utils.convert_rates(l2frame_size, rates[0], intf_speed)
828             # rate must be enough for latency stream and at least 1 pps for base stream per chain
829             required_rate = (self.LATENCY_PPS + 1) * self.config.service_chain_count * mult
830             result = utils.convert_rates(l2frame_size,
831                                          {'rate_pps': required_rate},
832                                          intf_speed * mult)
833             result['result'] = total_rate >= required_rate
834             return result
835
836         return {'result': True}
837
838     def create_traffic(self, l2frame_size, rates, bidirectional, latency=True, e2e=False):
839         """Program all the streams in Trex server.
840
841         l2frame_size: L2 frame size or IMIX
842         rates: a list of 2 rates to run each direction
843                each rate is a dict like {'rate_pps': '10kpps'}
844         bidirectional: True if bidirectional
845         latency: True if latency measurement is needed
846         e2e: True if performing "end to end" connectivity check
847         """
848         r = self.__is_rate_enough(l2frame_size, rates, bidirectional, latency)
849         if not r['result']:
850             raise TrafficGeneratorException(
851                 'Required rate in total is at least one of: \n{pps}pps \n{bps}bps \n{load}%.'
852                 .format(pps=r['rate_pps'],
853                         bps=r['rate_bps'],
854                         load=r['rate_percent']))
855         self.l2_frame_size = l2frame_size
856         # a dict of list of streams indexed by port#
857         # in case of fixed size, has self.chain_count * 2 * 2 streams
858         # (1 normal + 1 latency stream per direction per chain)
859         # for IMIX, has self.chain_count * 2 * 4 streams
860         # (3 normal + 1 latency stream per direction per chain)
861         streamblock = {}
862         for port in self.port_handle:
863             streamblock[port] = []
864         stream_cfgs = [d.get_stream_configs() for d in self.generator_config.devices]
865         self.rates = [utils.to_rate_str(rate) for rate in rates]
866         for chain_id, (fwd_stream_cfg, rev_stream_cfg) in enumerate(zip(*stream_cfgs)):
867             streamblock[0].extend(self.generate_streams(self.port_handle[0],
868                                                         chain_id,
869                                                         fwd_stream_cfg,
870                                                         l2frame_size,
871                                                         latency=latency,
872                                                         e2e=e2e))
873             if len(self.rates) > 1:
874                 streamblock[1].extend(self.generate_streams(self.port_handle[1],
875                                                             chain_id,
876                                                             rev_stream_cfg,
877                                                             l2frame_size,
878                                                             latency=bidirectional and latency,
879                                                             e2e=e2e))
880
881         for port in self.port_handle:
882             if self.config.vxlan:
883                 self.client.set_port_attr(ports=port, vxlan_fs=[4789])
884             else:
885                 self.client.set_port_attr(ports=port, vxlan_fs=None)
886             self.client.add_streams(streamblock[port], ports=port)
887             LOG.info('Created %d traffic streams for port %s.', len(streamblock[port]), port)
888
889     def clear_streamblock(self):
890         """Clear all streams from TRex."""
891         self.rates = []
892         self.client.reset(self.port_handle)
893         LOG.info('Cleared all existing streams')
894
895     def get_stats(self, if_stats=None):
896         """Get stats from Trex."""
897         stats = self.client.get_stats()
898         return self.extract_stats(stats, if_stats)
899
900     def get_macs(self):
901         """Return the Trex local port MAC addresses.
902
903         return: a list of MAC addresses indexed by the port#
904         """
905         return [port['src_mac'] for port in self.port_info]
906
907     def get_port_speed_gbps(self):
908         """Return the Trex local port MAC addresses.
909
910         return: a list of speed in Gbps indexed by the port#
911         """
912         return [port['speed'] for port in self.port_info]
913
914     def clear_stats(self):
915         """Clear all stats in the traffic gneerator."""
916         if self.port_handle:
917             self.client.clear_stats()
918
919     def start_traffic(self):
920         """Start generating traffic in all ports."""
921         for port, rate in zip(self.port_handle, self.rates):
922             self.client.start(ports=port, mult=rate, duration=self.config.duration_sec, force=True)
923
924     def stop_traffic(self):
925         """Stop generating traffic."""
926         self.client.stop(ports=self.port_handle)
927
928     def start_capture(self):
929         """Capture all packets on both ports that are unicast to us."""
930         if self.capture_id:
931             self.stop_capture()
932         # Need to filter out unwanted packets so we do not end up counting
933         # src MACs of frames that are not unicast to us
934         src_mac_list = self.get_macs()
935         bpf_filter = "ether dst %s or ether dst %s" % (src_mac_list[0], src_mac_list[1])
936         # ports must be set in service in order to enable capture
937         self.client.set_service_mode(ports=self.port_handle)
938         self.capture_id = self.client.start_capture \
939             (rx_ports=self.port_handle, bpf_filter=bpf_filter)
940
941     def fetch_capture_packets(self):
942         """Fetch capture packets in capture mode."""
943         if self.capture_id:
944             self.packet_list = []
945             self.client.fetch_capture_packets(capture_id=self.capture_id['id'],
946                                               output=self.packet_list)
947
948     def stop_capture(self):
949         """Stop capturing packets."""
950         if self.capture_id:
951             self.client.stop_capture(capture_id=self.capture_id['id'])
952             self.capture_id = None
953             # if the capture from TRex console was started before the connectivity step,
954             # it keeps 'service_mode' enabled, otherwise, it disables the 'service_mode'
955             if not self.config.service_mode:
956                 self.client.set_service_mode(ports=self.port_handle, enabled=False)
957
958     def cleanup(self):
959         """Cleanup Trex driver."""
960         if self.client:
961             try:
962                 self.client.reset(self.port_handle)
963                 self.client.disconnect()
964             except STLError:
965                 # TRex does not like a reset while in disconnected state
966                 pass
967
968     def set_service_mode(self, enabled=True):
969         """Enable/disable the 'service_mode'."""
970         self.client.set_service_mode(ports=self.port_handle, enabled=enabled)