Merge "trex_learning: Add learning packet option to T-Rex testing"
[vswitchperf.git] / tools / pkt_gen / xena / json / xena_json.py
1 # Copyright 2016-2017 Red Hat Inc & Xena Networks.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #   http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 # Contributors:
16 #   Dan Amzulescu, Xena Networks
17 #   Christian Trautman, Red Hat Inc.
18 #
19 # Usage can be seen below in unit test. This implementation is designed for one
20 # module two port Xena chassis runs only.
21
22 """
23 Xena JSON module
24 """
25
26 from collections import OrderedDict
27 import locale
28 import logging
29 import os
30
31 import scapy.layers.inet as inet
32
33 from tools.pkt_gen.xena.json import json_utilities
34
35 _LOGGER = logging.getLogger(__name__)
36 _LOCALE = locale.getlocale()[1]
37
38 _CURR_DIR = os.path.dirname(os.path.realpath(__file__))
39
40
41 class XenaJSON(object):
42     """
43     Parent Class to modify and read Xena JSON configuration files.
44     """
45     def __init__(self,
46                  json_path=os.path.join(
47                      _CURR_DIR, '../profiles/baseconfig.x2544')):
48         """
49         Constructor
50         :param json_path: path to JSON file to read. Expected files must have
51          two module ports with each port having its own stream config profile.
52         :return: XenaJSON object
53         """
54         self.json_data = json_utilities.read_json_file(json_path)
55
56         self.packet_data = OrderedDict()
57         self.packet_data['layer2'] = None
58         self.packet_data['vlan'] = None
59         self.packet_data['layer3'] = None
60         self.packet_data['layer4'] = None
61
62     def _add_multistream_layer(self, entity, seg_uuid, stop_value, layer):
63         """
64         Add the multi stream layers to the json file based on the layer provided
65         :param entity: Entity to append the segment to in entity list
66         :param seg_uuid: The UUID to attach the multistream layer to
67         :param stop_value: The number of flows to configure
68         :param layer: the layer that the multistream will be attached to
69         :return: None
70         """
71         field_name = {
72             2: ('Dst MAC addr', 'Src MAC addr'),
73             3: ('Dest IP Addr', 'Src IP Addr'),
74             4: ('Dest Port', 'Src Port')
75         }
76         segments = [
77             {
78                 "Offset": 0,
79                 "Mask": "//8=",  # mask of 255/255
80                 "Action": "INC",
81                 "StartValue": 0,
82                 "StopValue": stop_value,
83                 "StepValue": 1,
84                 "RepeatCount": 1,
85                 "SegmentId": seg_uuid,
86                 "FieldName": field_name[int(layer)][0]
87             },
88             {
89                 "Offset": 0,
90                 "Mask": "//8=",  # mask of 255/255
91                 "Action": "INC",
92                 "StartValue": 0,
93                 "StopValue": stop_value,
94                 "StepValue": 1,
95                 "RepeatCount": 1,
96                 "SegmentId": seg_uuid,
97                 "FieldName": field_name[int(layer)][1]
98             }
99         ]
100
101         self.json_data['StreamProfileHandler']['EntityList'][entity][
102             'StreamConfig']['HwModifiers'] = (segments)
103
104     def _create_packet_header(self):
105         """
106         Create the scapy packet header based on what has been built in this
107         instance using the set header methods. Return tuple of the two byte
108         arrays, one for each port.
109         :return: Scapy packet headers as bytearrays
110         """
111         if not self.packet_data['layer2']:
112             _LOGGER.warning('Using dummy info for layer 2 in Xena JSON file')
113             self.set_header_layer2()
114         packet1, packet2 = (self.packet_data['layer2'][0],
115                             self.packet_data['layer2'][1])
116         for packet_header in list(self.packet_data.copy().values())[1:]:
117             if packet_header:
118                 packet1 /= packet_header[0]
119                 packet2 /= packet_header[1]
120         ret = (bytes(packet1), bytes(packet2))
121         return ret
122
123     def add_header_segments(self, flows=0, multistream_layer=None):
124         """
125         Build the header segments to write to the JSON file.
126         :param flows: Number of flows to configure for multistream if enabled
127         :param multistream_layer: layer to set multistream flows as string.
128         Acceptable values are L2, L3 or L4
129         :return: None
130         """
131         packet = self._create_packet_header()
132         segment1 = list()
133         segment2 = list()
134         header_pos = 0
135         if self.packet_data['layer2']:
136             # slice out the layer 2 bytes from the packet header byte array
137             layer2 = packet[0][header_pos: len(self.packet_data['layer2'][0])]
138             seg = json_utilities.create_segment(
139                 "ETHERNET", json_utilities.encode_byte_array(layer2).decode(
140                     _LOCALE))
141             if multistream_layer == 'L2' and flows > 0:
142                 self._add_multistream_layer(entity=0, seg_uuid=seg['ItemID'],
143                                             stop_value=flows, layer=2)
144             segment1.append(seg)
145             # now do the other port data with reversed src, dst info
146             layer2 = packet[1][header_pos: len(self.packet_data['layer2'][1])]
147             seg = json_utilities.create_segment(
148                 "ETHERNET", json_utilities.encode_byte_array(layer2).decode(
149                     _LOCALE))
150             segment2.append(seg)
151             if multistream_layer == 'L2' and flows > 0:
152                 self._add_multistream_layer(entity=1, seg_uuid=seg['ItemID'],
153                                             stop_value=flows, layer=2)
154             header_pos = len(layer2)
155         if self.packet_data['vlan']:
156             # slice out the vlan bytes from the packet header byte array
157             vlan = packet[0][header_pos: len(
158                 self.packet_data['vlan'][0]) + header_pos]
159             segment1.append(json_utilities.create_segment(
160                 "VLAN", json_utilities.encode_byte_array(vlan).decode(_LOCALE)))
161             segment2.append(json_utilities.create_segment(
162                 "VLAN", json_utilities.encode_byte_array(vlan).decode(_LOCALE)))
163             header_pos += len(vlan)
164         if self.packet_data['layer3']:
165             # slice out the layer 3 bytes from the packet header byte array
166             layer3 = packet[0][header_pos: len(
167                 self.packet_data['layer3'][0]) + header_pos]
168             seg = json_utilities.create_segment(
169                 "IP", json_utilities.encode_byte_array(layer3).decode(_LOCALE))
170             segment1.append(seg)
171             if multistream_layer == 'L3' and flows > 0:
172                 self._add_multistream_layer(entity=0, seg_uuid=seg['ItemID'],
173                                             stop_value=flows, layer=3)
174             # now do the other port data with reversed src, dst info
175             layer3 = packet[1][header_pos: len(
176                 self.packet_data['layer3'][1]) + header_pos]
177             seg = json_utilities.create_segment(
178                 "IP", json_utilities.encode_byte_array(layer3).decode(_LOCALE))
179             segment2.append(seg)
180             if multistream_layer == 'L3' and flows > 0:
181                 self._add_multistream_layer(entity=1, seg_uuid=seg['ItemID'],
182                                             stop_value=flows, layer=3)
183             header_pos += len(layer3)
184         if self.packet_data['layer4']:
185             # slice out the layer 4 bytes from the packet header byte array
186             layer4 = packet[0][header_pos: len(
187                 self.packet_data['layer4'][0]) + header_pos]
188             seg = json_utilities.create_segment(
189                 "UDP", json_utilities.encode_byte_array(layer4).decode(_LOCALE))
190             segment1.append(seg)
191             if multistream_layer == 'L4' and flows > 0:
192                 self._add_multistream_layer(entity=0, seg_uuid=seg['ItemID'],
193                                             stop_value=flows, layer=4)
194             # now do the other port data with reversed src, dst info
195             layer4 = packet[1][header_pos: len(
196                 self.packet_data['layer4'][1]) + header_pos]
197             seg = json_utilities.create_segment(
198                 "UDP", json_utilities.encode_byte_array(layer4).decode(_LOCALE))
199             segment2.append(seg)
200             if multistream_layer == 'L4' and flows > 0:
201                 self._add_multistream_layer(entity=1, seg_uuid=seg['ItemID'],
202                                             stop_value=flows, layer=4)
203             header_pos += len(layer4)
204
205         self.json_data['StreamProfileHandler']['EntityList'][0][
206             'StreamConfig']['HeaderSegments'] = segment1
207         self.json_data['StreamProfileHandler']['EntityList'][1][
208             'StreamConfig']['HeaderSegments'] = segment2
209
210     def disable_back2back_test(self):
211         """
212         Disable the rfc2544 back to back test
213         :return: None
214         """
215         self.json_data['TestOptions']['TestTypeOptionMap']['Back2Back'][
216             'Enabled'] = 'false'
217
218     def disable_throughput_test(self):
219         """
220         Disable the rfc2544 throughput test
221         :return: None
222         """
223         self.json_data['TestOptions']['TestTypeOptionMap']['Throughput'][
224             'Enabled'] = 'false'
225
226     def enable_back2back_test(self):
227         """
228         Enable the rfc2544 back to back test
229         :return: None
230         """
231         self.json_data['TestOptions']['TestTypeOptionMap']['Back2Back'][
232             'Enabled'] = 'true'
233
234     def enable_throughput_test(self):
235         """
236         Enable the rfc2544 throughput test
237         :return: None
238         """
239         self.json_data['TestOptions']['TestTypeOptionMap']['Throughput'][
240             'Enabled'] = 'true'
241     # pylint: disable=too-many-arguments
242     def modify_2544_tput_options(self, initial_value, minimum_value,
243                                  maximum_value, value_resolution,
244                                  use_pass_threshhold, pass_threshhold):
245         """
246         modify_2544_tput_options
247         """
248         self.json_data['TestOptions']['TestTypeOptionMap']['Throughput'][
249             'RateIterationOptions']['InitialValue'] = initial_value
250         self.json_data['TestOptions']['TestTypeOptionMap']['Throughput'][
251             'RateIterationOptions']['MinimumValue'] = minimum_value
252         self.json_data['TestOptions']['TestTypeOptionMap']['Throughput'][
253             'RateIterationOptions']['MaximumValue'] = maximum_value
254         self.json_data['TestOptions']['TestTypeOptionMap']['Throughput'][
255             'RateIterationOptions']['ValueResolution'] = value_resolution
256         self.json_data['TestOptions']['TestTypeOptionMap']['Throughput'][
257             'RateIterationOptions']['UsePassThreshold'] = use_pass_threshhold
258         self.json_data['TestOptions']['TestTypeOptionMap']['Throughput'][
259             'RateIterationOptions']['PassThreshold'] = pass_threshhold
260
261     def set_chassis_info(self, hostname, pwd):
262         """
263         Set the chassis info
264         :param hostname: hostname as string of ip
265         :param pwd: password to chassis as string
266         :return: None
267         """
268         self.json_data['ChassisManager']['ChassisList'][0][
269             'HostName'] = hostname
270         self.json_data['ChassisManager']['ChassisList'][0][
271             'Password'] = pwd
272
273     def set_header_layer2(self, dst_mac='cc:cc:cc:cc:cc:cc',
274                           src_mac='bb:bb:bb:bb:bb:bb', **kwargs):
275         """
276         Build a scapy Ethernet L2 objects inside instance packet_data structure
277         :param dst_mac: destination mac as string. Example "aa:aa:aa:aa:aa:aa"
278         :param src_mac: source mac as string. Example "bb:bb:bb:bb:bb:bb"
279         :param kwargs: Extra params per scapy usage.
280         :return: None
281         """
282         self.packet_data['layer2'] = [
283             inet.Ether(dst=dst_mac, src=src_mac, **kwargs),
284             inet.Ether(dst=src_mac, src=dst_mac, **kwargs)]
285
286     def set_header_layer3(self, src_ip='192.168.0.2', dst_ip='192.168.0.3',
287                           protocol='UDP', **kwargs):
288         """
289         Build scapy IPV4 L3 objects inside instance packet_data structure
290         :param src_ip: source IP as string in dot notation format
291         :param dst_ip: destination IP as string in dot notation format
292         :param protocol: protocol for l4
293         :param kwargs: Extra params per scapy usage
294         :return: None
295         """
296         self.packet_data['layer3'] = [
297             inet.IP(src=src_ip, dst=dst_ip, proto=protocol.lower(), **kwargs),
298             inet.IP(src=dst_ip, dst=src_ip, proto=protocol.lower(), **kwargs)]
299
300     def set_header_layer4_udp(self, source_port, destination_port, **kwargs):
301         """
302         Build scapy UDP L4 objects inside instance packet_data structure
303         :param source_port: Source port as int
304         :param destination_port: Destination port as int
305         :param kwargs: Extra params per scapy usage
306         :return: None
307         """
308         self.packet_data['layer4'] = [
309             inet.UDP(sport=source_port, dport=destination_port, **kwargs),
310             inet.UDP(sport=source_port, dport=destination_port, **kwargs)]
311
312     def set_header_vlan(self, vlan_id=1, **kwargs):
313         """
314         Build a Dot1Q scapy object inside instance packet_data structure
315         :param vlan_id: The VLAN ID
316         :param kwargs: Extra params per scapy usage
317         :return: None
318         """
319         self.packet_data['vlan'] = [
320             inet.Dot1Q(vlan=vlan_id, **kwargs),
321             inet.Dot1Q(vlan=vlan_id, **kwargs)]
322
323     def set_port(self, index, module, port):
324         """
325         Set the module and port for the 0 index port to use with the test
326         :param index: Index of port to set, 0 = port1, 1=port2, etc..
327         :param module: module location as int
328         :param port: port location in module as int
329         :return: None
330         """
331         self.json_data['PortHandler']['EntityList'][index]['PortRef'][
332             'ModuleIndex'] = module
333         self.json_data['PortHandler']['EntityList'][index]['PortRef'][
334             'PortIndex'] = port
335
336     def set_port_ip_v4(self, port, ip_addr, netmask, gateway):
337         """
338         Set the port IP info
339         :param port: port number as int of port to set ip info
340         :param ip_addr: ip address in dot notation format as string
341         :param netmask: cidr number for netmask (ie 24/16/8) as int
342         :param gateway: gateway address in dot notation format
343         :return: None
344         """
345         available_ports = range(len(
346             self.json_data['PortHandler']['EntityList']))
347         if port not in available_ports:
348             raise ValueError("{}{}{}".format(
349                 'Port assignment must be an available port ',
350                 'number in baseconfig file. Port=', port))
351         self.json_data['PortHandler']['EntityList'][
352             port]["IpV4Address"] = ip_addr
353         self.json_data['PortHandler']['EntityList'][
354             port]["IpV4Gateway"] = gateway
355         self.json_data['PortHandler']['EntityList'][
356             port]["IpV4RoutingPrefix"] = int(netmask)
357
358     def set_port_ip_v6(self, port, ip_addr, netmask, gateway):
359         """
360         Set the port IP info
361         :param port: port number as int of port to set ip info
362         :param ip_addr: ip address as 8 groups of 4 hexadecimal groups separated
363          by a colon.
364         :param netmask: cidr number for netmask (ie 24/16/8) as int
365         :param gateway: gateway address as string in 8 group of 4 hexadecimal
366                         groups separated by a colon.
367         :return: None
368         """
369         available_ports = range(len(
370             self.json_data['PortHandler']['EntityList']))
371         if port not in available_ports:
372             raise ValueError("{}{}{}".format(
373                 'Port assignment must be an available port ',
374                 'number in baseconfig file. Port=', port))
375         self.json_data['PortHandler']['EntityList'][
376             port]["IpV6Address"] = ip_addr
377         self.json_data['PortHandler']['EntityList'][
378             port]["IpV6Gateway"] = gateway
379         self.json_data['PortHandler']['EntityList'][
380             port]["IpV6RoutingPrefix"] = int(netmask)
381
382     def set_test_options_tput(self, packet_sizes, duration, iterations,
383                               loss_rate, micro_tpld=False):
384         """
385         Set the tput test options
386         :param packet_sizes: List of packet sizes to test, single int entry is
387          acceptable for one packet size testing
388         :param duration: time for each test in seconds as int
389         :param iterations: number of iterations of testing as int
390         :param loss_rate: acceptable loss rate as float
391         :param micro_tpld: boolean if micro_tpld should be enabled or disabled
392         :return: None
393         """
394         if isinstance(packet_sizes, int):
395             packet_sizes = [packet_sizes]
396         self.json_data['TestOptions']['PacketSizes'][
397             'CustomPacketSizes'] = packet_sizes
398         self.json_data['TestOptions']['TestTypeOptionMap']['Throughput'][
399             'Duration'] = duration
400         self.json_data['TestOptions']['TestTypeOptionMap']['Throughput'][
401             'RateIterationOptions']['AcceptableLoss'] = loss_rate
402         self.json_data['TestOptions']['FlowCreationOptions'][
403             'UseMicroTpldOnDemand'] = 'true' if micro_tpld else 'false'
404         self.json_data['TestOptions']['TestTypeOptionMap']['Throughput'][
405             'Iterations'] = iterations
406
407     def set_test_options_back2back(self, packet_sizes, duration,
408                                    iterations, startvalue, endvalue,
409                                    micro_tpld=False):
410         """
411         Set the back2back test options
412         :param packet_sizes: List of packet sizes to test, single int entry is
413          acceptable for one packet size testing
414         :param duration: time for each test in seconds as int
415         :param iterations: number of iterations of testing as int
416         :param micro_tpld: boolean if micro_tpld should be enabled or disabled
417         :param StartValue: start value
418         :param EndValue: end value
419         :return: None
420         """
421         if isinstance(packet_sizes, int):
422             packet_sizes = [packet_sizes]
423         self.json_data['TestOptions']['PacketSizes'][
424             'CustomPacketSizes'] = packet_sizes
425         self.json_data['TestOptions']['TestTypeOptionMap']['Back2Back'][
426             'Duration'] = duration
427         self.json_data['TestOptions']['FlowCreationOptions'][
428             'UseMicroTpldOnDemand'] = 'true' if micro_tpld else 'false'
429         self.json_data['TestOptions']['TestTypeOptionMap']['Back2Back'][
430             'Iterations'] = iterations
431         self.json_data['TestOptions']['TestTypeOptionMap']['Back2Back'][
432             'RateSweepOptions']['StartValue'] = startvalue
433         self.json_data['TestOptions']['TestTypeOptionMap']['Back2Back'][
434             'RateSweepOptions']['EndValue'] = endvalue
435
436     def write_config(self, path='./2bUsed.x2544'):
437         """
438         Write the config to out as file
439         :param path: Output file to export the json data to
440         :return: None
441         """
442         if not json_utilities.write_json_file(self.json_data, path):
443             raise RuntimeError("Could not write out file, please check config")